atlus/backend/netwatch.py
roberts 14f5ce7cf1 Add ethernet link watchdog and prevent disabling last active interface
- Ethernet watchdog: background task polls /sys/class/net every 5s, detects
  cable plug-in on disabled interfaces and auto-enables with DHCP
- interface_down endpoint now refuses to disable the only active interface
  (returns 409), preventing accidental lockout
- Frontend shows the error message instead of silently failing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 00:12:00 -05:00

204 lines
6.6 KiB
Python

"""Ethernet link watchdog — auto-enable interfaces when cable is plugged in.
Monitors /sys/class/net/*/carrier for ethernet interfaces.
When a cable is detected on a DOWN interface, brings it up with DHCP.
This prevents lockout when the only active interface is disabled.
"""
import asyncio
import logging
import os
import shutil
from pathlib import Path
# Module-level logger used by every function in this file.
log = logging.getLogger("atlus.netwatch")
# How long ethernet_watchdog() sleeps between scans of /sys/class/net.
_POLL_INTERVAL = 5 # seconds between checks
# Directories probed by _find_bin() when shutil.which() finds nothing, and
# added to PATH by _safe_env() for spawned commands.
_SEARCH_PATHS = ("/usr/bin", "/usr/local/bin", "/bin", "/usr/sbin", "/sbin")
def _find_bin(name: str) -> str | None:
found = shutil.which(name)
if found:
return found
for d in _SEARCH_PATHS:
p = os.path.join(d, name)
if os.path.isfile(p) and os.access(p, os.X_OK):
return p
return None
def _safe_env() -> dict:
    """Build the environment used for spawned system commands.

    Returns a copy of ``os.environ`` with ``LC_ALL`` forced to ``C`` and with
    every directory from ``_SEARCH_PATHS`` guaranteed to be a ``PATH`` entry.
    Missing directories are prepended, so they take precedence over the
    inherited entries.
    """
    env = {**os.environ, "LC_ALL": "C"}

    # Work on whole PATH entries rather than the raw string: a substring test
    # would treat "/bin" as present whenever "/usr/bin" is, and never add it.
    # Empty entries are dropped because they implicitly mean "current
    # directory", which has no place in a PATH built for system tools.
    entries = [entry for entry in env.get("PATH", "").split(":") if entry]
    for directory in _SEARCH_PATHS:
        if directory not in entries:
            entries.insert(0, directory)

    env["PATH"] = ":".join(entries)
    return env
def _is_ethernet(iface: str) -> bool:
    """Return True when *iface* looks like a physical, wired interface.

    An interface qualifies only if its ``/sys/class/net`` entry exists, it is
    not the loopback ``lo``, it has no ``wireless`` node, and it has a
    ``device`` node.  The ``device`` node is used as a heuristic for hardware
    backing; virtual interfaces are expected to lack it.
    """
    base = Path(f"/sys/class/net/{iface}")
    if iface == "lo" or not base.exists():
        return False

    is_wireless = (base / "wireless").exists()
    # Evaluated only for non-wireless interfaces, like the original checks.
    return not is_wireless and (base / "device").exists()
def _has_carrier(iface: str) -> bool:
    """Return True when the interface's sysfs ``carrier`` attribute reads ``1``.

    Any failure to read the attribute (missing interface, I/O error, ...) is
    reported as "no carrier" by returning False.

    NOTE(review): Linux appears to reject reads of ``carrier`` with EINVAL
    while an interface is administratively down, in which case this returns
    False regardless of the cable state -- confirm on the target kernel.
    """
    carrier_file = Path(f"/sys/class/net/{iface}/carrier")
    try:
        state = carrier_file.read_text().strip()
    except Exception:
        return False
    return state == "1"
def _is_up(iface: str) -> bool:
    """Return True when the IFF_UP bit is set in the interface's sysfs flags.

    The ``flags`` attribute is parsed as a hexadecimal number.  Any failure to
    read or parse it yields False.
    """
    iff_up = 0x1
    try:
        raw = Path(f"/sys/class/net/{iface}/flags").read_text()
        flag_bits = int(raw.strip(), 16)
    except Exception:
        return False
    return bool(flag_bits & iff_up)
def _has_ip(iface: str) -> bool:
    """Return True when ``ip -4 addr show <iface>`` lists an IPv4 address.

    This call is synchronous: it runs ``ip`` and waits up to five seconds for
    it to finish.  A missing ``ip`` binary, a timeout, or undecodable output
    are all reported as "no address" by returning False.
    """
    # Kept local: the module's top-level imports do not include subprocess.
    import subprocess

    try:
        result = subprocess.run(
            ["ip", "-4", "addr", "show", iface],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: binary missing or not executable; SubprocessError covers
        # TimeoutExpired; ValueError covers output that fails to decode.
        return False
    return "inet " in result.stdout
async def _run(cmd: list[str], timeout: float = 15) -> tuple[int, str, str]:
    """Execute *cmd* without a shell and collect its output.

    The child runs with the environment produced by ``_safe_env()``.

    Returns ``(returncode, stdout, stderr)`` with both streams decoded to
    text.  If the command has not finished within *timeout* seconds it is
    killed and reaped, and ``(1, "", "timeout")`` is returned instead.
    """
    pipe = asyncio.subprocess.PIPE
    child = await asyncio.create_subprocess_exec(
        *cmd, stdout=pipe, stderr=pipe, env=_safe_env()
    )

    try:
        out_bytes, err_bytes = await asyncio.wait_for(
            child.communicate(), timeout=timeout
        )
    except asyncio.TimeoutError:
        child.kill()
        # Reap the killed child so it does not linger as a zombie.
        await child.wait()
        return 1, "", "timeout"

    return child.returncode, out_bytes.decode(), err_bytes.decode()
async def _enable_with_dhcp(iface: str):
    """Bring *iface* up and try to obtain an address for it.

    Steps, stopping at the first one that succeeds:

    1. ``ip link set <iface> up``, followed by a one-second pause.
    2. ``ifup <iface>`` when ``ifup`` is installed (respects
       /etc/network/interfaces.d/).
    3. ``dhclient`` (after releasing any stale lease with ``-r``), then
       ``udhcpc``, for whichever of the two is installed.

    Command failures are logged as warnings rather than raised.  Returns None
    in every case.
    """
    ip_bin = _find_bin("ip") or "ip"
    log.info("Ethernet watchdog: bringing up %s with DHCP", iface)

    rc, _, stderr = await _run([ip_bin, "link", "set", iface, "up"])
    if rc != 0:
        # Keep going: ifup may still be able to raise the link on its own.
        log.warning("ip link set %s up failed: %s", iface, stderr.strip())
    await asyncio.sleep(1)

    # Try ifup first (respects /etc/network/interfaces.d/)
    ifup = _find_bin("ifup")
    if ifup:
        rc, _, stderr = await _run([ifup, iface], timeout=20)
        if rc == 0:
            log.info("Ethernet watchdog: %s enabled via ifup", iface)
            return
        log.warning("ifup failed for %s: %s", iface, stderr.strip())

    # Fallback: talk to a DHCP client directly.
    dhclient = _find_bin("dhclient")
    udhcpc = _find_bin("udhcpc")
    if not (dhclient or udhcpc):
        log.warning("No DHCP client found — %s brought up without address", iface)
        return

    if dhclient:
        # Release any stale lease first
        await _run([dhclient, "-r", iface], timeout=5)
        rc, _, stderr = await _run([dhclient, iface], timeout=20)
        if rc == 0:
            log.info("Ethernet watchdog: %s enabled via dhclient", iface)
            return
        log.warning("dhclient failed for %s: %s", iface, stderr.strip())

    # Reached when dhclient is absent or has just failed.
    if udhcpc:
        rc, _, stderr = await _run([udhcpc, "-i", iface, "-n", "-q"], timeout=20)
        if rc == 0:
            log.info("Ethernet watchdog: %s enabled via udhcpc", iface)
            return
        log.warning("udhcpc failed for %s: %s", iface, stderr.strip())
async def _check_iface(iface: str, auto_enabled: set[str]) -> None:
    """Inspect one interface and enable it when a cable is present but it is unusable.

    *auto_enabled* holds the interfaces already handled for the current
    plug-in; it is updated in place.

    NOTE(review): carrier detection on an administratively-down interface
    depends on the kernel exposing ``carrier`` in that state -- confirm on the
    target system that the "cable detected on disabled" path can trigger.
    """
    if not _is_ethernet(iface):
        return

    if not _has_carrier(iface):
        # Cable removed: forget the interface so the next plug-in is handled.
        auto_enabled.discard(iface)
        return

    is_up = _is_up(iface)
    # _has_ip() runs a subprocess synchronously; evaluate it at most once per
    # poll and in a worker thread so the event loop is not blocked.
    if is_up and await asyncio.to_thread(_has_ip, iface):
        # All good: clear the flag in case it was auto-enabled earlier.
        auto_enabled.discard(iface)
        return

    if iface in auto_enabled:
        # Already attempted for this plug-in; do not retry on every poll.
        return

    if is_up:
        log.info("Ethernet watchdog: %s up but no IP, requesting DHCP", iface)
    else:
        log.info("Ethernet watchdog: cable detected on disabled %s", iface)
    await _enable_with_dhcp(iface)
    auto_enabled.add(iface)


async def ethernet_watchdog():
    """Background task: monitor ethernet interfaces for cable plug-in events.

    Every ``_POLL_INTERVAL`` seconds each entry of /sys/class/net is checked.
    When a physical ethernet interface has carrier but is down, or is up
    without an IPv4 address, it is brought up and a DHCP lease is requested
    (once per plug-in).

    This prevents lockout when the user accidentally disables their only
    interface.  The coroutine loops until it is cancelled; errors are logged
    and never propagate.
    """
    log.info("Ethernet link watchdog started")
    # Interfaces already auto-enabled, to avoid repeated attempts.
    auto_enabled: set[str] = set()
    net_dir = Path("/sys/class/net")

    while True:
        try:
            ifaces = [entry.name for entry in net_dir.iterdir()] if net_dir.exists() else []
        except Exception:
            log.exception("Ethernet watchdog error")
            ifaces = []

        for iface in ifaces:
            # Isolate failures so one misbehaving interface cannot prevent the
            # remaining interfaces from being checked in this poll.
            try:
                await _check_iface(iface, auto_enabled)
            except Exception:
                log.exception("Ethernet watchdog error on %s", iface)

        await asyncio.sleep(_POLL_INTERVAL)