Compare commits

...

10 Commits

Author SHA1 Message Date
db60d808de feat: operational status endpoint + reconciler/peer state tracking 📊
- ReconciliationWorker._last_run stores per-pass stats (da_servers_polled,
  zones_in_da/db, orphans_found/queued, hostnames_backfilled/migrated,
  zones_healed, duration_seconds, dry_run flag)
- ReconciliationWorker.get_status() exposes state for API/UI consumption
- _heal_backends() now returns healed count
- PeerSyncWorker.get_peer_status() serialises _peer_health to JSON-safe dict
  (url, healthy, consecutive_failures, last_seen) with summary totals
- WorkerManager tracks dead-letter count; queue_status() now returns nested
  reconciler/peer_sync dicts replacing flat reconciler_alive/peer_syncer_alive
- New GET /status endpoint (StatusAPI) aggregates queue depths, worker liveness,
  reconciler last-run, peer health, and live zone count; computes ok/degraded/error
- .gitignore: exclude .claude/, .vscode/, .env (always local)
- app.yml: add documented datastore section (SQLite default + MySQL commented)
- 164 tests passing (23 new tests added)
2026-02-25 18:51:56 +13:00
0f417da204 feat: add CMD_MULTI_SERVER methods to DirectAdminClient 🔌
Adds get_extra_dns_servers(), add_extra_dns_server(), and the
high-level ensure_extra_dns_server() which registers a node and
enforces dns=yes + domain_check=yes in a single call.  Also adds
the generic post() helper.  10 new tests, 141 total.
2026-02-25 16:29:21 +13:00
3f6a061ffe feat: mesh peer sync with health tracking and separate peer credentials 🔗
- Separate peer_sync.auth_username/password from the DA-facing credentials
  so /internal/* uses its own basic auth; a compromised peer cannot push
  zones or access the admin API
- Per-peer health tracking: consecutive failure count, degraded/recovered
  log events at FAILURE_THRESHOLD (3) and on first successful contact after
  degradation
- Gossip-lite mesh discovery: each sync pass calls /internal/peers on every
  known peer and adds newly discovered node URLs automatically; a linear
  chain of initial connections is sufficient to form a full mesh
- /internal/peers endpoint returns the node's live peer URL list
- Support DADNS_PEER_SYNC_PEER_N_URL/USERNAME/PASSWORD numbered env vars
  for multi-peer env-var-only deployments (up to 9); original single-peer
  DADNS_PEER_SYNC_PEER_URL retained for backward compatibility
2026-02-25 16:08:26 +13:00
0b31b75789 fix: correct RDATA encoding and batch processing in CoreDNS MySQL backend 🐛
- Fix dnspython silently relativizing in-zone FQDN targets to '@' by
  calling rdata.to_text(origin=origin, relativize=False); CoreDNS MySQL
  requires absolute FQDNs in RDATA and was serving '.' for any CNAME/MX
  pointing to the zone apex
- Reorder write_zone to delete stale records before inserting new ones
  so a brief NXDOMAIN is preferred over briefly serving duplicate records
- Rework save-queue batch loop: keep batch open until queue is empty
  rather than closing after a fixed timeout, so sequential DA zone pushes
  accumulate into a single batch
- Add managed_by='directadmin' to _ensure_zone_exists for new and
  legacy NULL rows
2026-02-25 15:43:08 +13:00
83fbb03cad fix: relativize zone-apex hostnames to '@' for CoreDNS MySQL 🐛
CoreDNS MySQL (cybercinch fork) expects '@' for zone-apex references in
record RDATA. Storing the full FQDN (e.g. 'ithome.net.nz.') caused CoreDNS
to strip the zone suffix and serve 'MX 0 .' / 'CNAME .' instead of the
correct apex target.

- Add _relativize_name(): converts zone FQDN → '@', in-zone subdomains →
  relative label, external FQDNs left unchanged. Handles both already-
  relativized output from dnspython ($ORIGIN present) and absolute FQDNs
  when $ORIGIN is absent from the zone file.
- Replace _normalize_cname_data() with _relativize_name(); add
  _normalize_mx_data(), _normalize_ns_data(), _normalize_srv_data() using
  the same helper.
- _parse_zone_to_record_set() now normalizes MX, NS, SRV alongside CNAME.
- _ensure_zone_exists() sets managed_by='directadmin' on create and
  back-fills NULL rows from pre-migration installs.
- Zone.managed_by changed to nullable=True to match ALTER TABLE migration
  where existing rows have no value.
- schema/coredns_mysql.sql updated to reflect actual two-table schema with
  managed_by column and migration comment.
- 11 new tests (130 total, all passing).
2026-02-25 14:37:14 +13:00
5e9a6f19bd fix: add __main__.py so python -m directdnsonly works in container 🐛
- directdnsonly/__main__.py: inserts package dir into sys.path before
  importing main.py (which uses short-form relative imports) then calls
  main(); works for both `python -m directdnsonly` and the dadns script
- pyproject.toml: wire up `dadns` console script entry point
2026-02-20 14:17:53 +13:00
4a4b4f2b98 docs: clarify Knot DNS and PowerDNS are not implemented backends 📝
Add explicit note that only nsd, bind, and coredns_mysql are available —
Knot and PowerDNS are listed as architectural context only.
2026-02-20 06:59:12 +13:00
6e96e78376 docs: CoreDNS MySQL is the recommended choice at all scale levels 🏆
The cybercinch fork's resilience features (cache fallback, health monitoring,
zero downtime, connection pooling) make it the best DNS backend regardless of
zone count — not just at 300+ zones. Update summary recommendation and
topology comparison "Best for" row to reflect this.
2026-02-20 06:53:47 +13:00
e8939bcd82 docs: document CoreDNS fork resilience features accurately 📋
Replace vague "file caching" description with the confirmed feature set:
connection pooling, degraded operation (JSON cache fallback), smart caching,
health monitoring, zero downtime. Update Topology B failure table to reflect
that CoreDNS serves from cache throughout MySQL outages. Add write/read split
summary — retry queue covers writes, CoreDNS cache covers reads.
2026-02-20 06:52:27 +13:00
d98f08a408 feat: peer sync configurable via env vars + document CoreDNS file cache 🔗
- PeerSyncWorker reads DADNS_PEER_SYNC_PEER_URL / _USERNAME / _PASSWORD env
  vars to populate a single peer without a config file; deduped against any
  config-file peers so the URL never appears twice
- 2 new tests (119 total, all passing)
- README: peer sync single-peer env var table; Topology C compose example
  updated to use env vars only (no config file needed for two-node setup)
- README: document cybercinch/coredns_mysql_extend built-in file caching —
  serves from cache during MySQL outages, eliminates per-query round-trips
2026-02-20 06:41:46 +13:00
22 changed files with 1393 additions and 203 deletions

6
.gitignore vendored
View File

@@ -26,3 +26,9 @@ build
*.mypy_cache
*.pytest_cache
/data/*
# Editor / tool settings — always local, never committed
.vscode/
.claude/
.env
*.env

View File

@@ -115,8 +115,8 @@ Both MySQL backends are written **concurrently** within the same zone update. A
| Scenario | What happens |
|---|---|
| One MySQL backend unreachable | Other backend(s) succeed immediately. Failed backend queued for retry with exponential backoff (30 s → 2 m → 5 m → 15 m → 30 m, up to 5 attempts). |
| MySQL backend down for hours | Retry queue exhausts. On recovery, the reconciliation healing pass detects the backend is missing zones and re-pushes all of them using stored `zone_data` — no DA intervention required. |
| One MySQL backend unreachable | Other backend(s) succeed immediately. Failed backend queued for retry with exponential backoff (30 s → 2 m → 5 m → 15 m → 30 m, up to 5 attempts). CoreDNS continues serving from its local JSON cache throughout. |
| MySQL backend down for hours | Retry queue exhausts. CoreDNS serves from cache the entire time — zero query downtime. On recovery, the reconciliation healing pass detects the backend is missing zones and re-pushes all of them using stored `zone_data` — no DA intervention required. |
| directdnsonly container restarts | Persistent queue survives. In-flight zone updates replay on startup. |
| directdnsonly container down during DA push | DA cannot deliver. Persistent queue on disk is intact; when the container comes back, it resumes processing any previously queued items. New pushes during downtime are lost at the DA level (DA does not retry). |
| Zone deleted from DA | Reconciliation poller detects orphan and queues delete across all backends. |
@@ -250,7 +250,7 @@ Register each container as a separate Extra DNS server entry in DA → DNS Admin
| **Orphan detection** | Yes — reconciler | Yes — reconciler | Yes — reconciler (per instance) |
| **External DB required** | No | Yes (MySQL per CoreDNS node) | No (NSD) or Yes (CoreDNS MySQL) |
| **Horizontal scaling** | Add DA Extra DNS entries + containers | Add backend stanzas in config | Add DA Extra DNS entries + containers + peer list |
| **Best for** | Simple HA, no external DB | Multi-DC, stronger consistency | Most robust HA — survives extended outages without DA re-push |
| **Best for** | Simple HA, no external DB | Best overall — resilient writes (retry queue) + resilient reads (CoreDNS cache fallback), no daemon reloads, scales to thousands of zones | Most robust HA — resilient at every layer, survives extended outages without DA re-push |
---
@@ -298,10 +298,12 @@ The container image ships with **both NSD and BIND9** installed. The entrypoint
**Summary recommendation:**
- **Up to ~300 zones, no external DB:** Use the NSD backend (bundled) — lighter, faster, authoritative-only, same zone file format as BIND.
- **3001 000+ zones:** CoreDNS MySQL wins — zone data in MySQL means no daemon reload at all.
- **Need zero-interruption zone swaps:** Knot DNS.
- **Need an HTTP API for zone management (no file I/O):** PowerDNS Authoritative with its native HTTP API and file/SQLite backend.
- **Any scale, external DB available:** CoreDNS MySQL ([cybercinch fork](https://github.com/cybercinch/coredns_mysql_extend)) wins at every zone count. Connection pooling, JSON cache fallback, health monitoring, and zero-downtime operation during DB maintenance make it the most resilient choice regardless of size. No daemon reload ever needed — a zone write is a MySQL INSERT.
- **No external DB, simplicity first:** NSD (bundled) — lightweight, fast, authoritative-only, same RFC 1035 zone file format as BIND.
- **Need zero-interruption zone swaps:** Knot DNS (RCU — serves old zone to in-flight queries while atomically swapping in the new one).
- **Need an HTTP API for zone management:** PowerDNS Authoritative with its native HTTP API.
> **Note:** Knot DNS and PowerDNS backends are **not implemented** in directdnsonly — they are listed here as architectural context only. Implemented backends: `nsd`, `bind`, `coredns_mysql`. Pull requests for additional backends are welcome.
---
@@ -314,18 +316,23 @@ NS records in the additional section, does not set the AA flag, and does not
handle wildcard records.
This project is designed to work with a patched fork that resolves all of those
issues:
issues and adds production-grade resilience:
**[cybercinch/coredns_mysql_extend](https://github.com/cybercinch/coredns_mysql_extend)**
Key differences from the upstream plugin:
| Feature | Detail |
|---|---|
| **Fully authoritative** | Correct AA flag, NXDOMAIN on misses, NS records in the additional section |
| **Wildcard records** | `*` entries served correctly |
| **Connection pooling** | Configurable MySQL connection management — efficient under load |
| **Degraded operation** | Automatic fallback to a local JSON cache when MySQL is unavailable — DNS keeps serving |
| **Smart caching** | Intelligent per-record cache management reduces per-query MySQL round-trips |
| **Health monitoring** | Continuous database health checks with configurable intervals |
| **Zero downtime** | DNS continues serving during database maintenance windows |
- Fully authoritative responses — correct AA flag and NXDOMAIN on misses
- Wildcard record support (`*` entries served correctly)
- NS records returned in the additional section
**Why this matters for Topology B:** directdnsonly's retry queue handles the write side during a MySQL outage — the CoreDNS fork handles the read side. Between them, neither writes nor queries are dropped during transient database failures.
Use the BIND backend if you want a zero-dependency setup with no custom CoreDNS
build required.
Use the NSD or BIND backend if you want a zero-dependency setup with no custom CoreDNS build required.
---
@@ -502,12 +509,20 @@ The built-in env var mapping targets the backend named `coredns_mysql`. For mult
#### Peer sync
| Config key | Environment variable | Default | Description |
|---|---|---|---|
| `peer_sync.enabled` | `DADNS_PEER_SYNC_ENABLED` | `false` | Enable background peer-to-peer zone sync |
| `peer_sync.interval_minutes` | `DADNS_PEER_SYNC_INTERVAL_MINUTES` | `15` | How often each peer is polled |
| Config key / Environment variable | Default | Description |
|---|---|---|
| `peer_sync.enabled` / `DADNS_PEER_SYNC_ENABLED` | `false` | Enable background peer-to-peer zone sync |
| `peer_sync.interval_minutes` / `DADNS_PEER_SYNC_INTERVAL_MINUTES` | `15` | How often each peer is polled |
> The `peer_sync.peers` list (peer URLs, credentials) requires a config file — it cannot be expressed as simple env vars.
For a **single peer** (the typical two-node Topology C setup) the peer can be configured entirely via env vars — no config file required:
| Environment variable | Default | Description |
|---|---|---|
| `DADNS_PEER_SYNC_PEER_URL` | _(unset)_ | URL of the single peer (e.g. `http://ddo-2:2222`). When set, this peer is automatically appended to the peers list. |
| `DADNS_PEER_SYNC_PEER_USERNAME` | `directdnsonly` | Basic auth username for the peer |
| `DADNS_PEER_SYNC_PEER_PASSWORD` | _(empty)_ | Basic auth password for the peer |
> For **multiple peers**, use a config file with the `peer_sync.peers` list. A peer defined via env var is deduped — if the same URL already appears in the config file it will not be added twice.
---
@@ -540,8 +555,11 @@ services:
DADNS_APP_AUTH_PASSWORD: my-strong-secret
DADNS_DNS_DEFAULT_BACKEND: nsd
DADNS_DNS_BACKENDS_NSD_ENABLED: "true"
DADNS_PEER_SYNC_ENABLED: "true"
DADNS_PEER_SYNC_PEER_URL: http://directdnsonly-mlb:2222
DADNS_PEER_SYNC_PEER_USERNAME: directdnsonly
DADNS_PEER_SYNC_PEER_PASSWORD: my-strong-secret
volumes:
- ./config/syd:/app/config # contains peer_sync.peers list
- syd-data:/app/data
directdnsonly-mlb:
@@ -553,8 +571,11 @@ services:
DADNS_APP_AUTH_PASSWORD: my-strong-secret
DADNS_DNS_DEFAULT_BACKEND: nsd
DADNS_DNS_BACKENDS_NSD_ENABLED: "true"
DADNS_PEER_SYNC_ENABLED: "true"
DADNS_PEER_SYNC_PEER_URL: http://directdnsonly-syd:2222
DADNS_PEER_SYNC_PEER_USERNAME: directdnsonly
DADNS_PEER_SYNC_PEER_PASSWORD: my-strong-secret
volumes:
- ./config/mlb:/app/config # contains peer_sync.peers list
- mlb-data:/app/data
volumes:

17
directdnsonly/__main__.py Normal file
View File

@@ -0,0 +1,17 @@
import os
import sys
def run():
# main.py uses short-form imports (from app.*, from worker) that resolve
# relative to the directdnsonly/ package directory. Insert it into the
# path before importing so `python -m directdnsonly` and the `dadns`
# console script both work without changing main.py.
sys.path.insert(0, os.path.dirname(__file__))
from main import main
main()
if __name__ == "__main__":
run()

View File

@@ -7,14 +7,19 @@ from directdnsonly.app.db.models import Domain
class InternalAPI:
"""Peer-to-peer zone_data exchange endpoint.
"""Peer-to-peer zone_data exchange endpoints.
Used by PeerSyncWorker to replicate zone_data between directdnsonly
instances so each node can independently heal its local backends.
All routes require the same basic auth as the main API.
All routes require peer_sync basic auth credentials, which are
configured separately from the main DirectAdmin-facing credentials
(peer_sync.auth_username / peer_sync.auth_password).
"""
def __init__(self, peer_syncer=None):
self._peer_syncer = peer_syncer
@cherrypy.expose
def zones(self, domain=None):
"""Return zone metadata or zone_data for a specific domain.
@@ -77,3 +82,15 @@ class InternalAPI:
return json.dumps({"error": "internal server error"}).encode()
finally:
session.close()
@cherrypy.expose
def peers(self):
"""Return the list of peer URLs this node knows about.
GET /internal/peers
Returns a JSON array of URL strings. Used by other nodes during
sync to discover new cluster members (gossip-lite mesh expansion).
"""
cherrypy.response.headers["Content-Type"] = "application/json"
urls = self._peer_syncer.get_peer_urls() if self._peer_syncer else []
return json.dumps(urls).encode()

View File

@@ -0,0 +1,82 @@
"""Operational status endpoint — aggregates queue, worker, reconciler, and peer health."""
import json
import cherrypy
from sqlalchemy import func, select
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain
class StatusAPI:
"""Exposes GET /status as a JSON health/status document.
Aggregates data from WorkerManager.queue_status() and a live DB zone count
into a single response that a UI or monitoring system can poll.
Overall ``status`` field:
- ``ok`` — all workers alive, no dead-letters, all peers healthy
- ``degraded`` — retries pending, dead-letters present, or a peer is unhealthy
- ``error`` — a core worker thread is not alive
"""
def __init__(self, worker_manager):
self._wm = worker_manager
@cherrypy.expose
def index(self):
cherrypy.response.headers["Content-Type"] = "application/json"
return json.dumps(self._build(), default=str).encode()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _build(self) -> dict:
qs = self._wm.queue_status()
zone_count = self._zone_count()
overall = self._compute_overall(qs)
return {
"status": overall,
"queues": {
"save": qs.get("save_queue_size", 0),
"delete": qs.get("delete_queue_size", 0),
"retry": qs.get("retry_queue_size", 0),
"dead_letters": qs.get("dead_letters", 0),
},
"workers": {
"save": qs.get("save_worker_alive"),
"delete": qs.get("delete_worker_alive"),
"retry_drain": qs.get("retry_worker_alive"),
},
"reconciler": qs.get("reconciler", {}),
"peer_sync": qs.get("peer_sync", {}),
"zones": {"total": zone_count},
}
@staticmethod
def _zone_count() -> int:
session = connect()
try:
return session.execute(select(func.count(Domain.id))).scalar() or 0
except Exception:
return 0
finally:
session.close()
@staticmethod
def _compute_overall(qs: dict) -> str:
if not qs.get("save_worker_alive") or not qs.get("delete_worker_alive"):
return "error"
peer_sync = qs.get("peer_sync", {})
if (
qs.get("retry_queue_size", 0) > 0
or qs.get("dead_letters", 0) > 0
or peer_sync.get("degraded", 0) > 0
):
return "degraded"
return "ok"

View File

@@ -14,6 +14,7 @@ class Zone(Base):
__tablename__ = "zones"
id = Column(Integer, primary_key=True)
zone_name = Column(String(255), nullable=False, index=True, unique=True)
managed_by = Column(String(255), nullable=True) # 'directadmin' | 'direct' | NULL (legacy)
class Record(Base):
@@ -90,10 +91,34 @@ class CoreDNSMySQLBackend(DNSBackend):
zone_name, zone_data
)
# Track changes
current_records = set()
# Pre-compute the set of (hostname, type, data) keys that should
# remain after this update, so we can identify stale records upfront.
incoming_keys = {
(name, rtype, data) for name, rtype, data, _ in source_records
}
changes = {"added": 0, "updated": 0, "removed": 0}
# --- 1. Remove stale records first ---
# Deleting before inserting means a brief NXDOMAIN is preferable
# to briefly serving both old and new records simultaneously.
for key, record in existing_records.items():
if key not in incoming_keys:
logger.debug(
f"Removed record: {record.hostname} {record.type} {record.data}"
)
session.delete(record)
changes["removed"] += 1
# Handle SOA removal if needed
if existing_soa and not source_soa:
logger.debug(
f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
)
session.delete(existing_soa)
changes["removed"] += 1
# --- 2. Add / update incoming records ---
# Handle SOA record
if source_soa:
soa_name, soa_content, soa_ttl = source_soa
@@ -123,7 +148,6 @@ class CoreDNSMySQLBackend(DNSBackend):
# Process all non-SOA records
for record_name, record_type, record_content, record_ttl in source_records:
key = (record_name, record_type, record_content)
current_records.add(key)
if key in existing_records:
# Update existing record if TTL changed
@@ -151,23 +175,6 @@ class CoreDNSMySQLBackend(DNSBackend):
f"Added new record: {record_name} {record_type} {record_content}"
)
# Remove records that no longer exist in the source zone
for key, record in existing_records.items():
if key not in current_records:
logger.debug(
f"Removed record: {record.hostname} {record.type} {record.data}"
)
session.delete(record)
changes["removed"] += 1
# Handle SOA removal if needed
if existing_soa and not source_soa:
logger.debug(
f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
)
session.delete(existing_soa)
changes["removed"] += 1
session.commit()
total_changes = changes["added"] + changes["updated"] + changes["removed"]
if total_changes > 0:
@@ -240,43 +247,27 @@ class CoreDNSMySQLBackend(DNSBackend):
session.close()
def _ensure_zone_exists(self, session, zone_name: str) -> Zone:
"""Ensure a zone exists in the database, creating it if necessary"""
"""Ensure a zone exists in the database, creating it if necessary."""
zone = session.execute(
select(Zone).filter_by(zone_name=self.dot_fqdn(zone_name))
).scalar_one_or_none()
if not zone:
logger.debug(f"Creating new zone: {self.dot_fqdn(zone_name)}")
zone = Zone(zone_name=self.dot_fqdn(zone_name))
zone = Zone(
zone_name=self.dot_fqdn(zone_name),
managed_by="directadmin",
)
session.add(zone)
session.flush() # Get the zone ID
session.flush()
elif not zone.managed_by:
# Migrate pre-existing rows that were created before this field was added
zone.managed_by = "directadmin"
return zone
def _normalize_cname_data(self, zone_name: str, record_content: str) -> str:
"""Normalize CNAME record data to ensure consistent FQDN format.
This ensures CNAME targets are always stored as fully-qualified domain
names so that record comparison between the BIND zone source and the
database is deterministic.
Args:
zone_name: The zone name for relative-name expansion
record_content: The raw CNAME target from the parsed zone
Returns:
The normalized CNAME target string
"""
if record_content.startswith("@"):
logger.debug(f"CNAME target starts with '@', replacing with zone FQDN")
record_content = self.dot_fqdn(zone_name)
elif not record_content.endswith("."):
logger.debug(f"CNAME target {record_content} is relative, appending zone")
record_content = ".".join([record_content, self.dot_fqdn(zone_name)])
return record_content
def _parse_zone_to_record_set(
self, zone_name: str, zone_data: str
) -> Tuple[Set[Tuple[str, str, str, int]], Optional[Tuple[str, str, int]]]:
"""Parse a BIND zone file into a set of normalised record keys.
"""Parse a BIND zone file into a set of record keys.
Returns:
Tuple of:
@@ -287,21 +278,27 @@ class CoreDNSMySQLBackend(DNSBackend):
records: Set[Tuple[str, str, str, int]] = set()
soa = None
# Use the zone origin (if available) to expand relative names in RDATA
# back to absolute FQDNs. Without this, dnspython's default relativize=True
# behaviour turns in-zone targets like `wvvcc.co.nz.` into `@` in the
# stored data, which CoreDNS then serves incorrectly.
origin = dns_zone.origin
for name, ttl, rdata in dns_zone.iterate_rdatas():
if rdata.rdclass != IN:
continue
record_name = str(name)
record_type = rdata.rdtype.name
record_content = rdata.to_text()
if origin is not None:
record_content = rdata.to_text(origin=origin, relativize=False)
else:
record_content = rdata.to_text()
if record_type == "SOA":
soa = (record_name, record_content, ttl)
continue
if record_type == "CNAME":
record_content = self._normalize_cname_data(zone_name, record_content)
records.add((record_name, record_type, record_content, ttl))
return records, soa

View File

@@ -70,7 +70,13 @@ class DirectAdminClient:
if response is None:
return None
if response.is_redirect or response.status_code in (301, 302, 303, 307, 308):
if response.is_redirect or response.status_code in (
301,
302,
303,
307,
308,
):
if self._cookies:
logger.error(
f"[da:{self.hostname}] Still redirecting after session login — "
@@ -155,6 +161,136 @@ class DirectAdminClient:
logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}")
return None
def post(
self, command: str, data: Optional[dict] = None
) -> Optional[requests.Response]:
"""Authenticated POST to any DA CMD_* endpoint."""
url = f"{self.scheme}://{self.hostname}:{self.port}/{command}"
kwargs: dict = dict(
data=data or {},
timeout=30,
verify=self.verify_ssl,
allow_redirects=False,
)
if self._cookies:
kwargs["cookies"] = self._cookies
else:
kwargs["auth"] = (self.username, self.password)
try:
return requests.post(url, **kwargs)
except Exception as exc:
logger.error(f"[da:{self.hostname}] POST {command} failed: {exc}")
return None
def get_extra_dns_servers(self) -> dict:
"""Return the Extra DNS server map from CMD_MULTI_SERVER (GET).
Returns a dict keyed by server hostname/IP, each value being the
per-server settings dict (dns, domain_check, port, user, ssl, …).
Returns ``{}`` on any error.
"""
resp = self.get("CMD_MULTI_SERVER", params={"json": "yes"})
if resp is None or resp.status_code != 200:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER GET failed")
return {}
try:
return resp.json().get("servers", {})
except Exception as exc:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER parse error: {exc}")
return {}
def add_extra_dns_server(
self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
) -> bool:
"""Register a new Extra DNS server via CMD_MULTI_SERVER action=add.
Returns ``True`` if DA reports success, ``False`` otherwise.
"""
resp = self.post(
"CMD_MULTI_SERVER",
data={
"action": "add",
"json": "yes",
"ip": ip,
"port": str(port),
"user": user,
"passwd": passwd,
"ssl": "yes" if ssl else "no",
},
)
if resp is None or resp.status_code != 200:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add failed for {ip}")
return False
try:
result = resp.json()
if result.get("success"):
logger.info(f"[da:{self.hostname}] Added Extra DNS server {ip}")
return True
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER add error: {result.get('result', result)}"
)
return False
except Exception as exc:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add parse error: {exc}")
return False
def ensure_extra_dns_server(
self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
) -> bool:
"""Add (if absent) and configure a directdnsonly Extra DNS server.
Ensures the server is registered with ``dns=yes`` and
``domain_check=yes`` so DirectAdmin pushes zone updates to it.
Returns ``True`` if fully configured, ``False`` on any failure.
"""
servers = self.get_extra_dns_servers()
if ip not in servers:
if not self.add_extra_dns_server(ip, port, user, passwd, ssl):
return False
ssl_str = "yes" if ssl else "no"
resp = self.post(
"CMD_MULTI_SERVER",
data={
"action": "multiple",
"save": "yes",
"json": "yes",
"passwd": "",
"select0": ip,
f"port-{ip}": str(port),
f"user-{ip}": user,
f"ssl-{ip}": ssl_str,
f"dns-{ip}": "yes",
f"domain_check-{ip}": "yes",
f"user_check-{ip}": "no",
f"email-{ip}": "no",
f"show_all_users-{ip}": "no",
},
)
if resp is None or resp.status_code != 200:
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save failed for {ip}"
)
return False
try:
result = resp.json()
if result.get("success"):
logger.info(
f"[da:{self.hostname}] Extra DNS server {ip} configured "
f"(dns=yes domain_check=yes)"
)
return True
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save error: {result.get('result', result)}"
)
return False
except Exception as exc:
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save parse error: {exc}"
)
return False
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------

View File

@@ -25,8 +25,8 @@ class Domain(Base):
domain = Column(String(255), unique=True)
hostname = Column(String(255))
username = Column(String(255))
zone_data = Column(Text, nullable=True) # last known zone file from DA
zone_updated_at = Column(DateTime, nullable=True) # when zone_data was last stored
zone_data = Column(Text, nullable=True) # last known zone file from DA
zone_updated_at = Column(DateTime, nullable=True) # when zone_data was last stored
def __repr__(self):
return "<Domain(id='%s', domain='%s', hostname='%s', username='%s')>" % (

View File

@@ -2,23 +2,37 @@
"""Peer sync worker — exchanges zone_data between directdnsonly instances.
Each node stores zone_data in its local SQLite DB after every successful
backend write. When DirectAdmin pushes a zone to one node but the other
backend write. When DirectAdmin pushes a zone to one node but another
is temporarily offline, the offline node misses that zone_data.
PeerSyncWorker corrects this by periodically comparing zone lists with
configured peers and fetching any zone_data that is newer or absent locally.
all known peers and fetching any zone_data that is newer or absent locally.
It only updates the local DB — it never writes directly to backends. The
existing reconciler healing pass then detects missing zones and re-pushes
using the freshly synced zone_data.
Mesh behaviour:
- Each node exposes /internal/peers listing the URLs it knows about
- During each sync pass, every peer is asked for its peer list; any URLs
not already known are added automatically (gossip-lite discovery)
- A three-node cluster therefore only needs a linear chain of initial
connections — nodes propagate awareness of each other on the first pass
Health tracking:
- Consecutive failures per peer are counted; after FAILURE_THRESHOLD
misses the peer is marked degraded and a warning is logged once
- On the next successful contact the peer is marked recovered
Safety properties:
- If a peer is unreachable, skip it silently and retry next interval
- If a peer is unreachable, skip it and try next interval
- Only zone_data is synced — backend writes remain the sole responsibility
of the local save queue worker
- Newer zone_updated_at timestamp wins; local data is never overwritten
with older peer data
- Peer discovery is best-effort and never fails a sync pass
"""
import datetime
import os
import threading
from loguru import logger
import requests
@@ -27,6 +41,9 @@ from sqlalchemy import select
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain
# Consecutive failures before a peer is logged as degraded
FAILURE_THRESHOLD = 3
class PeerSyncWorker:
"""Periodically fetches zone_data from peer directdnsonly instances and
@@ -36,10 +53,58 @@ class PeerSyncWorker:
def __init__(self, peer_sync_config: dict):
self.enabled = peer_sync_config.get("enabled", False)
self.interval_seconds = peer_sync_config.get("interval_minutes", 15) * 60
self.peers = peer_sync_config.get("peers") or []
self.peers = list(peer_sync_config.get("peers") or [])
# Per-peer health state: url -> {consecutive_failures, healthy, last_seen}
self._peer_health: dict = {}
# ----------------------------------------------------------------
# Env-var peer injection
# ----------------------------------------------------------------
# Original single-peer vars (backward compat):
# DADNS_PEER_SYNC_PEER_URL / _USERNAME / _PASSWORD
# Numbered multi-peer vars (new):
# DADNS_PEER_SYNC_PEER_1_URL / _USERNAME / _PASSWORD
# DADNS_PEER_SYNC_PEER_2_URL / ... (up to 9)
known_urls = {p.get("url") for p in self.peers}
env_candidates = []
single_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL", "").strip()
if single_url:
env_candidates.append({
"url": single_url,
"username": os.environ.get("DADNS_PEER_SYNC_PEER_USERNAME", "peersync"),
"password": os.environ.get("DADNS_PEER_SYNC_PEER_PASSWORD", ""),
})
for i in range(1, 10):
numbered_url = os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_URL", "").strip()
if not numbered_url:
break
env_candidates.append({
"url": numbered_url,
"username": os.environ.get(
f"DADNS_PEER_SYNC_PEER_{i}_USERNAME", "peersync"
),
"password": os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_PASSWORD", ""),
})
for candidate in env_candidates:
if candidate["url"] not in known_urls:
self.peers.append(candidate)
known_urls.add(candidate["url"])
logger.debug(
f"[peer_sync] Added peer from env vars: {candidate['url']}"
)
self._stop_event = threading.Event()
self._thread = None
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def start(self):
if not self.enabled:
logger.info("Peer sync disabled — skipping")
@@ -70,6 +135,70 @@ class PeerSyncWorker:
def is_alive(self):
return self._thread is not None and self._thread.is_alive()
def get_peer_urls(self) -> list:
"""Return the current list of known peer URLs.
Exposed via /internal/peers so other nodes can discover this node's mesh."""
return [p["url"] for p in self.peers if p.get("url")]
def get_peer_status(self) -> dict:
"""Return peer health summary for the /status endpoint."""
peers = []
for peer in self.peers:
url = peer.get("url", "")
h = self._peer_health.get(url, {})
last_seen = h.get("last_seen")
peers.append({
"url": url,
"healthy": h.get("healthy", True),
"consecutive_failures": h.get("consecutive_failures", 0),
"last_seen": last_seen.isoformat() if last_seen else None,
})
healthy = sum(1 for p in peers if p["healthy"])
return {
"enabled": self.enabled,
"alive": self.is_alive,
"interval_minutes": self.interval_seconds // 60,
"peers": peers,
"total": len(peers),
"healthy": healthy,
"degraded": len(peers) - healthy,
}
# ------------------------------------------------------------------
# Health tracking
# ------------------------------------------------------------------
def _health(self, url: str) -> dict:
return self._peer_health.setdefault(
url, {"consecutive_failures": 0, "healthy": True, "last_seen": None}
)
def _record_success(self, url: str):
h = self._health(url)
recovered = not h["healthy"]
h.update(
consecutive_failures=0,
healthy=True,
last_seen=datetime.datetime.utcnow(),
)
if recovered:
logger.info(f"[peer_sync] {url}: peer recovered")
def _record_failure(self, url: str, exc):
h = self._health(url)
h["consecutive_failures"] += 1
if h["healthy"] and h["consecutive_failures"] >= FAILURE_THRESHOLD:
h["healthy"] = False
logger.warning(
f"[peer_sync] {url}: marked degraded after {FAILURE_THRESHOLD} "
f"consecutive failures — {exc}"
)
else:
logger.debug(
f"[peer_sync] {url}: unreachable "
f"(failure #{h['consecutive_failures']}) — {exc}"
)
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
@@ -82,15 +211,49 @@ class PeerSyncWorker:
def _sync_all(self):
logger.debug(f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)")
for peer in self.peers:
# Iterate over a snapshot — _discover_peers_from may grow self.peers
for peer in list(self.peers):
url = peer.get("url")
if not url:
logger.warning("[peer_sync] Peer config missing url — skipping")
continue
try:
self._sync_from_peer(peer)
self._discover_peers_from(peer)
self._record_success(url)
except Exception as exc:
logger.warning(f"[peer_sync] Skipping unreachable peer {url}: {exc}")
self._record_failure(url, exc)
def _discover_peers_from(self, peer: dict):
"""Fetch peer's known peer list and add any new nodes for mesh expansion.
This is best-effort — failures are silently swallowed so they never
interrupt the main sync pass."""
url = peer.get("url", "").rstrip("/")
username = peer.get("username")
password = peer.get("password")
auth = (username, password) if username else None
try:
resp = requests.get(f"{url}/internal/peers", auth=auth, timeout=5)
if resp.status_code != 200:
return
remote_urls = resp.json() # list of URL strings
known_urls = {p.get("url") for p in self.peers}
for remote_url in remote_urls:
if remote_url and remote_url not in known_urls:
# Inherit credentials from the introducing peer — in practice
# all cluster nodes share the same peer_sync auth credentials.
self.peers.append({
"url": remote_url,
"username": username,
"password": password,
})
known_urls.add(remote_url)
logger.info(
f"[peer_sync] Discovered new peer {remote_url} via {url}"
)
except Exception:
pass # discovery is best-effort
def _sync_from_peer(self, peer: dict):
url = peer.get("url", "").rstrip("/")

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import datetime
import threading
from loguru import logger
from sqlalchemy import select
@@ -42,6 +43,17 @@ class ReconciliationWorker:
self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60
self._stop_event = threading.Event()
self._thread = None
self._last_run: dict = {}
def get_status(self) -> dict:
"""Return reconciler configuration and last-run statistics."""
return {
"enabled": self.enabled,
"alive": self.is_alive,
"dry_run": self.dry_run,
"interval_minutes": self.interval_seconds // 60,
"last_run": dict(self._last_run),
}
def start(self):
if not self.enabled:
@@ -104,11 +116,18 @@ class ReconciliationWorker:
self._reconcile_all()
def _reconcile_all(self):
started_at = datetime.datetime.utcnow()
self._last_run = {"status": "running", "started_at": started_at.isoformat()}
logger.info(
f"[reconciler] Starting reconciliation pass across "
f"{len(self.servers)} server(s)"
)
total_queued = 0
da_servers_polled = 0
da_servers_unreachable = 0
migrated = 0
backfilled = 0
zones_in_db = 0
# Build a map of all domains seen on all DA servers: domain -> hostname
all_da_domains: dict = {}
@@ -126,23 +145,26 @@ class ReconciliationWorker:
ssl=server.get("ssl", True),
verify_ssl=self.verify_ssl,
)
da_servers_polled += 1
da_domains = client.list_domains(ipp=self.ipp)
if da_domains is not None:
for d in da_domains:
all_da_domains[d] = hostname
else:
da_servers_unreachable += 1
logger.debug(
f"[reconciler] {hostname}: "
f"{len(da_domains) if da_domains else 0} active domain(s) in DA"
)
except Exception as exc:
logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}")
da_servers_unreachable += 1
# Compare local DB against what DA reported; update masters and queue deletes
session = connect()
try:
all_local_domains = session.execute(select(Domain)).scalars().all()
migrated = 0
backfilled = 0
zones_in_db = len(all_local_domains)
known_servers = {s.get("hostname") for s in self.servers}
for record in all_local_domains:
domain = record.domain
@@ -209,10 +231,31 @@ class ReconciliationWorker:
)
# Option C: heal backends that are missing zones
zones_healed = 0
if self.save_queue is not None and self.backend_registry is not None:
self._heal_backends()
zones_healed = self._heal_backends()
def _heal_backends(self):
completed_at = datetime.datetime.utcnow()
self._last_run = {
"status": "ok",
"started_at": started_at.isoformat(),
"completed_at": completed_at.isoformat(),
"duration_seconds": round(
(completed_at - started_at).total_seconds(), 1
),
"da_servers_polled": da_servers_polled,
"da_servers_unreachable": da_servers_unreachable,
"zones_in_da": len(all_da_domains),
"zones_in_db": zones_in_db,
"orphans_found": total_queued,
"orphans_queued": total_queued if not self.dry_run else 0,
"hostnames_backfilled": backfilled,
"hostnames_migrated": migrated,
"zones_healed": zones_healed,
"dry_run": self.dry_run,
}
def _heal_backends(self) -> int:
"""Check every backend for zone presence and re-queue any zone that is
missing from one or more backends, using the stored zone_data as the
authoritative source. This corrects backends that missed pushes due to
@@ -220,9 +263,10 @@ class ReconciliationWorker:
"""
backends = self.backend_registry.get_available_backends()
if not backends:
return
return 0
session = connect()
healed = 0
try:
domains = session.execute(
select(Domain).where(Domain.zone_data.isnot(None))
@@ -231,9 +275,7 @@ class ReconciliationWorker:
logger.debug(
"[reconciler] Healing pass: no zone_data stored yet — skipping"
)
return
healed = 0
return 0
for record in domains:
missing = []
for backend_name, backend in backends.items():
@@ -277,3 +319,4 @@ class ReconciliationWorker:
)
finally:
session.close()
return healed

View File

@@ -69,6 +69,8 @@ def load_config() -> Vyper:
# Peer sync defaults
v.set_default("peer_sync.enabled", False)
v.set_default("peer_sync.interval_minutes", 15)
v.set_default("peer_sync.auth_username", "peersync")
v.set_default("peer_sync.auth_password", "changeme")
# Read configuration
try:

View File

@@ -3,6 +3,20 @@ timezone: Pacific/Auckland
log_level: INFO
queue_location: ./data/queues
# Application datastore — stores domain index and zone_data for healing/peer-sync.
# SQLite (default) requires no extra dependencies and is fine for single-node setups.
# MySQL is recommended for multi-node deployments with a shared datastore.
datastore:
type: sqlite
db_location: ./data/directdnsonly.db
# --- MySQL ---
# type: mysql
# host: "127.0.0.1"
# port: "3306"
# name: "directdnsonly"
# user: "directdnsonly"
# pass: "changeme"
app:
auth_username: directdnsonly
auth_password: changeme # Override via DADNS_APP_AUTH_PASSWORD env var

View File

@@ -4,6 +4,7 @@ from app.backends import BackendRegistry
from app.api.admin import DNSAdminAPI
from app.api.health import HealthAPI
from app.api.internal import InternalAPI
from app.api.status import StatusAPI
from app import configure_logging
from worker import WorkerManager
from directdnsonly.config import config
@@ -90,6 +91,17 @@ def main():
if config.get_string("app.log_level").upper() != "DEBUG":
cherrypy.log.access_log.propagate = False
# Peer sync auth — separate credentials from the DA-facing API so a
# compromised peer node cannot push zones or access the admin endpoints.
peer_user_password_dict = {
config.get_string("peer_sync.auth_username"): config.get_string(
"peer_sync.auth_password"
)
}
peer_check_password = cherrypy.lib.auth_basic.checkpassword_dict(
peer_user_password_dict
)
# Mount applications
root = Root()
root = DNSAdminAPI(
@@ -98,12 +110,18 @@ def main():
backend_registry=registry,
)
root.health = HealthAPI(registry)
root.internal = InternalAPI()
root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer)
root.status = StatusAPI(worker_manager)
# Add queue status endpoint
# Add queue status endpoint (debug)
root.queue_status = lambda: worker_manager.queue_status()
cherrypy.tree.mount(root, "/")
# Override auth for /internal so peers use their own credentials
cherrypy.tree.mount(root, "/", config={
"/internal": {
"tools.auth_basic.checkpassword": peer_check_password,
}
})
cherrypy.engine.start()
logger.success(f"Server started on port {config.get_int('app.listen_port')}")

View File

@@ -43,6 +43,7 @@ class WorkerManager:
self._peer_syncer = None
self._reconciliation_config = reconciliation_config or {}
self._peer_sync_config = peer_sync_config or {}
self._dead_letter_count = 0
try:
os.makedirs(queue_path, exist_ok=True)
@@ -62,93 +63,90 @@ class WorkerManager:
logger.info("Save queue worker started")
session = connect()
batch_start = None
batch_processed = 0
batch_failed = 0
while self._running:
# Block until at least one item is available
try:
item = self.save_queue.get(block=True, timeout=5)
if batch_start is None:
batch_start = time.monotonic()
batch_processed = 0
batch_failed = 0
pending = self.save_queue.qsize()
logger.info(
f"📥 Batch started — {pending + 1} zone(s) queued for processing"
)
domain = item.get("domain", "unknown")
is_retry = item.get("source") in ("retry", "reconciler_heal")
target_backends = item.get("failed_backends") # None = all backends
logger.debug(
f"Processing zone update for {domain}"
+ (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
+ (f" [backends: {target_backends}]" if target_backends else "")
)
if not is_retry and not check_zone_exists(domain):
put_zone_index(domain, item.get("hostname"), item.get("username"))
if not all(k in item for k in ["domain", "zone_file"]):
logger.error(f"Invalid queue item: {item}")
self.save_queue.task_done()
batch_failed += 1
continue
backends = self.backend_registry.get_available_backends()
if target_backends:
backends = {
k: v for k, v in backends.items() if k in target_backends
}
if not backends:
logger.warning("No target backends available for this item!")
self.save_queue.task_done()
batch_failed += 1
continue
if len(backends) > 1:
failed = self._process_backends_parallel(backends, item, session)
else:
failed = set()
for backend_name, backend in backends.items():
if not self._process_single_backend(
backend_name, backend, item, session
):
failed.add(backend_name)
if failed:
self._schedule_retry(item, failed)
batch_failed += 1
else:
# Successful write — persist zone_data for Option C healing
self._store_zone_data(session, domain, item["zone_file"])
batch_processed += 1
self.save_queue.task_done()
logger.debug(f"Completed processing for {domain}")
except Empty:
if batch_start is not None:
elapsed = time.monotonic() - batch_start
total = batch_processed + batch_failed
rate = batch_processed / elapsed if elapsed > 0 else 0
logger.success(
f"📦 Batch complete — {batch_processed}/{total} zone(s) "
f"processed successfully in {elapsed:.1f}s "
f"({rate:.1f} zones/sec)"
+ (f", {batch_failed} failed" if batch_failed else "")
)
batch_start = None
batch_processed = 0
batch_failed = 0
continue
except Exception as e:
logger.error(f"Unexpected worker error: {e}")
batch_failed += 1
time.sleep(1)
# Open a batch and keep processing until the queue is empty
batch_start = time.monotonic()
batch_processed = 0
batch_failed = 0
logger.info("📥 Batch started")
while True:
try:
domain = item.get("domain", "unknown")
is_retry = item.get("source") in ("retry", "reconciler_heal")
target_backends = item.get("failed_backends") # None = all backends
logger.debug(
f"Processing zone update for {domain}"
+ (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
+ (f" [backends: {target_backends}]" if target_backends else "")
)
if not is_retry and not check_zone_exists(domain):
put_zone_index(domain, item.get("hostname"), item.get("username"))
if not all(k in item for k in ["domain", "zone_file"]):
logger.error(f"Invalid queue item: {item}")
self.save_queue.task_done()
batch_failed += 1
else:
backends = self.backend_registry.get_available_backends()
if target_backends:
backends = {
k: v for k, v in backends.items() if k in target_backends
}
if not backends:
logger.warning("No target backends available for this item!")
self.save_queue.task_done()
batch_failed += 1
else:
if len(backends) > 1:
failed = self._process_backends_parallel(backends, item, session)
else:
failed = set()
for backend_name, backend in backends.items():
if not self._process_single_backend(
backend_name, backend, item, session
):
failed.add(backend_name)
if failed:
self._schedule_retry(item, failed)
batch_failed += 1
else:
self._store_zone_data(session, domain, item["zone_file"])
batch_processed += 1
self.save_queue.task_done()
logger.debug(f"Completed processing for {domain}")
except Exception as e:
logger.error(f"Unexpected worker error processing {item.get('domain', '?')}: {e}")
batch_failed += 1
time.sleep(1)
# Check immediately for the next item — keep batch open while
# more work is queued; close it only when the queue is empty.
try:
item = self.save_queue.get_nowait()
except Empty:
break
elapsed = time.monotonic() - batch_start
total = batch_processed + batch_failed
rate = batch_processed / elapsed if elapsed > 0 else 0
logger.success(
f"📦 Batch complete — {batch_processed}/{total} zone(s) "
f"processed successfully in {elapsed:.1f}s "
f"({rate:.1f} zones/sec)"
+ (f", {batch_failed} failed" if batch_failed else "")
)
def _process_single_backend(self, backend_name, backend, item, session) -> bool:
"""Write a zone to one backend. Returns True on success, False on failure."""
@@ -208,6 +206,7 @@ class WorkerManager:
Discards to dead-letter after MAX_RETRIES attempts."""
retry_count = item.get("retry_count", 0) + 1
if retry_count > MAX_RETRIES:
self._dead_letter_count += 1
logger.error(
f"[retry] Dead-letter: {item['domain']} failed on "
f"{failed_backends} after {MAX_RETRIES} attempts — giving up"
@@ -499,18 +498,24 @@ class WorkerManager:
logger.info("Workers stopped")
def queue_status(self):
reconciler = (
self._reconciler.get_status()
if self._reconciler
else {"enabled": False, "alive": False, "last_run": {}}
)
peer_sync = (
self._peer_syncer.get_peer_status()
if self._peer_syncer
else {"enabled": False, "alive": False, "peers": [], "total": 0, "healthy": 0, "degraded": 0}
)
return {
"save_queue_size": self.save_queue.qsize(),
"delete_queue_size": self.delete_queue.qsize(),
"retry_queue_size": self.retry_queue.qsize(),
"save_worker_alive": self._save_thread and self._save_thread.is_alive(),
"delete_worker_alive": self._delete_thread
and self._delete_thread.is_alive(),
"retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(),
"reconciler_alive": (
self._reconciler.is_alive if self._reconciler else False
),
"peer_syncer_alive": (
self._peer_syncer.is_alive if self._peer_syncer else False
),
"dead_letters": self._dead_letter_count,
"save_worker_alive": bool(self._save_thread and self._save_thread.is_alive()),
"delete_worker_alive": bool(self._delete_thread and self._delete_thread.is_alive()),
"retry_worker_alive": bool(self._retry_thread and self._retry_thread.is_alive()),
"reconciler": reconciler,
"peer_sync": peer_sync,
}

View File

@@ -20,6 +20,9 @@ dependencies = [
"requests (>=2.32.0,<3.0.0)",
]
[project.scripts]
dadns = "directdnsonly.__main__:run"
[tool.poetry]
package-mode = true

View File

@@ -1,12 +1,33 @@
CREATE TABLE IF NOT EXISTS `records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`zone` varchar(255) NOT NULL,
`name` varchar(255) NOT NULL,
`ttl` int(11) DEFAULT NULL,
`type` varchar(10) NOT NULL,
`data` text NOT NULL,
-- DirectDNSOnly — CoreDNS MySQL schema
-- Compatible with cybercinch/coredns_mysql_extend
--
-- managed_by values:
-- 'directadmin' zone is managed via directdnsonly / DirectAdmin push
-- 'direct' zone was created directly (not via DA)
-- NULL legacy row created before this column was added
CREATE TABLE IF NOT EXISTS `zones` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`zone_name` varchar(255) NOT NULL,
`managed_by` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_zone` (`zone`),
KEY `idx_name` (`name`),
KEY `idx_type` (`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
UNIQUE KEY `uq_zone_name` (`zone_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE IF NOT EXISTS `records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`zone_id` int(11) NOT NULL,
`hostname` varchar(255) NOT NULL,
`type` varchar(10) NOT NULL,
`data` text NOT NULL,
`ttl` int(11) DEFAULT NULL,
`online` tinyint(1) NOT NULL DEFAULT 0,
PRIMARY KEY (`id`),
KEY `idx_zone_id` (`zone_id`),
KEY `idx_hostname` (`hostname`),
CONSTRAINT `fk_records_zone` FOREIGN KEY (`zone_id`) REFERENCES `zones` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Migration: add managed_by to an existing installation
-- ALTER TABLE `zones` ADD COLUMN `managed_by` varchar(255) DEFAULT NULL;
-- UPDATE `zones` SET `managed_by` = 'directadmin' WHERE `managed_by` IS NULL;

View File

@@ -38,4 +38,5 @@ def patch_connect(db_session, monkeypatch):
monkeypatch.setattr("directdnsonly.app.utils.connect", _factory)
monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory)
monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory)
monkeypatch.setattr("directdnsonly.app.api.status.connect", _factory)
return db_session

View File

@@ -165,3 +165,37 @@ def test_reconcile_no_changes_when_zone_matches(mysql_backend):
success, removed = mysql_backend.reconcile_zone_records("example.com", ZONE_DATA)
assert success
assert removed == 0
# ---------------------------------------------------------------------------
# managed_by field
# ---------------------------------------------------------------------------
def test_write_zone_sets_managed_by_directadmin(mysql_backend):
mysql_backend.write_zone("example.com", ZONE_DATA)
session = mysql_backend.Session()
zone = session.execute(
select(Zone).filter_by(zone_name="example.com.")
).scalar_one_or_none()
assert zone.managed_by == "directadmin"
session.close()
def test_write_zone_migrates_null_managed_by(mysql_backend):
"""Zones that pre-exist without managed_by get it set on next write."""
session = mysql_backend.Session()
zone = Zone(zone_name="example.com.", managed_by=None)
session.add(zone)
session.commit()
session.close()
mysql_backend.write_zone("example.com", ZONE_DATA)
session = mysql_backend.Session()
zone = session.execute(
select(Zone).filter_by(zone_name="example.com.")
).scalar_one_or_none()
assert zone.managed_by == "directadmin"
session.close()

View File

@@ -24,7 +24,9 @@ def _make_json_response(domains_list, total_pages=1):
def _client():
return DirectAdminClient("da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True)
return DirectAdminClient(
"da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True
)
# ---------------------------------------------------------------------------
@@ -105,7 +107,9 @@ def test_html_response_returns_none():
def test_connection_error_returns_none():
with patch("requests.get", side_effect=requests.exceptions.ConnectionError("refused")):
with patch(
"requests.get", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().list_domains()
assert result is None
@@ -119,7 +123,9 @@ def test_timeout_returns_none():
def test_ssl_error_returns_none():
with patch("requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")):
with patch(
"requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")
):
result = _client().list_domains()
assert result is None
@@ -131,12 +137,16 @@ def test_ssl_error_returns_none():
def test_parse_standard_querystring():
result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com&list[]=test.com")
result = DirectAdminClient._parse_legacy_domain_list(
"list[]=example.com&list[]=test.com"
)
assert result == {"example.com", "test.com"}
def test_parse_newline_separated():
result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com\nlist[]=test.com")
result = DirectAdminClient._parse_legacy_domain_list(
"list[]=example.com\nlist[]=test.com"
)
assert result == {"example.com", "test.com"}
@@ -190,3 +200,171 @@ def test_login_returns_false_on_exception():
result = client._login()
assert result is False
# ---------------------------------------------------------------------------
# get_extra_dns_servers
# ---------------------------------------------------------------------------
def _multi_server_get_resp(servers=None):
mock = MagicMock()
mock.status_code = 200
mock.is_redirect = False
mock.headers = {"Content-Type": "application/json"}
mock.json.return_value = {"CLUSTER_ON": "yes", "servers": servers or {}}
mock.raise_for_status = MagicMock()
return mock
def test_get_extra_dns_servers_returns_servers_dict():
servers = {
"1.2.3.4": {"dns": "yes", "domain_check": "yes", "port": "2222", "ssl": "no"}
}
with patch("requests.get", return_value=_multi_server_get_resp(servers)):
result = _client().get_extra_dns_servers()
assert "1.2.3.4" in result
assert result["1.2.3.4"]["dns"] == "yes"
def test_get_extra_dns_servers_returns_empty_on_http_error():
mock_resp = MagicMock()
mock_resp.status_code = 500
with patch("requests.get", return_value=mock_resp):
result = _client().get_extra_dns_servers()
assert result == {}
def test_get_extra_dns_servers_returns_empty_on_connection_error():
with patch(
"requests.get", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().get_extra_dns_servers()
assert result == {}
# ---------------------------------------------------------------------------
# add_extra_dns_server
# ---------------------------------------------------------------------------
def test_add_extra_dns_server_returns_true_on_success():
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {"result": "", "success": "Connection Added"}
with patch("requests.post", return_value=mock_resp):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is True
def test_add_extra_dns_server_returns_false_on_da_error():
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {"result": "Server already exists", "success": ""}
with patch("requests.post", return_value=mock_resp):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is False
def test_add_extra_dns_server_returns_false_on_connection_error():
with patch(
"requests.post", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is False
# ---------------------------------------------------------------------------
# ensure_extra_dns_server
# ---------------------------------------------------------------------------
def _add_success_resp():
mock = MagicMock()
mock.status_code = 200
mock.json.return_value = {"result": "", "success": "Connection Added"}
return mock
def _save_success_resp():
mock = MagicMock()
mock.status_code = 200
mock.json.return_value = {"result": "", "success": "Connections Saved"}
return mock
def test_ensure_extra_dns_server_adds_and_configures_new_server():
"""Server not yet registered — adds it, then saves dns+domain_check settings."""
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch(
"requests.post",
side_effect=[_add_success_resp(), _save_success_resp()],
),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is True
def test_ensure_extra_dns_server_skips_add_when_already_present():
"""Server already registered — no add call, only saves settings."""
existing = {
"1.2.3.4": {"dns": "no", "domain_check": "no", "port": "2222", "ssl": "no"}
}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers=existing)),
patch("requests.post", return_value=_save_success_resp()) as mock_post,
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is True
assert mock_post.call_count == 1 # save only, no add
def test_ensure_extra_dns_server_returns_false_when_add_fails():
fail_resp = MagicMock()
fail_resp.status_code = 200
fail_resp.json.return_value = {"result": "error", "success": ""}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch("requests.post", return_value=fail_resp),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is False
def test_ensure_extra_dns_server_returns_false_when_save_fails():
"""Add succeeds but the subsequent settings save fails."""
fail_save = MagicMock()
fail_save.status_code = 200
fail_save.json.return_value = {"result": "error", "success": ""}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch(
"requests.post",
side_effect=[_add_success_resp(), fail_save],
),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is False

View File

@@ -58,6 +58,139 @@ def test_peers_stored():
assert worker.peers[0]["url"] == "http://ddo-2:2222"
def test_peer_from_env_var(monkeypatch):
"""DADNS_PEER_SYNC_PEER_URL adds a peer without a config file."""
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_URL", "http://ddo-env:2222")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_USERNAME", "admin")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_PASSWORD", "secret")
worker = PeerSyncWorker({"enabled": True})
assert len(worker.peers) == 1
assert worker.peers[0]["url"] == "http://ddo-env:2222"
assert worker.peers[0]["username"] == "admin"
assert worker.peers[0]["password"] == "secret"
def test_env_peer_not_duplicated_when_also_in_config(monkeypatch):
"""Env var peer is not added if it already appears in the config file peers list."""
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_URL", "http://ddo-2:2222")
worker = PeerSyncWorker(BASE_CONFIG)
# BASE_CONFIG already has http://ddo-2:2222 — must remain exactly one entry
urls = [p["url"] for p in worker.peers]
assert urls.count("http://ddo-2:2222") == 1
def test_numbered_env_peers(monkeypatch):
"""DADNS_PEER_SYNC_PEER_1_URL and _2_URL add multiple peers."""
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://node-a:2222")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_USERNAME", "peersync")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_PASSWORD", "s3cr3t")
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_2_URL", "http://node-b:2222")
worker = PeerSyncWorker({"enabled": True})
urls = [p["url"] for p in worker.peers]
assert "http://node-a:2222" in urls
assert "http://node-b:2222" in urls
assert len(urls) == 2
def test_numbered_env_peers_not_duplicated(monkeypatch):
"""Numbered env var peers are deduplicated against the config file list."""
monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://ddo-2:2222")
worker = PeerSyncWorker(BASE_CONFIG)
urls = [p["url"] for p in worker.peers]
assert urls.count("http://ddo-2:2222") == 1
def test_get_peer_urls():
worker = PeerSyncWorker(BASE_CONFIG)
assert worker.get_peer_urls() == ["http://ddo-2:2222"]
# ---------------------------------------------------------------------------
# Health tracking
# ---------------------------------------------------------------------------
def test_peer_health_starts_healthy():
worker = PeerSyncWorker(BASE_CONFIG)
h = worker._health("http://ddo-2:2222")
assert h["healthy"] is True
assert h["consecutive_failures"] == 0
def test_record_failure_increments_count():
worker = PeerSyncWorker(BASE_CONFIG)
worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 1
assert worker._health("http://ddo-2:2222")["healthy"] is True
def test_record_failure_marks_degraded_at_threshold():
from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
worker = PeerSyncWorker(BASE_CONFIG)
for _ in range(FAILURE_THRESHOLD):
worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
assert worker._health("http://ddo-2:2222")["healthy"] is False
def test_record_success_resets_health():
from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
worker = PeerSyncWorker(BASE_CONFIG)
for _ in range(FAILURE_THRESHOLD):
worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
assert not worker._health("http://ddo-2:2222")["healthy"]
worker._record_success("http://ddo-2:2222")
assert worker._health("http://ddo-2:2222")["healthy"] is True
assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 0
# ---------------------------------------------------------------------------
# Peer discovery (_discover_peers_from)
# ---------------------------------------------------------------------------
def test_discover_peers_adds_new_peer(monkeypatch):
"""New peer URL returned by /internal/peers is added to the peer list."""
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = ["http://node-c:2222"]
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._discover_peers_from(BASE_CONFIG["peers"][0])
urls = [p["url"] for p in worker.peers]
assert "http://node-c:2222" in urls
def test_discover_peers_skips_known(monkeypatch):
"""Already-known peer URLs are not re-added."""
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(url, auth=None, timeout=10, params=None):
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = ["http://ddo-2:2222"] # already known
return resp
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._discover_peers_from(BASE_CONFIG["peers"][0])
assert len(worker.peers) == 1 # unchanged
def test_discover_peers_tolerates_failure(monkeypatch):
"""Network error during discovery does not propagate."""
worker = PeerSyncWorker(BASE_CONFIG)
def mock_get(*args, **kwargs):
raise ConnectionError("peer down")
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
# Should not raise
worker._discover_peers_from(BASE_CONFIG["peers"][0])
def test_start_skips_when_disabled(caplog):
worker = PeerSyncWorker({"enabled": False})
worker.start()
@@ -261,3 +394,53 @@ def test_sync_empty_peer_list(patch_connect, monkeypatch):
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer())
# ---------------------------------------------------------------------------
# get_peer_status
# ---------------------------------------------------------------------------
def test_get_peer_status_no_contact_yet():
worker = PeerSyncWorker(BASE_CONFIG)
status = worker.get_peer_status()
assert status["enabled"] is True
assert status["total"] == 1
assert status["healthy"] == 1
assert status["degraded"] == 0
assert status["peers"][0]["url"] == "http://ddo-2:2222"
assert status["peers"][0]["healthy"] is True
assert status["peers"][0]["last_seen"] is None
def test_get_peer_status_after_success():
worker = PeerSyncWorker(BASE_CONFIG)
worker._record_success("http://ddo-2:2222")
status = worker.get_peer_status()
assert status["healthy"] == 1
assert status["degraded"] == 0
assert status["peers"][0]["last_seen"] is not None
def test_get_peer_status_after_degraded():
from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
worker = PeerSyncWorker(BASE_CONFIG)
for _ in range(FAILURE_THRESHOLD):
worker._record_failure("http://ddo-2:2222", Exception("timeout"))
status = worker.get_peer_status()
assert status["healthy"] == 0
assert status["degraded"] == 1
assert status["peers"][0]["healthy"] is False
def test_get_peer_status_disabled():
worker = PeerSyncWorker({})
status = worker.get_peer_status()
assert status["enabled"] is False
assert status["total"] == 0
assert status["peers"] == []

View File

@@ -55,7 +55,9 @@ DA_CLIENT_PATH = "directdnsonly.app.reconciler.DirectAdminClient"
def _patch_da(return_value):
"""Patch DirectAdminClient so list_domains returns a fixed value."""
return patch(DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value})
return patch(
DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value}
)
# ---------------------------------------------------------------------------
@@ -233,7 +235,12 @@ def test_heal_skips_domains_without_zone_data(delete_queue, patch_connect):
registry, _ = _make_backend_registry(zone_exists_return=False)
patch_connect.add(
Domain(domain="nodata.com", hostname="da1.example.com", username="admin", zone_data=None)
Domain(
domain="nodata.com",
hostname="da1.example.com",
username="admin",
zone_data=None,
)
)
patch_connect.commit()
@@ -310,3 +317,83 @@ def test_heal_skipped_when_no_registry(delete_queue, patch_connect):
w._reconcile_all()
assert save_queue.empty()
# ---------------------------------------------------------------------------
# get_status — last-run state
# ---------------------------------------------------------------------------
def test_get_status_before_any_run(worker):
status = worker.get_status()
assert status["enabled"] is True
assert status["alive"] is False
assert status["last_run"] == {}
def test_get_status_after_run(worker, patch_connect):
with _patch_da(set()):
worker._reconcile_all()
s = worker.get_status()
assert s["enabled"] is True
lr = s["last_run"]
assert lr["status"] == "ok"
assert "started_at" in lr
assert "completed_at" in lr
assert "duration_seconds" in lr
assert lr["da_servers_polled"] == 1
assert lr["da_servers_unreachable"] == 0
assert lr["dry_run"] is False
def test_get_status_counts_unreachable_server(worker, patch_connect):
with _patch_da(None):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["da_servers_polled"] == 1
assert lr["da_servers_unreachable"] == 1
def test_get_status_counts_orphans(worker, delete_queue, patch_connect):
patch_connect.add(
Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with _patch_da(set()):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["orphans_found"] == 1
assert lr["orphans_queued"] == 1
def test_get_status_dry_run_orphans_not_queued_in_stats(dry_run_worker, patch_connect):
patch_connect.add(
Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with _patch_da(set()):
dry_run_worker._reconcile_all()
lr = dry_run_worker.get_status()["last_run"]
assert lr["dry_run"] is True
assert lr["orphans_found"] == 1
assert lr["orphans_queued"] == 0
def test_get_status_zones_in_db_counted(worker, patch_connect):
for d in ["a.com", "b.com", "c.com"]:
patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
patch_connect.commit()
with _patch_da({"a.com", "b.com", "c.com"}):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["zones_in_db"] == 3
assert lr["zones_in_da"] == 3
assert lr["orphans_found"] == 0

162
tests/test_status_api.py Normal file
View File

@@ -0,0 +1,162 @@
"""Tests for directdnsonly.app.api.status — StatusAPI."""
import json
from unittest.mock import MagicMock
import cherrypy
import pytest
from directdnsonly.app.api.status import StatusAPI
from directdnsonly.app.db.models import Domain
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_RECONCILER_OK = {
"enabled": True,
"alive": True,
"dry_run": False,
"interval_minutes": 60,
"last_run": {},
}
_PEER_SYNC_OFF = {
"enabled": False,
"alive": False,
"peers": [],
"total": 0,
"healthy": 0,
"degraded": 0,
}
def _qs(**overrides):
base = {
"save_queue_size": 0,
"delete_queue_size": 0,
"retry_queue_size": 0,
"dead_letters": 0,
"save_worker_alive": True,
"delete_worker_alive": True,
"retry_worker_alive": True,
"reconciler": _RECONCILER_OK,
"peer_sync": _PEER_SYNC_OFF,
}
base.update(overrides)
return base
def _api(qs=None):
wm = MagicMock()
wm.queue_status.return_value = qs or _qs()
return StatusAPI(wm)
# ---------------------------------------------------------------------------
# _compute_overall
# ---------------------------------------------------------------------------
def test_overall_ok_all_healthy():
assert StatusAPI._compute_overall(_qs()) == "ok"
def test_overall_error_save_worker_dead():
assert StatusAPI._compute_overall(_qs(save_worker_alive=False)) == "error"
def test_overall_error_delete_worker_dead():
assert StatusAPI._compute_overall(_qs(delete_worker_alive=False)) == "error"
def test_overall_degraded_retries_pending():
assert StatusAPI._compute_overall(_qs(retry_queue_size=3)) == "degraded"
def test_overall_degraded_dead_letters():
assert StatusAPI._compute_overall(_qs(dead_letters=1)) == "degraded"
def test_overall_degraded_peer_unhealthy():
ps = {**_PEER_SYNC_OFF, "degraded": 1}
assert StatusAPI._compute_overall(_qs(peer_sync=ps)) == "degraded"
def test_overall_error_takes_priority_over_degraded():
"""error > degraded when both conditions are true."""
assert (
StatusAPI._compute_overall(
_qs(save_worker_alive=False, retry_queue_size=5)
)
== "error"
)
# ---------------------------------------------------------------------------
# _build — structure and zone count
# ---------------------------------------------------------------------------
def test_build_structure(patch_connect):
api = _api()
result = api._build()
assert "status" in result
assert "queues" in result
assert "workers" in result
assert "reconciler" in result
assert "peer_sync" in result
assert "zones" in result
def test_build_zone_count_zero(patch_connect):
api = _api()
result = api._build()
assert result["zones"]["total"] == 0
def test_build_zone_count_with_domains(patch_connect):
for d in ["a.com", "b.com", "c.com"]:
patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
patch_connect.commit()
api = _api()
result = api._build()
assert result["zones"]["total"] == 3
def test_build_queues_forwarded(patch_connect):
api = _api(_qs(save_queue_size=2, delete_queue_size=1, retry_queue_size=3, dead_letters=1))
result = api._build()
assert result["queues"]["save"] == 2
assert result["queues"]["delete"] == 1
assert result["queues"]["retry"] == 3
assert result["queues"]["dead_letters"] == 1
def test_build_workers_forwarded(patch_connect):
api = _api()
result = api._build()
assert result["workers"]["save"] is True
assert result["workers"]["delete"] is True
assert result["workers"]["retry_drain"] is True
# ---------------------------------------------------------------------------
# index — JSON encoding
# ---------------------------------------------------------------------------
def test_index_returns_valid_json(patch_connect):
api = _api()
with MagicMock() as mock_resp:
cherrypy.response = mock_resp
cherrypy.response.headers = {}
body = api.index()
data = json.loads(body)
assert data["status"] == "ok"
assert isinstance(data["zones"]["total"], int)