Compare commits: fbb6220728...master · 10 commits

Commits: db60d808de, 0f417da204, 3f6a061ffe, 0b31b75789, 83fbb03cad, 5e9a6f19bd, 4a4b4f2b98, 6e96e78376, e8939bcd82, d98f08a408
.gitignore (vendored) · 6 changes
@@ -26,3 +26,9 @@ build
*.mypy_cache
*.pytest_cache
/data/*

# Editor / tool settings — always local, never committed
.vscode/
.claude/
.env
*.env
README.md · 63 changes
@@ -115,8 +115,8 @@ Both MySQL backends are written **concurrently** within the same zone update. A
| Scenario | What happens |
|---|---|
| One MySQL backend unreachable | Other backend(s) succeed immediately. Failed backend queued for retry with exponential backoff (30 s → 2 m → 5 m → 15 m → 30 m, up to 5 attempts). |
| MySQL backend down for hours | Retry queue exhausts. On recovery, the reconciliation healing pass detects the backend is missing zones and re-pushes all of them using stored `zone_data` — no DA intervention required. |
| One MySQL backend unreachable | Other backend(s) succeed immediately. Failed backend queued for retry with exponential backoff (30 s → 2 m → 5 m → 15 m → 30 m, up to 5 attempts). CoreDNS continues serving from its local JSON cache throughout. |
| MySQL backend down for hours | Retry queue exhausts. CoreDNS serves from cache the entire time — zero query downtime. On recovery, the reconciliation healing pass detects the backend is missing zones and re-pushes all of them using stored `zone_data` — no DA intervention required. |
| directdnsonly container restarts | Persistent queue survives. In-flight zone updates replay on startup. |
| directdnsonly container down during DA push | DA cannot deliver. Persistent queue on disk is intact; when the container comes back, it resumes processing any previously queued items. New pushes during downtime are lost at the DA level (DA does not retry). |
| Zone deleted from DA | Reconciliation poller detects orphan and queues delete across all backends. |
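The retry schedule quoted above is a fixed ladder rather than a computed formula. A minimal illustrative sketch of that schedule (the names below are not from the codebase; the real logic lives in the worker's retry queue):

```python
from typing import Optional

# Illustrative only: the documented retry schedule, 30 s -> 2 m -> 5 m -> 15 m -> 30 m,
# after which the zone update is dead-lettered.
RETRY_DELAYS_SECONDS = [30, 120, 300, 900, 1800]

def next_retry_delay(retry_count: int) -> Optional[int]:
    """Delay before attempt number `retry_count` (1-based), or None once exhausted."""
    if retry_count > len(RETRY_DELAYS_SECONDS):
        return None  # caller gives up and records a dead letter
    return RETRY_DELAYS_SECONDS[retry_count - 1]
```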
@@ -250,7 +250,7 @@ Register each container as a separate Extra DNS server entry in DA → DNS Admin
| **Orphan detection** | Yes — reconciler | Yes — reconciler | Yes — reconciler (per instance) |
| **External DB required** | No | Yes (MySQL per CoreDNS node) | No (NSD) or Yes (CoreDNS MySQL) |
| **Horizontal scaling** | Add DA Extra DNS entries + containers | Add backend stanzas in config | Add DA Extra DNS entries + containers + peer list |
| **Best for** | Simple HA, no external DB | Multi-DC, stronger consistency | Most robust HA — survives extended outages without DA re-push |
| **Best for** | Simple HA, no external DB | Best overall — resilient writes (retry queue) + resilient reads (CoreDNS cache fallback), no daemon reloads, scales to thousands of zones | Most robust HA — resilient at every layer, survives extended outages without DA re-push |

---
@@ -298,10 +298,12 @@ The container image ships with **both NSD and BIND9** installed. The entrypoint
**Summary recommendation:**

- **Up to ~300 zones, no external DB:** Use the NSD backend (bundled) — lighter, faster, authoritative-only, same zone file format as BIND.
- **300–1 000+ zones:** CoreDNS MySQL wins — zone data in MySQL means no daemon reload at all.
- **Need zero-interruption zone swaps:** Knot DNS.
- **Need an HTTP API for zone management (no file I/O):** PowerDNS Authoritative with its native HTTP API and file/SQLite backend.
- **Any scale, external DB available:** CoreDNS MySQL ([cybercinch fork](https://github.com/cybercinch/coredns_mysql_extend)) wins at every zone count. Connection pooling, JSON cache fallback, health monitoring, and zero-downtime operation during DB maintenance make it the most resilient choice regardless of size. No daemon reload ever needed — a zone write is a MySQL INSERT.
- **No external DB, simplicity first:** NSD (bundled) — lightweight, fast, authoritative-only, same RFC 1035 zone file format as BIND.
- **Need zero-interruption zone swaps:** Knot DNS (RCU — serves old zone to in-flight queries while atomically swapping in the new one).
- **Need an HTTP API for zone management:** PowerDNS Authoritative with its native HTTP API.

> **Note:** Knot DNS and PowerDNS backends are **not implemented** in directdnsonly — they are listed here as architectural context only. Implemented backends: `nsd`, `bind`, `coredns_mysql`. Pull requests for additional backends are welcome.

---
@@ -314,18 +316,23 @@ NS records in the additional section, does not set the AA flag, and does not
handle wildcard records.

This project is designed to work with a patched fork that resolves all of those
issues:
issues and adds production-grade resilience:

**[cybercinch/coredns_mysql_extend](https://github.com/cybercinch/coredns_mysql_extend)**

Key differences from the upstream plugin:

| Feature | Detail |
|---|---|
| **Fully authoritative** | Correct AA flag, NXDOMAIN on misses, NS records in the additional section |
| **Wildcard records** | `*` entries served correctly |
| **Connection pooling** | Configurable MySQL connection management — efficient under load |
| **Degraded operation** | Automatic fallback to a local JSON cache when MySQL is unavailable — DNS keeps serving |
| **Smart caching** | Intelligent per-record cache management reduces per-query MySQL round-trips |
| **Health monitoring** | Continuous database health checks with configurable intervals |
| **Zero downtime** | DNS continues serving during database maintenance windows |

- Fully authoritative responses — correct AA flag and NXDOMAIN on misses
- Wildcard record support (`*` entries served correctly)
- NS records returned in the additional section

**Why this matters for Topology B:** directdnsonly's retry queue handles the write side during a MySQL outage — the CoreDNS fork handles the read side. Between them, neither writes nor queries are dropped during transient database failures.

Use the BIND backend if you want a zero-dependency setup with no custom CoreDNS
build required.
Use the NSD or BIND backend if you want a zero-dependency setup with no custom CoreDNS build required.

---
@@ -502,12 +509,20 @@ The built-in env var mapping targets the backend named `coredns_mysql`. For mult
#### Peer sync

| Config key | Environment variable | Default | Description |
|---|---|---|---|
| `peer_sync.enabled` | `DADNS_PEER_SYNC_ENABLED` | `false` | Enable background peer-to-peer zone sync |
| `peer_sync.interval_minutes` | `DADNS_PEER_SYNC_INTERVAL_MINUTES` | `15` | How often each peer is polled |

| Config key / Environment variable | Default | Description |
|---|---|---|
| `peer_sync.enabled` / `DADNS_PEER_SYNC_ENABLED` | `false` | Enable background peer-to-peer zone sync |
| `peer_sync.interval_minutes` / `DADNS_PEER_SYNC_INTERVAL_MINUTES` | `15` | How often each peer is polled |

> The `peer_sync.peers` list (peer URLs, credentials) requires a config file — it cannot be expressed as simple env vars.

For a **single peer** (the typical two-node Topology C setup) the peer can be configured entirely via env vars — no config file required:

| Environment variable | Default | Description |
|---|---|---|
| `DADNS_PEER_SYNC_PEER_URL` | _(unset)_ | URL of the single peer (e.g. `http://ddo-2:2222`). When set, this peer is automatically appended to the peers list. |
| `DADNS_PEER_SYNC_PEER_USERNAME` | `directdnsonly` | Basic auth username for the peer |
| `DADNS_PEER_SYNC_PEER_PASSWORD` | _(empty)_ | Basic auth password for the peer |

> For **multiple peers**, use a config file with the `peer_sync.peers` list. A peer defined via env var is deduped — if the same URL already appears in the config file it will not be added twice.

---
@@ -540,8 +555,11 @@ services:
      DADNS_APP_AUTH_PASSWORD: my-strong-secret
      DADNS_DNS_DEFAULT_BACKEND: nsd
      DADNS_DNS_BACKENDS_NSD_ENABLED: "true"
      DADNS_PEER_SYNC_ENABLED: "true"
      DADNS_PEER_SYNC_PEER_URL: http://directdnsonly-mlb:2222
      DADNS_PEER_SYNC_PEER_USERNAME: directdnsonly
      DADNS_PEER_SYNC_PEER_PASSWORD: my-strong-secret
    volumes:
      - ./config/syd:/app/config   # contains peer_sync.peers list
      - syd-data:/app/data

  directdnsonly-mlb:
@@ -553,8 +571,11 @@ services:
      DADNS_APP_AUTH_PASSWORD: my-strong-secret
      DADNS_DNS_DEFAULT_BACKEND: nsd
      DADNS_DNS_BACKENDS_NSD_ENABLED: "true"
      DADNS_PEER_SYNC_ENABLED: "true"
      DADNS_PEER_SYNC_PEER_URL: http://directdnsonly-syd:2222
      DADNS_PEER_SYNC_PEER_USERNAME: directdnsonly
      DADNS_PEER_SYNC_PEER_PASSWORD: my-strong-secret
    volumes:
      - ./config/mlb:/app/config   # contains peer_sync.peers list
      - mlb-data:/app/data

volumes:
directdnsonly/__main__.py · new file (17 lines)
@@ -0,0 +1,17 @@
import os
import sys


def run():
    # main.py uses short-form imports (from app.*, from worker) that resolve
    # relative to the directdnsonly/ package directory. Insert it into the
    # path before importing so `python -m directdnsonly` and the `dadns`
    # console script both work without changing main.py.
    sys.path.insert(0, os.path.dirname(__file__))
    from main import main

    main()


if __name__ == "__main__":
    run()
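Both entry points resolve to the same `run()` function; a quick illustrative check (assumes the package is importable):

```python
# Equivalent to running `python -m directdnsonly` or the `dadns` console
# script declared in pyproject.toml.
from directdnsonly.__main__ import run

run()  # starts the directdnsonly server
```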
@@ -7,14 +7,19 @@ from directdnsonly.app.db.models import Domain
class InternalAPI:
    """Peer-to-peer zone_data exchange endpoint.
    """Peer-to-peer zone_data exchange endpoints.

    Used by PeerSyncWorker to replicate zone_data between directdnsonly
    instances so each node can independently heal its local backends.

    All routes require the same basic auth as the main API.
    All routes require peer_sync basic auth credentials, which are
    configured separately from the main DirectAdmin-facing credentials
    (peer_sync.auth_username / peer_sync.auth_password).
    """

    def __init__(self, peer_syncer=None):
        self._peer_syncer = peer_syncer

    @cherrypy.expose
    def zones(self, domain=None):
        """Return zone metadata or zone_data for a specific domain.
@@ -77,3 +82,15 @@ class InternalAPI:
            return json.dumps({"error": "internal server error"}).encode()
        finally:
            session.close()

    @cherrypy.expose
    def peers(self):
        """Return the list of peer URLs this node knows about.

        GET /internal/peers
        Returns a JSON array of URL strings. Used by other nodes during
        sync to discover new cluster members (gossip-lite mesh expansion).
        """
        cherrypy.response.headers["Content-Type"] = "application/json"
        urls = self._peer_syncer.get_peer_urls() if self._peer_syncer else []
        return json.dumps(urls).encode()
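For orientation, a minimal sketch of a peer calling this endpoint, as PeerSyncWorker does during mesh discovery (hostname, port and credentials are placeholders; real values come from the peer_sync configuration):

```python
# Illustrative: fetch another node's known peer list.
import requests

resp = requests.get(
    "http://ddo-2:2222/internal/peers",
    auth=("peersync", "changeme"),  # peer_sync.auth_username / auth_password
    timeout=5,
)
resp.raise_for_status()
print(resp.json())  # e.g. ["http://ddo-1:2222", "http://ddo-3:2222"]
```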
directdnsonly/app/api/status.py · new file (82 lines)
@@ -0,0 +1,82 @@
"""Operational status endpoint — aggregates queue, worker, reconciler, and peer health."""

import json

import cherrypy
from sqlalchemy import func, select

from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain


class StatusAPI:
    """Exposes GET /status as a JSON health/status document.

    Aggregates data from WorkerManager.queue_status() and a live DB zone count
    into a single response that a UI or monitoring system can poll.

    Overall ``status`` field:
    - ``ok`` — all workers alive, no dead-letters, all peers healthy
    - ``degraded`` — retries pending, dead-letters present, or a peer is unhealthy
    - ``error`` — a core worker thread is not alive
    """

    def __init__(self, worker_manager):
        self._wm = worker_manager

    @cherrypy.expose
    def index(self):
        cherrypy.response.headers["Content-Type"] = "application/json"
        return json.dumps(self._build(), default=str).encode()

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _build(self) -> dict:
        qs = self._wm.queue_status()

        zone_count = self._zone_count()

        overall = self._compute_overall(qs)

        return {
            "status": overall,
            "queues": {
                "save": qs.get("save_queue_size", 0),
                "delete": qs.get("delete_queue_size", 0),
                "retry": qs.get("retry_queue_size", 0),
                "dead_letters": qs.get("dead_letters", 0),
            },
            "workers": {
                "save": qs.get("save_worker_alive"),
                "delete": qs.get("delete_worker_alive"),
                "retry_drain": qs.get("retry_worker_alive"),
            },
            "reconciler": qs.get("reconciler", {}),
            "peer_sync": qs.get("peer_sync", {}),
            "zones": {"total": zone_count},
        }

    @staticmethod
    def _zone_count() -> int:
        session = connect()
        try:
            return session.execute(select(func.count(Domain.id))).scalar() or 0
        except Exception:
            return 0
        finally:
            session.close()

    @staticmethod
    def _compute_overall(qs: dict) -> str:
        if not qs.get("save_worker_alive") or not qs.get("delete_worker_alive"):
            return "error"
        peer_sync = qs.get("peer_sync", {})
        if (
            qs.get("retry_queue_size", 0) > 0
            or qs.get("dead_letters", 0) > 0
            or peer_sync.get("degraded", 0) > 0
        ):
            return "degraded"
        return "ok"
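A minimal monitoring sketch against the new endpoint (hostname, port and credentials are placeholders, and it assumes `/status` sits behind the same basic auth as the rest of the main API):

```python
# Illustrative: poll GET /status and report anything that is not "ok".
import requests

resp = requests.get(
    "http://ddo-1:2222/status",
    auth=("directdnsonly", "my-strong-secret"),
    timeout=10,
)
doc = resp.json()
if doc["status"] != "ok":
    print(
        f"directdnsonly is {doc['status']}: "
        f"retry={doc['queues']['retry']} dead_letters={doc['queues']['dead_letters']}"
    )
```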
@@ -14,6 +14,7 @@ class Zone(Base):
    __tablename__ = "zones"
    id = Column(Integer, primary_key=True)
    zone_name = Column(String(255), nullable=False, index=True, unique=True)
    managed_by = Column(String(255), nullable=True)  # 'directadmin' | 'direct' | NULL (legacy)


class Record(Base):
@@ -90,10 +91,34 @@ class CoreDNSMySQLBackend(DNSBackend):
            zone_name, zone_data
        )

        # Track changes
        current_records = set()
        # Pre-compute the set of (hostname, type, data) keys that should
        # remain after this update, so we can identify stale records upfront.
        incoming_keys = {
            (name, rtype, data) for name, rtype, data, _ in source_records
        }

        changes = {"added": 0, "updated": 0, "removed": 0}

        # --- 1. Remove stale records first ---
        # Deleting before inserting means a brief NXDOMAIN window, which is
        # preferable to briefly serving both old and new records simultaneously.
        for key, record in existing_records.items():
            if key not in incoming_keys:
                logger.debug(
                    f"Removed record: {record.hostname} {record.type} {record.data}"
                )
                session.delete(record)
                changes["removed"] += 1

        # Handle SOA removal if needed
        if existing_soa and not source_soa:
            logger.debug(
                f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
            )
            session.delete(existing_soa)
            changes["removed"] += 1

        # --- 2. Add / update incoming records ---
        # Handle SOA record
        if source_soa:
            soa_name, soa_content, soa_ttl = source_soa
@@ -123,7 +148,6 @@ class CoreDNSMySQLBackend(DNSBackend):
        # Process all non-SOA records
        for record_name, record_type, record_content, record_ttl in source_records:
            key = (record_name, record_type, record_content)
            current_records.add(key)

            if key in existing_records:
                # Update existing record if TTL changed
@@ -151,23 +175,6 @@ class CoreDNSMySQLBackend(DNSBackend):
                        f"Added new record: {record_name} {record_type} {record_content}"
                    )

        # Remove records that no longer exist in the source zone
        for key, record in existing_records.items():
            if key not in current_records:
                logger.debug(
                    f"Removed record: {record.hostname} {record.type} {record.data}"
                )
                session.delete(record)
                changes["removed"] += 1

        # Handle SOA removal if needed
        if existing_soa and not source_soa:
            logger.debug(
                f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
            )
            session.delete(existing_soa)
            changes["removed"] += 1

        session.commit()
        total_changes = changes["added"] + changes["updated"] + changes["removed"]
        if total_changes > 0:
@@ -240,43 +247,27 @@ class CoreDNSMySQLBackend(DNSBackend):
            session.close()

    def _ensure_zone_exists(self, session, zone_name: str) -> Zone:
        """Ensure a zone exists in the database, creating it if necessary"""
        """Ensure a zone exists in the database, creating it if necessary."""
        zone = session.execute(
            select(Zone).filter_by(zone_name=self.dot_fqdn(zone_name))
        ).scalar_one_or_none()
        if not zone:
            logger.debug(f"Creating new zone: {self.dot_fqdn(zone_name)}")
            zone = Zone(zone_name=self.dot_fqdn(zone_name))
            zone = Zone(
                zone_name=self.dot_fqdn(zone_name),
                managed_by="directadmin",
            )
            session.add(zone)
            session.flush()  # Get the zone ID
            session.flush()
        elif not zone.managed_by:
            # Migrate pre-existing rows that were created before this field was added
            zone.managed_by = "directadmin"
        return zone

    def _normalize_cname_data(self, zone_name: str, record_content: str) -> str:
        """Normalize CNAME record data to ensure consistent FQDN format.

        This ensures CNAME targets are always stored as fully-qualified domain
        names so that record comparison between the BIND zone source and the
        database is deterministic.

        Args:
            zone_name: The zone name for relative-name expansion
            record_content: The raw CNAME target from the parsed zone

        Returns:
            The normalized CNAME target string
        """
        if record_content.startswith("@"):
            logger.debug(f"CNAME target starts with '@', replacing with zone FQDN")
            record_content = self.dot_fqdn(zone_name)
        elif not record_content.endswith("."):
            logger.debug(f"CNAME target {record_content} is relative, appending zone")
            record_content = ".".join([record_content, self.dot_fqdn(zone_name)])
        return record_content

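    # Illustrative only (not part of this diff): the normalization rules above,
    # applied to a made-up zone "example.com.":
    #
    #   "@"              -> "example.com."        ("@" is replaced by the zone FQDN)
    #   "www"            -> "www.example.com."    (relative target, zone appended)
    #   "cdn.host.net."  -> "cdn.host.net."       (already absolute, unchanged)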
    def _parse_zone_to_record_set(
        self, zone_name: str, zone_data: str
    ) -> Tuple[Set[Tuple[str, str, str, int]], Optional[Tuple[str, str, int]]]:
        """Parse a BIND zone file into a set of normalised record keys.
        """Parse a BIND zone file into a set of record keys.

        Returns:
            Tuple of:
@@ -287,21 +278,27 @@ class CoreDNSMySQLBackend(DNSBackend):
        records: Set[Tuple[str, str, str, int]] = set()
        soa = None

        # Use the zone origin (if available) to expand relative names in RDATA
        # back to absolute FQDNs. Without this, dnspython's default relativize=True
        # behaviour turns in-zone targets like `wvvcc.co.nz.` into `@` in the
        # stored data, which CoreDNS then serves incorrectly.
        origin = dns_zone.origin

        for name, ttl, rdata in dns_zone.iterate_rdatas():
            if rdata.rdclass != IN:
                continue

            record_name = str(name)
            record_type = rdata.rdtype.name
            record_content = rdata.to_text()
            if origin is not None:
                record_content = rdata.to_text(origin=origin, relativize=False)
            else:
                record_content = rdata.to_text()

            if record_type == "SOA":
                soa = (record_name, record_content, ttl)
                continue

            if record_type == "CNAME":
                record_content = self._normalize_cname_data(zone_name, record_content)

            records.add((record_name, record_type, record_content, ttl))

        return records, soa
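The relativization pitfall described in the comment above is easy to reproduce with dnspython directly; a small illustrative sketch (zone contents are made up):

```python
# Illustrative: why rdata.to_text() needs the zone origin.
import dns.zone

zone_text = """
$ORIGIN example.com.
$TTL 3600
@    IN SOA ns1.example.com. hostmaster.example.com. 1 7200 3600 1209600 3600
@    IN NS  ns1.example.com.
www  IN CNAME example.com.
"""

zone = dns.zone.from_text(zone_text, origin="example.com.")  # relativize=True by default
for _name, _ttl, rdata in zone.iterate_rdatas("CNAME"):
    print(rdata.to_text())                                      # "@"  (relative form)
    print(rdata.to_text(origin=zone.origin, relativize=False))  # "example.com."
```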
@@ -70,7 +70,13 @@ class DirectAdminClient:
        if response is None:
            return None

        if response.is_redirect or response.status_code in (301, 302, 303, 307, 308):
        if response.is_redirect or response.status_code in (
            301,
            302,
            303,
            307,
            308,
        ):
            if self._cookies:
                logger.error(
                    f"[da:{self.hostname}] Still redirecting after session login — "
@@ -155,6 +161,136 @@ class DirectAdminClient:
            logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}")
            return None

    def post(
        self, command: str, data: Optional[dict] = None
    ) -> Optional[requests.Response]:
        """Authenticated POST to any DA CMD_* endpoint."""
        url = f"{self.scheme}://{self.hostname}:{self.port}/{command}"
        kwargs: dict = dict(
            data=data or {},
            timeout=30,
            verify=self.verify_ssl,
            allow_redirects=False,
        )
        if self._cookies:
            kwargs["cookies"] = self._cookies
        else:
            kwargs["auth"] = (self.username, self.password)

        try:
            return requests.post(url, **kwargs)
        except Exception as exc:
            logger.error(f"[da:{self.hostname}] POST {command} failed: {exc}")
            return None

    def get_extra_dns_servers(self) -> dict:
        """Return the Extra DNS server map from CMD_MULTI_SERVER (GET).

        Returns a dict keyed by server hostname/IP, each value being the
        per-server settings dict (dns, domain_check, port, user, ssl, …).
        Returns ``{}`` on any error.
        """
        resp = self.get("CMD_MULTI_SERVER", params={"json": "yes"})
        if resp is None or resp.status_code != 200:
            logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER GET failed")
            return {}
        try:
            return resp.json().get("servers", {})
        except Exception as exc:
            logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER parse error: {exc}")
            return {}

    def add_extra_dns_server(
        self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
    ) -> bool:
        """Register a new Extra DNS server via CMD_MULTI_SERVER action=add.

        Returns ``True`` if DA reports success, ``False`` otherwise.
        """
        resp = self.post(
            "CMD_MULTI_SERVER",
            data={
                "action": "add",
                "json": "yes",
                "ip": ip,
                "port": str(port),
                "user": user,
                "passwd": passwd,
                "ssl": "yes" if ssl else "no",
            },
        )
        if resp is None or resp.status_code != 200:
            logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add failed for {ip}")
            return False
        try:
            result = resp.json()
            if result.get("success"):
                logger.info(f"[da:{self.hostname}] Added Extra DNS server {ip}")
                return True
            logger.error(
                f"[da:{self.hostname}] CMD_MULTI_SERVER add error: {result.get('result', result)}"
            )
            return False
        except Exception as exc:
            logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add parse error: {exc}")
            return False

    def ensure_extra_dns_server(
        self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
    ) -> bool:
        """Add (if absent) and configure a directdnsonly Extra DNS server.

        Ensures the server is registered with ``dns=yes`` and
        ``domain_check=yes`` so DirectAdmin pushes zone updates to it.
        Returns ``True`` if fully configured, ``False`` on any failure.
        """
        servers = self.get_extra_dns_servers()
        if ip not in servers:
            if not self.add_extra_dns_server(ip, port, user, passwd, ssl):
                return False

        ssl_str = "yes" if ssl else "no"
        resp = self.post(
            "CMD_MULTI_SERVER",
            data={
                "action": "multiple",
                "save": "yes",
                "json": "yes",
                "passwd": "",
                "select0": ip,
                f"port-{ip}": str(port),
                f"user-{ip}": user,
                f"ssl-{ip}": ssl_str,
                f"dns-{ip}": "yes",
                f"domain_check-{ip}": "yes",
                f"user_check-{ip}": "no",
                f"email-{ip}": "no",
                f"show_all_users-{ip}": "no",
            },
        )
        if resp is None or resp.status_code != 200:
            logger.error(
                f"[da:{self.hostname}] CMD_MULTI_SERVER save failed for {ip}"
            )
            return False
        try:
            result = resp.json()
            if result.get("success"):
                logger.info(
                    f"[da:{self.hostname}] Extra DNS server {ip} configured "
                    f"(dns=yes domain_check=yes)"
                )
                return True
            logger.error(
                f"[da:{self.hostname}] CMD_MULTI_SERVER save error: {result.get('result', result)}"
            )
            return False
        except Exception as exc:
            logger.error(
                f"[da:{self.hostname}] CMD_MULTI_SERVER save parse error: {exc}"
            )
            return False

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------
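A short usage sketch for the new helpers (DA host, credentials and the node IP are placeholders):

```python
# Illustrative: register a directdnsonly node in DirectAdmin as an Extra DNS
# server so zone pushes flow to it (dns=yes, domain_check=yes).
client = DirectAdminClient(
    "da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True
)

ok = client.ensure_extra_dns_server(
    ip="203.0.113.10",           # the directdnsonly container's address
    port=2222,                   # directdnsonly listen port
    user="directdnsonly",        # app.auth_username
    passwd="my-strong-secret",   # app.auth_password
    ssl=False,
)
print("configured" if ok else "failed")
```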
@@ -25,8 +25,8 @@ class Domain(Base):
    domain = Column(String(255), unique=True)
    hostname = Column(String(255))
    username = Column(String(255))
    zone_data = Column(Text, nullable=True)  # last known zone file from DA
    zone_updated_at = Column(DateTime, nullable=True)  # when zone_data was last stored
    zone_data = Column(Text, nullable=True)  # last known zone file from DA
    zone_updated_at = Column(DateTime, nullable=True)  # when zone_data was last stored

    def __repr__(self):
        return "<Domain(id='%s', domain='%s', hostname='%s', username='%s')>" % (
@@ -2,23 +2,37 @@
"""Peer sync worker — exchanges zone_data between directdnsonly instances.

Each node stores zone_data in its local SQLite DB after every successful
backend write. When DirectAdmin pushes a zone to one node but the other
backend write. When DirectAdmin pushes a zone to one node but another
is temporarily offline, the offline node misses that zone_data.

PeerSyncWorker corrects this by periodically comparing zone lists with
configured peers and fetching any zone_data that is newer or absent locally.
all known peers and fetching any zone_data that is newer or absent locally.
It only updates the local DB — it never writes directly to backends. The
existing reconciler healing pass then detects missing zones and re-pushes
using the freshly synced zone_data.

Mesh behaviour:
- Each node exposes /internal/peers listing the URLs it knows about
- During each sync pass, every peer is asked for its peer list; any URLs
  not already known are added automatically (gossip-lite discovery)
- A three-node cluster therefore only needs a linear chain of initial
  connections — nodes propagate awareness of each other on the first pass

Health tracking:
- Consecutive failures per peer are counted; after FAILURE_THRESHOLD
  misses the peer is marked degraded and a warning is logged once
- On the next successful contact the peer is marked recovered

Safety properties:
- If a peer is unreachable, skip it silently and retry next interval
- If a peer is unreachable, skip it and try next interval
- Only zone_data is synced — backend writes remain the sole responsibility
  of the local save queue worker
- Newer zone_updated_at timestamp wins; local data is never overwritten
  with older peer data
- Peer discovery is best-effort and never fails a sync pass
"""
import datetime
import os
import threading
from loguru import logger
import requests
@@ -27,6 +41,9 @@ from sqlalchemy import select
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain

# Consecutive failures before a peer is logged as degraded
FAILURE_THRESHOLD = 3


class PeerSyncWorker:
    """Periodically fetches zone_data from peer directdnsonly instances and
@@ -36,10 +53,58 @@ class PeerSyncWorker:
    def __init__(self, peer_sync_config: dict):
        self.enabled = peer_sync_config.get("enabled", False)
        self.interval_seconds = peer_sync_config.get("interval_minutes", 15) * 60
        self.peers = peer_sync_config.get("peers") or []
        self.peers = list(peer_sync_config.get("peers") or [])

        # Per-peer health state: url -> {consecutive_failures, healthy, last_seen}
        self._peer_health: dict = {}

        # ----------------------------------------------------------------
        # Env-var peer injection
        # ----------------------------------------------------------------
        # Original single-peer vars (backward compat):
        #   DADNS_PEER_SYNC_PEER_URL / _USERNAME / _PASSWORD
        # Numbered multi-peer vars (new):
        #   DADNS_PEER_SYNC_PEER_1_URL / _USERNAME / _PASSWORD
        #   DADNS_PEER_SYNC_PEER_2_URL / ... (up to 9)
        known_urls = {p.get("url") for p in self.peers}

        env_candidates = []

        single_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL", "").strip()
        if single_url:
            env_candidates.append({
                "url": single_url,
                "username": os.environ.get("DADNS_PEER_SYNC_PEER_USERNAME", "peersync"),
                "password": os.environ.get("DADNS_PEER_SYNC_PEER_PASSWORD", ""),
            })

        for i in range(1, 10):
            numbered_url = os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_URL", "").strip()
            if not numbered_url:
                break
            env_candidates.append({
                "url": numbered_url,
                "username": os.environ.get(
                    f"DADNS_PEER_SYNC_PEER_{i}_USERNAME", "peersync"
                ),
                "password": os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_PASSWORD", ""),
            })

        for candidate in env_candidates:
            if candidate["url"] not in known_urls:
                self.peers.append(candidate)
                known_urls.add(candidate["url"])
                logger.debug(
                    f"[peer_sync] Added peer from env vars: {candidate['url']}"
                )

        self._stop_event = threading.Event()
        self._thread = None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self):
        if not self.enabled:
            logger.info("Peer sync disabled — skipping")
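To illustrate the numbered form (URLs and credentials are placeholders), a node could be seeded with two initial peers using environment variables alone; in a container deployment these would normally be set in docker-compose rather than in code:

```python
# Illustrative only: environment that seeds two peers without a config file.
import os

os.environ.update({
    "DADNS_PEER_SYNC_ENABLED": "true",
    "DADNS_PEER_SYNC_PEER_1_URL": "http://ddo-2:2222",
    "DADNS_PEER_SYNC_PEER_1_USERNAME": "peersync",
    "DADNS_PEER_SYNC_PEER_1_PASSWORD": "changeme",
    "DADNS_PEER_SYNC_PEER_2_URL": "http://ddo-3:2222",
    "DADNS_PEER_SYNC_PEER_2_USERNAME": "peersync",
    "DADNS_PEER_SYNC_PEER_2_PASSWORD": "changeme",
})
```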
@@ -70,6 +135,70 @@ class PeerSyncWorker:
    def is_alive(self):
        return self._thread is not None and self._thread.is_alive()

    def get_peer_urls(self) -> list:
        """Return the current list of known peer URLs.
        Exposed via /internal/peers so other nodes can discover this node's mesh."""
        return [p["url"] for p in self.peers if p.get("url")]

    def get_peer_status(self) -> dict:
        """Return peer health summary for the /status endpoint."""
        peers = []
        for peer in self.peers:
            url = peer.get("url", "")
            h = self._peer_health.get(url, {})
            last_seen = h.get("last_seen")
            peers.append({
                "url": url,
                "healthy": h.get("healthy", True),
                "consecutive_failures": h.get("consecutive_failures", 0),
                "last_seen": last_seen.isoformat() if last_seen else None,
            })
        healthy = sum(1 for p in peers if p["healthy"])
        return {
            "enabled": self.enabled,
            "alive": self.is_alive(),
            "interval_minutes": self.interval_seconds // 60,
            "peers": peers,
            "total": len(peers),
            "healthy": healthy,
            "degraded": len(peers) - healthy,
        }

    # ------------------------------------------------------------------
    # Health tracking
    # ------------------------------------------------------------------

    def _health(self, url: str) -> dict:
        return self._peer_health.setdefault(
            url, {"consecutive_failures": 0, "healthy": True, "last_seen": None}
        )

    def _record_success(self, url: str):
        h = self._health(url)
        recovered = not h["healthy"]
        h.update(
            consecutive_failures=0,
            healthy=True,
            last_seen=datetime.datetime.utcnow(),
        )
        if recovered:
            logger.info(f"[peer_sync] {url}: peer recovered")

    def _record_failure(self, url: str, exc):
        h = self._health(url)
        h["consecutive_failures"] += 1
        if h["healthy"] and h["consecutive_failures"] >= FAILURE_THRESHOLD:
            h["healthy"] = False
            logger.warning(
                f"[peer_sync] {url}: marked degraded after {FAILURE_THRESHOLD} "
                f"consecutive failures — {exc}"
            )
        else:
            logger.debug(
                f"[peer_sync] {url}: unreachable "
                f"(failure #{h['consecutive_failures']}) — {exc}"
            )

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------
@@ -82,15 +211,49 @@ class PeerSyncWorker:
    def _sync_all(self):
        logger.debug(f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)")
        for peer in self.peers:
        # Iterate over a snapshot — _discover_peers_from may grow self.peers
        for peer in list(self.peers):
            url = peer.get("url")
            if not url:
                logger.warning("[peer_sync] Peer config missing url — skipping")
                continue
            try:
                self._sync_from_peer(peer)
                self._discover_peers_from(peer)
                self._record_success(url)
            except Exception as exc:
                logger.warning(f"[peer_sync] Skipping unreachable peer {url}: {exc}")
                self._record_failure(url, exc)

    def _discover_peers_from(self, peer: dict):
        """Fetch peer's known peer list and add any new nodes for mesh expansion.

        This is best-effort — failures are silently swallowed so they never
        interrupt the main sync pass."""
        url = peer.get("url", "").rstrip("/")
        username = peer.get("username")
        password = peer.get("password")
        auth = (username, password) if username else None
        try:
            resp = requests.get(f"{url}/internal/peers", auth=auth, timeout=5)
            if resp.status_code != 200:
                return
            remote_urls = resp.json()  # list of URL strings
            known_urls = {p.get("url") for p in self.peers}
            for remote_url in remote_urls:
                if remote_url and remote_url not in known_urls:
                    # Inherit credentials from the introducing peer — in practice
                    # all cluster nodes share the same peer_sync auth credentials.
                    self.peers.append({
                        "url": remote_url,
                        "username": username,
                        "password": password,
                    })
                    known_urls.add(remote_url)
                    logger.info(
                        f"[peer_sync] Discovered new peer {remote_url} via {url}"
                    )
        except Exception:
            pass  # discovery is best-effort

    def _sync_from_peer(self, peer: dict):
        url = peer.get("url", "").rstrip("/")
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import datetime
import threading
from loguru import logger
from sqlalchemy import select
@@ -42,6 +43,17 @@ class ReconciliationWorker:
        self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60
        self._stop_event = threading.Event()
        self._thread = None
        self._last_run: dict = {}

    def get_status(self) -> dict:
        """Return reconciler configuration and last-run statistics."""
        return {
            "enabled": self.enabled,
            "alive": self.is_alive,
            "dry_run": self.dry_run,
            "interval_minutes": self.interval_seconds // 60,
            "last_run": dict(self._last_run),
        }

    def start(self):
        if not self.enabled:
@@ -104,11 +116,18 @@ class ReconciliationWorker:
            self._reconcile_all()

    def _reconcile_all(self):
        started_at = datetime.datetime.utcnow()
        self._last_run = {"status": "running", "started_at": started_at.isoformat()}
        logger.info(
            f"[reconciler] Starting reconciliation pass across "
            f"{len(self.servers)} server(s)"
        )
        total_queued = 0
        da_servers_polled = 0
        da_servers_unreachable = 0
        migrated = 0
        backfilled = 0
        zones_in_db = 0

        # Build a map of all domains seen on all DA servers: domain -> hostname
        all_da_domains: dict = {}
@@ -126,23 +145,26 @@ class ReconciliationWorker:
                    ssl=server.get("ssl", True),
                    verify_ssl=self.verify_ssl,
                )
                da_servers_polled += 1
                da_domains = client.list_domains(ipp=self.ipp)
                if da_domains is not None:
                    for d in da_domains:
                        all_da_domains[d] = hostname
                else:
                    da_servers_unreachable += 1
                logger.debug(
                    f"[reconciler] {hostname}: "
                    f"{len(da_domains) if da_domains else 0} active domain(s) in DA"
                )
            except Exception as exc:
                logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}")
                da_servers_unreachable += 1

        # Compare local DB against what DA reported; update masters and queue deletes
        session = connect()
        try:
            all_local_domains = session.execute(select(Domain)).scalars().all()
            migrated = 0
            backfilled = 0
            zones_in_db = len(all_local_domains)
            known_servers = {s.get("hostname") for s in self.servers}
            for record in all_local_domains:
                domain = record.domain
@@ -209,10 +231,31 @@ class ReconciliationWorker:
            )

        # Option C: heal backends that are missing zones
        zones_healed = 0
        if self.save_queue is not None and self.backend_registry is not None:
            self._heal_backends()
            zones_healed = self._heal_backends()

    def _heal_backends(self):
        completed_at = datetime.datetime.utcnow()
        self._last_run = {
            "status": "ok",
            "started_at": started_at.isoformat(),
            "completed_at": completed_at.isoformat(),
            "duration_seconds": round(
                (completed_at - started_at).total_seconds(), 1
            ),
            "da_servers_polled": da_servers_polled,
            "da_servers_unreachable": da_servers_unreachable,
            "zones_in_da": len(all_da_domains),
            "zones_in_db": zones_in_db,
            "orphans_found": total_queued,
            "orphans_queued": total_queued if not self.dry_run else 0,
            "hostnames_backfilled": backfilled,
            "hostnames_migrated": migrated,
            "zones_healed": zones_healed,
            "dry_run": self.dry_run,
        }

    def _heal_backends(self) -> int:
        """Check every backend for zone presence and re-queue any zone that is
        missing from one or more backends, using the stored zone_data as the
        authoritative source. This corrects backends that missed pushes due to
@@ -220,9 +263,10 @@ class ReconciliationWorker:
        """
        backends = self.backend_registry.get_available_backends()
        if not backends:
            return
            return 0

        session = connect()
        healed = 0
        try:
            domains = session.execute(
                select(Domain).where(Domain.zone_data.isnot(None))
@@ -231,9 +275,7 @@ class ReconciliationWorker:
                logger.debug(
                    "[reconciler] Healing pass: no zone_data stored yet — skipping"
                )
                return

            healed = 0
                return 0
            for record in domains:
                missing = []
                for backend_name, backend in backends.items():
@@ -277,3 +319,4 @@ class ReconciliationWorker:
            )
        finally:
            session.close()
        return healed
@@ -69,6 +69,8 @@ def load_config() -> Vyper:
    # Peer sync defaults
    v.set_default("peer_sync.enabled", False)
    v.set_default("peer_sync.interval_minutes", 15)
    v.set_default("peer_sync.auth_username", "peersync")
    v.set_default("peer_sync.auth_password", "changeme")

    # Read configuration
    try:
@@ -3,6 +3,20 @@ timezone: Pacific/Auckland
log_level: INFO
queue_location: ./data/queues

# Application datastore — stores domain index and zone_data for healing/peer-sync.
# SQLite (default) requires no extra dependencies and is fine for single-node setups.
# MySQL is recommended for multi-node deployments with a shared datastore.
datastore:
  type: sqlite
  db_location: ./data/directdnsonly.db
  # --- MySQL ---
  # type: mysql
  # host: "127.0.0.1"
  # port: "3306"
  # name: "directdnsonly"
  # user: "directdnsonly"
  # pass: "changeme"

app:
  auth_username: directdnsonly
  auth_password: changeme  # Override via DADNS_APP_AUTH_PASSWORD env var
@@ -4,6 +4,7 @@ from app.backends import BackendRegistry
from app.api.admin import DNSAdminAPI
from app.api.health import HealthAPI
from app.api.internal import InternalAPI
from app.api.status import StatusAPI
from app import configure_logging
from worker import WorkerManager
from directdnsonly.config import config
@@ -90,6 +91,17 @@ def main():
    if config.get_string("app.log_level").upper() != "DEBUG":
        cherrypy.log.access_log.propagate = False

    # Peer sync auth — separate credentials from the DA-facing API so a
    # compromised peer node cannot push zones or access the admin endpoints.
    peer_user_password_dict = {
        config.get_string("peer_sync.auth_username"): config.get_string(
            "peer_sync.auth_password"
        )
    }
    peer_check_password = cherrypy.lib.auth_basic.checkpassword_dict(
        peer_user_password_dict
    )

    # Mount applications
    root = Root()
    root = DNSAdminAPI(
@@ -98,12 +110,18 @@ def main():
        backend_registry=registry,
    )
    root.health = HealthAPI(registry)
    root.internal = InternalAPI()
    root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer)
    root.status = StatusAPI(worker_manager)

    # Add queue status endpoint
    # Add queue status endpoint (debug)
    root.queue_status = lambda: worker_manager.queue_status()

    cherrypy.tree.mount(root, "/")
    # Override auth for /internal so peers use their own credentials
    cherrypy.tree.mount(root, "/", config={
        "/internal": {
            "tools.auth_basic.checkpassword": peer_check_password,
        }
    })
    cherrypy.engine.start()
    logger.success(f"Server started on port {config.get_int('app.listen_port')}")
@@ -43,6 +43,7 @@ class WorkerManager:
        self._peer_syncer = None
        self._reconciliation_config = reconciliation_config or {}
        self._peer_sync_config = peer_sync_config or {}
        self._dead_letter_count = 0

        try:
            os.makedirs(queue_path, exist_ok=True)
@@ -62,93 +63,90 @@ class WorkerManager:
        logger.info("Save queue worker started")
        session = connect()

        batch_start = None
        batch_processed = 0
        batch_failed = 0

        while self._running:
            # Block until at least one item is available
            try:
                item = self.save_queue.get(block=True, timeout=5)

                if batch_start is None:
                    batch_start = time.monotonic()
                    batch_processed = 0
                    batch_failed = 0
                    pending = self.save_queue.qsize()
                    logger.info(
                        f"📥 Batch started — {pending + 1} zone(s) queued for processing"
                    )

                domain = item.get("domain", "unknown")
                is_retry = item.get("source") in ("retry", "reconciler_heal")
                target_backends = item.get("failed_backends")  # None = all backends

                logger.debug(
                    f"Processing zone update for {domain}"
                    + (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
                    + (f" [backends: {target_backends}]" if target_backends else "")
                )

                if not is_retry and not check_zone_exists(domain):
                    put_zone_index(domain, item.get("hostname"), item.get("username"))

                if not all(k in item for k in ["domain", "zone_file"]):
                    logger.error(f"Invalid queue item: {item}")
                    self.save_queue.task_done()
                    batch_failed += 1
                    continue

                backends = self.backend_registry.get_available_backends()
                if target_backends:
                    backends = {
                        k: v for k, v in backends.items() if k in target_backends
                    }
                if not backends:
                    logger.warning("No target backends available for this item!")
                    self.save_queue.task_done()
                    batch_failed += 1
                    continue

                if len(backends) > 1:
                    failed = self._process_backends_parallel(backends, item, session)
                else:
                    failed = set()
                    for backend_name, backend in backends.items():
                        if not self._process_single_backend(
                            backend_name, backend, item, session
                        ):
                            failed.add(backend_name)

                if failed:
                    self._schedule_retry(item, failed)
                    batch_failed += 1
                else:
                    # Successful write — persist zone_data for Option C healing
                    self._store_zone_data(session, domain, item["zone_file"])
                    batch_processed += 1

                self.save_queue.task_done()
                logger.debug(f"Completed processing for {domain}")

            except Empty:
                if batch_start is not None:
                    elapsed = time.monotonic() - batch_start
                    total = batch_processed + batch_failed
                    rate = batch_processed / elapsed if elapsed > 0 else 0
                    logger.success(
                        f"📦 Batch complete — {batch_processed}/{total} zone(s) "
                        f"processed successfully in {elapsed:.1f}s "
                        f"({rate:.1f} zones/sec)"
                        + (f", {batch_failed} failed" if batch_failed else "")
                    )
                    batch_start = None
                    batch_processed = 0
                    batch_failed = 0
                continue
            except Exception as e:
                logger.error(f"Unexpected worker error: {e}")
                batch_failed += 1
                time.sleep(1)

            # Open a batch and keep processing until the queue is empty
            batch_start = time.monotonic()
            batch_processed = 0
            batch_failed = 0
            logger.info("📥 Batch started")

            while True:
                try:
                    domain = item.get("domain", "unknown")
                    is_retry = item.get("source") in ("retry", "reconciler_heal")
                    target_backends = item.get("failed_backends")  # None = all backends

                    logger.debug(
                        f"Processing zone update for {domain}"
                        + (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
                        + (f" [backends: {target_backends}]" if target_backends else "")
                    )

                    if not is_retry and not check_zone_exists(domain):
                        put_zone_index(domain, item.get("hostname"), item.get("username"))

                    if not all(k in item for k in ["domain", "zone_file"]):
                        logger.error(f"Invalid queue item: {item}")
                        self.save_queue.task_done()
                        batch_failed += 1
                    else:
                        backends = self.backend_registry.get_available_backends()
                        if target_backends:
                            backends = {
                                k: v for k, v in backends.items() if k in target_backends
                            }
                        if not backends:
                            logger.warning("No target backends available for this item!")
                            self.save_queue.task_done()
                            batch_failed += 1
                        else:
                            if len(backends) > 1:
                                failed = self._process_backends_parallel(backends, item, session)
                            else:
                                failed = set()
                                for backend_name, backend in backends.items():
                                    if not self._process_single_backend(
                                        backend_name, backend, item, session
                                    ):
                                        failed.add(backend_name)

                            if failed:
                                self._schedule_retry(item, failed)
                                batch_failed += 1
                            else:
                                self._store_zone_data(session, domain, item["zone_file"])
                                batch_processed += 1

                            self.save_queue.task_done()
                            logger.debug(f"Completed processing for {domain}")

                except Exception as e:
                    logger.error(f"Unexpected worker error processing {item.get('domain', '?')}: {e}")
                    batch_failed += 1
                    time.sleep(1)

                # Check immediately for the next item — keep batch open while
                # more work is queued; close it only when the queue is empty.
                try:
                    item = self.save_queue.get_nowait()
                except Empty:
                    break

            elapsed = time.monotonic() - batch_start
            total = batch_processed + batch_failed
            rate = batch_processed / elapsed if elapsed > 0 else 0
            logger.success(
                f"📦 Batch complete — {batch_processed}/{total} zone(s) "
                f"processed successfully in {elapsed:.1f}s "
                f"({rate:.1f} zones/sec)"
                + (f", {batch_failed} failed" if batch_failed else "")
            )

    def _process_single_backend(self, backend_name, backend, item, session) -> bool:
        """Write a zone to one backend. Returns True on success, False on failure."""
@@ -208,6 +206,7 @@ class WorkerManager:
        Discards to dead-letter after MAX_RETRIES attempts."""
        retry_count = item.get("retry_count", 0) + 1
        if retry_count > MAX_RETRIES:
            self._dead_letter_count += 1
            logger.error(
                f"[retry] Dead-letter: {item['domain']} failed on "
                f"{failed_backends} after {MAX_RETRIES} attempts — giving up"
@@ -499,18 +498,24 @@ class WorkerManager:
        logger.info("Workers stopped")

    def queue_status(self):
        reconciler = (
            self._reconciler.get_status()
            if self._reconciler
            else {"enabled": False, "alive": False, "last_run": {}}
        )
        peer_sync = (
            self._peer_syncer.get_peer_status()
            if self._peer_syncer
            else {"enabled": False, "alive": False, "peers": [], "total": 0, "healthy": 0, "degraded": 0}
        )
        return {
            "save_queue_size": self.save_queue.qsize(),
            "delete_queue_size": self.delete_queue.qsize(),
            "retry_queue_size": self.retry_queue.qsize(),
            "save_worker_alive": self._save_thread and self._save_thread.is_alive(),
            "delete_worker_alive": self._delete_thread
            and self._delete_thread.is_alive(),
            "retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(),
            "reconciler_alive": (
                self._reconciler.is_alive if self._reconciler else False
            ),
            "peer_syncer_alive": (
                self._peer_syncer.is_alive if self._peer_syncer else False
            ),
            "dead_letters": self._dead_letter_count,
            "save_worker_alive": bool(self._save_thread and self._save_thread.is_alive()),
            "delete_worker_alive": bool(self._delete_thread and self._delete_thread.is_alive()),
            "retry_worker_alive": bool(self._retry_thread and self._retry_thread.is_alive()),
            "reconciler": reconciler,
            "peer_sync": peer_sync,
        }
@@ -20,6 +20,9 @@ dependencies = [
    "requests (>=2.32.0,<3.0.0)",
]

[project.scripts]
dadns = "directdnsonly.__main__:run"

[tool.poetry]
package-mode = true
@@ -1,12 +1,33 @@
CREATE TABLE IF NOT EXISTS `records` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `zone` varchar(255) NOT NULL,
  `name` varchar(255) NOT NULL,
  `ttl` int(11) DEFAULT NULL,
  `type` varchar(10) NOT NULL,
  `data` text NOT NULL,
-- DirectDNSOnly — CoreDNS MySQL schema
-- Compatible with cybercinch/coredns_mysql_extend
--
-- managed_by values:
--   'directadmin'  zone is managed via directdnsonly / DirectAdmin push
--   'direct'       zone was created directly (not via DA)
--   NULL           legacy row created before this column was added

CREATE TABLE IF NOT EXISTS `zones` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `zone_name` varchar(255) NOT NULL,
  `managed_by` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_zone` (`zone`),
  KEY `idx_name` (`name`),
  KEY `idx_type` (`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  UNIQUE KEY `uq_zone_name` (`zone_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE IF NOT EXISTS `records` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `zone_id` int(11) NOT NULL,
  `hostname` varchar(255) NOT NULL,
  `type` varchar(10) NOT NULL,
  `data` text NOT NULL,
  `ttl` int(11) DEFAULT NULL,
  `online` tinyint(1) NOT NULL DEFAULT 0,
  PRIMARY KEY (`id`),
  KEY `idx_zone_id` (`zone_id`),
  KEY `idx_hostname` (`hostname`),
  CONSTRAINT `fk_records_zone` FOREIGN KEY (`zone_id`) REFERENCES `zones` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- Migration: add managed_by to an existing installation
-- ALTER TABLE `zones` ADD COLUMN `managed_by` varchar(255) DEFAULT NULL;
-- UPDATE `zones` SET `managed_by` = 'directadmin' WHERE `managed_by` IS NULL;
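For orientation, a sketch of the kind of lookup a CoreDNS-side consumer would run against this two-table schema (connection details are placeholders; the real query lives in the CoreDNS plugin, not in this repository):

```python
# Illustrative: fetch all A records for a zone via the zones/records join.
import pymysql

conn = pymysql.connect(
    host="127.0.0.1", user="directdnsonly", password="changeme", database="directdnsonly"
)
with conn.cursor() as cur:
    cur.execute(
        """
        SELECT r.hostname, r.type, r.data, r.ttl
        FROM records AS r
        JOIN zones AS z ON z.id = r.zone_id
        WHERE z.zone_name = %s AND r.type = %s
        """,
        ("example.com.", "A"),
    )
    for hostname, rtype, data, ttl in cur.fetchall():
        print(hostname, rtype, data, ttl)
```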
@@ -38,4 +38,5 @@ def patch_connect(db_session, monkeypatch):
    monkeypatch.setattr("directdnsonly.app.utils.connect", _factory)
    monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory)
    monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory)
    monkeypatch.setattr("directdnsonly.app.api.status.connect", _factory)
    return db_session
@@ -165,3 +165,37 @@ def test_reconcile_no_changes_when_zone_matches(mysql_backend):
    success, removed = mysql_backend.reconcile_zone_records("example.com", ZONE_DATA)
    assert success
    assert removed == 0


# ---------------------------------------------------------------------------
# managed_by field
# ---------------------------------------------------------------------------


def test_write_zone_sets_managed_by_directadmin(mysql_backend):
    mysql_backend.write_zone("example.com", ZONE_DATA)
    session = mysql_backend.Session()
    zone = session.execute(
        select(Zone).filter_by(zone_name="example.com.")
    ).scalar_one_or_none()
    assert zone.managed_by == "directadmin"
    session.close()


def test_write_zone_migrates_null_managed_by(mysql_backend):
    """Zones that pre-exist without managed_by get it set on next write."""
    session = mysql_backend.Session()
    zone = Zone(zone_name="example.com.", managed_by=None)
    session.add(zone)
    session.commit()
    session.close()

    mysql_backend.write_zone("example.com", ZONE_DATA)

    session = mysql_backend.Session()
    zone = session.execute(
        select(Zone).filter_by(zone_name="example.com.")
    ).scalar_one_or_none()
    assert zone.managed_by == "directadmin"
    session.close()
@@ -24,7 +24,9 @@ def _make_json_response(domains_list, total_pages=1):
def _client():
    return DirectAdminClient("da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True)
    return DirectAdminClient(
        "da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True
    )


# ---------------------------------------------------------------------------
@@ -105,7 +107,9 @@ def test_html_response_returns_none():
def test_connection_error_returns_none():
    with patch("requests.get", side_effect=requests.exceptions.ConnectionError("refused")):
    with patch(
        "requests.get", side_effect=requests.exceptions.ConnectionError("refused")
    ):
        result = _client().list_domains()

    assert result is None
@@ -119,7 +123,9 @@ def test_timeout_returns_none():
|
||||
|
||||
|
||||
def test_ssl_error_returns_none():
|
||||
with patch("requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")):
|
||||
with patch(
|
||||
"requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")
|
||||
):
|
||||
result = _client().list_domains()
|
||||
|
||||
assert result is None
|
||||
@@ -131,12 +137,16 @@ def test_ssl_error_returns_none():
|
||||
|
||||
|
||||
def test_parse_standard_querystring():
|
||||
result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com&list[]=test.com")
|
||||
result = DirectAdminClient._parse_legacy_domain_list(
|
||||
"list[]=example.com&list[]=test.com"
|
||||
)
|
||||
assert result == {"example.com", "test.com"}
|
||||
|
||||
|
||||
def test_parse_newline_separated():
|
||||
result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com\nlist[]=test.com")
|
||||
result = DirectAdminClient._parse_legacy_domain_list(
|
||||
"list[]=example.com\nlist[]=test.com"
|
||||
)
|
||||
assert result == {"example.com", "test.com"}
|
||||
|
||||
|
||||
@@ -190,3 +200,171 @@ def test_login_returns_false_on_exception():
    result = client._login()

    assert result is False


# ---------------------------------------------------------------------------
# get_extra_dns_servers
# ---------------------------------------------------------------------------


def _multi_server_get_resp(servers=None):
    mock = MagicMock()
    mock.status_code = 200
    mock.is_redirect = False
    mock.headers = {"Content-Type": "application/json"}
    mock.json.return_value = {"CLUSTER_ON": "yes", "servers": servers or {}}
    mock.raise_for_status = MagicMock()
    return mock


def test_get_extra_dns_servers_returns_servers_dict():
    servers = {
        "1.2.3.4": {"dns": "yes", "domain_check": "yes", "port": "2222", "ssl": "no"}
    }
    with patch("requests.get", return_value=_multi_server_get_resp(servers)):
        result = _client().get_extra_dns_servers()

    assert "1.2.3.4" in result
    assert result["1.2.3.4"]["dns"] == "yes"


def test_get_extra_dns_servers_returns_empty_on_http_error():
    mock_resp = MagicMock()
    mock_resp.status_code = 500
    with patch("requests.get", return_value=mock_resp):
        result = _client().get_extra_dns_servers()

    assert result == {}


def test_get_extra_dns_servers_returns_empty_on_connection_error():
    with patch(
        "requests.get", side_effect=requests.exceptions.ConnectionError("refused")
    ):
        result = _client().get_extra_dns_servers()

    assert result == {}


# ---------------------------------------------------------------------------
# add_extra_dns_server
# ---------------------------------------------------------------------------


def test_add_extra_dns_server_returns_true_on_success():
    mock_resp = MagicMock()
    mock_resp.status_code = 200
    mock_resp.json.return_value = {"result": "", "success": "Connection Added"}

    with patch("requests.post", return_value=mock_resp):
        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")

    assert result is True


def test_add_extra_dns_server_returns_false_on_da_error():
    mock_resp = MagicMock()
    mock_resp.status_code = 200
    mock_resp.json.return_value = {"result": "Server already exists", "success": ""}

    with patch("requests.post", return_value=mock_resp):
        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")

    assert result is False


def test_add_extra_dns_server_returns_false_on_connection_error():
    with patch(
        "requests.post", side_effect=requests.exceptions.ConnectionError("refused")
    ):
        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")

    assert result is False

# ---------------------------------------------------------------------------
# ensure_extra_dns_server
# ---------------------------------------------------------------------------


def _add_success_resp():
    mock = MagicMock()
    mock.status_code = 200
    mock.json.return_value = {"result": "", "success": "Connection Added"}
    return mock


def _save_success_resp():
    mock = MagicMock()
    mock.status_code = 200
    mock.json.return_value = {"result": "", "success": "Connections Saved"}
    return mock


def test_ensure_extra_dns_server_adds_and_configures_new_server():
    """Server not yet registered — adds it, then saves dns+domain_check settings."""
    with (
        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
        patch(
            "requests.post",
            side_effect=[_add_success_resp(), _save_success_resp()],
        ),
    ):
        result = _client().ensure_extra_dns_server(
            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
        )

    assert result is True


def test_ensure_extra_dns_server_skips_add_when_already_present():
    """Server already registered — no add call, only saves settings."""
    existing = {
        "1.2.3.4": {"dns": "no", "domain_check": "no", "port": "2222", "ssl": "no"}
    }
    with (
        patch("requests.get", return_value=_multi_server_get_resp(servers=existing)),
        patch("requests.post", return_value=_save_success_resp()) as mock_post,
    ):
        result = _client().ensure_extra_dns_server(
            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
        )

    assert result is True
    assert mock_post.call_count == 1  # save only, no add


def test_ensure_extra_dns_server_returns_false_when_add_fails():
    fail_resp = MagicMock()
    fail_resp.status_code = 200
    fail_resp.json.return_value = {"result": "error", "success": ""}

    with (
        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
        patch("requests.post", return_value=fail_resp),
    ):
        result = _client().ensure_extra_dns_server(
            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
        )

    assert result is False


def test_ensure_extra_dns_server_returns_false_when_save_fails():
    """Add succeeds but the subsequent settings save fails."""
    fail_save = MagicMock()
    fail_save.status_code = 200
    fail_save.json.return_value = {"result": "error", "success": ""}

    with (
        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
        patch(
            "requests.post",
            side_effect=[_add_success_resp(), fail_save],
        ),
    ):
        result = _client().ensure_extra_dns_server(
            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
        )

    assert result is False

@@ -58,6 +58,139 @@ def test_peers_stored():
    assert worker.peers[0]["url"] == "http://ddo-2:2222"


def test_peer_from_env_var(monkeypatch):
    """DADNS_PEER_SYNC_PEER_URL adds a peer without a config file."""
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_URL", "http://ddo-env:2222")
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_USERNAME", "admin")
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_PASSWORD", "secret")
    worker = PeerSyncWorker({"enabled": True})
    assert len(worker.peers) == 1
    assert worker.peers[0]["url"] == "http://ddo-env:2222"
    assert worker.peers[0]["username"] == "admin"
    assert worker.peers[0]["password"] == "secret"


def test_env_peer_not_duplicated_when_also_in_config(monkeypatch):
    """Env var peer is not added if it already appears in the config file peers list."""
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_URL", "http://ddo-2:2222")
    worker = PeerSyncWorker(BASE_CONFIG)
    # BASE_CONFIG already has http://ddo-2:2222 — must remain exactly one entry
    urls = [p["url"] for p in worker.peers]
    assert urls.count("http://ddo-2:2222") == 1


def test_numbered_env_peers(monkeypatch):
    """DADNS_PEER_SYNC_PEER_1_URL and _2_URL add multiple peers."""
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://node-a:2222")
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_USERNAME", "peersync")
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_PASSWORD", "s3cr3t")
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_2_URL", "http://node-b:2222")
    worker = PeerSyncWorker({"enabled": True})
    urls = [p["url"] for p in worker.peers]
    assert "http://node-a:2222" in urls
    assert "http://node-b:2222" in urls
    assert len(urls) == 2


def test_numbered_env_peers_not_duplicated(monkeypatch):
    """Numbered env var peers are deduplicated against the config file list."""
    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://ddo-2:2222")
    worker = PeerSyncWorker(BASE_CONFIG)
    urls = [p["url"] for p in worker.peers]
    assert urls.count("http://ddo-2:2222") == 1


def test_get_peer_urls():
    worker = PeerSyncWorker(BASE_CONFIG)
    assert worker.get_peer_urls() == ["http://ddo-2:2222"]
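
The environment-variable tests above pin down a small contract: a single DADNS_PEER_SYNC_PEER_URL (with optional _USERNAME and _PASSWORD), or numbered DADNS_PEER_SYNC_PEER_<n>_URL variables, each add a peer, and any URL already present in the config file is not added again. A minimal sketch of how that collection could look; the helper name _peers_from_env and the exact dict shape are assumptions for illustration, not the project's actual code:

import os


def _peers_from_env(known_urls):
    """Collect peers from DADNS_PEER_SYNC_PEER_* env vars (illustrative sketch)."""
    peers = []

    def _maybe_add(prefix):
        url = os.environ.get(f"{prefix}_URL")
        if url and url not in known_urls:          # dedupe against config file peers
            peers.append({
                "url": url,
                "username": os.environ.get(f"{prefix}_USERNAME"),
                "password": os.environ.get(f"{prefix}_PASSWORD"),
            })
            known_urls.add(url)

    _maybe_add("DADNS_PEER_SYNC_PEER")             # un-numbered single peer
    n = 1
    while os.environ.get(f"DADNS_PEER_SYNC_PEER_{n}_URL"):
        _maybe_add(f"DADNS_PEER_SYNC_PEER_{n}")    # numbered peers: _1_, _2_, ...
        n += 1
    return peers
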
# ---------------------------------------------------------------------------
# Health tracking
# ---------------------------------------------------------------------------


def test_peer_health_starts_healthy():
    worker = PeerSyncWorker(BASE_CONFIG)
    h = worker._health("http://ddo-2:2222")
    assert h["healthy"] is True
    assert h["consecutive_failures"] == 0


def test_record_failure_increments_count():
    worker = PeerSyncWorker(BASE_CONFIG)
    worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
    assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 1
    assert worker._health("http://ddo-2:2222")["healthy"] is True


def test_record_failure_marks_degraded_at_threshold():
    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
    worker = PeerSyncWorker(BASE_CONFIG)
    for _ in range(FAILURE_THRESHOLD):
        worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
    assert worker._health("http://ddo-2:2222")["healthy"] is False


def test_record_success_resets_health():
    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
    worker = PeerSyncWorker(BASE_CONFIG)
    for _ in range(FAILURE_THRESHOLD):
        worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
    assert not worker._health("http://ddo-2:2222")["healthy"]
    worker._record_success("http://ddo-2:2222")
    assert worker._health("http://ddo-2:2222")["healthy"] is True
    assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 0
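
Read together, the health-tracking tests above describe a simple per-peer state machine: each failure increments a counter, the peer is marked unhealthy once FAILURE_THRESHOLD consecutive failures are reached, and a single success resets it. A minimal sketch of that behaviour, assuming the same key names; this is illustrative only, not the actual PeerSyncWorker code:

import time

FAILURE_THRESHOLD = 3  # assumed value; the real constant is defined in peer_sync


class PeerHealthTracker:
    """Sketch of the per-peer health bookkeeping the tests above exercise."""

    def __init__(self):
        self._state = {}

    def _health(self, url):
        # Every peer starts healthy with zero consecutive failures.
        return self._state.setdefault(
            url, {"healthy": True, "consecutive_failures": 0, "last_seen": None}
        )

    def _record_failure(self, url, exc):
        h = self._health(url)
        h["consecutive_failures"] += 1
        if h["consecutive_failures"] >= FAILURE_THRESHOLD:
            h["healthy"] = False  # degraded once the threshold is hit

    def _record_success(self, url):
        h = self._health(url)
        h["healthy"] = True
        h["consecutive_failures"] = 0
        h["last_seen"] = time.time()  # reported as last_seen by get_peer_status
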
# ---------------------------------------------------------------------------
# Peer discovery (_discover_peers_from)
# ---------------------------------------------------------------------------


def test_discover_peers_adds_new_peer(monkeypatch):
    """New peer URL returned by /internal/peers is added to the peer list."""
    worker = PeerSyncWorker(BASE_CONFIG)

    def mock_get(url, auth=None, timeout=10, params=None):
        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = ["http://node-c:2222"]
        return resp

    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
    worker._discover_peers_from(BASE_CONFIG["peers"][0])
    urls = [p["url"] for p in worker.peers]
    assert "http://node-c:2222" in urls


def test_discover_peers_skips_known(monkeypatch):
    """Already-known peer URLs are not re-added."""
    worker = PeerSyncWorker(BASE_CONFIG)

    def mock_get(url, auth=None, timeout=10, params=None):
        resp = MagicMock()
        resp.status_code = 200
        resp.json.return_value = ["http://ddo-2:2222"]  # already known
        return resp

    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
    worker._discover_peers_from(BASE_CONFIG["peers"][0])
    assert len(worker.peers) == 1  # unchanged


def test_discover_peers_tolerates_failure(monkeypatch):
    """Network error during discovery does not propagate."""
    worker = PeerSyncWorker(BASE_CONFIG)

    def mock_get(*args, **kwargs):
        raise ConnectionError("peer down")

    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
    # Should not raise
    worker._discover_peers_from(BASE_CONFIG["peers"][0])


def test_start_skips_when_disabled(caplog):
    worker = PeerSyncWorker({"enabled": False})
    worker.start()
@@ -261,3 +394,53 @@ def test_sync_empty_peer_list(patch_connect, monkeypatch):
    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)

    worker._sync_from_peer(_make_peer())


# ---------------------------------------------------------------------------
# get_peer_status
# ---------------------------------------------------------------------------


def test_get_peer_status_no_contact_yet():
    worker = PeerSyncWorker(BASE_CONFIG)
    status = worker.get_peer_status()

    assert status["enabled"] is True
    assert status["total"] == 1
    assert status["healthy"] == 1
    assert status["degraded"] == 0
    assert status["peers"][0]["url"] == "http://ddo-2:2222"
    assert status["peers"][0]["healthy"] is True
    assert status["peers"][0]["last_seen"] is None


def test_get_peer_status_after_success():
    worker = PeerSyncWorker(BASE_CONFIG)
    worker._record_success("http://ddo-2:2222")
    status = worker.get_peer_status()

    assert status["healthy"] == 1
    assert status["degraded"] == 0
    assert status["peers"][0]["last_seen"] is not None


def test_get_peer_status_after_degraded():
    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD

    worker = PeerSyncWorker(BASE_CONFIG)
    for _ in range(FAILURE_THRESHOLD):
        worker._record_failure("http://ddo-2:2222", Exception("timeout"))

    status = worker.get_peer_status()
    assert status["healthy"] == 0
    assert status["degraded"] == 1
    assert status["peers"][0]["healthy"] is False


def test_get_peer_status_disabled():
    worker = PeerSyncWorker({})
    status = worker.get_peer_status()

    assert status["enabled"] is False
    assert status["total"] == 0
    assert status["peers"] == []

@@ -55,7 +55,9 @@ DA_CLIENT_PATH = "directdnsonly.app.reconciler.DirectAdminClient"

def _patch_da(return_value):
    """Patch DirectAdminClient so list_domains returns a fixed value."""
    return patch(DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value})
    return patch(
        DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value}
    )


# ---------------------------------------------------------------------------
@@ -233,7 +235,12 @@ def test_heal_skips_domains_without_zone_data(delete_queue, patch_connect):
    registry, _ = _make_backend_registry(zone_exists_return=False)

    patch_connect.add(
        Domain(domain="nodata.com", hostname="da1.example.com", username="admin", zone_data=None)
        Domain(
            domain="nodata.com",
            hostname="da1.example.com",
            username="admin",
            zone_data=None,
        )
    )
    patch_connect.commit()

@@ -310,3 +317,83 @@ def test_heal_skipped_when_no_registry(delete_queue, patch_connect):
    w._reconcile_all()

    assert save_queue.empty()


# ---------------------------------------------------------------------------
# get_status — last-run state
# ---------------------------------------------------------------------------


def test_get_status_before_any_run(worker):
    status = worker.get_status()
    assert status["enabled"] is True
    assert status["alive"] is False
    assert status["last_run"] == {}


def test_get_status_after_run(worker, patch_connect):
    with _patch_da(set()):
        worker._reconcile_all()

    s = worker.get_status()
    assert s["enabled"] is True
    lr = s["last_run"]
    assert lr["status"] == "ok"
    assert "started_at" in lr
    assert "completed_at" in lr
    assert "duration_seconds" in lr
    assert lr["da_servers_polled"] == 1
    assert lr["da_servers_unreachable"] == 0
    assert lr["dry_run"] is False


def test_get_status_counts_unreachable_server(worker, patch_connect):
    with _patch_da(None):
        worker._reconcile_all()

    lr = worker.get_status()["last_run"]
    assert lr["da_servers_polled"] == 1
    assert lr["da_servers_unreachable"] == 1


def test_get_status_counts_orphans(worker, delete_queue, patch_connect):
    patch_connect.add(
        Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()

    with _patch_da(set()):
        worker._reconcile_all()

    lr = worker.get_status()["last_run"]
    assert lr["orphans_found"] == 1
    assert lr["orphans_queued"] == 1


def test_get_status_dry_run_orphans_not_queued_in_stats(dry_run_worker, patch_connect):
    patch_connect.add(
        Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()

    with _patch_da(set()):
        dry_run_worker._reconcile_all()

    lr = dry_run_worker.get_status()["last_run"]
    assert lr["dry_run"] is True
    assert lr["orphans_found"] == 1
    assert lr["orphans_queued"] == 0


def test_get_status_zones_in_db_counted(worker, patch_connect):
    for d in ["a.com", "b.com", "c.com"]:
        patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
    patch_connect.commit()

    with _patch_da({"a.com", "b.com", "c.com"}):
        worker._reconcile_all()

    lr = worker.get_status()["last_run"]
    assert lr["zones_in_db"] == 3
    assert lr["zones_in_da"] == 3
    assert lr["orphans_found"] == 0

162
tests/test_status_api.py
Normal file
162
tests/test_status_api.py
Normal file
@@ -0,0 +1,162 @@
"""Tests for directdnsonly.app.api.status — StatusAPI."""
|
||||
|
||||
import json
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import cherrypy
|
||||
import pytest
|
||||
|
||||
from directdnsonly.app.api.status import StatusAPI
|
||||
from directdnsonly.app.db.models import Domain
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_RECONCILER_OK = {
|
||||
"enabled": True,
|
||||
"alive": True,
|
||||
"dry_run": False,
|
||||
"interval_minutes": 60,
|
||||
"last_run": {},
|
||||
}
|
||||
_PEER_SYNC_OFF = {
|
||||
"enabled": False,
|
||||
"alive": False,
|
||||
"peers": [],
|
||||
"total": 0,
|
||||
"healthy": 0,
|
||||
"degraded": 0,
|
||||
}
|
||||
|
||||
|
||||
def _qs(**overrides):
|
||||
base = {
|
||||
"save_queue_size": 0,
|
||||
"delete_queue_size": 0,
|
||||
"retry_queue_size": 0,
|
||||
"dead_letters": 0,
|
||||
"save_worker_alive": True,
|
||||
"delete_worker_alive": True,
|
||||
"retry_worker_alive": True,
|
||||
"reconciler": _RECONCILER_OK,
|
||||
"peer_sync": _PEER_SYNC_OFF,
|
||||
}
|
||||
base.update(overrides)
|
||||
return base
|
||||
|
||||
|
||||
def _api(qs=None):
|
||||
wm = MagicMock()
|
||||
wm.queue_status.return_value = qs or _qs()
|
||||
return StatusAPI(wm)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _compute_overall
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_overall_ok_all_healthy():
|
||||
assert StatusAPI._compute_overall(_qs()) == "ok"
|
||||
|
||||
|
||||
def test_overall_error_save_worker_dead():
|
||||
assert StatusAPI._compute_overall(_qs(save_worker_alive=False)) == "error"
|
||||
|
||||
|
||||
def test_overall_error_delete_worker_dead():
|
||||
assert StatusAPI._compute_overall(_qs(delete_worker_alive=False)) == "error"
|
||||
|
||||
|
||||
def test_overall_degraded_retries_pending():
|
||||
assert StatusAPI._compute_overall(_qs(retry_queue_size=3)) == "degraded"
|
||||
|
||||
|
||||
def test_overall_degraded_dead_letters():
|
||||
assert StatusAPI._compute_overall(_qs(dead_letters=1)) == "degraded"
|
||||
|
||||
|
||||
def test_overall_degraded_peer_unhealthy():
|
||||
ps = {**_PEER_SYNC_OFF, "degraded": 1}
|
||||
assert StatusAPI._compute_overall(_qs(peer_sync=ps)) == "degraded"
|
||||
|
||||
|
||||
def test_overall_error_takes_priority_over_degraded():
|
||||
"""error > degraded when both conditions are true."""
|
||||
assert (
|
||||
StatusAPI._compute_overall(
|
||||
_qs(save_worker_alive=False, retry_queue_size=5)
|
||||
)
|
||||
== "error"
|
||||
)
|
||||
|
||||
|
||||
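
The _compute_overall tests above encode a clear precedence: a dead save or delete worker yields "error", which outranks "degraded"; pending retries, dead letters, or a degraded peer yield "degraded"; otherwise the result is "ok". A minimal sketch of that precedence based only on what these tests assert; it is illustrative, not the actual StatusAPI implementation:

def compute_overall(qs):
    """Illustrative reimplementation of the precedence the tests pin down."""
    # A dead worker is an error and takes priority over any degraded signal.
    if not qs["save_worker_alive"] or not qs["delete_worker_alive"]:
        return "error"
    # Backlogged retries, dead letters, or unhealthy peers only degrade the status.
    if qs["retry_queue_size"] or qs["dead_letters"] or qs["peer_sync"].get("degraded"):
        return "degraded"
    return "ok"
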
# ---------------------------------------------------------------------------
# _build — structure and zone count
# ---------------------------------------------------------------------------


def test_build_structure(patch_connect):
    api = _api()
    result = api._build()

    assert "status" in result
    assert "queues" in result
    assert "workers" in result
    assert "reconciler" in result
    assert "peer_sync" in result
    assert "zones" in result


def test_build_zone_count_zero(patch_connect):
    api = _api()
    result = api._build()
    assert result["zones"]["total"] == 0


def test_build_zone_count_with_domains(patch_connect):
    for d in ["a.com", "b.com", "c.com"]:
        patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
    patch_connect.commit()

    api = _api()
    result = api._build()
    assert result["zones"]["total"] == 3


def test_build_queues_forwarded(patch_connect):
    api = _api(_qs(save_queue_size=2, delete_queue_size=1, retry_queue_size=3, dead_letters=1))
    result = api._build()

    assert result["queues"]["save"] == 2
    assert result["queues"]["delete"] == 1
    assert result["queues"]["retry"] == 3
    assert result["queues"]["dead_letters"] == 1


def test_build_workers_forwarded(patch_connect):
    api = _api()
    result = api._build()

    assert result["workers"]["save"] is True
    assert result["workers"]["delete"] is True
    assert result["workers"]["retry_drain"] is True


# ---------------------------------------------------------------------------
# index — JSON encoding
# ---------------------------------------------------------------------------


def test_index_returns_valid_json(patch_connect):
    api = _api()
    with MagicMock() as mock_resp:
        cherrypy.response = mock_resp
        cherrypy.response.headers = {}
        body = api.index()

    data = json.loads(body)
    assert data["status"] == "ok"
    assert isinstance(data["zones"]["total"], int)