systemd runs services with a minimal PATH that may not include /usr/bin or /usr/sbin. Add _safe_env() helpers that ensure standard paths are present, and expand apt-cache discovery to check common locations directly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
356 lines
12 KiB
Python
356 lines
12 KiB
Python
"""Network configuration via NetworkManager (nmcli)."""
|
|
|
|
import asyncio
|
|
import ipaddress
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
from typing import Literal, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from backend.auth import get_current_user
|
|
|
|
router = APIRouter(prefix="/api/network", tags=["network"])
|
|
log = logging.getLogger("atlus.network")
|
|
|
|
_HAS_NMCLI = bool(shutil.which("nmcli"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class IPv4Config(BaseModel):
|
|
method: Literal["auto", "manual"]
|
|
address: Optional[str] = None # CIDR notation e.g. "192.168.1.100/24"
|
|
gateway: Optional[str] = None
|
|
dns: Optional[list[str]] = None
|
|
|
|
|
|
class WifiConnectRequest(BaseModel):
|
|
ssid: str
|
|
password: Optional[str] = None
|
|
hidden: bool = False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _require_nmcli():
|
|
"""Raise 503 if NetworkManager is not available."""
|
|
if not _HAS_NMCLI:
|
|
raise HTTPException(503, "NetworkManager (nmcli) not available on this system")
|
|
|
|
|
|
def _safe_env():
|
|
"""Build environment with full PATH for nmcli commands."""
|
|
env = {**os.environ, "LC_ALL": "C"}
|
|
path = env.get("PATH", "")
|
|
for p in ("/usr/bin", "/usr/sbin", "/bin", "/sbin"):
|
|
if p not in path:
|
|
path = p + ":" + path
|
|
env["PATH"] = path
|
|
return env
|
|
|
|
|
|
async def _nmcli(*args: str, timeout: float = 30) -> str:
|
|
"""Run nmcli with C locale, return stdout. Raise HTTPException on failure."""
|
|
_require_nmcli()
|
|
cmd = ["nmcli"] + list(args)
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
env=_safe_env(),
|
|
)
|
|
try:
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
|
except asyncio.TimeoutError:
|
|
proc.kill()
|
|
await proc.wait()
|
|
raise HTTPException(504, "nmcli operation timed out")
|
|
|
|
if proc.returncode != 0:
|
|
err_msg = stderr.decode().strip()
|
|
log.warning("nmcli failed (%d): %s", proc.returncode, err_msg)
|
|
raise HTTPException(500, f"nmcli error: {err_msg}")
|
|
return stdout.decode()
|
|
|
|
|
|
def _parse_terse(output: str, fields: list[str]) -> list[dict]:
|
|
"""Parse nmcli terse (-t) colon-separated output into list of dicts.
|
|
|
|
Handles escaped colons (\\:) in values.
|
|
"""
|
|
results = []
|
|
for line in output.strip().splitlines():
|
|
if not line:
|
|
continue
|
|
# Split on unescaped colons
|
|
parts = re.split(r"(?<!\\):", line)
|
|
parts = [p.replace("\\:", ":") for p in parts]
|
|
if len(parts) >= len(fields):
|
|
results.append(dict(zip(fields, parts[:len(fields)])))
|
|
return results
|
|
|
|
|
|
def _parse_keyvalue(output: str) -> dict[str, str]:
|
|
"""Parse nmcli key:value output (from `connection show <name>`)."""
|
|
props = {}
|
|
for line in output.strip().splitlines():
|
|
if ":" not in line:
|
|
continue
|
|
key, _, value = line.partition(":")
|
|
props[key.strip()] = value.strip()
|
|
return props
|
|
|
|
|
|
def _validate_ipv4_config(cfg: IPv4Config):
|
|
"""Validate IPv4 configuration fields."""
|
|
if cfg.method == "manual":
|
|
if not cfg.address:
|
|
raise HTTPException(422, "IP address is required for static configuration")
|
|
if not cfg.gateway:
|
|
raise HTTPException(422, "Gateway is required for static configuration")
|
|
# Validate address (must be CIDR)
|
|
try:
|
|
ipaddress.ip_interface(cfg.address)
|
|
except ValueError:
|
|
raise HTTPException(422, f"Invalid IP address/prefix: {cfg.address}. Use CIDR notation (e.g. 192.168.1.100/24)")
|
|
# Validate gateway
|
|
try:
|
|
ipaddress.ip_address(cfg.gateway)
|
|
except ValueError:
|
|
raise HTTPException(422, f"Invalid gateway address: {cfg.gateway}")
|
|
# Validate DNS
|
|
if cfg.dns:
|
|
for dns in cfg.dns:
|
|
try:
|
|
ipaddress.ip_address(dns)
|
|
except ValueError:
|
|
raise HTTPException(422, f"Invalid DNS address: {dns}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/status")
|
|
async def network_status(_user: str = Depends(get_current_user)):
|
|
"""Get device list and active connections."""
|
|
_require_nmcli()
|
|
|
|
# Device status
|
|
dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE,CONNECTION", "device", "status")
|
|
devices = _parse_terse(dev_out, ["device", "type", "state", "connection"])
|
|
|
|
# Filter out loopback and uninteresting devices
|
|
devices = [d for d in devices if d["type"] in ("ethernet", "wifi", "bridge")]
|
|
|
|
# Get IP addresses for connected devices
|
|
for dev in devices:
|
|
dev["ipv4"] = None
|
|
if dev["state"] == "connected" and dev["connection"]:
|
|
try:
|
|
conn_out = await _nmcli("-t", "-f", "IP4.ADDRESS,IP4.GATEWAY,IP4.DNS",
|
|
"connection", "show", dev["connection"])
|
|
props = _parse_keyvalue(conn_out)
|
|
# nmcli may output IP4.ADDRESS[1], IP4.DNS[1], etc.
|
|
dev["ipv4"] = {
|
|
"address": props.get("IP4.ADDRESS[1]", ""),
|
|
"gateway": props.get("IP4.GATEWAY", ""),
|
|
"dns": [],
|
|
}
|
|
# Collect all DNS entries
|
|
for k, v in props.items():
|
|
if k.startswith("IP4.DNS") and v:
|
|
dev["ipv4"]["dns"].append(v)
|
|
except HTTPException:
|
|
pass # Connection details unavailable
|
|
|
|
return {"devices": devices}
|
|
|
|
|
|
@router.get("/connections")
|
|
async def list_connections(_user: str = Depends(get_current_user)):
|
|
"""List all saved NetworkManager connections."""
|
|
out = await _nmcli("-t", "-f", "NAME,UUID,TYPE,DEVICE", "connection", "show")
|
|
connections = _parse_terse(out, ["name", "uuid", "type", "device"])
|
|
return connections
|
|
|
|
|
|
@router.get("/connection/{name}")
|
|
async def connection_detail(name: str, _user: str = Depends(get_current_user)):
|
|
"""Get detailed info for a single connection."""
|
|
try:
|
|
out = await _nmcli("connection", "show", name)
|
|
except HTTPException:
|
|
raise HTTPException(404, f"Connection not found: {name}")
|
|
|
|
props = _parse_keyvalue(out)
|
|
|
|
# Extract IPv4 config (what's configured)
|
|
ipv4_method = props.get("ipv4.method", "auto")
|
|
ipv4_addresses = props.get("ipv4.addresses", "")
|
|
ipv4_gateway = props.get("ipv4.gateway", "")
|
|
ipv4_dns = props.get("ipv4.dns", "")
|
|
|
|
# Extract active IPv4 (what's currently in use)
|
|
active_address = props.get("IP4.ADDRESS[1]", "")
|
|
active_gateway = props.get("IP4.GATEWAY", "")
|
|
active_dns = []
|
|
for k, v in props.items():
|
|
if k.startswith("IP4.DNS") and v:
|
|
active_dns.append(v)
|
|
|
|
return {
|
|
"name": props.get("connection.id", name),
|
|
"uuid": props.get("connection.uuid", ""),
|
|
"type": props.get("connection.type", ""),
|
|
"device": props.get("GENERAL.DEVICES", props.get("connection.interface-name", "")),
|
|
"state": props.get("GENERAL.STATE", ""),
|
|
"ipv4": {
|
|
"method": ipv4_method,
|
|
"addresses": ipv4_addresses,
|
|
"gateway": ipv4_gateway,
|
|
"dns": ipv4_dns,
|
|
},
|
|
"active_ipv4": {
|
|
"address": active_address,
|
|
"gateway": active_gateway,
|
|
"dns": active_dns,
|
|
},
|
|
}
|
|
|
|
|
|
@router.put("/connection/{name}/ipv4")
|
|
async def configure_ipv4(
|
|
name: str,
|
|
cfg: IPv4Config,
|
|
_user: str = Depends(get_current_user),
|
|
):
|
|
"""Set IPv4 configuration (DHCP or static) and apply."""
|
|
_validate_ipv4_config(cfg)
|
|
|
|
if cfg.method == "auto":
|
|
# Switch to DHCP — clear static settings
|
|
await _nmcli("connection", "modify", name,
|
|
"ipv4.method", "auto",
|
|
"ipv4.addresses", "",
|
|
"ipv4.gateway", "",
|
|
"ipv4.dns", "")
|
|
else:
|
|
# Static configuration
|
|
dns_str = " ".join(cfg.dns) if cfg.dns else ""
|
|
modify_args = [
|
|
"connection", "modify", name,
|
|
"ipv4.method", "manual",
|
|
"ipv4.addresses", cfg.address,
|
|
"ipv4.gateway", cfg.gateway,
|
|
]
|
|
if dns_str:
|
|
modify_args.extend(["ipv4.dns", dns_str])
|
|
else:
|
|
modify_args.extend(["ipv4.dns", ""])
|
|
await _nmcli(*modify_args)
|
|
|
|
# Apply the changes
|
|
try:
|
|
await _nmcli("connection", "up", name, timeout=15)
|
|
except HTTPException as e:
|
|
# Connection up may fail if we just changed the IP of our own interface
|
|
# Return success anyway — the config was saved
|
|
log.warning("connection up after config change: %s", e.detail)
|
|
|
|
return {"success": True, "method": cfg.method}
|
|
|
|
|
|
@router.get("/wifi/list")
|
|
async def wifi_scan(_user: str = Depends(get_current_user)):
|
|
"""Scan for available WiFi networks."""
|
|
_require_nmcli()
|
|
|
|
# Find the wifi device
|
|
dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE", "device", "status")
|
|
devices = _parse_terse(dev_out, ["device", "type", "state"])
|
|
wifi_dev = next((d for d in devices if d["type"] == "wifi"), None)
|
|
if not wifi_dev:
|
|
raise HTTPException(404, "No WiFi device found")
|
|
|
|
# Scan
|
|
try:
|
|
out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE",
|
|
"device", "wifi", "list", "--rescan", "yes",
|
|
timeout=20)
|
|
except HTTPException:
|
|
# Rescan may fail; try without rescan
|
|
out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE",
|
|
"device", "wifi", "list")
|
|
|
|
networks = _parse_terse(out, ["ssid", "bssid", "signal", "security", "in_use"])
|
|
|
|
# Filter empty SSIDs, deduplicate by SSID (keep strongest signal)
|
|
seen = {}
|
|
for net in networks:
|
|
if not net["ssid"]:
|
|
continue
|
|
net["signal"] = int(net["signal"]) if net["signal"].isdigit() else 0
|
|
net["in_use"] = net["in_use"] == "*"
|
|
existing = seen.get(net["ssid"])
|
|
if not existing or net["signal"] > existing["signal"]:
|
|
seen[net["ssid"]] = net
|
|
|
|
result = sorted(seen.values(), key=lambda x: x["signal"], reverse=True)
|
|
return result
|
|
|
|
|
|
@router.post("/wifi/connect")
|
|
async def wifi_connect(req: WifiConnectRequest, _user: str = Depends(get_current_user)):
|
|
"""Connect to a WiFi network."""
|
|
_require_nmcli()
|
|
|
|
if not req.ssid or len(req.ssid) > 64:
|
|
raise HTTPException(422, "Invalid SSID")
|
|
|
|
args = ["device", "wifi", "connect", req.ssid]
|
|
if req.password:
|
|
args.extend(["password", req.password])
|
|
if req.hidden:
|
|
args.extend(["hidden", "yes"])
|
|
|
|
await _nmcli(*args, timeout=30)
|
|
return {"success": True, "ssid": req.ssid}
|
|
|
|
|
|
@router.post("/wifi/disconnect")
|
|
async def wifi_disconnect(_user: str = Depends(get_current_user)):
|
|
"""Disconnect the WiFi device."""
|
|
_require_nmcli()
|
|
|
|
# Find the wifi device name
|
|
dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE", "device", "status")
|
|
devices = _parse_terse(dev_out, ["device", "type"])
|
|
wifi_dev = next((d for d in devices if d["type"] == "wifi"), None)
|
|
if not wifi_dev:
|
|
raise HTTPException(404, "No WiFi device found")
|
|
|
|
await _nmcli("device", "disconnect", wifi_dev["device"])
|
|
return {"success": True}
|
|
|
|
|
|
@router.delete("/connection/{name}")
|
|
async def delete_connection(name: str, _user: str = Depends(get_current_user)):
|
|
"""Delete a saved connection."""
|
|
await _nmcli("connection", "delete", name)
|
|
return {"success": True}
|
|
|
|
|
|
@router.post("/connection/{name}/apply")
|
|
async def apply_connection(name: str, _user: str = Depends(get_current_user)):
|
|
"""Reapply/reconnect a connection."""
|
|
await _nmcli("connection", "up", name, timeout=15)
|
|
return {"success": True}
|