"""Network configuration via NetworkManager (nmcli) or direct ip/interfaces.""" import asyncio import ipaddress import logging import os import re import shutil from pathlib import Path from typing import Literal, Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from backend.auth import get_current_user router = APIRouter(prefix="/api/network", tags=["network"]) log = logging.getLogger("atlus.network") def _find_bin(name: str) -> Optional[str]: """Find a binary, even with systemd's minimal PATH.""" found = shutil.which(name) if found: return found for p in ("/usr/bin", "/usr/local/bin", "/usr/sbin", "/sbin", "/bin"): full = os.path.join(p, name) if os.path.isfile(full) and os.access(full, os.X_OK): return full return None _HAS_NMCLI = bool(_find_bin("nmcli")) _IP_BIN = _find_bin("ip") or "ip" # Debian/Armbian network config paths _INTERFACES_FILE = Path("/etc/network/interfaces") _INTERFACES_DIR = Path("/etc/network/interfaces.d") _RESOLV_CONF = Path("/etc/resolv.conf") # --------------------------------------------------------------------------- # Models # --------------------------------------------------------------------------- class IPv4Config(BaseModel): method: Literal["auto", "manual"] address: Optional[str] = None # CIDR notation e.g. "192.168.1.100/24" gateway: Optional[str] = None dns: Optional[list[str]] = None class WifiConnectRequest(BaseModel): ssid: str password: Optional[str] = None hidden: bool = False # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _require_nmcli(): """Raise 503 if NetworkManager is not available.""" if not _HAS_NMCLI: raise HTTPException(503, "NetworkManager (nmcli) not available on this system") def _safe_env(): """Build environment with full PATH for nmcli commands.""" env = {**os.environ, "LC_ALL": "C"} path = env.get("PATH", "") for p in ("/usr/bin", "/usr/sbin", "/bin", "/sbin"): if p not in path: path = p + ":" + path env["PATH"] = path return env async def _nmcli(*args: str, timeout: float = 30) -> str: """Run nmcli with C locale, return stdout. Raise HTTPException on failure.""" _require_nmcli() cmd = ["nmcli"] + list(args) proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_safe_env(), ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(504, "nmcli operation timed out") if proc.returncode != 0: err_msg = stderr.decode().strip() log.warning("nmcli failed (%d): %s", proc.returncode, err_msg) raise HTTPException(500, f"nmcli error: {err_msg}") return stdout.decode() def _parse_terse(output: str, fields: list[str]) -> list[dict]: """Parse nmcli terse (-t) colon-separated output into list of dicts. Handles escaped colons (\\:) in values. """ results = [] for line in output.strip().splitlines(): if not line: continue # Split on unescaped colons parts = re.split(r"(?= len(fields): results.append(dict(zip(fields, parts[:len(fields)]))) return results def _parse_keyvalue(output: str) -> dict[str, str]: """Parse nmcli key:value output (from `connection show `).""" props = {} for line in output.strip().splitlines(): if ":" not in line: continue key, _, value = line.partition(":") props[key.strip()] = value.strip() return props def _validate_ipv4_config(cfg: IPv4Config): """Validate IPv4 configuration fields.""" if cfg.method == "manual": if not cfg.address: raise HTTPException(422, "IP address is required for static configuration") if not cfg.gateway: raise HTTPException(422, "Gateway is required for static configuration") # Validate address (must be CIDR) try: ipaddress.ip_interface(cfg.address) except ValueError: raise HTTPException(422, f"Invalid IP address/prefix: {cfg.address}. Use CIDR notation (e.g. 192.168.1.100/24)") # Validate gateway try: ipaddress.ip_address(cfg.gateway) except ValueError: raise HTTPException(422, f"Invalid gateway address: {cfg.gateway}") # Validate DNS if cfg.dns: for dns in cfg.dns: try: ipaddress.ip_address(dns) except ValueError: raise HTTPException(422, f"Invalid DNS address: {dns}") # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @router.get("/status") async def network_status(_user: str = Depends(get_current_user)): """Get device list and active connections.""" _require_nmcli() # Device status dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE,CONNECTION", "device", "status") devices = _parse_terse(dev_out, ["device", "type", "state", "connection"]) # Filter out loopback and uninteresting devices devices = [d for d in devices if d["type"] in ("ethernet", "wifi", "bridge")] # Get IP addresses for connected devices for dev in devices: dev["ipv4"] = None if dev["state"] == "connected" and dev["connection"]: try: conn_out = await _nmcli("-t", "-f", "IP4.ADDRESS,IP4.GATEWAY,IP4.DNS", "connection", "show", dev["connection"]) props = _parse_keyvalue(conn_out) # nmcli may output IP4.ADDRESS[1], IP4.DNS[1], etc. dev["ipv4"] = { "address": props.get("IP4.ADDRESS[1]", ""), "gateway": props.get("IP4.GATEWAY", ""), "dns": [], } # Collect all DNS entries for k, v in props.items(): if k.startswith("IP4.DNS") and v: dev["ipv4"]["dns"].append(v) except HTTPException: pass # Connection details unavailable return {"devices": devices} @router.get("/connections") async def list_connections(_user: str = Depends(get_current_user)): """List all saved NetworkManager connections.""" out = await _nmcli("-t", "-f", "NAME,UUID,TYPE,DEVICE", "connection", "show") connections = _parse_terse(out, ["name", "uuid", "type", "device"]) return connections @router.get("/connection/{name}") async def connection_detail(name: str, _user: str = Depends(get_current_user)): """Get detailed info for a single connection.""" try: out = await _nmcli("connection", "show", name) except HTTPException: raise HTTPException(404, f"Connection not found: {name}") props = _parse_keyvalue(out) # Extract IPv4 config (what's configured) ipv4_method = props.get("ipv4.method", "auto") ipv4_addresses = props.get("ipv4.addresses", "") ipv4_gateway = props.get("ipv4.gateway", "") ipv4_dns = props.get("ipv4.dns", "") # Extract active IPv4 (what's currently in use) active_address = props.get("IP4.ADDRESS[1]", "") active_gateway = props.get("IP4.GATEWAY", "") active_dns = [] for k, v in props.items(): if k.startswith("IP4.DNS") and v: active_dns.append(v) return { "name": props.get("connection.id", name), "uuid": props.get("connection.uuid", ""), "type": props.get("connection.type", ""), "device": props.get("GENERAL.DEVICES", props.get("connection.interface-name", "")), "state": props.get("GENERAL.STATE", ""), "ipv4": { "method": ipv4_method, "addresses": ipv4_addresses, "gateway": ipv4_gateway, "dns": ipv4_dns, }, "active_ipv4": { "address": active_address, "gateway": active_gateway, "dns": active_dns, }, } @router.put("/connection/{name}/ipv4") async def configure_ipv4( name: str, cfg: IPv4Config, _user: str = Depends(get_current_user), ): """Set IPv4 configuration (DHCP or static) and apply.""" _validate_ipv4_config(cfg) if cfg.method == "auto": # Switch to DHCP — clear static settings await _nmcli("connection", "modify", name, "ipv4.method", "auto", "ipv4.addresses", "", "ipv4.gateway", "", "ipv4.dns", "") else: # Static configuration dns_str = " ".join(cfg.dns) if cfg.dns else "" modify_args = [ "connection", "modify", name, "ipv4.method", "manual", "ipv4.addresses", cfg.address, "ipv4.gateway", cfg.gateway, ] if dns_str: modify_args.extend(["ipv4.dns", dns_str]) else: modify_args.extend(["ipv4.dns", ""]) await _nmcli(*modify_args) # Apply the changes try: await _nmcli("connection", "up", name, timeout=15) except HTTPException as e: # Connection up may fail if we just changed the IP of our own interface # Return success anyway — the config was saved log.warning("connection up after config change: %s", e.detail) return {"success": True, "method": cfg.method} @router.get("/wifi/list") async def wifi_scan(_user: str = Depends(get_current_user)): """Scan for available WiFi networks.""" _require_nmcli() # Find the wifi device dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE", "device", "status") devices = _parse_terse(dev_out, ["device", "type", "state"]) wifi_dev = next((d for d in devices if d["type"] == "wifi"), None) if not wifi_dev: raise HTTPException(404, "No WiFi device found") # Scan try: out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE", "device", "wifi", "list", "--rescan", "yes", timeout=20) except HTTPException: # Rescan may fail; try without rescan out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE", "device", "wifi", "list") networks = _parse_terse(out, ["ssid", "bssid", "signal", "security", "in_use"]) # Filter empty SSIDs, deduplicate by SSID (keep strongest signal) seen = {} for net in networks: if not net["ssid"]: continue net["signal"] = int(net["signal"]) if net["signal"].isdigit() else 0 net["in_use"] = net["in_use"] == "*" existing = seen.get(net["ssid"]) if not existing or net["signal"] > existing["signal"]: seen[net["ssid"]] = net result = sorted(seen.values(), key=lambda x: x["signal"], reverse=True) return result @router.post("/wifi/connect") async def wifi_connect(req: WifiConnectRequest, _user: str = Depends(get_current_user)): """Connect to a WiFi network.""" _require_nmcli() if not req.ssid or len(req.ssid) > 64: raise HTTPException(422, "Invalid SSID") args = ["device", "wifi", "connect", req.ssid] if req.password: args.extend(["password", req.password]) if req.hidden: args.extend(["hidden", "yes"]) await _nmcli(*args, timeout=30) return {"success": True, "ssid": req.ssid} @router.post("/wifi/disconnect") async def wifi_disconnect(_user: str = Depends(get_current_user)): """Disconnect the WiFi device.""" _require_nmcli() # Find the wifi device name dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE", "device", "status") devices = _parse_terse(dev_out, ["device", "type"]) wifi_dev = next((d for d in devices if d["type"] == "wifi"), None) if not wifi_dev: raise HTTPException(404, "No WiFi device found") await _nmcli("device", "disconnect", wifi_dev["device"]) return {"success": True} @router.delete("/connection/{name}") async def delete_connection(name: str, _user: str = Depends(get_current_user)): """Delete a saved connection.""" await _nmcli("connection", "delete", name) return {"success": True} @router.post("/connection/{name}/apply") async def apply_connection(name: str, _user: str = Depends(get_current_user)): """Reapply/reconnect a connection.""" await _nmcli("connection", "up", name, timeout=15) return {"success": True} # =========================================================================== # Fallback: direct `ip` + /etc/network/interfaces (no NetworkManager) # =========================================================================== _IFACE_NAME_RE = re.compile(r"^[a-zA-Z0-9_.-]+$") def _validate_iface_name(name: str): """Prevent injection via interface name.""" if not _IFACE_NAME_RE.match(name) or len(name) > 32: raise HTTPException(400, "Invalid interface name") async def _run_cmd(*args: str, timeout: float = 15) -> tuple[int, str, str]: """Run a command with safe env, return (rc, stdout, stderr).""" proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_safe_env(), ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() return 1, "", "Command timed out" return proc.returncode, stdout.decode(), stderr.decode() async def _parse_ip_addr() -> list[dict]: """Parse `ip -j addr show` into a list of interface dicts.""" rc, stdout, _ = await _run_cmd(_IP_BIN, "-j", "addr", "show") if rc != 0 or not stdout.strip(): # Fallback: parse non-json output return await _parse_ip_addr_text() import json try: ifaces = json.loads(stdout) except json.JSONDecodeError: return await _parse_ip_addr_text() results = [] for iface in ifaces: name = iface.get("ifname", "") if name == "lo": continue state = iface.get("operstate", "UNKNOWN").upper() flags = iface.get("flags", []) mac = iface.get("address", "") ipv4 = "" ipv6 = "" gateway = "" for ai in iface.get("addr_info", []): if ai.get("family") == "inet" and not ipv4: ipv4 = f"{ai['local']}/{ai.get('prefixlen', 24)}" elif ai.get("family") == "inet6" and not ipv6: if not ai.get("local", "").startswith("fe80"): ipv6 = ai.get("local", "") results.append({ "name": name, "state": state, "up": state == "UP" or "UP" in flags, "mac": mac, "ipv4": ipv4, "ipv6": ipv6, }) # Get default gateway rc2, gw_out, _ = await _run_cmd(_IP_BIN, "-j", "route", "show", "default") if rc2 == 0 and gw_out.strip(): try: routes = json.loads(gw_out) for route in routes: gw = route.get("gateway", "") dev = route.get("dev", "") for iface in results: if iface["name"] == dev: iface["gateway"] = gw break except json.JSONDecodeError: pass return results async def _parse_ip_addr_text() -> list[dict]: """Fallback parser for `ip addr show` plain text output.""" rc, stdout, _ = await _run_cmd(_IP_BIN, "addr", "show") if rc != 0: return [] results = [] current = None for line in stdout.splitlines(): # New interface line: "2: eth0: ..." m = re.match(r"^\d+:\s+(\S+?):\s+<([^>]*)>", line) if m: if current: results.append(current) name = m.group(1).rstrip(":") flags = m.group(2) if name == "lo": current = None continue current = { "name": name, "state": "UP" if "UP" in flags else "DOWN", "up": "UP" in flags, "mac": "", "ipv4": "", "ipv6": "", "gateway": "", } continue if not current: continue # MAC address: " link/ether aa:bb:cc:dd:ee:ff ..." m = re.match(r"\s+link/ether\s+(\S+)", line) if m: current["mac"] = m.group(1) # IPv4: " inet 192.168.1.100/24 ..." m = re.match(r"\s+inet\s+(\S+)", line) if m and not current["ipv4"]: current["ipv4"] = m.group(1) # IPv6: " inet6 2001:db8::1/64 ..." m = re.match(r"\s+inet6\s+(\S+)", line) if m and not current["ipv6"]: addr = m.group(1).split("/")[0] if not addr.startswith("fe80"): current["ipv6"] = addr if current: results.append(current) # Get default gateway rc2, gw_out, _ = await _run_cmd(_IP_BIN, "route", "show", "default") if rc2 == 0: for line in gw_out.splitlines(): m = re.match(r"default via (\S+) dev (\S+)", line) if m: for iface in results: if iface["name"] == m.group(2): iface["gateway"] = m.group(1) break return results def _parse_interfaces_file() -> dict[str, dict]: """Parse /etc/network/interfaces into per-iface config dicts.""" configs = {} # Also include files from interfaces.d content = "" if _INTERFACES_FILE.exists(): content = _INTERFACES_FILE.read_text() # Check for source lines for line in content.splitlines(): m = re.match(r"^source\s+(.+)", line) if m: pattern = m.group(1).strip() from glob import glob as gglob for path in gglob(pattern): try: content += "\n" + Path(path).read_text() except OSError: pass # Also read interfaces.d/*.cfg and interfaces.d/* files if _INTERFACES_DIR.is_dir(): for f in sorted(_INTERFACES_DIR.iterdir()): if f.is_file(): try: content += "\n" + f.read_text() except OSError: pass current_iface = None for line in content.splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#"): continue # "auto eth0" or "allow-hotplug eth0" m = re.match(r"^(auto|allow-hotplug)\s+(\S+)", stripped) if m: iface_name = m.group(2) if iface_name not in configs: configs[iface_name] = {"auto": False, "method": "dhcp"} if m.group(1) == "auto": configs[iface_name]["auto"] = True continue # "iface eth0 inet static" m = re.match(r"^iface\s+(\S+)\s+inet\s+(\S+)", stripped) if m: iface_name = m.group(1) method = m.group(2) # static, dhcp, manual, loopback if iface_name not in configs: configs[iface_name] = {"auto": False} configs[iface_name]["method"] = method current_iface = iface_name continue # Indented config lines under an iface block if current_iface and line[0] in (" ", "\t"): m = re.match(r"\s+(address|netmask|gateway|dns-nameservers|network|broadcast)\s+(.+)", stripped) if m: key = m.group(1) val = m.group(2).strip() configs[current_iface][key] = val else: current_iface = None return configs def _write_interface_config(iface: str, method: str, address: str = "", gateway: str = "", dns: list[str] = None): """Write or update an interface block in /etc/network/interfaces.d/{iface}. Uses interfaces.d to avoid clobbering the main file. """ _INTERFACES_DIR.mkdir(parents=True, exist_ok=True) if method == "dhcp": content = f"""# Managed by Atlus auto {iface} iface {iface} inet dhcp """ else: # Parse CIDR to address + netmask try: net_if = ipaddress.ip_interface(address) addr = str(net_if.ip) mask = str(net_if.netmask) except ValueError: raise HTTPException(422, f"Invalid address: {address}") lines = [ f"# Managed by Atlus", f"auto {iface}", f"iface {iface} inet static", f" address {addr}", f" netmask {mask}", ] if gateway: lines.append(f" gateway {gateway}") if dns: lines.append(f" dns-nameservers {' '.join(dns)}") content = "\n".join(lines) + "\n" config_file = _INTERFACES_DIR / iface config_file.write_text(content) log.info("Wrote interface config for %s: method=%s", iface, method) # Also update resolv.conf for static DNS if method == "static" and dns: _write_resolv_conf(dns) def _write_resolv_conf(dns: list[str]): """Write DNS servers to /etc/resolv.conf.""" try: lines = ["# Managed by Atlus\n"] for d in dns: lines.append(f"nameserver {d}\n") _RESOLV_CONF.write_text("".join(lines)) except OSError as e: log.warning("Failed to write resolv.conf: %s", e) # --- Fallback endpoints --- @router.get("/interfaces") async def list_interfaces(_user: str = Depends(get_current_user)): """List network interfaces using ip commands (no NetworkManager required).""" ifaces = await _parse_ip_addr() # Merge with /etc/network/interfaces config configs = _parse_interfaces_file() for iface in ifaces: cfg = configs.get(iface["name"], {}) iface["config_method"] = cfg.get("method", "unknown") iface["config_address"] = cfg.get("address", "") iface["config_netmask"] = cfg.get("netmask", "") iface["config_gateway"] = cfg.get("gateway", "") iface["config_dns"] = cfg.get("dns-nameservers", "") # Get DNS from resolv.conf dns_servers = [] if _RESOLV_CONF.exists(): try: for line in _RESOLV_CONF.read_text().splitlines(): m = re.match(r"^nameserver\s+(\S+)", line.strip()) if m: dns_servers.append(m.group(1)) except OSError: pass return { "interfaces": ifaces, "dns": dns_servers, "has_nmcli": _HAS_NMCLI, } class InterfaceConfig(BaseModel): method: Literal["dhcp", "static"] address: Optional[str] = None # CIDR e.g. "192.168.1.100/24" gateway: Optional[str] = None dns: Optional[list[str]] = None @router.put("/interfaces/{name}/config") async def configure_interface( name: str, cfg: InterfaceConfig, _user: str = Depends(get_current_user), ): """Configure an interface in /etc/network/interfaces.d/ and apply.""" _validate_iface_name(name) if cfg.method == "static": if not cfg.address: raise HTTPException(422, "IP address is required for static configuration") if not cfg.gateway: raise HTTPException(422, "Gateway is required for static configuration") _validate_ipv4_config(IPv4Config(method="manual", address=cfg.address, gateway=cfg.gateway, dns=cfg.dns)) # Write config file _write_interface_config( iface=name, method=cfg.method, address=cfg.address or "", gateway=cfg.gateway or "", dns=cfg.dns, ) # Apply: bring interface down then up # Use ifdown/ifup if available, else ip commands ifdown = _find_bin("ifdown") ifup = _find_bin("ifup") if ifdown and ifup: await _run_cmd(ifdown, name, timeout=10) rc, _, stderr = await _run_cmd(ifup, name, timeout=15) if rc != 0: log.warning("ifup %s failed: %s", name, stderr.strip()) # Try ip commands as fallback await _apply_with_ip(name, cfg) else: await _apply_with_ip(name, cfg) return {"success": True, "method": cfg.method} async def _apply_with_ip(name: str, cfg: InterfaceConfig): """Apply network config using ip commands directly.""" # Flush existing addresses await _run_cmd(_IP_BIN, "addr", "flush", "dev", name) if cfg.method == "static" and cfg.address: # Add address await _run_cmd(_IP_BIN, "addr", "add", cfg.address, "dev", name) # Bring up await _run_cmd(_IP_BIN, "link", "set", name, "up") # Set default route if cfg.gateway: await _run_cmd(_IP_BIN, "route", "del", "default", "dev", name) await _run_cmd(_IP_BIN, "route", "add", "default", "via", cfg.gateway, "dev", name) # Set DNS if cfg.dns: _write_resolv_conf(cfg.dns) else: # DHCP — try dhclient or udhcpc dhclient = _find_bin("dhclient") udhcpc = _find_bin("udhcpc") if dhclient: await _run_cmd(dhclient, "-r", name, timeout=5) await _run_cmd(dhclient, name, timeout=15) elif udhcpc: await _run_cmd(udhcpc, "-i", name, "-n", timeout=15) else: await _run_cmd(_IP_BIN, "link", "set", name, "up") log.warning("No DHCP client found — interface %s brought up without DHCP", name) @router.post("/interfaces/{name}/up") async def interface_up(name: str, _user: str = Depends(get_current_user)): """Bring an interface up.""" _validate_iface_name(name) rc, _, stderr = await _run_cmd(_IP_BIN, "link", "set", name, "up") if rc != 0: raise HTTPException(500, f"Failed to bring up {name}: {stderr.strip()}") return {"success": True} @router.post("/interfaces/{name}/down") async def interface_down(name: str, _user: str = Depends(get_current_user)): """Bring an interface down.""" _validate_iface_name(name) rc, _, stderr = await _run_cmd(_IP_BIN, "link", "set", name, "down") if rc != 0: raise HTTPException(500, f"Failed to bring down {name}: {stderr.strip()}") return {"success": True}