init commit

This commit is contained in:
2025-04-19 17:09:13 +02:00
commit 5ff83ecaeb
39 changed files with 2020 additions and 0 deletions

62
server/database.py Normal file
View File

@@ -0,0 +1,62 @@
import sqlite3
from contextlib import closing
DB_FILE = 'lights.db'
DEFAULT_LIGHTS = ['green', 'orange', 'red']
def connect():
"""Get a new connection to the database."""
return sqlite3.connect(DB_FILE)
def setup_database():
"""Initializes the database and sets default states for lights."""
with closing(connect()) as conn:
with conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS light_state (
name TEXT PRIMARY KEY,
state INTEGER NOT NULL CHECK (state IN (0, 1))
)
''')
# Insert default lights with state=0 if they do not exist
for light in DEFAULT_LIGHTS:
conn.execute('''
INSERT OR IGNORE INTO light_state (name, state)
VALUES (?, 0)
''', (light,))
def ensure_database():
"""Ensures the DB is initialized. Safe to call multiple times."""
setup_database()
def set_light_state(name, state):
ensure_database()
with closing(connect()) as conn:
with conn:
conn.execute('''
INSERT INTO light_state (name, state)
VALUES (?, ?)
ON CONFLICT(name) DO UPDATE SET state = excluded.state
''', (name, state))
def get_light_state(name):
ensure_database()
with closing(connect()) as conn:
cur = conn.cursor()
cur.execute('SELECT state FROM light_state WHERE name = ?', (name,))
row = cur.fetchone()
return row[0] if row else 0
def get_all_states():
ensure_database()
with closing(connect()) as conn:
cur = conn.cursor()
cur.execute('SELECT name, state FROM light_state')
return dict(cur.fetchall())

73
server/light.py Normal file
View File

@@ -0,0 +1,73 @@
import RPi.GPIO as GPIO
import time
import database
FUNCTIONS = ['green', 'orange', 'red']
class Light:
def __init__(self, pin, name):
self.pin = pin
self.name = name
self.state = database.get_light_state(name)
self.setup()
def setup(self):
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.pin, GPIO.OUT)
self._set(self.state)
def _set(self, state):
if state == self.state:
print('%s is equal %s' % (state, self.state,))
print('setting %s state to: %s' % (self.name, state,))
GPIO.output(self.pin, GPIO.LOW if state == GPIO.HIGH else GPIO.HIGH)
self.state = state
database.set_light_state(self.name, state)
def on(self):
self._set(GPIO.HIGH)
def off(self):
self._set(GPIO.LOW)
def flash(self, count=2, interval=0.2):
for _ in range(count):
self.on()
time.sleep(interval)
self.off()
time.sleep(interval)
class Tower:
def __init__(self, pinList):
self.pins = pinList
self.setupLights()
self.green: Light
self.orange: Light
self.red: Light
def setupLights(self):
if len(FUNCTIONS) != len(self.pins):
raise ValueError("only %s pins provided, but %s available" %
(len(self.pins), len(FUNCTIONS)))
for pin in self.pins:
light = Light(pin['pin'], pin['name'])
if light.name == 'green':
self.green = light
elif light.name == 'orange':
self.orange = light
elif light.name == 'red':
self.red = light
def on(self):
self.green.on()
self.orange.on()
self.red.on()
def off(self):
self.green.off()
self.orange.off()
self.red.off()

100
server/main.py Normal file
View File

@@ -0,0 +1,100 @@
import time
import argparse
import RPi.GPIO as GPIO
from light import Tower
import database
from flask import Flask, jsonify, request
# Pins configuration (order: green, orange, red)
GPIO_PINS = [{'name': 'green', 'pin': 22},
{'name': 'orange', 'pin': 17},
{'name': 'red', 'pin': 27}]
# Create a Flask app
app = Flask(__name__)
# Initialize the database
database.ensure_database()
# Initialize the tower (lights)
tower = Tower(GPIO_PINS)
@app.after_request
def add_cors_headers(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
return response
@app.route('/api/state/set', methods=['POST'])
def set_state():
data = request.data.decode('utf-8')
state = [int(n.strip()) for n in data.split(',') if n.strip().isdigit()]
"""Endpoint to get the current state of all lights."""
tower.green.on() if state[0] else tower.green.off()
tower.orange.on() if state[1] else tower.orange.off()
tower.red.on() if state[2] else tower.red.off()
return jsonify(state)
@app.route('/api/state', methods=['GET'])
def get_state():
"""Endpoint to get the current state of all lights."""
state = database.get_all_states()
return jsonify(state)
@app.route('/api/toggle/<color>', methods=['POST'])
def toggle_light(color):
"""Endpoint to toggle the state of a specified light."""
if color not in ['green', 'orange', 'red']:
return jsonify({'error': 'Invalid color'}), 404
# Get the light object based on the color
light = getattr(tower, color)
# Toggle light state (on/off)
new_state = GPIO.LOW if light.state == GPIO.HIGH else GPIO.HIGH
light._set(new_state) # Update the light state in the system
return jsonify({color: new_state})
def run(tower):
"""Run the tower with manual light control."""
try:
time.sleep(0.5)
while True:
print('on')
tower.green.on()
tower.red.off()
tower.orange.off()
time.sleep(10)
except KeyboardInterrupt:
print("\nInterrupted by user. Cleaning up...")
finally:
GPIO.cleanup()
def main():
parser = argparse.ArgumentParser(
description="Control lights or run server.")
parser.add_argument('--server', action='store_true',
help='Run the Flask server to control lights via API')
args = parser.parse_args()
if args.server:
print("Starting Flask server...")
app.run(host='0.0.0.0', port=5000)
else:
print("Running tower control manually...")
run(tower)
if __name__ == '__main__':
main()

1
server/requirements.txt Normal file
View File

@@ -0,0 +1 @@
Flask==3.1.0