babycam/app/main.py

258 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Babycam Flask Webinterface
- Live-Stream via MJPEG (rpicam-vid)
- WLAN-Verwaltung (scan, connect)
- Systemstatus
"""
import subprocess
import re
import time
import os
import threading
from flask import Flask, Response, render_template, request, jsonify
app = Flask(__name__)
STREAM_WIDTH = 1280
STREAM_HEIGHT = 720
STREAM_FPS = 20
# ---------------------------------------------------------------------------
# Kamera-Broadcaster ein Prozess, N Clients
# ---------------------------------------------------------------------------
class CameraBroadcaster:
"""
Startet rpicam-vid einmal in einem Hintergrund-Thread.
Alle Clients lesen denselben aktuellen Frame keine parallelen Kameraprozesse.
"""
def __init__(self):
self._frame: bytes | None = None
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
self._running = False
def start(self):
if self._thread and self._thread.is_alive():
return
self._running = True
self._thread = threading.Thread(target=self._capture, daemon=True)
self._thread.start()
def _capture(self):
cmd = [
"rpicam-vid",
"-t", "0",
"--width", str(STREAM_WIDTH),
"--height", str(STREAM_HEIGHT),
"--framerate", str(STREAM_FPS),
"--codec", "mjpeg",
"-o", "-",
"--nopreview",
]
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
)
buf = b""
try:
while self._running:
chunk = process.stdout.read(4096)
if not chunk:
break
buf += chunk
start = buf.find(b"\xff\xd8")
end = buf.find(b"\xff\xd9")
if start != -1 and end != -1 and end > start:
frame = buf[start:end + 2]
buf = buf[end + 2:]
with self._lock:
self._frame = frame
finally:
process.kill()
self._running = False
def get_frame(self) -> bytes | None:
with self._lock:
return self._frame
def generate(self):
"""Generator für einen MJPEG-Client."""
boundary = b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"
last_frame = None
while True:
frame = self.get_frame()
if frame and frame is not last_frame:
last_frame = frame
yield boundary + frame + b"\r\n"
else:
time.sleep(1 / STREAM_FPS)
camera = CameraBroadcaster()
camera.start()
@app.route("/stream")
def stream():
return Response(
camera.generate(),
mimetype="multipart/x-mixed-replace; boundary=frame",
)
# ---------------------------------------------------------------------------
# WLAN-Verwaltung
# ---------------------------------------------------------------------------
def get_wifi_networks():
result = subprocess.run(
["nmcli", "-t", "-f", "SSID,SIGNAL,SECURITY", "device", "wifi", "list", "ifname", "wlan0"],
capture_output=True, text=True,
)
networks = []
seen = set()
for line in result.stdout.splitlines():
parts = line.split(":")
if len(parts) >= 2:
ssid = parts[0].strip()
if ssid and ssid not in seen:
seen.add(ssid)
signal = parts[1] if len(parts) > 1 else "0"
security = parts[2] if len(parts) > 2 else ""
networks.append({"ssid": ssid, "signal": signal, "security": security})
return sorted(networks, key=lambda x: int(x["signal"]) if x["signal"].isdigit() else 0, reverse=True)
def get_system_status():
# IP-Adressen
ip_result = subprocess.run(["hostname", "-I"], capture_output=True, text=True)
ips = ip_result.stdout.strip().split()
# Aktive WLAN-Verbindung
wifi_result = subprocess.run(
["nmcli", "-t", "-f", "GENERAL.CONNECTION", "device", "show", "wlan0"],
capture_output=True, text=True,
)
wifi_match = re.search(r"GENERAL\.CONNECTION:(.*)", wifi_result.stdout)
wifi_ssid = wifi_match.group(1).strip() if wifi_match else "nicht verbunden"
# AP-Status
ap_result = subprocess.run(["systemctl", "is-active", "hostapd"], capture_output=True, text=True)
ap_active = ap_result.stdout.strip() == "active"
# Kamera verfügbar?
cam_result = subprocess.run(["rpicam-hello", "--list-cameras"], capture_output=True, text=True)
cam_available = "No cameras" not in cam_result.stderr and "No cameras" not in cam_result.stdout
return {
"ips": ips,
"wifi_ssid": wifi_ssid,
"ap_active": ap_active,
"ap_override": get_ap_override(),
"cam_available": cam_available,
"uptime": _get_uptime(),
}
def _get_uptime():
try:
with open("/proc/uptime") as f:
seconds = float(f.read().split()[0])
m, s = divmod(int(seconds), 60)
h, m = divmod(m, 60)
return f"{h}h {m}m"
except Exception:
return "unbekannt"
# ---------------------------------------------------------------------------
# Routen
# ---------------------------------------------------------------------------
@app.route("/")
def index():
status = get_system_status()
return render_template("index.html", status=status)
@app.route("/live")
def live():
status = get_system_status()
return render_template("live.html", status=status)
@app.route("/api/status")
def api_status():
return jsonify(get_system_status())
@app.route("/wifi/scan")
def wifi_scan():
# Kurz neu scannen
subprocess.run(["nmcli", "device", "wifi", "rescan", "ifname", "wlan0"],
capture_output=True)
time.sleep(2)
return jsonify(get_wifi_networks())
@app.route("/ap/<action>", methods=["POST"])
def ap_control(action):
override_file = "/var/lib/babycam/ap-override"
if action == "on":
with open(override_file, "w") as f:
f.write("on")
return jsonify({"success": True, "mode": "on"})
elif action == "off":
with open(override_file, "w") as f:
f.write("off")
return jsonify({"success": True, "mode": "off"})
elif action == "auto":
try:
os.remove(override_file)
except FileNotFoundError:
pass
return jsonify({"success": True, "mode": "auto"})
return jsonify({"success": False, "error": "Unbekannte Aktion"}), 400
def get_ap_override():
try:
with open("/var/lib/babycam/ap-override") as f:
return f.read().strip().lower()
except FileNotFoundError:
return "auto"
@app.route("/wifi/connect", methods=["POST"])
def wifi_connect():
data = request.get_json()
ssid = data.get("ssid", "").strip()
password = data.get("password", "").strip()
if not ssid:
return jsonify({"success": False, "error": "SSID fehlt"}), 400
cmd = ["nmcli", "device", "wifi", "connect", ssid, "ifname", "wlan0"]
if password:
cmd += ["password", password]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
success = result.returncode == 0
return jsonify({
"success": success,
"message": result.stdout.strip() or result.stderr.strip(),
})
# ---------------------------------------------------------------------------
# Start
# ---------------------------------------------------------------------------
if __name__ == "__main__":
app.run(host="0.0.0.0", port=80, debug=False, threaded=True)