- Capture stderr from GUI apps and show meaningful exit reasons (signal names,
error messages) instead of generic "Application exited"
- Add /api/display/discover-apps endpoint that scans .desktop files for
installed GUI apps, replacing manual-only app configuration
- Settings > Applications now shows discoverable apps with one-click Add,
with manual form available as fallback
- Fix WiFi: start wpa_supplicant before bringing interface up, handle rfkill
blocks, add retry logic and error messages to scan results
- Fix WiFi scan frontend bug: response is {networks:[...]} not a bare array
- Add iw and wpasupplicant to install.sh dependencies
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1045 lines
35 KiB
Python
1045 lines
35 KiB
Python
"""Network configuration via NetworkManager (nmcli) or direct ip/interfaces."""
|
|
|
|
import asyncio
|
|
import ipaddress
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Literal, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from backend.auth import get_current_user
|
|
|
|
router = APIRouter(prefix="/api/network", tags=["network"])
|
|
log = logging.getLogger("atlus.network")
|
|
|
|
|
|
def _find_bin(name: str) -> Optional[str]:
|
|
"""Find a binary, even with systemd's minimal PATH."""
|
|
found = shutil.which(name)
|
|
if found:
|
|
return found
|
|
for p in ("/usr/bin", "/usr/local/bin", "/usr/sbin", "/sbin", "/bin"):
|
|
full = os.path.join(p, name)
|
|
if os.path.isfile(full) and os.access(full, os.X_OK):
|
|
return full
|
|
return None
|
|
|
|
|
|
_HAS_NMCLI = bool(_find_bin("nmcli"))
|
|
_IP_BIN = _find_bin("ip") or "ip"
|
|
|
|
# Debian/Armbian network config paths
|
|
_INTERFACES_FILE = Path("/etc/network/interfaces")
|
|
_INTERFACES_DIR = Path("/etc/network/interfaces.d")
|
|
_RESOLV_CONF = Path("/etc/resolv.conf")
|
|
_WPA_SUPPLICANT_DIR = Path("/etc/wpa_supplicant")
|
|
_IW_BIN = _find_bin("iw")
|
|
_WPA_CLI_BIN = _find_bin("wpa_cli")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class IPv4Config(BaseModel):
|
|
method: Literal["auto", "manual"]
|
|
address: Optional[str] = None # CIDR notation e.g. "192.168.1.100/24"
|
|
gateway: Optional[str] = None
|
|
dns: Optional[list[str]] = None
|
|
|
|
|
|
class WifiConnectRequest(BaseModel):
|
|
ssid: str
|
|
password: Optional[str] = None
|
|
hidden: bool = False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _require_nmcli():
|
|
"""Raise 503 if NetworkManager is not available."""
|
|
if not _HAS_NMCLI:
|
|
raise HTTPException(503, "NetworkManager (nmcli) not available on this system")
|
|
|
|
|
|
def _safe_env():
|
|
"""Build environment with full PATH for nmcli commands."""
|
|
env = {**os.environ, "LC_ALL": "C"}
|
|
path = env.get("PATH", "")
|
|
for p in ("/usr/bin", "/usr/sbin", "/bin", "/sbin"):
|
|
if p not in path:
|
|
path = p + ":" + path
|
|
env["PATH"] = path
|
|
return env
|
|
|
|
|
|
async def _nmcli(*args: str, timeout: float = 30) -> str:
|
|
"""Run nmcli with C locale, return stdout. Raise HTTPException on failure."""
|
|
_require_nmcli()
|
|
cmd = ["nmcli"] + list(args)
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
env=_safe_env(),
|
|
)
|
|
try:
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
|
except asyncio.TimeoutError:
|
|
proc.kill()
|
|
await proc.wait()
|
|
raise HTTPException(504, "nmcli operation timed out")
|
|
|
|
if proc.returncode != 0:
|
|
err_msg = stderr.decode().strip()
|
|
log.warning("nmcli failed (%d): %s", proc.returncode, err_msg)
|
|
raise HTTPException(500, f"nmcli error: {err_msg}")
|
|
return stdout.decode()
|
|
|
|
|
|
def _parse_terse(output: str, fields: list[str]) -> list[dict]:
|
|
"""Parse nmcli terse (-t) colon-separated output into list of dicts.
|
|
|
|
Handles escaped colons (\\:) in values.
|
|
"""
|
|
results = []
|
|
for line in output.strip().splitlines():
|
|
if not line:
|
|
continue
|
|
# Split on unescaped colons
|
|
parts = re.split(r"(?<!\\):", line)
|
|
parts = [p.replace("\\:", ":") for p in parts]
|
|
if len(parts) >= len(fields):
|
|
results.append(dict(zip(fields, parts[:len(fields)])))
|
|
return results
|
|
|
|
|
|
def _parse_keyvalue(output: str) -> dict[str, str]:
|
|
"""Parse nmcli key:value output (from `connection show <name>`)."""
|
|
props = {}
|
|
for line in output.strip().splitlines():
|
|
if ":" not in line:
|
|
continue
|
|
key, _, value = line.partition(":")
|
|
props[key.strip()] = value.strip()
|
|
return props
|
|
|
|
|
|
def _validate_ipv4_config(cfg: IPv4Config):
|
|
"""Validate IPv4 configuration fields."""
|
|
if cfg.method == "manual":
|
|
if not cfg.address:
|
|
raise HTTPException(422, "IP address is required for static configuration")
|
|
if not cfg.gateway:
|
|
raise HTTPException(422, "Gateway is required for static configuration")
|
|
# Validate address (must be CIDR)
|
|
try:
|
|
ipaddress.ip_interface(cfg.address)
|
|
except ValueError:
|
|
raise HTTPException(422, f"Invalid IP address/prefix: {cfg.address}. Use CIDR notation (e.g. 192.168.1.100/24)")
|
|
# Validate gateway
|
|
try:
|
|
ipaddress.ip_address(cfg.gateway)
|
|
except ValueError:
|
|
raise HTTPException(422, f"Invalid gateway address: {cfg.gateway}")
|
|
# Validate DNS
|
|
if cfg.dns:
|
|
for dns in cfg.dns:
|
|
try:
|
|
ipaddress.ip_address(dns)
|
|
except ValueError:
|
|
raise HTTPException(422, f"Invalid DNS address: {dns}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/status")
|
|
async def network_status(_user: str = Depends(get_current_user)):
|
|
"""Get device list and active connections."""
|
|
_require_nmcli()
|
|
|
|
# Device status
|
|
dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE,CONNECTION", "device", "status")
|
|
devices = _parse_terse(dev_out, ["device", "type", "state", "connection"])
|
|
|
|
# Filter out loopback and uninteresting devices
|
|
devices = [d for d in devices if d["type"] in ("ethernet", "wifi", "bridge")]
|
|
|
|
# Get IP addresses for connected devices
|
|
for dev in devices:
|
|
dev["ipv4"] = None
|
|
if dev["state"] == "connected" and dev["connection"]:
|
|
try:
|
|
conn_out = await _nmcli("-t", "-f", "IP4.ADDRESS,IP4.GATEWAY,IP4.DNS",
|
|
"connection", "show", dev["connection"])
|
|
props = _parse_keyvalue(conn_out)
|
|
# nmcli may output IP4.ADDRESS[1], IP4.DNS[1], etc.
|
|
dev["ipv4"] = {
|
|
"address": props.get("IP4.ADDRESS[1]", ""),
|
|
"gateway": props.get("IP4.GATEWAY", ""),
|
|
"dns": [],
|
|
}
|
|
# Collect all DNS entries
|
|
for k, v in props.items():
|
|
if k.startswith("IP4.DNS") and v:
|
|
dev["ipv4"]["dns"].append(v)
|
|
except HTTPException:
|
|
pass # Connection details unavailable
|
|
|
|
return {"devices": devices}
|
|
|
|
|
|
@router.get("/connections")
|
|
async def list_connections(_user: str = Depends(get_current_user)):
|
|
"""List all saved NetworkManager connections."""
|
|
out = await _nmcli("-t", "-f", "NAME,UUID,TYPE,DEVICE", "connection", "show")
|
|
connections = _parse_terse(out, ["name", "uuid", "type", "device"])
|
|
return connections
|
|
|
|
|
|
@router.get("/connection/{name}")
|
|
async def connection_detail(name: str, _user: str = Depends(get_current_user)):
|
|
"""Get detailed info for a single connection."""
|
|
try:
|
|
out = await _nmcli("connection", "show", name)
|
|
except HTTPException:
|
|
raise HTTPException(404, f"Connection not found: {name}")
|
|
|
|
props = _parse_keyvalue(out)
|
|
|
|
# Extract IPv4 config (what's configured)
|
|
ipv4_method = props.get("ipv4.method", "auto")
|
|
ipv4_addresses = props.get("ipv4.addresses", "")
|
|
ipv4_gateway = props.get("ipv4.gateway", "")
|
|
ipv4_dns = props.get("ipv4.dns", "")
|
|
|
|
# Extract active IPv4 (what's currently in use)
|
|
active_address = props.get("IP4.ADDRESS[1]", "")
|
|
active_gateway = props.get("IP4.GATEWAY", "")
|
|
active_dns = []
|
|
for k, v in props.items():
|
|
if k.startswith("IP4.DNS") and v:
|
|
active_dns.append(v)
|
|
|
|
return {
|
|
"name": props.get("connection.id", name),
|
|
"uuid": props.get("connection.uuid", ""),
|
|
"type": props.get("connection.type", ""),
|
|
"device": props.get("GENERAL.DEVICES", props.get("connection.interface-name", "")),
|
|
"state": props.get("GENERAL.STATE", ""),
|
|
"ipv4": {
|
|
"method": ipv4_method,
|
|
"addresses": ipv4_addresses,
|
|
"gateway": ipv4_gateway,
|
|
"dns": ipv4_dns,
|
|
},
|
|
"active_ipv4": {
|
|
"address": active_address,
|
|
"gateway": active_gateway,
|
|
"dns": active_dns,
|
|
},
|
|
}
|
|
|
|
|
|
@router.put("/connection/{name}/ipv4")
|
|
async def configure_ipv4(
|
|
name: str,
|
|
cfg: IPv4Config,
|
|
_user: str = Depends(get_current_user),
|
|
):
|
|
"""Set IPv4 configuration (DHCP or static) and apply."""
|
|
_validate_ipv4_config(cfg)
|
|
|
|
if cfg.method == "auto":
|
|
# Switch to DHCP — clear static settings
|
|
await _nmcli("connection", "modify", name,
|
|
"ipv4.method", "auto",
|
|
"ipv4.addresses", "",
|
|
"ipv4.gateway", "",
|
|
"ipv4.dns", "")
|
|
else:
|
|
# Static configuration
|
|
dns_str = " ".join(cfg.dns) if cfg.dns else ""
|
|
modify_args = [
|
|
"connection", "modify", name,
|
|
"ipv4.method", "manual",
|
|
"ipv4.addresses", cfg.address,
|
|
"ipv4.gateway", cfg.gateway,
|
|
]
|
|
if dns_str:
|
|
modify_args.extend(["ipv4.dns", dns_str])
|
|
else:
|
|
modify_args.extend(["ipv4.dns", ""])
|
|
await _nmcli(*modify_args)
|
|
|
|
# Apply the changes
|
|
try:
|
|
await _nmcli("connection", "up", name, timeout=15)
|
|
except HTTPException as e:
|
|
# Connection up may fail if we just changed the IP of our own interface
|
|
# Return success anyway — the config was saved
|
|
log.warning("connection up after config change: %s", e.detail)
|
|
|
|
return {"success": True, "method": cfg.method}
|
|
|
|
|
|
@router.get("/wifi/list")
|
|
async def wifi_scan(_user: str = Depends(get_current_user)):
|
|
"""Scan for available WiFi networks."""
|
|
_require_nmcli()
|
|
|
|
# Find the wifi device
|
|
dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE,STATE", "device", "status")
|
|
devices = _parse_terse(dev_out, ["device", "type", "state"])
|
|
wifi_dev = next((d for d in devices if d["type"] == "wifi"), None)
|
|
if not wifi_dev:
|
|
raise HTTPException(404, "No WiFi device found")
|
|
|
|
# Scan
|
|
try:
|
|
out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE",
|
|
"device", "wifi", "list", "--rescan", "yes",
|
|
timeout=20)
|
|
except HTTPException:
|
|
# Rescan may fail; try without rescan
|
|
out = await _nmcli("-t", "-f", "SSID,BSSID,SIGNAL,SECURITY,IN-USE",
|
|
"device", "wifi", "list")
|
|
|
|
networks = _parse_terse(out, ["ssid", "bssid", "signal", "security", "in_use"])
|
|
|
|
# Filter empty SSIDs, deduplicate by SSID (keep strongest signal)
|
|
seen = {}
|
|
for net in networks:
|
|
if not net["ssid"]:
|
|
continue
|
|
net["signal"] = int(net["signal"]) if net["signal"].isdigit() else 0
|
|
net["in_use"] = net["in_use"] == "*"
|
|
existing = seen.get(net["ssid"])
|
|
if not existing or net["signal"] > existing["signal"]:
|
|
seen[net["ssid"]] = net
|
|
|
|
result = sorted(seen.values(), key=lambda x: x["signal"], reverse=True)
|
|
return result
|
|
|
|
|
|
@router.post("/wifi/connect")
|
|
async def wifi_connect(req: WifiConnectRequest, _user: str = Depends(get_current_user)):
|
|
"""Connect to a WiFi network."""
|
|
_require_nmcli()
|
|
|
|
if not req.ssid or len(req.ssid) > 64:
|
|
raise HTTPException(422, "Invalid SSID")
|
|
|
|
args = ["device", "wifi", "connect", req.ssid]
|
|
if req.password:
|
|
args.extend(["password", req.password])
|
|
if req.hidden:
|
|
args.extend(["hidden", "yes"])
|
|
|
|
await _nmcli(*args, timeout=30)
|
|
return {"success": True, "ssid": req.ssid}
|
|
|
|
|
|
@router.post("/wifi/disconnect")
|
|
async def wifi_disconnect(_user: str = Depends(get_current_user)):
|
|
"""Disconnect the WiFi device."""
|
|
_require_nmcli()
|
|
|
|
# Find the wifi device name
|
|
dev_out = await _nmcli("-t", "-f", "DEVICE,TYPE", "device", "status")
|
|
devices = _parse_terse(dev_out, ["device", "type"])
|
|
wifi_dev = next((d for d in devices if d["type"] == "wifi"), None)
|
|
if not wifi_dev:
|
|
raise HTTPException(404, "No WiFi device found")
|
|
|
|
await _nmcli("device", "disconnect", wifi_dev["device"])
|
|
return {"success": True}
|
|
|
|
|
|
@router.delete("/connection/{name}")
|
|
async def delete_connection(name: str, _user: str = Depends(get_current_user)):
|
|
"""Delete a saved connection."""
|
|
await _nmcli("connection", "delete", name)
|
|
return {"success": True}
|
|
|
|
|
|
@router.post("/connection/{name}/apply")
|
|
async def apply_connection(name: str, _user: str = Depends(get_current_user)):
|
|
"""Reapply/reconnect a connection."""
|
|
await _nmcli("connection", "up", name, timeout=15)
|
|
return {"success": True}
|
|
|
|
|
|
# ===========================================================================
|
|
# Fallback: direct `ip` + /etc/network/interfaces (no NetworkManager)
|
|
# ===========================================================================
|
|
|
|
_IFACE_NAME_RE = re.compile(r"^[a-zA-Z0-9_.-]+$")
|
|
|
|
|
|
def _validate_iface_name(name: str):
|
|
"""Prevent injection via interface name."""
|
|
if not _IFACE_NAME_RE.match(name) or len(name) > 32:
|
|
raise HTTPException(400, "Invalid interface name")
|
|
|
|
|
|
async def _run_cmd(*args: str, timeout: float = 15) -> tuple[int, str, str]:
|
|
"""Run a command with safe env, return (rc, stdout, stderr)."""
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*args,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
env=_safe_env(),
|
|
)
|
|
try:
|
|
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
|
except asyncio.TimeoutError:
|
|
proc.kill()
|
|
await proc.wait()
|
|
return 1, "", "Command timed out"
|
|
return proc.returncode, stdout.decode(), stderr.decode()
|
|
|
|
|
|
async def _parse_ip_addr() -> list[dict]:
|
|
"""Parse `ip -j addr show` into a list of interface dicts."""
|
|
rc, stdout, _ = await _run_cmd(_IP_BIN, "-j", "addr", "show")
|
|
if rc != 0 or not stdout.strip():
|
|
# Fallback: parse non-json output
|
|
return await _parse_ip_addr_text()
|
|
|
|
import json
|
|
try:
|
|
ifaces = json.loads(stdout)
|
|
except json.JSONDecodeError:
|
|
return await _parse_ip_addr_text()
|
|
|
|
results = []
|
|
for iface in ifaces:
|
|
name = iface.get("ifname", "")
|
|
if name == "lo":
|
|
continue
|
|
state = iface.get("operstate", "UNKNOWN").upper()
|
|
flags = iface.get("flags", [])
|
|
mac = iface.get("address", "")
|
|
|
|
ipv4 = ""
|
|
ipv6 = ""
|
|
gateway = ""
|
|
for ai in iface.get("addr_info", []):
|
|
if ai.get("family") == "inet" and not ipv4:
|
|
ipv4 = f"{ai['local']}/{ai.get('prefixlen', 24)}"
|
|
elif ai.get("family") == "inet6" and not ipv6:
|
|
if not ai.get("local", "").startswith("fe80"):
|
|
ipv6 = ai.get("local", "")
|
|
|
|
# Detect wireless interfaces
|
|
is_wireless = Path(f"/sys/class/net/{name}/wireless").exists()
|
|
|
|
results.append({
|
|
"name": name,
|
|
"state": state,
|
|
"up": state == "UP" or "UP" in flags,
|
|
"mac": mac,
|
|
"ipv4": ipv4,
|
|
"ipv6": ipv6,
|
|
"wireless": is_wireless,
|
|
})
|
|
|
|
# Get default gateway
|
|
rc2, gw_out, _ = await _run_cmd(_IP_BIN, "-j", "route", "show", "default")
|
|
if rc2 == 0 and gw_out.strip():
|
|
try:
|
|
routes = json.loads(gw_out)
|
|
for route in routes:
|
|
gw = route.get("gateway", "")
|
|
dev = route.get("dev", "")
|
|
for iface in results:
|
|
if iface["name"] == dev:
|
|
iface["gateway"] = gw
|
|
break
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return results
|
|
|
|
|
|
async def _parse_ip_addr_text() -> list[dict]:
|
|
"""Fallback parser for `ip addr show` plain text output."""
|
|
rc, stdout, _ = await _run_cmd(_IP_BIN, "addr", "show")
|
|
if rc != 0:
|
|
return []
|
|
|
|
results = []
|
|
current = None
|
|
for line in stdout.splitlines():
|
|
# New interface line: "2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> ..."
|
|
m = re.match(r"^\d+:\s+(\S+?):\s+<([^>]*)>", line)
|
|
if m:
|
|
if current:
|
|
results.append(current)
|
|
name = m.group(1).rstrip(":")
|
|
flags = m.group(2)
|
|
if name == "lo":
|
|
current = None
|
|
continue
|
|
current = {
|
|
"name": name,
|
|
"state": "UP" if "UP" in flags else "DOWN",
|
|
"up": "UP" in flags,
|
|
"mac": "",
|
|
"ipv4": "",
|
|
"ipv6": "",
|
|
"gateway": "",
|
|
"wireless": Path(f"/sys/class/net/{name}/wireless").exists(),
|
|
}
|
|
continue
|
|
|
|
if not current:
|
|
continue
|
|
|
|
# MAC address: " link/ether aa:bb:cc:dd:ee:ff ..."
|
|
m = re.match(r"\s+link/ether\s+(\S+)", line)
|
|
if m:
|
|
current["mac"] = m.group(1)
|
|
|
|
# IPv4: " inet 192.168.1.100/24 ..."
|
|
m = re.match(r"\s+inet\s+(\S+)", line)
|
|
if m and not current["ipv4"]:
|
|
current["ipv4"] = m.group(1)
|
|
|
|
# IPv6: " inet6 2001:db8::1/64 ..."
|
|
m = re.match(r"\s+inet6\s+(\S+)", line)
|
|
if m and not current["ipv6"]:
|
|
addr = m.group(1).split("/")[0]
|
|
if not addr.startswith("fe80"):
|
|
current["ipv6"] = addr
|
|
|
|
if current:
|
|
results.append(current)
|
|
|
|
# Get default gateway
|
|
rc2, gw_out, _ = await _run_cmd(_IP_BIN, "route", "show", "default")
|
|
if rc2 == 0:
|
|
for line in gw_out.splitlines():
|
|
m = re.match(r"default via (\S+) dev (\S+)", line)
|
|
if m:
|
|
for iface in results:
|
|
if iface["name"] == m.group(2):
|
|
iface["gateway"] = m.group(1)
|
|
break
|
|
|
|
return results
|
|
|
|
|
|
def _parse_interfaces_file() -> dict[str, dict]:
|
|
"""Parse /etc/network/interfaces into per-iface config dicts."""
|
|
configs = {}
|
|
# Also include files from interfaces.d
|
|
content = ""
|
|
if _INTERFACES_FILE.exists():
|
|
content = _INTERFACES_FILE.read_text()
|
|
# Check for source lines
|
|
for line in content.splitlines():
|
|
m = re.match(r"^source\s+(.+)", line)
|
|
if m:
|
|
pattern = m.group(1).strip()
|
|
from glob import glob as gglob
|
|
for path in gglob(pattern):
|
|
try:
|
|
content += "\n" + Path(path).read_text()
|
|
except OSError:
|
|
pass
|
|
# Also read interfaces.d/*.cfg and interfaces.d/* files
|
|
if _INTERFACES_DIR.is_dir():
|
|
for f in sorted(_INTERFACES_DIR.iterdir()):
|
|
if f.is_file():
|
|
try:
|
|
content += "\n" + f.read_text()
|
|
except OSError:
|
|
pass
|
|
|
|
current_iface = None
|
|
for line in content.splitlines():
|
|
stripped = line.strip()
|
|
if not stripped or stripped.startswith("#"):
|
|
continue
|
|
# "auto eth0" or "allow-hotplug eth0"
|
|
m = re.match(r"^(auto|allow-hotplug)\s+(\S+)", stripped)
|
|
if m:
|
|
iface_name = m.group(2)
|
|
if iface_name not in configs:
|
|
configs[iface_name] = {"auto": False, "method": "dhcp"}
|
|
if m.group(1) == "auto":
|
|
configs[iface_name]["auto"] = True
|
|
continue
|
|
# "iface eth0 inet static"
|
|
m = re.match(r"^iface\s+(\S+)\s+inet\s+(\S+)", stripped)
|
|
if m:
|
|
iface_name = m.group(1)
|
|
method = m.group(2) # static, dhcp, manual, loopback
|
|
if iface_name not in configs:
|
|
configs[iface_name] = {"auto": False}
|
|
configs[iface_name]["method"] = method
|
|
current_iface = iface_name
|
|
continue
|
|
# Indented config lines under an iface block
|
|
if current_iface and line[0] in (" ", "\t"):
|
|
m = re.match(r"\s+(address|netmask|gateway|dns-nameservers|network|broadcast)\s+(.+)", stripped)
|
|
if m:
|
|
key = m.group(1)
|
|
val = m.group(2).strip()
|
|
configs[current_iface][key] = val
|
|
else:
|
|
current_iface = None
|
|
|
|
return configs
|
|
|
|
|
|
def _write_interface_config(iface: str, method: str, address: str = "",
|
|
gateway: str = "", dns: list[str] = None,
|
|
ssid: str = "", wifi_password: str = ""):
|
|
"""Write or update an interface block in /etc/network/interfaces.d/{iface}.
|
|
|
|
Uses interfaces.d to avoid clobbering the main file.
|
|
For wireless interfaces, also writes wpa_supplicant config.
|
|
"""
|
|
_INTERFACES_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
is_wireless = Path(f"/sys/class/net/{iface}/wireless").exists()
|
|
|
|
if method == "dhcp":
|
|
lines = [
|
|
f"# Managed by Atlus",
|
|
f"auto {iface}",
|
|
f"iface {iface} inet dhcp",
|
|
]
|
|
else:
|
|
# Parse CIDR to address + netmask
|
|
try:
|
|
net_if = ipaddress.ip_interface(address)
|
|
addr = str(net_if.ip)
|
|
mask = str(net_if.netmask)
|
|
except ValueError:
|
|
raise HTTPException(422, f"Invalid address: {address}")
|
|
|
|
lines = [
|
|
f"# Managed by Atlus",
|
|
f"auto {iface}",
|
|
f"iface {iface} inet static",
|
|
f" address {addr}",
|
|
f" netmask {mask}",
|
|
]
|
|
if gateway:
|
|
lines.append(f" gateway {gateway}")
|
|
if dns:
|
|
lines.append(f" dns-nameservers {' '.join(dns)}")
|
|
|
|
# Add WiFi config if wireless and SSID provided
|
|
if is_wireless and ssid:
|
|
wpa_conf = _WPA_SUPPLICANT_DIR / f"{iface}.conf"
|
|
_write_wpa_supplicant(iface, ssid, wifi_password)
|
|
lines.append(f" wpa-ssid {ssid}")
|
|
lines.append(f" wpa-conf {wpa_conf}")
|
|
|
|
content = "\n".join(lines) + "\n"
|
|
config_file = _INTERFACES_DIR / iface
|
|
config_file.write_text(content)
|
|
log.info("Wrote interface config for %s: method=%s ssid=%s", iface, method, ssid or "(none)")
|
|
|
|
# Also update resolv.conf for static DNS
|
|
if method == "static" and dns:
|
|
_write_resolv_conf(dns)
|
|
|
|
|
|
def _write_resolv_conf(dns: list[str]):
|
|
"""Write DNS servers to /etc/resolv.conf."""
|
|
try:
|
|
lines = ["# Managed by Atlus\n"]
|
|
for d in dns:
|
|
lines.append(f"nameserver {d}\n")
|
|
_RESOLV_CONF.write_text("".join(lines))
|
|
except OSError as e:
|
|
log.warning("Failed to write resolv.conf: %s", e)
|
|
|
|
|
|
# --- Fallback endpoints ---
|
|
|
|
@router.get("/interfaces")
|
|
async def list_interfaces(_user: str = Depends(get_current_user)):
|
|
"""List network interfaces using ip commands (no NetworkManager required)."""
|
|
ifaces = await _parse_ip_addr()
|
|
|
|
# Merge with /etc/network/interfaces config
|
|
configs = _parse_interfaces_file()
|
|
for iface in ifaces:
|
|
cfg = configs.get(iface["name"], {})
|
|
iface["config_method"] = cfg.get("method", "unknown")
|
|
iface["config_address"] = cfg.get("address", "")
|
|
iface["config_netmask"] = cfg.get("netmask", "")
|
|
iface["config_gateway"] = cfg.get("gateway", "")
|
|
iface["config_dns"] = cfg.get("dns-nameservers", "")
|
|
iface["config_ssid"] = cfg.get("wpa-ssid", "")
|
|
|
|
# Get DNS from resolv.conf
|
|
dns_servers = []
|
|
if _RESOLV_CONF.exists():
|
|
try:
|
|
for line in _RESOLV_CONF.read_text().splitlines():
|
|
m = re.match(r"^nameserver\s+(\S+)", line.strip())
|
|
if m:
|
|
dns_servers.append(m.group(1))
|
|
except OSError:
|
|
pass
|
|
|
|
return {
|
|
"interfaces": ifaces,
|
|
"dns": dns_servers,
|
|
"has_nmcli": _HAS_NMCLI,
|
|
}
|
|
|
|
|
|
class InterfaceConfig(BaseModel):
|
|
method: Literal["dhcp", "static"]
|
|
address: Optional[str] = None # CIDR e.g. "192.168.1.100/24"
|
|
gateway: Optional[str] = None
|
|
dns: Optional[list[str]] = None
|
|
ssid: Optional[str] = None # WiFi SSID
|
|
wifi_password: Optional[str] = None # WiFi password (WPA/WPA2)
|
|
|
|
|
|
@router.put("/interfaces/{name}/config")
|
|
async def configure_interface(
|
|
name: str,
|
|
cfg: InterfaceConfig,
|
|
_user: str = Depends(get_current_user),
|
|
):
|
|
"""Configure an interface in /etc/network/interfaces.d/ and apply."""
|
|
_validate_iface_name(name)
|
|
|
|
if cfg.method == "static":
|
|
if not cfg.address:
|
|
raise HTTPException(422, "IP address is required for static configuration")
|
|
if not cfg.gateway:
|
|
raise HTTPException(422, "Gateway is required for static configuration")
|
|
_validate_ipv4_config(IPv4Config(method="manual", address=cfg.address,
|
|
gateway=cfg.gateway, dns=cfg.dns))
|
|
|
|
# Write config file
|
|
_write_interface_config(
|
|
iface=name,
|
|
method=cfg.method,
|
|
address=cfg.address or "",
|
|
gateway=cfg.gateway or "",
|
|
dns=cfg.dns,
|
|
ssid=cfg.ssid or "",
|
|
wifi_password=cfg.wifi_password or "",
|
|
)
|
|
|
|
# For wireless interfaces, handle wpa_supplicant
|
|
is_wireless = Path(f"/sys/class/net/{name}/wireless").exists()
|
|
if is_wireless and cfg.ssid:
|
|
await _start_wpa_supplicant(name)
|
|
|
|
# Apply: bring interface down then up
|
|
# Use ifdown/ifup if available, else ip commands
|
|
ifdown = _find_bin("ifdown")
|
|
ifup = _find_bin("ifup")
|
|
|
|
if ifdown and ifup:
|
|
await _run_cmd(ifdown, name, timeout=10)
|
|
rc, _, stderr = await _run_cmd(ifup, name, timeout=15)
|
|
if rc != 0:
|
|
log.warning("ifup %s failed: %s", name, stderr.strip())
|
|
# Try ip commands as fallback
|
|
await _apply_with_ip(name, cfg)
|
|
else:
|
|
await _apply_with_ip(name, cfg)
|
|
|
|
return {"success": True, "method": cfg.method}
|
|
|
|
|
|
async def _apply_with_ip(name: str, cfg: InterfaceConfig):
|
|
"""Apply network config using ip commands directly."""
|
|
# Flush existing addresses
|
|
await _run_cmd(_IP_BIN, "addr", "flush", "dev", name)
|
|
|
|
if cfg.method == "static" and cfg.address:
|
|
# Add address
|
|
await _run_cmd(_IP_BIN, "addr", "add", cfg.address, "dev", name)
|
|
# Bring up
|
|
await _run_cmd(_IP_BIN, "link", "set", name, "up")
|
|
# Set default route
|
|
if cfg.gateway:
|
|
await _run_cmd(_IP_BIN, "route", "del", "default", "dev", name)
|
|
await _run_cmd(_IP_BIN, "route", "add", "default", "via", cfg.gateway, "dev", name)
|
|
# Set DNS
|
|
if cfg.dns:
|
|
_write_resolv_conf(cfg.dns)
|
|
else:
|
|
# DHCP — try dhclient or udhcpc
|
|
dhclient = _find_bin("dhclient")
|
|
udhcpc = _find_bin("udhcpc")
|
|
if dhclient:
|
|
await _run_cmd(dhclient, "-r", name, timeout=5)
|
|
await _run_cmd(dhclient, name, timeout=15)
|
|
elif udhcpc:
|
|
await _run_cmd(udhcpc, "-i", name, "-n", timeout=15)
|
|
else:
|
|
await _run_cmd(_IP_BIN, "link", "set", name, "up")
|
|
log.warning("No DHCP client found — interface %s brought up without DHCP", name)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WiFi support (fallback — no NetworkManager)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _write_wpa_supplicant(iface: str, ssid: str, password: str = ""):
|
|
"""Write a wpa_supplicant config file for a wireless interface."""
|
|
_WPA_SUPPLICANT_DIR.mkdir(parents=True, exist_ok=True)
|
|
conf_file = _WPA_SUPPLICANT_DIR / f"{iface}.conf"
|
|
|
|
if password:
|
|
# WPA/WPA2 PSK
|
|
content = f"""# Managed by Atlus
|
|
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
|
update_config=1
|
|
country=US
|
|
|
|
network={{
|
|
ssid="{ssid}"
|
|
psk="{password}"
|
|
key_mgmt=WPA-PSK
|
|
}}
|
|
"""
|
|
else:
|
|
# Open network
|
|
content = f"""# Managed by Atlus
|
|
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
|
|
update_config=1
|
|
country=US
|
|
|
|
network={{
|
|
ssid="{ssid}"
|
|
key_mgmt=NONE
|
|
}}
|
|
"""
|
|
conf_file.write_text(content)
|
|
conf_file.chmod(0o600)
|
|
log.info("Wrote wpa_supplicant config for %s: ssid=%s", iface, ssid)
|
|
|
|
|
|
async def _start_wpa_supplicant(iface: str):
|
|
"""Start wpa_supplicant for a wireless interface.
|
|
|
|
Kills any existing instance for this interface first.
|
|
"""
|
|
wpa_bin = _find_bin("wpa_supplicant")
|
|
if not wpa_bin:
|
|
log.warning("wpa_supplicant not found — WiFi auth will not work")
|
|
return
|
|
|
|
conf_file = _WPA_SUPPLICANT_DIR / f"{iface}.conf"
|
|
if not conf_file.exists():
|
|
log.warning("No wpa_supplicant config for %s", iface)
|
|
return
|
|
|
|
# Kill any existing wpa_supplicant for this interface
|
|
killall = _find_bin("killall")
|
|
pkill = _find_bin("pkill")
|
|
if pkill:
|
|
await _run_cmd(pkill, "-f", f"wpa_supplicant.*{iface}", timeout=5)
|
|
elif killall:
|
|
await _run_cmd(killall, "wpa_supplicant", timeout=5)
|
|
|
|
await asyncio.sleep(0.5)
|
|
|
|
# Bring interface up first
|
|
await _run_cmd(_IP_BIN, "link", "set", iface, "up")
|
|
|
|
# Start wpa_supplicant in background
|
|
rc, _, stderr = await _run_cmd(
|
|
wpa_bin, "-B", # background/daemonize
|
|
"-i", iface,
|
|
"-c", str(conf_file),
|
|
"-D", "nl80211,wext", # try nl80211 first, fallback to wext
|
|
timeout=10,
|
|
)
|
|
if rc != 0:
|
|
log.warning("wpa_supplicant failed for %s: %s", iface, stderr.strip())
|
|
else:
|
|
log.info("wpa_supplicant started for %s", iface)
|
|
# Wait a moment for association
|
|
await asyncio.sleep(2)
|
|
|
|
|
|
@router.get("/wifi/scan/{name}")
|
|
async def scan_wifi(name: str, _user: str = Depends(get_current_user)):
|
|
"""Scan for WiFi networks using iw or iwlist (no NetworkManager required)."""
|
|
_validate_iface_name(name)
|
|
|
|
# Check if interface is wireless
|
|
if not Path(f"/sys/class/net/{name}/wireless").exists():
|
|
raise HTTPException(400, f"{name} is not a wireless interface")
|
|
|
|
# Check if interface is rfkill blocked
|
|
rfkill = _find_bin("rfkill")
|
|
if rfkill:
|
|
rc, stdout, _ = await _run_cmd(rfkill, "list", "wifi", timeout=5)
|
|
if rc == 0 and "Soft blocked: yes" in stdout:
|
|
# Try to unblock
|
|
await _run_cmd(rfkill, "unblock", "wifi", timeout=5)
|
|
await asyncio.sleep(0.5)
|
|
|
|
networks = []
|
|
last_error = ""
|
|
|
|
# Bring interface up first — scan requires it
|
|
await _run_cmd(_IP_BIN, "link", "set", name, "up")
|
|
await asyncio.sleep(0.5)
|
|
|
|
# Try iw first
|
|
if _IW_BIN:
|
|
rc, stdout, stderr = await _run_cmd(_IW_BIN, "dev", name, "scan", timeout=20)
|
|
if rc == 0:
|
|
networks = _parse_iw_scan(stdout)
|
|
else:
|
|
last_error = stderr.strip()
|
|
log.warning("iw scan failed for %s: %s", name, last_error)
|
|
# Retry once after a brief wait (scan can fail if device is busy)
|
|
await asyncio.sleep(2)
|
|
rc, stdout, stderr = await _run_cmd(_IW_BIN, "dev", name, "scan", timeout=20)
|
|
if rc == 0:
|
|
networks = _parse_iw_scan(stdout)
|
|
else:
|
|
last_error = stderr.strip()
|
|
log.warning("iw scan retry failed for %s: %s", name, last_error)
|
|
|
|
# Fallback: iwlist
|
|
if not networks:
|
|
iwlist = _find_bin("iwlist")
|
|
if iwlist:
|
|
rc, stdout, stderr = await _run_cmd(iwlist, name, "scan", timeout=15)
|
|
if rc == 0:
|
|
networks = _parse_iwlist_scan(stdout)
|
|
else:
|
|
last_error = stderr.strip() or last_error
|
|
log.warning("iwlist scan failed for %s: %s", name, last_error)
|
|
|
|
if not networks and not _IW_BIN and not _find_bin("iwlist"):
|
|
return {"networks": [], "error": "Neither iw nor iwlist found. Install: apt-get install iw wireless-tools"}
|
|
|
|
if not networks and last_error:
|
|
# Return empty with error context
|
|
return {"networks": [], "error": last_error}
|
|
|
|
return {"networks": networks}
|
|
|
|
|
|
def _parse_iw_scan(output: str) -> list[dict]:
|
|
"""Parse `iw dev wlan0 scan` output into network list."""
|
|
networks = []
|
|
current = None
|
|
for line in output.splitlines():
|
|
if line.startswith("BSS "):
|
|
if current and current.get("ssid"):
|
|
networks.append(current)
|
|
bssid = re.match(r"BSS\s+(\S+)", line)
|
|
current = {"bssid": bssid.group(1) if bssid else "", "ssid": "", "signal": 0, "security": "Open"}
|
|
elif current is None:
|
|
continue
|
|
elif "SSID:" in line:
|
|
m = re.search(r"SSID:\s*(.*)", line)
|
|
if m:
|
|
current["ssid"] = m.group(1).strip()
|
|
elif "signal:" in line:
|
|
m = re.search(r"signal:\s*(-?\d+)", line)
|
|
if m:
|
|
current["signal"] = int(m.group(1))
|
|
elif "WPA" in line or "RSN" in line:
|
|
current["security"] = "WPA2" if "RSN" in line else "WPA"
|
|
elif "WEP" in line:
|
|
current["security"] = "WEP"
|
|
if current and current.get("ssid"):
|
|
networks.append(current)
|
|
|
|
# Sort by signal strength (strongest first)
|
|
networks.sort(key=lambda n: n.get("signal", -100), reverse=True)
|
|
# Deduplicate by SSID
|
|
seen = set()
|
|
unique = []
|
|
for n in networks:
|
|
if n["ssid"] not in seen:
|
|
seen.add(n["ssid"])
|
|
unique.append(n)
|
|
return unique[:30]
|
|
|
|
|
|
def _parse_iwlist_scan(output: str) -> list[dict]:
|
|
"""Parse `iwlist wlan0 scan` output into network list."""
|
|
networks = []
|
|
current = None
|
|
for line in output.splitlines():
|
|
line = line.strip()
|
|
if "Cell " in line and "Address:" in line:
|
|
if current and current.get("ssid"):
|
|
networks.append(current)
|
|
m = re.search(r"Address:\s*(\S+)", line)
|
|
current = {"bssid": m.group(1) if m else "", "ssid": "", "signal": 0, "security": "Open"}
|
|
elif current is None:
|
|
continue
|
|
elif line.startswith("ESSID:"):
|
|
m = re.search(r'ESSID:"(.+)"', line)
|
|
if m:
|
|
current["ssid"] = m.group(1)
|
|
elif "Signal level=" in line:
|
|
m = re.search(r"Signal level[=:]?\s*(-?\d+)", line)
|
|
if m:
|
|
current["signal"] = int(m.group(1))
|
|
elif "Encryption key:on" in line:
|
|
if current.get("security") == "Open":
|
|
current["security"] = "WEP"
|
|
elif "WPA2" in line or "IEEE 802.11i" in line:
|
|
current["security"] = "WPA2"
|
|
elif "WPA" in line:
|
|
if current.get("security") != "WPA2":
|
|
current["security"] = "WPA"
|
|
if current and current.get("ssid"):
|
|
networks.append(current)
|
|
|
|
networks.sort(key=lambda n: n.get("signal", -100), reverse=True)
|
|
seen = set()
|
|
unique = []
|
|
for n in networks:
|
|
if n["ssid"] not in seen:
|
|
seen.add(n["ssid"])
|
|
unique.append(n)
|
|
return unique[:30]
|
|
|
|
|
|
@router.post("/interfaces/{name}/up")
|
|
async def interface_up(name: str, _user: str = Depends(get_current_user)):
|
|
"""Bring an interface up."""
|
|
_validate_iface_name(name)
|
|
rc, _, stderr = await _run_cmd(_IP_BIN, "link", "set", name, "up")
|
|
if rc != 0:
|
|
raise HTTPException(500, f"Failed to bring up {name}: {stderr.strip()}")
|
|
return {"success": True}
|
|
|
|
|
|
@router.post("/interfaces/{name}/down")
|
|
async def interface_down(name: str, _user: str = Depends(get_current_user)):
|
|
"""Bring an interface down."""
|
|
_validate_iface_name(name)
|
|
rc, _, stderr = await _run_cmd(_IP_BIN, "link", "set", name, "down")
|
|
if rc != 0:
|
|
raise HTTPException(500, f"Failed to bring down {name}: {stderr.strip()}")
|
|
return {"success": True}
|