- Ethernet watchdog: a background task polls /sys/class/net every 5s, detects cable plug-in on disabled interfaces, and auto-enables them with DHCP.
- The interface_down endpoint now refuses to disable the only active interface (returns 409), preventing accidental lockout.
- The frontend shows the error message instead of silently failing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
204 lines
6.6 KiB
Python
204 lines
6.6 KiB
Python
"""Ethernet link watchdog — auto-enable interfaces when cable is plugged in.

Monitors /sys/class/net/*/carrier for ethernet interfaces.
When a cable is detected on a DOWN interface, brings it up with DHCP.
This prevents lockout when the only active interface is disabled.
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
|
|
# Module-level logger used by every helper in this file.
log = logging.getLogger("atlus.netwatch")

# Delay between two polling passes of ethernet_watchdog().
_POLL_INTERVAL = 5 # seconds between checks

# Directories that _find_bin() probes when a binary is not found on PATH, and
# that _safe_env() adds to PATH for spawned commands.
_SEARCH_PATHS = ("/usr/bin", "/usr/local/bin", "/bin", "/usr/sbin", "/sbin")
|
|
|
|
|
|
def _find_bin(name: str) -> str | None:
|
|
found = shutil.which(name)
|
|
if found:
|
|
return found
|
|
for d in _SEARCH_PATHS:
|
|
p = os.path.join(d, name)
|
|
if os.path.isfile(p) and os.access(p, os.X_OK):
|
|
return p
|
|
return None
|
|
|
|
|
|
def _safe_env() -> dict[str, str]:
    """Build the environment passed to spawned helper commands.

    Starts from a copy of the current process environment, forces
    ``LC_ALL=C`` (presumably so command output is not localized), and makes
    sure every directory in ``_SEARCH_PATHS`` is present on ``PATH``.
    Missing directories are prepended one at a time, so the last entry of
    ``_SEARCH_PATHS`` ends up first.

    Returns:
        A new dict; ``os.environ`` itself is not modified.
    """
    env = {**os.environ, "LC_ALL": "C"}

    # Compare whole PATH entries, not substrings: "/bin" is a substring of
    # "/usr/bin", so a plain ``in`` test on the PATH string would wrongly
    # conclude that "/bin" and "/sbin" are already present.
    # Empty entries are dropped because an empty PATH component means
    # "current directory", which helper lookups should never rely on.
    entries = [e for e in env.get("PATH", "").split(os.pathsep) if e]
    for directory in _SEARCH_PATHS:
        if directory not in entries:
            entries.insert(0, directory)

    env["PATH"] = os.pathsep.join(entries)
    return env
|
|
|
|
|
|
def _is_ethernet(iface: str) -> bool:
    """Return True when *iface* looks like a physical, wired interface.

    The decision is made purely from sysfs entries under
    ``/sys/class/net/<iface>``:

    * the interface must exist and must not be the loopback ``lo``;
    * a ``wireless`` entry marks it as Wi-Fi, so it is rejected;
    * a missing ``device`` entry marks it as virtual (bridges, veth,
      docker, etc.), so it is rejected.

    Anything that has a backing device and is not wireless is treated as
    likely physical ethernet.
    """
    base = Path(f"/sys/class/net/{iface}")
    if iface == "lo" or not base.exists():
        return False
    if (base / "wireless").exists():
        return False
    # A backing device is the only positive signal used here.
    return (base / "device").exists()
|
|
|
|
|
|
def _has_carrier(iface: str) -> bool:
    """Report whether sysfs shows link carrier on *iface* (cable plugged in).

    Reads ``/sys/class/net/<iface>/carrier`` and returns True only when the
    stripped content is ``"1"``. Any failure to read the file is treated as
    "no carrier".

    NOTE(review): Linux typically rejects reads of ``carrier`` with EINVAL
    while the interface is administratively down. If that holds on the
    target kernel, this returns False for a disabled interface even when a
    cable is attached — confirm.
    """
    carrier_file = Path(f"/sys/class/net/{iface}/carrier")
    try:
        value = carrier_file.read_text()
    except Exception:
        return False
    return value.strip() == "1"
|
|
|
|
|
|
def _is_up(iface: str) -> bool:
    """Report whether *iface* has the IFF_UP flag set.

    Parses the hexadecimal value in ``/sys/class/net/<iface>/flags`` and
    tests bit 0x1 (IFF_UP). Note this is the administrative "up" flag, not
    a statement about link or carrier state.

    Returns False when the file cannot be read or parsed.
    """
    iff_up = 0x1
    try:
        raw = Path(f"/sys/class/net/{iface}/flags").read_text()
        flag_bits = int(raw.strip(), 16)
    except Exception:
        return False
    return bool(flag_bits & iff_up)
|
|
|
|
|
|
def _has_ip(iface: str) -> bool:
    """Check whether *iface* has any IPv4 address assigned.

    Runs ``ip -4 addr show <iface>`` and looks for an ``inet `` entry in
    its output. The ``ip`` binary is located with ``_find_bin`` and run
    with ``_safe_env()``, the same way the other helpers in this module
    invoke external commands.

    This call blocks for up to 5 seconds; async callers should run it in a
    worker thread.

    Returns:
        True if an IPv4 address is listed. False if none is listed, or if
        the command cannot be started or times out.
    """
    # Local import: subprocess is not imported at module level in this file.
    import subprocess

    try:
        result = subprocess.run(
            [_find_bin("ip") or "ip", "-4", "addr", "show", iface],
            capture_output=True,
            text=True,
            errors="replace",
            timeout=5,
            env=_safe_env(),
        )
    except (OSError, subprocess.SubprocessError):
        # Missing binary, permission problem, or timeout: treat as "no IP".
        return False
    return "inet " in result.stdout
|
|
|
|
|
|
async def _run(cmd: list[str], timeout: float = 15) -> tuple[int, str, str]:
    """Run *cmd* (no shell) and capture its output.

    Args:
        cmd: Program and arguments, passed to ``create_subprocess_exec``.
        timeout: Seconds to wait for the process to finish.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as text.
        On timeout the process is killed and ``(1, "", "timeout")`` is
        returned. If the program cannot be started at all, the error is
        logged and ``(127, "", <error text>)`` is returned, so callers see
        a uniform "non-zero return code" failure.

    Raises:
        asyncio.CancelledError: re-raised after killing the child when the
            awaiting task is cancelled.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=_safe_env(),
        )
    except OSError as exc:
        log.warning("Could not start %s: %s", cmd[0], exc)
        return 127, "", str(exc)

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except (asyncio.TimeoutError, asyncio.CancelledError) as exc:
        # Never leave the child running once nobody is waiting for it.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # it exited between the timeout/cancel and the kill
        if isinstance(exc, asyncio.CancelledError):
            raise
        await proc.wait()
        return 1, "", "timeout"

    # errors="replace": tool output is only logged or substring-matched, so a
    # stray non-UTF-8 byte must not turn a successful command into a crash.
    return (
        proc.returncode,
        stdout.decode(errors="replace"),
        stderr.decode(errors="replace"),
    )
|
|
|
|
|
|
async def _enable_with_dhcp(iface: str) -> None:
    """Bring *iface* up and request a DHCP lease.

    Steps, in order:

    1. ``ip link set <iface> up``, then a one-second pause.
    2. ``ifup <iface>`` if ``ifup`` is installed (respects
       /etc/network/interfaces.d/); success ends the procedure.
    3. Otherwise the first available DHCP client is run directly:
       ``dhclient`` (after releasing any stale lease) or ``udhcpc``.

    Failures are logged, never raised by this function itself; exceptions
    from ``_run`` propagate unchanged.
    """
    ip_bin = _find_bin("ip") or "ip"

    log.info("Ethernet watchdog: bringing up %s with DHCP", iface)

    # Bring link up
    rc, _, stderr = await _run([ip_bin, "link", "set", iface, "up"])
    if rc != 0:
        # Keep going: ifup or the DHCP client may still configure the link.
        log.warning("ip link set %s up failed: %s", iface, stderr.strip())
    await asyncio.sleep(1)

    # Try ifup first (respects /etc/network/interfaces.d/)
    ifup = _find_bin("ifup")
    if ifup:
        rc, _, stderr = await _run([ifup, iface], timeout=20)
        if rc == 0:
            log.info("Ethernet watchdog: %s enabled via ifup", iface)
            return
        log.info(
            "ifup did not configure %s (%s); falling back to a DHCP client",
            iface,
            stderr.strip(),
        )

    # Fallback: DHCP client directly. Only the first client found is tried.
    dhclient = _find_bin("dhclient")
    udhcpc = None if dhclient else _find_bin("udhcpc")

    if dhclient:
        # Release any stale lease first
        await _run([dhclient, "-r", iface], timeout=5)
        client, cmd = "dhclient", [dhclient, iface]
    elif udhcpc:
        # -n: give up if no lease is obtained; -q: exit once a lease is obtained
        client, cmd = "udhcpc", [udhcpc, "-i", iface, "-n", "-q"]
    else:
        log.warning("No DHCP client found — %s brought up without address", iface)
        return

    rc, _, stderr = await _run(cmd, timeout=20)
    if rc == 0:
        log.info("Ethernet watchdog: %s enabled via %s", iface, client)
    else:
        log.warning("%s failed for %s: %s", client, iface, stderr.strip())
|
|
|
|
|
|
async def _check_interface(iface: str, auto_enabled: set[str]) -> None:
    """Inspect one interface and enable it if it has a cable but is unusable."""
    if not _is_ethernet(iface):
        return

    if not _has_carrier(iface):
        # Cable removed — clear from auto_enabled so we re-enable on next plug
        auto_enabled.discard(iface)
        return

    if not _is_up(iface):
        # Cable plugged in but interface is down — enable it
        reason = "Ethernet watchdog: cable detected on disabled %s"
    elif not await asyncio.to_thread(_has_ip, iface):
        # _has_ip spawns a blocking subprocess, so it runs in a worker thread
        # (and only once per pass) to keep the event loop responsive.
        # Interface is up but has no IP — try DHCP
        reason = "Ethernet watchdog: %s up but no IP, requesting DHCP"
    else:
        # All good — clear flag in case it was auto-enabled
        auto_enabled.discard(iface)
        return

    if iface in auto_enabled:
        return  # already attempted since the cable was last seen unplugged

    log.info(reason, iface)
    await _enable_with_dhcp(iface)
    auto_enabled.add(iface)


async def ethernet_watchdog():
    """Background task: monitor ethernet interfaces for cable plug-in events.

    When a cable is plugged into a DOWN ethernet interface:
    1. Bring the interface up
    2. Request a DHCP lease

    An interface that is up with a cable but has no IPv4 address also gets
    a DHCP attempt. Each interface is attempted at most once until its
    cable is seen unplugged or it is seen fully configured.

    This prevents lockout when the user accidentally disables their only interface.

    Runs forever, polling every ``_POLL_INTERVAL`` seconds. Errors are
    logged per interface, so one failing interface does not stop the others
    from being checked in the same pass.

    NOTE(review): detection relies on ``_has_carrier``. Linux typically
    refuses to report ``carrier`` for an administratively-down interface,
    which would make the "cable on a DOWN interface" case undetectable —
    confirm on the target kernel.
    """
    log.info("Ethernet link watchdog started")

    net_dir = Path("/sys/class/net")
    # Track which interfaces we've already auto-enabled to avoid repeated attempts
    auto_enabled: set[str] = set()

    while True:
        try:
            names = (
                [entry.name for entry in net_dir.iterdir()]
                if net_dir.exists()
                else []
            )
        except Exception:
            log.exception("Ethernet watchdog error")
        else:
            # Forget interfaces that no longer exist (e.g. an unplugged USB
            # adapter) so they are handled afresh if they reappear.
            auto_enabled.intersection_update(names)
            for iface in names:
                try:
                    await _check_interface(iface, auto_enabled)
                except Exception:
                    log.exception("Ethernet watchdog error")

        await asyncio.sleep(_POLL_INTERVAL)
|