atlus/backend/routers/packages.py
roberts c3e127421b Fix package manager: better error handling, debug endpoint, robust PATH
- Rewrote _apt helper as _run_cmd that never raises (returns rc/stdout/stderr)
- Handle stderr warnings from apt-cache gracefully (common on Armbian)
- Catch FileNotFoundError if binary missing despite PATH fix
- Show actual backend error message in frontend instead of generic "Search failed"
- Add /api/packages/debug endpoint for troubleshooting apt-cache issues
- Add logging throughout for server-side diagnostics

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 19:33:04 -05:00

270 lines
9.2 KiB
Python

"""Package management — apt / dpkg wrapper for Debian-based systems."""
import asyncio
import logging
import os
import re
import shutil
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from backend.auth import get_current_user
router = APIRouter(prefix="/api/packages", tags=["packages"])
log = logging.getLogger("atlus.packages")
# ---------------------------------------------------------------------------
# Guard
# ---------------------------------------------------------------------------
def _find_apt():
"""Find apt binary, checking common paths if PATH is restricted."""
for name in ("apt-cache", "apt"):
if shutil.which(name):
return True
# systemd services may have a minimal PATH — check directly
for p in ("/usr/bin/apt-cache", "/usr/bin/apt"):
if os.path.isfile(p) and os.access(p, os.X_OK):
return True
return False
_HAS_APT = _find_apt()
def _require_apt():
if not _HAS_APT:
raise HTTPException(503, "apt package manager not available on this system")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_PKG_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9.+\-:]+$")
def _validate_name(name: str) -> str:
name = name.strip()
if not _PKG_NAME_RE.match(name):
raise HTTPException(400, f"Invalid package name: {name}")
return name
def _safe_env():
"""Build environment with full PATH for apt/dpkg commands."""
env = {**os.environ, "LC_ALL": "C", "DEBIAN_FRONTEND": "noninteractive"}
# Ensure standard paths are included (systemd may use minimal PATH)
path = env.get("PATH", "")
for p in ("/usr/bin", "/usr/sbin", "/bin", "/sbin", "/usr/local/bin"):
if p not in path:
path = p + ":" + path
env["PATH"] = path
return env
async def _run_cmd(cmd: list[str], timeout: float = 30) -> tuple[int, str, str]:
"""Run a command, return (returncode, stdout, stderr). Never raises."""
_require_apt()
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=_safe_env(),
)
except FileNotFoundError as e:
log.error("Command not found: %s%s", cmd[0], e)
return 127, "", f"Command not found: {cmd[0]}"
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return 1, "", "Operation timed out"
return proc.returncode, stdout.decode(errors="replace"), stderr.decode(errors="replace")
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class PackageAction(BaseModel):
name: str
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/search")
async def search_packages(
q: str = Query(..., min_length=2, description="Search query"),
_user: str = Depends(get_current_user),
):
"""Search for packages matching a query."""
rc, stdout, stderr = await _run_cmd(["apt-cache", "search", q])
# apt-cache search returns 0 even with no results — stderr warnings are normal
if rc != 0:
log.warning("apt-cache search failed (rc=%d): %s", rc, stderr.strip())
raise HTTPException(500, f"Package search failed: {stderr.strip()[:200]}")
if stderr.strip():
log.debug("apt-cache search stderr (non-fatal): %s", stderr.strip()[:200])
results = []
for line in stdout.strip().splitlines():
if not line.strip():
continue
parts = line.split(" - ", 1)
if len(parts) == 2:
results.append({"name": parts[0].strip(), "summary": parts[1].strip()})
elif parts[0].strip():
# Some lines may not have " - " separator
results.append({"name": parts[0].strip(), "summary": ""})
if len(results) >= 100:
break
return results
@router.get("/info/{name}")
async def package_info(
name: str,
_user: str = Depends(get_current_user),
):
"""Get detailed information about a package."""
name = _validate_name(name)
# Get package info from apt-cache
rc, stdout, stderr = await _run_cmd(["apt-cache", "show", name])
if rc != 0:
log.warning("apt-cache show %s failed (rc=%d): %s", name, rc, stderr.strip())
raise HTTPException(404, f"Package not found: {name}")
# Parse key-value fields
info = {}
current_key = None
current_val = ""
for line in stdout.splitlines():
if line.startswith(" "):
# Continuation of previous field
current_val += "\n" + line.strip()
elif ": " in line:
if current_key:
info[current_key] = current_val
current_key, current_val = line.split(": ", 1)
elif line == "":
if current_key:
info[current_key] = current_val
current_key = None
current_val = ""
break # Only parse first stanza
if current_key:
info[current_key] = current_val
# Check installed status via dpkg-query
rc2, dpkg_out, _ = await _run_cmd(
["dpkg-query", "-W", "-f=${Status}\t${Version}\n", name]
)
installed_version = None
is_installed = False
if rc2 == 0 and dpkg_out.strip():
parts = dpkg_out.strip().split("\t")
if len(parts) >= 2 and "install ok installed" in parts[0]:
is_installed = True
installed_version = parts[1].strip()
return {
"name": info.get("Package", name),
"version": info.get("Version", ""),
"installed": is_installed,
"installed_version": installed_version,
"description": info.get("Description", info.get("Description-en", "")),
"size": info.get("Size", ""),
"installed_size": info.get("Installed-Size", ""),
"depends": info.get("Depends", ""),
"section": info.get("Section", ""),
"maintainer": info.get("Maintainer", ""),
"homepage": info.get("Homepage", ""),
"architecture": info.get("Architecture", ""),
}
@router.post("/install")
async def install_package(
req: PackageAction,
_user: str = Depends(get_current_user),
):
"""Install a package via apt-get."""
name = _validate_name(req.name)
rc, stdout, stderr = await _run_cmd(["apt-get", "install", "-y", name], timeout=300)
if rc != 0:
msg = stderr.strip() or stdout.strip()
log.warning("apt-get install %s failed (rc=%d): %s", name, rc, msg[:200])
raise HTTPException(500, f"Install failed: {msg[:300]}")
return {"success": True, "package": name, "output": stdout[-500:]}
@router.post("/remove")
async def remove_package(
req: PackageAction,
_user: str = Depends(get_current_user),
):
"""Remove a package via apt-get."""
name = _validate_name(req.name)
rc, stdout, stderr = await _run_cmd(["apt-get", "remove", "-y", name], timeout=120)
if rc != 0:
msg = stderr.strip() or stdout.strip()
log.warning("apt-get remove %s failed (rc=%d): %s", name, rc, msg[:200])
raise HTTPException(500, f"Remove failed: {msg[:300]}")
return {"success": True, "package": name, "output": stdout[-500:]}
@router.post("/update-cache")
async def update_cache(_user: str = Depends(get_current_user)):
"""Run apt-get update to refresh package lists."""
rc, stdout, stderr = await _run_cmd(["apt-get", "update"], timeout=120)
if rc != 0:
msg = stderr.strip() or stdout.strip()
log.warning("apt-get update failed (rc=%d): %s", rc, msg[:200])
raise HTTPException(500, f"Cache update failed: {msg[:300]}")
return {"success": True, "output": stdout[-500:]}
@router.get("/debug")
async def debug_apt(_user: str = Depends(get_current_user)):
"""Debug endpoint — test apt-cache availability."""
results = {
"has_apt": _HAS_APT,
"which_apt_cache": shutil.which("apt-cache"),
"which_apt": shutil.which("apt"),
"which_dpkg": shutil.which("dpkg-query"),
"path": os.environ.get("PATH", ""),
"safe_path": _safe_env().get("PATH", ""),
}
# Try running apt-cache directly
for cmd_path in ["/usr/bin/apt-cache", "apt-cache"]:
try:
proc = await asyncio.create_subprocess_exec(
cmd_path, "search", "bash",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=_safe_env(),
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10)
lines = stdout.decode(errors="replace").strip().splitlines()
results[f"test_{cmd_path}"] = {
"rc": proc.returncode,
"stdout_lines": len(lines),
"first_line": lines[0] if lines else "",
"stderr": stderr.decode(errors="replace").strip()[:200],
}
except Exception as e:
results[f"test_{cmd_path}"] = {"error": str(e)}
return results