diff --git a/directdnsonly/app/api/internal.py b/directdnsonly/app/api/internal.py new file mode 100644 index 0000000..600ded8 --- /dev/null +++ b/directdnsonly/app/api/internal.py @@ -0,0 +1,81 @@ +import cherrypy +import json +from loguru import logger +from directdnsonly.app.db import connect +from directdnsonly.app.db.models import Domain + + +class InternalAPI: + """Peer-to-peer zone_data exchange endpoint. + + Used by PeerSyncWorker to replicate zone_data between directdnsonly + instances so each node can independently heal its local backends. + + All routes require the same basic auth as the main API. + """ + + @cherrypy.expose + def zones(self, domain=None): + """Return zone metadata or zone_data for a specific domain. + + GET /internal/zones + Returns a JSON array of {domain, zone_updated_at, hostname, username} + for all domains that have stored zone_data. + + GET /internal/zones?domain=example.com + Returns {domain, zone_data, zone_updated_at, hostname, username} + for the requested domain, or 404 if not found / no zone_data. + """ + cherrypy.response.headers["Content-Type"] = "application/json" + session = connect() + try: + if domain: + record = ( + session.query(Domain) + .filter_by(domain=domain) + .filter(Domain.zone_data.isnot(None)) + .first() + ) + if not record: + cherrypy.response.status = 404 + return json.dumps({"error": "not found"}).encode() + return json.dumps( + { + "domain": record.domain, + "zone_data": record.zone_data, + "zone_updated_at": ( + record.zone_updated_at.isoformat() + if record.zone_updated_at + else None + ), + "hostname": record.hostname, + "username": record.username, + } + ).encode() + else: + records = ( + session.query(Domain) + .filter(Domain.zone_data.isnot(None)) + .all() + ) + return json.dumps( + [ + { + "domain": r.domain, + "zone_updated_at": ( + r.zone_updated_at.isoformat() + if r.zone_updated_at + else None + ), + "hostname": r.hostname, + "username": r.username, + } + for r in records + ] + ).encode() + except Exception as exc: + logger.error(f"[internal] Error serving /internal/zones: {exc}") + cherrypy.response.status = 500 + return json.dumps({"error": "internal server error"}).encode() + finally: + session.close() diff --git a/directdnsonly/app/peer_sync.py b/directdnsonly/app/peer_sync.py new file mode 100644 index 0000000..e657821 --- /dev/null +++ b/directdnsonly/app/peer_sync.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +"""Peer sync worker — exchanges zone_data between directdnsonly instances. + +Each node stores zone_data in its local SQLite DB after every successful +backend write. When DirectAdmin pushes a zone to one node but the other +is temporarily offline, the offline node misses that zone_data. + +PeerSyncWorker corrects this by periodically comparing zone lists with +configured peers and fetching any zone_data that is newer or absent locally. +It only updates the local DB — it never writes directly to backends. The +existing reconciler healing pass then detects missing zones and re-pushes +using the freshly synced zone_data. 
+ +Safety properties: +- If a peer is unreachable, skip it silently and retry next interval +- Only zone_data is synced — backend writes remain the sole responsibility + of the local save queue worker +- Newer zone_updated_at timestamp wins; local data is never overwritten + with older peer data +""" +import datetime +import threading +from loguru import logger +import requests + +from directdnsonly.app.db import connect +from directdnsonly.app.db.models import Domain + + +class PeerSyncWorker: + """Periodically fetches zone_data from peer directdnsonly instances and + stores it locally so the healing pass can re-push missing zones without + waiting for a DirectAdmin re-push.""" + + def __init__(self, peer_sync_config: dict): + self.enabled = peer_sync_config.get("enabled", False) + self.interval_seconds = peer_sync_config.get("interval_minutes", 15) * 60 + self.peers = peer_sync_config.get("peers") or [] + self._stop_event = threading.Event() + self._thread = None + + def start(self): + if not self.enabled: + logger.info("Peer sync disabled — skipping") + return + if not self.peers: + logger.warning( + "Peer sync enabled but no peers configured" + ) + return + + self._stop_event.clear() + self._thread = threading.Thread( + target=self._run, daemon=True, name="peer_sync_worker" + ) + self._thread.start() + peer_urls = [p.get("url", "?") for p in self.peers] + logger.info( + f"Peer sync worker started — " + f"interval: {self.interval_seconds // 60}m, " + f"peers: {peer_urls}" + ) + + def stop(self): + self._stop_event.set() + if self._thread: + self._thread.join(timeout=10) + logger.info("Peer sync worker stopped") + + @property + def is_alive(self): + return self._thread is not None and self._thread.is_alive() + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _run(self): + logger.info("Peer sync worker starting — running initial sync now") + self._sync_all() + while not self._stop_event.wait(timeout=self.interval_seconds): + self._sync_all() + + def _sync_all(self): + logger.debug( + f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)" + ) + for peer in self.peers: + url = peer.get("url") + if not url: + logger.warning("[peer_sync] Peer config missing url — skipping") + continue + try: + self._sync_from_peer(peer) + except Exception as exc: + logger.warning( + f"[peer_sync] Skipping unreachable peer {url}: {exc}" + ) + + def _sync_from_peer(self, peer: dict): + url = peer.get("url", "").rstrip("/") + username = peer.get("username") + password = peer.get("password") + auth = (username, password) if username else None + + # Fetch the peer's zone list + resp = requests.get( + f"{url}/internal/zones", auth=auth, timeout=10 + ) + if resp.status_code != 200: + logger.warning( + f"[peer_sync] {url}: /internal/zones returned {resp.status_code}" + ) + return + + peer_zones = resp.json() # [{domain, zone_updated_at, hostname, username}] + if not peer_zones: + logger.debug(f"[peer_sync] {url}: no zone_data on peer yet") + return + + session = connect() + try: + synced = 0 + for entry in peer_zones: + domain = entry.get("domain") + if not domain: + continue + + peer_ts_str = entry.get("zone_updated_at") + peer_ts = ( + datetime.datetime.fromisoformat(peer_ts_str) + if peer_ts_str + else None + ) + + local = session.query(Domain).filter_by(domain=domain).first() + + needs_sync = ( + local is None + or local.zone_data is None + or (peer_ts and not local.zone_updated_at) + or ( 
+ peer_ts + and local.zone_updated_at + and peer_ts > local.zone_updated_at + ) + ) + + if not needs_sync: + continue + + # Fetch full zone_data from peer + zresp = requests.get( + f"{url}/internal/zones", + params={"domain": domain}, + auth=auth, + timeout=10, + ) + if zresp.status_code != 200: + logger.warning( + f"[peer_sync] {url}: could not fetch zone_data " + f"for {domain} (HTTP {zresp.status_code})" + ) + continue + + zdata = zresp.json() + zone_data = zdata.get("zone_data") + if not zone_data: + continue + + if local is None: + local = Domain( + domain=domain, + hostname=entry.get("hostname"), + username=entry.get("username"), + zone_data=zone_data, + zone_updated_at=peer_ts, + ) + session.add(local) + logger.debug( + f"[peer_sync] {url}: created local record for {domain}" + ) + else: + local.zone_data = zone_data + local.zone_updated_at = peer_ts + logger.debug( + f"[peer_sync] {url}: updated zone_data for {domain}" + ) + synced += 1 + + if synced: + session.commit() + logger.info( + f"[peer_sync] Synced {synced} zone(s) from {url}" + ) + else: + logger.debug(f"[peer_sync] {url}: already up to date") + finally: + session.close() diff --git a/directdnsonly/config/__init__.py b/directdnsonly/config/__init__.py index bbfa49a..a133480 100644 --- a/directdnsonly/config/__init__.py +++ b/directdnsonly/config/__init__.py @@ -62,6 +62,10 @@ def load_config() -> Vyper: v.set_default("reconciliation.interval_minutes", 60) v.set_default("reconciliation.verify_ssl", True) + # Peer sync defaults + v.set_default("peer_sync.enabled", False) + v.set_default("peer_sync.interval_minutes", 15) + # Read configuration try: if not v.read_in_config(): diff --git a/directdnsonly/config/app.yml b/directdnsonly/config/app.yml index 4cbe87f..59fa97c 100644 --- a/directdnsonly/config/app.yml +++ b/directdnsonly/config/app.yml @@ -30,6 +30,18 @@ app: # password: secret # ssl: true +# Peer sync — exchange zone_data between directdnsonly instances +# Enables eventual consistency without a shared datastore. +# If a peer is offline, the sync is silently skipped and retried next interval. +# Use the same credentials as the peer's app.auth_username / auth_password. 
+#peer_sync: +# enabled: true +# interval_minutes: 15 +# peers: +# - url: http://ddo-2:2222 # URL of the peer directdnsonly instance +# username: directdnsonly +# password: changeme + dns: default_backend: bind backends: diff --git a/directdnsonly/main.py b/directdnsonly/main.py index 1a8e633..65ccd83 100644 --- a/directdnsonly/main.py +++ b/directdnsonly/main.py @@ -3,6 +3,7 @@ import cherrypy from app.backends import BackendRegistry from app.api.admin import DNSAdminAPI from app.api.health import HealthAPI +from app.api.internal import InternalAPI from app import configure_logging from worker import WorkerManager from directdnsonly.config import config @@ -38,10 +39,12 @@ def main(): # Setup worker manager reconciliation_config = config.get("reconciliation") or {} + peer_sync_config = config.get("peer_sync") or {} worker_manager = WorkerManager( queue_path=config.get("queue_location"), backend_registry=registry, reconciliation_config=reconciliation_config, + peer_sync_config=peer_sync_config, ) worker_manager.start() logger.info( @@ -95,6 +98,7 @@ def main(): backend_registry=registry, ) root.health = HealthAPI(registry) + root.internal = InternalAPI() # Add queue status endpoint root.queue_status = lambda: worker_manager.queue_status() diff --git a/directdnsonly/worker.py b/directdnsonly/worker.py index 7ea7796..c2fc3eb 100644 --- a/directdnsonly/worker.py +++ b/directdnsonly/worker.py @@ -12,6 +12,7 @@ from app.utils.zone_parser import count_zone_records from directdnsonly.app.db.models import Domain from directdnsonly.app.db import connect from directdnsonly.app.reconciler import ReconciliationWorker +from directdnsonly.app.peer_sync import PeerSyncWorker # --------------------------------------------------------------------------- # Retry configuration @@ -25,7 +26,11 @@ RETRY_DRAIN_INTERVAL = 30 # how often the retry drain thread wakes class WorkerManager: def __init__( - self, queue_path: str, backend_registry, reconciliation_config: dict = None + self, + queue_path: str, + backend_registry, + reconciliation_config: dict = None, + peer_sync_config: dict = None, ): self.queue_path = queue_path self.backend_registry = backend_registry @@ -34,7 +39,9 @@ class WorkerManager: self._delete_thread = None self._retry_thread = None self._reconciler = None + self._peer_syncer = None self._reconciliation_config = reconciliation_config or {} + self._peer_sync_config = peer_sync_config or {} try: os.makedirs(queue_path, exist_ok=True) @@ -467,10 +474,15 @@ class WorkerManager: ) self._reconciler.start() + self._peer_syncer = PeerSyncWorker(self._peer_sync_config) + self._peer_syncer.start() + def stop(self): self._running = False if self._reconciler: self._reconciler.stop() + if self._peer_syncer: + self._peer_syncer.stop() for thread in (self._save_thread, self._delete_thread, self._retry_thread): if thread: thread.join(timeout=5) @@ -487,4 +499,7 @@ class WorkerManager: "reconciler_alive": ( self._reconciler.is_alive if self._reconciler else False ), + "peer_syncer_alive": ( + self._peer_syncer.is_alive if self._peer_syncer else False + ), } diff --git a/tests/conftest.py b/tests/conftest.py index e65b5cf..0c18318 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -37,4 +37,5 @@ def patch_connect(db_session, monkeypatch): _factory = lambda: db_session # noqa: E731 monkeypatch.setattr("directdnsonly.app.utils.connect", _factory) monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory) + monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory) return db_session 
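For context on what the new root.internal mount exposes, the snippet below is a minimal manual check of the /internal/zones endpoint. It is an illustration only, not part of this patch, and the URL and credentials are the placeholder values from the app.yml example above; it can be handy for verifying a peer by hand before enabling peer_sync.

# Manual check of the internal zone-exchange endpoint (illustration only,
# not part of this patch). URL and credentials are the placeholder values
# from the app.yml example above.
import requests

BASE = "http://ddo-2:2222"
AUTH = ("directdnsonly", "changeme")  # same as the peer's app.auth_username / auth_password

# List zone metadata for every domain the peer has zone_data for
zones = requests.get(f"{BASE}/internal/zones", auth=AUTH, timeout=10).json()
for zone in zones:
    print(zone["domain"], zone["zone_updated_at"], zone["hostname"], zone["username"])

# Fetch the full zone_data for a single domain (404 if the peer has none stored)
resp = requests.get(
    f"{BASE}/internal/zones",
    params={"domain": "example.com"},
    auth=AUTH,
    timeout=10,
)
if resp.status_code == 200:
    print(resp.json()["zone_data"])
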
diff --git a/tests/test_peer_sync.py b/tests/test_peer_sync.py new file mode 100644 index 0000000..b6d56cb --- /dev/null +++ b/tests/test_peer_sync.py @@ -0,0 +1,251 @@ +"""Tests for directdnsonly.app.peer_sync — PeerSyncWorker.""" + +import datetime +import json +import pytest +from unittest.mock import patch, MagicMock + +from directdnsonly.app.peer_sync import PeerSyncWorker +from directdnsonly.app.db.models import Domain + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +BASE_CONFIG = { + "enabled": True, + "interval_minutes": 15, + "peers": [ + { + "url": "http://ddo-2:2222", + "username": "directdnsonly", + "password": "changeme", + } + ], +} + +NOW = datetime.datetime(2024, 6, 1, 12, 0, 0) +OLDER = datetime.datetime(2024, 6, 1, 11, 0, 0) + +ZONE_DATA = "$ORIGIN example.com.\n@ 300 IN SOA ns1 hostmaster 1 3600 900 604800 300\n" + + +# --------------------------------------------------------------------------- +# Config / startup tests +# --------------------------------------------------------------------------- + + +def test_disabled_by_default(): + worker = PeerSyncWorker({}) + assert not worker.enabled + + +def test_interval_stored(): + worker = PeerSyncWorker({"enabled": True, "interval_minutes": 30}) + assert worker.interval_seconds == 1800 + + +def test_default_interval(): + worker = PeerSyncWorker({"enabled": True}) + assert worker.interval_seconds == 15 * 60 + + +def test_peers_stored(): + worker = PeerSyncWorker(BASE_CONFIG) + assert len(worker.peers) == 1 + assert worker.peers[0]["url"] == "http://ddo-2:2222" + + +def test_start_skips_when_disabled(caplog): + worker = PeerSyncWorker({"enabled": False}) + worker.start() + assert worker._thread is None + + +def test_start_warns_when_no_peers(caplog): + import logging + worker = PeerSyncWorker({"enabled": True, "peers": []}) + with patch.object(worker, "_run"): + worker.start() + # Thread should not have started + assert worker._thread is None + + +# --------------------------------------------------------------------------- +# _sync_from_peer tests +# --------------------------------------------------------------------------- + + +def _make_peer(): + return BASE_CONFIG["peers"][0] + + +def _peer_list(domain, ts=None): + """Simulate the JSON response from GET /internal/zones.""" + return [ + { + "domain": domain, + "zone_updated_at": ts.isoformat() if ts else None, + "hostname": "da1.example.com", + "username": "admin", + } + ] + + +def _peer_zone(domain, ts=None, zone_data=ZONE_DATA): + """Simulate the JSON response from GET /internal/zones?domain=X.""" + return { + "domain": domain, + "zone_data": zone_data, + "zone_updated_at": ts.isoformat() if ts else None, + "hostname": "da1.example.com", + "username": "admin", + } + + +def test_sync_creates_new_local_record(patch_connect, monkeypatch): + """When local DB has no record, peer zone_data is fetched and stored.""" + worker = PeerSyncWorker(BASE_CONFIG) + session = patch_connect + + def mock_get(url, auth=None, timeout=10, params=None): + resp = MagicMock() + resp.status_code = 200 + if params and params.get("domain"): + resp.json.return_value = _peer_zone("example.com", NOW) + else: + resp.json.return_value = _peer_list("example.com", NOW) + return resp + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + + worker._sync_from_peer(_make_peer()) + + record = session.query(Domain).filter_by(domain="example.com").first() + 
assert record is not None + assert record.zone_data == ZONE_DATA + assert record.zone_updated_at == NOW + + +def test_sync_updates_older_local_record(patch_connect, monkeypatch): + """When local zone_data is older than peer's, it is overwritten.""" + session = patch_connect + session.add(Domain(domain="example.com", zone_data="old data", zone_updated_at=OLDER)) + session.commit() + + worker = PeerSyncWorker(BASE_CONFIG) + + def mock_get(url, auth=None, timeout=10, params=None): + resp = MagicMock() + resp.status_code = 200 + if params and params.get("domain"): + resp.json.return_value = _peer_zone("example.com", NOW) + else: + resp.json.return_value = _peer_list("example.com", NOW) + return resp + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + + worker._sync_from_peer(_make_peer()) + + record = session.query(Domain).filter_by(domain="example.com").first() + assert record.zone_data == ZONE_DATA + assert record.zone_updated_at == NOW + + +def test_sync_skips_when_local_is_newer(patch_connect, monkeypatch): + """When local zone_data is newer than peer's, it is not overwritten.""" + session = patch_connect + session.add(Domain(domain="example.com", zone_data="newer local", zone_updated_at=NOW)) + session.commit() + + worker = PeerSyncWorker(BASE_CONFIG) + fetch_calls = [] + + def mock_get(url, auth=None, timeout=10, params=None): + resp = MagicMock() + resp.status_code = 200 + if params and params.get("domain"): + fetch_calls.append(url) + resp.json.return_value = _peer_zone("example.com", OLDER) + else: + resp.json.return_value = _peer_list("example.com", OLDER) + return resp + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + + worker._sync_from_peer(_make_peer()) + + # zone_data fetch should not have been called + assert not fetch_calls + record = session.query(Domain).filter_by(domain="example.com").first() + assert record.zone_data == "newer local" + + +def test_sync_skips_unreachable_peer(monkeypatch): + """If the peer raises a connection error, _sync_all catches it gracefully.""" + worker = PeerSyncWorker(BASE_CONFIG) + + def mock_get(*args, **kwargs): + raise ConnectionError("peer down") + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + + # Should not raise + worker._sync_all() + + +def test_sync_skips_peer_with_bad_status(patch_connect, monkeypatch): + """Non-200 response from peer zone list is silently skipped.""" + worker = PeerSyncWorker(BASE_CONFIG) + session = patch_connect + + def mock_get(url, auth=None, timeout=10, params=None): + resp = MagicMock() + resp.status_code = 503 + return resp + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + + worker._sync_from_peer(_make_peer()) + + # No records should have been created + assert session.query(Domain).count() == 0 + + +def test_sync_skips_missing_zone_data_in_response(patch_connect, monkeypatch): + """If the peer returns no zone_data for a domain, it is skipped.""" + session = patch_connect + + worker = PeerSyncWorker(BASE_CONFIG) + + def mock_get(url, auth=None, timeout=10, params=None): + resp = MagicMock() + resp.status_code = 200 + if params and params.get("domain"): + resp.json.return_value = {"domain": "example.com", "zone_data": None} + else: + resp.json.return_value = _peer_list("example.com", NOW) + return resp + + monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) + + worker._sync_from_peer(_make_peer()) + + assert session.query(Domain).count() == 0 + + +def 
test_sync_empty_peer_list(patch_connect, monkeypatch):
+    """Empty zone list from peer results in zero syncs without error."""
+    session = patch_connect
+    worker = PeerSyncWorker(BASE_CONFIG)
+
+    def mock_get(url, auth=None, timeout=10, params=None):
+        resp = MagicMock()
+        resp.status_code = 200
+        resp.json.return_value = []
+        return resp
+
+    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
+
+    # Should complete without error and create no local records
+    worker._sync_from_peer(_make_peer())
+
+    assert session.query(Domain).count() == 0
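
Outside the test suite, a single sync pass can be exercised from a Python shell. The sketch below is illustrative only, assuming the same placeholder peer settings as the app.yml example; _sync_all() runs one pass over all configured peers, exactly as the tests above drive it.

# One-off peer sync pass from a Python shell (illustration only; the peer
# settings below are the placeholder values from the app.yml example).
from directdnsonly.app.peer_sync import PeerSyncWorker

peer_sync_config = {
    "enabled": True,
    "interval_minutes": 15,
    "peers": [
        {
            "url": "http://ddo-2:2222",
            "username": "directdnsonly",
            "password": "changeme",
        }
    ],
}

worker = PeerSyncWorker(peer_sync_config)
# Single pass over all configured peers; unreachable peers are skipped and
# only the local DB is updated, never the backends.
worker._sync_all()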