atlus/backend/routers/files.py
roberts f9743bb29a Initial commit — Atlus web desktop environment for SBCs
Full-stack implementation: FastAPI backend with PAM auth, WebSocket
stats/terminal, and vanilla JS frontend with tiling desktop shell.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 16:53:46 -05:00

267 lines
7 KiB
Python

"""Filesystem operations — full access, no sandbox."""
import grp
import mimetypes
import os
import pwd
import shutil
import stat
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
from fastapi.responses import FileResponse
from pydantic import BaseModel
from backend.auth import get_current_user
router = APIRouter(prefix="/api/files", tags=["files"])
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class FileInfo(BaseModel):
    """Metadata for a single filesystem entry, as produced by ``_file_info``."""

    # Final path component and the full path as a string.
    name: str
    path: str

    # Whether the entry is a directory.
    is_dir: bool

    # st_size and st_mtime taken from the entry's stat result.
    size: int
    modified: float

    # Mode string rendered by stat.filemode().
    permissions: str

    # Owner and group names; the numeric id as a string when no name exists.
    owner: str
    group: str

    # MIME type guessed from the file name; None when not a regular file or
    # when no type could be guessed.
    mime: Optional[str] = None
class RenameRequest(BaseModel):
    """Request body for the rename endpoint."""

    # Path of the existing entry to rename.
    old_path: str

    # New name, joined onto the parent directory of old_path.
    new_name: str
class MoveRequest(BaseModel):
    """Request body shared by the move and copy endpoints."""

    # Path of the entry to move or copy.
    src: str

    # Destination path handed to the shutil move/copy call.
    dest: str
class MkdirRequest(BaseModel):
    """Request body for the mkdir endpoint."""

    # Directory to create; missing parent directories are created too.
    path: str
class DeleteRequest(BaseModel):
    """Request body for the delete endpoint."""

    # File or directory to remove; directories are removed recursively.
    path: str
class WriteFileRequest(BaseModel):
    """Request body for the write endpoint."""

    # File to create or overwrite.
    path: str

    # Text written to the file, replacing any previous contents.
    content: str
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _file_info(p: Path) -> FileInfo:
    """Build a FileInfo record for *p* from a single stat result.

    Symlinks are followed. When the target cannot be stat'ed (for example a
    dangling link) the link itself is described instead, in which case it is
    reported as neither a directory nor a regular file.

    Args:
        p: Path of the entry to describe.

    Returns:
        The populated FileInfo.

    Raises:
        OSError: if neither ``stat`` nor ``lstat`` succeeds on *p*.
    """
    try:
        st = p.stat()
    except OSError:
        # Target unreachable: fall back to describing the link itself.
        st = p.lstat()

    try:
        owner = pwd.getpwuid(st.st_uid).pw_name
    except KeyError:
        # No passwd entry for this uid; report the raw number instead.
        owner = str(st.st_uid)

    try:
        group = grp.getgrgid(st.st_gid).gr_name
    except KeyError:
        # No group entry for this gid; report the raw number instead.
        group = str(st.st_gid)

    # Classify from the mode already in hand instead of calling
    # p.is_dir()/p.is_file(): those stat the path again, which costs extra
    # syscalls per entry and may raise even though the fallback above
    # succeeded.
    is_dir = stat.S_ISDIR(st.st_mode)
    mime = mimetypes.guess_type(str(p))[0] if stat.S_ISREG(st.st_mode) else None

    return FileInfo(
        name=p.name,
        path=str(p),
        is_dir=is_dir,
        size=st.st_size,
        modified=st.st_mtime,
        permissions=stat.filemode(st.st_mode),
        owner=owner,
        group=group,
        mime=mime,
    )
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/list")
async def list_dir(
    path: str = Query("/", description="Directory to list"),
    _user: str = Depends(get_current_user),
) -> list[FileInfo]:
    """List a directory, directories first, then case-insensitively by name.

    Args:
        path: Directory to list; defaults to ``/``.

    Returns:
        One FileInfo per entry. Entries that disappear while the listing is
        being built are omitted.

    Raises:
        HTTPException: 404 if *path* is not a directory; 403 if the directory
            or one of its entries cannot be accessed.
    """
    p = Path(path)
    if not p.is_dir():
        raise HTTPException(404, f"Not a directory: {path}")

    infos: list[FileInfo] = []
    try:
        for entry in p.iterdir():
            try:
                infos.append(_file_info(entry))
            except FileNotFoundError:
                # Removed between the directory scan and the stat call; skip
                # it rather than failing the whole listing.
                continue
    except PermissionError as exc:
        raise HTTPException(403, f"Permission denied: {path}") from exc

    # Sort on the fields already computed instead of stat'ing every path a
    # second time just to order them.
    infos.sort(key=lambda info: (not info.is_dir, info.name.lower()))
    return infos
@router.get("/info")
async def file_info(
    path: str = Query(...),
    _user: str = Depends(get_current_user),
) -> FileInfo:
    """Return metadata for a single path.

    Args:
        path: Path of the entry to describe.

    Returns:
        The FileInfo built by ``_file_info``.

    Raises:
        HTTPException: 404 if nothing exists at *path*; 403 if it cannot be
            stat'ed due to permissions.
    """
    p = Path(path)
    # lexists, unlike Path.exists, is true for dangling symlinks, which
    # _file_info describes through its lstat fallback.
    if not os.path.lexists(p):
        raise HTTPException(404, f"Not found: {path}")
    try:
        return _file_info(p)
    except FileNotFoundError as exc:
        # Entry vanished between the existence check and the stat call.
        raise HTTPException(404, f"Not found: {path}") from exc
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
@router.get("/read")
async def read_file(
    path: str = Query(...),
    _user: str = Depends(get_current_user),
):
    """Return the text content of a file.

    Undecodable bytes are substituted rather than raising. Responds with 404
    when *path* is not a regular file and 403 when it cannot be read.
    """
    target = Path(path)
    if target.is_file():
        try:
            text = target.read_text(errors="replace")
        except PermissionError:
            raise HTTPException(403, "Permission denied")
        else:
            return {"path": str(target), "content": text}
    raise HTTPException(404, f"Not a file: {path}")
@router.get("/download")
async def download_file(
    path: str = Query(...),
    _user: str = Depends(get_current_user),
):
    """Serve a file as a download, named after its final path component.

    Responds with 404 when *path* is not a regular file.
    """
    source = Path(path)
    if source.is_file():
        return FileResponse(str(source), filename=source.name)
    raise HTTPException(404, f"Not a file: {path}")
@router.post("/upload")
async def upload_file(
    dest_dir: str = Query(...),
    file: UploadFile = File(...),
    _user: str = Depends(get_current_user),
):
    """Store an uploaded file inside *dest_dir*.

    Only the final component of the client-supplied filename is used, so the
    upload always lands directly in *dest_dir*.

    Args:
        dest_dir: Existing directory that receives the file.
        file: The uploaded file.

    Returns:
        A dict with the written ``path`` and its ``size`` in bytes.

    Raises:
        HTTPException: 404 if *dest_dir* is not a directory; 400 if the
            upload carries no usable filename; 403 on permission errors.
    """
    dest = Path(dest_dir)
    if not dest.is_dir():
        raise HTTPException(404, f"Not a directory: {dest_dir}")

    # The filename is client-controlled: joining it unmodified would let an
    # absolute path replace dest entirely, or "../" components climb out of
    # it. It may also be missing altogether.
    filename = Path(file.filename or "").name
    if filename in ("", ".", ".."):
        raise HTTPException(400, "Invalid filename")
    target = dest / filename

    try:
        with open(target, "wb") as f:
            # Stream in 1 MiB chunks so the upload is never fully buffered.
            while chunk := await file.read(1024 * 1024):
                f.write(chunk)
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    return {"path": str(target), "size": target.stat().st_size}
@router.post("/mkdir")
async def mkdir(req: MkdirRequest, _user: str = Depends(get_current_user)):
    """Create a directory and any missing parents.

    An already existing directory is not an error. Responds with 403 when
    creation is not permitted.
    """
    new_dir = Path(req.path)
    try:
        new_dir.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        raise HTTPException(403, "Permission denied")
    else:
        return {"path": str(new_dir)}
@router.post("/rename")
async def rename(req: RenameRequest, _user: str = Depends(get_current_user)):
    """Rename an entry within its current directory.

    Args:
        req: ``old_path`` is the entry to rename; ``new_name`` must be a bare
            name without path separators.

    Returns:
        A dict with the resulting ``path``.

    Raises:
        HTTPException: 404 if ``old_path`` does not exist; 400 if
            ``new_name`` is not a plain file name; 403 on permission errors.
    """
    src = Path(req.old_path)
    # lexists so that dangling symlinks, which rename() handles fine, are
    # not reported as missing.
    if not os.path.lexists(src):
        raise HTTPException(404, f"Not found: {req.old_path}")

    # A name containing separators (or an absolute path) would relocate the
    # entry instead of renaming it; "", "." and ".." resolve to a directory.
    # Relocation is what the move endpoint is for.
    new_name = req.new_name
    if new_name in ("", ".", "..") or Path(new_name).name != new_name:
        raise HTTPException(400, f"Invalid name: {req.new_name}")

    dest = src.parent / new_name
    try:
        # NOTE(review): on POSIX this silently replaces an existing file at
        # dest — confirm whether the UI should be asked to confirm first.
        src.rename(dest)
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    return {"path": str(dest)}
@router.post("/move")
async def move(req: MoveRequest, _user: str = Depends(get_current_user)):
    """Move ``req.src`` to ``req.dest`` using ``shutil.move``.

    Responds with 404 when the source does not exist and 403 when the move
    is not permitted.
    """
    source, target = Path(req.src), Path(req.dest)
    if source.exists():
        try:
            shutil.move(str(source), str(target))
        except PermissionError:
            raise HTTPException(403, "Permission denied")
        return {"path": str(target)}
    raise HTTPException(404, f"Not found: {req.src}")
@router.post("/copy")
async def copy(req: MoveRequest, _user: str = Depends(get_current_user)):
    """Copy a file or directory tree from ``req.src`` to ``req.dest``.

    Directories are copied recursively with ``shutil.copytree``; anything
    else with ``shutil.copy2``.

    Returns:
        A dict with the destination ``path``.

    Raises:
        HTTPException: 404 if the source does not exist; 409 if the
            destination already exists and cannot be copied onto; 400 if
            source and destination are the same file; 403 on permission
            errors.
    """
    src = Path(req.src)
    dest = Path(req.dest)
    if not src.exists():
        raise HTTPException(404, f"Not found: {req.src}")
    try:
        if src.is_dir():
            shutil.copytree(str(src), str(dest))
        else:
            shutil.copy2(str(src), str(dest))
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    except FileExistsError as exc:
        # copytree refuses to copy onto an existing destination.
        raise HTTPException(409, f"Destination already exists: {req.dest}") from exc
    except shutil.SameFileError as exc:
        raise HTTPException(400, "Source and destination are the same file") from exc
    return {"path": str(dest)}
@router.post("/delete")
async def delete(req: DeleteRequest, _user: str = Depends(get_current_user)):
    """Delete a file, symlink, or directory tree.

    Real directories are removed recursively. Symlinks are removed as links,
    leaving whatever they point at untouched.

    Returns:
        A dict with the ``deleted`` path.

    Raises:
        HTTPException: 404 if nothing exists at the path; 403 on permission
            errors.
    """
    p = Path(req.path)
    # lexists so that dangling symlinks can be deleted too.
    if not os.path.lexists(p):
        raise HTTPException(404, f"Not found: {req.path}")
    try:
        # is_dir() follows symlinks, but shutil.rmtree refuses to operate on
        # a symlink, so a link to a directory has to be unlinked instead.
        if p.is_dir() and not p.is_symlink():
            shutil.rmtree(str(p))
        else:
            p.unlink()
    except FileNotFoundError as exc:
        # Removed by someone else between the check and the delete.
        raise HTTPException(404, f"Not found: {req.path}") from exc
    except PermissionError as exc:
        raise HTTPException(403, "Permission denied") from exc
    return {"deleted": str(p)}
@router.post("/write")
async def write_file(req: WriteFileRequest, _user: str = Depends(get_current_user)):
    """Create or replace a text file with the supplied content.

    Missing parent directories are created first. Responds with 403 when
    either step is not permitted; otherwise reports the path and the
    resulting size in bytes.
    """
    target = Path(req.path)
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(req.content)
    except PermissionError:
        raise HTTPException(403, "Permission denied")
    written_size = target.stat().st_size
    return {"path": str(target), "size": written_size}
@router.get("/mounts")
async def list_mounts(_user: str = Depends(get_current_user)):
    """List mounted filesystems together with their disk usage.

    Returns:
        A list of dicts with ``device``, ``mountpoint``, ``fstype``,
        ``total``, ``used`` and ``percent``. Mountpoints whose usage cannot
        be queried are left out.
    """
    # Still resolved at call time, as before, but imported once instead of
    # through repeated inline __import__ calls.
    import psutil

    mounts = []
    for part in psutil.disk_partitions(all=False):
        try:
            usage = psutil.disk_usage(part.mountpoint)
        except OSError:
            # Mountpoint is unreadable or went away; skip it rather than
            # failing the whole response.
            continue
        mounts.append({
            "device": part.device,
            "mountpoint": part.mountpoint,
            "fstype": part.fstype,
            "total": usage.total,
            "used": usage.used,
            "percent": usage.percent,
        })
    return mounts