"""Network configuration via NetworkManager (nmcli)."""

import asyncio
import ipaddress
import logging
import os
import re
import shutil
from typing import Literal, Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

from backend.auth import get_current_user

router = APIRouter(prefix="/api/network", tags=["network"])
log = logging.getLogger("atlus.network")

# Resolved once at import time; every nmcli call is gated on this flag.
_HAS_NMCLI = bool(shutil.which("nmcli"))

# Matches either a backslash escape (group 1 = escaped char) or a bare field
# separator (group 2) in nmcli terse output.
_TERSE_TOKEN_RE = re.compile(r"\\(.)|(:)")

_WIFI_LIST_FIELDS = "SSID,BSSID,SIGNAL,SECURITY,IN-USE"


# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------

class IPv4Config(BaseModel):
    """Request body for configuring IPv4 on a connection."""

    method: Literal["auto", "manual"]
    address: Optional[str] = None  # CIDR notation e.g. "192.168.1.100/24"
    gateway: Optional[str] = None
    dns: Optional[list[str]] = None


class WifiConnectRequest(BaseModel):
    """Request body for joining a WiFi network."""

    ssid: str
    password: Optional[str] = None
    hidden: bool = False


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _require_nmcli() -> None:
    """Raise 503 if NetworkManager is not available."""
    if not _HAS_NMCLI:
        raise HTTPException(503, "NetworkManager (nmcli) not available on this system")


async def _nmcli(*args: str, timeout: float = 30) -> str:
    """Run nmcli with the C locale and return its stdout.

    Args:
        *args: Arguments appended after the ``nmcli`` executable name.
        timeout: Seconds to wait for the process to finish.

    Returns:
        The decoded standard output of the command.

    Raises:
        HTTPException: 503 if nmcli is not installed, 504 if the command
            does not finish within ``timeout``, 500 if it exits non-zero.
    """
    _require_nmcli()
    proc = await asyncio.create_subprocess_exec(
        "nmcli",
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env={**os.environ, "LC_ALL": "C"},
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout firing and the kill.
            pass
        await proc.wait()
        raise HTTPException(504, "nmcli operation timed out") from None

    if proc.returncode != 0:
        # errors="replace": a stray non-UTF-8 byte in nmcli output must not
        # turn a reportable nmcli error into an unhandled UnicodeDecodeError.
        err_msg = stderr.decode(errors="replace").strip()
        log.warning("nmcli failed (%d): %s", proc.returncode, err_msg)
        raise HTTPException(500, f"nmcli error: {err_msg}")
    return stdout.decode(errors="replace")


def _split_terse_line(line: str) -> list[str]:
    """Split one nmcli terse line on unescaped colons and unescape values.

    A backslash escapes the character that follows it, so ``\\:`` yields a
    literal colon and ``\\\\`` a literal backslash. A trailing lone backslash
    is kept as-is.
    """
    fields: list[str] = []
    current: list[str] = []
    pos = 0
    for match in _TERSE_TOKEN_RE.finditer(line):
        current.append(line[pos:match.start()])
        if match.group(2):
            # Bare colon: field boundary.
            fields.append("".join(current))
            current = []
        else:
            # Escaped character: keep it without the backslash.
            current.append(match.group(1))
        pos = match.end()
    current.append(line[pos:])
    fields.append("".join(current))
    return fields


def _parse_terse(output: str, fields: list[str]) -> list[dict]:
    """Parse nmcli terse (-t) colon-separated output into a list of dicts.

    Handles escaped colons (``\\:``) and escaped backslashes in values.

    Args:
        output: Raw stdout of an ``nmcli -t`` command.
        fields: Key names to assign to the columns, in column order.

    Returns:
        One dict per non-empty line that has at least ``len(fields)``
        columns; extra trailing columns are dropped, shorter lines skipped.
    """
    results = []
    for line in output.strip().splitlines():
        if not line:
            continue
        parts = _split_terse_line(line)
        if len(parts) >= len(fields):
            results.append(dict(zip(fields, parts[:len(fields)])))
    return results


def _parse_keyvalue(output: str) -> dict[str, str]:
    """Parse nmcli key:value output (from `connection show <name>`).

    Each line is split at its first colon; key and value are stripped of
    surrounding whitespace. Lines without a colon are ignored, and a repeated
    key keeps its last value.
    """
    props: dict[str, str] = {}
    for line in output.strip().splitlines():
        key, sep, value = line.partition(":")
        if not sep:
            continue
        props[key.strip()] = value.strip()
    return props


def _active_ipv4(props: dict[str, str]) -> dict:
    """Extract the in-use IPv4 address, gateway and DNS list from parsed props."""
    return {
        # nmcli may output IP4.ADDRESS[1], IP4.DNS[1], etc.
        "address": props.get("IP4.ADDRESS[1]", ""),
        "gateway": props.get("IP4.GATEWAY", ""),
        "dns": [v for k, v in props.items() if k.startswith("IP4.DNS") and v],
    }


async def _find_wifi_device() -> dict:
    """Return the first device of type ``wifi`` from ``nmcli device status``.

    Raises:
        HTTPException: 404 if no WiFi device is listed (plus anything
            ``_nmcli`` raises).
    """
    dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE", "device", "status")
    devices = _parse_terse(dev_out, ["device", "type"])
    wifi_dev = next((d for d in devices if d["type"] == "wifi"), None)
    if wifi_dev is None:
        raise HTTPException(404, "No WiFi device found")
    return wifi_dev


def _validate_ipv4_config(cfg: IPv4Config) -> None:
    """Validate IPv4 configuration fields.

    Only ``manual`` configurations are checked; for ``auto`` the remaining
    fields are not validated here.

    Raises:
        HTTPException: 422 if the address or gateway is missing, the address
            is not IPv4 CIDR, or the gateway / a DNS entry is not an IPv4
            address.
    """
    if cfg.method != "manual":
        return

    if not cfg.address:
        raise HTTPException(422, "IP address is required for static configuration")
    if not cfg.gateway:
        raise HTTPException(422, "Gateway is required for static configuration")

    # Validate address (must be CIDR). IPv4Interface alone would accept a bare
    # address and silently treat it as /32, so the prefix is required.
    try:
        if "/" not in cfg.address:
            raise ValueError("missing prefix length")
        ipaddress.IPv4Interface(cfg.address)
    except ValueError:
        raise HTTPException(
            422,
            f"Invalid IP address/prefix: {cfg.address}. "
            "Use CIDR notation (e.g. 192.168.1.100/24)",
        ) from None

    # Validate gateway
    try:
        ipaddress.IPv4Address(cfg.gateway)
    except ValueError:
        raise HTTPException(422, f"Invalid gateway address: {cfg.gateway}") from None

    # Validate DNS
    for dns in cfg.dns or []:
        try:
            ipaddress.IPv4Address(dns)
        except ValueError:
            raise HTTPException(422, f"Invalid DNS address: {dns}") from None


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@router.get("/status")
async def network_status(_user: str = Depends(get_current_user)):
    """Get device list and active connections."""
    # Device status
    dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE,CONNECTION", "device", "status")
    devices = _parse_terse(dev_out, ["device", "type", "state", "connection"])

    # Filter out loopback and uninteresting devices
    devices = [d for d in devices if d["type"] in ("ethernet", "wifi", "bridge")]

    # Get IP addresses for connected devices
    for dev in devices:
        dev["ipv4"] = None
        if dev["state"] != "connected" or not dev["connection"]:
            continue
        try:
            conn_out = await _nmcli(
                "-t", "-f", "IP4.ADDRESS,IP4.GATEWAY,IP4.DNS",
                "connection", "show", dev["connection"],
            )
        except HTTPException as exc:
            # Best effort: the device is still listed, just without IP details.
            log.debug("IPv4 details unavailable for %s: %s", dev["device"], exc.detail)
            continue
        dev["ipv4"] = _active_ipv4(_parse_keyvalue(conn_out))

    return {"devices": devices}


@router.get("/connections")
async def list_connections(_user: str = Depends(get_current_user)):
    """List all saved NetworkManager connections."""
    out = await _nmcli("-t", "-f", "NAME,UUID,TYPE,DEVICE", "connection", "show")
    return _parse_terse(out, ["name", "uuid", "type", "device"])


@router.get("/connection/{name}")
async def connection_detail(name: str, _user: str = Depends(get_current_user)):
    """Get detailed info for a single connection.

    Raises:
        HTTPException: 404 if nmcli exits with an error for this name;
            503/504 from ``_nmcli`` are passed through unchanged.
    """
    try:
        out = await _nmcli("connection", "show", name)
    except HTTPException as exc:
        # Only a non-zero nmcli exit (500) is read as "no such connection";
        # a missing nmcli (503) or a timeout (504) is not a 404.
        if exc.status_code != 500:
            raise
        raise HTTPException(404, f"Connection not found: {name}") from exc

    props = _parse_keyvalue(out)

    return {
        "name": props.get("connection.id", name),
        "uuid": props.get("connection.uuid", ""),
        "type": props.get("connection.type", ""),
        "device": props.get("GENERAL.DEVICES", props.get("connection.interface-name", "")),
        "state": props.get("GENERAL.STATE", ""),
        # What is configured on the connection profile.
        "ipv4": {
            "method": props.get("ipv4.method", "auto"),
            "addresses": props.get("ipv4.addresses", ""),
            "gateway": props.get("ipv4.gateway", ""),
            "dns": props.get("ipv4.dns", ""),
        },
        # What is currently in use.
        "active_ipv4": _active_ipv4(props),
    }


@router.put("/connection/{name}/ipv4")
async def configure_ipv4(
    name: str,
    cfg: IPv4Config,
    _user: str = Depends(get_current_user),
):
    """Set IPv4 configuration (DHCP or static) and apply.

    The profile is modified first and then brought up. A failure of the
    "up" step is logged but still reported as success, because the
    configuration has already been saved at that point.
    """
    _validate_ipv4_config(cfg)

    if cfg.method == "auto":
        # Switch to DHCP — clear static settings
        await _nmcli(
            "connection", "modify", name,
            "ipv4.method", "auto",
            "ipv4.addresses", "",
            "ipv4.gateway", "",
            "ipv4.dns", "",
        )
    else:
        # Static configuration; an empty DNS string clears any previous servers.
        dns_str = " ".join(cfg.dns) if cfg.dns else ""
        await _nmcli(
            "connection", "modify", name,
            "ipv4.method", "manual",
            "ipv4.addresses", cfg.address,
            "ipv4.gateway", cfg.gateway,
            "ipv4.dns", dns_str,
        )

    # Apply the changes
    try:
        await _nmcli("connection", "up", name, timeout=15)
    except HTTPException as e:
        # Connection up may fail if we just changed the IP of our own interface
        # Return success anyway — the config was saved
        log.warning("connection up after config change: %s", e.detail)

    return {"success": True, "method": cfg.method}


@router.get("/wifi/list")
async def wifi_scan(_user: str = Depends(get_current_user)):
    """Scan for available WiFi networks.

    Returns:
        One entry per non-empty SSID (the strongest-signal entry is kept),
        sorted by signal, strongest first. ``in_use`` is true if any entry
        seen for that SSID was marked in use.
    """
    await _find_wifi_device()

    # Scan
    try:
        out = await _nmcli(
            "-t", "-f", _WIFI_LIST_FIELDS,
            "device", "wifi", "list", "--rescan", "yes",
            timeout=20,
        )
    except HTTPException:
        # Rescan may fail; try without rescan
        out = await _nmcli("-t", "-f", _WIFI_LIST_FIELDS, "device", "wifi", "list")

    networks = _parse_terse(out, ["ssid", "bssid", "signal", "security", "in_use"])

    # Filter empty SSIDs, deduplicate by SSID (keep strongest signal)
    strongest: dict[str, dict] = {}
    for net in networks:
        ssid = net["ssid"]
        if not ssid:
            continue
        net["signal"] = int(net["signal"]) if net["signal"].isdigit() else 0
        net["in_use"] = net["in_use"] == "*"

        current = strongest.get(ssid)
        if current is None:
            strongest[ssid] = net
        elif net["signal"] > current["signal"]:
            # Don't lose the in-use marker when a stronger entry replaces
            # the one we are actually connected to.
            net["in_use"] = net["in_use"] or current["in_use"]
            strongest[ssid] = net
        elif net["in_use"]:
            current["in_use"] = True

    return sorted(strongest.values(), key=lambda n: n["signal"], reverse=True)


@router.post("/wifi/connect")
async def wifi_connect(req: WifiConnectRequest, _user: str = Depends(get_current_user)):
    """Connect to a WiFi network.

    Raises:
        HTTPException: 422 if the SSID is empty or longer than 64 characters
            (plus anything ``_nmcli`` raises).
    """
    _require_nmcli()

    if not req.ssid or len(req.ssid) > 64:
        raise HTTPException(422, "Invalid SSID")

    args = ["device", "wifi", "connect", req.ssid]
    if req.password:
        # NOTE(review): the password is passed as a command-line argument and
        # may be visible to other local users in the process list while nmcli
        # runs — consider a safer channel.
        args.extend(["password", req.password])
    if req.hidden:
        args.extend(["hidden", "yes"])

    await _nmcli(*args, timeout=30)
    return {"success": True, "ssid": req.ssid}


@router.post("/wifi/disconnect")
async def wifi_disconnect(_user: str = Depends(get_current_user)):
    """Disconnect the WiFi device."""
    wifi_dev = await _find_wifi_device()
    await _nmcli("device", "disconnect", wifi_dev["device"])
    return {"success": True}


@router.delete("/connection/{name}")
async def delete_connection(name: str, _user: str = Depends(get_current_user)):
    """Delete a saved connection."""
    await _nmcli("connection", "delete", name)
    return {"success": True}


@router.post("/connection/{name}/apply")
async def apply_connection(name: str, _user: str = Depends(get_current_user)):
    """Reapply/reconnect a connection."""
    await _nmcli("connection", "up", name, timeout=15)
    return {"success": True}