"""Package management — apt / dpkg wrapper for Debian-based systems.""" import asyncio import logging import os import re import shutil from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from backend.auth import get_current_user router = APIRouter(prefix="/api/packages", tags=["packages"]) log = logging.getLogger("atlus.packages") # --------------------------------------------------------------------------- # Guard # --------------------------------------------------------------------------- def _find_apt(): """Find apt binary, checking common paths if PATH is restricted.""" for name in ("apt-cache", "apt"): if shutil.which(name): return True # systemd services may have a minimal PATH — check directly for p in ("/usr/bin/apt-cache", "/usr/bin/apt"): if os.path.isfile(p) and os.access(p, os.X_OK): return True return False _HAS_APT = _find_apt() def _require_apt(): if not _HAS_APT: raise HTTPException(503, "apt package manager not available on this system") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _PKG_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9.+\-:]+$") def _validate_name(name: str) -> str: name = name.strip() if not _PKG_NAME_RE.match(name): raise HTTPException(400, f"Invalid package name: {name}") return name def _safe_env(): """Build environment with full PATH for apt/dpkg commands.""" env = {**os.environ, "LC_ALL": "C", "DEBIAN_FRONTEND": "noninteractive"} # Ensure standard paths are included (systemd may use minimal PATH) path = env.get("PATH", "") for p in ("/usr/bin", "/usr/sbin", "/bin", "/sbin", "/usr/local/bin"): if p not in path: path = p + ":" + path env["PATH"] = path return env async def _run_cmd(cmd: list[str], timeout: float = 30) -> tuple[int, str, str]: """Run a command, return (returncode, stdout, stderr). Never raises.""" _require_apt() try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_safe_env(), ) except FileNotFoundError as e: log.error("Command not found: %s — %s", cmd[0], e) return 127, "", f"Command not found: {cmd[0]}" try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() return 1, "", "Operation timed out" return proc.returncode, stdout.decode(errors="replace"), stderr.decode(errors="replace") # --------------------------------------------------------------------------- # Models # --------------------------------------------------------------------------- class PackageAction(BaseModel): name: str # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @router.get("/search") async def search_packages( q: str = Query(..., min_length=2, description="Search query"), _user: str = Depends(get_current_user), ): """Search for packages matching a query.""" rc, stdout, stderr = await _run_cmd(["apt-cache", "search", q]) # apt-cache search returns 0 even with no results — stderr warnings are normal if rc != 0: log.warning("apt-cache search failed (rc=%d): %s", rc, stderr.strip()) raise HTTPException(500, f"Package search failed: {stderr.strip()[:200]}") if stderr.strip(): log.debug("apt-cache search stderr (non-fatal): %s", stderr.strip()[:200]) results = [] for line in stdout.strip().splitlines(): if not line.strip(): continue parts = line.split(" - ", 1) if len(parts) == 2: results.append({"name": parts[0].strip(), "summary": parts[1].strip()}) elif parts[0].strip(): # Some lines may not have " - " separator results.append({"name": parts[0].strip(), "summary": ""}) if len(results) >= 100: break return results @router.get("/info/{name}") async def package_info( name: str, _user: str = Depends(get_current_user), ): """Get detailed information about a package.""" name = _validate_name(name) # Get package info from apt-cache rc, stdout, stderr = await _run_cmd(["apt-cache", "show", name]) if rc != 0: log.warning("apt-cache show %s failed (rc=%d): %s", name, rc, stderr.strip()) raise HTTPException(404, f"Package not found: {name}") # Parse key-value fields info = {} current_key = None current_val = "" for line in stdout.splitlines(): if line.startswith(" "): # Continuation of previous field current_val += "\n" + line.strip() elif ": " in line: if current_key: info[current_key] = current_val current_key, current_val = line.split(": ", 1) elif line == "": if current_key: info[current_key] = current_val current_key = None current_val = "" break # Only parse first stanza if current_key: info[current_key] = current_val # Check installed status via dpkg-query rc2, dpkg_out, _ = await _run_cmd( ["dpkg-query", "-W", "-f=${Status}\t${Version}\n", name] ) installed_version = None is_installed = False if rc2 == 0 and dpkg_out.strip(): parts = dpkg_out.strip().split("\t") if len(parts) >= 2 and "install ok installed" in parts[0]: is_installed = True installed_version = parts[1].strip() return { "name": info.get("Package", name), "version": info.get("Version", ""), "installed": is_installed, "installed_version": installed_version, "description": info.get("Description", info.get("Description-en", "")), "size": info.get("Size", ""), "installed_size": info.get("Installed-Size", ""), "depends": info.get("Depends", ""), "section": info.get("Section", ""), "maintainer": info.get("Maintainer", ""), "homepage": info.get("Homepage", ""), "architecture": info.get("Architecture", ""), } @router.post("/install") async def install_package( req: PackageAction, _user: str = Depends(get_current_user), ): """Install a package via apt-get.""" name = _validate_name(req.name) rc, stdout, stderr = await _run_cmd(["apt-get", "install", "-y", name], timeout=300) if rc != 0: msg = stderr.strip() or stdout.strip() log.warning("apt-get install %s failed (rc=%d): %s", name, rc, msg[:200]) raise HTTPException(500, f"Install failed: {msg[:300]}") return {"success": True, "package": name, "output": stdout[-500:]} @router.post("/remove") async def remove_package( req: PackageAction, _user: str = Depends(get_current_user), ): """Remove a package via apt-get.""" name = _validate_name(req.name) rc, stdout, stderr = await _run_cmd(["apt-get", "remove", "-y", name], timeout=120) if rc != 0: msg = stderr.strip() or stdout.strip() log.warning("apt-get remove %s failed (rc=%d): %s", name, rc, msg[:200]) raise HTTPException(500, f"Remove failed: {msg[:300]}") return {"success": True, "package": name, "output": stdout[-500:]} @router.post("/update-cache") async def update_cache(_user: str = Depends(get_current_user)): """Run apt-get update to refresh package lists.""" rc, stdout, stderr = await _run_cmd(["apt-get", "update"], timeout=120) if rc != 0: msg = stderr.strip() or stdout.strip() log.warning("apt-get update failed (rc=%d): %s", rc, msg[:200]) raise HTTPException(500, f"Cache update failed: {msg[:300]}") return {"success": True, "output": stdout[-500:]} @router.get("/debug") async def debug_apt(_user: str = Depends(get_current_user)): """Debug endpoint — test apt-cache availability.""" results = { "has_apt": _HAS_APT, "which_apt_cache": shutil.which("apt-cache"), "which_apt": shutil.which("apt"), "which_dpkg": shutil.which("dpkg-query"), "path": os.environ.get("PATH", ""), "safe_path": _safe_env().get("PATH", ""), } # Try running apt-cache directly for cmd_path in ["/usr/bin/apt-cache", "apt-cache"]: try: proc = await asyncio.create_subprocess_exec( cmd_path, "search", "bash", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_safe_env(), ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10) lines = stdout.decode(errors="replace").strip().splitlines() results[f"test_{cmd_path}"] = { "rc": proc.returncode, "stdout_lines": len(lines), "first_line": lines[0] if lines else "", "stderr": stderr.decode(errors="replace").strip()[:200], } except Exception as e: results[f"test_{cmd_path}"] = {"error": str(e)} return results