atlus/backend/routers/network.py
2026-03-14 22:41:00 -05:00

788 lines
26 KiB
Python

"""Network configuration via NetworkManager (nmcli) or direct ip/interfaces."""
import asyncio
import ipaddress
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Literal, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from backend.auth import get_current_user
# Router for every endpoint defined in this module; all routes are registered
# under the /api/network prefix and tagged "network".
router = APIRouter(prefix="/api/network", tags=["network"])
# Module logger, registered under the name "atlus.network".
log = logging.getLogger("atlus.network")
def _find_bin(name: str) -> Optional[str]:
    """Locate an executable by name.

    ``shutil.which`` is consulted first. When it finds nothing (for example
    when the process runs with a minimal PATH), a fixed list of standard
    system directories is probed instead.

    Args:
        name: Bare executable name, e.g. ``"nmcli"``.

    Returns:
        Path of the first executable match, or ``None`` if there is none.
    """
    on_path = shutil.which(name)
    if on_path:
        return on_path
    fallback_dirs = ("/usr/bin", "/usr/local/bin", "/usr/sbin", "/sbin", "/bin")
    candidates = (os.path.join(directory, name) for directory in fallback_dirs)
    # First candidate that is a regular file and executable wins.
    return next(
        (path for path in candidates if os.path.isfile(path) and os.access(path, os.X_OK)),
        None,
    )
# True when an nmcli executable could be located at import time; checked by
# _require_nmcli() and reported to clients by the /interfaces endpoint.
_HAS_NMCLI = bool(_find_bin("nmcli"))
# Resolved path of the `ip` tool, or the bare name "ip" when it was not found
# (in which case the lookup is left to PATH resolution at execution time).
_IP_BIN = _find_bin("ip") or "ip"
# Debian/Armbian network config paths
_INTERFACES_FILE = Path("/etc/network/interfaces")
_INTERFACES_DIR = Path("/etc/network/interfaces.d")
_RESOLV_CONF = Path("/etc/resolv.conf")
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
# Request body describing the IPv4 settings of a NetworkManager connection.
class IPv4Config(BaseModel):
    # "auto" selects DHCP, "manual" selects a static configuration.
    method: Literal["auto", "manual"]
    # Static address; required (together with gateway) when method is "manual".
    address: Optional[str] = None  # CIDR notation e.g. "192.168.1.100/24"
    # Default gateway address for the static configuration.
    gateway: Optional[str] = None
    # Optional list of DNS server addresses.
    dns: Optional[list[str]] = None
# Request body for POST /wifi/connect.
class WifiConnectRequest(BaseModel):
    # Name of the network to join.
    ssid: str
    # Passphrase; omitted from the nmcli command when empty or None.
    password: Optional[str] = None
    # When true, "hidden yes" is passed to nmcli.
    hidden: bool = False
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _require_nmcli():
    """Guard for code paths that need NetworkManager.

    Raises:
        HTTPException: 503 when no nmcli executable was found at import time.
    """
    if _HAS_NMCLI:
        return
    raise HTTPException(503, "NetworkManager (nmcli) not available on this system")
def _safe_env():
    """Build the environment used for child processes.

    The current environment is copied, ``LC_ALL`` is forced to ``C`` and the
    standard system binary directories are guaranteed to be on ``PATH``.

    Returns:
        A new ``dict`` suitable for the ``env=`` argument of a subprocess call.
        ``os.environ`` itself is not modified.
    """
    env = {**os.environ, "LC_ALL": "C"}
    # Compare whole PATH entries rather than substrings: "/bin" is a substring
    # of "/usr/bin", so a substring test would never add it.  Empty entries are
    # dropped because an empty PATH component stands for the current directory.
    entries = [entry for entry in env.get("PATH", "").split(":") if entry]
    for directory in ("/usr/bin", "/usr/sbin", "/bin", "/sbin"):
        if directory not in entries:
            # Missing system directories are prepended, as before.
            entries.insert(0, directory)
    env["PATH"] = ":".join(entries)
    return env
async def _nmcli(*args: str, timeout: float = 30) -> str:
    """Run nmcli with C locale, return stdout. Raise HTTPException on failure.

    Args:
        *args: Arguments handed to nmcli verbatim (exec-style, no shell).
        timeout: Seconds to wait before the process is killed.

    Returns:
        nmcli's standard output as text.

    Raises:
        HTTPException: 503 when nmcli is unavailable, 504 when the command
            times out, 500 when nmcli exits with a non-zero status (its
            stderr is included in the detail).
    """
    _require_nmcli()
    proc = await asyncio.create_subprocess_exec(
        "nmcli",
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=_safe_env(),
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # Reap the child so it does not linger after the timeout.
        proc.kill()
        await proc.wait()
        raise HTTPException(504, "nmcli operation timed out")
    # Decode leniently: nmcli echoes names (e.g. SSIDs, connection ids) that
    # may not be valid UTF-8, and a strict decode would raise
    # UnicodeDecodeError instead of returning a usable result.
    if proc.returncode != 0:
        err_msg = stderr.decode(errors="replace").strip()
        log.warning("nmcli failed (%d): %s", proc.returncode, err_msg)
        raise HTTPException(500, f"nmcli error: {err_msg}")
    return stdout.decode(errors="replace")
def _parse_terse(output: str, fields: list[str]) -> list[dict]:
    """Parse nmcli terse (-t) colon-separated output into list of dicts.

    Backslash escapes are honoured: ``\\:`` yields a literal colon and
    ``\\\\`` yields a literal backslash, so a value that ends in an escaped
    backslash is still followed by a real field separator.  A backslash in
    front of any other character is kept unchanged.

    Args:
        output: Raw nmcli output, one record per line.
        fields: Key names assigned to the columns, in order.

    Returns:
        One dict per line that has at least ``len(fields)`` columns; surplus
        columns are ignored and shorter lines are skipped.
    """

    def split_line(line: str) -> list[str]:
        # Single left-to-right pass so that escape pairs are consumed together.
        columns: list[str] = []
        current: list[str] = []
        chars = iter(line)
        for ch in chars:
            if ch == "\\":
                following = next(chars, None)
                if following in (":", "\\"):
                    current.append(following)
                elif following is None:
                    current.append(ch)  # dangling backslash at end of line
                else:
                    current.append(ch + following)
            elif ch == ":":
                columns.append("".join(current))
                current = []
            else:
                current.append(ch)
        columns.append("".join(current))
        return columns

    results = []
    for line in output.strip().splitlines():
        if not line:
            continue
        parts = split_line(line)
        if len(parts) >= len(fields):
            results.append(dict(zip(fields, parts[:len(fields)])))
    return results
def _parse_keyvalue(output: str) -> dict[str, str]:
    """Turn ``key: value`` lines (as printed by `connection show <name>`) into a dict.

    Each line is split at its first colon; key and value are stripped of
    surrounding whitespace.  Lines without a colon are ignored, and a key that
    appears more than once keeps its last value.
    """
    split_rows = (row.partition(":") for row in output.strip().splitlines())
    # partition() yields an empty separator exactly when the row has no colon.
    return {key.strip(): value.strip() for key, sep, value in split_rows if sep}
def _validate_ipv4_config(cfg: IPv4Config):
    """Validate IPv4 configuration fields.

    Only ``manual`` configurations are checked; any other method returns
    immediately.

    Args:
        cfg: The configuration to validate.

    Raises:
        HTTPException: 422 when the address or gateway is missing, when the
            address is not an IPv4 address with an explicit prefix length,
            when the gateway is not an IPv4 address, or when a DNS entry is
            not an IP address.
    """
    if cfg.method != "manual":
        return
    if not cfg.address:
        raise HTTPException(422, "IP address is required for static configuration")
    if not cfg.gateway:
        raise HTTPException(422, "Gateway is required for static configuration")
    # Validate address (must be IPv4 CIDR).  The prefix is required explicitly:
    # the ipaddress parsers treat a bare address as /32, which would silently
    # produce a host-only netmask.
    try:
        if "/" not in cfg.address:
            raise ValueError("missing prefix length")
        ipaddress.IPv4Interface(cfg.address)
    except ValueError:
        raise HTTPException(422, f"Invalid IP address/prefix: {cfg.address}. Use CIDR notation (e.g. 192.168.1.100/24)") from None
    # Validate gateway (IPv4 only, this is an IPv4 configuration).
    try:
        ipaddress.IPv4Address(cfg.gateway)
    except ValueError:
        raise HTTPException(422, f"Invalid gateway address: {cfg.gateway}") from None
    # Validate DNS.  Either address family is accepted here, as before.
    for dns in cfg.dns or []:
        try:
            ipaddress.ip_address(dns)
        except ValueError:
            raise HTTPException(422, f"Invalid DNS address: {dns}") from None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/status")
async def network_status(_user: str = Depends(get_current_user)):
    """Get device list and active connections."""
    # Response shape: {"devices": [...]}, one entry per ethernet/wifi/bridge
    # device with keys device, type, state, connection and ipv4.  ipv4 stays
    # None unless the device is connected and its details could be read.
    _require_nmcli()
    raw_devices = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE,CONNECTION", "device", "status")
    wanted_types = ("ethernet", "wifi", "bridge")
    devices = [
        entry
        for entry in _parse_terse(raw_devices, ["device", "type", "state", "connection"])
        if entry["type"] in wanted_types
    ]
    for entry in devices:
        entry["ipv4"] = None
        if entry["state"] != "connected" or not entry["connection"]:
            continue
        try:
            raw_detail = await _nmcli("-t", "-f", "IP4.ADDRESS,IP4.GATEWAY,IP4.DNS",
                                      "connection", "show", entry["connection"])
        except HTTPException:
            # Connection details unavailable; leave ipv4 as None.
            continue
        props = _parse_keyvalue(raw_detail)
        # nmcli may output indexed keys such as IP4.ADDRESS[1], IP4.DNS[1], ...
        entry["ipv4"] = {
            "address": props.get("IP4.ADDRESS[1]", ""),
            "gateway": props.get("IP4.GATEWAY", ""),
            "dns": [value for key, value in props.items()
                    if key.startswith("IP4.DNS") and value],
        }
    return {"devices": devices}
@router.get("/connections")
async def list_connections(_user: str = Depends(get_current_user)):
    """List all saved NetworkManager connections."""
    # Each returned dict has the keys name, uuid, type and device.
    column_keys = ["name", "uuid", "type", "device"]
    listing = await _nmcli("-t", "-f", "NAME,UUID,TYPE,DEVICE", "connection", "show")
    return _parse_terse(listing, column_keys)
@router.get("/connection/{name}")
async def connection_detail(name: str, _user: str = Depends(get_current_user)):
    """Get detailed info for a single connection."""
    # Returns the configured IPv4 settings ("ipv4") next to the values
    # currently in use ("active_ipv4").
    try:
        out = await _nmcli("connection", "show", name)
    except HTTPException as exc:
        # Only an nmcli failure (reported as 500 by _nmcli) is interpreted as
        # "no such connection".  503 (nmcli unavailable) and 504 (timeout)
        # describe a different problem and are passed through unchanged.
        if exc.status_code != 500:
            raise
        raise HTTPException(404, f"Connection not found: {name}") from exc
    props = _parse_keyvalue(out)
    # Active DNS servers are reported under indexed keys (IP4.DNS[1], ...).
    active_dns = [v for k, v in props.items() if k.startswith("IP4.DNS") and v]
    return {
        "name": props.get("connection.id", name),
        "uuid": props.get("connection.uuid", ""),
        "type": props.get("connection.type", ""),
        "device": props.get("GENERAL.DEVICES", props.get("connection.interface-name", "")),
        "state": props.get("GENERAL.STATE", ""),
        # What is configured on the connection profile.
        "ipv4": {
            "method": props.get("ipv4.method", "auto"),
            "addresses": props.get("ipv4.addresses", ""),
            "gateway": props.get("ipv4.gateway", ""),
            "dns": props.get("ipv4.dns", ""),
        },
        # What is currently in use.
        "active_ipv4": {
            "address": props.get("IP4.ADDRESS[1]", ""),
            "gateway": props.get("IP4.GATEWAY", ""),
            "dns": active_dns,
        },
    }
@router.put("/connection/{name}/ipv4")
async def configure_ipv4(
    name: str,
    cfg: IPv4Config,
    _user: str = Depends(get_current_user),
):
    """Set IPv4 configuration (DHCP or static) and apply."""
    _validate_ipv4_config(cfg)
    if cfg.method == "auto":
        # DHCP: clear every static setting.
        settings = [
            "ipv4.method", "auto",
            "ipv4.addresses", "",
            "ipv4.gateway", "",
            "ipv4.dns", "",
        ]
    else:
        # Static: DNS servers are passed space-separated, or cleared when absent.
        settings = [
            "ipv4.method", "manual",
            "ipv4.addresses", cfg.address,
            "ipv4.gateway", cfg.gateway,
            "ipv4.dns", " ".join(cfg.dns) if cfg.dns else "",
        ]
    await _nmcli("connection", "modify", name, *settings)
    try:
        await _nmcli("connection", "up", name, timeout=15)
    except HTTPException as activation_error:
        # Activation may fail if we just changed the IP of our own interface.
        # The configuration was saved, so success is reported anyway.
        log.warning("connection up after config change: %s", activation_error.detail)
    return {"success": True, "method": cfg.method}
@router.get("/wifi/list")
async def wifi_scan(_user: str = Depends(get_current_user)):
    """Scan for available WiFi networks."""
    # Returns one entry per SSID (ssid, bssid, signal, security, in_use),
    # strongest signal first.
    _require_nmcli()
    # Find the wifi device
    dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE", "device", "status")
    devices = _parse_terse(dev_out, ["device", "type", "state"])
    if not any(d["type"] == "wifi" for d in devices):
        raise HTTPException(404, "No WiFi device found")
    # Scan
    try:
        out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE",
                           "device", "wifi", "list", "--rescan", "yes",
                           timeout=20)
    except HTTPException:
        # Rescan may fail; try without rescan
        out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE",
                           "device", "wifi", "list")
    networks = _parse_terse(out, ["ssid", "bssid", "signal", "security", "in_use"])
    # Filter empty SSIDs, deduplicate by SSID (keep strongest signal)
    seen = {}
    for net in networks:
        if not net["ssid"]:
            continue
        net["signal"] = int(net["signal"]) if net["signal"].isdigit() else 0
        net["in_use"] = net["in_use"] == "*"
        existing = seen.get(net["ssid"])
        if existing is None:
            seen[net["ssid"]] = net
            continue
        # The kept entry is the strongest one, but the SSID counts as in use
        # when ANY of its access points is the connected one; otherwise the
        # flag is lost whenever the connected AP is not the strongest.
        winner = net if net["signal"] > existing["signal"] else existing
        winner["in_use"] = existing["in_use"] or net["in_use"]
        seen[net["ssid"]] = winner
    return sorted(seen.values(), key=lambda x: x["signal"], reverse=True)
@router.post("/wifi/connect")
async def wifi_connect(req: WifiConnectRequest, _user: str = Depends(get_current_user)):
    """Connect to a WiFi network."""
    _require_nmcli()
    ssid = req.ssid
    if len(ssid) > 64 or not ssid:
        raise HTTPException(422, "Invalid SSID")
    # Optional argument groups; each is empty when the option is not requested.
    password_args = ["password", req.password] if req.password else []
    hidden_args = ["hidden", "yes"] if req.hidden else []
    await _nmcli("device", "wifi", "connect", ssid, *password_args, *hidden_args, timeout=30)
    return {"success": True, "ssid": ssid}
@router.post("/wifi/disconnect")
async def wifi_disconnect(_user: str = Depends(get_current_user)):
    """Disconnect the WiFi device."""
    _require_nmcli()
    listing = await _nmcli("-t", "-f", "DEVICE,TYPE", "device", "status")
    # Disconnect the first device whose type is "wifi"; 404 when there is none.
    for entry in _parse_terse(listing, ["device", "type"]):
        if entry["type"] == "wifi":
            await _nmcli("device", "disconnect", entry["device"])
            return {"success": True}
    raise HTTPException(404, "No WiFi device found")
@router.delete("/connection/{name}")
async def delete_connection(name: str, _user: str = Depends(get_current_user)):
    """Delete a saved connection."""
    # Any failure surfaces as the HTTPException raised by _nmcli.
    nmcli_args = ("connection", "delete", name)
    await _nmcli(*nmcli_args)
    return {"success": True}
@router.post("/connection/{name}/apply")
async def apply_connection(name: str, _user: str = Depends(get_current_user)):
    """Reapply/reconnect a connection."""
    # Runs `nmcli connection up <name>` with a 15 second limit; any failure
    # surfaces as the HTTPException raised by _nmcli.
    nmcli_args = ("connection", "up", name)
    await _nmcli(*nmcli_args, timeout=15)
    return {"success": True}
# ===========================================================================
# Fallback: direct `ip` + /etc/network/interfaces (no NetworkManager)
# ===========================================================================
# Allowed interface names.  The first character may not be "-" or ".", so a
# name can neither be read as a command-line option by the tools it is passed
# to nor be the "." / ".." directory entries.  \Z (unlike $) does not accept
# a trailing newline.
_IFACE_NAME_RE = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_.-]*\Z")


def _validate_iface_name(name: str):
    """Prevent injection via interface name.

    The name is later used as a command argument and as a file name, so it
    must consist only of letters, digits, ``_``, ``.`` and ``-``, must not
    start with ``-`` or ``.``, and must be at most 32 characters long.

    Raises:
        HTTPException: 400 when the name is not acceptable.
    """
    if len(name) > 32 or not _IFACE_NAME_RE.fullmatch(name):
        raise HTTPException(400, "Invalid interface name")
async def _run_cmd(*args: str, timeout: float = 15) -> tuple[int, str, str]:
    """Run a command with safe env, return (rc, stdout, stderr).

    This helper reports failures through its return value instead of raising.

    Args:
        *args: Program and arguments (exec-style, no shell).
        timeout: Seconds to wait before the process is killed.

    Returns:
        ``(returncode, stdout, stderr)``.  A timeout is reported as
        ``(1, "", "Command timed out")``; a program that cannot be started is
        reported as ``(127, "", <error text>)``.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=_safe_env(),
        )
    except OSError as exc:
        # Missing or non-executable binary: keep the (rc, out, err) contract
        # so callers that inspect rc see a normal failure.
        return 127, "", str(exc)
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        # Reap the child so it does not linger after the timeout.
        proc.kill()
        await proc.wait()
        return 1, "", "Command timed out"
    # Lenient decoding: tool output is not guaranteed to be valid UTF-8.
    return proc.returncode, stdout.decode(errors="replace"), stderr.decode(errors="replace")
async def _parse_ip_addr() -> list[dict]:
    """Parse `ip -j addr show` into a list of interface dicts.

    Falls back to the plain-text parser when the JSON invocation fails,
    prints nothing, or prints something that is not valid JSON.  The loopback
    interface ("lo") is skipped.

    Returns:
        One dict per interface with the keys ``name``, ``state``, ``up``,
        ``mac``, ``ipv4`` (address/prefix), ``ipv6`` and ``gateway``.
        ``gateway`` is ``""`` unless a default route uses the interface, the
        same shape the plain-text parser produces.
    """
    import json

    rc, stdout, _ = await _run_cmd(_IP_BIN, "-j", "addr", "show")
    if rc != 0 or not stdout.strip():
        # Fallback: parse non-json output
        return await _parse_ip_addr_text()
    try:
        ifaces = json.loads(stdout)
    except json.JSONDecodeError:
        return await _parse_ip_addr_text()

    results = []
    for iface in ifaces:
        name = iface.get("ifname", "")
        if name == "lo":
            continue
        state = iface.get("operstate", "UNKNOWN").upper()
        flags = iface.get("flags", [])
        ipv4 = ""
        ipv6 = ""
        # Keep the first IPv4 address and the first non-link-local IPv6 address.
        for ai in iface.get("addr_info", []):
            family = ai.get("family")
            local = ai.get("local", "")
            if family == "inet" and not ipv4:
                if local:  # skip entries without an address instead of failing
                    ipv4 = f"{local}/{ai.get('prefixlen', 24)}"
            elif family == "inet6" and not ipv6:
                if not local.startswith("fe80"):
                    ipv6 = local
        results.append({
            "name": name,
            "state": state,
            "up": state == "UP" or "UP" in flags,
            "mac": iface.get("address", ""),
            "ipv4": ipv4,
            "ipv6": ipv6,
            "gateway": "",
        })

    # Get default gateway
    rc2, gw_out, _ = await _run_cmd(_IP_BIN, "-j", "route", "show", "default")
    if rc2 == 0 and gw_out.strip():
        try:
            routes = json.loads(gw_out)
            for route in routes:
                gw = route.get("gateway", "")
                dev = route.get("dev", "")
                for iface in results:
                    if iface["name"] == dev:
                        iface["gateway"] = gw
                        break
        except json.JSONDecodeError:
            pass  # gateway information is best-effort
    return results
async def _parse_ip_addr_text() -> list[dict]:
    """Fallback parser for `ip addr show` plain text output.

    Returns:
        One dict per non-loopback interface with the keys ``name``, ``state``,
        ``up``, ``mac``, ``ipv4``, ``ipv6`` and ``gateway``; an empty list
        when the command fails.
    """
    rc, stdout, _ = await _run_cmd(_IP_BIN, "addr", "show")
    if rc != 0:
        return []
    results = []
    current = None
    for line in stdout.splitlines():
        # New interface line: "2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> ..."
        m = re.match(r"^\d+:\s+(\S+?):\s+<([^>]*)>", line)
        if m:
            if current:
                results.append(current)
            # Stacked devices are printed as "name@peer" (e.g. "eth0.10@eth0");
            # only the part before "@" is the interface name, and it is the
            # form that default-route "dev" entries are matched against below.
            name = m.group(1).rstrip(":").split("@", 1)[0]
            # Compare whole flags so that e.g. LOWER_UP alone is not read as UP.
            is_up = "UP" in m.group(2).split(",")
            if name == "lo":
                current = None
                continue
            current = {
                "name": name,
                "state": "UP" if is_up else "DOWN",
                "up": is_up,
                "mac": "",
                "ipv4": "",
                "ipv6": "",
                "gateway": "",
            }
            continue
        if not current:
            continue
        # MAC address: " link/ether aa:bb:cc:dd:ee:ff ..."
        m = re.match(r"\s+link/ether\s+(\S+)", line)
        if m:
            current["mac"] = m.group(1)
        # IPv4: " inet 192.168.1.100/24 ..." (first address only)
        m = re.match(r"\s+inet\s+(\S+)", line)
        if m and not current["ipv4"]:
            current["ipv4"] = m.group(1)
        # IPv6: " inet6 2001:db8::1/64 ..." (first non-link-local address only)
        m = re.match(r"\s+inet6\s+(\S+)", line)
        if m and not current["ipv6"]:
            addr = m.group(1).split("/")[0]
            if not addr.startswith("fe80"):
                current["ipv6"] = addr
    if current:
        results.append(current)
    # Get default gateway
    rc2, gw_out, _ = await _run_cmd(_IP_BIN, "route", "show", "default")
    if rc2 == 0:
        for line in gw_out.splitlines():
            m = re.match(r"default via (\S+) dev (\S+)", line)
            if m:
                for iface in results:
                    if iface["name"] == m.group(2):
                        iface["gateway"] = m.group(1)
                        break
    return results
def _parse_interfaces_file() -> dict[str, dict]:
    """Parse /etc/network/interfaces into per-iface config dicts.

    The main file is read first, followed by every file matched by its
    ``source <glob>`` lines and every regular file in the interfaces.d
    directory.  Unreadable included files are skipped.

    Returns:
        Mapping of interface name to a dict with ``auto`` (bool), ``method``
        (the word after ``inet`` on the ``iface`` line; "dhcp" when only an
        ``auto``/``allow-hotplug`` line was seen) and any of ``address``,
        ``netmask``, ``gateway``, ``dns-nameservers``, ``network``,
        ``broadcast`` found in the interface's block.
    """
    from glob import glob as gglob

    configs = {}
    content = ""
    if _INTERFACES_FILE.exists():
        content = _INTERFACES_FILE.read_text()
    # Check for source lines.  Only the main file's lines are scanned (the
    # list is materialised before anything is appended).
    for line in content.splitlines():
        m = re.match(r"^source\s+(.+)", line)
        if m:
            pattern = m.group(1).strip()
            for path in gglob(pattern):
                try:
                    content += "\n" + Path(path).read_text()
                except OSError:
                    pass
    # Also read interfaces.d/*.cfg and interfaces.d/* files
    if _INTERFACES_DIR.is_dir():
        for f in sorted(_INTERFACES_DIR.iterdir()):
            if f.is_file():
                try:
                    content += "\n" + f.read_text()
                except OSError:
                    pass

    current_iface = None
    for line in content.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        # "auto eth0" or "allow-hotplug eth0"
        m = re.match(r"^(auto|allow-hotplug)\s+(\S+)", stripped)
        if m:
            iface_name = m.group(2)
            if iface_name not in configs:
                configs[iface_name] = {"auto": False, "method": "dhcp"}
            if m.group(1) == "auto":
                configs[iface_name]["auto"] = True
            continue
        # "iface eth0 inet static"
        m = re.match(r"^iface\s+(\S+)\s+inet\s+(\S+)", stripped)
        if m:
            iface_name = m.group(1)
            method = m.group(2)  # static, dhcp, manual, loopback
            if iface_name not in configs:
                configs[iface_name] = {"auto": False}
            configs[iface_name]["method"] = method
            current_iface = iface_name
            continue
        # Indented config lines under an iface block
        if current_iface and line[0] in (" ", "\t"):
            # Matched against the stripped line, so the pattern must not
            # demand leading whitespace (it could never match otherwise and
            # no option would ever be recorded).
            m = re.match(r"(address|netmask|gateway|dns-nameservers|network|broadcast)\s+(.+)", stripped)
            if m:
                configs[current_iface][m.group(1)] = m.group(2).strip()
        else:
            # Any other unindented line ends the current iface block.
            current_iface = None
    return configs
def _write_interface_config(iface: str, method: str, address: str = "",
                            gateway: str = "", dns: list[str] = None):
    """Write the config block for one interface to /etc/network/interfaces.d/{iface}.

    A per-interface file is used so the main interfaces file is never touched.
    ``method == "dhcp"`` produces a DHCP stanza; any other method produces a
    static stanza built from ``address`` (CIDR), ``gateway`` and ``dns``.
    For a static configuration with DNS servers, resolv.conf is rewritten too.

    Raises:
        HTTPException: 422 when ``address`` cannot be parsed for a static
            configuration.
    """
    _INTERFACES_DIR.mkdir(parents=True, exist_ok=True)
    stanza = ["# Managed by Atlus", f"auto {iface}"]
    if method == "dhcp":
        stanza.append(f"iface {iface} inet dhcp")
    else:
        # Split the CIDR value into a dotted address and a dotted netmask.
        try:
            parsed = ipaddress.ip_interface(address)
        except ValueError:
            raise HTTPException(422, f"Invalid address: {address}")
        stanza += [
            f"iface {iface} inet static",
            f" address {parsed.ip}",
            f" netmask {parsed.netmask}",
        ]
        if gateway:
            stanza.append(f" gateway {gateway}")
        if dns:
            stanza.append(f" dns-nameservers {' '.join(dns)}")
    (_INTERFACES_DIR / iface).write_text("\n".join(stanza) + "\n")
    log.info("Wrote interface config for %s: method=%s", iface, method)
    # Static DNS servers are mirrored into resolv.conf as well.
    if method == "static" and dns:
        _write_resolv_conf(dns)
def _write_resolv_conf(dns: list[str]):
    """Replace /etc/resolv.conf with one ``nameserver`` line per entry of *dns*.

    A failure to write the file is logged as a warning and otherwise ignored.
    """
    body = "# Managed by Atlus\n" + "".join(f"nameserver {server}\n" for server in dns)
    try:
        _RESOLV_CONF.write_text(body)
    except OSError as err:
        log.warning("Failed to write resolv.conf: %s", err)
# --- Fallback endpoints ---
@router.get("/interfaces")
async def list_interfaces(_user: str = Depends(get_current_user)):
    """List network interfaces using ip commands (no NetworkManager required)."""
    ifaces = await _parse_ip_addr()
    saved_configs = _parse_interfaces_file()
    # (response key, key in the saved config, default) for the merged fields.
    merged_fields = (
        ("config_method", "method", "unknown"),
        ("config_address", "address", ""),
        ("config_netmask", "netmask", ""),
        ("config_gateway", "gateway", ""),
        ("config_dns", "dns-nameservers", ""),
    )
    for iface in ifaces:
        saved = saved_configs.get(iface["name"], {})
        for response_key, config_key, default in merged_fields:
            iface[response_key] = saved.get(config_key, default)
    # System-wide DNS servers come from resolv.conf; read errors yield none.
    dns_servers = []
    if _RESOLV_CONF.exists():
        try:
            resolv_text = _RESOLV_CONF.read_text()
        except OSError:
            resolv_text = ""
        matches = (re.match(r"^nameserver\s+(\S+)", row.strip())
                   for row in resolv_text.splitlines())
        dns_servers = [found.group(1) for found in matches if found]
    return {
        "interfaces": ifaces,
        "dns": dns_servers,
        "has_nmcli": _HAS_NMCLI,
    }
# Request body for the ifupdown-based configuration endpoint
# (PUT /interfaces/{name}/config).
class InterfaceConfig(BaseModel):
    # "dhcp" or "static"; static additionally requires address and gateway.
    method: Literal["dhcp", "static"]
    # Static address in CIDR notation.
    address: Optional[str] = None  # CIDR e.g. "192.168.1.100/24"
    # Default gateway for the static configuration.
    gateway: Optional[str] = None
    # Optional list of DNS server addresses.
    dns: Optional[list[str]] = None
@router.put("/interfaces/{name}/config")
async def configure_interface(
    name: str,
    cfg: InterfaceConfig,
    _user: str = Depends(get_current_user),
):
    """Configure an interface in /etc/network/interfaces.d/ and apply."""
    _validate_iface_name(name)
    if cfg.method == "static":
        if not cfg.address:
            raise HTTPException(422, "IP address is required for static configuration")
        if not cfg.gateway:
            raise HTTPException(422, "Gateway is required for static configuration")
        # Reuse the NetworkManager validator; "static" corresponds to "manual".
        _validate_ipv4_config(IPv4Config(method="manual", address=cfg.address,
                                         gateway=cfg.gateway, dns=cfg.dns))
    # Persist the configuration first.
    _write_interface_config(
        iface=name,
        method=cfg.method,
        address=cfg.address or "",
        gateway=cfg.gateway or "",
        dns=cfg.dns,
    )
    # Apply: prefer ifdown/ifup; use plain ip commands when they are missing
    # or when ifup reports a failure.
    ifdown, ifup = _find_bin("ifdown"), _find_bin("ifup")
    use_ip_fallback = True
    if ifdown and ifup:
        await _run_cmd(ifdown, name, timeout=10)
        rc, _, stderr = await _run_cmd(ifup, name, timeout=15)
        use_ip_fallback = rc != 0
        if use_ip_fallback:
            log.warning("ifup %s failed: %s", name, stderr.strip())
    if use_ip_fallback:
        await _apply_with_ip(name, cfg)
    return {"success": True, "method": cfg.method}
async def _apply_with_ip(name: str, cfg: InterfaceConfig):
    """Apply *cfg* to interface *name* with direct commands.

    Existing addresses are flushed first.  A static configuration is applied
    with ``ip`` (address, link up, default route, resolv.conf); otherwise a
    DHCP client is started if one is installed.  Command results are not
    checked.
    """
    await _run_cmd(_IP_BIN, "addr", "flush", "dev", name)

    if cfg.method == "static" and cfg.address:
        await _run_cmd(_IP_BIN, "addr", "add", cfg.address, "dev", name)
        await _run_cmd(_IP_BIN, "link", "set", name, "up")
        if cfg.gateway:
            # Replace any existing default route through this interface.
            await _run_cmd(_IP_BIN, "route", "del", "default", "dev", name)
            await _run_cmd(_IP_BIN, "route", "add", "default", "via", cfg.gateway, "dev", name)
        if cfg.dns:
            _write_resolv_conf(cfg.dns)
        return

    # DHCP: try dhclient, then udhcpc.
    dhclient = _find_bin("dhclient")
    udhcpc = _find_bin("udhcpc")
    if dhclient:
        # Release any previous lease before requesting a new one.
        await _run_cmd(dhclient, "-r", name, timeout=5)
        await _run_cmd(dhclient, name, timeout=15)
        return
    if udhcpc:
        await _run_cmd(udhcpc, "-i", name, "-n", timeout=15)
        return
    await _run_cmd(_IP_BIN, "link", "set", name, "up")
    log.warning("No DHCP client found — interface %s brought up without DHCP", name)
@router.post("/interfaces/{name}/up")
async def interface_up(name: str, _user: str = Depends(get_current_user)):
    """Bring an interface up."""
    _validate_iface_name(name)
    outcome = await _run_cmd(_IP_BIN, "link", "set", name, "up")
    exit_code, err_text = outcome[0], outcome[2]
    if exit_code == 0:
        return {"success": True}
    # A non-zero exit status is reported as 500 together with the tool's stderr.
    raise HTTPException(500, f"Failed to bring up {name}: {err_text.strip()}")
@router.post("/interfaces/{name}/down")
async def interface_down(name: str, _user: str = Depends(get_current_user)):
    """Bring an interface down."""
    _validate_iface_name(name)
    outcome = await _run_cmd(_IP_BIN, "link", "set", name, "down")
    exit_code, err_text = outcome[0], outcome[2]
    if exit_code == 0:
        return {"success": True}
    # A non-zero exit status is reported as 500 together with the tool's stderr.
    raise HTTPException(500, f"Failed to bring down {name}: {err_text.strip()}")