#!/usr/bin/env python3
"""
Babycam Flask web interface.

- Live stream via MJPEG (rpicam-vid)
- Wi-Fi management (scan, connect)
- System status
"""

import logging
import os
import re
import subprocess
import threading
import time

from flask import Flask, Response, render_template, request, jsonify
from flask_sock import Sock

app = Flask(__name__)
sock = Sock(app)
logger = logging.getLogger(__name__)

STREAM_WIDTH = 1280
STREAM_HEIGHT = 720
STREAM_FPS = 20

# JPEG start-of-image / end-of-image markers used to cut frames out of the
# raw MJPEG byte stream.
_JPEG_SOI = b"\xff\xd8"
_JPEG_EOI = b"\xff\xd9"


# ---------------------------------------------------------------------------
# Camera broadcaster - one process, N clients
# ---------------------------------------------------------------------------

class CameraBroadcaster:
    """Run rpicam-vid once in a background thread and share its frames.

    All clients read the same most-recent frame, so no parallel camera
    processes are started.
    """

    def __init__(self):
        self._frame: bytes | None = None
        self._lock = threading.Lock()
        self._thread: threading.Thread | None = None
        self._running = False

    def start(self):
        """Start the capture thread unless one is already alive."""
        if self._thread and self._thread.is_alive():
            return
        self._running = True
        self._thread = threading.Thread(target=self._capture, daemon=True)
        self._thread.start()

    def _capture(self):
        """Read the MJPEG stream from rpicam-vid and publish each frame.

        Runs until ``_running`` is cleared or the child closes its stdout.
        """
        cmd = [
            "rpicam-vid",
            "-t", "0",
            "--width", str(STREAM_WIDTH),
            "--height", str(STREAM_HEIGHT),
            "--framerate", str(STREAM_FPS),
            "--codec", "mjpeg",
            "-o", "-",
            "--nopreview",
        ]
        try:
            process = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
            )
        except OSError:
            # Binary missing or not executable: leave the broadcaster in a
            # restartable state instead of stuck with _running == True.
            logger.exception("Could not start rpicam-vid")
            self._running = False
            return

        buf = bytearray()
        try:
            while self._running:
                chunk = process.stdout.read(4096)
                if not chunk:
                    break
                buf += chunk
                self._extract_frames(buf)
        finally:
            process.kill()
            process.wait()  # reap the child so it does not linger as a zombie
            self._running = False

    def _extract_frames(self, buf: bytearray) -> None:
        """Publish every complete JPEG in *buf* and drop the consumed bytes.

        *buf* is modified in place; any incomplete trailing frame is kept
        for the next call.
        """
        while True:
            start = buf.find(_JPEG_SOI)
            if start == -1:
                # No frame start yet. Keep the last byte: it may be the
                # first half of an SOI marker split across two reads.
                if len(buf) > 1:
                    del buf[:-1]
                return
            if start:
                del buf[:start]  # discard garbage / stale data before SOI
            # Search for EOI only *after* the SOI; an EOI located before the
            # SOI must never block frame extraction.
            end = buf.find(_JPEG_EOI, len(_JPEG_SOI))
            if end == -1:
                return
            frame_end = end + len(_JPEG_EOI)
            # A fresh bytes object per frame: generate() detects new frames
            # by object identity.
            frame = bytes(buf[:frame_end])
            del buf[:frame_end]
            with self._lock:
                self._frame = frame

    def get_frame(self) -> bytes | None:
        """Return the most recent frame, or None if none was captured yet."""
        with self._lock:
            return self._frame

    def generate(self):
        """Yield multipart MJPEG parts for a single client.

        A part is emitted whenever a frame object different from the last
        one sent is available; otherwise the generator sleeps for one frame
        interval before polling again.
        """
        boundary = b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"
        last_frame = None
        while True:
            frame = self.get_frame()
            if frame and frame is not last_frame:
                last_frame = frame
                yield boundary + frame + b"\r\n"
            else:
                time.sleep(1 / STREAM_FPS)


camera = CameraBroadcaster()
camera.start()


@app.route("/stream")
def stream():
    """Serve the live camera picture as a multipart MJPEG response."""
    return Response(
        camera.generate(),
        mimetype="multipart/x-mixed-replace; boundary=frame",
    )


# ---------------------------------------------------------------------------
# Audio stream via WebSocket (PCM -> Web Audio API)
# Minimal latency: no HTTP buffering, no codec delay
# ---------------------------------------------------------------------------

AUDIO_SAMPLERATE = 16000
# Bytes read per WebSocket message: 512 bytes = 256 mono S16_LE samples,
# i.e. about 16 ms at 16 kHz.
AUDIO_CHUNK = 512


def find_audio_device() -> str:
    """Find the USB sound card dynamically via /proc/asound/cards.

    Returns:
        An ALSA device string ``plughw:<card>,0`` for the first card whose
        header line mentions "USB", or ``plughw:1,0`` as a fallback.
    """
    try:
        with open("/proc/asound/cards") as f:
            for line in f:
                # Card header line: " 1 [Device         ]: USB-Audio ..."
                m = re.match(r"^\s*(\d+)\s+\[.*\].*USB", line)
                if m:
                    return f"plughw:{m.group(1)},0"
    except OSError:
        pass  # file unreadable: use the fallback below
    return "plughw:1,0"  # fallback


@sock.route("/audio-ws")
def audio_ws(ws):
    """Stream arecord output to one WebSocket client in AUDIO_CHUNK pieces.

    NOTE(review): arecord is started without "-t raw"; confirm whether the
    client expects (or skips) a leading WAV header.
    """
    device = find_audio_device()
    try:
        process = subprocess.Popen(
            [
                "arecord",
                "-D", device,
                "-f", "S16_LE",
                "-r", str(AUDIO_SAMPLERATE),
                "-c", "1",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        logger.exception("Could not start arecord")
        return

    try:
        while True:
            chunk = process.stdout.read(AUDIO_CHUNK)
            if not chunk:
                break
            ws.send(chunk)
    except Exception:
        # Best effort: a failing send (typically a client disconnect) just
        # ends the stream.
        logger.debug("Audio WebSocket closed", exc_info=True)
    finally:
        process.kill()
        process.wait()  # reap the child so it does not linger as a zombie


# ---------------------------------------------------------------------------
# Wi-Fi management
# ---------------------------------------------------------------------------

def _run(cmd, timeout=15):
    """Run *cmd* and return the CompletedProcess, or None if it cannot run.

    Output is captured as text. A missing binary or a timeout is logged and
    reported as None so that status pages degrade instead of failing.
    """
    try:
        return subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )
    except (OSError, subprocess.TimeoutExpired):
        logger.warning("Command could not be run: %s", cmd[0], exc_info=True)
        return None


def _split_nmcli_terse(line: str) -> list[str]:
    """Split one line of ``nmcli -t`` output into unescaped fields.

    Fields are separated by ":"; a backslash escapes the following
    character, so "\\:" inside a value does not end the field.
    """
    fields = []
    current = []
    chars = iter(line)
    for ch in chars:
        if ch == "\\":
            # Take the escaped character literally; a lone trailing
            # backslash is kept as-is.
            current.append(next(chars, "\\"))
        elif ch == ":":
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
    fields.append("".join(current))
    return fields


def _signal_strength(network) -> int:
    """Sort key: numeric signal value, 0 when it is not a plain number."""
    signal = network["signal"]
    return int(signal) if signal.isdigit() else 0


def get_wifi_networks():
    """List visible Wi-Fi networks on wlan0, strongest signal first.

    Returns:
        A list of dicts with the keys "ssid", "signal" and "security" (all
        strings as printed by nmcli). Each SSID appears once (first
        occurrence wins); hidden networks (empty SSID) are skipped. The list
        is empty when nmcli cannot be run.
    """
    result = _run(
        ["nmcli", "-t", "-f", "SSID,SIGNAL,SECURITY",
         "device", "wifi", "list", "ifname", "wlan0"]
    )
    if result is None:
        return []

    networks = []
    seen = set()
    for line in result.stdout.splitlines():
        parts = _split_nmcli_terse(line)
        if len(parts) < 2:
            continue
        ssid = parts[0].strip()
        if not ssid or ssid in seen:
            continue
        seen.add(ssid)
        networks.append({
            "ssid": ssid,
            "signal": parts[1],
            "security": parts[2] if len(parts) > 2 else "",
        })
    return sorted(networks, key=_signal_strength, reverse=True)


def get_system_status():
    """Collect the values shown on the status pages.

    Returns:
        A dict with the keys "ips", "wifi_ssid", "ap_active", "ap_override",
        "cam_available" and "uptime". A tool that cannot be run yields the
        neutral value for its entry instead of an exception.
    """
    # IP addresses
    ip_result = _run(["hostname", "-I"])
    ips = ip_result.stdout.split() if ip_result is not None else []

    # Active Wi-Fi connection
    wifi_ssid = "nicht verbunden"
    wifi_result = _run(
        ["nmcli", "-t", "-f", "GENERAL.CONNECTION", "device", "show", "wlan0"]
    )
    if wifi_result is not None:
        wifi_match = re.search(r"GENERAL\.CONNECTION:(.*)", wifi_result.stdout)
        if wifi_match:
            name = wifi_match.group(1).strip()
            # An empty value or the "--" placeholder means: not connected.
            if name and name != "--":
                wifi_ssid = name

    # Access-point status
    ap_result = _run(["systemctl", "is-active", "hostapd"])
    ap_active = ap_result is not None and ap_result.stdout.strip() == "active"

    # Camera available?
    cam_result = _run(["rpicam-hello", "--list-cameras"])
    cam_available = (
        cam_result is not None
        and "No cameras" not in cam_result.stderr
        and "No cameras" not in cam_result.stdout
    )

    return {
        "ips": ips,
        "wifi_ssid": wifi_ssid,
        "ap_active": ap_active,
        "ap_override": get_ap_override(),
        "cam_available": cam_available,
        "uptime": _get_uptime(),
    }


def _get_uptime():
    """Return the uptime from /proc/uptime as "<h>h <m>m", or "unbekannt"."""
    try:
        with open("/proc/uptime") as f:
            seconds = float(f.read().split()[0])
    except (OSError, ValueError, IndexError):
        return "unbekannt"
    minutes = int(seconds) // 60
    h, m = divmod(minutes, 60)
    return f"{h}h {m}m"


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.route("/")
def index():
    """Render the start page with the current system status."""
    status = get_system_status()
    return render_template("index.html", status=status)


@app.route("/live")
def live():
    """Render the live-view page with the current system status."""
    status = get_system_status()
    return render_template("live.html", status=status)


@app.route("/api/status")
def api_status():
    """Return the current system status as JSON."""
    return jsonify(get_system_status())


@app.route("/wifi/scan")
def wifi_scan():
    """Trigger a Wi-Fi rescan on wlan0 and return the network list as JSON."""
    # Ask for a fresh scan, then give it a moment to produce results.
    _run(["nmcli", "device", "wifi", "rescan", "ifname", "wlan0"])
    time.sleep(2)
    return jsonify(get_wifi_networks())
# File holding the manual access-point override ("on" / "off"); when the
# file is absent the mode is "auto".
AP_OVERRIDE_FILE = "/var/lib/babycam/ap-override"


@app.route("/ap/<action>", methods=["POST"])
def ap_control(action):
    """Set or clear the access-point override.

    Args:
        action: "on" or "off" writes that value to the override file;
            "auto" removes the file.

    Returns:
        JSON ``{"success": True, "mode": <action>}`` on success, a 400
        response for an unknown action, or a 500 response when the override
        file cannot be written or removed.
    """
    if action in ("on", "off"):
        try:
            os.makedirs(os.path.dirname(AP_OVERRIDE_FILE), exist_ok=True)
            with open(AP_OVERRIDE_FILE, "w") as f:
                f.write(action)
        except OSError as exc:
            return jsonify({"success": False, "error": str(exc)}), 500
        return jsonify({"success": True, "mode": action})

    if action == "auto":
        try:
            os.remove(AP_OVERRIDE_FILE)
        except FileNotFoundError:
            pass  # already in auto mode
        except OSError as exc:
            return jsonify({"success": False, "error": str(exc)}), 500
        return jsonify({"success": True, "mode": "auto"})

    return jsonify({"success": False, "error": "Unbekannte Aktion"}), 400


def get_ap_override():
    """Return the stored override, lower-cased, or "auto".

    "auto" is returned when the override file is missing, unreadable or
    empty.
    """
    try:
        with open(AP_OVERRIDE_FILE) as f:
            return f.read().strip().lower() or "auto"
    except OSError:
        return "auto"


@app.route("/wifi/connect", methods=["POST"])
def wifi_connect():
    """Connect wlan0 to the network named in the JSON request body.

    Expects a JSON object with "ssid" and an optional "password".

    Returns:
        JSON with "success" and nmcli's output in "message"; a 400 response
        when the SSID is missing, 504 when nmcli does not finish within
        30 seconds, and 500 when nmcli cannot be started.
    """
    # silent=True: a missing or malformed JSON body yields None instead of
    # an error page, so the client always receives a JSON answer.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    ssid = str(data.get("ssid") or "").strip()
    # NOTE(review): strip() also removes intentional leading/trailing
    # whitespace from the passphrase; kept for compatibility.
    password = str(data.get("password") or "").strip()

    if not ssid:
        return jsonify({"success": False, "error": "SSID fehlt"}), 400

    cmd = ["nmcli", "device", "wifi", "connect", ssid, "ifname", "wlan0"]
    if password:
        cmd += ["password", password]

    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=30
        )
    except subprocess.TimeoutExpired:
        return jsonify({
            "success": False,
            "error": "Zeitüberschreitung beim Verbinden",
        }), 504
    except OSError as exc:
        return jsonify({"success": False, "error": str(exc)}), 500

    return jsonify({
        "success": result.returncode == 0,
        "message": result.stdout.strip() or result.stderr.strip(),
    })


# ---------------------------------------------------------------------------
# Start
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=80, debug=False, threaded=True)