"""Ethernet link watchdog — auto-enable interfaces when cable is plugged in. Monitors /sys/class/net/*/carrier for ethernet interfaces. When a cable is detected on a DOWN interface, brings it up with DHCP. This prevents lockout when the only active interface is disabled. """ import asyncio import logging import os import shutil from pathlib import Path log = logging.getLogger("atlus.netwatch") _POLL_INTERVAL = 5 # seconds between checks _SEARCH_PATHS = ("/usr/bin", "/usr/local/bin", "/bin", "/usr/sbin", "/sbin") def _find_bin(name: str) -> str | None: found = shutil.which(name) if found: return found for d in _SEARCH_PATHS: p = os.path.join(d, name) if os.path.isfile(p) and os.access(p, os.X_OK): return p return None def _safe_env() -> dict: env = {**os.environ, "LC_ALL": "C"} path = env.get("PATH", "") for p in _SEARCH_PATHS: if p not in path: path = p + ":" + path env["PATH"] = path return env def _is_ethernet(iface: str) -> bool: """Check if an interface is a physical ethernet (not wireless, not virtual).""" sys_path = Path(f"/sys/class/net/{iface}") if not sys_path.exists(): return False # Skip wireless if (sys_path / "wireless").exists(): return False # Skip loopback if iface == "lo": return False # Skip virtual interfaces (bridges, veth, docker, etc.) device_path = sys_path / "device" if not device_path.exists(): return False # Has a device backing it — likely physical return True def _has_carrier(iface: str) -> bool: """Check if a cable is plugged in (carrier detected).""" try: carrier = Path(f"/sys/class/net/{iface}/carrier").read_text().strip() return carrier == "1" except Exception: return False def _is_up(iface: str) -> bool: """Check if interface is operationally up.""" try: flags = Path(f"/sys/class/net/{iface}/flags").read_text().strip() # IFF_UP = 0x1 return (int(flags, 16) & 0x1) != 0 except Exception: return False def _has_ip(iface: str) -> bool: """Check if interface has any IPv4 address assigned.""" try: addrs = Path(f"/proc/net/fib_trie").read_text() # Quick check: look for the interface in the routing table # More reliable: check /sys/class/net/{iface}/ import subprocess result = subprocess.run( ["ip", "-4", "addr", "show", iface], capture_output=True, text=True, timeout=5 ) return "inet " in result.stdout except Exception: return False async def _run(cmd: list[str], timeout: float = 15) -> tuple[int, str, str]: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_safe_env(), ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.wait() return 1, "", "timeout" return proc.returncode, stdout.decode(), stderr.decode() async def _enable_with_dhcp(iface: str): """Bring interface up and request DHCP lease.""" ip_bin = _find_bin("ip") or "ip" log.info("Ethernet watchdog: bringing up %s with DHCP", iface) # Bring link up await _run([ip_bin, "link", "set", iface, "up"]) await asyncio.sleep(1) # Try ifup first (respects /etc/network/interfaces.d/) ifup = _find_bin("ifup") if ifup: rc, _, stderr = await _run([ifup, iface], timeout=20) if rc == 0: log.info("Ethernet watchdog: %s enabled via ifup", iface) return # Fallback: DHCP client directly dhclient = _find_bin("dhclient") udhcpc = _find_bin("udhcpc") if dhclient: # Release any stale lease first await _run([dhclient, "-r", iface], timeout=5) rc, _, stderr = await _run([dhclient, iface], timeout=20) if rc == 0: log.info("Ethernet watchdog: %s enabled via dhclient", iface) return log.warning("dhclient failed for %s: %s", iface, stderr.strip()) elif udhcpc: rc, _, stderr = await _run([udhcpc, "-i", iface, "-n", "-q"], timeout=20) if rc == 0: log.info("Ethernet watchdog: %s enabled via udhcpc", iface) return log.warning("udhcpc failed for %s: %s", iface, stderr.strip()) else: log.warning("No DHCP client found — %s brought up without address", iface) async def ethernet_watchdog(): """Background task: monitor ethernet interfaces for cable plug-in events. When a cable is plugged into a DOWN ethernet interface: 1. Bring the interface up 2. Request a DHCP lease This prevents lockout when the user accidentally disables their only interface. """ log.info("Ethernet link watchdog started") # Track which interfaces we've already auto-enabled to avoid repeated attempts auto_enabled = set() while True: try: net_dir = Path("/sys/class/net") if not net_dir.exists(): await asyncio.sleep(_POLL_INTERVAL) continue for iface_path in net_dir.iterdir(): iface = iface_path.name if not _is_ethernet(iface): continue has_cable = _has_carrier(iface) is_up = _is_up(iface) if has_cable and not is_up: # Cable plugged in but interface is down — enable it if iface not in auto_enabled: log.info("Ethernet watchdog: cable detected on disabled %s", iface) await _enable_with_dhcp(iface) auto_enabled.add(iface) elif has_cable and is_up and not _has_ip(iface): # Interface is up but has no IP — try DHCP if iface not in auto_enabled: log.info("Ethernet watchdog: %s up but no IP, requesting DHCP", iface) await _enable_with_dhcp(iface) auto_enabled.add(iface) elif not has_cable: # Cable removed — clear from auto_enabled so we re-enable on next plug auto_enabled.discard(iface) elif has_cable and is_up and _has_ip(iface): # All good — clear flag in case it was auto-enabled auto_enabled.discard(iface) except Exception: log.exception("Ethernet watchdog error") await asyncio.sleep(_POLL_INTERVAL)