"""Network configuration via NetworkManager (nmcli) or direct ip/interfaces.""" import asyncio import ipaddress import logging import os import re import shutil from pathlib import Path from typing import Literal, Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from backend.auth import get_current_user router = APIRouter(prefix="/api/network", tags=["network"]) log = logging.getLogger("atlus.network") def _find_bin(name: str) -> Optional[str]: """Find a binary, even with systemd's minimal PATH.""" found = shutil.which(name) if found: return found for p in ("/usr/bin", "/usr/local/bin", "/usr/sbin", "/sbin", "/bin"): full = os.path.join(p, name) if os.path.isfile(full) and os.access(full, os.X_OK): return full return None _HAS_NMCLI = bool(_find_bin("nmcli")) _IP_BIN = _find_bin("ip") or "ip" # Debian/Armbian network config paths _INTERFACES_FILE = Path("/etc/network/interfaces") _INTERFACES_DIR = Path("/etc/network/interfaces.d") _RESOLV_CONF = Path("/etc/resolv.conf") _WPA_SUPPLICANT_DIR = Path("/etc/wpa_supplicant") _IW_BIN = _find_bin("iw") _WPA_CLI_BIN = _find_bin("wpa_cli") # --------------------------------------------------------------------------- # Models # --------------------------------------------------------------------------- class IPv4Config(BaseModel): method: Literal["auto", "manual"] address: Optional[str] = None # CIDR notation e.g. 
"192.168.1.100/24" gateway: Optional[str] = None dns: Optional[list[str]] = None class WifiConnectRequest(BaseModel): ssid: str password: Optional[str] = None hidden: bool = False # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _require_nmcli(): """Raise 503 if NetworkManager is not available.""" if not _HAS_NMCLI: raise HTTPException(503, "NetworkManager (nmcli) not available on this system") def _safe_env(): """Build environment with full PATH for nmcli commands.""" env = {**os.environ, "LC_ALL": "C"} path = env.get("PATH", "") for p in ("/usr/bin", "/usr/sbin", "/bin", "/sbin"): if p not in path: path = p + ":" + path env["PATH"] = path return env async def _nmcli(*args: str, timeout: float = 30) -> str: """Run nmcli with C locale, return stdout. Raise HTTPException on failure.""" _require_nmcli() cmd = ["nmcli"] + list(args) proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_safe_env(), ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() raise HTTPException(504, "nmcli operation timed out") if proc.returncode != 0: err_msg = stderr.decode().strip() log.warning("nmcli failed (%d): %s", proc.returncode, err_msg) raise HTTPException(500, f"nmcli error: {err_msg}") return stdout.decode() def _parse_terse(output: str, fields: list[str]) -> list[dict]: """Parse nmcli terse (-t) colon-separated output into list of dicts. Handles escaped colons (\\:) in values. 
""" results = [] for line in output.strip().splitlines(): if not line: continue # Split on unescaped colons parts = re.split(r"(?= len(fields): results.append(dict(zip(fields, parts[:len(fields)]))) return results def _parse_keyvalue(output: str) -> dict[str, str]: """Parse nmcli key:value output (from `connection show `).""" props = {} for line in output.strip().splitlines(): if ":" not in line: continue key, _, value = line.partition(":") props[key.strip()] = value.strip() return props def _validate_ipv4_config(cfg: IPv4Config): """Validate IPv4 configuration fields.""" if cfg.method == "manual": if not cfg.address: raise HTTPException(422, "IP address is required for static configuration") if not cfg.gateway: raise HTTPException(422, "Gateway is required for static configuration") # Validate address (must be CIDR) try: ipaddress.ip_interface(cfg.address) except ValueError: raise HTTPException(422, f"Invalid IP address/prefix: {cfg.address}. Use CIDR notation (e.g. 192.168.1.100/24)") # Validate gateway try: ipaddress.ip_address(cfg.gateway) except ValueError: raise HTTPException(422, f"Invalid gateway address: {cfg.gateway}") # Validate DNS if cfg.dns: for dns in cfg.dns: try: ipaddress.ip_address(dns) except ValueError: raise HTTPException(422, f"Invalid DNS address: {dns}") # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @router.get("/status") async def network_status(_user: str = Depends(get_current_user)): """Get device list and active connections.""" _require_nmcli() # Device status dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE,CONNECTION", "device", "status") devices = _parse_terse(dev_out, ["device", "type", "state", "connection"]) # Filter out loopback and uninteresting devices devices = [d for d in devices if d["type"] in ("ethernet", "wifi", "bridge")] # Get IP addresses for connected devices for dev in devices: dev["ipv4"] = 
None if dev["state"] == "connected" and dev["connection"]: try: conn_out = await _nmcli("-t", "-f", "IP4.ADDRESS,IP4.GATEWAY,IP4.DNS", "connection", "show", dev["connection"]) props = _parse_keyvalue(conn_out) # nmcli may output IP4.ADDRESS[1], IP4.DNS[1], etc. dev["ipv4"] = { "address": props.get("IP4.ADDRESS[1]", ""), "gateway": props.get("IP4.GATEWAY", ""), "dns": [], } # Collect all DNS entries for k, v in props.items(): if k.startswith("IP4.DNS") and v: dev["ipv4"]["dns"].append(v) except HTTPException: pass # Connection details unavailable return {"devices": devices} @router.get("/connections") async def list_connections(_user: str = Depends(get_current_user)): """List all saved NetworkManager connections.""" out = await _nmcli("-t", "-f", "NAME,UUID,TYPE,DEVICE", "connection", "show") connections = _parse_terse(out, ["name", "uuid", "type", "device"]) return connections @router.get("/connection/{name}") async def connection_detail(name: str, _user: str = Depends(get_current_user)): """Get detailed info for a single connection.""" try: out = await _nmcli("connection", "show", name) except HTTPException: raise HTTPException(404, f"Connection not found: {name}") props = _parse_keyvalue(out) # Extract IPv4 config (what's configured) ipv4_method = props.get("ipv4.method", "auto") ipv4_addresses = props.get("ipv4.addresses", "") ipv4_gateway = props.get("ipv4.gateway", "") ipv4_dns = props.get("ipv4.dns", "") # Extract active IPv4 (what's currently in use) active_address = props.get("IP4.ADDRESS[1]", "") active_gateway = props.get("IP4.GATEWAY", "") active_dns = [] for k, v in props.items(): if k.startswith("IP4.DNS") and v: active_dns.append(v) return { "name": props.get("connection.id", name), "uuid": props.get("connection.uuid", ""), "type": props.get("connection.type", ""), "device": props.get("GENERAL.DEVICES", props.get("connection.interface-name", "")), "state": props.get("GENERAL.STATE", ""), "ipv4": { "method": ipv4_method, "addresses": ipv4_addresses, 
"gateway": ipv4_gateway, "dns": ipv4_dns, }, "active_ipv4": { "address": active_address, "gateway": active_gateway, "dns": active_dns, }, } @router.put("/connection/{name}/ipv4") async def configure_ipv4( name: str, cfg: IPv4Config, _user: str = Depends(get_current_user), ): """Set IPv4 configuration (DHCP or static) and apply.""" _validate_ipv4_config(cfg) if cfg.method == "auto": # Switch to DHCP — clear static settings await _nmcli("connection", "modify", name, "ipv4.method", "auto", "ipv4.addresses", "", "ipv4.gateway", "", "ipv4.dns", "") else: # Static configuration dns_str = " ".join(cfg.dns) if cfg.dns else "" modify_args = [ "connection", "modify", name, "ipv4.method", "manual", "ipv4.addresses", cfg.address, "ipv4.gateway", cfg.gateway, ] if dns_str: modify_args.extend(["ipv4.dns", dns_str]) else: modify_args.extend(["ipv4.dns", ""]) await _nmcli(*modify_args) # Apply the changes try: await _nmcli("connection", "up", name, timeout=15) except HTTPException as e: # Connection up may fail if we just changed the IP of our own interface # Return success anyway — the config was saved log.warning("connection up after config change: %s", e.detail) return {"success": True, "method": cfg.method} @router.get("/wifi/list") async def wifi_scan(_user: str = Depends(get_current_user)): """Scan for available WiFi networks.""" _require_nmcli() # Find the wifi device dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE", "device", "status") devices = _parse_terse(dev_out, ["device", "type", "state"]) wifi_dev = next((d for d in devices if d["type"] == "wifi"), None) if not wifi_dev: raise HTTPException(404, "No WiFi device found") # Scan try: out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE", "device", "wifi", "list", "--rescan", "yes", timeout=20) except HTTPException: # Rescan may fail; try without rescan out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE", "device", "wifi", "list") networks = _parse_terse(out, ["ssid", "bssid", "signal", 
"security", "in_use"]) # Filter empty SSIDs, deduplicate by SSID (keep strongest signal) seen = {} for net in networks: if not net["ssid"]: continue net["signal"] = int(net["signal"]) if net["signal"].isdigit() else 0 net["in_use"] = net["in_use"] == "*" existing = seen.get(net["ssid"]) if not existing or net["signal"] > existing["signal"]: seen[net["ssid"]] = net result = sorted(seen.values(), key=lambda x: x["signal"], reverse=True) return result @router.post("/wifi/connect") async def wifi_connect(req: WifiConnectRequest, _user: str = Depends(get_current_user)): """Connect to a WiFi network.""" _require_nmcli() if not req.ssid or len(req.ssid) > 64: raise HTTPException(422, "Invalid SSID") args = ["device", "wifi", "connect", req.ssid] if req.password: args.extend(["password", req.password]) if req.hidden: args.extend(["hidden", "yes"]) await _nmcli(*args, timeout=30) return {"success": True, "ssid": req.ssid} @router.post("/wifi/disconnect") async def wifi_disconnect(_user: str = Depends(get_current_user)): """Disconnect the WiFi device.""" _require_nmcli() # Find the wifi device name dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE", "device", "status") devices = _parse_terse(dev_out, ["device", "type"]) wifi_dev = next((d for d in devices if d["type"] == "wifi"), None) if not wifi_dev: raise HTTPException(404, "No WiFi device found") await _nmcli("device", "disconnect", wifi_dev["device"]) return {"success": True} @router.delete("/connection/{name}") async def delete_connection(name: str, _user: str = Depends(get_current_user)): """Delete a saved connection.""" await _nmcli("connection", "delete", name) return {"success": True} @router.post("/connection/{name}/apply") async def apply_connection(name: str, _user: str = Depends(get_current_user)): """Reapply/reconnect a connection.""" await _nmcli("connection", "up", name, timeout=15) return {"success": True} # =========================================================================== # Fallback: direct `ip` + 
# /etc/network/interfaces (no NetworkManager)
# ===========================================================================

# First character must be alphanumeric so the name can never be "." / ".."
# or be parsed as a command-line option; Linux limits names to 15 characters.
# \Z (not $) so a trailing newline is rejected as well.
_IFACE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_.-]{0,14}\Z")


def _validate_iface_name(name: str):
    """Prevent injection via interface name.

    Raises:
        HTTPException: 400 when the name is not a plausible interface name.
    """
    if not _IFACE_NAME_RE.match(name):
        raise HTTPException(400, "Invalid interface name")


def _reject_control_chars(label: str, value: str):
    """Refuse values that would break out of a line in a generated config file.

    Raises:
        HTTPException: 422 when ``value`` contains a control character
            (newline, carriage return, NUL, ...).
    """
    if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in value):
        raise HTTPException(422, f"{label} must not contain control characters")


async def _run_cmd(*args: str, timeout: float = 15) -> tuple[int, str, str]:
    """Run a command with safe env, return (rc, stdout, stderr).

    Never raises for process-level failures: a command that cannot be started
    yields rc 127 and a timeout yields rc 1, each with a message in stderr.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=_safe_env(),
        )
    except OSError as exc:
        # Binary missing or not executable (e.g. the bare "ip" fallback).
        return 127, "", str(exc)

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()
        await proc.wait()
        return 1, "", "Command timed out"
    return proc.returncode, stdout.decode(errors="replace"), stderr.decode(errors="replace")


async def _parse_ip_addr() -> list[dict]:
    """Parse `ip -j addr show` into a list of interface dicts.

    Falls back to the plain-text parser when JSON output is unavailable.
    Every dict has the keys name, state, up, mac, ipv4, ipv6, gateway and
    wireless; the loopback interface is skipped.
    """
    import json

    rc, stdout, _ = await _run_cmd(_IP_BIN, "-j", "addr", "show")
    if rc != 0 or not stdout.strip():
        # Fallback: parse non-json output
        return await _parse_ip_addr_text()
    try:
        ifaces = json.loads(stdout)
    except json.JSONDecodeError:
        return await _parse_ip_addr_text()

    results = []
    for iface in ifaces:
        name = iface.get("ifname", "")
        if name == "lo":
            continue
        state = iface.get("operstate", "UNKNOWN").upper()
        flags = iface.get("flags", [])

        ipv4 = ""
        ipv6 = ""
        for ai in iface.get("addr_info", []):
            family = ai.get("family")
            local = ai.get("local", "")
            if family == "inet" and not ipv4 and local:
                ipv4 = f"{local}/{ai.get('prefixlen', 24)}"
            elif family == "inet6" and not ipv6 and not local.startswith("fe80"):
                # Link-local addresses are not reported.
                ipv6 = local

        results.append({
            "name": name,
            "state": state,
            "up": state == "UP" or "UP" in flags,
            "mac": iface.get("address", ""),
            "ipv4": ipv4,
            "ipv6": ipv6,
            # Always present, matching the text parser's output shape.
            "gateway": "",
            # Detect wireless interfaces
            "wireless": Path(f"/sys/class/net/{name}/wireless").exists(),
        })

    # Get default gateway
    rc2, gw_out, _ = await _run_cmd(_IP_BIN, "-j", "route", "show", "default")
    if rc2 == 0 and gw_out.strip():
        try:
            routes = json.loads(gw_out)
        except json.JSONDecodeError:
            routes = []
        by_name = {entry["name"]: entry for entry in results}
        for route in routes:
            target = by_name.get(route.get("dev", ""))
            if target is not None:
                target["gateway"] = route.get("gateway", "")

    return results


async def _parse_ip_addr_text() -> list[dict]:
    """Fallback parser for `ip addr show` plain text output.

    Produces the same dict shape as the JSON parser. Returns an empty list
    when the command fails.
    """
    rc, stdout, _ = await _run_cmd(_IP_BIN, "addr", "show")
    if rc != 0:
        return []

    results = []
    current = None
    for line in stdout.splitlines():
        # New interface line: "2: eth0: <BROADCAST,MULTICAST,UP> ..."
        m = re.match(r"^\d+:\s+(\S+?):\s+<([^>]*)>", line)
        if m:
            if current:
                results.append(current)
            # VLAN/veth entries print "name@parent"; only the name is real.
            name = m.group(1).rstrip(":").split("@", 1)[0]
            # Compare whole flags: "UP" as a substring also matches LOWER_UP.
            is_up = "UP" in m.group(2).split(",")
            if name == "lo":
                current = None
                continue
            current = {
                "name": name,
                "state": "UP" if is_up else "DOWN",
                "up": is_up,
                "mac": "",
                "ipv4": "",
                "ipv6": "",
                "gateway": "",
                "wireless": Path(f"/sys/class/net/{name}/wireless").exists(),
            }
            continue

        if not current:
            continue

        # MAC address: "    link/ether aa:bb:cc:dd:ee:ff ..."
        m = re.match(r"\s+link/ether\s+(\S+)", line)
        if m:
            current["mac"] = m.group(1)

        # IPv4: "    inet 192.168.1.100/24 ..."
        m = re.match(r"\s+inet\s+(\S+)", line)
        if m and not current["ipv4"]:
            current["ipv4"] = m.group(1)

        # IPv6: "    inet6 2001:db8::1/64 ..."
        m = re.match(r"\s+inet6\s+(\S+)", line)
        if m and not current["ipv6"]:
            addr = m.group(1).split("/")[0]
            if not addr.startswith("fe80"):
                current["ipv6"] = addr

    if current:
        results.append(current)

    # Get default gateway
    rc2, gw_out, _ = await _run_cmd(_IP_BIN, "route", "show", "default")
    if rc2 == 0:
        by_name = {entry["name"]: entry for entry in results}
        for line in gw_out.splitlines():
            m = re.match(r"default via (\S+) dev (\S+)", line)
            if m and m.group(2) in by_name:
                by_name[m.group(2)]["gateway"] = m.group(1)

    return results


def _parse_interfaces_file() -> dict[str, dict]:
    """Parse /etc/network/interfaces into per-iface config dicts.

    Reads the main file, every file matched by its `source` lines and every
    regular file in interfaces.d. Each result maps an interface name to a
    dict with "auto", "method" and whichever of address, netmask, gateway,
    dns-nameservers, network, broadcast and wpa-ssid its stanza sets.
    """
    from glob import glob

    chunks = []
    if _INTERFACES_FILE.exists():
        main_text = _INTERFACES_FILE.read_text()
        chunks.append(main_text)
        # Check for source lines
        for line in main_text.splitlines():
            m = re.match(r"^source\s+(.+)", line)
            if not m:
                continue
            for path in glob(m.group(1).strip()):
                try:
                    chunks.append(Path(path).read_text())
                except OSError:
                    pass  # unreadable include: skip it, keep the rest

    # Also read interfaces.d/*.cfg and interfaces.d/* files
    if _INTERFACES_DIR.is_dir():
        for f in sorted(_INTERFACES_DIR.iterdir()):
            if f.is_file():
                try:
                    chunks.append(f.read_text())
                except OSError:
                    pass

    configs: dict[str, dict] = {}
    current_iface = None
    for line in "\n".join(chunks).splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        # "auto eth0" or "allow-hotplug eth0"
        m = re.match(r"^(auto|allow-hotplug)\s+(\S+)", stripped)
        if m:
            iface_name = m.group(2)
            if iface_name not in configs:
                configs[iface_name] = {"auto": False, "method": "dhcp"}
            if m.group(1) == "auto":
                configs[iface_name]["auto"] = True
            continue

        # "iface eth0 inet static"
        m = re.match(r"^iface\s+(\S+)\s+inet\s+(\S+)", stripped)
        if m:
            iface_name = m.group(1)
            if iface_name not in configs:
                configs[iface_name] = {"auto": False}
            configs[iface_name]["method"] = m.group(2)  # static, dhcp, manual, loopback
            current_iface = iface_name
            continue

        # Indented config lines under an iface block
        if current_iface and line[0] in (" ", "\t"):
            # Match against the stripped line: requiring leading whitespace
            # here would never match and silently drop every option.
            m = re.match(
                r"(address|netmask|gateway|dns-nameservers|network|broadcast|wpa-ssid)\s+(.+)",
                stripped,
            )
            if m:
                configs[current_iface][m.group(1)] = m.group(2).strip()
        else:
            # Any other unindented line ends the current stanza.
            current_iface = None

    return configs


def _write_interface_config(iface: str, method: str, address: str = "",
                            gateway: str = "", dns: Optional[list[str]] = None,
                            ssid: str = "", wifi_password: str = ""):
    """Write or update an interface block in /etc/network/interfaces.d/{iface}.

    Uses interfaces.d to avoid clobbering the main file. For wireless
    interfaces with an SSID, also writes a wpa_supplicant config. Static DNS
    servers are additionally written to /etc/resolv.conf.

    Raises:
        HTTPException: 422 for an invalid address or a value containing
            control characters.
        OSError: when a config file cannot be written.
    """
    # Everything below is interpolated into line-oriented config files, so a
    # newline in any value could inject extra directives.
    _reject_control_chars("SSID", ssid)
    _reject_control_chars("WiFi password", wifi_password)
    _reject_control_chars("Gateway", gateway)
    for server in dns or []:
        _reject_control_chars("DNS server", server)

    _INTERFACES_DIR.mkdir(parents=True, exist_ok=True)
    is_wireless = Path(f"/sys/class/net/{iface}/wireless").exists()

    if method == "dhcp":
        lines = [
            "# Managed by Atlus",
            f"auto {iface}",
            f"iface {iface} inet dhcp",
        ]
    else:
        # Parse CIDR to address + netmask
        try:
            net_if = ipaddress.ip_interface(address)
        except ValueError:
            raise HTTPException(422, f"Invalid address: {address}") from None
        lines = [
            "# Managed by Atlus",
            f"auto {iface}",
            f"iface {iface} inet static",
            f"    address {net_if.ip}",
            f"    netmask {net_if.netmask}",
        ]
        if gateway:
            lines.append(f"    gateway {gateway}")
        if dns:
            lines.append(f"    dns-nameservers {' '.join(dns)}")

    # Add WiFi config if wireless and SSID provided
    if is_wireless and ssid:
        wpa_conf = _WPA_SUPPLICANT_DIR / f"{iface}.conf"
        _write_wpa_supplicant(iface, ssid, wifi_password)
        lines.append(f"    wpa-ssid {ssid}")
        lines.append(f"    wpa-conf {wpa_conf}")

    config_file = _INTERFACES_DIR / iface
    config_file.write_text("\n".join(lines) + "\n")
    log.info("Wrote interface config for %s: method=%s ssid=%s", iface, method, ssid or "(none)")

    # Also update resolv.conf for static DNS
    if method == "static" and dns:
        _write_resolv_conf(dns)


def _write_resolv_conf(dns: list[str]):
    """Write DNS servers to /etc/resolv.conf.

    Replaces the whole file. A write failure is logged, not raised.
    """
    content = "# Managed by Atlus\n" + "".join(f"nameserver {d}\n" for d in dns)
    try:
        _RESOLV_CONF.write_text(content)
    except OSError as e:
        log.warning("Failed to write resolv.conf: %s", e)


# --- Fallback endpoints ---

@router.get("/interfaces")
async def list_interfaces(_user: str = Depends(get_current_user)):
    """List network interfaces using ip commands (no NetworkManager required).

    Each live interface is merged with its configured values from
    /etc/network/interfaces; the system DNS servers are read from
    /etc/resolv.conf.
    """
    ifaces = await _parse_ip_addr()

    # Merge with /etc/network/interfaces config
    configs = _parse_interfaces_file()
    for iface in ifaces:
        cfg = configs.get(iface["name"], {})
        iface["config_method"] = cfg.get("method", "unknown")
        iface["config_address"] = cfg.get("address", "")
        iface["config_netmask"] = cfg.get("netmask", "")
        iface["config_gateway"] = cfg.get("gateway", "")
        iface["config_dns"] = cfg.get("dns-nameservers", "")
        iface["config_ssid"] = cfg.get("wpa-ssid", "")

    # Get DNS from resolv.conf
    dns_servers = []
    if _RESOLV_CONF.exists():
        try:
            for line in _RESOLV_CONF.read_text().splitlines():
                m = re.match(r"^nameserver\s+(\S+)", line.strip())
                if m:
                    dns_servers.append(m.group(1))
        except OSError:
            pass  # unreadable resolv.conf: report no DNS servers

    return {
        "interfaces": ifaces,
        "dns": dns_servers,
        "has_nmcli": _HAS_NMCLI,
    }


class InterfaceConfig(BaseModel):
    """Request body for configuring an interface without NetworkManager."""

    method: Literal["dhcp", "static"]
    address: Optional[str] = None  # CIDR e.g. "192.168.1.100/24"
    gateway: Optional[str] = None
    dns: Optional[list[str]] = None
    ssid: Optional[str] = None  # WiFi SSID
    wifi_password: Optional[str] = None  # WiFi password (WPA/WPA2)


@router.put("/interfaces/{name}/config")
async def configure_interface(
    name: str,
    cfg: InterfaceConfig,
    _user: str = Depends(get_current_user),
):
    """Configure an interface in /etc/network/interfaces.d/ and apply.

    Raises:
        HTTPException: 400 for an invalid interface name, 422 for invalid
            settings, 500 when the config files cannot be written.
    """
    _validate_iface_name(name)

    if cfg.method == "static":
        if not cfg.address:
            raise HTTPException(422, "IP address is required for static configuration")
        if not cfg.gateway:
            raise HTTPException(422, "Gateway is required for static configuration")
        _validate_ipv4_config(IPv4Config(method="manual", address=cfg.address,
                                         gateway=cfg.gateway, dns=cfg.dns))

    # Write config file
    try:
        _write_interface_config(
            iface=name,
            method=cfg.method,
            address=cfg.address or "",
            gateway=cfg.gateway or "",
            dns=cfg.dns,
            ssid=cfg.ssid or "",
            wifi_password=cfg.wifi_password or "",
        )
    except OSError as exc:
        log.warning("Failed to write config for %s: %s", name, exc)
        raise HTTPException(500, f"Failed to write interface config: {exc}") from exc

    # Apply: bring interface down then up
    # Use ifdown/ifup if available, else ip commands
    ifdown = _find_bin("ifdown")
    ifup = _find_bin("ifup")
    if ifdown and ifup:
        # ifdown fails harmlessly when the interface was not up yet.
        await _run_cmd(ifdown, name, timeout=10)
        rc, _, stderr = await _run_cmd(ifup, name, timeout=15)
        if rc != 0:
            log.warning("ifup %s failed: %s", name, stderr.strip())
            # Try ip commands as fallback
            await _apply_with_ip(name, cfg)
    else:
        await _apply_with_ip(name, cfg)

    return {"success": True, "method": cfg.method}


async def _apply_with_ip(name: str, cfg: InterfaceConfig):
    """Apply network config using ip commands directly.

    Static: flush addresses, add the address, bring the link up, replace the
    default route and write resolv.conf. DHCP: run dhclient or udhcpc, or
    just bring the link up when neither exists. Failures are best effort;
    only a failed address assignment is logged.
    """
    # Flush existing addresses
    await _run_cmd(_IP_BIN, "addr", "flush", "dev", name)

    if cfg.method == "static" and cfg.address:
        # Add address
        rc, _, stderr = await _run_cmd(_IP_BIN, "addr", "add", cfg.address, "dev", name)
        if rc != 0:
            log.warning("ip addr add %s dev %s failed: %s", cfg.address, name, stderr.strip())
        # Bring up
        await _run_cmd(_IP_BIN, "link", "set", name, "up")
        # Set default route
        if cfg.gateway:
            # Deleting fails when no default route exists; that is expected.
            await _run_cmd(_IP_BIN, "route", "del", "default", "dev", name)
            await _run_cmd(_IP_BIN, "route", "add", "default", "via", cfg.gateway, "dev", name)
        # Set DNS
        if cfg.dns:
            _write_resolv_conf(cfg.dns)
        return

    # DHCP — try dhclient or udhcpc
    dhclient = _find_bin("dhclient")
    udhcpc = _find_bin("udhcpc")
    if dhclient:
        await _run_cmd(dhclient, "-r", name, timeout=5)
        await _run_cmd(dhclient, name, timeout=15)
    elif udhcpc:
        await _run_cmd(udhcpc, "-i", name, "-n", timeout=15)
    else:
        await _run_cmd(_IP_BIN, "link", "set", name, "up")
        log.warning("No DHCP client found — interface %s brought up without DHCP", name)


# ---------------------------------------------------------------------------
# WiFi support (fallback — no NetworkManager)
# ---------------------------------------------------------------------------

def _write_wpa_supplicant(iface: str, ssid: str, password: str = ""):
    """Write a wpa_supplicant config file for a wireless interface.

    Writes a WPA-PSK network when a password is given, an open network
    otherwise. The file is created with mode 0600 before any content is
    written, so the passphrase is never readable by other users.

    Raises:
        HTTPException: 422 when ssid or password contain control characters.
        OSError: when the file cannot be written.
    """
    _reject_control_chars("SSID", ssid)
    _reject_control_chars("WiFi password", password)

    _WPA_SUPPLICANT_DIR.mkdir(parents=True, exist_ok=True)
    conf_file = _WPA_SUPPLICANT_DIR / f"{iface}.conf"

    network = [f'    ssid="{ssid}"']
    if password:
        # WPA/WPA2 PSK
        network += [f'    psk="{password}"', "    key_mgmt=WPA-PSK"]
    else:
        # Open network
        network.append("    key_mgmt=NONE")

    content = "\n".join([
        "# Managed by Atlus",
        "ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev",
        "update_config=1",
        "country=US",
        "",
        "network={",
        *network,
        "}",
        "",
    ])

    fd = os.open(conf_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        # The mode passed to os.open only applies to new files; tighten a
        # pre-existing file before the secret is written into it.
        os.fchmod(fh.fileno(), 0o600)
        fh.write(content)
    log.info("Wrote wpa_supplicant config for %s: ssid=%s", iface, ssid)


@router.get("/wifi/scan/{name}")
async def scan_wifi(name: str, _user: str = Depends(get_current_user)):
    """Scan for WiFi networks using iw or iwlist (no NetworkManager required).

    Raises:
        HTTPException: 400 when the name is invalid or not a wireless
            interface.
    """
    _validate_iface_name(name)

    # Check if interface is wireless
    if not Path(f"/sys/class/net/{name}/wireless").exists():
        raise HTTPException(400, f"{name} is not a wireless interface")

    networks = []

    # Try iw first
    if _IW_BIN:
        rc, stdout, _ = await _run_cmd(_IW_BIN, "dev", name, "scan", timeout=15)
        if rc != 0:
            # May need to bring interface up first
            await _run_cmd(_IP_BIN, "link", "set", name, "up")
            await asyncio.sleep(1)
            rc, stdout, _ = await _run_cmd(_IW_BIN, "dev", name, "scan", timeout=15)
        if rc == 0:
            networks = _parse_iw_scan(stdout)

    # Fallback: iwlist
    if not networks:
        iwlist = _find_bin("iwlist")
        if iwlist:
            rc, stdout, _ = await _run_cmd(iwlist, name, "scan", timeout=15)
            if rc == 0:
                networks = _parse_iwlist_scan(stdout)

    return {"networks": networks}


def _strongest_unique(networks: list[dict], limit: int = 30) -> list[dict]:
    """Sort by signal (strongest first), keep one entry per SSID, cap the list."""
    ranked = sorted(networks, key=lambda n: n.get("signal", -100), reverse=True)
    seen = set()
    unique = []
    for net in ranked:
        if net["ssid"] not in seen:
            seen.add(net["ssid"])
            unique.append(net)
    return unique[:limit]


def _parse_iw_scan(output: str) -> list[dict]:
    """Parse `iw dev wlan0 scan` output into network list.

    Returns up to 30 dicts with bssid, ssid, signal and security ("Open",
    "WEP", "WPA" or "WPA2"), strongest first, one per SSID. Entries without
    an SSID are dropped.
    """
    networks = []
    current = None

    for line in output.splitlines():
        if line.startswith("BSS "):
            if current and current.get("ssid"):
                networks.append(current)
            bssid = re.match(r"BSS\s+(\S+)", line)
            current = {"bssid": bssid.group(1) if bssid else "",
                       "ssid": "", "signal": 0, "security": "Open"}
            continue
        if current is None:
            continue

        stripped = line.strip()
        if stripped.startswith("SSID:"):
            # startswith, not "in": a substring test also matches "HESSID:".
            current["ssid"] = stripped[len("SSID:"):].strip()
        elif "signal:" in line:
            m = re.search(r"signal:\s*(-?\d+)", line)
            if m:
                current["signal"] = int(m.group(1))
        elif "RSN" in line:
            current["security"] = "WPA2"
        elif "WPA" in line:
            # Mixed-mode APs list both; never downgrade WPA2 to WPA.
            if current["security"] != "WPA2":
                current["security"] = "WPA"
        elif "WEP" in line:
            if current["security"] == "Open":
                current["security"] = "WEP"

    if current and current.get("ssid"):
        networks.append(current)

    return _strongest_unique(networks)


def _parse_iwlist_scan(output: str) -> list[dict]:
    """Parse `iwlist wlan0 scan` output into network list.

    Returns the same shape as the iw parser: up to 30 dicts, strongest
    first, one per SSID.
    """
    networks = []
    current = None

    for raw in output.splitlines():
        line = raw.strip()
        if "Cell " in line and "Address:" in line:
            if current and current.get("ssid"):
                networks.append(current)
            m = re.search(r"Address:\s*(\S+)", line)
            current = {"bssid": m.group(1) if m else "",
                       "ssid": "", "signal": 0, "security": "Open"}
        elif current is None:
            continue
        elif line.startswith("ESSID:"):
            m = re.search(r'ESSID:"(.+)"', line)
            if m:
                current["ssid"] = m.group(1)
        elif "Signal level=" in line:
            m = re.search(r"Signal level[=:]?\s*(-?\d+)", line)
            if m:
                current["signal"] = int(m.group(1))
        elif "Encryption key:on" in line:
            # Encrypted but no WPA/WPA2 element seen yet: assume WEP for now.
            if current.get("security") == "Open":
                current["security"] = "WEP"
        elif "WPA2" in line or "IEEE 802.11i" in line:
            current["security"] = "WPA2"
        elif "WPA" in line:
            if current.get("security") != "WPA2":
                current["security"] = "WPA"

    if current and current.get("ssid"):
        networks.append(current)

    return _strongest_unique(networks)


@router.post("/interfaces/{name}/up")
async def interface_up(name: str, _user: str = Depends(get_current_user)):
    """Bring an interface up."""
    _validate_iface_name(name)
    rc, _, stderr = await _run_cmd(_IP_BIN, "link", "set", name, "up")
    if rc != 0:
        raise HTTPException(500, f"Failed to bring up {name}: {stderr.strip()}")
    return {"success": True}


@router.post("/interfaces/{name}/down")
async def interface_down(name: str, _user: str = Depends(get_current_user)):
    """Bring an interface down."""
    _validate_iface_name(name)
    rc, _, stderr = await _run_cmd(_IP_BIN, "link", "set", name, "down")
    if rc != 0:
        raise HTTPException(500, f"Failed to bring down {name}: {stderr.strip()}")
    return {"success": True}