atlus/backend/routers/files.py
roberts 342dc0f0cf Fix robustness issues across backend and frontend
- Add shutil.which guard to _run() in settings, asi_bridge routers
- Catch RuntimeError on WebSocket disconnect in services, asi_bridge
- Make file listing resilient to individual entry errors
- Fix keyboard double-fire on touch devices (touchstart + click)
- Update install.sh with correct Gitea repo URL
- Add six to requirements.txt (python-pam dependency)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 17:41:43 -05:00

273 lines
7.1 KiB
Python

"""Filesystem operations — full access, no sandbox."""
import grp
import mimetypes
import os
import pwd
import shutil
import stat
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
from fastapi.responses import FileResponse
from pydantic import BaseModel
from backend.auth import get_current_user
router = APIRouter(prefix="/api/files", tags=["files"])
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
# Metadata for a single filesystem entry, as produced by _file_info() and
# returned by the /list and /info endpoints.
class FileInfo(BaseModel):
    name: str  # final path component
    path: str  # the entry's path rendered as a string
    is_dir: bool  # True when the entry is a directory
    size: int  # st_size of the stat result
    modified: float  # st_mtime of the stat result
    permissions: str  # st_mode rendered by stat.filemode()
    owner: str  # user name, or the numeric uid as text when it has no passwd entry
    group: str  # group name, or the numeric gid as text when it has no group entry
    mime: Optional[str] = None  # mimetypes guess for files; None otherwise
# Request body for POST /rename.
class RenameRequest(BaseModel):
    old_path: str  # path of the entry to rename
    new_name: str  # new name; joined onto the parent directory of old_path
# Request body shared by POST /move and POST /copy.
class MoveRequest(BaseModel):
    src: str  # path of the entry to move or copy
    dest: str  # destination path handed to shutil
# Request body for POST /mkdir.
class MkdirRequest(BaseModel):
    path: str  # directory to create (missing parents are created too)
# Request body for POST /delete.
class DeleteRequest(BaseModel):
    path: str  # file or directory to delete (directories are removed recursively)
# Request body for POST /write.
class WriteFileRequest(BaseModel):
    path: str  # file to create or overwrite
    content: str  # text written to the file
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _file_info(p: Path) -> FileInfo:
    """Build a FileInfo record for *p* from one stat result.

    The entry type (directory / regular file) is derived from the same stat
    result that supplies size, mtime and mode, so the record is internally
    consistent and no further filesystem calls are made that could fail.

    Args:
        p: Path of the entry to describe.

    Returns:
        The populated FileInfo. ``mime`` is only guessed for regular files.

    Raises:
        OSError: if neither ``stat()`` nor ``lstat()`` succeeds for *p*.
    """
    try:
        st = p.stat()
    except OSError:
        # stat() follows symlinks; fall back to the link itself so that e.g.
        # a dangling symlink can still be described.
        st = p.lstat()

    # Fall back to the numeric id when it has no passwd/group database entry.
    try:
        owner = pwd.getpwuid(st.st_uid).pw_name
    except KeyError:
        owner = str(st.st_uid)
    try:
        group = grp.getgrgid(st.st_gid).gr_name
    except KeyError:
        group = str(st.st_gid)

    mode = st.st_mode
    mime = mimetypes.guess_type(str(p))[0] if stat.S_ISREG(mode) else None

    return FileInfo(
        name=p.name,
        path=str(p),
        is_dir=stat.S_ISDIR(mode),
        size=st.st_size,
        modified=st.st_mtime,
        permissions=stat.filemode(mode),
        owner=owner,
        group=group,
        mime=mime,
    )
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/list")
async def list_dir(
    path: str = Query("/", description="Directory to list"),
    _user: str = Depends(get_current_user),
) -> list[FileInfo]:
    """List the entries of a directory, directories first.

    Entries are ordered with directories before everything else, then by
    case-insensitive name. An entry whose metadata cannot be read is left out
    of the result instead of failing the whole listing.

    Raises:
        HTTPException: 404 if *path* is not a directory, 403 if the directory
            itself cannot be read.
    """
    p = Path(path)
    if not p.is_dir():
        raise HTTPException(404, f"Not a directory: {path}")
    try:
        entries = list(p.iterdir())
    except PermissionError as exc:
        raise HTTPException(403, f"Permission denied: {path}") from exc

    results = []
    for e in entries:
        try:
            results.append(_file_info(e))
        except OSError:  # PermissionError is a subclass of OSError
            continue

    # Sort on the collected metadata rather than calling is_dir() on each
    # path: that call can itself raise and would abort the whole listing.
    results.sort(key=lambda info: (not info.is_dir, info.name.lower()))
    return results
@router.get("/info")
async def file_info(
    path: str = Query(...),
    _user: str = Depends(get_current_user),
) -> FileInfo:
    """Return metadata for a single filesystem entry.

    The lookup goes straight through ``_file_info`` instead of pre-checking
    ``Path.exists()``: ``exists()`` follows symlinks and reports a dangling
    link as missing, even though ``_file_info`` can describe such a link.

    Raises:
        HTTPException: 404 if the entry does not exist, 403 if it cannot be
            inspected because of permissions.
    """
    p = Path(path)
    try:
        return _file_info(p)
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise HTTPException(404, f"Not found: {path}") from exc
    except PermissionError as exc:
        raise HTTPException(403, f"Permission denied: {path}") from exc
@router.get("/read")
async def read_file(
    path: str = Query(...),
    _user: str = Depends(get_current_user),
):
    """Return the text content of the file at *path*.

    Undecodable bytes are substituted rather than raising. Responds with 404
    when *path* is not a regular file and 403 when reading is not permitted.
    """
    target = Path(path)
    if target.is_file():
        try:
            text = target.read_text(errors="replace")
        except PermissionError:
            raise HTTPException(403, "Permission denied")
        return {"path": str(target), "content": text}
    raise HTTPException(404, f"Not a file: {path}")
@router.get("/download")
async def download_file(
    path: str = Query(...),
    _user: str = Depends(get_current_user),
):
    """Send the file at *path* as a binary download named after the file.

    Responds with 404 when *path* is not a regular file.
    """
    target = Path(path)
    if target.is_file():
        return FileResponse(str(target), filename=target.name)
    raise HTTPException(404, f"Not a file: {path}")
@router.post("/upload")
async def upload_file(
    dest_dir: str = Query(...),
    file: UploadFile = File(...),
    _user: str = Depends(get_current_user),
):
    """Store an uploaded file inside *dest_dir*.

    The upload is streamed to disk in 1 MiB chunks. Only the final component
    of the client-supplied filename is used, so the file always lands directly
    inside *dest_dir*.

    Returns:
        A dict with the written ``path`` and its ``size`` in bytes.

    Raises:
        HTTPException: 404 if *dest_dir* is not a directory, 400 if the upload
            carries no usable filename, 403 if writing is not permitted.
    """
    dest = Path(dest_dir)
    if not dest.is_dir():
        raise HTTPException(404, f"Not a directory: {dest_dir}")

    # The filename comes from the client. Joining it unchecked would let an
    # absolute name replace dest entirely, or a "../" name escape it.
    name = Path(file.filename or "").name
    if name in ("", ".", ".."):
        raise HTTPException(400, "Invalid filename")
    target = dest / name

    try:
        with open(target, "wb") as f:
            while chunk := await file.read(1024 * 1024):
                f.write(chunk)
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    return {"path": str(target), "size": target.stat().st_size}
@router.post("/mkdir")
async def mkdir(req: MkdirRequest, _user: str = Depends(get_current_user)):
    """Create a directory, including any missing parents.

    Succeeds without change when the directory already exists.

    Raises:
        HTTPException: 403 if creation is not permitted, 409 if the path (or
            one of its parents) already exists as something other than a
            directory.
    """
    p = Path(req.path)
    try:
        p.mkdir(parents=True, exist_ok=True)
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    except (FileExistsError, NotADirectoryError) as exc:
        # exist_ok=True only tolerates an existing *directory*; a file at the
        # path, or in place of a parent, still raises.
        raise HTTPException(
            409, f"Cannot create directory, a file is in the way: {req.path}"
        ) from exc
    return {"path": str(p)}
@router.post("/rename")
async def rename(req: RenameRequest, _user: str = Depends(get_current_user)):
    """Rename an entry within its current directory.

    ``new_name`` must be a single path component; relocating an entry to a
    different directory is what the /move endpoint is for.

    Returns:
        A dict with the entry's new ``path``.

    Raises:
        HTTPException: 404 if ``old_path`` does not exist, 400 if ``new_name``
            is not a plain name, 403 if the rename is not permitted.
    """
    src = Path(req.old_path)
    if not src.exists():
        raise HTTPException(404, f"Not found: {req.old_path}")

    # Reject empty names, "..", and anything containing a separator: joined
    # onto the parent these would move the entry out of its directory (an
    # absolute name would replace the parent entirely).
    new_name = req.new_name
    if new_name in ("", "..") or Path(new_name).name != new_name:
        raise HTTPException(400, f"Invalid name: {new_name}")

    dest = src.parent / new_name
    try:
        # NOTE(review): on POSIX, Path.rename() silently replaces an existing
        # file at dest - confirm that overwriting is intended here.
        src.rename(dest)
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    return {"path": str(dest)}
@router.post("/move")
async def move(req: MoveRequest, _user: str = Depends(get_current_user)):
    """Move a file or directory from ``src`` to ``dest`` via shutil.move.

    Returns:
        A dict with the requested destination ``path``.

    Raises:
        HTTPException: 404 if ``src`` does not exist, 403 if the move is not
            permitted, 409 if shutil refuses the move (for example because
            the destination already exists or a directory would be moved
            into itself).
    """
    src = Path(req.src)
    dest = Path(req.dest)
    if not src.exists():
        raise HTTPException(404, f"Not found: {req.src}")
    try:
        shutil.move(str(src), str(dest))
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    except shutil.Error as exc:
        # shutil.move reports conflicts with its own exception type, which
        # would otherwise surface as an unhandled 500.
        raise HTTPException(409, str(exc)) from exc
    return {"path": str(dest)}
@router.post("/copy")
async def copy(req: MoveRequest, _user: str = Depends(get_current_user)):
    """Copy a file or directory tree from ``src`` to ``dest``.

    Directories are copied recursively with shutil.copytree; anything else is
    copied with shutil.copy2.

    Returns:
        A dict with the requested destination ``path``.

    Raises:
        HTTPException: 404 if ``src`` does not exist, 403 if the copy is not
            permitted, 409 if the destination already exists (directory
            copy) or source and destination are the same file.
    """
    src = Path(req.src)
    dest = Path(req.dest)
    if not src.exists():
        raise HTTPException(404, f"Not found: {req.src}")
    try:
        if src.is_dir():
            shutil.copytree(str(src), str(dest))
        else:
            shutil.copy2(str(src), str(dest))
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    except FileExistsError as exc:
        # copytree refuses to copy into an existing destination directory.
        raise HTTPException(
            409, f"Destination already exists: {req.dest}"
        ) from exc
    except shutil.SameFileError as exc:
        raise HTTPException(
            409, "Source and destination are the same file"
        ) from exc
    return {"path": str(dest)}
@router.post("/delete")
async def delete(req: DeleteRequest, _user: str = Depends(get_current_user)):
    """Delete a file, a symlink, or a directory tree.

    Symlinks are always removed as links - their targets are left untouched -
    and dangling symlinks can be deleted too. Real directories are removed
    recursively.

    Returns:
        A dict with the ``deleted`` path.

    Raises:
        HTTPException: 404 if nothing exists at the path, 403 if deletion is
            not permitted.
    """
    p = Path(req.path)
    # lexists() does not follow symlinks, so a dangling link still counts as
    # present (Path.exists() would report it as missing).
    if not os.path.lexists(p):
        raise HTTPException(404, f"Not found: {req.path}")
    try:
        # is_dir() follows symlinks, but rmtree() refuses to operate on a
        # symlink to a directory - such links must be unlinked instead.
        if p.is_dir() and not p.is_symlink():
            shutil.rmtree(str(p))
        else:
            p.unlink()
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    return {"deleted": str(p)}
@router.post("/write")
async def write_file(req: WriteFileRequest, _user: str = Depends(get_current_user)):
    """Create or replace a text file with the supplied content.

    Missing parent directories are created first. Responds with 403 when the
    directories or the file cannot be written, otherwise returns the file's
    path and resulting size in bytes.
    """
    target = Path(req.path)
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(req.content)
    except PermissionError:
        raise HTTPException(403, "Permission denied")
    written = target.stat().st_size
    return {"path": str(target), "size": written}
@router.get("/mounts")
async def list_mounts(_user: str = Depends(get_current_user)):
    """List mounted filesystems with their usage figures.

    Partitions come from ``psutil.disk_partitions(all=False)``. A partition
    whose usage cannot be read is skipped so that one bad mountpoint does not
    fail the whole request.

    Returns:
        A list of dicts with ``device``, ``mountpoint``, ``fstype``,
        ``total``, ``used`` and ``percent``.
    """
    # Imported at call time, as the original code did, rather than at module
    # import time.
    import psutil

    mounts = []
    for part in psutil.disk_partitions(all=False):
        try:
            usage = psutil.disk_usage(part.mountpoint)
        except OSError:
            # e.g. a mountpoint this process is not allowed to inspect
            continue
        mounts.append({
            "device": part.device,
            "mountpoint": part.mountpoint,
            "fstype": part.fstype,
            "total": usage.total,
            "used": usage.used,
            "percent": usage.percent,
        })
    return mounts