babycam/app/main.py

325 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Babycam Flask Webinterface
- Live-Stream via MJPEG (rpicam-vid)
- WLAN-Verwaltung (scan, connect)
- Systemstatus
"""
import subprocess
import re
import time
import os
import threading
from flask import Flask, Response, render_template, request, jsonify
from flask_sock import Sock
app = Flask(__name__)
sock = Sock(app)
STREAM_WIDTH = 1280
STREAM_HEIGHT = 720
STREAM_FPS = 20
# ---------------------------------------------------------------------------
# Kamera-Broadcaster ein Prozess, N Clients
# ---------------------------------------------------------------------------
class CameraBroadcaster:
"""
Startet rpicam-vid einmal in einem Hintergrund-Thread.
Alle Clients lesen denselben aktuellen Frame keine parallelen Kameraprozesse.
"""
def __init__(self):
self._frame: bytes | None = None
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
self._running = False
def start(self):
if self._thread and self._thread.is_alive():
return
self._running = True
self._thread = threading.Thread(target=self._capture, daemon=True)
self._thread.start()
def _capture(self):
cmd = [
"rpicam-vid",
"-t", "0",
"--width", str(STREAM_WIDTH),
"--height", str(STREAM_HEIGHT),
"--framerate", str(STREAM_FPS),
"--codec", "mjpeg",
"-o", "-",
"--nopreview",
]
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
)
buf = b""
try:
while self._running:
chunk = process.stdout.read(4096)
if not chunk:
break
buf += chunk
start = buf.find(b"\xff\xd8")
end = buf.find(b"\xff\xd9")
if start != -1 and end != -1 and end > start:
frame = buf[start:end + 2]
buf = buf[end + 2:]
with self._lock:
self._frame = frame
finally:
process.kill()
self._running = False
def get_frame(self) -> bytes | None:
with self._lock:
return self._frame
def generate(self):
"""Generator für einen MJPEG-Client."""
boundary = b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"
last_frame = None
while True:
frame = self.get_frame()
if frame and frame is not last_frame:
last_frame = frame
yield boundary + frame + b"\r\n"
else:
time.sleep(1 / STREAM_FPS)
camera = CameraBroadcaster()
camera.start()
@app.route("/stream")
def stream():
return Response(
camera.generate(),
mimetype="multipart/x-mixed-replace; boundary=frame",
)
# ---------------------------------------------------------------------------
# Audio-Stream via WebSocket (Raw PCM → Web Audio API)
# Minimale Latenz: kein HTTP-Buffering, kein Codec-Delay
# ---------------------------------------------------------------------------
AUDIO_SAMPLERATE = 16000
AUDIO_CHUNK = 512 # ~32ms bei 16kHz
def find_audio_device() -> str:
"""Findet die USB-Soundkarte dynamisch anhand des Namens."""
try:
with open("/proc/asound/cards") as f:
for line in f:
# Zeile mit Kartennummer: " 1 [Device ]: USB-Audio ..."
m = re.match(r"^\s*(\d+)\s+\[.*\].*USB", line)
if m:
return f"plughw:{m.group(1)},0"
except Exception:
pass
return "plughw:1,0" # Fallback
@sock.route("/audio-ws")
def audio_ws(ws):
device = find_audio_device()
process = subprocess.Popen(
[
"arecord",
"-D", device,
"-f", "S16_LE",
"-r", str(AUDIO_SAMPLERATE),
"-c", "1",
],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
try:
while True:
chunk = process.stdout.read(AUDIO_CHUNK)
if not chunk:
break
ws.send(chunk)
except Exception:
pass
finally:
process.kill()
# ---------------------------------------------------------------------------
# WLAN-Verwaltung
# ---------------------------------------------------------------------------
def get_wifi_networks():
result = subprocess.run(
["nmcli", "-t", "-f", "SSID,SIGNAL,SECURITY", "device", "wifi", "list", "ifname", "wlan0"],
capture_output=True, text=True,
)
networks = []
seen = set()
for line in result.stdout.splitlines():
parts = line.split(":")
if len(parts) >= 2:
ssid = parts[0].strip()
if ssid and ssid not in seen:
seen.add(ssid)
signal = parts[1] if len(parts) > 1 else "0"
security = parts[2] if len(parts) > 2 else ""
networks.append({"ssid": ssid, "signal": signal, "security": security})
return sorted(networks, key=lambda x: int(x["signal"]) if x["signal"].isdigit() else 0, reverse=True)
def get_system_status():
# IP-Adressen
ip_result = subprocess.run(["hostname", "-I"], capture_output=True, text=True)
ips = ip_result.stdout.strip().split()
# Aktive WLAN-Verbindung
wifi_result = subprocess.run(
["nmcli", "-t", "-f", "GENERAL.CONNECTION", "device", "show", "wlan0"],
capture_output=True, text=True,
)
wifi_match = re.search(r"GENERAL\.CONNECTION:(.*)", wifi_result.stdout)
wifi_ssid = wifi_match.group(1).strip() if wifi_match else "nicht verbunden"
# AP-Status
ap_result = subprocess.run(["systemctl", "is-active", "hostapd"], capture_output=True, text=True)
ap_active = ap_result.stdout.strip() == "active"
# Kamera verfügbar?
cam_result = subprocess.run(["rpicam-hello", "--list-cameras"], capture_output=True, text=True)
cam_available = "No cameras" not in cam_result.stderr and "No cameras" not in cam_result.stdout
return {
"ips": ips,
"wifi_ssid": wifi_ssid,
"ap_active": ap_active,
"ap_override": get_ap_override(),
"cam_available": cam_available,
"uptime": _get_uptime(),
}
def _get_uptime():
try:
with open("/proc/uptime") as f:
seconds = float(f.read().split()[0])
m, s = divmod(int(seconds), 60)
h, m = divmod(m, 60)
return f"{h}h {m}m"
except Exception:
return "unbekannt"
# ---------------------------------------------------------------------------
# Routen
# ---------------------------------------------------------------------------
@app.route("/")
def index():
status = get_system_status()
return render_template("index.html", status=status)
@app.route("/live")
def live():
status = get_system_status()
return render_template("live.html", status=status)
@app.route("/api/status")
def api_status():
return jsonify(get_system_status())
@app.route("/wifi/scan")
def wifi_scan():
# Kurz neu scannen
subprocess.run(["nmcli", "device", "wifi", "rescan", "ifname", "wlan0"],
capture_output=True)
time.sleep(2)
return jsonify(get_wifi_networks())
@app.route("/ap/<action>", methods=["POST"])
def ap_control(action):
override_file = "/var/lib/babycam/ap-override"
if action == "on":
with open(override_file, "w") as f:
f.write("on")
return jsonify({"success": True, "mode": "on"})
elif action == "off":
with open(override_file, "w") as f:
f.write("off")
return jsonify({"success": True, "mode": "off"})
elif action == "auto":
try:
os.remove(override_file)
except FileNotFoundError:
pass
return jsonify({"success": True, "mode": "auto"})
return jsonify({"success": False, "error": "Unbekannte Aktion"}), 400
def get_ap_override():
try:
with open("/var/lib/babycam/ap-override") as f:
return f.read().strip().lower()
except FileNotFoundError:
return "auto"
@app.route("/wifi/connect", methods=["POST"])
def wifi_connect():
data = request.get_json()
ssid = data.get("ssid", "").strip()
password = data.get("password", "").strip()
if not ssid:
return jsonify({"success": False, "error": "SSID fehlt"}), 400
cmd = ["nmcli", "device", "wifi", "connect", ssid, "ifname", "wlan0"]
if password:
cmd += ["password", password]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
success = result.returncode == 0
return jsonify({
"success": success,
"message": result.stdout.strip() or result.stderr.strip(),
})
# ---------------------------------------------------------------------------
# System
# ---------------------------------------------------------------------------
@app.route("/system/shutdown", methods=["POST"])
def system_shutdown():
subprocess.Popen(["sudo", "shutdown", "-h", "now"])
return jsonify({"success": True, "message": "Pi wird heruntergefahren…"})
@app.route("/system/reboot", methods=["POST"])
def system_reboot():
subprocess.Popen(["sudo", "shutdown", "-r", "now"])
return jsonify({"success": True, "message": "Pi wird neu gestartet…"})
# ---------------------------------------------------------------------------
# Start
# ---------------------------------------------------------------------------
if __name__ == "__main__":
app.run(host="0.0.0.0", port=80, debug=False, threaded=True)