From 3f6a061ffe32c3a91824e08d006b3a58ba018665 Mon Sep 17 00:00:00 2001 From: Aaron Guise Date: Wed, 25 Feb 2026 16:08:26 +1300 Subject: [PATCH] =?UTF-8?q?feat:=20mesh=20peer=20sync=20with=20health=20tr?= =?UTF-8?q?acking=20and=20separate=20peer=20credentials=20=F0=9F=94=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Separate peer_sync.auth_username/password from the DA-facing credentials so /internal/* uses its own basic auth; a compromised peer cannot push zones or access the admin API - Per-peer health tracking: consecutive failure count, degraded/recovered log events at FAILURE_THRESHOLD (3) and on first successful contact after degradation - Gossip-lite mesh discovery: each sync pass calls /internal/peers on every known peer and adds newly discovered node URLs automatically; a linear chain of initial connections is sufficient to form a full mesh - /internal/peers endpoint returns the node's live peer URL list - Support DADNS_PEER_SYNC_PEER_N_URL/USERNAME/PASSWORD numbered env vars for multi-peer env-var-only deployments (up to 9); original single-peer DADNS_PEER_SYNC_PEER_URL retained for backward compatibility --- directdnsonly/app/api/internal.py | 21 +++- directdnsonly/app/peer_sync.py | 161 ++++++++++++++++++++++++++---- directdnsonly/config/__init__.py | 2 + directdnsonly/main.py | 20 +++- tests/test_peer_sync.py | 112 +++++++++++++++++++++ 5 files changed, 293 insertions(+), 23 deletions(-) diff --git a/directdnsonly/app/api/internal.py b/directdnsonly/app/api/internal.py index f7598bb..0afad6a 100644 --- a/directdnsonly/app/api/internal.py +++ b/directdnsonly/app/api/internal.py @@ -7,14 +7,19 @@ from directdnsonly.app.db.models import Domain class InternalAPI: - """Peer-to-peer zone_data exchange endpoint. + """Peer-to-peer zone_data exchange endpoints. Used by PeerSyncWorker to replicate zone_data between directdnsonly instances so each node can independently heal its local backends. - All routes require the same basic auth as the main API. + All routes require peer_sync basic auth credentials, which are + configured separately from the main DirectAdmin-facing credentials + (peer_sync.auth_username / peer_sync.auth_password). """ + def __init__(self, peer_syncer=None): + self._peer_syncer = peer_syncer + @cherrypy.expose def zones(self, domain=None): """Return zone metadata or zone_data for a specific domain. @@ -77,3 +82,15 @@ class InternalAPI: return json.dumps({"error": "internal server error"}).encode() finally: session.close() + + @cherrypy.expose + def peers(self): + """Return the list of peer URLs this node knows about. + + GET /internal/peers + Returns a JSON array of URL strings. Used by other nodes during + sync to discover new cluster members (gossip-lite mesh expansion). + """ + cherrypy.response.headers["Content-Type"] = "application/json" + urls = self._peer_syncer.get_peer_urls() if self._peer_syncer else [] + return json.dumps(urls).encode() diff --git a/directdnsonly/app/peer_sync.py b/directdnsonly/app/peer_sync.py index ad70b9b..944e2af 100644 --- a/directdnsonly/app/peer_sync.py +++ b/directdnsonly/app/peer_sync.py @@ -2,21 +2,34 @@ """Peer sync worker — exchanges zone_data between directdnsonly instances. Each node stores zone_data in its local SQLite DB after every successful -backend write. When DirectAdmin pushes a zone to one node but the other +backend write. When DirectAdmin pushes a zone to one node but another is temporarily offline, the offline node misses that zone_data. PeerSyncWorker corrects this by periodically comparing zone lists with -configured peers and fetching any zone_data that is newer or absent locally. +all known peers and fetching any zone_data that is newer or absent locally. It only updates the local DB — it never writes directly to backends. The existing reconciler healing pass then detects missing zones and re-pushes using the freshly synced zone_data. +Mesh behaviour: +- Each node exposes /internal/peers listing the URLs it knows about +- During each sync pass, every peer is asked for its peer list; any URLs + not already known are added automatically (gossip-lite discovery) +- A three-node cluster therefore only needs a linear chain of initial + connections — nodes propagate awareness of each other on the first pass + +Health tracking: +- Consecutive failures per peer are counted; after FAILURE_THRESHOLD + misses the peer is marked degraded and a warning is logged once +- On the next successful contact the peer is marked recovered + Safety properties: -- If a peer is unreachable, skip it silently and retry next interval +- If a peer is unreachable, skip it and try next interval - Only zone_data is synced — backend writes remain the sole responsibility of the local save queue worker - Newer zone_updated_at timestamp wins; local data is never overwritten with older peer data +- Peer discovery is best-effort and never fails a sync pass """ import datetime import os @@ -28,6 +41,9 @@ from sqlalchemy import select from directdnsonly.app.db import connect from directdnsonly.app.db.models import Domain +# Consecutive failures before a peer is logged as degraded +FAILURE_THRESHOLD = 3 + class PeerSyncWorker: """Periodically fetches zone_data from peer directdnsonly instances and @@ -39,23 +55,56 @@ class PeerSyncWorker: self.interval_seconds = peer_sync_config.get("interval_minutes", 15) * 60 self.peers = list(peer_sync_config.get("peers") or []) - # Support single-peer config via env vars for env-var-only deployments. - # DADNS_PEER_SYNC_PEER_URL, DADNS_PEER_SYNC_PEER_USERNAME, DADNS_PEER_SYNC_PEER_PASSWORD - env_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL", "").strip() - if env_url and not any(p.get("url") == env_url for p in self.peers): - self.peers.append( - { - "url": env_url, - "username": os.environ.get( - "DADNS_PEER_SYNC_PEER_USERNAME", "directdnsonly" - ), - "password": os.environ.get("DADNS_PEER_SYNC_PEER_PASSWORD", ""), - } - ) - logger.debug(f"[peer_sync] Added peer from env vars: {env_url}") + # Per-peer health state: url -> {consecutive_failures, healthy, last_seen} + self._peer_health: dict = {} + + # ---------------------------------------------------------------- + # Env-var peer injection + # ---------------------------------------------------------------- + # Original single-peer vars (backward compat): + # DADNS_PEER_SYNC_PEER_URL / _USERNAME / _PASSWORD + # Numbered multi-peer vars (new): + # DADNS_PEER_SYNC_PEER_1_URL / _USERNAME / _PASSWORD + # DADNS_PEER_SYNC_PEER_2_URL / ... (up to 9) + known_urls = {p.get("url") for p in self.peers} + + env_candidates = [] + + single_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL", "").strip() + if single_url: + env_candidates.append({ + "url": single_url, + "username": os.environ.get("DADNS_PEER_SYNC_PEER_USERNAME", "peersync"), + "password": os.environ.get("DADNS_PEER_SYNC_PEER_PASSWORD", ""), + }) + + for i in range(1, 10): + numbered_url = os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_URL", "").strip() + if not numbered_url: + break + env_candidates.append({ + "url": numbered_url, + "username": os.environ.get( + f"DADNS_PEER_SYNC_PEER_{i}_USERNAME", "peersync" + ), + "password": os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_PASSWORD", ""), + }) + + for candidate in env_candidates: + if candidate["url"] not in known_urls: + self.peers.append(candidate) + known_urls.add(candidate["url"]) + logger.debug( + f"[peer_sync] Added peer from env vars: {candidate['url']}" + ) + self._stop_event = threading.Event() self._thread = None + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + def start(self): if not self.enabled: logger.info("Peer sync disabled — skipping") @@ -86,6 +135,46 @@ class PeerSyncWorker: def is_alive(self): return self._thread is not None and self._thread.is_alive() + def get_peer_urls(self) -> list: + """Return the current list of known peer URLs. + Exposed via /internal/peers so other nodes can discover this node's mesh.""" + return [p["url"] for p in self.peers if p.get("url")] + + # ------------------------------------------------------------------ + # Health tracking + # ------------------------------------------------------------------ + + def _health(self, url: str) -> dict: + return self._peer_health.setdefault( + url, {"consecutive_failures": 0, "healthy": True, "last_seen": None} + ) + + def _record_success(self, url: str): + h = self._health(url) + recovered = not h["healthy"] + h.update( + consecutive_failures=0, + healthy=True, + last_seen=datetime.datetime.utcnow(), + ) + if recovered: + logger.info(f"[peer_sync] {url}: peer recovered") + + def _record_failure(self, url: str, exc): + h = self._health(url) + h["consecutive_failures"] += 1 + if h["healthy"] and h["consecutive_failures"] >= FAILURE_THRESHOLD: + h["healthy"] = False + logger.warning( + f"[peer_sync] {url}: marked degraded after {FAILURE_THRESHOLD} " + f"consecutive failures — {exc}" + ) + else: + logger.debug( + f"[peer_sync] {url}: unreachable " + f"(failure #{h['consecutive_failures']}) — {exc}" + ) + # ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ @@ -98,15 +187,49 @@ class PeerSyncWorker: def _sync_all(self): logger.debug(f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)") - for peer in self.peers: + # Iterate over a snapshot — _discover_peers_from may grow self.peers + for peer in list(self.peers): url = peer.get("url") if not url: logger.warning("[peer_sync] Peer config missing url — skipping") continue try: self._sync_from_peer(peer) + self._discover_peers_from(peer) + self._record_success(url) except Exception as exc: - logger.warning(f"[peer_sync] Skipping unreachable peer {url}: {exc}") + self._record_failure(url, exc) + + def _discover_peers_from(self, peer: dict): + """Fetch peer's known peer list and add any new nodes for mesh expansion. + + This is best-effort — failures are silently swallowed so they never + interrupt the main sync pass.""" + url = peer.get("url", "").rstrip("/") + username = peer.get("username") + password = peer.get("password") + auth = (username, password) if username else None + try: + resp = requests.get(f"{url}/internal/peers", auth=auth, timeout=5) + if resp.status_code != 200: + return + remote_urls = resp.json() # list of URL strings + known_urls = {p.get("url") for p in self.peers} + for remote_url in remote_urls: + if remote_url and remote_url not in known_urls: + # Inherit credentials from the introducing peer — in practice + # all cluster nodes share the same peer_sync auth credentials. + self.peers.append({ + "url": remote_url, + "username": username, + "password": password, + }) + known_urls.add(remote_url) + logger.info( + f"[peer_sync] Discovered new peer {remote_url} via {url}" + ) + except Exception: + pass # discovery is best-effort def _sync_from_peer(self, peer: dict): url = peer.get("url", "").rstrip("/") diff --git a/directdnsonly/config/__init__.py b/directdnsonly/config/__init__.py index 992022f..eec4342 100644 --- a/directdnsonly/config/__init__.py +++ b/directdnsonly/config/__init__.py @@ -69,6 +69,8 @@ def load_config() -> Vyper: # Peer sync defaults v.set_default("peer_sync.enabled", False) v.set_default("peer_sync.interval_minutes", 15) + v.set_default("peer_sync.auth_username", "peersync") + v.set_default("peer_sync.auth_password", "changeme") # Read configuration try: diff --git a/directdnsonly/main.py b/directdnsonly/main.py index 65ccd83..e6ff3cc 100644 --- a/directdnsonly/main.py +++ b/directdnsonly/main.py @@ -90,6 +90,17 @@ def main(): if config.get_string("app.log_level").upper() != "DEBUG": cherrypy.log.access_log.propagate = False + # Peer sync auth — separate credentials from the DA-facing API so a + # compromised peer node cannot push zones or access the admin endpoints. + peer_user_password_dict = { + config.get_string("peer_sync.auth_username"): config.get_string( + "peer_sync.auth_password" + ) + } + peer_check_password = cherrypy.lib.auth_basic.checkpassword_dict( + peer_user_password_dict + ) + # Mount applications root = Root() root = DNSAdminAPI( @@ -98,12 +109,17 @@ def main(): backend_registry=registry, ) root.health = HealthAPI(registry) - root.internal = InternalAPI() + root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer) # Add queue status endpoint root.queue_status = lambda: worker_manager.queue_status() - cherrypy.tree.mount(root, "/") + # Override auth for /internal so peers use their own credentials + cherrypy.tree.mount(root, "/", config={ + "/internal": { + "tools.auth_basic.checkpassword": peer_check_password, + } + }) cherrypy.engine.start() logger.success(f"Server started on port {config.get_int('app.listen_port')}") diff --git a/tests/test_peer_sync.py b/tests/test_peer_sync.py index f004670..b9f570d 100644 --- a/tests/test_peer_sync.py +++ b/tests/test_peer_sync.py @@ -79,6 +79,118 @@ def test_env_peer_not_duplicated_when_also_in_config(monkeypatch): assert urls.count("http://ddo-2:2222") == 1 +def test_numbered_env_peers(monkeypatch): + """DADNS_PEER_SYNC_PEER_1_URL and _2_URL add multiple peers.""" + monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://node-a:2222") + monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_USERNAME", "peersync") + monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_PASSWORD", "s3cr3t") + monkeypatch.setenv("DADNS_PEER_SYNC_PEER_2_URL", "http://node-b:2222") + worker = PeerSyncWorker({"enabled": True}) + urls = [p["url"] for p in worker.peers] + assert "http://node-a:2222" in urls + assert "http://node-b:2222" in urls + assert len(urls) == 2 + + +def test_numbered_env_peers_not_duplicated(monkeypatch): + """Numbered env var peers are deduplicated against the config file list.""" + monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://ddo-2:2222") + worker = PeerSyncWorker(BASE_CONFIG) + urls = [p["url"] for p in worker.peers] + assert urls.count("http://ddo-2:2222") == 1 + + +def test_get_peer_urls(): + worker = PeerSyncWorker(BASE_CONFIG) + assert worker.get_peer_urls() == ["http://ddo-2:2222"] + + +# --------------------------------------------------------------------------- +# Health tracking +# --------------------------------------------------------------------------- + + +def test_peer_health_starts_healthy(): + worker = PeerSyncWorker(BASE_CONFIG) + h = worker._health("http://ddo-2:2222") + assert h["healthy"] is True + assert h["consecutive_failures"] == 0 + + +def test_record_failure_increments_count(): + worker = PeerSyncWorker(BASE_CONFIG) + worker._record_failure("http://ddo-2:2222", ConnectionError("down")) + assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 1 + assert worker._health("http://ddo-2:2222")["healthy"] is True + + +def test_record_failure_marks_degraded_at_threshold(): + from directdnsonly.app.peer_sync import FAILURE_THRESHOLD + worker = PeerSyncWorker(BASE_CONFIG) + for _ in range(FAILURE_THRESHOLD): + worker._record_failure("http://ddo-2:2222", ConnectionError("down")) + assert worker._health("http://ddo-2:2222")["healthy"] is False + + +def test_record_success_resets_health(): + from directdnsonly.app.peer_sync import FAILURE_THRESHOLD + worker = PeerSyncWorker(BASE_CONFIG) + for _ in range(FAILURE_THRESHOLD): + worker._record_failure("http://ddo-2:2222", ConnectionError("down")) + assert not worker._health("http://ddo-2:2222")["healthy"] + worker._record_success("http://ddo-2:2222") + assert worker._health("http://ddo-2:2222")["healthy"] is True + assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 0 + + +# --------------------------------------------------------------------------- +# Peer discovery (_discover_peers_from) +# --------------------------------------------------------------------------- + + +def test_discover_peers_adds_new_peer(monkeypatch): + """New peer URL returned by /internal/peers is added to the peer list.""" + worker = PeerSyncWorker(BASE_CONFIG) + + def mock_get(url, auth=None, timeout=10, params=None): + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = ["http://node-c:2222"] + return resp + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + worker._discover_peers_from(BASE_CONFIG["peers"][0]) + urls = [p["url"] for p in worker.peers] + assert "http://node-c:2222" in urls + + +def test_discover_peers_skips_known(monkeypatch): + """Already-known peer URLs are not re-added.""" + worker = PeerSyncWorker(BASE_CONFIG) + + def mock_get(url, auth=None, timeout=10, params=None): + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = ["http://ddo-2:2222"] # already known + return resp + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + worker._discover_peers_from(BASE_CONFIG["peers"][0]) + assert len(worker.peers) == 1 # unchanged + + +def test_discover_peers_tolerates_failure(monkeypatch): + """Network error during discovery does not propagate.""" + worker = PeerSyncWorker(BASE_CONFIG) + + def mock_get(*args, **kwargs): + raise ConnectionError("peer down") + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + # Should not raise + worker._discover_peers_from(BASE_CONFIG["peers"][0]) + + def test_start_skips_when_disabled(caplog): worker = PeerSyncWorker({"enabled": False}) worker.start()