"""Atlus — GUI application display manager.

Manages a virtual X11 display (Xvfb) and per-window frame capture for
streaming native GUI applications to the browser via WebSocket. Each GUI
app window is captured independently and streamed as JPEG frames.
"""

import asyncio
import logging
import os
import re
import shutil
import signal
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional

from backend.privdrop import get_user_info, make_preexec_fn

log = logging.getLogger("atlus.display")

# ---------------------------------------------------------------------------
# Binary discovery (systemd strips PATH)
# ---------------------------------------------------------------------------

# Directories probed when a binary is not found on this process's PATH.
_SEARCH_PATHS = ("/usr/bin", "/usr/local/bin", "/bin", "/usr/sbin", "/sbin")


def _find_bin(name: str) -> Optional[str]:
    """Return the path of executable *name*, or ``None`` if it can't be found.

    ``shutil.which`` (which consults this process's PATH) is tried first.
    Failing that, each directory in ``_SEARCH_PATHS`` is probed for a regular
    file that the current user may execute.
    """
    found = shutil.which(name)
    if found:
        return found
    for directory in _SEARCH_PATHS:
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None


XVFB_BIN = _find_bin("Xvfb")
XDOTOOL_BIN = _find_bin("xdotool")
IMPORT_BIN = _find_bin("import")  # ImageMagick
# True only when every external tool needed for display streaming was found.
HAS_DISPLAY_DEPS = all((XVFB_BIN, XDOTOOL_BIN, IMPORT_BIN))

# Allowed command names: an ASCII letter or digit, followed by letters, digits,
# '.', '_' or '-'. No '/', whitespace or shell metacharacters are accepted.
# '\Z' (not '$') anchors the end: '$' would also accept a trailing newline.
_SAFE_CMD = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._\-]*\Z")

# Key translation: DOM key names → xdotool key names
_KEY_MAP = {
    "Control": "ctrl", "Shift": "shift", "Alt": "alt", "Meta": "super",
    "Enter": "Return", "Backspace": "BackSpace", "Delete": "Delete",
    "Escape": "Escape", "Tab": "Tab",
    "ArrowUp": "Up", "ArrowDown": "Down", "ArrowLeft": "Left", "ArrowRight": "Right",
    "Home": "Home", "End": "End", "PageUp": "Prior", "PageDown": "Next",
    "Insert": "Insert", " ": "space",
    "F1": "F1", "F2": "F2", "F3": "F3", "F4": "F4", "F5": "F5", "F6": "F6",
    "F7": "F7", "F8": "F8", "F9": "F9", "F10": "F10", "F11": "F11", "F12": "F12",
}

# ---------------------------------------------------------------------------
# ManagedGuiApp — a GUI process with per-window
frame capture # --------------------------------------------------------------------------- @dataclass class ManagedGuiApp: """A GUI application running on a virtual X display.""" app_id: str command: str title: str display_num: int process: Optional[asyncio.subprocess.Process] = None window_id: Optional[int] = None target_fps: int = 10 created_at: float = field(default_factory=time.time) last_frame: Optional[bytes] = field(default=None, repr=False) exit_reason: Optional[str] = field(default=None, repr=False) _capture_task: Optional[asyncio.Task] = field(default=None, repr=False) _stderr_task: Optional[asyncio.Task] = field(default=None, repr=False) _stderr_lines: list = field(default_factory=list, repr=False) _websockets: list = field(default_factory=list, repr=False) _streaming: bool = field(default=False, repr=False) _focused: bool = field(default=False, repr=False) @property def alive(self) -> bool: return self.process is not None and self.process.returncode is None def _display_env(self) -> dict: env = {**os.environ, "DISPLAY": f":{self.display_num}"} # Ensure PATH has common binary dirs path = env.get("PATH", "") for p in _SEARCH_PATHS: if p not in path: path = p + ":" + path env["PATH"] = path return env # ---- WebSocket fan-out ---- def attach_ws(self, ws): if ws not in self._websockets: self._websockets.append(ws) self._streaming = True def detach_ws(self, ws): try: self._websockets.remove(ws) except ValueError: pass if not self._websockets: self._streaming = False # ---- Frame capture ---- def start_capture(self): if self._capture_task is None or self._capture_task.done(): self._capture_task = asyncio.create_task(self._capture_loop()) if self._stderr_task is None or self._stderr_task.done(): self._stderr_task = asyncio.create_task(self._read_stderr()) async def _read_stderr(self): """Read stderr in background, keep last N lines for exit diagnostics.""" if not self.process or not self.process.stderr: return try: while True: line = await 
self.process.stderr.readline() if not line: break decoded = line.decode(errors="replace").rstrip() if decoded: self._stderr_lines.append(decoded) # Keep only last 20 lines if len(self._stderr_lines) > 20: self._stderr_lines = self._stderr_lines[-20:] except Exception: pass async def _capture_loop(self): """Background: capture window pixmap → JPEG → fan-out.""" # Wait for window to appear (up to ~60s for heavy apps like Electron) for attempt in range(60): if not self.alive: log.warning("App %s (%s) exited during window discovery (rc=%s). stderr: %s", self.app_id, self.command, self.process.returncode, "; ".join(self._stderr_lines[-5:]) if self._stderr_lines else "(empty)") # Notify any attached viewers about the crash for ws in list(self._websockets): try: exit_msg = self._build_exit_reason() await ws.send_json({"type": "closed", "data": exit_msg or "Application crashed during startup"}) except Exception: pass return await self._discover_window() if self.window_id: break if attempt % 10 == 9: log.info("Still waiting for window for %s (%s) — attempt %d", self.app_id, self.command, attempt + 1) await asyncio.sleep(1.0) if not self.window_id: log.warning("No window found for app %s (%s) after 60s", self.app_id, self.command) # Notify any attached viewers for ws in list(self._websockets): try: await ws.send_json({"type": "error", "data": "No window found for application"}) except Exception: pass return # Maximize the window to fill the Xvfb display and activate it await self._maximize_window() log.info("Capture loop started for app %s window %d", self.app_id, self.window_id) while self.alive: if not self._streaming or not self._websockets: await asyncio.sleep(0.5) continue try: frame = await self._capture_frame() if frame: self.last_frame = frame dead = [] for ws in self._websockets: try: await ws.send_bytes(frame) except Exception: dead.append(ws) for ws in dead: self.detach_ws(ws) except Exception: log.exception("Capture error for %s", self.app_id) break await 
asyncio.sleep(1.0 / self.target_fps) # Process exited — build exit reason from stderr + return code exit_msg = self._build_exit_reason() self.exit_reason = exit_msg log.info("App %s exited: %s", self.app_id, exit_msg) for ws in list(self._websockets): try: await ws.send_json({"type": "closed", "data": exit_msg}) except Exception: pass log.info("Capture loop ended for app %s", self.app_id) def _build_exit_reason(self) -> str: """Build a human-readable exit reason from stderr output and return code.""" rc = self.process.returncode if self.process else None # Filter stderr for meaningful error lines error_hints = [] for line in self._stderr_lines: low = line.lower() # Skip noisy Qt/GTK debug lines if any(skip in low for skip in ("qt.qpa.xcb: qxcb", "libgl", "mesa", "dbind-warning", "g_dbus", "gtk-warning")): continue # Keep lines with error-like keywords if any(kw in low for kw in ("error", "fatal", "failed", "abort", "cannot", "could not", "not found", "no such", "missing", "permission denied", "segfault", "signal", "killed", "crash")): error_hints.append(line) if error_hints: # Show last few relevant error lines hint_text = "\n".join(error_hints[-5:]) if rc is not None and rc < 0: sig = abs(rc) return f"Killed by signal {sig}\n{hint_text}" return hint_text # No meaningful stderr — use return code if rc is None: return "Application exited unexpectedly" if rc == 0: return "Application closed normally" if rc < 0: sig = abs(rc) sig_names = {6: "SIGABRT", 9: "SIGKILL", 11: "SIGSEGV", 15: "SIGTERM"} sig_name = sig_names.get(sig, f"signal {sig}") return f"Crashed ({sig_name})" return f"Exited with code {rc}" async def _capture_frame(self) -> Optional[bytes]: """Capture window as JPEG using ImageMagick import.""" if not self.window_id or not IMPORT_BIN: return None try: proc = await asyncio.create_subprocess_exec( IMPORT_BIN, "-window", str(self.window_id), "-quality", "60", "jpeg:-", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self._display_env(), ) 
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5) if proc.returncode == 0 and stdout: return stdout return None except asyncio.TimeoutError: return None except Exception: return None async def _discover_window(self): """Find X11 window ID for this app's process. Tries multiple strategies: 1. Direct PID match (works for simple apps) 2. Child PID match (walks /proc for descendants — Electron apps fork) 3. Any visible window on the display (last resort) """ if not XDOTOOL_BIN or not self.process: return env = self._display_env() # Strategy 1: direct PID match wid = await self._xdotool_search("--pid", str(self.process.pid), env=env) if wid: self.window_id = wid log.debug("Window %d found by PID %d for %s", wid, self.process.pid, self.app_id) return # Strategy 2: search child PIDs (Electron/forking apps) try: child_pids = self._get_descendant_pids(self.process.pid) for cpid in child_pids: wid = await self._xdotool_search("--pid", str(cpid), env=env) if wid: self.window_id = wid log.debug("Window %d found by child PID %d for %s", wid, cpid, self.app_id) return except Exception: pass # Strategy 3: search by window name or WM_CLASS for flag in ("--name", "--class"): for query in (self.title, self.command): if query: wid = await self._xdotool_search(flag, query, env=env) if wid: self.window_id = wid log.debug("Window %d found by %s '%s' for %s", wid, flag, query, self.app_id) return # Strategy 4: find ANY visible window on this display (last resort) # Each user gets their own Xvfb, so any window here must be ours wid = await self._xdotool_search("--onlyvisible", "--name", "", env=env) if wid: self.window_id = wid log.debug("Window %d found by display-wide search for %s", wid, self.app_id) return @staticmethod def _get_descendant_pids(parent_pid: int) -> list[int]: """Walk /proc to find all descendant PIDs of a process.""" children = [] try: for entry in os.listdir("/proc"): if not entry.isdigit(): continue try: with open(f"/proc/{entry}/stat") as f: 
stat = f.read() # Field 4 is PPID parts = stat.split() ppid = int(parts[3]) pid = int(entry) if ppid == parent_pid: children.append(pid) except (OSError, IndexError, ValueError): continue except OSError: pass # Recurse into children grandchildren = [] for cpid in children: grandchildren.extend(ManagedGuiApp._get_descendant_pids(cpid)) return children + grandchildren async def _maximize_window(self): """Activate the window, resize it to fill the Xvfb display, and move to origin.""" if not self.window_id or not XDOTOOL_BIN: return env = self._display_env() wid = str(self.window_id) # Activate (focus) the window await self._xdotool("windowactivate", "--sync", wid, env=env) # Move to origin await self._xdotool("windowmove", "--sync", wid, "0", "0", env=env) # Resize to fill display (Xvfb is 1280x1024) await self._xdotool("windowsize", "--sync", wid, "1280", "1024", env=env) # Focus the window for input rc = await self._xdotool("windowfocus", wid, env=env) if rc == 0: self._focused = True log.info("Window %s maximized and focused for %s", wid, self.app_id) else: log.warning("Window %s maximize completed but focus failed for %s", wid, self.app_id) # Some apps need a moment to redraw after resize await asyncio.sleep(0.3) async def _xdotool_search(self, *args, env=None) -> Optional[int]: """Run xdotool search and return first window ID, or None.""" try: proc = await asyncio.create_subprocess_exec( XDOTOOL_BIN, "search", *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) lines = stdout.decode().strip().splitlines() if lines: return int(lines[0]) except Exception: pass return None # ---- Input forwarding ---- async def _ensure_focus(self, env: dict): """Activate/focus the window once, then skip on subsequent calls.""" if not self._focused: wid = str(self.window_id) rc1 = await self._xdotool("windowactivate", wid, env=env) rc2 = await self._xdotool("windowfocus", wid, env=env) if 
rc1 == 0 and rc2 == 0: self._focused = True log.info("Window %s focused for input (app %s)", wid, self.app_id) else: log.warning("Failed to focus window %s (activate=%s, focus=%s)", wid, rc1, rc2) async def send_input(self, msg: dict): """Forward mouse/keyboard input to the X11 window.""" if not self.window_id or not XDOTOOL_BIN: return env = self._display_env() msg_type = msg.get("type") try: if msg_type == "mouse": await self._handle_mouse(msg, env) elif msg_type == "key": await self._handle_key(msg, env) except Exception: log.warning("Input forwarding error for %s", self.app_id, exc_info=True) async def _handle_mouse(self, msg: dict, env: dict): action = msg.get("action", "") x, y = str(msg.get("x", 0)), str(msg.get("y", 0)) if action == "click": await self._ensure_focus(env) btn = str(msg.get("button", 1)) # Use absolute coordinates — window is at 0,0 filling display await self._xdotool("mousemove", x, y, env=env) await self._xdotool("click", btn, env=env) elif action == "dblclick": await self._ensure_focus(env) btn = str(msg.get("button", 1)) await self._xdotool("mousemove", x, y, env=env) await self._xdotool("click", "--repeat", "2", btn, env=env) elif action == "move": self._xdotool_fire("mousemove", x, y, env=env) elif action == "scroll": delta = msg.get("delta", 0) btn = "4" if delta < 0 else "5" # X11: 4=up, 5=down await self._xdotool("mousemove", x, y, env=env) await self._xdotool("click", btn, env=env) async def _handle_key(self, msg: dict, env: dict): action = msg.get("action", "press") key = msg.get("key", "") modifiers = msg.get("modifiers", []) # Skip release events — xdotool key handles press+release if action != "press": return # Skip standalone modifier key events if key in ("Control", "Shift", "Alt", "Meta"): return await self._ensure_focus(env) # Determine which modifiers are held (excluding Shift for printable chars) active_mods = [] for m in modifiers: xmod = _KEY_MAP.get(m, m.lower()) if xmod in ("ctrl", "alt", "super"): 
active_mods.append(xmod) # Single printable character with no ctrl/alt/super modifiers → use xdotool type # xdotool type handles @, #, !, etc. correctly via keyboard simulation if len(key) == 1 and not active_mods: await self._xdotool("type", "--delay", "0", "--clearmodifiers", key, env=env) return # Non-printable key or key combo with modifiers → use xdotool key xkey = _KEY_MAP.get(key, key) if xkey in ("ctrl", "shift", "alt", "super"): return # For modifier combos, include shift too mod_parts = [] for m in modifiers: xmod = _KEY_MAP.get(m, m.lower()) if xmod in ("ctrl", "shift", "alt", "super"): mod_parts.append(xmod) if mod_parts: xkey = "+".join(mod_parts) + "+" + xkey await self._xdotool("key", xkey, env=env) async def _xdotool(self, *args, env=None) -> int: """Run xdotool and wait for completion. Returns exit code.""" try: proc = await asyncio.create_subprocess_exec( XDOTOOL_BIN, *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5) if proc.returncode != 0: err = stderr.decode(errors="replace").strip() if stderr else "" log.warning("xdotool %s failed (rc=%d): %s", args[0] if args else "?", proc.returncode, err) return proc.returncode except asyncio.TimeoutError: log.warning("xdotool %s timed out", args[0] if args else "?") return -1 except Exception as e: log.warning("xdotool %s exception: %s", args[0] if args else "?", e) return -1 def _xdotool_fire(self, *args, env=None): """Schedule xdotool without waiting — fire and forget for low-latency ops.""" asyncio.create_task(self._xdotool_fire_async(*args, env=env)) async def _xdotool_fire_async(self, *args, env=None): """Fire-and-forget xdotool execution.""" try: proc = await asyncio.create_subprocess_exec( XDOTOOL_BIN, *args, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, env=env, ) await asyncio.wait_for(proc.wait(), timeout=5) except Exception: pass # ---- Lifecycle ---- def kill(self): if 
self._capture_task and not self._capture_task.done(): self._capture_task.cancel() if self.alive: try: self.process.terminate() except Exception: pass @property def status(self) -> str: """Return app status: 'running', 'starting', 'stopped', or 'error'.""" if not self.alive: if self.exit_reason: return "error" if "crash" in (self.exit_reason or "").lower() or ( self.process and self.process.returncode not in (None, 0) ) else "stopped" return "stopped" if self.window_id: return "running" return "starting" def to_dict(self) -> dict: d = { "app_id": self.app_id, "command": self.command, "title": self.title, "alive": self.alive, "status": self.status, "pid": self.process.pid if self.process else None, "window_id": self.window_id, "streaming": self._streaming, "viewers": len(self._websockets), "created_at": self.created_at, } if self.exit_reason: d["exit_reason"] = self.exit_reason return d # --------------------------------------------------------------------------- # Display Manager — manages Xvfb displays and GUI apps # --------------------------------------------------------------------------- class DisplayManager: """Manages virtual X displays and GUI application lifecycle.""" def __init__(self): self._xvfb_procs: dict[str, asyncio.subprocess.Process] = {} self._display_nums: dict[str, int] = {} self._apps: dict[str, dict[str, ManagedGuiApp]] = {} self._next_display = 99 def _require_deps(self): if not HAS_DISPLAY_DEPS: missing = [] if not XVFB_BIN: missing.append("Xvfb") if not XDOTOOL_BIN: missing.append("xdotool") if not IMPORT_BIN: missing.append("import (ImageMagick)") raise RuntimeError(f"Missing display dependencies: {', '.join(missing)}") async def get_or_create_display(self, username: str) -> int: """Start Xvfb if needed, return display number.""" self._require_deps() if username in self._display_nums: # Check if Xvfb is still running xvfb = self._xvfb_procs.get(username) if xvfb and xvfb.returncode is None: return self._display_nums[username] # Xvfb died — 
restart log.warning("Xvfb died for %s, restarting", username) display_num = self._next_display self._next_display += 1 log.info("Starting Xvfb :%d for %s", display_num, username) xvfb = await asyncio.create_subprocess_exec( XVFB_BIN, f":{display_num}", "-screen", "0", "1280x1024x24", "-ac", # disable access control "-nolisten", "tcp", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) # Wait briefly for Xvfb to start await asyncio.sleep(0.5) if xvfb.returncode is not None: raise RuntimeError(f"Xvfb failed to start (rc={xvfb.returncode})") self._xvfb_procs[username] = xvfb self._display_nums[username] = display_num log.info("Xvfb :%d started for %s (pid %d)", display_num, username, xvfb.pid) return display_num async def launch_app( self, username: str, command: str, title: str = "", args: list[str] | None = None, target_fps: int = 10, ) -> ManagedGuiApp: """Launch a GUI application on the user's virtual display.""" # Validate command if not _SAFE_CMD.match(command): raise ValueError(f"Invalid command name: {command}") # Find the binary cmd_bin = _find_bin(command) if not cmd_bin: raise FileNotFoundError(f"Command not found: {command}") display_num = await self.get_or_create_display(username) # Resolve the user's home directory (not root's) try: _uid, _gid, user_home, _shell = get_user_info(username) except KeyError: user_home = os.path.expanduser("~") env = { **os.environ, "DISPLAY": f":{display_num}", "HOME": user_home, "USER": username, "LOGNAME": username, } # Ensure PATH path = env.get("PATH", "") for p in _SEARCH_PATHS: if p not in path: path = p + ":" + path env["PATH"] = path cmd = [cmd_bin] + (args or []) log.info("Launching GUI app: %s (display :%d, user %s)", " ".join(cmd), display_num, username) # Drop privileges to the authenticated user try: preexec = make_preexec_fn(username) except KeyError: preexec = None log.warning("User %s not found on system — app will run as service user", username) process = await asyncio.create_subprocess_exec( 
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, cwd=user_home, preexec_fn=preexec, ) app_id = str(uuid.uuid4())[:8] app = ManagedGuiApp( app_id=app_id, command=command, title=title or command, display_num=display_num, process=process, target_fps=target_fps, ) app.start_capture() self._apps.setdefault(username, {})[app_id] = app log.info("GUI app %s launched: %s (pid %d)", app_id, command, process.pid) return app def get_app(self, username: str, app_id: str) -> Optional[ManagedGuiApp]: return self._apps.get(username, {}).get(app_id) def get_app_by_command(self, username: str, command: str) -> Optional[ManagedGuiApp]: """Find a running app by command name.""" for app in self._apps.get(username, {}).values(): if app.command == command and app.alive: return app return None def close_app(self, username: str, app_id: str) -> bool: user_apps = self._apps.get(username, {}) app = user_apps.pop(app_id, None) if not app: return False app.kill() log.info("Closed GUI app %s (%s) for %s", app_id, app.command, username) return True def list_apps(self, username: str) -> list[dict]: user_apps = self._apps.get(username, {}) # Prune dead apps dead = [aid for aid, a in user_apps.items() if not a.alive] for aid in dead: app = user_apps.pop(aid) app.kill() return [a.to_dict() for a in user_apps.values()] async def autostart_apps(self, gui_apps: list[dict], default_user: str | None = None): """Launch all apps with autostart=True. If *default_user* is set, apps start immediately as that user. Otherwise autostart is deferred until the first user logs in. 
""" if not HAS_DISPLAY_DEPS: log.info("Display deps not available, skipping autostart") return autostart_list = [a for a in gui_apps if a.get("autostart") and a.get("command")] if not autostart_list: return if not default_user: log.info("No default_user configured — deferring autostart until first login") self._pending_autostart = autostart_list return username = default_user started = 0 for app_cfg in autostart_list: command = app_cfg["command"] if self.get_app_by_command(username, command): log.debug("Autostart skip (already running): %s", command) continue try: await self.launch_app( username=username, command=command, title=app_cfg.get("name", command), args=app_cfg.get("args", []), target_fps=app_cfg.get("target_fps", 10), ) started += 1 log.info("Autostarted: %s (as %s)", command, username) except Exception as e: log.warning("Failed to autostart %s: %s", command, e) if started: log.info("Autostarted %d application(s) as %s", started, username) async def trigger_deferred_autostart(self, username: str): """Launch deferred autostart apps on first user login.""" pending = getattr(self, "_pending_autostart", None) if not pending: return self._pending_autostart = None log.info("Triggering deferred autostart for %s", username) await self.autostart_apps(pending, default_user=username) async def shutdown_all(self): """Kill all apps and Xvfb displays.""" for username, apps in self._apps.items(): for app in apps.values(): app.kill() self._apps.clear() for username, xvfb in self._xvfb_procs.items(): if xvfb.returncode is None: try: xvfb.terminate() await asyncio.wait_for(xvfb.wait(), timeout=5) except Exception: xvfb.kill() self._xvfb_procs.clear() self._display_nums.clear() log.info("All displays shut down") # --------------------------------------------------------------------------- # Singleton # --------------------------------------------------------------------------- display_manager = DisplayManager()