feat: add peer sync worker for zone_data exchange between nodes 🔄

Adds optional peer-to-peer zone_data replication between directdnsonly
instances. Enables eventual consistency in DA Multi-Server topologies
without a shared datastore.

- InternalAPI: GET /internal/zones (list) and ?domain= (detail)
  expose zone_data to peers via the existing basic auth (example
  request after this list)
- PeerSyncWorker: interval-based daemon thread that fetches zone_data
  from configured peers and stores newer entries locally; unreachable
  peers are skipped silently and retried on the next interval
- WorkerManager: wires PeerSyncWorker alongside reconciler; exposes
  peer_syncer_alive in queue_status
- Config: peer_sync block with enabled/interval_minutes/peers[]
- Tests: 13 tests covering sync, skip-older, skip-unreachable, empty
  peer list, bad status, and missing zone_data scenarios
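
For anyone reviewing by hand, a minimal sketch of hitting the new
endpoints with requests; the node URL and credentials are placeholders,
substitute your node's app.auth_username / auth_password:

    import requests

    BASE = "http://ddo-1:2222"            # placeholder node URL
    AUTH = ("directdnsonly", "changeme")  # placeholder credentials

    # List every domain that currently has stored zone_data (metadata only)
    zones = requests.get(f"{BASE}/internal/zones", auth=AUTH, timeout=10).json()

    # Fetch the full zone_data for a single domain (404 if absent)
    detail = requests.get(
        f"{BASE}/internal/zones",
        params={"domain": "example.com"},
        auth=AUTH,
        timeout=10,
    )
    if detail.status_code == 200:
        print(detail.json()["zone_data"])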
commit 143cf9c792 (parent 33f4f30b5f)
2026-02-19 22:16:55 +13:00
8 changed files with 568 additions and 1 deletion


@@ -0,0 +1,81 @@
import cherrypy
import json
from loguru import logger
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain


class InternalAPI:
"""Peer-to-peer zone_data exchange endpoint.
Used by PeerSyncWorker to replicate zone_data between directdnsonly
instances so each node can independently heal its local backends.
All routes require the same basic auth as the main API.
"""
@cherrypy.expose
def zones(self, domain=None):
"""Return zone metadata or zone_data for a specific domain.
GET /internal/zones
Returns a JSON array of {domain, zone_updated_at, hostname, username}
for all domains that have stored zone_data.
GET /internal/zones?domain=example.com
Returns {domain, zone_data, zone_updated_at, hostname, username}
for the requested domain, or 404 if not found / no zone_data.
"""
cherrypy.response.headers["Content-Type"] = "application/json"
session = connect()
try:
if domain:
record = (
session.query(Domain)
.filter_by(domain=domain)
.filter(Domain.zone_data.isnot(None))
.first()
)
if not record:
cherrypy.response.status = 404
return json.dumps({"error": "not found"}).encode()
return json.dumps(
{
"domain": record.domain,
"zone_data": record.zone_data,
"zone_updated_at": (
record.zone_updated_at.isoformat()
if record.zone_updated_at
else None
),
"hostname": record.hostname,
"username": record.username,
}
).encode()
else:
records = (
session.query(Domain)
.filter(Domain.zone_data.isnot(None))
.all()
)
return json.dumps(
[
{
"domain": r.domain,
"zone_updated_at": (
r.zone_updated_at.isoformat()
if r.zone_updated_at
else None
),
"hostname": r.hostname,
"username": r.username,
}
for r in records
]
).encode()
except Exception as exc:
logger.error(f"[internal] Error serving /internal/zones: {exc}")
cherrypy.response.status = 500
return json.dumps({"error": "internal server error"}).encode()
finally:
session.close()
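
For reference, the shapes the two routes return, written as Python
literals; field values are illustrative:

    # GET /internal/zones (list view: no zone_data, keeps payloads small)
    [
        {
            "domain": "example.com",
            "zone_updated_at": "2024-06-01T12:00:00",
            "hostname": "da1.example.com",
            "username": "admin",
        }
    ]

    # GET /internal/zones?domain=example.com (detail view)
    {
        "domain": "example.com",
        "zone_data": "$ORIGIN example.com.\n@ 300 IN SOA ns1 ...",
        "zone_updated_at": "2024-06-01T12:00:00",
        "hostname": "da1.example.com",
        "username": "admin",
    }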


@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""Peer sync worker — exchanges zone_data between directdnsonly instances.
Each node stores zone_data in its local SQLite DB after every successful
backend write. When DirectAdmin pushes a zone to one node but the other
is temporarily offline, the offline node misses that zone_data.
PeerSyncWorker corrects this by periodically comparing zone lists with
configured peers and fetching any zone_data that is newer or absent locally.
It only updates the local DB — it never writes directly to backends. The
existing reconciler healing pass then detects missing zones and re-pushes
using the freshly synced zone_data.
Safety properties:
- If a peer is unreachable, skip it silently and retry next interval
- Only zone_data is synced — backend writes remain the sole responsibility
of the local save queue worker
- Newer zone_updated_at timestamp wins; local data is never overwritten
with older peer data
"""
import datetime
import threading
from loguru import logger
import requests
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain


class PeerSyncWorker:
"""Periodically fetches zone_data from peer directdnsonly instances and
stores it locally so the healing pass can re-push missing zones without
waiting for a DirectAdmin re-push."""
def __init__(self, peer_sync_config: dict):
self.enabled = peer_sync_config.get("enabled", False)
self.interval_seconds = peer_sync_config.get("interval_minutes", 15) * 60
self.peers = peer_sync_config.get("peers") or []
self._stop_event = threading.Event()
self._thread = None
def start(self):
if not self.enabled:
logger.info("Peer sync disabled — skipping")
return
if not self.peers:
logger.warning(
"Peer sync enabled but no peers configured"
)
return
self._stop_event.clear()
self._thread = threading.Thread(
target=self._run, daemon=True, name="peer_sync_worker"
)
self._thread.start()
peer_urls = [p.get("url", "?") for p in self.peers]
logger.info(
f"Peer sync worker started — "
f"interval: {self.interval_seconds // 60}m, "
f"peers: {peer_urls}"
)
def stop(self):
self._stop_event.set()
if self._thread:
self._thread.join(timeout=10)
logger.info("Peer sync worker stopped")
@property
def is_alive(self):
return self._thread is not None and self._thread.is_alive()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _run(self):
logger.info("Peer sync worker starting — running initial sync now")
self._sync_all()
while not self._stop_event.wait(timeout=self.interval_seconds):
self._sync_all()
def _sync_all(self):
logger.debug(
f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)"
)
for peer in self.peers:
url = peer.get("url")
if not url:
logger.warning("[peer_sync] Peer config missing url — skipping")
continue
try:
self._sync_from_peer(peer)
except Exception as exc:
logger.warning(
f"[peer_sync] Skipping unreachable peer {url}: {exc}"
)
def _sync_from_peer(self, peer: dict):
url = peer.get("url", "").rstrip("/")
username = peer.get("username")
password = peer.get("password")
auth = (username, password) if username else None
# Fetch the peer's zone list
resp = requests.get(
f"{url}/internal/zones", auth=auth, timeout=10
)
if resp.status_code != 200:
logger.warning(
f"[peer_sync] {url}: /internal/zones returned {resp.status_code}"
)
return
peer_zones = resp.json() # [{domain, zone_updated_at, hostname, username}]
if not peer_zones:
logger.debug(f"[peer_sync] {url}: no zone_data on peer yet")
return
session = connect()
try:
synced = 0
for entry in peer_zones:
domain = entry.get("domain")
if not domain:
continue
peer_ts_str = entry.get("zone_updated_at")
peer_ts = (
datetime.datetime.fromisoformat(peer_ts_str)
if peer_ts_str
else None
)
local = session.query(Domain).filter_by(domain=domain).first()
needs_sync = (
local is None
or local.zone_data is None
or (peer_ts and not local.zone_updated_at)
or (
peer_ts
and local.zone_updated_at
and peer_ts > local.zone_updated_at
)
)
if not needs_sync:
continue
# Fetch full zone_data from peer
zresp = requests.get(
f"{url}/internal/zones",
params={"domain": domain},
auth=auth,
timeout=10,
)
if zresp.status_code != 200:
logger.warning(
f"[peer_sync] {url}: could not fetch zone_data "
f"for {domain} (HTTP {zresp.status_code})"
)
continue
zdata = zresp.json()
zone_data = zdata.get("zone_data")
if not zone_data:
continue
if local is None:
local = Domain(
domain=domain,
hostname=entry.get("hostname"),
username=entry.get("username"),
zone_data=zone_data,
zone_updated_at=peer_ts,
)
session.add(local)
logger.debug(
f"[peer_sync] {url}: created local record for {domain}"
)
else:
local.zone_data = zone_data
local.zone_updated_at = peer_ts
logger.debug(
f"[peer_sync] {url}: updated zone_data for {domain}"
)
synced += 1
if synced:
session.commit()
logger.info(
f"[peer_sync] Synced {synced} zone(s) from {url}"
)
else:
logger.debug(f"[peer_sync] {url}: already up to date")
finally:
session.close()
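
A standalone sketch of the last-writer-wins rule encoded in needs_sync
above; the function name is illustrative, not part of the worker:

    import datetime

    def should_pull(local_zone_data, local_ts, peer_ts):
        """Pull when zone_data is absent locally, when only the peer has a
        timestamp, or when the peer's timestamp is strictly newer."""
        if local_zone_data is None:
            return True
        if peer_ts and not local_ts:
            return True
        return bool(peer_ts and local_ts and peer_ts > local_ts)

    now = datetime.datetime(2024, 6, 1, 12, 0, 0)
    older = now - datetime.timedelta(hours=1)
    assert should_pull(None, None, now)         # missing locally: pull
    assert should_pull("zone", older, now)      # peer is newer: pull
    assert not should_pull("zone", now, older)  # local is newer: keep local
    assert not should_pull("zone", now, None)   # peer has no timestamp: keep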


@@ -62,6 +62,10 @@ def load_config() -> Vyper:
v.set_default("reconciliation.interval_minutes", 60) v.set_default("reconciliation.interval_minutes", 60)
v.set_default("reconciliation.verify_ssl", True) v.set_default("reconciliation.verify_ssl", True)
# Peer sync defaults
v.set_default("peer_sync.enabled", False)
v.set_default("peer_sync.interval_minutes", 15)
# Read configuration # Read configuration
try: try:
if not v.read_in_config(): if not v.read_in_config():


@@ -30,6 +30,18 @@ app:
 # password: secret
 # ssl: true

+# Peer sync — exchange zone_data between directdnsonly instances
+# Enables eventual consistency without a shared datastore.
+# If a peer is offline, the sync is silently skipped and retried next interval.
+# Use the same credentials as the peer's app.auth_username / auth_password.
+#peer_sync:
+#  enabled: true
+#  interval_minutes: 15
+#  peers:
+#    - url: http://ddo-2:2222  # URL of the peer directdnsonly instance
+#      username: directdnsonly
+#      password: changeme
+
 dns:
   default_backend: bind
   backends:
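
In a two-node DA Multi-Server setup the block is simply mirrored. As the
dicts each node's config.get("peer_sync") would produce (hostnames and
credentials are placeholders):

    # ddo-1 pulls from ddo-2 ...
    node1 = {
        "enabled": True,
        "interval_minutes": 15,
        "peers": [
            {"url": "http://ddo-2:2222", "username": "directdnsonly", "password": "changeme"},
        ],
    }

    # ... and ddo-2 points back at ddo-1, so the pair converges.
    node2 = {
        "enabled": True,
        "interval_minutes": 15,
        "peers": [
            {"url": "http://ddo-1:2222", "username": "directdnsonly", "password": "changeme"},
        ],
    }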


@@ -3,6 +3,7 @@ import cherrypy
 from app.backends import BackendRegistry
 from app.api.admin import DNSAdminAPI
 from app.api.health import HealthAPI
+from app.api.internal import InternalAPI
 from app import configure_logging
 from worker import WorkerManager
 from directdnsonly.config import config

@@ -38,10 +39,12 @@ def main():
     # Setup worker manager
     reconciliation_config = config.get("reconciliation") or {}
+    peer_sync_config = config.get("peer_sync") or {}
     worker_manager = WorkerManager(
         queue_path=config.get("queue_location"),
         backend_registry=registry,
         reconciliation_config=reconciliation_config,
+        peer_sync_config=peer_sync_config,
     )
     worker_manager.start()

     logger.info(

@@ -95,6 +98,7 @@ def main():
         backend_registry=registry,
     )
     root.health = HealthAPI(registry)
+    root.internal = InternalAPI()

     # Add queue status endpoint
     root.queue_status = lambda: worker_manager.queue_status()
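
Mounting the handler as root.internal relies on cherrypy's default
object-tree dispatch, so the routes line up with the URLs PeerSyncWorker
requests:

    # Attribute names become path segments; the exposed zones() method maps to:
    #   GET /internal/zones            -> InternalAPI().zones(domain=None)
    #   GET /internal/zones?domain=x   -> InternalAPI().zones(domain="x")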


@@ -12,6 +12,7 @@ from app.utils.zone_parser import count_zone_records
 from directdnsonly.app.db.models import Domain
 from directdnsonly.app.db import connect
 from directdnsonly.app.reconciler import ReconciliationWorker
+from directdnsonly.app.peer_sync import PeerSyncWorker

 # ---------------------------------------------------------------------------
 # Retry configuration

@@ -25,7 +26,11 @@ RETRY_DRAIN_INTERVAL = 30  # how often the retry drain thread wakes

 class WorkerManager:
     def __init__(
-        self, queue_path: str, backend_registry, reconciliation_config: dict = None
+        self,
+        queue_path: str,
+        backend_registry,
+        reconciliation_config: dict = None,
+        peer_sync_config: dict = None,
     ):
         self.queue_path = queue_path
         self.backend_registry = backend_registry

@@ -34,7 +39,9 @@ class WorkerManager:
         self._delete_thread = None
         self._retry_thread = None
         self._reconciler = None
+        self._peer_syncer = None
         self._reconciliation_config = reconciliation_config or {}
+        self._peer_sync_config = peer_sync_config or {}

         try:
             os.makedirs(queue_path, exist_ok=True)

@@ -467,10 +474,15 @@ class WorkerManager:
         )
         self._reconciler.start()

+        self._peer_syncer = PeerSyncWorker(self._peer_sync_config)
+        self._peer_syncer.start()
+
     def stop(self):
         self._running = False
         if self._reconciler:
             self._reconciler.stop()
+        if self._peer_syncer:
+            self._peer_syncer.stop()
         for thread in (self._save_thread, self._delete_thread, self._retry_thread):
             if thread:
                 thread.join(timeout=5)

@@ -487,4 +499,7 @@ class WorkerManager:
             "reconciler_alive": (
                 self._reconciler.is_alive if self._reconciler else False
             ),
+            "peer_syncer_alive": (
+                self._peer_syncer.is_alive if self._peer_syncer else False
+            ),
         }
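
queue_status now surfaces the sync thread's liveness next to the
reconciler's; a representative payload, with the existing queue fields
elided:

    {
        # ...existing queue counters...
        "reconciler_alive": True,
        "peer_syncer_alive": True,  # False when peer_sync is disabled or the thread has died
    }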


@@ -37,4 +37,5 @@ def patch_connect(db_session, monkeypatch):
     _factory = lambda: db_session  # noqa: E731
     monkeypatch.setattr("directdnsonly.app.utils.connect", _factory)
     monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory)
+    monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory)
     return db_session

tests/test_peer_sync.py (new file, 251 lines)

@@ -0,0 +1,251 @@
"""Tests for directdnsonly.app.peer_sync — PeerSyncWorker."""
import datetime
import json
import pytest
from unittest.mock import patch, MagicMock
from directdnsonly.app.peer_sync import PeerSyncWorker
from directdnsonly.app.db.models import Domain
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
BASE_CONFIG = {
"enabled": True,
"interval_minutes": 15,
"peers": [
{
"url": "http://ddo-2:2222",
"username": "directdnsonly",
"password": "changeme",
}
],
}
NOW = datetime.datetime(2024, 6, 1, 12, 0, 0)
OLDER = datetime.datetime(2024, 6, 1, 11, 0, 0)
ZONE_DATA = "$ORIGIN example.com.\n@ 300 IN SOA ns1 hostmaster 1 3600 900 604800 300\n"
# ---------------------------------------------------------------------------
# Config / startup tests
# ---------------------------------------------------------------------------
def test_disabled_by_default():
worker = PeerSyncWorker({})
assert not worker.enabled
def test_interval_stored():
worker = PeerSyncWorker({"enabled": True, "interval_minutes": 30})
assert worker.interval_seconds == 1800
def test_default_interval():
worker = PeerSyncWorker({"enabled": True})
assert worker.interval_seconds == 15 * 60
def test_peers_stored():
worker = PeerSyncWorker(BASE_CONFIG)
assert len(worker.peers) == 1
assert worker.peers[0]["url"] == "http://ddo-2:2222"
def test_start_skips_when_disabled(caplog):
worker = PeerSyncWorker({"enabled": False})
worker.start()
assert worker._thread is None
def test_start_warns_when_no_peers(caplog):
import logging
worker = PeerSyncWorker({"enabled": True, "peers": []})
with patch.object(worker, "_run"):
worker.start()
# Thread should not have started
assert worker._thread is None
# ---------------------------------------------------------------------------
# _sync_from_peer tests
# ---------------------------------------------------------------------------
def _make_peer():
return BASE_CONFIG["peers"][0]
def _peer_list(domain, ts=None):
"""Simulate the JSON response from GET /internal/zones."""
return [
{
"domain": domain,
"zone_updated_at": ts.isoformat() if ts else None,
"hostname": "da1.example.com",
"username": "admin",
}
]
def _peer_zone(domain, ts=None, zone_data=ZONE_DATA):
"""Simulate the JSON response from GET /internal/zones?domain=X."""
return {
"domain": domain,
"zone_data": zone_data,
"zone_updated_at": ts.isoformat() if ts else None,
"hostname": "da1.example.com",
"username": "admin",
}
def test_sync_creates_new_local_record(patch_connect, monkeypatch):
"""When local DB has no record, peer zone_data is fetched and stored."""
worker = PeerSyncWorker(BASE_CONFIG)
session = patch_connect
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
if params and params.get("domain"):
resp.json.return_value = _peer_zone("example.com", NOW)
else:
resp.json.return_value = _peer_list("example.com", NOW)
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer())
record = session.query(Domain).filter_by(domain="example.com").first()
assert record is not None
assert record.zone_data == ZONE_DATA
assert record.zone_updated_at == NOW
def test_sync_updates_older_local_record(patch_connect, monkeypatch):
"""When local zone_data is older than peer's, it is overwritten."""
session = patch_connect
session.add(Domain(domain="example.com", zone_data="old data", zone_updated_at=OLDER))
session.commit()
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
if params and params.get("domain"):
resp.json.return_value = _peer_zone("example.com", NOW)
else:
resp.json.return_value = _peer_list("example.com", NOW)
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer())
record = session.query(Domain).filter_by(domain="example.com").first()
assert record.zone_data == ZONE_DATA
assert record.zone_updated_at == NOW
def test_sync_skips_when_local_is_newer(patch_connect, monkeypatch):
"""When local zone_data is newer than peer's, it is not overwritten."""
session = patch_connect
session.add(Domain(domain="example.com", zone_data="newer local", zone_updated_at=NOW))
session.commit()
worker = PeerSyncWorker(BASE_CONFIG)
fetch_calls = []
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
if params and params.get("domain"):
fetch_calls.append(url)
resp.json.return_value = _peer_zone("example.com", OLDER)
else:
resp.json.return_value = _peer_list("example.com", OLDER)
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer())
# zone_data fetch should not have been called
assert not fetch_calls
record = session.query(Domain).filter_by(domain="example.com").first()
assert record.zone_data == "newer local"
def test_sync_skips_unreachable_peer(monkeypatch):
"""If the peer raises a connection error, _sync_all catches it gracefully."""
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(*args, **kwargs):
raise ConnectionError("peer down")
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
# Should not raise
worker._sync_all()
def test_sync_skips_peer_with_bad_status(patch_connect, monkeypatch):
"""Non-200 response from peer zone list is silently skipped."""
worker = PeerSyncWorker(BASE_CONFIG)
session = patch_connect
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 503
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer())
# No records should have been created
assert session.query(Domain).count() == 0
def test_sync_skips_missing_zone_data_in_response(patch_connect, monkeypatch):
"""If the peer returns no zone_data for a domain, it is skipped."""
session = patch_connect
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
if params and params.get("domain"):
resp.json.return_value = {"domain": "example.com", "zone_data": None}
else:
resp.json.return_value = _peer_list("example.com", NOW)
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer())
assert session.query(Domain).count() == 0
def test_sync_empty_peer_list(patch_connect, monkeypatch):
"""Empty zone list from peer results in zero syncs without error."""
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = []
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer())