atlus/backend/display.py
roberts 23e4906d08 Replace panel services with docked apps, add autostart and update scanner settings
Panel: SERVICES section becomes APPLICATIONS — shows configured GUI apps with
status dots, launch/stop controls, and "+ Add" button linking to Settings.

Backend: DisplayManager.autostart_apps() launches autostart-enabled GUI apps on
service startup (always-on desktop session). Lifespan calls it before yield.

Settings: new Applications section for managing GUI apps (add/remove/autostart
toggle). General section gains update scanner interval + enable/disable toggle.
Config adds update_check_enabled and update_check_interval fields.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 23:40:44 -05:00

533 lines
18 KiB
Python

"""Atlus — GUI application display manager.
Manages a virtual X11 display (Xvfb) and per-window frame capture
for streaming native GUI applications to the browser via WebSocket.
Each GUI app window is captured independently and streamed as JPEG frames.
"""
import asyncio
import getpass
import logging
import os
import re
import shutil
import signal
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional
log = logging.getLogger("atlus.display")
# ---------------------------------------------------------------------------
# Binary discovery (systemd strips PATH)
# ---------------------------------------------------------------------------
_SEARCH_PATHS = ("/usr/bin", "/usr/local/bin", "/bin", "/usr/sbin", "/sbin")
def _find_bin(name: str) -> Optional[str]:
found = shutil.which(name)
if found:
return found
for d in _SEARCH_PATHS:
p = os.path.join(d, name)
if os.path.isfile(p) and os.access(p, os.X_OK):
return p
return None
XVFB_BIN = _find_bin("Xvfb")
XDOTOOL_BIN = _find_bin("xdotool")
IMPORT_BIN = _find_bin("import") # ImageMagick
HAS_DISPLAY_DEPS = all((XVFB_BIN, XDOTOOL_BIN, IMPORT_BIN))
# Allowed command pattern — alphanumeric + hyphens only
_SAFE_CMD = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9._\-]*$")
# Key translation: DOM key names → xdotool key names
_KEY_MAP = {
"Control": "ctrl",
"Shift": "shift",
"Alt": "alt",
"Meta": "super",
"Enter": "Return",
"Backspace": "BackSpace",
"Delete": "Delete",
"Escape": "Escape",
"Tab": "Tab",
"ArrowUp": "Up",
"ArrowDown": "Down",
"ArrowLeft": "Left",
"ArrowRight": "Right",
"Home": "Home",
"End": "End",
"PageUp": "Prior",
"PageDown": "Next",
"Insert": "Insert",
" ": "space",
"F1": "F1", "F2": "F2", "F3": "F3", "F4": "F4",
"F5": "F5", "F6": "F6", "F7": "F7", "F8": "F8",
"F9": "F9", "F10": "F10", "F11": "F11", "F12": "F12",
}
# ---------------------------------------------------------------------------
# ManagedGuiApp — a GUI process with per-window frame capture
# ---------------------------------------------------------------------------
@dataclass
class ManagedGuiApp:
"""A GUI application running on a virtual X display."""
app_id: str
command: str
title: str
display_num: int
process: Optional[asyncio.subprocess.Process] = None
window_id: Optional[int] = None
target_fps: int = 10
created_at: float = field(default_factory=time.time)
last_frame: Optional[bytes] = field(default=None, repr=False)
_capture_task: Optional[asyncio.Task] = field(default=None, repr=False)
_websockets: list = field(default_factory=list, repr=False)
_streaming: bool = field(default=False, repr=False)
@property
def alive(self) -> bool:
return self.process is not None and self.process.returncode is None
def _display_env(self) -> dict:
env = {**os.environ, "DISPLAY": f":{self.display_num}"}
# Ensure PATH has common binary dirs
path = env.get("PATH", "")
for p in _SEARCH_PATHS:
if p not in path:
path = p + ":" + path
env["PATH"] = path
return env
# ---- WebSocket fan-out ----
def attach_ws(self, ws):
if ws not in self._websockets:
self._websockets.append(ws)
self._streaming = True
def detach_ws(self, ws):
try:
self._websockets.remove(ws)
except ValueError:
pass
if not self._websockets:
self._streaming = False
# ---- Frame capture ----
def start_capture(self):
if self._capture_task is None or self._capture_task.done():
self._capture_task = asyncio.create_task(self._capture_loop())
async def _capture_loop(self):
"""Background: capture window pixmap → JPEG → fan-out."""
# Wait for window to appear
for attempt in range(20):
if not self.alive:
return
await self._discover_window()
if self.window_id:
break
await asyncio.sleep(0.5 * (1 + attempt * 0.2))
if not self.window_id:
log.warning("No window found for app %s (%s)", self.app_id, self.command)
# Notify any attached viewers
for ws in list(self._websockets):
try:
await ws.send_json({"type": "error", "data": "No window found for application"})
except Exception:
pass
return
log.info("Capture loop started for app %s window %d", self.app_id, self.window_id)
while self.alive:
if not self._streaming or not self._websockets:
await asyncio.sleep(0.5)
continue
try:
frame = await self._capture_frame()
if frame:
self.last_frame = frame
dead = []
for ws in self._websockets:
try:
await ws.send_bytes(frame)
except Exception:
dead.append(ws)
for ws in dead:
self.detach_ws(ws)
except Exception:
log.exception("Capture error for %s", self.app_id)
break
await asyncio.sleep(1.0 / self.target_fps)
# Process exited — notify viewers
for ws in list(self._websockets):
try:
await ws.send_json({"type": "closed", "data": "Application exited"})
except Exception:
pass
log.info("Capture loop ended for app %s", self.app_id)
async def _capture_frame(self) -> Optional[bytes]:
"""Capture window as JPEG using ImageMagick import."""
if not self.window_id or not IMPORT_BIN:
return None
try:
proc = await asyncio.create_subprocess_exec(
IMPORT_BIN, "-window", str(self.window_id),
"-quality", "60", "jpeg:-",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._display_env(),
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5)
if proc.returncode == 0 and stdout:
return stdout
return None
except asyncio.TimeoutError:
return None
except Exception:
return None
async def _discover_window(self):
"""Find X11 window ID for this app's process."""
if not XDOTOOL_BIN or not self.process:
return
try:
proc = await asyncio.create_subprocess_exec(
XDOTOOL_BIN, "search", "--pid", str(self.process.pid),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._display_env(),
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
lines = stdout.decode().strip().splitlines()
if lines:
self.window_id = int(lines[0])
log.debug("Discovered window %d for app %s (pid %d)",
self.window_id, self.app_id, self.process.pid)
except Exception:
pass
# ---- Input forwarding ----
async def send_input(self, msg: dict):
"""Forward mouse/keyboard input to the X11 window."""
if not self.window_id or not XDOTOOL_BIN:
return
env = self._display_env()
msg_type = msg.get("type")
try:
if msg_type == "mouse":
await self._handle_mouse(msg, env)
elif msg_type == "key":
await self._handle_key(msg, env)
except Exception:
log.debug("Input forwarding error for %s", self.app_id, exc_info=True)
async def _handle_mouse(self, msg: dict, env: dict):
action = msg.get("action", "")
x, y = str(msg.get("x", 0)), str(msg.get("y", 0))
wid = str(self.window_id)
if action == "click":
btn = str(msg.get("button", 1))
await self._xdotool(
"mousemove", "--window", wid, x, y,
"click", "--window", wid, btn,
env=env,
)
elif action == "dblclick":
btn = str(msg.get("button", 1))
await self._xdotool(
"mousemove", "--window", wid, x, y,
"click", "--window", wid, "--repeat", "2", btn,
env=env,
)
elif action == "move":
await self._xdotool(
"mousemove", "--window", wid, x, y,
env=env,
)
elif action == "scroll":
delta = msg.get("delta", 0)
btn = "4" if delta < 0 else "5" # X11: 4=up, 5=down
await self._xdotool(
"mousemove", "--window", wid, x, y,
"click", "--window", wid, btn,
env=env,
)
async def _handle_key(self, msg: dict, env: dict):
action = msg.get("action", "press")
key = msg.get("key", "")
modifiers = msg.get("modifiers", [])
# Translate key name
xkey = _KEY_MAP.get(key, key)
# Skip standalone modifier key events
if xkey in ("ctrl", "shift", "alt", "super"):
return
# Build modifier prefix
mod_parts = []
for m in modifiers:
xmod = _KEY_MAP.get(m, m.lower())
if xmod not in ("ctrl", "shift", "alt", "super"):
continue
mod_parts.append(xmod)
if mod_parts:
xkey = "+".join(mod_parts) + "+" + xkey
wid = str(self.window_id)
if action == "press":
await self._xdotool("key", "--window", wid, xkey, env=env)
# release events handled implicitly by xdotool key
async def _xdotool(self, *args, env=None):
proc = await asyncio.create_subprocess_exec(
XDOTOOL_BIN, *args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
await asyncio.wait_for(proc.communicate(), timeout=5)
# ---- Lifecycle ----
def kill(self):
if self._capture_task and not self._capture_task.done():
self._capture_task.cancel()
if self.alive:
try:
self.process.terminate()
except Exception:
pass
def to_dict(self) -> dict:
return {
"app_id": self.app_id,
"command": self.command,
"title": self.title,
"alive": self.alive,
"pid": self.process.pid if self.process else None,
"window_id": self.window_id,
"streaming": self._streaming,
"viewers": len(self._websockets),
"created_at": self.created_at,
}
# ---------------------------------------------------------------------------
# Display Manager — manages Xvfb displays and GUI apps
# ---------------------------------------------------------------------------
class DisplayManager:
"""Manages virtual X displays and GUI application lifecycle."""
def __init__(self):
self._xvfb_procs: dict[str, asyncio.subprocess.Process] = {}
self._display_nums: dict[str, int] = {}
self._apps: dict[str, dict[str, ManagedGuiApp]] = {}
self._next_display = 99
def _require_deps(self):
if not HAS_DISPLAY_DEPS:
missing = []
if not XVFB_BIN:
missing.append("Xvfb")
if not XDOTOOL_BIN:
missing.append("xdotool")
if not IMPORT_BIN:
missing.append("import (ImageMagick)")
raise RuntimeError(f"Missing display dependencies: {', '.join(missing)}")
async def get_or_create_display(self, username: str) -> int:
"""Start Xvfb if needed, return display number."""
self._require_deps()
if username in self._display_nums:
# Check if Xvfb is still running
xvfb = self._xvfb_procs.get(username)
if xvfb and xvfb.returncode is None:
return self._display_nums[username]
# Xvfb died — restart
log.warning("Xvfb died for %s, restarting", username)
display_num = self._next_display
self._next_display += 1
log.info("Starting Xvfb :%d for %s", display_num, username)
xvfb = await asyncio.create_subprocess_exec(
XVFB_BIN, f":{display_num}",
"-screen", "0", "1280x1024x24",
"-ac", # disable access control
"-nolisten", "tcp",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# Wait briefly for Xvfb to start
await asyncio.sleep(0.5)
if xvfb.returncode is not None:
raise RuntimeError(f"Xvfb failed to start (rc={xvfb.returncode})")
self._xvfb_procs[username] = xvfb
self._display_nums[username] = display_num
log.info("Xvfb :%d started for %s (pid %d)", display_num, username, xvfb.pid)
return display_num
async def launch_app(
self, username: str, command: str, title: str = "",
args: list[str] | None = None, target_fps: int = 10,
) -> ManagedGuiApp:
"""Launch a GUI application on the user's virtual display."""
# Validate command
if not _SAFE_CMD.match(command):
raise ValueError(f"Invalid command name: {command}")
# Find the binary
cmd_bin = _find_bin(command)
if not cmd_bin:
raise FileNotFoundError(f"Command not found: {command}")
display_num = await self.get_or_create_display(username)
env = {
**os.environ,
"DISPLAY": f":{display_num}",
"HOME": os.path.expanduser("~"),
}
# Ensure PATH
path = env.get("PATH", "")
for p in _SEARCH_PATHS:
if p not in path:
path = p + ":" + path
env["PATH"] = path
cmd = [cmd_bin] + (args or [])
log.info("Launching GUI app: %s (display :%d, user %s)", " ".join(cmd), display_num, username)
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
app_id = str(uuid.uuid4())[:8]
app = ManagedGuiApp(
app_id=app_id,
command=command,
title=title or command,
display_num=display_num,
process=process,
target_fps=target_fps,
)
app.start_capture()
self._apps.setdefault(username, {})[app_id] = app
log.info("GUI app %s launched: %s (pid %d)", app_id, command, process.pid)
return app
def get_app(self, username: str, app_id: str) -> Optional[ManagedGuiApp]:
return self._apps.get(username, {}).get(app_id)
def get_app_by_command(self, username: str, command: str) -> Optional[ManagedGuiApp]:
"""Find a running app by command name."""
for app in self._apps.get(username, {}).values():
if app.command == command and app.alive:
return app
return None
def close_app(self, username: str, app_id: str) -> bool:
user_apps = self._apps.get(username, {})
app = user_apps.pop(app_id, None)
if not app:
return False
app.kill()
log.info("Closed GUI app %s (%s) for %s", app_id, app.command, username)
return True
def list_apps(self, username: str) -> list[dict]:
user_apps = self._apps.get(username, {})
# Prune dead apps
dead = [aid for aid, a in user_apps.items() if not a.alive]
for aid in dead:
app = user_apps.pop(aid)
app.kill()
return [a.to_dict() for a in user_apps.values()]
async def autostart_apps(self, gui_apps: list[dict]):
"""Launch all apps with autostart=True. Called on service startup."""
if not HAS_DISPLAY_DEPS:
log.info("Display deps not available, skipping autostart")
return
username = getpass.getuser()
started = 0
for app_cfg in gui_apps:
if not app_cfg.get("autostart"):
continue
command = app_cfg.get("command", "")
if not command:
continue
# Skip if already running
if self.get_app_by_command(username, command):
log.debug("Autostart skip (already running): %s", command)
continue
try:
await self.launch_app(
username=username,
command=command,
title=app_cfg.get("name", command),
args=app_cfg.get("args", []),
target_fps=app_cfg.get("target_fps", 10),
)
started += 1
log.info("Autostarted: %s", command)
except Exception as e:
log.warning("Failed to autostart %s: %s", command, e)
if started:
log.info("Autostarted %d application(s)", started)
async def shutdown_all(self):
"""Kill all apps and Xvfb displays."""
for username, apps in self._apps.items():
for app in apps.values():
app.kill()
self._apps.clear()
for username, xvfb in self._xvfb_procs.items():
if xvfb.returncode is None:
try:
xvfb.terminate()
await asyncio.wait_for(xvfb.wait(), timeout=5)
except Exception:
xvfb.kill()
self._xvfb_procs.clear()
self._display_nums.clear()
log.info("All displays shut down")
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
display_manager = DisplayManager()