feat: mesh peer sync with health tracking and separate peer credentials

- Separate peer_sync.auth_username/password from the DA-facing credentials
  so /internal/* uses its own basic auth; a compromised peer cannot push
  zones or access the admin API
- Per-peer health tracking: consecutive failure count, with degraded/recovered
  log events at FAILURE_THRESHOLD (3) and on the first successful contact after
  degradation (see the walk-through below)
- Gossip-lite mesh discovery: each sync pass calls /internal/peers on every
  known peer and adds newly discovered node URLs automatically; a linear
  chain of initial connections is sufficient to form a full mesh
- /internal/peers endpoint returns the node's live peer URL list
- Support DADNS_PEER_SYNC_PEER_N_URL/USERNAME/PASSWORD numbered env vars
  for multi-peer env-var-only deployments (up to 9); the original single-peer
  DADNS_PEER_SYNC_PEER_URL vars are retained for backward compatibility (see
  the example below)
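
To illustrate the health-tracking state machine, here is a toy walk-through
(editor's sketch, not part of the commit; the URL and exception are
placeholders, and FAILURE_THRESHOLD is 3 as defined in peer_sync.py):

    from directdnsonly.app.peer_sync import PeerSyncWorker

    worker = PeerSyncWorker({"enabled": True,
                             "peers": [{"url": "http://ddo-2:2222"}]})

    for _ in range(3):  # FAILURE_THRESHOLD
        worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
    # The third failure logs a warning, exactly once:
    #   [peer_sync] http://ddo-2:2222: marked degraded after 3 consecutive
    #   failures ...

    worker._record_success("http://ddo-2:2222")
    # Logs "[peer_sync] http://ddo-2:2222: peer recovered" and resets the
    # consecutive-failure counter to 0.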
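
As an illustration, a three-node cluster configured purely through numbered
env vars might look like this (hostnames and passwords are placeholders).
Note that the scan stops at the first missing number, so numbering must be
contiguous starting at 1:

    DADNS_PEER_SYNC_PEER_1_URL=http://ddo-2:2222
    DADNS_PEER_SYNC_PEER_1_USERNAME=peersync
    DADNS_PEER_SYNC_PEER_1_PASSWORD=s3cr3t
    DADNS_PEER_SYNC_PEER_2_URL=http://ddo-3:2222
    DADNS_PEER_SYNC_PEER_2_USERNAME=peersync
    DADNS_PEER_SYNC_PEER_2_PASSWORD=s3cr3t
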
2026-02-25 16:08:26 +13:00
parent 0b31b75789
commit 3f6a061ffe
5 changed files with 293 additions and 23 deletions

View File

@@ -7,14 +7,19 @@ from directdnsonly.app.db.models import Domain
 class InternalAPI:
-    """Peer-to-peer zone_data exchange endpoint.
+    """Peer-to-peer zone_data exchange endpoints.

     Used by PeerSyncWorker to replicate zone_data between directdnsonly
     instances so each node can independently heal its local backends.

-    All routes require the same basic auth as the main API.
+    All routes require peer_sync basic auth credentials, which are
+    configured separately from the main DirectAdmin-facing credentials
+    (peer_sync.auth_username / peer_sync.auth_password).
     """

+    def __init__(self, peer_syncer=None):
+        self._peer_syncer = peer_syncer
+
     @cherrypy.expose
     def zones(self, domain=None):
         """Return zone metadata or zone_data for a specific domain.
@@ -77,3 +82,15 @@ class InternalAPI:
             return json.dumps({"error": "internal server error"}).encode()
         finally:
             session.close()
+
+    @cherrypy.expose
+    def peers(self):
+        """Return the list of peer URLs this node knows about.
+
+        GET /internal/peers
+
+        Returns a JSON array of URL strings. Used by other nodes during
+        sync to discover new cluster members (gossip-lite mesh expansion).
+        """
+        cherrypy.response.headers["Content-Type"] = "application/json"
+        urls = self._peer_syncer.get_peer_urls() if self._peer_syncer else []
+        return json.dumps(urls).encode()
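
For reference, a client-side call to the new endpoint might look like this
(hostname, port and credentials are illustrative; auth uses the peer_sync
credentials, not the DA-facing ones):

    import requests

    resp = requests.get(
        "http://ddo-1:2222/internal/peers",
        auth=("peersync", "changeme"),  # peer_sync.auth_username / auth_password
        timeout=5,
    )
    print(resp.json())  # e.g. ["http://ddo-2:2222", "http://ddo-3:2222"]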

View File

@@ -2,21 +2,34 @@
"""Peer sync worker — exchanges zone_data between directdnsonly instances. """Peer sync worker — exchanges zone_data between directdnsonly instances.
Each node stores zone_data in its local SQLite DB after every successful Each node stores zone_data in its local SQLite DB after every successful
backend write. When DirectAdmin pushes a zone to one node but the other backend write. When DirectAdmin pushes a zone to one node but another
is temporarily offline, the offline node misses that zone_data. is temporarily offline, the offline node misses that zone_data.
PeerSyncWorker corrects this by periodically comparing zone lists with PeerSyncWorker corrects this by periodically comparing zone lists with
configured peers and fetching any zone_data that is newer or absent locally. all known peers and fetching any zone_data that is newer or absent locally.
It only updates the local DB — it never writes directly to backends. The It only updates the local DB — it never writes directly to backends. The
existing reconciler healing pass then detects missing zones and re-pushes existing reconciler healing pass then detects missing zones and re-pushes
using the freshly synced zone_data. using the freshly synced zone_data.
Mesh behaviour:
- Each node exposes /internal/peers listing the URLs it knows about
- During each sync pass, every peer is asked for its peer list; any URLs
not already known are added automatically (gossip-lite discovery)
- A three-node cluster therefore only needs a linear chain of initial
connections — nodes propagate awareness of each other on the first pass
Health tracking:
- Consecutive failures per peer are counted; after FAILURE_THRESHOLD
misses the peer is marked degraded and a warning is logged once
- On the next successful contact the peer is marked recovered
Safety properties: Safety properties:
- If a peer is unreachable, skip it silently and retry next interval - If a peer is unreachable, skip it and try next interval
- Only zone_data is synced — backend writes remain the sole responsibility - Only zone_data is synced — backend writes remain the sole responsibility
of the local save queue worker of the local save queue worker
- Newer zone_updated_at timestamp wins; local data is never overwritten - Newer zone_updated_at timestamp wins; local data is never overwritten
with older peer data with older peer data
- Peer discovery is best-effort and never fails a sync pass
""" """
import datetime import datetime
import os import os
@@ -28,6 +41,9 @@ from sqlalchemy import select
 from directdnsonly.app.db import connect
 from directdnsonly.app.db.models import Domain

+# Consecutive failures before a peer is logged as degraded
+FAILURE_THRESHOLD = 3
+

 class PeerSyncWorker:
     """Periodically fetches zone_data from peer directdnsonly instances and
@@ -39,23 +55,56 @@ class PeerSyncWorker:
         self.interval_seconds = peer_sync_config.get("interval_minutes", 15) * 60
         self.peers = list(peer_sync_config.get("peers") or [])

-        # Support single-peer config via env vars for env-var-only deployments.
-        # DADNS_PEER_SYNC_PEER_URL, DADNS_PEER_SYNC_PEER_USERNAME, DADNS_PEER_SYNC_PEER_PASSWORD
-        env_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL", "").strip()
-        if env_url and not any(p.get("url") == env_url for p in self.peers):
-            self.peers.append(
-                {
-                    "url": env_url,
-                    "username": os.environ.get(
-                        "DADNS_PEER_SYNC_PEER_USERNAME", "directdnsonly"
-                    ),
-                    "password": os.environ.get("DADNS_PEER_SYNC_PEER_PASSWORD", ""),
-                }
-            )
-            logger.debug(f"[peer_sync] Added peer from env vars: {env_url}")
+        # Per-peer health state: url -> {consecutive_failures, healthy, last_seen}
+        self._peer_health: dict = {}
+
+        # ----------------------------------------------------------------
+        # Env-var peer injection
+        # ----------------------------------------------------------------
+        # Original single-peer vars (backward compat):
+        #   DADNS_PEER_SYNC_PEER_URL / _USERNAME / _PASSWORD
+        # Numbered multi-peer vars (new):
+        #   DADNS_PEER_SYNC_PEER_1_URL / _USERNAME / _PASSWORD
+        #   DADNS_PEER_SYNC_PEER_2_URL / ... (up to 9)
+        known_urls = {p.get("url") for p in self.peers}
+        env_candidates = []
+
+        single_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL", "").strip()
+        if single_url:
+            env_candidates.append({
+                "url": single_url,
+                "username": os.environ.get("DADNS_PEER_SYNC_PEER_USERNAME", "peersync"),
+                "password": os.environ.get("DADNS_PEER_SYNC_PEER_PASSWORD", ""),
+            })
+
+        for i in range(1, 10):
+            numbered_url = os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_URL", "").strip()
+            if not numbered_url:
+                break
+            env_candidates.append({
+                "url": numbered_url,
+                "username": os.environ.get(
+                    f"DADNS_PEER_SYNC_PEER_{i}_USERNAME", "peersync"
+                ),
+                "password": os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_PASSWORD", ""),
+            })
+
+        for candidate in env_candidates:
+            if candidate["url"] not in known_urls:
+                self.peers.append(candidate)
+                known_urls.add(candidate["url"])
+                logger.debug(
+                    f"[peer_sync] Added peer from env vars: {candidate['url']}"
+                )

         self._stop_event = threading.Event()
         self._thread = None

+    # ------------------------------------------------------------------
+    # Lifecycle
+    # ------------------------------------------------------------------
     def start(self):
         if not self.enabled:
             logger.info("Peer sync disabled — skipping")
@@ -86,6 +135,46 @@ class PeerSyncWorker:
     def is_alive(self):
         return self._thread is not None and self._thread.is_alive()

+    def get_peer_urls(self) -> list:
+        """Return the current list of known peer URLs.
+
+        Exposed via /internal/peers so other nodes can discover this node's mesh."""
+        return [p["url"] for p in self.peers if p.get("url")]
+
+    # ------------------------------------------------------------------
+    # Health tracking
+    # ------------------------------------------------------------------
+    def _health(self, url: str) -> dict:
+        return self._peer_health.setdefault(
+            url, {"consecutive_failures": 0, "healthy": True, "last_seen": None}
+        )
+
+    def _record_success(self, url: str):
+        h = self._health(url)
+        recovered = not h["healthy"]
+        h.update(
+            consecutive_failures=0,
+            healthy=True,
+            last_seen=datetime.datetime.utcnow(),
+        )
+        if recovered:
+            logger.info(f"[peer_sync] {url}: peer recovered")
+
+    def _record_failure(self, url: str, exc):
+        h = self._health(url)
+        h["consecutive_failures"] += 1
+        if h["healthy"] and h["consecutive_failures"] >= FAILURE_THRESHOLD:
+            h["healthy"] = False
+            logger.warning(
+                f"[peer_sync] {url}: marked degraded after {FAILURE_THRESHOLD} "
+                f"consecutive failures — {exc}"
+            )
+        else:
+            logger.debug(
+                f"[peer_sync] {url}: unreachable "
+                f"(failure #{h['consecutive_failures']}) — {exc}"
+            )
+
     # ------------------------------------------------------------------
     # Internal
     # ------------------------------------------------------------------
@@ -98,15 +187,49 @@ class PeerSyncWorker:
     def _sync_all(self):
         logger.debug(f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)")
-        for peer in self.peers:
+        # Iterate over a snapshot — _discover_peers_from may grow self.peers
+        for peer in list(self.peers):
             url = peer.get("url")
             if not url:
                 logger.warning("[peer_sync] Peer config missing url — skipping")
                 continue
             try:
                 self._sync_from_peer(peer)
+                self._discover_peers_from(peer)
+                self._record_success(url)
             except Exception as exc:
-                logger.warning(f"[peer_sync] Skipping unreachable peer {url}: {exc}")
+                self._record_failure(url, exc)
+
+    def _discover_peers_from(self, peer: dict):
+        """Fetch peer's known peer list and add any new nodes for mesh expansion.
+
+        This is best-effort — failures are silently swallowed so they never
+        interrupt the main sync pass."""
+        url = peer.get("url", "").rstrip("/")
+        username = peer.get("username")
+        password = peer.get("password")
+        auth = (username, password) if username else None
+        try:
+            resp = requests.get(f"{url}/internal/peers", auth=auth, timeout=5)
+            if resp.status_code != 200:
+                return
+            remote_urls = resp.json()  # list of URL strings
+            known_urls = {p.get("url") for p in self.peers}
+            for remote_url in remote_urls:
+                if remote_url and remote_url not in known_urls:
+                    # Inherit credentials from the introducing peer — in practice
+                    # all cluster nodes share the same peer_sync auth credentials.
+                    self.peers.append({
+                        "url": remote_url,
+                        "username": username,
+                        "password": password,
+                    })
+                    known_urls.add(remote_url)
+                    logger.info(
+                        f"[peer_sync] Discovered new peer {remote_url} via {url}"
+                    )
+        except Exception:
+            pass  # discovery is best-effort
+
     def _sync_from_peer(self, peer: dict):
         url = peer.get("url", "").rstrip("/")

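Why a linear chain converges to a full mesh: a toy sketch (editor's
illustration, not project code) of the discovery rule above. As in
_sync_all, each node iterates over a snapshot of its peer set while the
set grows:

    # Chain of initial connections: A - B - C (each node knows its neighbour)
    nodes = {"A": {"B"}, "B": {"A", "C"}, "C": {"B"}}

    def sync_pass(nodes):
        for name in nodes:
            for peer in list(nodes[name]):           # snapshot; set may grow
                nodes[name] |= nodes[peer] - {name}  # adopt peer's peer list

    sync_pass(nodes)
    print(nodes)
    # {'A': {'B', 'C'}, 'B': {'A', 'C'}, 'C': {'A', 'B'}}: full mesh in one pass
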
View File

@@ -69,6 +69,8 @@ def load_config() -> Vyper:
     # Peer sync defaults
     v.set_default("peer_sync.enabled", False)
     v.set_default("peer_sync.interval_minutes", 15)
+    v.set_default("peer_sync.auth_username", "peersync")
+    v.set_default("peer_sync.auth_password", "changeme")

     # Read configuration
     try:

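For completeness, the equivalent config-file stanza might look like this
(a sketch: the file format shown is an assumption, the key names are the
ones registered above, and the per-peer url/username/password keys match
what PeerSyncWorker reads):

    peer_sync:
      enabled: true
      interval_minutes: 15
      auth_username: peersync   # credentials peers present to /internal/*
      auth_password: changeme   # override the default in any real deployment
      peers:
        - url: http://ddo-2:2222
          username: peersync
          password: changeme
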
View File

@@ -90,6 +90,17 @@ def main():
     if config.get_string("app.log_level").upper() != "DEBUG":
         cherrypy.log.access_log.propagate = False

+    # Peer sync auth — separate credentials from the DA-facing API so a
+    # compromised peer node cannot push zones or access the admin endpoints.
+    peer_user_password_dict = {
+        config.get_string("peer_sync.auth_username"): config.get_string(
+            "peer_sync.auth_password"
+        )
+    }
+    peer_check_password = cherrypy.lib.auth_basic.checkpassword_dict(
+        peer_user_password_dict
+    )
+
     # Mount applications
     root = Root()
     root = DNSAdminAPI(
@@ -98,12 +109,17 @@ def main():
         backend_registry=registry,
     )
     root.health = HealthAPI(registry)
-    root.internal = InternalAPI()
+    root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer)

     # Add queue status endpoint
     root.queue_status = lambda: worker_manager.queue_status()

-    cherrypy.tree.mount(root, "/")
+    # Override auth for /internal so peers use their own credentials
+    cherrypy.tree.mount(root, "/", config={
+        "/internal": {
+            "tools.auth_basic.checkpassword": peer_check_password,
+        }
+    })
     cherrypy.engine.start()
     logger.success(f"Server started on port {config.get_int('app.listen_port')}")

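A quick way to see the credential split in action (editor's sketch with
illustrative hostname and credentials; the behaviour follows from the
checkpassword override above, which rejects any non-peer_sync credentials
on /internal/*):

    import requests

    base = "http://ddo-1:2222"
    # DA-facing admin credentials no longer open the peer endpoints...
    assert requests.get(f"{base}/internal/peers",
                        auth=("admin", "da-secret")).status_code == 401
    # ...only the peer_sync credentials do.
    assert requests.get(f"{base}/internal/peers",
                        auth=("peersync", "changeme")).status_code == 200
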
View File

@@ -79,6 +79,118 @@ def test_env_peer_not_duplicated_when_also_in_config(monkeypatch):
     assert urls.count("http://ddo-2:2222") == 1


+def test_numbered_env_peers(monkeypatch):
+    """DADNS_PEER_SYNC_PEER_1_URL and _2_URL add multiple peers."""
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://node-a:2222")
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_USERNAME", "peersync")
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_PASSWORD", "s3cr3t")
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_2_URL", "http://node-b:2222")
+    worker = PeerSyncWorker({"enabled": True})
+    urls = [p["url"] for p in worker.peers]
+    assert "http://node-a:2222" in urls
+    assert "http://node-b:2222" in urls
+    assert len(urls) == 2
+
+
+def test_numbered_env_peers_not_duplicated(monkeypatch):
+    """Numbered env var peers are deduplicated against the config file list."""
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://ddo-2:2222")
+    worker = PeerSyncWorker(BASE_CONFIG)
+    urls = [p["url"] for p in worker.peers]
+    assert urls.count("http://ddo-2:2222") == 1
+
+
+def test_get_peer_urls():
+    worker = PeerSyncWorker(BASE_CONFIG)
+    assert worker.get_peer_urls() == ["http://ddo-2:2222"]
+
+
+# ---------------------------------------------------------------------------
+# Health tracking
+# ---------------------------------------------------------------------------
+
+def test_peer_health_starts_healthy():
+    worker = PeerSyncWorker(BASE_CONFIG)
+    h = worker._health("http://ddo-2:2222")
+    assert h["healthy"] is True
+    assert h["consecutive_failures"] == 0
+
+
+def test_record_failure_increments_count():
+    worker = PeerSyncWorker(BASE_CONFIG)
+    worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
+    assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 1
+    assert worker._health("http://ddo-2:2222")["healthy"] is True
+
+
+def test_record_failure_marks_degraded_at_threshold():
+    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
+
+    worker = PeerSyncWorker(BASE_CONFIG)
+    for _ in range(FAILURE_THRESHOLD):
+        worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
+    assert worker._health("http://ddo-2:2222")["healthy"] is False
+
+
+def test_record_success_resets_health():
+    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
+
+    worker = PeerSyncWorker(BASE_CONFIG)
+    for _ in range(FAILURE_THRESHOLD):
+        worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
+    assert not worker._health("http://ddo-2:2222")["healthy"]
+    worker._record_success("http://ddo-2:2222")
+    assert worker._health("http://ddo-2:2222")["healthy"] is True
+    assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 0
+
+
+# ---------------------------------------------------------------------------
+# Peer discovery (_discover_peers_from)
+# ---------------------------------------------------------------------------
+
+def test_discover_peers_adds_new_peer(monkeypatch):
+    """New peer URL returned by /internal/peers is added to the peer list."""
+    worker = PeerSyncWorker(BASE_CONFIG)
+
+    def mock_get(url, auth=None, timeout=10, params=None):
+        resp = MagicMock()
+        resp.status_code = 200
+        resp.json.return_value = ["http://node-c:2222"]
+        return resp
+
+    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
+    worker._discover_peers_from(BASE_CONFIG["peers"][0])
+    urls = [p["url"] for p in worker.peers]
+    assert "http://node-c:2222" in urls
+
+
+def test_discover_peers_skips_known(monkeypatch):
+    """Already-known peer URLs are not re-added."""
+    worker = PeerSyncWorker(BASE_CONFIG)
+
+    def mock_get(url, auth=None, timeout=10, params=None):
+        resp = MagicMock()
+        resp.status_code = 200
+        resp.json.return_value = ["http://ddo-2:2222"]  # already known
+        return resp
+
+    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
+    worker._discover_peers_from(BASE_CONFIG["peers"][0])
+    assert len(worker.peers) == 1  # unchanged
+
+
+def test_discover_peers_tolerates_failure(monkeypatch):
+    """Network error during discovery does not propagate."""
+    worker = PeerSyncWorker(BASE_CONFIG)
+
+    def mock_get(*args, **kwargs):
+        raise ConnectionError("peer down")
+
+    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
+    # Should not raise
+    worker._discover_peers_from(BASE_CONFIG["peers"][0])
+
+
 def test_start_skips_when_disabled(caplog):
     worker = PeerSyncWorker({"enabled": False})
     worker.start()