184 lines
5.2 KiB
Python
184 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Babycam Flask Webinterface
|
|
- Live-Stream via MJPEG (rpicam-vid)
|
|
- WLAN-Verwaltung (scan, connect)
|
|
- Systemstatus
|
|
"""
|
|
|
|
import subprocess
|
|
import json
|
|
import re
|
|
import time
|
|
from flask import Flask, Response, render_template, request, jsonify
|
|
|
|
app = Flask(__name__)
|
|
|
|
STREAM_WIDTH = 1280
|
|
STREAM_HEIGHT = 720
|
|
STREAM_FPS = 20
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Kamera-Stream
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def generate_frames():
|
|
cmd = [
|
|
"rpicam-vid",
|
|
"-t", "0",
|
|
"--width", str(STREAM_WIDTH),
|
|
"--height", str(STREAM_HEIGHT),
|
|
"--framerate", str(STREAM_FPS),
|
|
"--codec", "mjpeg",
|
|
"-o", "-",
|
|
"--nopreview",
|
|
]
|
|
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
|
|
|
|
boundary = b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"
|
|
buf = b""
|
|
|
|
try:
|
|
while True:
|
|
chunk = process.stdout.read(4096)
|
|
if not chunk:
|
|
break
|
|
buf += chunk
|
|
|
|
start = buf.find(b"\xff\xd8")
|
|
end = buf.find(b"\xff\xd9")
|
|
|
|
if start != -1 and end != -1 and end > start:
|
|
frame = buf[start:end + 2]
|
|
buf = buf[end + 2:]
|
|
yield boundary + frame + b"\r\n"
|
|
finally:
|
|
process.kill()
|
|
|
|
|
|
@app.route("/stream")
|
|
def stream():
|
|
return Response(
|
|
generate_frames(),
|
|
mimetype="multipart/x-mixed-replace; boundary=frame",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WLAN-Verwaltung
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_wifi_networks():
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "SSID,SIGNAL,SECURITY", "device", "wifi", "list", "ifname", "wlan0"],
|
|
capture_output=True, text=True,
|
|
)
|
|
networks = []
|
|
seen = set()
|
|
for line in result.stdout.splitlines():
|
|
parts = line.split(":")
|
|
if len(parts) >= 2:
|
|
ssid = parts[0].strip()
|
|
if ssid and ssid not in seen:
|
|
seen.add(ssid)
|
|
signal = parts[1] if len(parts) > 1 else "0"
|
|
security = parts[2] if len(parts) > 2 else ""
|
|
networks.append({"ssid": ssid, "signal": signal, "security": security})
|
|
return sorted(networks, key=lambda x: int(x["signal"]) if x["signal"].isdigit() else 0, reverse=True)
|
|
|
|
|
|
def get_system_status():
|
|
# IP-Adressen
|
|
ip_result = subprocess.run(["hostname", "-I"], capture_output=True, text=True)
|
|
ips = ip_result.stdout.strip().split()
|
|
|
|
# Aktive WLAN-Verbindung
|
|
wifi_result = subprocess.run(
|
|
["nmcli", "-t", "-f", "GENERAL.CONNECTION", "device", "show", "wlan0"],
|
|
capture_output=True, text=True,
|
|
)
|
|
wifi_match = re.search(r"GENERAL\.CONNECTION:(.*)", wifi_result.stdout)
|
|
wifi_ssid = wifi_match.group(1).strip() if wifi_match else "nicht verbunden"
|
|
|
|
# AP-Status
|
|
ap_result = subprocess.run(["systemctl", "is-active", "hostapd"], capture_output=True, text=True)
|
|
ap_active = ap_result.stdout.strip() == "active"
|
|
|
|
# Kamera verfügbar?
|
|
cam_result = subprocess.run(["rpicam-hello", "--list-cameras"], capture_output=True, text=True)
|
|
cam_available = "No cameras" not in cam_result.stderr and "No cameras" not in cam_result.stdout
|
|
|
|
return {
|
|
"ips": ips,
|
|
"wifi_ssid": wifi_ssid,
|
|
"ap_active": ap_active,
|
|
"cam_available": cam_available,
|
|
"uptime": _get_uptime(),
|
|
}
|
|
|
|
|
|
def _get_uptime():
|
|
try:
|
|
with open("/proc/uptime") as f:
|
|
seconds = float(f.read().split()[0])
|
|
m, s = divmod(int(seconds), 60)
|
|
h, m = divmod(m, 60)
|
|
return f"{h}h {m}m"
|
|
except Exception:
|
|
return "unbekannt"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Routen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.route("/")
|
|
def index():
|
|
status = get_system_status()
|
|
return render_template("index.html", status=status)
|
|
|
|
|
|
@app.route("/api/status")
|
|
def api_status():
|
|
return jsonify(get_system_status())
|
|
|
|
|
|
@app.route("/wifi/scan")
|
|
def wifi_scan():
|
|
# Kurz neu scannen
|
|
subprocess.run(["nmcli", "device", "wifi", "rescan", "ifname", "wlan0"],
|
|
capture_output=True)
|
|
time.sleep(2)
|
|
return jsonify(get_wifi_networks())
|
|
|
|
|
|
@app.route("/wifi/connect", methods=["POST"])
|
|
def wifi_connect():
|
|
data = request.get_json()
|
|
ssid = data.get("ssid", "").strip()
|
|
password = data.get("password", "").strip()
|
|
|
|
if not ssid:
|
|
return jsonify({"success": False, "error": "SSID fehlt"}), 400
|
|
|
|
cmd = ["nmcli", "device", "wifi", "connect", ssid, "ifname", "wlan0"]
|
|
if password:
|
|
cmd += ["password", password]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
success = result.returncode == 0
|
|
|
|
return jsonify({
|
|
"success": success,
|
|
"message": result.stdout.strip() or result.stderr.strip(),
|
|
})
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Start
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host="0.0.0.0", port=80, debug=False)
|