Compare commits

...

10 Commits

Author SHA1 Message Date
db60d808de feat: operational status endpoint + reconciler/peer state tracking 📊
- ReconciliationWorker._last_run stores per-pass stats (da_servers_polled,
  zones_in_da/db, orphans_found/queued, hostnames_backfilled/migrated,
  zones_healed, duration_seconds, dry_run flag)
- ReconciliationWorker.get_status() exposes state for API/UI consumption
- _heal_backends() now returns healed count
- PeerSyncWorker.get_peer_status() serialises _peer_health to JSON-safe dict
  (url, healthy, consecutive_failures, last_seen) with summary totals
- WorkerManager tracks dead-letter count; queue_status() now returns nested
  reconciler/peer_sync dicts replacing flat reconciler_alive/peer_syncer_alive
- New GET /status endpoint (StatusAPI) aggregates queue depths, worker liveness,
  reconciler last-run, peer health, and live zone count; computes ok/degraded/error
- .gitignore: exclude .claude/, .vscode/, .env (always local)
- app.yml: add documented datastore section (SQLite default + MySQL commented)
- 164 tests passing (23 new tests added)
2026-02-25 18:51:56 +13:00
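A quick way to exercise the new endpoint (illustrative sketch — host, port, and credentials are placeholders, and it assumes `/status` sits behind the same basic auth as the rest of the API; the field names follow the StatusAPI code further down this compare):

```python
import requests

# Placeholder node address and credentials — adjust for your deployment.
resp = requests.get(
    "http://localhost:2222/status",
    auth=("directdnsonly", "my-strong-secret"),
    timeout=5,
)
status = resp.json()
print(status["status"])                  # "ok" | "degraded" | "error"
print(status["queues"]["retry"])         # pending retry items
print(status["reconciler"]["last_run"])  # per-pass reconciler stats
print(status["peer_sync"]["degraded"])   # peers currently failing health checks
```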
0f417da204 feat: add CMD_MULTI_SERVER methods to DirectAdminClient 🔌
Adds get_extra_dns_servers(), add_extra_dns_server(), and the
high-level ensure_extra_dns_server() which registers a node and
enforces dns=yes + domain_check=yes in a single call.  Also adds
the generic post() helper.  10 new tests, 141 total.
2026-02-25 16:29:21 +13:00
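A hedged sketch of how these helpers might be called; the import path and constructor keyword names are assumptions — only the method names come from this commit:

```python
from directdnsonly.app.directadmin import DirectAdminClient  # import path assumed

# Placeholder DA server and directdnsonly node details.
client = DirectAdminClient(
    hostname="da1.example.com",
    port=2222,
    username="admin",
    password="secret",
    ssl=True,
    verify_ssl=True,
)

# Registers 203.0.113.10 as an Extra DNS server (if absent) and enforces
# dns=yes + domain_check=yes so DA starts pushing zone updates to it.
ok = client.ensure_extra_dns_server(
    ip="203.0.113.10", port=2222, user="directdnsonly", passwd="my-strong-secret"
)
print("configured" if ok else "failed")
```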
3f6a061ffe feat: mesh peer sync with health tracking and separate peer credentials 🔗
- Separate peer_sync.auth_username/password from the DA-facing credentials
  so /internal/* uses its own basic auth; a compromised peer cannot push
  zones or access the admin API
- Per-peer health tracking: consecutive failure count, degraded/recovered
  log events at FAILURE_THRESHOLD (3) and on first successful contact after
  degradation
- Gossip-lite mesh discovery: each sync pass calls /internal/peers on every
  known peer and adds newly discovered node URLs automatically; a linear
  chain of initial connections is sufficient to form a full mesh
- /internal/peers endpoint returns the node's live peer URL list
- Support DADNS_PEER_SYNC_PEER_N_URL/USERNAME/PASSWORD numbered env vars
  for multi-peer env-var-only deployments (up to 9); original single-peer
  DADNS_PEER_SYNC_PEER_URL retained for backward compatibility
2026-02-25 16:08:26 +13:00
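Illustrative environment for a node that starts with two known peers (placeholder values); gossip-lite discovery then fills in any remaining cluster members on the first sync pass:

```python
import os

# Numbered multi-peer env vars added in this commit — values are placeholders.
os.environ.update({
    "DADNS_PEER_SYNC_ENABLED": "true",
    "DADNS_PEER_SYNC_PEER_1_URL": "http://ddo-2:2222",
    "DADNS_PEER_SYNC_PEER_1_USERNAME": "peersync",
    "DADNS_PEER_SYNC_PEER_1_PASSWORD": "changeme",
    "DADNS_PEER_SYNC_PEER_2_URL": "http://ddo-3:2222",
    "DADNS_PEER_SYNC_PEER_2_USERNAME": "peersync",
    "DADNS_PEER_SYNC_PEER_2_PASSWORD": "changeme",
})
```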
0b31b75789 fix: correct RDATA encoding and batch processing in CoreDNS MySQL backend 🐛
- Fix dnspython silently relativizing in-zone FQDN targets to '@' by
  calling rdata.to_text(origin=origin, relativize=False); CoreDNS MySQL
  requires absolute FQDNs in RDATA and was serving '.' for any CNAME/MX
  pointing to the zone apex
- Reorder write_zone to delete stale records before inserting new ones
  so a brief NXDOMAIN is preferred over briefly serving duplicate records
- Rework save-queue batch loop: keep batch open until queue is empty
  rather than closing after a fixed timeout, so sequential DA zone pushes
  accumulate into a single batch
- Add managed_by='directadmin' to _ensure_zone_exists for new and
  legacy NULL rows
2026-02-25 15:43:08 +13:00
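A small dnspython illustration of the relativization behaviour this commit works around (hypothetical zone data):

```python
import dns.zone

# With dnspython's default relativization, an MX target equal to the zone apex
# comes back as '@' — which CoreDNS MySQL then serves as '.'.
zone_text = """
$ORIGIN example.com.
$TTL 300
@   IN SOA ns1.example.com. hostmaster.example.com. 1 7200 3600 1209600 300
@   IN NS  ns1.example.com.
ns1 IN A   192.0.2.1
@   IN MX  0 example.com.
"""
zone = dns.zone.from_text(zone_text, origin="example.com.")
for _name, _ttl, rdata in zone.iterate_rdatas("MX"):
    print(rdata.to_text())                                      # "0 @"
    print(rdata.to_text(origin=zone.origin, relativize=False))  # "0 example.com."
```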
83fbb03cad fix: relativize zone-apex hostnames to '@' for CoreDNS MySQL 🐛
CoreDNS MySQL (cybercinch fork) expects '@' for zone-apex references in
record RDATA. Storing the full FQDN (e.g. 'ithome.net.nz.') caused CoreDNS
to strip the zone suffix and serve 'MX 0 .' / 'CNAME .' instead of the
correct apex target.

- Add _relativize_name(): converts zone FQDN → '@', in-zone subdomains →
  relative label, external FQDNs left unchanged. Handles both already-
  relativized output from dnspython ($ORIGIN present) and absolute FQDNs
  when $ORIGIN is absent from the zone file.
- Replace _normalize_cname_data() with _relativize_name(); add
  _normalize_mx_data(), _normalize_ns_data(), _normalize_srv_data() using
  the same helper.
- _parse_zone_to_record_set() now normalizes MX, NS, SRV alongside CNAME.
- _ensure_zone_exists() sets managed_by='directadmin' on create and
  back-fills NULL rows from pre-migration installs.
- Zone.managed_by changed to nullable=True to match ALTER TABLE migration
  where existing rows have no value.
- schema/coredns_mysql.sql updated to reflect actual two-table schema with
  managed_by column and migration comment.
- 11 new tests (130 total, all passing).
2026-02-25 14:37:14 +13:00
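A hedged sketch of the relativization rule described above — not the project's actual `_relativize_name` implementation, just the mapping it describes:

```python
# Zone apex -> '@', in-zone names -> relative label, external FQDNs unchanged.
def relativize_name(zone_name: str, fqdn: str) -> str:
    zone = zone_name.rstrip(".") + "."
    name = fqdn if fqdn.endswith(".") else fqdn + "."
    if name == zone:
        return "@"
    if name.endswith("." + zone):
        return name[: -len("." + zone)]
    return fqdn

assert relativize_name("ithome.net.nz", "ithome.net.nz.") == "@"
assert relativize_name("ithome.net.nz", "mail.ithome.net.nz.") == "mail"
assert relativize_name("ithome.net.nz", "mx.external.example.") == "mx.external.example."
```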
5e9a6f19bd fix: add __main__.py so python -m directdnsonly works in container 🐛
- directdnsonly/__main__.py: inserts package dir into sys.path before
  importing main.py (which uses short-form relative imports) then calls
  main(); works for both `python -m directdnsonly` and the dadns script
- pyproject.toml: wire up `dadns` console script entry point
2026-02-20 14:17:53 +13:00
4a4b4f2b98 docs: clarify Knot DNS and PowerDNS are not implemented backends 📝
Add explicit note that only nsd, bind, and coredns_mysql are available —
Knot and PowerDNS are listed as architectural context only.
2026-02-20 06:59:12 +13:00
6e96e78376 docs: CoreDNS MySQL is the recommended choice at all scale levels 🏆
The cybercinch fork's resilience features (cache fallback, health monitoring,
zero downtime, connection pooling) make it the best DNS backend regardless of
zone count — not just at 300+ zones. Update summary recommendation and
topology comparison "Best for" row to reflect this.
2026-02-20 06:53:47 +13:00
e8939bcd82 docs: document CoreDNS fork resilience features accurately 📋
Replace vague "file caching" description with the confirmed feature set:
connection pooling, degraded operation (JSON cache fallback), smart caching,
health monitoring, zero downtime. Update Topology B failure table to reflect
that CoreDNS serves from cache throughout MySQL outages. Add write/read split
summary — retry queue covers writes, CoreDNS cache covers reads.
2026-02-20 06:52:27 +13:00
d98f08a408 feat: peer sync configurable via env vars + document CoreDNS file cache 🔗
- PeerSyncWorker reads DADNS_PEER_SYNC_PEER_URL / _USERNAME / _PASSWORD env
  vars to populate a single peer without a config file; deduped against any
  config-file peers so the URL never appears twice
- 2 new tests (119 total, all passing)
- README: peer sync single-peer env var table; Topology C compose example
  updated to use env vars only (no config file needed for two-node setup)
- README: document cybercinch/coredns_mysql_extend built-in file caching —
  serves from cache during MySQL outages, eliminates per-query round-trips
2026-02-20 06:41:46 +13:00
22 changed files with 1393 additions and 203 deletions

.gitignore

@@ -26,3 +26,9 @@ build
*.mypy_cache
*.pytest_cache
/data/*
# Editor / tool settings — always local, never committed
.vscode/
.claude/
.env
*.env


@@ -115,8 +115,8 @@ Both MySQL backends are written **concurrently** within the same zone update. A
| Scenario | What happens |
|---|---|
| One MySQL backend unreachable | Other backend(s) succeed immediately. Failed backend queued for retry with exponential backoff (30 s → 2 m → 5 m → 15 m → 30 m, up to 5 attempts). CoreDNS continues serving from its local JSON cache throughout. |
| MySQL backend down for hours | Retry queue exhausts. CoreDNS serves from cache the entire time — zero query downtime. On recovery, the reconciliation healing pass detects the backend is missing zones and re-pushes all of them using stored `zone_data` — no DA intervention required. |
| directdnsonly container restarts | Persistent queue survives. In-flight zone updates replay on startup. |
| directdnsonly container down during DA push | DA cannot deliver. Persistent queue on disk is intact; when the container comes back, it resumes processing any previously queued items. New pushes during downtime are lost at the DA level (DA does not retry). |
| Zone deleted from DA | Reconciliation poller detects orphan and queues delete across all backends. |
@@ -250,7 +250,7 @@ Register each container as a separate Extra DNS server entry in DA → DNS Admin
| **Orphan detection** | Yes — reconciler | Yes — reconciler | Yes — reconciler (per instance) |
| **External DB required** | No | Yes (MySQL per CoreDNS node) | No (NSD) or Yes (CoreDNS MySQL) |
| **Horizontal scaling** | Add DA Extra DNS entries + containers | Add backend stanzas in config | Add DA Extra DNS entries + containers + peer list |
| **Best for** | Simple HA, no external DB | Best overall — resilient writes (retry queue) + resilient reads (CoreDNS cache fallback), no daemon reloads, scales to thousands of zones | Most robust HA — resilient at every layer, survives extended outages without DA re-push |

---
@@ -298,10 +298,12 @@ The container image ships with **both NSD and BIND9** installed. The entrypoint
**Summary recommendation:**

- **Any scale, external DB available:** CoreDNS MySQL ([cybercinch fork](https://github.com/cybercinch/coredns_mysql_extend)) wins at every zone count. Connection pooling, JSON cache fallback, health monitoring, and zero-downtime operation during DB maintenance make it the most resilient choice regardless of size. No daemon reload ever needed — a zone write is a MySQL INSERT.
- **No external DB, simplicity first:** NSD (bundled) — lightweight, fast, authoritative-only, same RFC 1035 zone file format as BIND.
- **Need zero-interruption zone swaps:** Knot DNS (RCU — serves old zone to in-flight queries while atomically swapping in the new one).
- **Need an HTTP API for zone management:** PowerDNS Authoritative with its native HTTP API.

> **Note:** Knot DNS and PowerDNS backends are **not implemented** in directdnsonly — they are listed here as architectural context only. Implemented backends: `nsd`, `bind`, `coredns_mysql`. Pull requests for additional backends are welcome.

---
@@ -314,18 +316,23 @@ NS records in the additional section, does not set the AA flag, and does not
handle wildcard records.

This project is designed to work with a patched fork that resolves all of those
issues and adds production-grade resilience:

**[cybercinch/coredns_mysql_extend](https://github.com/cybercinch/coredns_mysql_extend)**

| Feature | Detail |
|---|---|
| **Fully authoritative** | Correct AA flag, NXDOMAIN on misses, NS records in the additional section |
| **Wildcard records** | `*` entries served correctly |
| **Connection pooling** | Configurable MySQL connection management — efficient under load |
| **Degraded operation** | Automatic fallback to a local JSON cache when MySQL is unavailable — DNS keeps serving |
| **Smart caching** | Intelligent per-record cache management reduces per-query MySQL round-trips |
| **Health monitoring** | Continuous database health checks with configurable intervals |
| **Zero downtime** | DNS continues serving during database maintenance windows |
**Why this matters for Topology B:** directdnsonly's retry queue handles the write side during a MySQL outage — the CoreDNS fork handles the read side. Between them, neither writes nor queries are dropped during transient database failures.

Use the NSD or BIND backend if you want a zero-dependency setup with no custom CoreDNS build required.

---
@@ -502,12 +509,20 @@ The built-in env var mapping targets the backend named `coredns_mysql`. For mult
#### Peer sync

| Config key / Environment variable | Default | Description |
|---|---|---|
| `peer_sync.enabled` / `DADNS_PEER_SYNC_ENABLED` | `false` | Enable background peer-to-peer zone sync |
| `peer_sync.interval_minutes` / `DADNS_PEER_SYNC_INTERVAL_MINUTES` | `15` | How often each peer is polled |

For a **single peer** (the typical two-node Topology C setup) the peer can be configured entirely via env vars — no config file required:
| Environment variable | Default | Description |
|---|---|---|
| `DADNS_PEER_SYNC_PEER_URL` | _(unset)_ | URL of the single peer (e.g. `http://ddo-2:2222`). When set, this peer is automatically appended to the peers list. |
| `DADNS_PEER_SYNC_PEER_USERNAME` | `directdnsonly` | Basic auth username for the peer |
| `DADNS_PEER_SYNC_PEER_PASSWORD` | _(empty)_ | Basic auth password for the peer |
> For **multiple peers**, use a config file with the `peer_sync.peers` list. A peer defined via env var is deduped — if the same URL already appears in the config file it will not be added twice.
---
@@ -540,8 +555,11 @@ services:
DADNS_APP_AUTH_PASSWORD: my-strong-secret
DADNS_DNS_DEFAULT_BACKEND: nsd
DADNS_DNS_BACKENDS_NSD_ENABLED: "true"
DADNS_PEER_SYNC_ENABLED: "true"
DADNS_PEER_SYNC_PEER_URL: http://directdnsonly-mlb:2222
DADNS_PEER_SYNC_PEER_USERNAME: directdnsonly
DADNS_PEER_SYNC_PEER_PASSWORD: my-strong-secret
volumes:
- syd-data:/app/data
directdnsonly-mlb:
@@ -553,8 +571,11 @@ services:
DADNS_APP_AUTH_PASSWORD: my-strong-secret
DADNS_DNS_DEFAULT_BACKEND: nsd
DADNS_DNS_BACKENDS_NSD_ENABLED: "true"
DADNS_PEER_SYNC_ENABLED: "true"
DADNS_PEER_SYNC_PEER_URL: http://directdnsonly-syd:2222
DADNS_PEER_SYNC_PEER_USERNAME: directdnsonly
DADNS_PEER_SYNC_PEER_PASSWORD: my-strong-secret
volumes:
- mlb-data:/app/data
volumes:

directdnsonly/__main__.py (new file)

@@ -0,0 +1,17 @@
import os
import sys


def run():
    # main.py uses short-form imports (from app.*, from worker) that resolve
    # relative to the directdnsonly/ package directory. Insert it into the
    # path before importing so `python -m directdnsonly` and the `dadns`
    # console script both work without changing main.py.
    sys.path.insert(0, os.path.dirname(__file__))
    from main import main

    main()


if __name__ == "__main__":
    run()


@@ -7,14 +7,19 @@ from directdnsonly.app.db.models import Domain
class InternalAPI:
"""Peer-to-peer zone_data exchange endpoints.
Used by PeerSyncWorker to replicate zone_data between directdnsonly
instances so each node can independently heal its local backends.
All routes require peer_sync basic auth credentials, which are
configured separately from the main DirectAdmin-facing credentials
(peer_sync.auth_username / peer_sync.auth_password).
"""
def __init__(self, peer_syncer=None):
self._peer_syncer = peer_syncer
@cherrypy.expose
def zones(self, domain=None):
"""Return zone metadata or zone_data for a specific domain.
@@ -77,3 +82,15 @@ class InternalAPI:
return json.dumps({"error": "internal server error"}).encode() return json.dumps({"error": "internal server error"}).encode()
finally: finally:
session.close() session.close()
@cherrypy.expose
def peers(self):
"""Return the list of peer URLs this node knows about.
GET /internal/peers
Returns a JSON array of URL strings. Used by other nodes during
sync to discover new cluster members (gossip-lite mesh expansion).
"""
cherrypy.response.headers["Content-Type"] = "application/json"
urls = self._peer_syncer.get_peer_urls() if self._peer_syncer else []
return json.dumps(urls).encode()
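For context, a sketch of how a peer consumes this endpoint during discovery (it mirrors `_discover_peers_from` further down this compare; URL and credentials are placeholders):

```python
import requests

# Placeholder peer address and peer_sync credentials.
resp = requests.get(
    "http://ddo-2:2222/internal/peers",
    auth=("peersync", "changeme"),
    timeout=5,
)
if resp.status_code == 200:
    for peer_url in resp.json():  # plain JSON array of URL strings
        print("known peer:", peer_url)
```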


@@ -0,0 +1,82 @@
"""Operational status endpoint — aggregates queue, worker, reconciler, and peer health."""
import json
import cherrypy
from sqlalchemy import func, select
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain
class StatusAPI:
"""Exposes GET /status as a JSON health/status document.
Aggregates data from WorkerManager.queue_status() and a live DB zone count
into a single response that a UI or monitoring system can poll.
Overall ``status`` field:
- ``ok`` — all workers alive, no dead-letters, all peers healthy
- ``degraded`` — retries pending, dead-letters present, or a peer is unhealthy
- ``error`` — a core worker thread is not alive
"""
def __init__(self, worker_manager):
self._wm = worker_manager
@cherrypy.expose
def index(self):
cherrypy.response.headers["Content-Type"] = "application/json"
return json.dumps(self._build(), default=str).encode()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _build(self) -> dict:
qs = self._wm.queue_status()
zone_count = self._zone_count()
overall = self._compute_overall(qs)
return {
"status": overall,
"queues": {
"save": qs.get("save_queue_size", 0),
"delete": qs.get("delete_queue_size", 0),
"retry": qs.get("retry_queue_size", 0),
"dead_letters": qs.get("dead_letters", 0),
},
"workers": {
"save": qs.get("save_worker_alive"),
"delete": qs.get("delete_worker_alive"),
"retry_drain": qs.get("retry_worker_alive"),
},
"reconciler": qs.get("reconciler", {}),
"peer_sync": qs.get("peer_sync", {}),
"zones": {"total": zone_count},
}
@staticmethod
def _zone_count() -> int:
session = connect()
try:
return session.execute(select(func.count(Domain.id))).scalar() or 0
except Exception:
return 0
finally:
session.close()
@staticmethod
def _compute_overall(qs: dict) -> str:
if not qs.get("save_worker_alive") or not qs.get("delete_worker_alive"):
return "error"
peer_sync = qs.get("peer_sync", {})
if (
qs.get("retry_queue_size", 0) > 0
or qs.get("dead_letters", 0) > 0
or peer_sync.get("degraded", 0) > 0
):
return "degraded"
return "ok"


@@ -14,6 +14,7 @@ class Zone(Base):
__tablename__ = "zones"
id = Column(Integer, primary_key=True)
zone_name = Column(String(255), nullable=False, index=True, unique=True)
managed_by = Column(String(255), nullable=True) # 'directadmin' | 'direct' | NULL (legacy)
class Record(Base):
@@ -90,10 +91,34 @@ class CoreDNSMySQLBackend(DNSBackend):
zone_name, zone_data zone_name, zone_data
) )
# Track changes # Pre-compute the set of (hostname, type, data) keys that should
current_records = set() # remain after this update, so we can identify stale records upfront.
incoming_keys = {
(name, rtype, data) for name, rtype, data, _ in source_records
}
changes = {"added": 0, "updated": 0, "removed": 0} changes = {"added": 0, "updated": 0, "removed": 0}
# --- 1. Remove stale records first ---
# Deleting before inserting means a brief NXDOMAIN is preferable
# to briefly serving both old and new records simultaneously.
for key, record in existing_records.items():
if key not in incoming_keys:
logger.debug(
f"Removed record: {record.hostname} {record.type} {record.data}"
)
session.delete(record)
changes["removed"] += 1
# Handle SOA removal if needed
if existing_soa and not source_soa:
logger.debug(
f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
)
session.delete(existing_soa)
changes["removed"] += 1
# --- 2. Add / update incoming records ---
# Handle SOA record # Handle SOA record
if source_soa: if source_soa:
soa_name, soa_content, soa_ttl = source_soa soa_name, soa_content, soa_ttl = source_soa
@@ -123,7 +148,6 @@ class CoreDNSMySQLBackend(DNSBackend):
# Process all non-SOA records # Process all non-SOA records
for record_name, record_type, record_content, record_ttl in source_records: for record_name, record_type, record_content, record_ttl in source_records:
key = (record_name, record_type, record_content) key = (record_name, record_type, record_content)
current_records.add(key)
if key in existing_records: if key in existing_records:
# Update existing record if TTL changed # Update existing record if TTL changed
@@ -151,23 +175,6 @@ class CoreDNSMySQLBackend(DNSBackend):
f"Added new record: {record_name} {record_type} {record_content}" f"Added new record: {record_name} {record_type} {record_content}"
) )
# Remove records that no longer exist in the source zone
for key, record in existing_records.items():
if key not in current_records:
logger.debug(
f"Removed record: {record.hostname} {record.type} {record.data}"
)
session.delete(record)
changes["removed"] += 1
# Handle SOA removal if needed
if existing_soa and not source_soa:
logger.debug(
f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
)
session.delete(existing_soa)
changes["removed"] += 1
session.commit() session.commit()
total_changes = changes["added"] + changes["updated"] + changes["removed"] total_changes = changes["added"] + changes["updated"] + changes["removed"]
if total_changes > 0: if total_changes > 0:
@@ -240,43 +247,27 @@ class CoreDNSMySQLBackend(DNSBackend):
session.close() session.close()
def _ensure_zone_exists(self, session, zone_name: str) -> Zone: def _ensure_zone_exists(self, session, zone_name: str) -> Zone:
"""Ensure a zone exists in the database, creating it if necessary""" """Ensure a zone exists in the database, creating it if necessary."""
zone = session.execute( zone = session.execute(
select(Zone).filter_by(zone_name=self.dot_fqdn(zone_name)) select(Zone).filter_by(zone_name=self.dot_fqdn(zone_name))
).scalar_one_or_none() ).scalar_one_or_none()
if not zone: if not zone:
logger.debug(f"Creating new zone: {self.dot_fqdn(zone_name)}") logger.debug(f"Creating new zone: {self.dot_fqdn(zone_name)}")
zone = Zone(zone_name=self.dot_fqdn(zone_name)) zone = Zone(
zone_name=self.dot_fqdn(zone_name),
managed_by="directadmin",
)
session.add(zone) session.add(zone)
session.flush() # Get the zone ID session.flush()
elif not zone.managed_by:
# Migrate pre-existing rows that were created before this field was added
zone.managed_by = "directadmin"
return zone return zone
def _normalize_cname_data(self, zone_name: str, record_content: str) -> str:
"""Normalize CNAME record data to ensure consistent FQDN format.
This ensures CNAME targets are always stored as fully-qualified domain
names so that record comparison between the BIND zone source and the
database is deterministic.
Args:
zone_name: The zone name for relative-name expansion
record_content: The raw CNAME target from the parsed zone
Returns:
The normalized CNAME target string
"""
if record_content.startswith("@"):
logger.debug(f"CNAME target starts with '@', replacing with zone FQDN")
record_content = self.dot_fqdn(zone_name)
elif not record_content.endswith("."):
logger.debug(f"CNAME target {record_content} is relative, appending zone")
record_content = ".".join([record_content, self.dot_fqdn(zone_name)])
return record_content
def _parse_zone_to_record_set( def _parse_zone_to_record_set(
self, zone_name: str, zone_data: str self, zone_name: str, zone_data: str
) -> Tuple[Set[Tuple[str, str, str, int]], Optional[Tuple[str, str, int]]]: ) -> Tuple[Set[Tuple[str, str, str, int]], Optional[Tuple[str, str, int]]]:
"""Parse a BIND zone file into a set of normalised record keys. """Parse a BIND zone file into a set of record keys.
Returns: Returns:
Tuple of: Tuple of:
@@ -287,21 +278,27 @@ class CoreDNSMySQLBackend(DNSBackend):
records: Set[Tuple[str, str, str, int]] = set() records: Set[Tuple[str, str, str, int]] = set()
soa = None soa = None
# Use the zone origin (if available) to expand relative names in RDATA
# back to absolute FQDNs. Without this, dnspython's default relativize=True
# behaviour turns in-zone targets like `wvvcc.co.nz.` into `@` in the
# stored data, which CoreDNS then serves incorrectly.
origin = dns_zone.origin
for name, ttl, rdata in dns_zone.iterate_rdatas(): for name, ttl, rdata in dns_zone.iterate_rdatas():
if rdata.rdclass != IN: if rdata.rdclass != IN:
continue continue
record_name = str(name) record_name = str(name)
record_type = rdata.rdtype.name record_type = rdata.rdtype.name
record_content = rdata.to_text() if origin is not None:
record_content = rdata.to_text(origin=origin, relativize=False)
else:
record_content = rdata.to_text()
if record_type == "SOA": if record_type == "SOA":
soa = (record_name, record_content, ttl) soa = (record_name, record_content, ttl)
continue continue
if record_type == "CNAME":
record_content = self._normalize_cname_data(zone_name, record_content)
records.add((record_name, record_type, record_content, ttl)) records.add((record_name, record_type, record_content, ttl))
return records, soa return records, soa


@@ -70,7 +70,13 @@ class DirectAdminClient:
if response is None:
return None
if response.is_redirect or response.status_code in (
301,
302,
303,
307,
308,
):
if self._cookies:
logger.error(
f"[da:{self.hostname}] Still redirecting after session login — "
@@ -155,6 +161,136 @@ class DirectAdminClient:
logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}") logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}")
return None return None
def post(
self, command: str, data: Optional[dict] = None
) -> Optional[requests.Response]:
"""Authenticated POST to any DA CMD_* endpoint."""
url = f"{self.scheme}://{self.hostname}:{self.port}/{command}"
kwargs: dict = dict(
data=data or {},
timeout=30,
verify=self.verify_ssl,
allow_redirects=False,
)
if self._cookies:
kwargs["cookies"] = self._cookies
else:
kwargs["auth"] = (self.username, self.password)
try:
return requests.post(url, **kwargs)
except Exception as exc:
logger.error(f"[da:{self.hostname}] POST {command} failed: {exc}")
return None
def get_extra_dns_servers(self) -> dict:
"""Return the Extra DNS server map from CMD_MULTI_SERVER (GET).
Returns a dict keyed by server hostname/IP, each value being the
per-server settings dict (dns, domain_check, port, user, ssl, …).
Returns ``{}`` on any error.
"""
resp = self.get("CMD_MULTI_SERVER", params={"json": "yes"})
if resp is None or resp.status_code != 200:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER GET failed")
return {}
try:
return resp.json().get("servers", {})
except Exception as exc:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER parse error: {exc}")
return {}
def add_extra_dns_server(
self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
) -> bool:
"""Register a new Extra DNS server via CMD_MULTI_SERVER action=add.
Returns ``True`` if DA reports success, ``False`` otherwise.
"""
resp = self.post(
"CMD_MULTI_SERVER",
data={
"action": "add",
"json": "yes",
"ip": ip,
"port": str(port),
"user": user,
"passwd": passwd,
"ssl": "yes" if ssl else "no",
},
)
if resp is None or resp.status_code != 200:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add failed for {ip}")
return False
try:
result = resp.json()
if result.get("success"):
logger.info(f"[da:{self.hostname}] Added Extra DNS server {ip}")
return True
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER add error: {result.get('result', result)}"
)
return False
except Exception as exc:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add parse error: {exc}")
return False
def ensure_extra_dns_server(
self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
) -> bool:
"""Add (if absent) and configure a directdnsonly Extra DNS server.
Ensures the server is registered with ``dns=yes`` and
``domain_check=yes`` so DirectAdmin pushes zone updates to it.
Returns ``True`` if fully configured, ``False`` on any failure.
"""
servers = self.get_extra_dns_servers()
if ip not in servers:
if not self.add_extra_dns_server(ip, port, user, passwd, ssl):
return False
ssl_str = "yes" if ssl else "no"
resp = self.post(
"CMD_MULTI_SERVER",
data={
"action": "multiple",
"save": "yes",
"json": "yes",
"passwd": "",
"select0": ip,
f"port-{ip}": str(port),
f"user-{ip}": user,
f"ssl-{ip}": ssl_str,
f"dns-{ip}": "yes",
f"domain_check-{ip}": "yes",
f"user_check-{ip}": "no",
f"email-{ip}": "no",
f"show_all_users-{ip}": "no",
},
)
if resp is None or resp.status_code != 200:
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save failed for {ip}"
)
return False
try:
result = resp.json()
if result.get("success"):
logger.info(
f"[da:{self.hostname}] Extra DNS server {ip} configured "
f"(dns=yes domain_check=yes)"
)
return True
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save error: {result.get('result', result)}"
)
return False
except Exception as exc:
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save parse error: {exc}"
)
return False
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Internal # Internal
# ------------------------------------------------------------------ # ------------------------------------------------------------------


@@ -25,8 +25,8 @@ class Domain(Base):
domain = Column(String(255), unique=True)
hostname = Column(String(255))
username = Column(String(255))
zone_data = Column(Text, nullable=True) # last known zone file from DA
zone_updated_at = Column(DateTime, nullable=True) # when zone_data was last stored
def __repr__(self):
return "<Domain(id='%s', domain='%s', hostname='%s', username='%s')>" % (


@@ -2,23 +2,37 @@
"""Peer sync worker — exchanges zone_data between directdnsonly instances. """Peer sync worker — exchanges zone_data between directdnsonly instances.
Each node stores zone_data in its local SQLite DB after every successful Each node stores zone_data in its local SQLite DB after every successful
backend write. When DirectAdmin pushes a zone to one node but the other backend write. When DirectAdmin pushes a zone to one node but another
is temporarily offline, the offline node misses that zone_data. is temporarily offline, the offline node misses that zone_data.
PeerSyncWorker corrects this by periodically comparing zone lists with PeerSyncWorker corrects this by periodically comparing zone lists with
configured peers and fetching any zone_data that is newer or absent locally. all known peers and fetching any zone_data that is newer or absent locally.
It only updates the local DB — it never writes directly to backends. The It only updates the local DB — it never writes directly to backends. The
existing reconciler healing pass then detects missing zones and re-pushes existing reconciler healing pass then detects missing zones and re-pushes
using the freshly synced zone_data. using the freshly synced zone_data.
Mesh behaviour:
- Each node exposes /internal/peers listing the URLs it knows about
- During each sync pass, every peer is asked for its peer list; any URLs
not already known are added automatically (gossip-lite discovery)
- A three-node cluster therefore only needs a linear chain of initial
connections — nodes propagate awareness of each other on the first pass
Health tracking:
- Consecutive failures per peer are counted; after FAILURE_THRESHOLD
misses the peer is marked degraded and a warning is logged once
- On the next successful contact the peer is marked recovered
Safety properties: Safety properties:
- If a peer is unreachable, skip it silently and retry next interval - If a peer is unreachable, skip it and try next interval
- Only zone_data is synced — backend writes remain the sole responsibility - Only zone_data is synced — backend writes remain the sole responsibility
of the local save queue worker of the local save queue worker
- Newer zone_updated_at timestamp wins; local data is never overwritten - Newer zone_updated_at timestamp wins; local data is never overwritten
with older peer data with older peer data
- Peer discovery is best-effort and never fails a sync pass
""" """
import datetime import datetime
import os
import threading import threading
from loguru import logger from loguru import logger
import requests import requests
@@ -27,6 +41,9 @@ from sqlalchemy import select
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain
# Consecutive failures before a peer is logged as degraded
FAILURE_THRESHOLD = 3
class PeerSyncWorker:
"""Periodically fetches zone_data from peer directdnsonly instances and
@@ -36,10 +53,58 @@ class PeerSyncWorker:
def __init__(self, peer_sync_config: dict):
self.enabled = peer_sync_config.get("enabled", False)
self.interval_seconds = peer_sync_config.get("interval_minutes", 15) * 60
self.peers = list(peer_sync_config.get("peers") or [])
# Per-peer health state: url -> {consecutive_failures, healthy, last_seen}
self._peer_health: dict = {}
# ----------------------------------------------------------------
# Env-var peer injection
# ----------------------------------------------------------------
# Original single-peer vars (backward compat):
# DADNS_PEER_SYNC_PEER_URL / _USERNAME / _PASSWORD
# Numbered multi-peer vars (new):
# DADNS_PEER_SYNC_PEER_1_URL / _USERNAME / _PASSWORD
# DADNS_PEER_SYNC_PEER_2_URL / ... (up to 9)
known_urls = {p.get("url") for p in self.peers}
env_candidates = []
single_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL", "").strip()
if single_url:
env_candidates.append({
"url": single_url,
"username": os.environ.get("DADNS_PEER_SYNC_PEER_USERNAME", "peersync"),
"password": os.environ.get("DADNS_PEER_SYNC_PEER_PASSWORD", ""),
})
for i in range(1, 10):
numbered_url = os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_URL", "").strip()
if not numbered_url:
break
env_candidates.append({
"url": numbered_url,
"username": os.environ.get(
f"DADNS_PEER_SYNC_PEER_{i}_USERNAME", "peersync"
),
"password": os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_PASSWORD", ""),
})
for candidate in env_candidates:
if candidate["url"] not in known_urls:
self.peers.append(candidate)
known_urls.add(candidate["url"])
logger.debug(
f"[peer_sync] Added peer from env vars: {candidate['url']}"
)
self._stop_event = threading.Event()
self._thread = None
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def start(self):
if not self.enabled:
logger.info("Peer sync disabled — skipping")
@@ -70,6 +135,70 @@ class PeerSyncWorker:
def is_alive(self):
return self._thread is not None and self._thread.is_alive()
def get_peer_urls(self) -> list:
"""Return the current list of known peer URLs.
Exposed via /internal/peers so other nodes can discover this node's mesh."""
return [p["url"] for p in self.peers if p.get("url")]
def get_peer_status(self) -> dict:
"""Return peer health summary for the /status endpoint."""
peers = []
for peer in self.peers:
url = peer.get("url", "")
h = self._peer_health.get(url, {})
last_seen = h.get("last_seen")
peers.append({
"url": url,
"healthy": h.get("healthy", True),
"consecutive_failures": h.get("consecutive_failures", 0),
"last_seen": last_seen.isoformat() if last_seen else None,
})
healthy = sum(1 for p in peers if p["healthy"])
return {
"enabled": self.enabled,
"alive": self.is_alive,
"interval_minutes": self.interval_seconds // 60,
"peers": peers,
"total": len(peers),
"healthy": healthy,
"degraded": len(peers) - healthy,
}
# ------------------------------------------------------------------
# Health tracking
# ------------------------------------------------------------------
def _health(self, url: str) -> dict:
return self._peer_health.setdefault(
url, {"consecutive_failures": 0, "healthy": True, "last_seen": None}
)
def _record_success(self, url: str):
h = self._health(url)
recovered = not h["healthy"]
h.update(
consecutive_failures=0,
healthy=True,
last_seen=datetime.datetime.utcnow(),
)
if recovered:
logger.info(f"[peer_sync] {url}: peer recovered")
def _record_failure(self, url: str, exc):
h = self._health(url)
h["consecutive_failures"] += 1
if h["healthy"] and h["consecutive_failures"] >= FAILURE_THRESHOLD:
h["healthy"] = False
logger.warning(
f"[peer_sync] {url}: marked degraded after {FAILURE_THRESHOLD} "
f"consecutive failures — {exc}"
)
else:
logger.debug(
f"[peer_sync] {url}: unreachable "
f"(failure #{h['consecutive_failures']}) — {exc}"
)
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
@@ -82,15 +211,49 @@ class PeerSyncWorker:
def _sync_all(self):
logger.debug(f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)")
# Iterate over a snapshot — _discover_peers_from may grow self.peers
for peer in list(self.peers):
url = peer.get("url")
if not url:
logger.warning("[peer_sync] Peer config missing url — skipping")
continue
try:
self._sync_from_peer(peer)
self._discover_peers_from(peer)
self._record_success(url)
except Exception as exc:
self._record_failure(url, exc)
def _discover_peers_from(self, peer: dict):
"""Fetch peer's known peer list and add any new nodes for mesh expansion.
This is best-effort — failures are silently swallowed so they never
interrupt the main sync pass."""
url = peer.get("url", "").rstrip("/")
username = peer.get("username")
password = peer.get("password")
auth = (username, password) if username else None
try:
resp = requests.get(f"{url}/internal/peers", auth=auth, timeout=5)
if resp.status_code != 200:
return
remote_urls = resp.json() # list of URL strings
known_urls = {p.get("url") for p in self.peers}
for remote_url in remote_urls:
if remote_url and remote_url not in known_urls:
# Inherit credentials from the introducing peer — in practice
# all cluster nodes share the same peer_sync auth credentials.
self.peers.append({
"url": remote_url,
"username": username,
"password": password,
})
known_urls.add(remote_url)
logger.info(
f"[peer_sync] Discovered new peer {remote_url} via {url}"
)
except Exception:
pass # discovery is best-effort
def _sync_from_peer(self, peer: dict):
url = peer.get("url", "").rstrip("/")


@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import datetime
import threading
from loguru import logger
from sqlalchemy import select
@@ -42,6 +43,17 @@ class ReconciliationWorker:
self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60
self._stop_event = threading.Event()
self._thread = None
self._last_run: dict = {}
def get_status(self) -> dict:
"""Return reconciler configuration and last-run statistics."""
return {
"enabled": self.enabled,
"alive": self.is_alive,
"dry_run": self.dry_run,
"interval_minutes": self.interval_seconds // 60,
"last_run": dict(self._last_run),
}
def start(self):
if not self.enabled:
@@ -104,11 +116,18 @@ class ReconciliationWorker:
self._reconcile_all()
def _reconcile_all(self):
started_at = datetime.datetime.utcnow()
self._last_run = {"status": "running", "started_at": started_at.isoformat()}
logger.info(
f"[reconciler] Starting reconciliation pass across "
f"{len(self.servers)} server(s)"
)
total_queued = 0
da_servers_polled = 0
da_servers_unreachable = 0
migrated = 0
backfilled = 0
zones_in_db = 0
# Build a map of all domains seen on all DA servers: domain -> hostname
all_da_domains: dict = {}
@@ -126,23 +145,26 @@ class ReconciliationWorker:
ssl=server.get("ssl", True),
verify_ssl=self.verify_ssl,
)
da_servers_polled += 1
da_domains = client.list_domains(ipp=self.ipp)
if da_domains is not None:
for d in da_domains:
all_da_domains[d] = hostname
else:
da_servers_unreachable += 1
logger.debug(
f"[reconciler] {hostname}: "
f"{len(da_domains) if da_domains else 0} active domain(s) in DA"
)
except Exception as exc:
logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}")
da_servers_unreachable += 1
# Compare local DB against what DA reported; update masters and queue deletes
session = connect()
try:
all_local_domains = session.execute(select(Domain)).scalars().all()
zones_in_db = len(all_local_domains)
known_servers = {s.get("hostname") for s in self.servers}
for record in all_local_domains:
domain = record.domain
@@ -209,10 +231,31 @@ class ReconciliationWorker:
)
# Option C: heal backends that are missing zones
zones_healed = 0
if self.save_queue is not None and self.backend_registry is not None:
zones_healed = self._heal_backends()
completed_at = datetime.datetime.utcnow()
self._last_run = {
"status": "ok",
"started_at": started_at.isoformat(),
"completed_at": completed_at.isoformat(),
"duration_seconds": round(
(completed_at - started_at).total_seconds(), 1
),
"da_servers_polled": da_servers_polled,
"da_servers_unreachable": da_servers_unreachable,
"zones_in_da": len(all_da_domains),
"zones_in_db": zones_in_db,
"orphans_found": total_queued,
"orphans_queued": total_queued if not self.dry_run else 0,
"hostnames_backfilled": backfilled,
"hostnames_migrated": migrated,
"zones_healed": zones_healed,
"dry_run": self.dry_run,
}
def _heal_backends(self) -> int:
"""Check every backend for zone presence and re-queue any zone that is """Check every backend for zone presence and re-queue any zone that is
missing from one or more backends, using the stored zone_data as the missing from one or more backends, using the stored zone_data as the
authoritative source. This corrects backends that missed pushes due to authoritative source. This corrects backends that missed pushes due to
@@ -220,9 +263,10 @@ class ReconciliationWorker:
""" """
backends = self.backend_registry.get_available_backends() backends = self.backend_registry.get_available_backends()
if not backends: if not backends:
return return 0
session = connect() session = connect()
healed = 0
try:
domains = session.execute(
select(Domain).where(Domain.zone_data.isnot(None))
@@ -231,9 +275,7 @@ class ReconciliationWorker:
logger.debug(
"[reconciler] Healing pass: no zone_data stored yet — skipping"
)
return 0
for record in domains:
missing = []
for backend_name, backend in backends.items():
@@ -277,3 +319,4 @@ class ReconciliationWorker:
)
finally:
session.close()
return healed


@@ -69,6 +69,8 @@ def load_config() -> Vyper:
# Peer sync defaults
v.set_default("peer_sync.enabled", False)
v.set_default("peer_sync.interval_minutes", 15)
v.set_default("peer_sync.auth_username", "peersync")
v.set_default("peer_sync.auth_password", "changeme")
# Read configuration
try:


@@ -3,6 +3,20 @@ timezone: Pacific/Auckland
log_level: INFO
queue_location: ./data/queues
# Application datastore — stores domain index and zone_data for healing/peer-sync.
# SQLite (default) requires no extra dependencies and is fine for single-node setups.
# MySQL is recommended for multi-node deployments with a shared datastore.
datastore:
type: sqlite
db_location: ./data/directdnsonly.db
# --- MySQL ---
# type: mysql
# host: "127.0.0.1"
# port: "3306"
# name: "directdnsonly"
# user: "directdnsonly"
# pass: "changeme"
app:
auth_username: directdnsonly
auth_password: changeme # Override via DADNS_APP_AUTH_PASSWORD env var


@@ -4,6 +4,7 @@ from app.backends import BackendRegistry
from app.api.admin import DNSAdminAPI
from app.api.health import HealthAPI
from app.api.internal import InternalAPI
from app.api.status import StatusAPI
from app import configure_logging
from worker import WorkerManager
from directdnsonly.config import config
@@ -90,6 +91,17 @@ def main():
if config.get_string("app.log_level").upper() != "DEBUG":
cherrypy.log.access_log.propagate = False
# Peer sync auth — separate credentials from the DA-facing API so a
# compromised peer node cannot push zones or access the admin endpoints.
peer_user_password_dict = {
config.get_string("peer_sync.auth_username"): config.get_string(
"peer_sync.auth_password"
)
}
peer_check_password = cherrypy.lib.auth_basic.checkpassword_dict(
peer_user_password_dict
)
# Mount applications
root = Root()
root = DNSAdminAPI(
@@ -98,12 +110,18 @@ def main():
backend_registry=registry,
)
root.health = HealthAPI(registry)
root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer)
root.status = StatusAPI(worker_manager)
# Add queue status endpoint (debug)
root.queue_status = lambda: worker_manager.queue_status()
# Override auth for /internal so peers use their own credentials
cherrypy.tree.mount(root, "/", config={
"/internal": {
"tools.auth_basic.checkpassword": peer_check_password,
}
})
cherrypy.engine.start()
logger.success(f"Server started on port {config.get_int('app.listen_port')}")


@@ -43,6 +43,7 @@ class WorkerManager:
self._peer_syncer = None
self._reconciliation_config = reconciliation_config or {}
self._peer_sync_config = peer_sync_config or {}
self._dead_letter_count = 0
try:
os.makedirs(queue_path, exist_ok=True)
@@ -62,93 +63,90 @@ class WorkerManager:
logger.info("Save queue worker started") logger.info("Save queue worker started")
session = connect() session = connect()
batch_start = None
batch_processed = 0
batch_failed = 0
while self._running: while self._running:
# Block until at least one item is available
try: try:
item = self.save_queue.get(block=True, timeout=5) item = self.save_queue.get(block=True, timeout=5)
if batch_start is None:
batch_start = time.monotonic()
batch_processed = 0
batch_failed = 0
pending = self.save_queue.qsize()
logger.info(
f"📥 Batch started — {pending + 1} zone(s) queued for processing"
)
domain = item.get("domain", "unknown")
is_retry = item.get("source") in ("retry", "reconciler_heal")
target_backends = item.get("failed_backends") # None = all backends
logger.debug(
f"Processing zone update for {domain}"
+ (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
+ (f" [backends: {target_backends}]" if target_backends else "")
)
if not is_retry and not check_zone_exists(domain):
put_zone_index(domain, item.get("hostname"), item.get("username"))
if not all(k in item for k in ["domain", "zone_file"]):
logger.error(f"Invalid queue item: {item}")
self.save_queue.task_done()
batch_failed += 1
continue
backends = self.backend_registry.get_available_backends()
if target_backends:
backends = {
k: v for k, v in backends.items() if k in target_backends
}
if not backends:
logger.warning("No target backends available for this item!")
self.save_queue.task_done()
batch_failed += 1
continue
if len(backends) > 1:
failed = self._process_backends_parallel(backends, item, session)
else:
failed = set()
for backend_name, backend in backends.items():
if not self._process_single_backend(
backend_name, backend, item, session
):
failed.add(backend_name)
if failed:
self._schedule_retry(item, failed)
batch_failed += 1
else:
# Successful write — persist zone_data for Option C healing
self._store_zone_data(session, domain, item["zone_file"])
batch_processed += 1
self.save_queue.task_done()
logger.debug(f"Completed processing for {domain}")
except Empty: except Empty:
if batch_start is not None:
elapsed = time.monotonic() - batch_start
total = batch_processed + batch_failed
rate = batch_processed / elapsed if elapsed > 0 else 0
logger.success(
f"📦 Batch complete — {batch_processed}/{total} zone(s) "
f"processed successfully in {elapsed:.1f}s "
f"({rate:.1f} zones/sec)"
+ (f", {batch_failed} failed" if batch_failed else "")
)
batch_start = None
batch_processed = 0
batch_failed = 0
continue continue
except Exception as e:
logger.error(f"Unexpected worker error: {e}") # Open a batch and keep processing until the queue is empty
batch_failed += 1 batch_start = time.monotonic()
time.sleep(1) batch_processed = 0
batch_failed = 0
logger.info("📥 Batch started")
while True:
try:
domain = item.get("domain", "unknown")
is_retry = item.get("source") in ("retry", "reconciler_heal")
target_backends = item.get("failed_backends") # None = all backends
logger.debug(
f"Processing zone update for {domain}"
+ (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
+ (f" [backends: {target_backends}]" if target_backends else "")
)
if not is_retry and not check_zone_exists(domain):
put_zone_index(domain, item.get("hostname"), item.get("username"))
if not all(k in item for k in ["domain", "zone_file"]):
logger.error(f"Invalid queue item: {item}")
self.save_queue.task_done()
batch_failed += 1
else:
backends = self.backend_registry.get_available_backends()
if target_backends:
backends = {
k: v for k, v in backends.items() if k in target_backends
}
if not backends:
logger.warning("No target backends available for this item!")
self.save_queue.task_done()
batch_failed += 1
else:
if len(backends) > 1:
failed = self._process_backends_parallel(backends, item, session)
else:
failed = set()
for backend_name, backend in backends.items():
if not self._process_single_backend(
backend_name, backend, item, session
):
failed.add(backend_name)
if failed:
self._schedule_retry(item, failed)
batch_failed += 1
else:
self._store_zone_data(session, domain, item["zone_file"])
batch_processed += 1
self.save_queue.task_done()
logger.debug(f"Completed processing for {domain}")
except Exception as e:
logger.error(f"Unexpected worker error processing {item.get('domain', '?')}: {e}")
batch_failed += 1
time.sleep(1)
# Check immediately for the next item — keep batch open while
# more work is queued; close it only when the queue is empty.
try:
item = self.save_queue.get_nowait()
except Empty:
break
elapsed = time.monotonic() - batch_start
total = batch_processed + batch_failed
rate = batch_processed / elapsed if elapsed > 0 else 0
logger.success(
f"📦 Batch complete — {batch_processed}/{total} zone(s) "
f"processed successfully in {elapsed:.1f}s "
f"({rate:.1f} zones/sec)"
+ (f", {batch_failed} failed" if batch_failed else "")
)
def _process_single_backend(self, backend_name, backend, item, session) -> bool:
"""Write a zone to one backend. Returns True on success, False on failure."""
@@ -208,6 +206,7 @@ class WorkerManager:
Discards to dead-letter after MAX_RETRIES attempts."""
retry_count = item.get("retry_count", 0) + 1
if retry_count > MAX_RETRIES:
self._dead_letter_count += 1
logger.error(
f"[retry] Dead-letter: {item['domain']} failed on "
f"{failed_backends} after {MAX_RETRIES} attempts — giving up"
@@ -499,18 +498,24 @@ class WorkerManager:
logger.info("Workers stopped") logger.info("Workers stopped")
def queue_status(self):
return {
"save_queue_size": self.save_queue.qsize(),
"delete_queue_size": self.delete_queue.qsize(),
"retry_queue_size": self.retry_queue.qsize(),
"save_worker_alive": self._save_thread and self._save_thread.is_alive(),
"delete_worker_alive": self._delete_thread
and self._delete_thread.is_alive(),
"retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(),
"reconciler_alive": (
self._reconciler.is_alive if self._reconciler else False
),
"peer_syncer_alive": (
self._peer_syncer.is_alive if self._peer_syncer else False
),
}

def queue_status(self):
reconciler = (
self._reconciler.get_status()
if self._reconciler
else {"enabled": False, "alive": False, "last_run": {}}
)
peer_sync = (
self._peer_syncer.get_peer_status()
if self._peer_syncer
else {"enabled": False, "alive": False, "peers": [], "total": 0, "healthy": 0, "degraded": 0}
)
return {
"save_queue_size": self.save_queue.qsize(),
"delete_queue_size": self.delete_queue.qsize(),
"retry_queue_size": self.retry_queue.qsize(),
"dead_letters": self._dead_letter_count,
"save_worker_alive": bool(self._save_thread and self._save_thread.is_alive()),
"delete_worker_alive": bool(self._delete_thread and self._delete_thread.is_alive()),
"retry_worker_alive": bool(self._retry_thread and self._retry_thread.is_alive()),
"reconciler": reconciler,
"peer_sync": peer_sync,
}
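For orientation, the reshaped payload looks roughly like the sketch below. Values are illustrative only; the nested dicts carry whatever get_status() and get_peer_status() return on a given node.

# Illustrative queue_status() payload (made-up values, not from a real run)
{
    "save_queue_size": 0,
    "delete_queue_size": 0,
    "retry_queue_size": 2,
    "dead_letters": 0,
    "save_worker_alive": True,
    "delete_worker_alive": True,
    "retry_worker_alive": True,
    "reconciler": {"enabled": True, "alive": True, "last_run": {"status": "ok"}},
    "peer_sync": {"enabled": True, "alive": True, "peers": [], "total": 1, "healthy": 1, "degraded": 0},
}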

View File

@@ -20,6 +20,9 @@ dependencies = [
"requests (>=2.32.0,<3.0.0)", "requests (>=2.32.0,<3.0.0)",
] ]
[project.scripts]
dadns = "directdnsonly.__main__:run"
[tool.poetry]
package-mode = true

View File

@@ -1,12 +1,33 @@
CREATE TABLE IF NOT EXISTS `records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`zone` varchar(255) NOT NULL,
`name` varchar(255) NOT NULL,
`ttl` int(11) DEFAULT NULL,
`type` varchar(10) NOT NULL,
`data` text NOT NULL,
PRIMARY KEY (`id`),
KEY `idx_zone` (`zone`),
KEY `idx_name` (`name`),
KEY `idx_type` (`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- DirectDNSOnly — CoreDNS MySQL schema
-- Compatible with cybercinch/coredns_mysql_extend
--
-- managed_by values:
--   'directadmin'  zone is managed via directdnsonly / DirectAdmin push
--   'direct'       zone was created directly (not via DA)
--   NULL           legacy row created before this column was added
CREATE TABLE IF NOT EXISTS `zones` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`zone_name` varchar(255) NOT NULL,
`managed_by` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uq_zone_name` (`zone_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE IF NOT EXISTS `records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`zone_id` int(11) NOT NULL,
`hostname` varchar(255) NOT NULL,
`type` varchar(10) NOT NULL,
`data` text NOT NULL,
`ttl` int(11) DEFAULT NULL,
`online` tinyint(1) NOT NULL DEFAULT 0,
PRIMARY KEY (`id`),
KEY `idx_zone_id` (`zone_id`),
KEY `idx_hostname` (`hostname`),
CONSTRAINT `fk_records_zone` FOREIGN KEY (`zone_id`) REFERENCES `zones` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Migration: add managed_by to an existing installation
-- ALTER TABLE `zones` ADD COLUMN `managed_by` varchar(255) DEFAULT NULL;
-- UPDATE `zones` SET `managed_by` = 'directadmin' WHERE `managed_by` IS NULL;

View File

@@ -38,4 +38,5 @@ def patch_connect(db_session, monkeypatch):
monkeypatch.setattr("directdnsonly.app.utils.connect", _factory) monkeypatch.setattr("directdnsonly.app.utils.connect", _factory)
monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory) monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory)
monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory) monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory)
monkeypatch.setattr("directdnsonly.app.api.status.connect", _factory)
return db_session

View File

@@ -165,3 +165,37 @@ def test_reconcile_no_changes_when_zone_matches(mysql_backend):
success, removed = mysql_backend.reconcile_zone_records("example.com", ZONE_DATA)
assert success
assert removed == 0
# ---------------------------------------------------------------------------
# managed_by field
# ---------------------------------------------------------------------------
def test_write_zone_sets_managed_by_directadmin(mysql_backend):
mysql_backend.write_zone("example.com", ZONE_DATA)
session = mysql_backend.Session()
zone = session.execute(
select(Zone).filter_by(zone_name="example.com.")
).scalar_one_or_none()
assert zone.managed_by == "directadmin"
session.close()
def test_write_zone_migrates_null_managed_by(mysql_backend):
"""Zones that pre-exist without managed_by get it set on next write."""
session = mysql_backend.Session()
zone = Zone(zone_name="example.com.", managed_by=None)
session.add(zone)
session.commit()
session.close()
mysql_backend.write_zone("example.com", ZONE_DATA)
session = mysql_backend.Session()
zone = session.execute(
select(Zone).filter_by(zone_name="example.com.")
).scalar_one_or_none()
assert zone.managed_by == "directadmin"
session.close()

View File

@@ -24,7 +24,9 @@ def _make_json_response(domains_list, total_pages=1):
def _client():
return DirectAdminClient(
"da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True
)
# ---------------------------------------------------------------------------
@@ -105,7 +107,9 @@ def test_html_response_returns_none():
def test_connection_error_returns_none():
with patch(
"requests.get", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().list_domains()
assert result is None
@@ -119,7 +123,9 @@ def test_timeout_returns_none():
def test_ssl_error_returns_none():
with patch(
"requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")
):
result = _client().list_domains()
assert result is None
@@ -131,12 +137,16 @@ def test_ssl_error_returns_none():
def test_parse_standard_querystring():
result = DirectAdminClient._parse_legacy_domain_list(
"list[]=example.com&list[]=test.com"
)
assert result == {"example.com", "test.com"}
def test_parse_newline_separated():
result = DirectAdminClient._parse_legacy_domain_list(
"list[]=example.com\nlist[]=test.com"
)
assert result == {"example.com", "test.com"}
@@ -190,3 +200,171 @@ def test_login_returns_false_on_exception():
result = client._login()
assert result is False
# ---------------------------------------------------------------------------
# get_extra_dns_servers
# ---------------------------------------------------------------------------
def _multi_server_get_resp(servers=None):
mock = MagicMock()
mock.status_code = 200
mock.is_redirect = False
mock.headers = {"Content-Type": "application/json"}
mock.json.return_value = {"CLUSTER_ON": "yes", "servers": servers or {}}
mock.raise_for_status = MagicMock()
return mock
def test_get_extra_dns_servers_returns_servers_dict():
servers = {
"1.2.3.4": {"dns": "yes", "domain_check": "yes", "port": "2222", "ssl": "no"}
}
with patch("requests.get", return_value=_multi_server_get_resp(servers)):
result = _client().get_extra_dns_servers()
assert "1.2.3.4" in result
assert result["1.2.3.4"]["dns"] == "yes"
def test_get_extra_dns_servers_returns_empty_on_http_error():
mock_resp = MagicMock()
mock_resp.status_code = 500
with patch("requests.get", return_value=mock_resp):
result = _client().get_extra_dns_servers()
assert result == {}
def test_get_extra_dns_servers_returns_empty_on_connection_error():
with patch(
"requests.get", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().get_extra_dns_servers()
assert result == {}
# ---------------------------------------------------------------------------
# add_extra_dns_server
# ---------------------------------------------------------------------------
def test_add_extra_dns_server_returns_true_on_success():
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {"result": "", "success": "Connection Added"}
with patch("requests.post", return_value=mock_resp):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is True
def test_add_extra_dns_server_returns_false_on_da_error():
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {"result": "Server already exists", "success": ""}
with patch("requests.post", return_value=mock_resp):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is False
def test_add_extra_dns_server_returns_false_on_connection_error():
with patch(
"requests.post", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is False
# ---------------------------------------------------------------------------
# ensure_extra_dns_server
# ---------------------------------------------------------------------------
def _add_success_resp():
mock = MagicMock()
mock.status_code = 200
mock.json.return_value = {"result": "", "success": "Connection Added"}
return mock
def _save_success_resp():
mock = MagicMock()
mock.status_code = 200
mock.json.return_value = {"result": "", "success": "Connections Saved"}
return mock
def test_ensure_extra_dns_server_adds_and_configures_new_server():
"""Server not yet registered — adds it, then saves dns+domain_check settings."""
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch(
"requests.post",
side_effect=[_add_success_resp(), _save_success_resp()],
),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is True
def test_ensure_extra_dns_server_skips_add_when_already_present():
"""Server already registered — no add call, only saves settings."""
existing = {
"1.2.3.4": {"dns": "no", "domain_check": "no", "port": "2222", "ssl": "no"}
}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers=existing)),
patch("requests.post", return_value=_save_success_resp()) as mock_post,
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is True
assert mock_post.call_count == 1 # save only, no add
def test_ensure_extra_dns_server_returns_false_when_add_fails():
fail_resp = MagicMock()
fail_resp.status_code = 200
fail_resp.json.return_value = {"result": "error", "success": ""}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch("requests.post", return_value=fail_resp),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is False
def test_ensure_extra_dns_server_returns_false_when_save_fails():
"""Add succeeds but the subsequent settings save fails."""
fail_save = MagicMock()
fail_save.status_code = 200
fail_save.json.return_value = {"result": "error", "success": ""}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch(
"requests.post",
side_effect=[_add_success_resp(), fail_save],
),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is False

View File

@@ -58,6 +58,139 @@ def test_peers_stored():
assert worker.peers[0]["url"] == "http://ddo-2:2222" assert worker.peers[0]["url"] == "http://ddo-2:2222"
def test_peer_from_env_var(monkeypatch):
"""DADNS_PEER_SYNC_PEER_URL adds a peer without a config file."""
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_URL", "http://ddo-env:2222")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_USERNAME", "admin")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_PASSWORD", "secret")
worker = PeerSyncWorker({"enabled": True})
assert len(worker.peers) == 1
assert worker.peers[0]["url"] == "http://ddo-env:2222"
assert worker.peers[0]["username"] == "admin"
assert worker.peers[0]["password"] == "secret"
def test_env_peer_not_duplicated_when_also_in_config(monkeypatch):
"""Env var peer is not added if it already appears in the config file peers list."""
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_URL", "http://ddo-2:2222")
worker = PeerSyncWorker(BASE_CONFIG)
# BASE_CONFIG already has http://ddo-2:2222 — must remain exactly one entry
urls = [p["url"] for p in worker.peers]
assert urls.count("http://ddo-2:2222") == 1
def test_numbered_env_peers(monkeypatch):
"""DADNS_PEER_SYNC_PEER_1_URL and _2_URL add multiple peers."""
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://node-a:2222")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_USERNAME", "peersync")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_PASSWORD", "s3cr3t")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_2_URL", "http://node-b:2222")
worker = PeerSyncWorker({"enabled": True})
urls = [p["url"] for p in worker.peers]
assert "http://node-a:2222" in urls
assert "http://node-b:2222" in urls
assert len(urls) == 2
def test_numbered_env_peers_not_duplicated(monkeypatch):
"""Numbered env var peers are deduplicated against the config file list."""
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://ddo-2:2222")
worker = PeerSyncWorker(BASE_CONFIG)
urls = [p["url"] for p in worker.peers]
assert urls.count("http://ddo-2:2222") == 1
def test_get_peer_urls():
worker = PeerSyncWorker(BASE_CONFIG)
assert worker.get_peer_urls() == ["http://ddo-2:2222"]
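The numbered-env-var behaviour asserted above could be implemented roughly as follows. This is a minimal sketch under assumptions, not the actual PeerSyncWorker code, and the helper name is made up.

import os

def peers_from_numbered_env(existing_urls):
    # Hypothetical helper: folds DADNS_PEER_SYNC_PEER_1..9_URL/USERNAME/PASSWORD
    # into peer dicts, skipping URLs already present in the config-file list.
    # existing_urls is assumed to be a mutable set of already-known peer URLs.
    peers = []
    for n in range(1, 10):
        url = os.environ.get(f"DADNS_PEER_SYNC_PEER_{n}_URL")
        if not url or url in existing_urls:
            continue
        peers.append({
            "url": url,
            "username": os.environ.get(f"DADNS_PEER_SYNC_PEER_{n}_USERNAME"),
            "password": os.environ.get(f"DADNS_PEER_SYNC_PEER_{n}_PASSWORD"),
        })
        existing_urls.add(url)
    return peers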
# ---------------------------------------------------------------------------
# Health tracking
# ---------------------------------------------------------------------------
def test_peer_health_starts_healthy():
worker = PeerSyncWorker(BASE_CONFIG)
h = worker._health("http://ddo-2:2222")
assert h["healthy"] is True
assert h["consecutive_failures"] == 0
def test_record_failure_increments_count():
worker = PeerSyncWorker(BASE_CONFIG)
worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 1
assert worker._health("http://ddo-2:2222")["healthy"] is True
def test_record_failure_marks_degraded_at_threshold():
from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
worker = PeerSyncWorker(BASE_CONFIG)
for _ in range(FAILURE_THRESHOLD):
worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
assert worker._health("http://ddo-2:2222")["healthy"] is False
def test_record_success_resets_health():
from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
worker = PeerSyncWorker(BASE_CONFIG)
for _ in range(FAILURE_THRESHOLD):
worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
assert not worker._health("http://ddo-2:2222")["healthy"]
worker._record_success("http://ddo-2:2222")
assert worker._health("http://ddo-2:2222")["healthy"] is True
assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 0
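A minimal sketch of the bookkeeping these health tests imply; the names mirror the test API, the threshold value is an assumption, and the real worker additionally logs degraded/recovered events on the transitions.

import time

FAILURE_THRESHOLD = 3  # assumed value for illustration; the worker imports its own constant

def record_failure(health, url):
    # health maps peer URL -> {"healthy", "consecutive_failures", "last_seen"}
    h = health.setdefault(url, {"healthy": True, "consecutive_failures": 0, "last_seen": None})
    h["consecutive_failures"] += 1
    if h["consecutive_failures"] >= FAILURE_THRESHOLD:
        h["healthy"] = False  # the degraded log event would fire on this transition

def record_success(health, url):
    h = health.setdefault(url, {"healthy": True, "consecutive_failures": 0, "last_seen": None})
    h.update(healthy=True, consecutive_failures=0, last_seen=time.time())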
# ---------------------------------------------------------------------------
# Peer discovery (_discover_peers_from)
# ---------------------------------------------------------------------------
def test_discover_peers_adds_new_peer(monkeypatch):
"""New peer URL returned by /internal/peers is added to the peer list."""
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = ["http://node-c:2222"]
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._discover_peers_from(BASE_CONFIG["peers"][0])
urls = [p["url"] for p in worker.peers]
assert "http://node-c:2222" in urls
def test_discover_peers_skips_known(monkeypatch):
"""Already-known peer URLs are not re-added."""
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = ["http://ddo-2:2222"] # already known
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._discover_peers_from(BASE_CONFIG["peers"][0])
assert len(worker.peers) == 1 # unchanged
def test_discover_peers_tolerates_failure(monkeypatch):
"""Network error during discovery does not propagate."""
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(*args, **kwargs):
raise ConnectionError("peer down")
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
# Should not raise
worker._discover_peers_from(BASE_CONFIG["peers"][0])
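A sketch of the gossip-lite discovery the three tests above describe; the endpoint path and credential reuse are assumptions, and the broad exception handling mirrors test_discover_peers_tolerates_failure.

import requests

def discover_peers_from(worker, peer):
    # Best-effort: ask one peer for its peer URL list and adopt any new ones.
    try:
        resp = requests.get(
            peer["url"] + "/internal/peers",  # path assumed for illustration
            auth=(peer["username"], peer["password"]),
            timeout=10,
        )
        if resp.status_code != 200:
            return
        known = {p["url"] for p in worker.peers}
        for url in resp.json():
            if url not in known:
                worker.peers.append({
                    "url": url,
                    "username": peer["username"],  # credential reuse is an assumption
                    "password": peer["password"],
                })
    except Exception:
        pass  # a down peer must never abort the sync pass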
def test_start_skips_when_disabled(caplog):
worker = PeerSyncWorker({"enabled": False})
worker.start()
@@ -261,3 +394,53 @@ def test_sync_empty_peer_list(patch_connect, monkeypatch):
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer()) worker._sync_from_peer(_make_peer())
# ---------------------------------------------------------------------------
# get_peer_status
# ---------------------------------------------------------------------------
def test_get_peer_status_no_contact_yet():
worker = PeerSyncWorker(BASE_CONFIG)
status = worker.get_peer_status()
assert status["enabled"] is True
assert status["total"] == 1
assert status["healthy"] == 1
assert status["degraded"] == 0
assert status["peers"][0]["url"] == "http://ddo-2:2222"
assert status["peers"][0]["healthy"] is True
assert status["peers"][0]["last_seen"] is None
def test_get_peer_status_after_success():
worker = PeerSyncWorker(BASE_CONFIG)
worker._record_success("http://ddo-2:2222")
status = worker.get_peer_status()
assert status["healthy"] == 1
assert status["degraded"] == 0
assert status["peers"][0]["last_seen"] is not None
def test_get_peer_status_after_degraded():
from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
worker = PeerSyncWorker(BASE_CONFIG)
for _ in range(FAILURE_THRESHOLD):
worker._record_failure("http://ddo-2:2222", Exception("timeout"))
status = worker.get_peer_status()
assert status["healthy"] == 0
assert status["degraded"] == 1
assert status["peers"][0]["healthy"] is False
def test_get_peer_status_disabled():
worker = PeerSyncWorker({})
status = worker.get_peer_status()
assert status["enabled"] is False
assert status["total"] == 0
assert status["peers"] == []

View File

@@ -55,7 +55,9 @@ DA_CLIENT_PATH = "directdnsonly.app.reconciler.DirectAdminClient"
def _patch_da(return_value):
"""Patch DirectAdminClient so list_domains returns a fixed value."""
return patch(
DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value}
)
# ---------------------------------------------------------------------------
@@ -233,7 +235,12 @@ def test_heal_skips_domains_without_zone_data(delete_queue, patch_connect):
registry, _ = _make_backend_registry(zone_exists_return=False)
patch_connect.add(
Domain(
domain="nodata.com",
hostname="da1.example.com",
username="admin",
zone_data=None,
)
)
patch_connect.commit()
@@ -310,3 +317,83 @@ def test_heal_skipped_when_no_registry(delete_queue, patch_connect):
w._reconcile_all()
assert save_queue.empty()
# ---------------------------------------------------------------------------
# get_status — last-run state
# ---------------------------------------------------------------------------
def test_get_status_before_any_run(worker):
status = worker.get_status()
assert status["enabled"] is True
assert status["alive"] is False
assert status["last_run"] == {}
def test_get_status_after_run(worker, patch_connect):
with _patch_da(set()):
worker._reconcile_all()
s = worker.get_status()
assert s["enabled"] is True
lr = s["last_run"]
assert lr["status"] == "ok"
assert "started_at" in lr
assert "completed_at" in lr
assert "duration_seconds" in lr
assert lr["da_servers_polled"] == 1
assert lr["da_servers_unreachable"] == 0
assert lr["dry_run"] is False
def test_get_status_counts_unreachable_server(worker, patch_connect):
with _patch_da(None):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["da_servers_polled"] == 1
assert lr["da_servers_unreachable"] == 1
def test_get_status_counts_orphans(worker, delete_queue, patch_connect):
patch_connect.add(
Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with _patch_da(set()):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["orphans_found"] == 1
assert lr["orphans_queued"] == 1
def test_get_status_dry_run_orphans_not_queued_in_stats(dry_run_worker, patch_connect):
patch_connect.add(
Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with _patch_da(set()):
dry_run_worker._reconcile_all()
lr = dry_run_worker.get_status()["last_run"]
assert lr["dry_run"] is True
assert lr["orphans_found"] == 1
assert lr["orphans_queued"] == 0
def test_get_status_zones_in_db_counted(worker, patch_connect):
for d in ["a.com", "b.com", "c.com"]:
patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
patch_connect.commit()
with _patch_da({"a.com", "b.com", "c.com"}):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["zones_in_db"] == 3
assert lr["zones_in_da"] == 3
assert lr["orphans_found"] == 0

162
tests/test_status_api.py Normal file
View File

@@ -0,0 +1,162 @@
"""Tests for directdnsonly.app.api.status — StatusAPI."""
import json
from unittest.mock import MagicMock
import cherrypy
import pytest
from directdnsonly.app.api.status import StatusAPI
from directdnsonly.app.db.models import Domain
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_RECONCILER_OK = {
"enabled": True,
"alive": True,
"dry_run": False,
"interval_minutes": 60,
"last_run": {},
}
_PEER_SYNC_OFF = {
"enabled": False,
"alive": False,
"peers": [],
"total": 0,
"healthy": 0,
"degraded": 0,
}
def _qs(**overrides):
base = {
"save_queue_size": 0,
"delete_queue_size": 0,
"retry_queue_size": 0,
"dead_letters": 0,
"save_worker_alive": True,
"delete_worker_alive": True,
"retry_worker_alive": True,
"reconciler": _RECONCILER_OK,
"peer_sync": _PEER_SYNC_OFF,
}
base.update(overrides)
return base
def _api(qs=None):
wm = MagicMock()
wm.queue_status.return_value = qs or _qs()
return StatusAPI(wm)
# ---------------------------------------------------------------------------
# _compute_overall
# ---------------------------------------------------------------------------
def test_overall_ok_all_healthy():
assert StatusAPI._compute_overall(_qs()) == "ok"
def test_overall_error_save_worker_dead():
assert StatusAPI._compute_overall(_qs(save_worker_alive=False)) == "error"
def test_overall_error_delete_worker_dead():
assert StatusAPI._compute_overall(_qs(delete_worker_alive=False)) == "error"
def test_overall_degraded_retries_pending():
assert StatusAPI._compute_overall(_qs(retry_queue_size=3)) == "degraded"
def test_overall_degraded_dead_letters():
assert StatusAPI._compute_overall(_qs(dead_letters=1)) == "degraded"
def test_overall_degraded_peer_unhealthy():
ps = {**_PEER_SYNC_OFF, "degraded": 1}
assert StatusAPI._compute_overall(_qs(peer_sync=ps)) == "degraded"
def test_overall_error_takes_priority_over_degraded():
"""error > degraded when both conditions are true."""
assert (
StatusAPI._compute_overall(
_qs(save_worker_alive=False, retry_queue_size=5)
)
== "error"
)
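Read together, those cases pin down a priority order that could be written roughly as below. This is a re-derivation from the tests, not the actual StatusAPI._compute_overall code, which may weigh further signals.

def compute_overall(qs):
    # error beats degraded: a dead save or delete worker always wins
    if not qs["save_worker_alive"] or not qs["delete_worker_alive"]:
        return "error"
    # pending retries, dead letters, or degraded peers downgrade to "degraded"
    if qs["retry_queue_size"] or qs["dead_letters"] or qs["peer_sync"]["degraded"]:
        return "degraded"
    return "ok"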
# ---------------------------------------------------------------------------
# _build — structure and zone count
# ---------------------------------------------------------------------------
def test_build_structure(patch_connect):
api = _api()
result = api._build()
assert "status" in result
assert "queues" in result
assert "workers" in result
assert "reconciler" in result
assert "peer_sync" in result
assert "zones" in result
def test_build_zone_count_zero(patch_connect):
api = _api()
result = api._build()
assert result["zones"]["total"] == 0
def test_build_zone_count_with_domains(patch_connect):
for d in ["a.com", "b.com", "c.com"]:
patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
patch_connect.commit()
api = _api()
result = api._build()
assert result["zones"]["total"] == 3
def test_build_queues_forwarded(patch_connect):
api = _api(_qs(save_queue_size=2, delete_queue_size=1, retry_queue_size=3, dead_letters=1))
result = api._build()
assert result["queues"]["save"] == 2
assert result["queues"]["delete"] == 1
assert result["queues"]["retry"] == 3
assert result["queues"]["dead_letters"] == 1
def test_build_workers_forwarded(patch_connect):
api = _api()
result = api._build()
assert result["workers"]["save"] is True
assert result["workers"]["delete"] is True
assert result["workers"]["retry_drain"] is True
# ---------------------------------------------------------------------------
# index — JSON encoding
# ---------------------------------------------------------------------------
def test_index_returns_valid_json(patch_connect):
api = _api()
with MagicMock() as mock_resp:
cherrypy.response = mock_resp
cherrypy.response.headers = {}
body = api.index()
data = json.loads(body)
assert data["status"] == "ok"
assert isinstance(data["zones"]["total"], int)
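Finally, a consumption sketch for the endpoint these tests cover; the URL, mount path, and credentials are assumptions about a particular deployment, while the payload keys follow the _build tests above.

# Consumption sketch (URL, mount path, and auth are deployment assumptions)
import requests

resp = requests.get("http://ddo-1:2222/status", auth=("admin", "secret"), timeout=5)
payload = resp.json()
assert payload["status"] in ("ok", "degraded", "error")
print(payload["queues"], payload["zones"]["total"])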