atlus/backend/display.py
roberts 8d48e3eeb8 App exit diagnostics, desktop file discovery, WiFi wpa_supplicant fix
- Capture stderr from GUI apps and show meaningful exit reasons (signal names,
  error messages) instead of generic "Application exited"
- Add /api/display/discover-apps endpoint that scans .desktop files for
  installed GUI apps, replacing manual-only app configuration
- Settings > Applications now shows discoverable apps with one-click Add,
  with manual form available as fallback
- Fix WiFi: start wpa_supplicant before bringing interface up, handle rfkill
  blocks, add retry logic and error messages to scan results
- Fix WiFi scan frontend bug: response is {networks:[...]} not a bare array
- Add iw and wpasupplicant to install.sh dependencies

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 00:03:13 -05:00

601 lines
21 KiB
Python

"""Atlus — GUI application display manager.
Manages a virtual X11 display (Xvfb) and per-window frame capture
for streaming native GUI applications to the browser via WebSocket.
Each GUI app window is captured independently and streamed as JPEG frames.
"""
import asyncio
import getpass
import logging
import os
import re
import shutil
import signal
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional
log = logging.getLogger("atlus.display")
# ---------------------------------------------------------------------------
# Binary discovery (systemd strips PATH)
# ---------------------------------------------------------------------------
_SEARCH_PATHS = ("/usr/bin", "/usr/local/bin", "/bin", "/usr/sbin", "/sbin")
def _find_bin(name: str) -> Optional[str]:
found = shutil.which(name)
if found:
return found
for d in _SEARCH_PATHS:
p = os.path.join(d, name)
if os.path.isfile(p) and os.access(p, os.X_OK):
return p
return None
XVFB_BIN = _find_bin("Xvfb")
XDOTOOL_BIN = _find_bin("xdotool")
IMPORT_BIN = _find_bin("import") # ImageMagick
HAS_DISPLAY_DEPS = all((XVFB_BIN, XDOTOOL_BIN, IMPORT_BIN))
# Allowed command pattern — alphanumeric + hyphens only
_SAFE_CMD = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._\-]*$")
# Key translation: DOM key names → xdotool key names
_KEY_MAP = {
"Control": "ctrl",
"Shift": "shift",
"Alt": "alt",
"Meta": "super",
"Enter": "Return",
"Backspace": "BackSpace",
"Delete": "Delete",
"Escape": "Escape",
"Tab": "Tab",
"ArrowUp": "Up",
"ArrowDown": "Down",
"ArrowLeft": "Left",
"ArrowRight": "Right",
"Home": "Home",
"End": "End",
"PageUp": "Prior",
"PageDown": "Next",
"Insert": "Insert",
" ": "space",
"F1": "F1", "F2": "F2", "F3": "F3", "F4": "F4",
"F5": "F5", "F6": "F6", "F7": "F7", "F8": "F8",
"F9": "F9", "F10": "F10", "F11": "F11", "F12": "F12",
}
# ---------------------------------------------------------------------------
# ManagedGuiApp — a GUI process with per-window frame capture
# ---------------------------------------------------------------------------
@dataclass
class ManagedGuiApp:
"""A GUI application running on a virtual X display."""
app_id: str
command: str
title: str
display_num: int
process: Optional[asyncio.subprocess.Process] = None
window_id: Optional[int] = None
target_fps: int = 10
created_at: float = field(default_factory=time.time)
last_frame: Optional[bytes] = field(default=None, repr=False)
exit_reason: Optional[str] = field(default=None, repr=False)
_capture_task: Optional[asyncio.Task] = field(default=None, repr=False)
_stderr_task: Optional[asyncio.Task] = field(default=None, repr=False)
_stderr_lines: list = field(default_factory=list, repr=False)
_websockets: list = field(default_factory=list, repr=False)
_streaming: bool = field(default=False, repr=False)
@property
def alive(self) -> bool:
return self.process is not None and self.process.returncode is None
def _display_env(self) -> dict:
env = {**os.environ, "DISPLAY": f":{self.display_num}"}
# Ensure PATH has common binary dirs
path = env.get("PATH", "")
for p in _SEARCH_PATHS:
if p not in path:
path = p + ":" + path
env["PATH"] = path
return env
# ---- WebSocket fan-out ----
def attach_ws(self, ws):
if ws not in self._websockets:
self._websockets.append(ws)
self._streaming = True
def detach_ws(self, ws):
try:
self._websockets.remove(ws)
except ValueError:
pass
if not self._websockets:
self._streaming = False
# ---- Frame capture ----
def start_capture(self):
if self._capture_task is None or self._capture_task.done():
self._capture_task = asyncio.create_task(self._capture_loop())
if self._stderr_task is None or self._stderr_task.done():
self._stderr_task = asyncio.create_task(self._read_stderr())
async def _read_stderr(self):
"""Read stderr in background, keep last N lines for exit diagnostics."""
if not self.process or not self.process.stderr:
return
try:
while True:
line = await self.process.stderr.readline()
if not line:
break
decoded = line.decode(errors="replace").rstrip()
if decoded:
self._stderr_lines.append(decoded)
# Keep only last 20 lines
if len(self._stderr_lines) > 20:
self._stderr_lines = self._stderr_lines[-20:]
except Exception:
pass
async def _capture_loop(self):
"""Background: capture window pixmap → JPEG → fan-out."""
# Wait for window to appear
for attempt in range(20):
if not self.alive:
return
await self._discover_window()
if self.window_id:
break
await asyncio.sleep(0.5 * (1 + attempt * 0.2))
if not self.window_id:
log.warning("No window found for app %s (%s)", self.app_id, self.command)
# Notify any attached viewers
for ws in list(self._websockets):
try:
await ws.send_json({"type": "error", "data": "No window found for application"})
except Exception:
pass
return
log.info("Capture loop started for app %s window %d", self.app_id, self.window_id)
while self.alive:
if not self._streaming or not self._websockets:
await asyncio.sleep(0.5)
continue
try:
frame = await self._capture_frame()
if frame:
self.last_frame = frame
dead = []
for ws in self._websockets:
try:
await ws.send_bytes(frame)
except Exception:
dead.append(ws)
for ws in dead:
self.detach_ws(ws)
except Exception:
log.exception("Capture error for %s", self.app_id)
break
await asyncio.sleep(1.0 / self.target_fps)
# Process exited — build exit reason from stderr + return code
exit_msg = self._build_exit_reason()
self.exit_reason = exit_msg
log.info("App %s exited: %s", self.app_id, exit_msg)
for ws in list(self._websockets):
try:
await ws.send_json({"type": "closed", "data": exit_msg})
except Exception:
pass
log.info("Capture loop ended for app %s", self.app_id)
def _build_exit_reason(self) -> str:
"""Build a human-readable exit reason from stderr output and return code."""
rc = self.process.returncode if self.process else None
# Filter stderr for meaningful error lines
error_hints = []
for line in self._stderr_lines:
low = line.lower()
# Skip noisy Qt/GTK debug lines
if any(skip in low for skip in ("qt.qpa.xcb: qxcb", "libgl", "mesa",
"dbind-warning", "g_dbus", "gtk-warning")):
continue
# Keep lines with error-like keywords
if any(kw in low for kw in ("error", "fatal", "failed", "abort",
"cannot", "could not", "not found",
"no such", "missing", "permission denied",
"segfault", "signal", "killed", "crash")):
error_hints.append(line)
if error_hints:
# Show last few relevant error lines
hint_text = "\n".join(error_hints[-5:])
if rc is not None and rc < 0:
sig = abs(rc)
return f"Killed by signal {sig}\n{hint_text}"
return hint_text
# No meaningful stderr — use return code
if rc is None:
return "Application exited unexpectedly"
if rc == 0:
return "Application closed normally"
if rc < 0:
sig = abs(rc)
sig_names = {6: "SIGABRT", 9: "SIGKILL", 11: "SIGSEGV", 15: "SIGTERM"}
sig_name = sig_names.get(sig, f"signal {sig}")
return f"Crashed ({sig_name})"
return f"Exited with code {rc}"
async def _capture_frame(self) -> Optional[bytes]:
"""Capture window as JPEG using ImageMagick import."""
if not self.window_id or not IMPORT_BIN:
return None
try:
proc = await asyncio.create_subprocess_exec(
IMPORT_BIN, "-window", str(self.window_id),
"-quality", "60", "jpeg:-",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._display_env(),
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5)
if proc.returncode == 0 and stdout:
return stdout
return None
except asyncio.TimeoutError:
return None
except Exception:
return None
async def _discover_window(self):
"""Find X11 window ID for this app's process."""
if not XDOTOOL_BIN or not self.process:
return
try:
proc = await asyncio.create_subprocess_exec(
XDOTOOL_BIN, "search", "--pid", str(self.process.pid),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._display_env(),
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
lines = stdout.decode().strip().splitlines()
if lines:
self.window_id = int(lines[0])
log.debug("Discovered window %d for app %s (pid %d)",
self.window_id, self.app_id, self.process.pid)
except Exception:
pass
# ---- Input forwarding ----
async def send_input(self, msg: dict):
"""Forward mouse/keyboard input to the X11 window."""
if not self.window_id or not XDOTOOL_BIN:
return
env = self._display_env()
msg_type = msg.get("type")
try:
if msg_type == "mouse":
await self._handle_mouse(msg, env)
elif msg_type == "key":
await self._handle_key(msg, env)
except Exception:
log.debug("Input forwarding error for %s", self.app_id, exc_info=True)
async def _handle_mouse(self, msg: dict, env: dict):
action = msg.get("action", "")
x, y = str(msg.get("x", 0)), str(msg.get("y", 0))
wid = str(self.window_id)
if action == "click":
btn = str(msg.get("button", 1))
await self._xdotool(
"mousemove", "--window", wid, x, y,
"click", "--window", wid, btn,
env=env,
)
elif action == "dblclick":
btn = str(msg.get("button", 1))
await self._xdotool(
"mousemove", "--window", wid, x, y,
"click", "--window", wid, "--repeat", "2", btn,
env=env,
)
elif action == "move":
await self._xdotool(
"mousemove", "--window", wid, x, y,
env=env,
)
elif action == "scroll":
delta = msg.get("delta", 0)
btn = "4" if delta < 0 else "5" # X11: 4=up, 5=down
await self._xdotool(
"mousemove", "--window", wid, x, y,
"click", "--window", wid, btn,
env=env,
)
async def _handle_key(self, msg: dict, env: dict):
action = msg.get("action", "press")
key = msg.get("key", "")
modifiers = msg.get("modifiers", [])
# Translate key name
xkey = _KEY_MAP.get(key, key)
# Skip standalone modifier key events
if xkey in ("ctrl", "shift", "alt", "super"):
return
# Build modifier prefix
mod_parts = []
for m in modifiers:
xmod = _KEY_MAP.get(m, m.lower())
if xmod not in ("ctrl", "shift", "alt", "super"):
continue
mod_parts.append(xmod)
if mod_parts:
xkey = "+".join(mod_parts) + "+" + xkey
wid = str(self.window_id)
if action == "press":
await self._xdotool("key", "--window", wid, xkey, env=env)
# release events handled implicitly by xdotool key
async def _xdotool(self, *args, env=None):
proc = await asyncio.create_subprocess_exec(
XDOTOOL_BIN, *args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
await asyncio.wait_for(proc.communicate(), timeout=5)
# ---- Lifecycle ----
def kill(self):
if self._capture_task and not self._capture_task.done():
self._capture_task.cancel()
if self.alive:
try:
self.process.terminate()
except Exception:
pass
def to_dict(self) -> dict:
d = {
"app_id": self.app_id,
"command": self.command,
"title": self.title,
"alive": self.alive,
"pid": self.process.pid if self.process else None,
"window_id": self.window_id,
"streaming": self._streaming,
"viewers": len(self._websockets),
"created_at": self.created_at,
}
if self.exit_reason:
d["exit_reason"] = self.exit_reason
return d
# ---------------------------------------------------------------------------
# Display Manager — manages Xvfb displays and GUI apps
# ---------------------------------------------------------------------------
class DisplayManager:
"""Manages virtual X displays and GUI application lifecycle."""
def __init__(self):
self._xvfb_procs: dict[str, asyncio.subprocess.Process] = {}
self._display_nums: dict[str, int] = {}
self._apps: dict[str, dict[str, ManagedGuiApp]] = {}
self._next_display = 99
def _require_deps(self):
if not HAS_DISPLAY_DEPS:
missing = []
if not XVFB_BIN:
missing.append("Xvfb")
if not XDOTOOL_BIN:
missing.append("xdotool")
if not IMPORT_BIN:
missing.append("import (ImageMagick)")
raise RuntimeError(f"Missing display dependencies: {', '.join(missing)}")
async def get_or_create_display(self, username: str) -> int:
"""Start Xvfb if needed, return display number."""
self._require_deps()
if username in self._display_nums:
# Check if Xvfb is still running
xvfb = self._xvfb_procs.get(username)
if xvfb and xvfb.returncode is None:
return self._display_nums[username]
# Xvfb died — restart
log.warning("Xvfb died for %s, restarting", username)
display_num = self._next_display
self._next_display += 1
log.info("Starting Xvfb :%d for %s", display_num, username)
xvfb = await asyncio.create_subprocess_exec(
XVFB_BIN, f":{display_num}",
"-screen", "0", "1280x1024x24",
"-ac", # disable access control
"-nolisten", "tcp",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Wait briefly for Xvfb to start
await asyncio.sleep(0.5)
if xvfb.returncode is not None:
raise RuntimeError(f"Xvfb failed to start (rc={xvfb.returncode})")
self._xvfb_procs[username] = xvfb
self._display_nums[username] = display_num
log.info("Xvfb :%d started for %s (pid %d)", display_num, username, xvfb.pid)
return display_num
async def launch_app(
self, username: str, command: str, title: str = "",
args: list[str] | None = None, target_fps: int = 10,
) -> ManagedGuiApp:
"""Launch a GUI application on the user's virtual display."""
# Validate command
if not _SAFE_CMD.match(command):
raise ValueError(f"Invalid command name: {command}")
# Find the binary
cmd_bin = _find_bin(command)
if not cmd_bin:
raise FileNotFoundError(f"Command not found: {command}")
display_num = await self.get_or_create_display(username)
env = {
**os.environ,
"DISPLAY": f":{display_num}",
"HOME": os.path.expanduser("~"),
}
# Ensure PATH
path = env.get("PATH", "")
for p in _SEARCH_PATHS:
if p not in path:
path = p + ":" + path
env["PATH"] = path
cmd = [cmd_bin] + (args or [])
log.info("Launching GUI app: %s (display :%d, user %s)", " ".join(cmd), display_num, username)
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
app_id = str(uuid.uuid4())[:8]
app = ManagedGuiApp(
app_id=app_id,
command=command,
title=title or command,
display_num=display_num,
process=process,
target_fps=target_fps,
)
app.start_capture()
self._apps.setdefault(username, {})[app_id] = app
log.info("GUI app %s launched: %s (pid %d)", app_id, command, process.pid)
return app
def get_app(self, username: str, app_id: str) -> Optional[ManagedGuiApp]:
return self._apps.get(username, {}).get(app_id)
def get_app_by_command(self, username: str, command: str) -> Optional[ManagedGuiApp]:
"""Find a running app by command name."""
for app in self._apps.get(username, {}).values():
if app.command == command and app.alive:
return app
return None
def close_app(self, username: str, app_id: str) -> bool:
user_apps = self._apps.get(username, {})
app = user_apps.pop(app_id, None)
if not app:
return False
app.kill()
log.info("Closed GUI app %s (%s) for %s", app_id, app.command, username)
return True
def list_apps(self, username: str) -> list[dict]:
user_apps = self._apps.get(username, {})
# Prune dead apps
dead = [aid for aid, a in user_apps.items() if not a.alive]
for aid in dead:
app = user_apps.pop(aid)
app.kill()
return [a.to_dict() for a in user_apps.values()]
async def autostart_apps(self, gui_apps: list[dict]):
"""Launch all apps with autostart=True. Called on service startup."""
if not HAS_DISPLAY_DEPS:
log.info("Display deps not available, skipping autostart")
return
username = getpass.getuser()
started = 0
for app_cfg in gui_apps:
if not app_cfg.get("autostart"):
continue
command = app_cfg.get("command", "")
if not command:
continue
# Skip if already running
if self.get_app_by_command(username, command):
log.debug("Autostart skip (already running): %s", command)
continue
try:
await self.launch_app(
username=username,
command=command,
title=app_cfg.get("name", command),
args=app_cfg.get("args", []),
target_fps=app_cfg.get("target_fps", 10),
)
started += 1
log.info("Autostarted: %s", command)
except Exception as e:
log.warning("Failed to autostart %s: %s", command, e)
if started:
log.info("Autostarted %d application(s)", started)
async def shutdown_all(self):
"""Kill all apps and Xvfb displays."""
for username, apps in self._apps.items():
for app in apps.values():
app.kill()
self._apps.clear()
for username, xvfb in self._xvfb_procs.items():
if xvfb.returncode is None:
try:
xvfb.terminate()
await asyncio.wait_for(xvfb.wait(), timeout=5)
except Exception:
xvfb.kill()
self._xvfb_procs.clear()
self._display_nums.clear()
log.info("All displays shut down")
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
display_manager = DisplayManager()