atlus/backend/display.py
roberts 36eff81111 Add bidirectional clipboard sync between Xvfb and browser
X11 → Browser:
- Background task polls xclip every 500ms for clipboard changes
- When content changes, sends {"type":"clipboard","data":"..."} via WS
- Browser receives and writes to navigator.clipboard

Browser → X11:
- Ctrl+V reads navigator.clipboard.readText() first
- Sends clipboard content to backend via WS message
- Backend writes to X11 clipboard via xclip -selection clipboard
- Then forwards the Ctrl+V keystroke so the app pastes

This allows copying links/text from GUI apps (like Nextcloud auth URLs)
and pasting content from the host browser into the GUI app.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 01:26:42 -05:00

872 lines
33 KiB
Python

"""Atlus — GUI application display manager.
Manages a virtual X11 display (Xvfb) and per-window frame capture
for streaming native GUI applications to the browser via WebSocket.
Each GUI app window is captured independently and streamed as JPEG frames.
"""
import asyncio
import logging
import os
import re
import shutil
import signal
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional
from backend.privdrop import get_user_info, make_preexec_fn
log = logging.getLogger("atlus.display")
# ---------------------------------------------------------------------------
# Binary discovery (systemd strips PATH)
# ---------------------------------------------------------------------------
_SEARCH_PATHS = ("/usr/bin", "/usr/local/bin", "/bin", "/usr/sbin", "/sbin")
def _find_bin(name: str) -> Optional[str]:
found = shutil.which(name)
if found:
return found
for d in _SEARCH_PATHS:
p = os.path.join(d, name)
if os.path.isfile(p) and os.access(p, os.X_OK):
return p
return None
XVFB_BIN = _find_bin("Xvfb")
XDOTOOL_BIN = _find_bin("xdotool")
IMPORT_BIN = _find_bin("import") # ImageMagick
XCLIP_BIN = _find_bin("xclip")
HAS_DISPLAY_DEPS = all((XVFB_BIN, XDOTOOL_BIN, IMPORT_BIN))
# Allowed command pattern — alphanumeric + hyphens only
_SAFE_CMD = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._\-]*$")
# Key translation: DOM key names → xdotool key names
_KEY_MAP = {
"Control": "ctrl",
"Shift": "shift",
"Alt": "alt",
"Meta": "super",
"Enter": "Return",
"Backspace": "BackSpace",
"Delete": "Delete",
"Escape": "Escape",
"Tab": "Tab",
"ArrowUp": "Up",
"ArrowDown": "Down",
"ArrowLeft": "Left",
"ArrowRight": "Right",
"Home": "Home",
"End": "End",
"PageUp": "Prior",
"PageDown": "Next",
"Insert": "Insert",
" ": "space",
"F1": "F1", "F2": "F2", "F3": "F3", "F4": "F4",
"F5": "F5", "F6": "F6", "F7": "F7", "F8": "F8",
"F9": "F9", "F10": "F10", "F11": "F11", "F12": "F12",
}
# ---------------------------------------------------------------------------
# ManagedGuiApp — a GUI process with per-window frame capture
# ---------------------------------------------------------------------------
@dataclass
class ManagedGuiApp:
"""A GUI application running on a virtual X display."""
app_id: str
command: str
title: str
display_num: int
process: Optional[asyncio.subprocess.Process] = None
window_id: Optional[int] = None
target_fps: int = 10
created_at: float = field(default_factory=time.time)
last_frame: Optional[bytes] = field(default=None, repr=False)
exit_reason: Optional[str] = field(default=None, repr=False)
_capture_task: Optional[asyncio.Task] = field(default=None, repr=False)
_stderr_task: Optional[asyncio.Task] = field(default=None, repr=False)
_stderr_lines: list = field(default_factory=list, repr=False)
_websockets: list = field(default_factory=list, repr=False)
_streaming: bool = field(default=False, repr=False)
_focused: bool = field(default=False, repr=False)
_clipboard_task: Optional[asyncio.Task] = field(default=None, repr=False)
_last_clipboard: str = field(default="", repr=False)
@property
def alive(self) -> bool:
return self.process is not None and self.process.returncode is None
def _display_env(self) -> dict:
env = {**os.environ, "DISPLAY": f":{self.display_num}"}
# Ensure PATH has common binary dirs
path = env.get("PATH", "")
for p in _SEARCH_PATHS:
if p not in path:
path = p + ":" + path
env["PATH"] = path
return env
# ---- WebSocket fan-out ----
def attach_ws(self, ws):
if ws not in self._websockets:
self._websockets.append(ws)
self._streaming = True
def detach_ws(self, ws):
try:
self._websockets.remove(ws)
except ValueError:
pass
if not self._websockets:
self._streaming = False
# ---- Frame capture ----
def start_capture(self):
if self._capture_task is None or self._capture_task.done():
self._capture_task = asyncio.create_task(self._capture_loop())
if self._stderr_task is None or self._stderr_task.done():
self._stderr_task = asyncio.create_task(self._read_stderr())
if XCLIP_BIN and (self._clipboard_task is None or self._clipboard_task.done()):
self._clipboard_task = asyncio.create_task(self._clipboard_loop())
async def _read_stderr(self):
"""Read stderr in background, keep last N lines for exit diagnostics."""
if not self.process or not self.process.stderr:
return
try:
while True:
line = await self.process.stderr.readline()
if not line:
break
decoded = line.decode(errors="replace").rstrip()
if decoded:
self._stderr_lines.append(decoded)
# Keep only last 20 lines
if len(self._stderr_lines) > 20:
self._stderr_lines = self._stderr_lines[-20:]
except Exception:
pass
async def _clipboard_loop(self):
"""Poll X11 clipboard and push changes to connected browsers."""
if not XCLIP_BIN:
return
env = self._display_env()
while self.alive:
try:
await asyncio.sleep(0.5)
if not self._websockets:
continue
# Read clipboard from X11
proc = await asyncio.create_subprocess_exec(
XCLIP_BIN, "-selection", "clipboard", "-o",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=2)
if proc.returncode != 0 or not stdout:
continue
text = stdout.decode(errors="replace")
if text and text != self._last_clipboard:
self._last_clipboard = text
log.debug("Clipboard changed for %s: %d chars", self.app_id, len(text))
dead = []
for ws in self._websockets:
try:
await ws.send_json({"type": "clipboard", "data": text})
except Exception:
dead.append(ws)
for ws in dead:
self.detach_ws(ws)
except asyncio.TimeoutError:
continue
except Exception:
await asyncio.sleep(2)
async def set_clipboard(self, text: str):
"""Write text to the X11 clipboard from the browser."""
if not XCLIP_BIN:
return
env = self._display_env()
try:
proc = await asyncio.create_subprocess_exec(
XCLIP_BIN, "-selection", "clipboard",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
await asyncio.wait_for(proc.communicate(input=text.encode()), timeout=2)
self._last_clipboard = text # prevent echo back
log.debug("Clipboard set from browser for %s: %d chars", self.app_id, len(text))
except Exception as e:
log.warning("Failed to set clipboard for %s: %s", self.app_id, e)
async def _capture_loop(self):
"""Background: capture window pixmap → JPEG → fan-out."""
# Wait for window to appear (up to ~60s for heavy apps like Electron)
for attempt in range(60):
if not self.alive:
log.warning("App %s (%s) exited during window discovery (rc=%s). stderr: %s",
self.app_id, self.command, self.process.returncode,
"; ".join(self._stderr_lines[-5:]) if self._stderr_lines else "(empty)")
# Notify any attached viewers about the crash
for ws in list(self._websockets):
try:
exit_msg = self._build_exit_reason()
await ws.send_json({"type": "closed", "data": exit_msg or "Application crashed during startup"})
except Exception:
pass
return
await self._discover_window()
if self.window_id:
break
if attempt % 10 == 9:
log.info("Still waiting for window for %s (%s) — attempt %d",
self.app_id, self.command, attempt + 1)
await asyncio.sleep(1.0)
if not self.window_id:
log.warning("No window found for app %s (%s) after 60s", self.app_id, self.command)
# Notify any attached viewers
for ws in list(self._websockets):
try:
await ws.send_json({"type": "error", "data": "No window found for application"})
except Exception:
pass
return
# Maximize the window to fill the Xvfb display and activate it
await self._maximize_window()
log.info("Capture loop started for app %s window %d", self.app_id, self.window_id)
while self.alive:
if not self._streaming or not self._websockets:
await asyncio.sleep(0.5)
continue
try:
frame = await self._capture_frame()
if frame:
self.last_frame = frame
dead = []
for ws in self._websockets:
try:
await ws.send_bytes(frame)
except Exception:
dead.append(ws)
for ws in dead:
self.detach_ws(ws)
except Exception:
log.exception("Capture error for %s", self.app_id)
break
await asyncio.sleep(1.0 / self.target_fps)
# Process exited — build exit reason from stderr + return code
exit_msg = self._build_exit_reason()
self.exit_reason = exit_msg
log.info("App %s exited: %s", self.app_id, exit_msg)
for ws in list(self._websockets):
try:
await ws.send_json({"type": "closed", "data": exit_msg})
except Exception:
pass
log.info("Capture loop ended for app %s", self.app_id)
def _build_exit_reason(self) -> str:
"""Build a human-readable exit reason from stderr output and return code."""
rc = self.process.returncode if self.process else None
# Filter stderr for meaningful error lines
error_hints = []
for line in self._stderr_lines:
low = line.lower()
# Skip noisy Qt/GTK debug lines
if any(skip in low for skip in ("qt.qpa.xcb: qxcb", "libgl", "mesa",
"dbind-warning", "g_dbus", "gtk-warning")):
continue
# Keep lines with error-like keywords
if any(kw in low for kw in ("error", "fatal", "failed", "abort",
"cannot", "could not", "not found",
"no such", "missing", "permission denied",
"segfault", "signal", "killed", "crash")):
error_hints.append(line)
if error_hints:
# Show last few relevant error lines
hint_text = "\n".join(error_hints[-5:])
if rc is not None and rc < 0:
sig = abs(rc)
return f"Killed by signal {sig}\n{hint_text}"
return hint_text
# No meaningful stderr — use return code
if rc is None:
return "Application exited unexpectedly"
if rc == 0:
return "Application closed normally"
if rc < 0:
sig = abs(rc)
sig_names = {6: "SIGABRT", 9: "SIGKILL", 11: "SIGSEGV", 15: "SIGTERM"}
sig_name = sig_names.get(sig, f"signal {sig}")
return f"Crashed ({sig_name})"
return f"Exited with code {rc}"
async def _capture_frame(self) -> Optional[bytes]:
"""Capture window as JPEG using ImageMagick import."""
if not self.window_id or not IMPORT_BIN:
return None
try:
proc = await asyncio.create_subprocess_exec(
IMPORT_BIN, "-window", str(self.window_id),
"-quality", "60", "jpeg:-",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._display_env(),
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5)
if proc.returncode == 0 and stdout:
return stdout
return None
except asyncio.TimeoutError:
return None
except Exception:
return None
async def _discover_window(self):
"""Find X11 window ID for this app's process.
Tries multiple strategies:
1. Direct PID match (works for simple apps)
2. Child PID match (walks /proc for descendants — Electron apps fork)
3. Any visible window on the display (last resort)
"""
if not XDOTOOL_BIN or not self.process:
return
env = self._display_env()
# Strategy 1: direct PID match
wid = await self._xdotool_search("--pid", str(self.process.pid), env=env)
if wid:
self.window_id = wid
log.debug("Window %d found by PID %d for %s", wid, self.process.pid, self.app_id)
return
# Strategy 2: search child PIDs (Electron/forking apps)
try:
child_pids = self._get_descendant_pids(self.process.pid)
for cpid in child_pids:
wid = await self._xdotool_search("--pid", str(cpid), env=env)
if wid:
self.window_id = wid
log.debug("Window %d found by child PID %d for %s", wid, cpid, self.app_id)
return
except Exception:
pass
# Strategy 3: search by window name or WM_CLASS
for flag in ("--name", "--class"):
for query in (self.title, self.command):
if query:
wid = await self._xdotool_search(flag, query, env=env)
if wid:
self.window_id = wid
log.debug("Window %d found by %s '%s' for %s", wid, flag, query, self.app_id)
return
# Strategy 4: find ANY visible window on this display (last resort)
# Each user gets their own Xvfb, so any window here must be ours
wid = await self._xdotool_search("--onlyvisible", "--name", "", env=env)
if wid:
self.window_id = wid
log.debug("Window %d found by display-wide search for %s", wid, self.app_id)
return
@staticmethod
def _get_descendant_pids(parent_pid: int) -> list[int]:
"""Walk /proc to find all descendant PIDs of a process."""
children = []
try:
for entry in os.listdir("/proc"):
if not entry.isdigit():
continue
try:
with open(f"/proc/{entry}/stat") as f:
stat = f.read()
# Field 4 is PPID
parts = stat.split()
ppid = int(parts[3])
pid = int(entry)
if ppid == parent_pid:
children.append(pid)
except (OSError, IndexError, ValueError):
continue
except OSError:
pass
# Recurse into children
grandchildren = []
for cpid in children:
grandchildren.extend(ManagedGuiApp._get_descendant_pids(cpid))
return children + grandchildren
async def _maximize_window(self):
"""Activate the window, resize it to fill the Xvfb display, and move to origin."""
if not self.window_id or not XDOTOOL_BIN:
return
env = self._display_env()
wid = str(self.window_id)
# Activate (focus) the window
await self._xdotool("windowactivate", "--sync", wid, env=env)
# Move to origin
await self._xdotool("windowmove", "--sync", wid, "0", "0", env=env)
# Resize to fill display (Xvfb is 1280x1024)
await self._xdotool("windowsize", "--sync", wid, "1280", "1024", env=env)
# Focus the window for input
rc = await self._xdotool("windowfocus", wid, env=env)
if rc == 0:
self._focused = True
log.info("Window %s maximized and focused for %s", wid, self.app_id)
else:
log.warning("Window %s maximize completed but focus failed for %s", wid, self.app_id)
# Some apps need a moment to redraw after resize
await asyncio.sleep(0.3)
async def _xdotool_search(self, *args, env=None) -> Optional[int]:
"""Run xdotool search and return first window ID, or None."""
try:
proc = await asyncio.create_subprocess_exec(
XDOTOOL_BIN, "search", *args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
lines = stdout.decode().strip().splitlines()
if lines:
return int(lines[0])
except Exception:
pass
return None
# ---- Input forwarding ----
async def _ensure_focus(self, env: dict):
"""Activate/focus the window once, then skip on subsequent calls."""
if not self._focused:
wid = str(self.window_id)
rc1 = await self._xdotool("windowactivate", wid, env=env)
rc2 = await self._xdotool("windowfocus", wid, env=env)
if rc1 == 0 and rc2 == 0:
self._focused = True
log.info("Window %s focused for input (app %s)", wid, self.app_id)
else:
log.warning("Failed to focus window %s (activate=%s, focus=%s)", wid, rc1, rc2)
async def send_input(self, msg: dict):
"""Forward mouse/keyboard input to the X11 window."""
if not self.window_id or not XDOTOOL_BIN:
return
env = self._display_env()
msg_type = msg.get("type")
try:
if msg_type == "mouse":
await self._handle_mouse(msg, env)
elif msg_type == "key":
await self._handle_key(msg, env)
except Exception:
log.warning("Input forwarding error for %s", self.app_id, exc_info=True)
async def _handle_mouse(self, msg: dict, env: dict):
action = msg.get("action", "")
x, y = str(msg.get("x", 0)), str(msg.get("y", 0))
if action == "click":
await self._ensure_focus(env)
btn = str(msg.get("button", 1))
# Use absolute coordinates — window is at 0,0 filling display
await self._xdotool("mousemove", x, y, env=env)
await self._xdotool("click", btn, env=env)
elif action == "dblclick":
await self._ensure_focus(env)
btn = str(msg.get("button", 1))
await self._xdotool("mousemove", x, y, env=env)
await self._xdotool("click", "--repeat", "2", btn, env=env)
elif action == "move":
self._xdotool_fire("mousemove", x, y, env=env)
elif action == "scroll":
delta = msg.get("delta", 0)
btn = "4" if delta < 0 else "5" # X11: 4=up, 5=down
await self._xdotool("mousemove", x, y, env=env)
await self._xdotool("click", btn, env=env)
async def _handle_key(self, msg: dict, env: dict):
action = msg.get("action", "press")
key = msg.get("key", "")
modifiers = msg.get("modifiers", [])
# Skip release events — xdotool key handles press+release
if action != "press":
return
# Skip standalone modifier key events
if key in ("Control", "Shift", "Alt", "Meta"):
return
await self._ensure_focus(env)
# Determine which modifiers are held (excluding Shift for printable chars)
active_mods = []
for m in modifiers:
xmod = _KEY_MAP.get(m, m.lower())
if xmod in ("ctrl", "alt", "super"):
active_mods.append(xmod)
# Single printable character with no ctrl/alt/super modifiers → use xdotool type
# xdotool type handles @, #, !, etc. correctly via keyboard simulation
if len(key) == 1 and not active_mods:
await self._xdotool("type", "--delay", "0", "--clearmodifiers", key, env=env)
return
# Non-printable key or key combo with modifiers → use xdotool key
xkey = _KEY_MAP.get(key, key)
if xkey in ("ctrl", "shift", "alt", "super"):
return
# For modifier combos, include shift too
mod_parts = []
for m in modifiers:
xmod = _KEY_MAP.get(m, m.lower())
if xmod in ("ctrl", "shift", "alt", "super"):
mod_parts.append(xmod)
if mod_parts:
xkey = "+".join(mod_parts) + "+" + xkey
await self._xdotool("key", xkey, env=env)
async def _xdotool(self, *args, env=None) -> int:
"""Run xdotool and wait for completion. Returns exit code."""
try:
proc = await asyncio.create_subprocess_exec(
XDOTOOL_BIN, *args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5)
if proc.returncode != 0:
err = stderr.decode(errors="replace").strip() if stderr else ""
log.warning("xdotool %s failed (rc=%d): %s", args[0] if args else "?", proc.returncode, err)
return proc.returncode
except asyncio.TimeoutError:
log.warning("xdotool %s timed out", args[0] if args else "?")
return -1
except Exception as e:
log.warning("xdotool %s exception: %s", args[0] if args else "?", e)
return -1
def _xdotool_fire(self, *args, env=None):
"""Schedule xdotool without waiting — fire and forget for low-latency ops."""
asyncio.create_task(self._xdotool_fire_async(*args, env=env))
async def _xdotool_fire_async(self, *args, env=None):
"""Fire-and-forget xdotool execution."""
try:
proc = await asyncio.create_subprocess_exec(
XDOTOOL_BIN, *args,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
env=env,
)
await asyncio.wait_for(proc.wait(), timeout=5)
except Exception:
pass
# ---- Lifecycle ----
def kill(self):
if self._capture_task and not self._capture_task.done():
self._capture_task.cancel()
if self._clipboard_task and not self._clipboard_task.done():
self._clipboard_task.cancel()
if self.alive:
try:
self.process.terminate()
except Exception:
pass
@property
def status(self) -> str:
"""Return app status: 'running', 'starting', 'stopped', or 'error'."""
if not self.alive:
if self.exit_reason:
return "error" if "crash" in (self.exit_reason or "").lower() or (
self.process and self.process.returncode not in (None, 0)
) else "stopped"
return "stopped"
if self.window_id:
return "running"
return "starting"
def to_dict(self) -> dict:
d = {
"app_id": self.app_id,
"command": self.command,
"title": self.title,
"alive": self.alive,
"status": self.status,
"pid": self.process.pid if self.process else None,
"window_id": self.window_id,
"streaming": self._streaming,
"viewers": len(self._websockets),
"created_at": self.created_at,
}
if self.exit_reason:
d["exit_reason"] = self.exit_reason
return d
# ---------------------------------------------------------------------------
# Display Manager — manages Xvfb displays and GUI apps
# ---------------------------------------------------------------------------
class DisplayManager:
"""Manages virtual X displays and GUI application lifecycle."""
def __init__(self):
self._xvfb_procs: dict[str, asyncio.subprocess.Process] = {}
self._display_nums: dict[str, int] = {}
self._apps: dict[str, dict[str, ManagedGuiApp]] = {}
self._next_display = 99
def _require_deps(self):
if not HAS_DISPLAY_DEPS:
missing = []
if not XVFB_BIN:
missing.append("Xvfb")
if not XDOTOOL_BIN:
missing.append("xdotool")
if not IMPORT_BIN:
missing.append("import (ImageMagick)")
raise RuntimeError(f"Missing display dependencies: {', '.join(missing)}")
async def get_or_create_display(self, username: str) -> int:
"""Start Xvfb if needed, return display number."""
self._require_deps()
if username in self._display_nums:
# Check if Xvfb is still running
xvfb = self._xvfb_procs.get(username)
if xvfb and xvfb.returncode is None:
return self._display_nums[username]
# Xvfb died — restart
log.warning("Xvfb died for %s, restarting", username)
display_num = self._next_display
self._next_display += 1
log.info("Starting Xvfb :%d for %s", display_num, username)
xvfb = await asyncio.create_subprocess_exec(
XVFB_BIN, f":{display_num}",
"-screen", "0", "1280x1024x24",
"-ac", # disable access control
"-nolisten", "tcp",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Wait briefly for Xvfb to start
await asyncio.sleep(0.5)
if xvfb.returncode is not None:
raise RuntimeError(f"Xvfb failed to start (rc={xvfb.returncode})")
self._xvfb_procs[username] = xvfb
self._display_nums[username] = display_num
log.info("Xvfb :%d started for %s (pid %d)", display_num, username, xvfb.pid)
return display_num
async def launch_app(
self, username: str, command: str, title: str = "",
args: list[str] | None = None, target_fps: int = 10,
) -> ManagedGuiApp:
"""Launch a GUI application on the user's virtual display."""
# Validate command
if not _SAFE_CMD.match(command):
raise ValueError(f"Invalid command name: {command}")
# Find the binary
cmd_bin = _find_bin(command)
if not cmd_bin:
raise FileNotFoundError(f"Command not found: {command}")
display_num = await self.get_or_create_display(username)
# Resolve the user's home directory (not root's)
try:
_uid, _gid, user_home, _shell = get_user_info(username)
except KeyError:
user_home = os.path.expanduser("~")
env = {
**os.environ,
"DISPLAY": f":{display_num}",
"HOME": user_home,
"USER": username,
"LOGNAME": username,
}
# Ensure PATH
path = env.get("PATH", "")
for p in _SEARCH_PATHS:
if p not in path:
path = p + ":" + path
env["PATH"] = path
cmd = [cmd_bin] + (args or [])
log.info("Launching GUI app: %s (display :%d, user %s)", " ".join(cmd), display_num, username)
# Drop privileges to the authenticated user
try:
preexec = make_preexec_fn(username)
except KeyError:
preexec = None
log.warning("User %s not found on system — app will run as service user", username)
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
cwd=user_home,
preexec_fn=preexec,
)
app_id = str(uuid.uuid4())[:8]
app = ManagedGuiApp(
app_id=app_id,
command=command,
title=title or command,
display_num=display_num,
process=process,
target_fps=target_fps,
)
app.start_capture()
self._apps.setdefault(username, {})[app_id] = app
log.info("GUI app %s launched: %s (pid %d)", app_id, command, process.pid)
return app
def get_app(self, username: str, app_id: str) -> Optional[ManagedGuiApp]:
return self._apps.get(username, {}).get(app_id)
def get_app_by_command(self, username: str, command: str) -> Optional[ManagedGuiApp]:
"""Find a running app by command name."""
for app in self._apps.get(username, {}).values():
if app.command == command and app.alive:
return app
return None
def close_app(self, username: str, app_id: str) -> bool:
user_apps = self._apps.get(username, {})
app = user_apps.pop(app_id, None)
if not app:
return False
app.kill()
log.info("Closed GUI app %s (%s) for %s", app_id, app.command, username)
return True
def list_apps(self, username: str) -> list[dict]:
user_apps = self._apps.get(username, {})
# Prune dead apps
dead = [aid for aid, a in user_apps.items() if not a.alive]
for aid in dead:
app = user_apps.pop(aid)
app.kill()
return [a.to_dict() for a in user_apps.values()]
async def autostart_apps(self, gui_apps: list[dict], default_user: str | None = None):
"""Launch all apps with autostart=True.
If *default_user* is set, apps start immediately as that user.
Otherwise autostart is deferred until the first user logs in.
"""
if not HAS_DISPLAY_DEPS:
log.info("Display deps not available, skipping autostart")
return
autostart_list = [a for a in gui_apps if a.get("autostart") and a.get("command")]
if not autostart_list:
return
if not default_user:
log.info("No default_user configured — deferring autostart until first login")
self._pending_autostart = autostart_list
return
username = default_user
started = 0
for app_cfg in autostart_list:
command = app_cfg["command"]
if self.get_app_by_command(username, command):
log.debug("Autostart skip (already running): %s", command)
continue
try:
await self.launch_app(
username=username,
command=command,
title=app_cfg.get("name", command),
args=app_cfg.get("args", []),
target_fps=app_cfg.get("target_fps", 10),
)
started += 1
log.info("Autostarted: %s (as %s)", command, username)
except Exception as e:
log.warning("Failed to autostart %s: %s", command, e)
if started:
log.info("Autostarted %d application(s) as %s", started, username)
async def trigger_deferred_autostart(self, username: str):
"""Launch deferred autostart apps on first user login."""
pending = getattr(self, "_pending_autostart", None)
if not pending:
return
self._pending_autostart = None
log.info("Triggering deferred autostart for %s", username)
await self.autostart_apps(pending, default_user=username)
async def shutdown_all(self):
"""Kill all apps and Xvfb displays."""
for username, apps in self._apps.items():
for app in apps.values():
app.kill()
self._apps.clear()
for username, xvfb in self._xvfb_procs.items():
if xvfb.returncode is None:
try:
xvfb.terminate()
await asyncio.wait_for(xvfb.wait(), timeout=5)
except Exception:
xvfb.kill()
self._xvfb_procs.clear()
self._display_nums.clear()
log.info("All displays shut down")
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
display_manager = DisplayManager()