Compare commits: fbb6220728...master (10 commits)

Commits: db60d808de, 0f417da204, 3f6a061ffe, 0b31b75789, 83fbb03cad, 5e9a6f19bd, 4a4b4f2b98, 6e96e78376, e8939bcd82, d98f08a408
.gitignore (vendored, 6 additions)

@@ -26,3 +26,9 @@ build
 *.mypy_cache
 *.pytest_cache
 /data/*
+
+# Editor / tool settings — always local, never committed
+.vscode/
+.claude/
+.env
+*.env
README.md (63 changes)

@@ -115,8 +115,8 @@ Both MySQL backends are written **concurrently** within the same zone update. A

 | Scenario | What happens |
 |---|---|
-| One MySQL backend unreachable | Other backend(s) succeed immediately. Failed backend queued for retry with exponential backoff (30 s → 2 m → 5 m → 15 m → 30 m, up to 5 attempts). |
-| MySQL backend down for hours | Retry queue exhausts. On recovery, the reconciliation healing pass detects the backend is missing zones and re-pushes all of them using stored `zone_data` — no DA intervention required. |
+| One MySQL backend unreachable | Other backend(s) succeed immediately. Failed backend queued for retry with exponential backoff (30 s → 2 m → 5 m → 15 m → 30 m, up to 5 attempts). CoreDNS continues serving from its local JSON cache throughout. |
+| MySQL backend down for hours | Retry queue exhausts. CoreDNS serves from cache the entire time — zero query downtime. On recovery, the reconciliation healing pass detects the backend is missing zones and re-pushes all of them using stored `zone_data` — no DA intervention required. |
 | directdnsonly container restarts | Persistent queue survives. In-flight zone updates replay on startup. |
 | directdnsonly container down during DA push | DA cannot deliver. Persistent queue on disk is intact; when the container comes back, it resumes processing any previously queued items. New pushes during downtime are lost at the DA level (DA does not retry). |
 | Zone deleted from DA | Reconciliation poller detects orphan and queues delete across all backends. |
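The retry schedule in the table above can be sketched as a simple lookup. The constant and helper names here are illustrative, not directdnsonly's actual identifiers:

```python
from typing import Optional

# Schedule from the table above: 30 s -> 2 m -> 5 m -> 15 m -> 30 m, 5 attempts.
BACKOFF_SCHEDULE = [30, 120, 300, 900, 1800]  # delay in seconds per attempt


def next_retry_delay(attempt: int) -> Optional[int]:
    """Delay before 0-based retry `attempt`, or None once attempts are
    exhausted (at which point the reconciliation healing pass takes over
    on backend recovery)."""
    if attempt >= len(BACKOFF_SCHEDULE):
        return None
    return BACKOFF_SCHEDULE[attempt]
```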
@@ -250,7 +250,7 @@ Register each container as a separate Extra DNS server entry in DA → DNS Admin
 | **Orphan detection** | Yes — reconciler | Yes — reconciler | Yes — reconciler (per instance) |
 | **External DB required** | No | Yes (MySQL per CoreDNS node) | No (NSD) or Yes (CoreDNS MySQL) |
 | **Horizontal scaling** | Add DA Extra DNS entries + containers | Add backend stanzas in config | Add DA Extra DNS entries + containers + peer list |
-| **Best for** | Simple HA, no external DB | Multi-DC, stronger consistency | Most robust HA — survives extended outages without DA re-push |
+| **Best for** | Simple HA, no external DB | Best overall — resilient writes (retry queue) + resilient reads (CoreDNS cache fallback), no daemon reloads, scales to thousands of zones | Most robust HA — resilient at every layer, survives extended outages without DA re-push |

 ---
@@ -298,10 +298,12 @@ The container image ships with **both NSD and BIND9** installed. The entrypoint

 **Summary recommendation:**

-- **Up to ~300 zones, no external DB:** Use the NSD backend (bundled) — lighter, faster, authoritative-only, same zone file format as BIND.
-- **300–1 000+ zones:** CoreDNS MySQL wins — zone data in MySQL means no daemon reload at all.
-- **Need zero-interruption zone swaps:** Knot DNS.
-- **Need an HTTP API for zone management (no file I/O):** PowerDNS Authoritative with its native HTTP API and file/SQLite backend.
+- **Any scale, external DB available:** CoreDNS MySQL ([cybercinch fork](https://github.com/cybercinch/coredns_mysql_extend)) wins at every zone count. Connection pooling, JSON cache fallback, health monitoring, and zero-downtime operation during DB maintenance make it the most resilient choice regardless of size. No daemon reload ever needed — a zone write is a MySQL INSERT.
+- **No external DB, simplicity first:** NSD (bundled) — lightweight, fast, authoritative-only, same RFC 1035 zone file format as BIND.
+- **Need zero-interruption zone swaps:** Knot DNS (RCU — serves old zone to in-flight queries while atomically swapping in the new one).
+- **Need an HTTP API for zone management:** PowerDNS Authoritative with its native HTTP API.

+> **Note:** Knot DNS and PowerDNS backends are **not implemented** in directdnsonly — they are listed here as architectural context only. Implemented backends: `nsd`, `bind`, `coredns_mysql`. Pull requests for additional backends are welcome.
+
 ---
@@ -314,18 +316,23 @@ NS records in the additional section, does not set the AA flag, and does not
 handle wildcard records.

 This project is designed to work with a patched fork that resolves all of those
-issues:
+issues and adds production-grade resilience:

 **[cybercinch/coredns_mysql_extend](https://github.com/cybercinch/coredns_mysql_extend)**

-Key differences from the upstream plugin:
+| Feature | Detail |
+|---|---|
+| **Fully authoritative** | Correct AA flag, NXDOMAIN on misses, NS records in the additional section |
+| **Wildcard records** | `*` entries served correctly |
+| **Connection pooling** | Configurable MySQL connection management — efficient under load |
+| **Degraded operation** | Automatic fallback to a local JSON cache when MySQL is unavailable — DNS keeps serving |
+| **Smart caching** | Intelligent per-record cache management reduces per-query MySQL round-trips |
+| **Health monitoring** | Continuous database health checks with configurable intervals |
+| **Zero downtime** | DNS continues serving during database maintenance windows |

-- Fully authoritative responses — correct AA flag and NXDOMAIN on misses
-- Wildcard record support (`*` entries served correctly)
-- NS records returned in the additional section
+**Why this matters for Topology B:** directdnsonly's retry queue handles the write side during a MySQL outage — the CoreDNS fork handles the read side. Between them, neither writes nor queries are dropped during transient database failures.

-Use the BIND backend if you want a zero-dependency setup with no custom CoreDNS
-build required.
+Use the NSD or BIND backend if you want a zero-dependency setup with no custom CoreDNS build required.

 ---
@@ -502,12 +509,20 @@ The built-in env var mapping targets the backend named `coredns_mysql`. For mult

 #### Peer sync

-| Config key | Environment variable | Default | Description |
-|---|---|---|---|
-| `peer_sync.enabled` | `DADNS_PEER_SYNC_ENABLED` | `false` | Enable background peer-to-peer zone sync |
-| `peer_sync.interval_minutes` | `DADNS_PEER_SYNC_INTERVAL_MINUTES` | `15` | How often each peer is polled |
+| Config key / Environment variable | Default | Description |
+|---|---|---|
+| `peer_sync.enabled` / `DADNS_PEER_SYNC_ENABLED` | `false` | Enable background peer-to-peer zone sync |
+| `peer_sync.interval_minutes` / `DADNS_PEER_SYNC_INTERVAL_MINUTES` | `15` | How often each peer is polled |

-> The `peer_sync.peers` list (peer URLs, credentials) requires a config file — it cannot be expressed as simple env vars.
+For a **single peer** (the typical two-node Topology C setup) the peer can be configured entirely via env vars — no config file required:
+
+| Environment variable | Default | Description |
+|---|---|---|
+| `DADNS_PEER_SYNC_PEER_URL` | _(unset)_ | URL of the single peer (e.g. `http://ddo-2:2222`). When set, this peer is automatically appended to the peers list. |
+| `DADNS_PEER_SYNC_PEER_USERNAME` | `directdnsonly` | Basic auth username for the peer |
+| `DADNS_PEER_SYNC_PEER_PASSWORD` | _(empty)_ | Basic auth password for the peer |
+
+> For **multiple peers**, use a config file with the `peer_sync.peers` list. A peer defined via env var is deduped — if the same URL already appears in the config file it will not be added twice.

 ---
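The dedup rule described above (an env-var peer is appended only if its URL is not already configured) can be sketched as follows. The helper name is illustrative; the real config loader may differ:

```python
import os


def build_peer_list(config_peers: list) -> list:
    """Merge DADNS_PEER_SYNC_PEER_URL into the configured peer list,
    skipping it when the same URL already appears (sketch of the
    documented dedup behaviour, not the actual implementation)."""
    peers = list(config_peers)
    env_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL")
    if env_url and env_url not in peers:
        peers.append(env_url)
    return peers
```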
@@ -540,8 +555,11 @@ services:
       DADNS_APP_AUTH_PASSWORD: my-strong-secret
       DADNS_DNS_DEFAULT_BACKEND: nsd
       DADNS_DNS_BACKENDS_NSD_ENABLED: "true"
+      DADNS_PEER_SYNC_ENABLED: "true"
+      DADNS_PEER_SYNC_PEER_URL: http://directdnsonly-mlb:2222
+      DADNS_PEER_SYNC_PEER_USERNAME: directdnsonly
+      DADNS_PEER_SYNC_PEER_PASSWORD: my-strong-secret
     volumes:
-      - ./config/syd:/app/config  # contains peer_sync.peers list
       - syd-data:/app/data

   directdnsonly-mlb:
@@ -553,8 +571,11 @@ services:
       DADNS_APP_AUTH_PASSWORD: my-strong-secret
       DADNS_DNS_DEFAULT_BACKEND: nsd
       DADNS_DNS_BACKENDS_NSD_ENABLED: "true"
+      DADNS_PEER_SYNC_ENABLED: "true"
+      DADNS_PEER_SYNC_PEER_URL: http://directdnsonly-syd:2222
+      DADNS_PEER_SYNC_PEER_USERNAME: directdnsonly
+      DADNS_PEER_SYNC_PEER_PASSWORD: my-strong-secret
     volumes:
-      - ./config/mlb:/app/config  # contains peer_sync.peers list
       - mlb-data:/app/data

 volumes:
directdnsonly/__main__.py (new file, 17 lines)

@@ -0,0 +1,17 @@
+import os
+import sys
+
+
+def run():
+    # main.py uses short-form imports (from app.*, from worker) that resolve
+    # relative to the directdnsonly/ package directory. Insert it into the
+    # path before importing so `python -m directdnsonly` and the `dadns`
+    # console script both work without changing main.py.
+    sys.path.insert(0, os.path.dirname(__file__))
+    from main import main
+
+    main()
+
+
+if __name__ == "__main__":
+    run()
@@ -7,14 +7,19 @@ from directdnsonly.app.db.models import Domain


 class InternalAPI:
-    """Peer-to-peer zone_data exchange endpoint.
+    """Peer-to-peer zone_data exchange endpoints.

     Used by PeerSyncWorker to replicate zone_data between directdnsonly
     instances so each node can independently heal its local backends.

-    All routes require the same basic auth as the main API.
+    All routes require peer_sync basic auth credentials, which are
+    configured separately from the main DirectAdmin-facing credentials
+    (peer_sync.auth_username / peer_sync.auth_password).
     """

+    def __init__(self, peer_syncer=None):
+        self._peer_syncer = peer_syncer
+
     @cherrypy.expose
     def zones(self, domain=None):
         """Return zone metadata or zone_data for a specific domain.
@@ -77,3 +82,15 @@ class InternalAPI:
             return json.dumps({"error": "internal server error"}).encode()
         finally:
             session.close()
+
+    @cherrypy.expose
+    def peers(self):
+        """Return the list of peer URLs this node knows about.
+
+        GET /internal/peers
+        Returns a JSON array of URL strings. Used by other nodes during
+        sync to discover new cluster members (gossip-lite mesh expansion).
+        """
+        cherrypy.response.headers["Content-Type"] = "application/json"
+        urls = self._peer_syncer.get_peer_urls() if self._peer_syncer else []
+        return json.dumps(urls).encode()
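A consumer of the `/internal/peers` endpoint added above would fetch the URL with basic auth and merge any unknown URLs into its own peer list. The parsing half can be sketched without HTTP (helper name is illustrative):

```python
import json


def parse_peer_response(body: bytes) -> list:
    """Parse the body of GET /internal/peers: a JSON array of peer URL
    strings. Raises ValueError if the payload is not an array, so a
    caller can skip a misbehaving peer rather than corrupt its list."""
    urls = json.loads(body.decode())
    if not isinstance(urls, list):
        raise ValueError("expected a JSON array of peer URLs")
    return urls
```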
directdnsonly/app/api/status.py (new file, 82 lines)

@@ -0,0 +1,82 @@
+"""Operational status endpoint — aggregates queue, worker, reconciler, and peer health."""
+
+import json
+
+import cherrypy
+from sqlalchemy import func, select
+
+from directdnsonly.app.db import connect
+from directdnsonly.app.db.models import Domain
+
+
+class StatusAPI:
+    """Exposes GET /status as a JSON health/status document.
+
+    Aggregates data from WorkerManager.queue_status() and a live DB zone count
+    into a single response that a UI or monitoring system can poll.
+
+    Overall ``status`` field:
+    - ``ok`` — all workers alive, no dead-letters, all peers healthy
+    - ``degraded`` — retries pending, dead-letters present, or a peer is unhealthy
+    - ``error`` — a core worker thread is not alive
+    """
+
+    def __init__(self, worker_manager):
+        self._wm = worker_manager
+
+    @cherrypy.expose
+    def index(self):
+        cherrypy.response.headers["Content-Type"] = "application/json"
+        return json.dumps(self._build(), default=str).encode()
+
+    # ------------------------------------------------------------------
+    # Internal
+    # ------------------------------------------------------------------
+
+    def _build(self) -> dict:
+        qs = self._wm.queue_status()
+
+        zone_count = self._zone_count()
+
+        overall = self._compute_overall(qs)
+
+        return {
+            "status": overall,
+            "queues": {
+                "save": qs.get("save_queue_size", 0),
+                "delete": qs.get("delete_queue_size", 0),
+                "retry": qs.get("retry_queue_size", 0),
+                "dead_letters": qs.get("dead_letters", 0),
+            },
+            "workers": {
+                "save": qs.get("save_worker_alive"),
+                "delete": qs.get("delete_worker_alive"),
+                "retry_drain": qs.get("retry_worker_alive"),
+            },
+            "reconciler": qs.get("reconciler", {}),
+            "peer_sync": qs.get("peer_sync", {}),
+            "zones": {"total": zone_count},
+        }
+
+    @staticmethod
+    def _zone_count() -> int:
+        session = connect()
+        try:
+            return session.execute(select(func.count(Domain.id))).scalar() or 0
+        except Exception:
+            return 0
+        finally:
+            session.close()
+
+    @staticmethod
+    def _compute_overall(qs: dict) -> str:
+        if not qs.get("save_worker_alive") or not qs.get("delete_worker_alive"):
+            return "error"
+        peer_sync = qs.get("peer_sync", {})
+        if (
+            qs.get("retry_queue_size", 0) > 0
+            or qs.get("dead_letters", 0) > 0
+            or peer_sync.get("degraded", 0) > 0
+        ):
+            return "degraded"
+        return "ok"
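The status precedence in `_compute_overall` above (dead worker beats everything, then any pending retries, dead-letters, or a degraded peer) can be exercised standalone:

```python
def compute_overall(qs: dict) -> str:
    """Standalone copy of the precedence in StatusAPI._compute_overall:
    dead workers -> "error"; pending retries, dead-letters, or a degraded
    peer -> "degraded"; otherwise "ok"."""
    if not qs.get("save_worker_alive") or not qs.get("delete_worker_alive"):
        return "error"
    peer_sync = qs.get("peer_sync", {})
    if (
        qs.get("retry_queue_size", 0) > 0
        or qs.get("dead_letters", 0) > 0
        or peer_sync.get("degraded", 0) > 0
    ):
        return "degraded"
    return "ok"
```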
@@ -14,6 +14,7 @@ class Zone(Base):
     __tablename__ = "zones"
     id = Column(Integer, primary_key=True)
     zone_name = Column(String(255), nullable=False, index=True, unique=True)
+    managed_by = Column(String(255), nullable=True)  # 'directadmin' | 'direct' | NULL (legacy)


 class Record(Base):
@@ -90,10 +91,34 @@ class CoreDNSMySQLBackend(DNSBackend):
             zone_name, zone_data
         )

-        # Track changes
-        current_records = set()
+        # Pre-compute the set of (hostname, type, data) keys that should
+        # remain after this update, so we can identify stale records upfront.
+        incoming_keys = {
+            (name, rtype, data) for name, rtype, data, _ in source_records
+        }
+
         changes = {"added": 0, "updated": 0, "removed": 0}

+        # --- 1. Remove stale records first ---
+        # Deleting before inserting means a brief NXDOMAIN window, which is
+        # preferable to briefly serving both old and new records simultaneously.
+        for key, record in existing_records.items():
+            if key not in incoming_keys:
+                logger.debug(
+                    f"Removed record: {record.hostname} {record.type} {record.data}"
+                )
+                session.delete(record)
+                changes["removed"] += 1
+
+        # Handle SOA removal if needed
+        if existing_soa and not source_soa:
+            logger.debug(
+                f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
+            )
+            session.delete(existing_soa)
+            changes["removed"] += 1
+
+        # --- 2. Add / update incoming records ---
         # Handle SOA record
         if source_soa:
             soa_name, soa_content, soa_ttl = source_soa
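The stale-record detection added in this hunk is plain set arithmetic over `(hostname, type, data)` keys. A minimal sketch (helper name is illustrative, not from the codebase):

```python
def diff_records(existing: set, incoming: set) -> tuple:
    """Split record keys into (stale, new): keys only in `existing` should
    be deleted, keys only in `incoming` should be inserted, and the
    intersection is left alone (TTL changes are handled separately)."""
    stale = existing - incoming
    new = incoming - existing
    return stale, new
```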
@@ -123,7 +148,6 @@ class CoreDNSMySQLBackend(DNSBackend):
         # Process all non-SOA records
         for record_name, record_type, record_content, record_ttl in source_records:
             key = (record_name, record_type, record_content)
-            current_records.add(key)

             if key in existing_records:
                 # Update existing record if TTL changed
@@ -151,23 +175,6 @@ class CoreDNSMySQLBackend(DNSBackend):
                     f"Added new record: {record_name} {record_type} {record_content}"
                 )

-        # Remove records that no longer exist in the source zone
-        for key, record in existing_records.items():
-            if key not in current_records:
-                logger.debug(
-                    f"Removed record: {record.hostname} {record.type} {record.data}"
-                )
-                session.delete(record)
-                changes["removed"] += 1
-
-        # Handle SOA removal if needed
-        if existing_soa and not source_soa:
-            logger.debug(
-                f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
-            )
-            session.delete(existing_soa)
-            changes["removed"] += 1
-
         session.commit()
         total_changes = changes["added"] + changes["updated"] + changes["removed"]
         if total_changes > 0:
@@ -240,43 +247,27 @@ class CoreDNSMySQLBackend(DNSBackend):
             session.close()

     def _ensure_zone_exists(self, session, zone_name: str) -> Zone:
-        """Ensure a zone exists in the database, creating it if necessary"""
+        """Ensure a zone exists in the database, creating it if necessary."""
         zone = session.execute(
             select(Zone).filter_by(zone_name=self.dot_fqdn(zone_name))
         ).scalar_one_or_none()
         if not zone:
             logger.debug(f"Creating new zone: {self.dot_fqdn(zone_name)}")
-            zone = Zone(zone_name=self.dot_fqdn(zone_name))
+            zone = Zone(
+                zone_name=self.dot_fqdn(zone_name),
+                managed_by="directadmin",
+            )
             session.add(zone)
-            session.flush()  # Get the zone ID
+            session.flush()
+        elif not zone.managed_by:
+            # Migrate pre-existing rows that were created before this field was added
+            zone.managed_by = "directadmin"
         return zone

-    def _normalize_cname_data(self, zone_name: str, record_content: str) -> str:
-        """Normalize CNAME record data to ensure consistent FQDN format.
-
-        This ensures CNAME targets are always stored as fully-qualified domain
-        names so that record comparison between the BIND zone source and the
-        database is deterministic.
-
-        Args:
-            zone_name: The zone name for relative-name expansion
-            record_content: The raw CNAME target from the parsed zone
-
-        Returns:
-            The normalized CNAME target string
-        """
-        if record_content.startswith("@"):
-            logger.debug(f"CNAME target starts with '@', replacing with zone FQDN")
-            record_content = self.dot_fqdn(zone_name)
-        elif not record_content.endswith("."):
-            logger.debug(f"CNAME target {record_content} is relative, appending zone")
-            record_content = ".".join([record_content, self.dot_fqdn(zone_name)])
-        return record_content
-
     def _parse_zone_to_record_set(
         self, zone_name: str, zone_data: str
     ) -> Tuple[Set[Tuple[str, str, str, int]], Optional[Tuple[str, str, int]]]:
-        """Parse a BIND zone file into a set of normalised record keys.
+        """Parse a BIND zone file into a set of record keys.

         Returns:
             Tuple of:
@@ -287,21 +278,27 @@ class CoreDNSMySQLBackend(DNSBackend):
         records: Set[Tuple[str, str, str, int]] = set()
         soa = None

+        # Use the zone origin (if available) to expand relative names in RDATA
+        # back to absolute FQDNs. Without this, dnspython's default relativize=True
+        # behaviour turns in-zone targets like `wvvcc.co.nz.` into `@` in the
+        # stored data, which CoreDNS then serves incorrectly.
+        origin = dns_zone.origin
+
         for name, ttl, rdata in dns_zone.iterate_rdatas():
             if rdata.rdclass != IN:
                 continue

             record_name = str(name)
             record_type = rdata.rdtype.name
-            record_content = rdata.to_text()
+            if origin is not None:
+                record_content = rdata.to_text(origin=origin, relativize=False)
+            else:
+                record_content = rdata.to_text()

             if record_type == "SOA":
                 soa = (record_name, record_content, ttl)
                 continue

-            if record_type == "CNAME":
-                record_content = self._normalize_cname_data(zone_name, record_content)
-
             records.add((record_name, record_type, record_content, ttl))

         return records, soa
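What the `relativize=False` change in this hunk achieves can be illustrated in plain Python, independent of dnspython (which handles many more cases, e.g. escapes and the root name). A minimal sketch, assuming a dotted absolute origin:

```python
def absolutize(name: str, origin: str) -> str:
    """Expand a relative RDATA name against the zone origin: `@` is the
    origin itself, a name ending in '.' is already absolute, and anything
    else gets the origin appended. Illustrative only."""
    if name == "@":
        return origin
    if name.endswith("."):
        return name
    return f"{name}.{origin}"
```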
@@ -70,7 +70,13 @@ class DirectAdminClient:
         if response is None:
             return None

-        if response.is_redirect or response.status_code in (301, 302, 303, 307, 308):
+        if response.is_redirect or response.status_code in (
+            301,
+            302,
+            303,
+            307,
+            308,
+        ):
             if self._cookies:
                 logger.error(
                     f"[da:{self.hostname}] Still redirecting after session login — "
@@ -155,6 +161,136 @@ class DirectAdminClient:
             logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}")
             return None

+    def post(
+        self, command: str, data: Optional[dict] = None
+    ) -> Optional[requests.Response]:
+        """Authenticated POST to any DA CMD_* endpoint."""
+        url = f"{self.scheme}://{self.hostname}:{self.port}/{command}"
+        kwargs: dict = dict(
+            data=data or {},
+            timeout=30,
+            verify=self.verify_ssl,
+            allow_redirects=False,
+        )
+        if self._cookies:
+            kwargs["cookies"] = self._cookies
+        else:
+            kwargs["auth"] = (self.username, self.password)
+
+        try:
+            return requests.post(url, **kwargs)
+        except Exception as exc:
+            logger.error(f"[da:{self.hostname}] POST {command} failed: {exc}")
+            return None
+
+    def get_extra_dns_servers(self) -> dict:
+        """Return the Extra DNS server map from CMD_MULTI_SERVER (GET).
+
+        Returns a dict keyed by server hostname/IP, each value being the
+        per-server settings dict (dns, domain_check, port, user, ssl, …).
+        Returns ``{}`` on any error.
+        """
+        resp = self.get("CMD_MULTI_SERVER", params={"json": "yes"})
+        if resp is None or resp.status_code != 200:
+            logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER GET failed")
+            return {}
+        try:
+            return resp.json().get("servers", {})
+        except Exception as exc:
+            logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER parse error: {exc}")
+            return {}
+
+    def add_extra_dns_server(
+        self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
+    ) -> bool:
+        """Register a new Extra DNS server via CMD_MULTI_SERVER action=add.
+
+        Returns ``True`` if DA reports success, ``False`` otherwise.
+        """
+        resp = self.post(
+            "CMD_MULTI_SERVER",
+            data={
+                "action": "add",
+                "json": "yes",
+                "ip": ip,
+                "port": str(port),
+                "user": user,
+                "passwd": passwd,
+                "ssl": "yes" if ssl else "no",
+            },
+        )
+        if resp is None or resp.status_code != 200:
+            logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add failed for {ip}")
+            return False
+        try:
+            result = resp.json()
+            if result.get("success"):
+                logger.info(f"[da:{self.hostname}] Added Extra DNS server {ip}")
+                return True
+            logger.error(
+                f"[da:{self.hostname}] CMD_MULTI_SERVER add error: {result.get('result', result)}"
+            )
+            return False
+        except Exception as exc:
+            logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add parse error: {exc}")
+            return False
+
+    def ensure_extra_dns_server(
+        self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
+    ) -> bool:
+        """Add (if absent) and configure a directdnsonly Extra DNS server.
+
+        Ensures the server is registered with ``dns=yes`` and
+        ``domain_check=yes`` so DirectAdmin pushes zone updates to it.
+        Returns ``True`` if fully configured, ``False`` on any failure.
+        """
+        servers = self.get_extra_dns_servers()
+        if ip not in servers:
+            if not self.add_extra_dns_server(ip, port, user, passwd, ssl):
+                return False
+
+        ssl_str = "yes" if ssl else "no"
+        resp = self.post(
+            "CMD_MULTI_SERVER",
+            data={
+                "action": "multiple",
+                "save": "yes",
+                "json": "yes",
+                "passwd": "",
+                "select0": ip,
+                f"port-{ip}": str(port),
+                f"user-{ip}": user,
+                f"ssl-{ip}": ssl_str,
+                f"dns-{ip}": "yes",
+                f"domain_check-{ip}": "yes",
+                f"user_check-{ip}": "no",
+                f"email-{ip}": "no",
+                f"show_all_users-{ip}": "no",
+            },
+        )
+        if resp is None or resp.status_code != 200:
+            logger.error(
+                f"[da:{self.hostname}] CMD_MULTI_SERVER save failed for {ip}"
+            )
+            return False
+        try:
+            result = resp.json()
+            if result.get("success"):
+                logger.info(
+                    f"[da:{self.hostname}] Extra DNS server {ip} configured "
+                    f"(dns=yes domain_check=yes)"
+                )
+                return True
+            logger.error(
+                f"[da:{self.hostname}] CMD_MULTI_SERVER save error: {result.get('result', result)}"
+            )
+            return False
+        except Exception as exc:
+            logger.error(
+                f"[da:{self.hostname}] CMD_MULTI_SERVER save parse error: {exc}"
+            )
+            return False
+
     # ------------------------------------------------------------------
     # Internal
     # ------------------------------------------------------------------
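The `action=multiple` save in `ensure_extra_dns_server` above encodes each server's settings as per-IP form keys (`port-<ip>`, `dns-<ip>`, …). That key shape can be built and inspected standalone (helper name is illustrative):

```python
def multi_server_save_fields(ip: str, port: int, user: str, ssl: bool) -> dict:
    """Build the per-IP form fields for a CMD_MULTI_SERVER action=multiple
    save, as used in the diff above. Sketch of the key shape only; the
    real request carries additional fields."""
    return {
        "action": "multiple",
        "save": "yes",
        "json": "yes",
        "select0": ip,
        f"port-{ip}": str(port),
        f"user-{ip}": user,
        f"ssl-{ip}": "yes" if ssl else "no",
        f"dns-{ip}": "yes",
        f"domain_check-{ip}": "yes",
    }
```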
@@ -25,8 +25,8 @@ class Domain(Base):
     domain = Column(String(255), unique=True)
     hostname = Column(String(255))
     username = Column(String(255))
     zone_data = Column(Text, nullable=True)  # last known zone file from DA
     zone_updated_at = Column(DateTime, nullable=True)  # when zone_data was last stored

     def __repr__(self):
         return "<Domain(id='%s', domain='%s', hostname='%s', username='%s')>" % (
@@ -2,23 +2,37 @@
 """Peer sync worker — exchanges zone_data between directdnsonly instances.

 Each node stores zone_data in its local SQLite DB after every successful
-backend write. When DirectAdmin pushes a zone to one node but the other
+backend write. When DirectAdmin pushes a zone to one node but another
 is temporarily offline, the offline node misses that zone_data.

 PeerSyncWorker corrects this by periodically comparing zone lists with
-configured peers and fetching any zone_data that is newer or absent locally.
+all known peers and fetching any zone_data that is newer or absent locally.
 It only updates the local DB — it never writes directly to backends. The
 existing reconciler healing pass then detects missing zones and re-pushes
 using the freshly synced zone_data.

+Mesh behaviour:
+- Each node exposes /internal/peers listing the URLs it knows about
+- During each sync pass, every peer is asked for its peer list; any URLs
+  not already known are added automatically (gossip-lite discovery)
+- A three-node cluster therefore only needs a linear chain of initial
+  connections — nodes propagate awareness of each other on the first pass
+
+Health tracking:
+- Consecutive failures per peer are counted; after FAILURE_THRESHOLD
+  misses the peer is marked degraded and a warning is logged once
+- On the next successful contact the peer is marked recovered
+
 Safety properties:
-- If a peer is unreachable, skip it silently and retry next interval
+- If a peer is unreachable, skip it and try next interval
 - Only zone_data is synced — backend writes remain the sole responsibility
   of the local save queue worker
 - Newer zone_updated_at timestamp wins; local data is never overwritten
   with older peer data
+- Peer discovery is best-effort and never fails a sync pass
 """
 import datetime
+import os
 import threading
 from loguru import logger
 import requests
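The "newer zone_updated_at wins" rule in the docstring above can be sketched as a small standalone predicate (the helper name is illustrative — the real worker compares timestamps on SQLAlchemy `Domain` rows):

```python
import datetime


def should_accept_peer_zone(local_updated_at, peer_updated_at):
    """Accept peer zone_data only if nothing is stored locally, or the
    peer's copy is strictly newer — local data is never overwritten
    with older peer data."""
    if local_updated_at is None:
        return True  # nothing local yet — take whatever the peer has
    if peer_updated_at is None:
        return False  # peer has no timestamp — keep ours
    return peer_updated_at > local_updated_at


older = datetime.datetime(2024, 1, 1)
newer = datetime.datetime(2024, 6, 1)
```

Because the comparison is strict, two nodes holding identical timestamps never ping-pong updates between each other.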
@@ -27,6 +41,9 @@ from sqlalchemy import select
 from directdnsonly.app.db import connect
 from directdnsonly.app.db.models import Domain

+# Consecutive failures before a peer is logged as degraded
+FAILURE_THRESHOLD = 3
+

 class PeerSyncWorker:
     """Periodically fetches zone_data from peer directdnsonly instances and
@@ -36,10 +53,58 @@ class PeerSyncWorker:
     def __init__(self, peer_sync_config: dict):
         self.enabled = peer_sync_config.get("enabled", False)
         self.interval_seconds = peer_sync_config.get("interval_minutes", 15) * 60
-        self.peers = peer_sync_config.get("peers") or []
+        self.peers = list(peer_sync_config.get("peers") or [])
+
+        # Per-peer health state: url -> {consecutive_failures, healthy, last_seen}
+        self._peer_health: dict = {}
+
+        # ----------------------------------------------------------------
+        # Env-var peer injection
+        # ----------------------------------------------------------------
+        # Original single-peer vars (backward compat):
+        #   DADNS_PEER_SYNC_PEER_URL / _USERNAME / _PASSWORD
+        # Numbered multi-peer vars (new):
+        #   DADNS_PEER_SYNC_PEER_1_URL / _USERNAME / _PASSWORD
+        #   DADNS_PEER_SYNC_PEER_2_URL / ... (up to 9)
+        known_urls = {p.get("url") for p in self.peers}
+
+        env_candidates = []
+
+        single_url = os.environ.get("DADNS_PEER_SYNC_PEER_URL", "").strip()
+        if single_url:
+            env_candidates.append({
+                "url": single_url,
+                "username": os.environ.get("DADNS_PEER_SYNC_PEER_USERNAME", "peersync"),
+                "password": os.environ.get("DADNS_PEER_SYNC_PEER_PASSWORD", ""),
+            })
+
+        for i in range(1, 10):
+            numbered_url = os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_URL", "").strip()
+            if not numbered_url:
+                break
+            env_candidates.append({
+                "url": numbered_url,
+                "username": os.environ.get(
+                    f"DADNS_PEER_SYNC_PEER_{i}_USERNAME", "peersync"
+                ),
+                "password": os.environ.get(f"DADNS_PEER_SYNC_PEER_{i}_PASSWORD", ""),
+            })
+
+        for candidate in env_candidates:
+            if candidate["url"] not in known_urls:
+                self.peers.append(candidate)
+                known_urls.add(candidate["url"])
+                logger.debug(
+                    f"[peer_sync] Added peer from env vars: {candidate['url']}"
+                )
+
         self._stop_event = threading.Event()
         self._thread = None

+    # ------------------------------------------------------------------
+    # Lifecycle
+    # ------------------------------------------------------------------
+
     def start(self):
         if not self.enabled:
             logger.info("Peer sync disabled — skipping")
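The numbered env-var scan added to `__init__` above can be exercised in isolation — a sketch that takes the environment as a plain dict instead of `os.environ` (the `peers_from_env` name is illustrative, not part of the codebase):

```python
def peers_from_env(environ):
    """Collect numbered peer definitions DADNS_PEER_SYNC_PEER_<n>_URL
    (n = 1..9), stopping at the first gap in the numbering — mirrors
    the scan in PeerSyncWorker.__init__."""
    peers = []
    for i in range(1, 10):
        url = environ.get(f"DADNS_PEER_SYNC_PEER_{i}_URL", "").strip()
        if not url:
            break  # numbering must be contiguous; a gap ends the scan
        peers.append({
            "url": url,
            "username": environ.get(f"DADNS_PEER_SYNC_PEER_{i}_USERNAME", "peersync"),
            "password": environ.get(f"DADNS_PEER_SYNC_PEER_{i}_PASSWORD", ""),
        })
    return peers


env = {
    "DADNS_PEER_SYNC_PEER_1_URL": "https://node-b:8080",
    "DADNS_PEER_SYNC_PEER_2_URL": "https://node-c:8080",
    "DADNS_PEER_SYNC_PEER_4_URL": "https://never-reached:8080",  # gap at 3
}
peers = peers_from_env(env)
```

Note the early `break`: peer 4 above is never reached because peer 3 is missing, so numbering must be contiguous.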
@@ -70,6 +135,70 @@ class PeerSyncWorker:
     def is_alive(self):
         return self._thread is not None and self._thread.is_alive()

+    def get_peer_urls(self) -> list:
+        """Return the current list of known peer URLs.
+
+        Exposed via /internal/peers so other nodes can discover this node's mesh."""
+        return [p["url"] for p in self.peers if p.get("url")]
+
+    def get_peer_status(self) -> dict:
+        """Return peer health summary for the /status endpoint."""
+        peers = []
+        for peer in self.peers:
+            url = peer.get("url", "")
+            h = self._peer_health.get(url, {})
+            last_seen = h.get("last_seen")
+            peers.append({
+                "url": url,
+                "healthy": h.get("healthy", True),
+                "consecutive_failures": h.get("consecutive_failures", 0),
+                "last_seen": last_seen.isoformat() if last_seen else None,
+            })
+        healthy = sum(1 for p in peers if p["healthy"])
+        return {
+            "enabled": self.enabled,
+            "alive": self.is_alive(),
+            "interval_minutes": self.interval_seconds // 60,
+            "peers": peers,
+            "total": len(peers),
+            "healthy": healthy,
+            "degraded": len(peers) - healthy,
+        }
+
+    # ------------------------------------------------------------------
+    # Health tracking
+    # ------------------------------------------------------------------
+
+    def _health(self, url: str) -> dict:
+        return self._peer_health.setdefault(
+            url, {"consecutive_failures": 0, "healthy": True, "last_seen": None}
+        )
+
+    def _record_success(self, url: str):
+        h = self._health(url)
+        recovered = not h["healthy"]
+        h.update(
+            consecutive_failures=0,
+            healthy=True,
+            last_seen=datetime.datetime.utcnow(),
+        )
+        if recovered:
+            logger.info(f"[peer_sync] {url}: peer recovered")
+
+    def _record_failure(self, url: str, exc):
+        h = self._health(url)
+        h["consecutive_failures"] += 1
+        if h["healthy"] and h["consecutive_failures"] >= FAILURE_THRESHOLD:
+            h["healthy"] = False
+            logger.warning(
+                f"[peer_sync] {url}: marked degraded after {FAILURE_THRESHOLD} "
+                f"consecutive failures — {exc}"
+            )
+        else:
+            logger.debug(
+                f"[peer_sync] {url}: unreachable "
+                f"(failure #{h['consecutive_failures']}) — {exc}"
+            )
+
     # ------------------------------------------------------------------
     # Internal
     # ------------------------------------------------------------------
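The degraded/recovered hysteresis above — warn exactly once at the threshold, flip back on the first success — can be modelled standalone; this sketch returns events instead of logging (class and method names are illustrative):

```python
FAILURE_THRESHOLD = 3  # same constant as in the module above


class PeerHealth:
    """Minimal model of _record_failure/_record_success: transition to
    degraded exactly once when the threshold is hit, and back to healthy
    on the first subsequent success."""

    def __init__(self):
        self.consecutive_failures = 0
        self.healthy = True

    def failure(self):
        self.consecutive_failures += 1
        if self.healthy and self.consecutive_failures >= FAILURE_THRESHOLD:
            self.healthy = False
            return "degraded"  # the one-time warning
        return None

    def success(self):
        recovered = not self.healthy
        self.consecutive_failures = 0
        self.healthy = True
        return "recovered" if recovered else None


h = PeerHealth()
events = [h.failure() for _ in range(4)] + [h.success()]
# events: None, None, "degraded", None, "recovered"
```

Guarding the threshold check with `self.healthy` is what keeps failures 4, 5, … from re-emitting the warning on every pass.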
@@ -82,15 +211,49 @@ class PeerSyncWorker:

     def _sync_all(self):
         logger.debug(f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)")
-        for peer in self.peers:
+        # Iterate over a snapshot — _discover_peers_from may grow self.peers
+        for peer in list(self.peers):
             url = peer.get("url")
             if not url:
                 logger.warning("[peer_sync] Peer config missing url — skipping")
                 continue
             try:
                 self._sync_from_peer(peer)
+                self._discover_peers_from(peer)
+                self._record_success(url)
             except Exception as exc:
-                logger.warning(f"[peer_sync] Skipping unreachable peer {url}: {exc}")
+                self._record_failure(url, exc)
+
+    def _discover_peers_from(self, peer: dict):
+        """Fetch peer's known peer list and add any new nodes for mesh expansion.
+
+        This is best-effort — failures are silently swallowed so they never
+        interrupt the main sync pass."""
+        url = peer.get("url", "").rstrip("/")
+        username = peer.get("username")
+        password = peer.get("password")
+        auth = (username, password) if username else None
+        try:
+            resp = requests.get(f"{url}/internal/peers", auth=auth, timeout=5)
+            if resp.status_code != 200:
+                return
+            remote_urls = resp.json()  # list of URL strings
+            known_urls = {p.get("url") for p in self.peers}
+            for remote_url in remote_urls:
+                if remote_url and remote_url not in known_urls:
+                    # Inherit credentials from the introducing peer — in practice
+                    # all cluster nodes share the same peer_sync auth credentials.
+                    self.peers.append({
+                        "url": remote_url,
+                        "username": username,
+                        "password": password,
+                    })
+                    known_urls.add(remote_url)
+                    logger.info(
+                        f"[peer_sync] Discovered new peer {remote_url} via {url}"
+                    )
+        except Exception:
+            pass  # discovery is best-effort
+
     def _sync_from_peer(self, peer: dict):
         url = peer.get("url", "").rstrip("/")
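The gossip-lite claim in the mesh notes — a linear chain of initial connections converges to a full mesh — can be illustrated with an in-memory simulation (no HTTP; the dict of sets is a stand-in for each node's `/internal/peers` response):

```python
def sync_pass(mesh):
    """One discovery pass: every node asks each peer it already knows for
    that peer's list and adopts any new URLs — an in-memory stand-in for
    the /internal/peers exchange."""
    for node, known in mesh.items():
        for peer in list(known):  # snapshot, as in _sync_all
            for url in set(mesh.get(peer, set())):
                if url != node:  # never add ourselves
                    known.add(url)


# Linear chain: a-b and b-c are the only initial connections
mesh = {"a": {"b"}, "b": {"a", "c"}, "c": {"b"}}
sync_pass(mesh)
# After a single pass every node knows every other node
```

Node `a` learns about `c` through `b`, and `c` learns about `a` the same way, so the chain closes into a full mesh on the first pass, matching the docstring's three-node example.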
@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+import datetime
 import threading
 from loguru import logger
 from sqlalchemy import select
@@ -42,6 +43,17 @@ class ReconciliationWorker:
         self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60
         self._stop_event = threading.Event()
         self._thread = None
+        self._last_run: dict = {}
+
+    def get_status(self) -> dict:
+        """Return reconciler configuration and last-run statistics."""
+        return {
+            "enabled": self.enabled,
+            "alive": self.is_alive(),
+            "dry_run": self.dry_run,
+            "interval_minutes": self.interval_seconds // 60,
+            "last_run": dict(self._last_run),
+        }

     def start(self):
         if not self.enabled:
@@ -104,11 +116,18 @@ class ReconciliationWorker:
             self._reconcile_all()

     def _reconcile_all(self):
+        started_at = datetime.datetime.utcnow()
+        self._last_run = {"status": "running", "started_at": started_at.isoformat()}
         logger.info(
             f"[reconciler] Starting reconciliation pass across "
             f"{len(self.servers)} server(s)"
         )
         total_queued = 0
+        da_servers_polled = 0
+        da_servers_unreachable = 0
+        migrated = 0
+        backfilled = 0
+        zones_in_db = 0

         # Build a map of all domains seen on all DA servers: domain -> hostname
         all_da_domains: dict = {}
@@ -126,23 +145,26 @@ class ReconciliationWorker:
                 ssl=server.get("ssl", True),
                 verify_ssl=self.verify_ssl,
             )
+            da_servers_polled += 1
             da_domains = client.list_domains(ipp=self.ipp)
             if da_domains is not None:
                 for d in da_domains:
                     all_da_domains[d] = hostname
+            else:
+                da_servers_unreachable += 1
             logger.debug(
                 f"[reconciler] {hostname}: "
                 f"{len(da_domains) if da_domains else 0} active domain(s) in DA"
             )
         except Exception as exc:
             logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}")
+            da_servers_unreachable += 1

         # Compare local DB against what DA reported; update masters and queue deletes
         session = connect()
         try:
             all_local_domains = session.execute(select(Domain)).scalars().all()
-            migrated = 0
-            backfilled = 0
+            zones_in_db = len(all_local_domains)
+
             known_servers = {s.get("hostname") for s in self.servers}
             for record in all_local_domains:
                 domain = record.domain
@@ -209,10 +231,31 @@ class ReconciliationWorker:
             )

         # Option C: heal backends that are missing zones
+        zones_healed = 0
         if self.save_queue is not None and self.backend_registry is not None:
-            self._heal_backends()
+            zones_healed = self._heal_backends()

-    def _heal_backends(self):
+        completed_at = datetime.datetime.utcnow()
+        self._last_run = {
+            "status": "ok",
+            "started_at": started_at.isoformat(),
+            "completed_at": completed_at.isoformat(),
+            "duration_seconds": round(
+                (completed_at - started_at).total_seconds(), 1
+            ),
+            "da_servers_polled": da_servers_polled,
+            "da_servers_unreachable": da_servers_unreachable,
+            "zones_in_da": len(all_da_domains),
+            "zones_in_db": zones_in_db,
+            "orphans_found": total_queued,
+            "orphans_queued": total_queued if not self.dry_run else 0,
+            "hostnames_backfilled": backfilled,
+            "hostnames_migrated": migrated,
+            "zones_healed": zones_healed,
+            "dry_run": self.dry_run,
+        }
+
+    def _heal_backends(self) -> int:
         """Check every backend for zone presence and re-queue any zone that is
         missing from one or more backends, using the stored zone_data as the
         authoritative source. This corrects backends that missed pushes due to
@@ -220,9 +263,10 @@ class ReconciliationWorker:
         """
         backends = self.backend_registry.get_available_backends()
         if not backends:
-            return
+            return 0

         session = connect()
+        healed = 0
         try:
             domains = session.execute(
                 select(Domain).where(Domain.zone_data.isnot(None))
@@ -231,9 +275,7 @@ class ReconciliationWorker:
                 logger.debug(
                     "[reconciler] Healing pass: no zone_data stored yet — skipping"
                 )
-                return
+                return 0

-            healed = 0
             for record in domains:
                 missing = []
                 for backend_name, backend in backends.items():
@@ -277,3 +319,4 @@ class ReconciliationWorker:
             )
         finally:
             session.close()
+        return healed
@@ -69,6 +69,8 @@ def load_config() -> Vyper:
     # Peer sync defaults
     v.set_default("peer_sync.enabled", False)
     v.set_default("peer_sync.interval_minutes", 15)
+    v.set_default("peer_sync.auth_username", "peersync")
+    v.set_default("peer_sync.auth_password", "changeme")

     # Read configuration
     try:
@@ -3,6 +3,20 @@ timezone: Pacific/Auckland
 log_level: INFO
 queue_location: ./data/queues

+# Application datastore — stores domain index and zone_data for healing/peer-sync.
+# SQLite (default) requires no extra dependencies and is fine for single-node setups.
+# MySQL is recommended for multi-node deployments with a shared datastore.
+datastore:
+  type: sqlite
+  db_location: ./data/directdnsonly.db
+  # --- MySQL ---
+  # type: mysql
+  # host: "127.0.0.1"
+  # port: "3306"
+  # name: "directdnsonly"
+  # user: "directdnsonly"
+  # pass: "changeme"
+
 app:
   auth_username: directdnsonly
   auth_password: changeme  # Override via DADNS_APP_AUTH_PASSWORD env var
@@ -4,6 +4,7 @@ from app.backends import BackendRegistry
 from app.api.admin import DNSAdminAPI
 from app.api.health import HealthAPI
 from app.api.internal import InternalAPI
+from app.api.status import StatusAPI
 from app import configure_logging
 from worker import WorkerManager
 from directdnsonly.config import config
@@ -90,6 +91,17 @@ def main():
     if config.get_string("app.log_level").upper() != "DEBUG":
         cherrypy.log.access_log.propagate = False

+    # Peer sync auth — separate credentials from the DA-facing API so a
+    # compromised peer node cannot push zones or access the admin endpoints.
+    peer_user_password_dict = {
+        config.get_string("peer_sync.auth_username"): config.get_string(
+            "peer_sync.auth_password"
+        )
+    }
+    peer_check_password = cherrypy.lib.auth_basic.checkpassword_dict(
+        peer_user_password_dict
+    )
+
     # Mount applications
     root = Root()
     root = DNSAdminAPI(
@@ -98,12 +110,18 @@ def main():
         backend_registry=registry,
     )
     root.health = HealthAPI(registry)
-    root.internal = InternalAPI()
+    root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer)
+    root.status = StatusAPI(worker_manager)

-    # Add queue status endpoint
+    # Add queue status endpoint (debug)
     root.queue_status = lambda: worker_manager.queue_status()

-    cherrypy.tree.mount(root, "/")
+    # Override auth for /internal so peers use their own credentials
+    cherrypy.tree.mount(root, "/", config={
+        "/internal": {
+            "tools.auth_basic.checkpassword": peer_check_password,
+        }
+    })
     cherrypy.engine.start()
     logger.success(f"Server started on port {config.get_int('app.listen_port')}")
@@ -43,6 +43,7 @@ class WorkerManager:
         self._peer_syncer = None
         self._reconciliation_config = reconciliation_config or {}
         self._peer_sync_config = peer_sync_config or {}
+        self._dead_letter_count = 0

         try:
             os.makedirs(queue_path, exist_ok=True)
@@ -62,93 +63,90 @@ class WorkerManager:
         logger.info("Save queue worker started")
         session = connect()

-        batch_start = None
-        batch_processed = 0
-        batch_failed = 0
-
         while self._running:
+            # Block until at least one item is available
             try:
                 item = self.save_queue.get(block=True, timeout=5)
-
-                if batch_start is None:
-                    batch_start = time.monotonic()
-                    batch_processed = 0
-                    batch_failed = 0
-                    pending = self.save_queue.qsize()
-                    logger.info(
-                        f"📥 Batch started — {pending + 1} zone(s) queued for processing"
-                    )
-
-                domain = item.get("domain", "unknown")
-                is_retry = item.get("source") in ("retry", "reconciler_heal")
-                target_backends = item.get("failed_backends")  # None = all backends
-
-                logger.debug(
-                    f"Processing zone update for {domain}"
-                    + (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
-                    + (f" [backends: {target_backends}]" if target_backends else "")
-                )
-
-                if not is_retry and not check_zone_exists(domain):
-                    put_zone_index(domain, item.get("hostname"), item.get("username"))
-
-                if not all(k in item for k in ["domain", "zone_file"]):
-                    logger.error(f"Invalid queue item: {item}")
-                    self.save_queue.task_done()
-                    batch_failed += 1
-                    continue
-
-                backends = self.backend_registry.get_available_backends()
-                if target_backends:
-                    backends = {
-                        k: v for k, v in backends.items() if k in target_backends
-                    }
-                if not backends:
-                    logger.warning("No target backends available for this item!")
-                    self.save_queue.task_done()
-                    batch_failed += 1
-                    continue
-
-                if len(backends) > 1:
-                    failed = self._process_backends_parallel(backends, item, session)
-                else:
-                    failed = set()
-                    for backend_name, backend in backends.items():
-                        if not self._process_single_backend(
-                            backend_name, backend, item, session
-                        ):
-                            failed.add(backend_name)
-
-                if failed:
-                    self._schedule_retry(item, failed)
-                    batch_failed += 1
-                else:
-                    # Successful write — persist zone_data for Option C healing
-                    self._store_zone_data(session, domain, item["zone_file"])
-                    batch_processed += 1
-
-                self.save_queue.task_done()
-                logger.debug(f"Completed processing for {domain}")
-
             except Empty:
-                if batch_start is not None:
-                    elapsed = time.monotonic() - batch_start
-                    total = batch_processed + batch_failed
-                    rate = batch_processed / elapsed if elapsed > 0 else 0
-                    logger.success(
-                        f"📦 Batch complete — {batch_processed}/{total} zone(s) "
-                        f"processed successfully in {elapsed:.1f}s "
-                        f"({rate:.1f} zones/sec)"
-                        + (f", {batch_failed} failed" if batch_failed else "")
-                    )
-                    batch_start = None
-                    batch_processed = 0
-                    batch_failed = 0
                 continue
-            except Exception as e:
-                logger.error(f"Unexpected worker error: {e}")
-                batch_failed += 1
-                time.sleep(1)
+
+            # Open a batch and keep processing until the queue is empty
+            batch_start = time.monotonic()
+            batch_processed = 0
+            batch_failed = 0
+            logger.info("📥 Batch started")
+
+            while True:
+                try:
+                    domain = item.get("domain", "unknown")
+                    is_retry = item.get("source") in ("retry", "reconciler_heal")
+                    target_backends = item.get("failed_backends")  # None = all backends
+
+                    logger.debug(
+                        f"Processing zone update for {domain}"
+                        + (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
+                        + (f" [backends: {target_backends}]" if target_backends else "")
+                    )
+
+                    if not is_retry and not check_zone_exists(domain):
+                        put_zone_index(domain, item.get("hostname"), item.get("username"))
+
+                    if not all(k in item for k in ["domain", "zone_file"]):
+                        logger.error(f"Invalid queue item: {item}")
+                        self.save_queue.task_done()
+                        batch_failed += 1
+                    else:
+                        backends = self.backend_registry.get_available_backends()
+                        if target_backends:
+                            backends = {
+                                k: v for k, v in backends.items() if k in target_backends
+                            }
+                        if not backends:
+                            logger.warning("No target backends available for this item!")
+                            self.save_queue.task_done()
+                            batch_failed += 1
+                        else:
+                            if len(backends) > 1:
+                                failed = self._process_backends_parallel(backends, item, session)
+                            else:
+                                failed = set()
+                                for backend_name, backend in backends.items():
+                                    if not self._process_single_backend(
+                                        backend_name, backend, item, session
+                                    ):
+                                        failed.add(backend_name)
+
+                            if failed:
+                                self._schedule_retry(item, failed)
+                                batch_failed += 1
+                            else:
+                                self._store_zone_data(session, domain, item["zone_file"])
+                                batch_processed += 1
+
+                            self.save_queue.task_done()
+                            logger.debug(f"Completed processing for {domain}")
+
+                except Exception as e:
+                    logger.error(f"Unexpected worker error processing {item.get('domain', '?')}: {e}")
+                    batch_failed += 1
+                    time.sleep(1)
+
+                # Check immediately for the next item — keep batch open while
+                # more work is queued; close it only when the queue is empty.
+                try:
+                    item = self.save_queue.get_nowait()
+                except Empty:
+                    break
+
+            elapsed = time.monotonic() - batch_start
+            total = batch_processed + batch_failed
+            rate = batch_processed / elapsed if elapsed > 0 else 0
+            logger.success(
+                f"📦 Batch complete — {batch_processed}/{total} zone(s) "
+                f"processed successfully in {elapsed:.1f}s "
+                f"({rate:.1f} zones/sec)"
+                + (f", {batch_failed} failed" if batch_failed else "")
+            )

     def _process_single_backend(self, backend_name, backend, item, session) -> bool:
         """Write a zone to one backend. Returns True on success, False on failure."""
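The reworked loop's shape — open a batch on a blocking `get`, then drain with `get_nowait` until `Empty` — is easier to see in a minimal sketch detached from backends and retries (the function name is illustrative):

```python
import queue


def drain_one_batch(q, handle, batches):
    """Block for the first item to open a batch, then drain with
    get_nowait until the queue is empty — the same open/drain/close
    shape as the save worker loop."""
    try:
        item = q.get(block=True, timeout=0.1)
    except queue.Empty:
        return  # nothing queued; no batch opened
    batch = []
    while True:
        batch.append(handle(item))
        q.task_done()
        try:
            item = q.get_nowait()
        except queue.Empty:
            break  # queue drained — close the batch
    batches.append(batch)


q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)
batches = []
drain_one_batch(q, lambda n: n * 10, batches)
# batches == [[10, 20, 30]]
```

Compared with the old version, the batch boundary no longer depends on a `timeout=5` wait expiring, so the "Batch complete" summary fires as soon as the last queued item is handled.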
@@ -208,6 +206,7 @@ class WorkerManager:
         Discards to dead-letter after MAX_RETRIES attempts."""
         retry_count = item.get("retry_count", 0) + 1
         if retry_count > MAX_RETRIES:
+            self._dead_letter_count += 1
             logger.error(
                 f"[retry] Dead-letter: {item['domain']} failed on "
                 f"{failed_backends} after {MAX_RETRIES} attempts — giving up"
@@ -499,18 +498,24 @@ class WorkerManager:
|
|||||||
logger.info("Workers stopped")
|
logger.info("Workers stopped")
|
||||||
|
|
||||||
def queue_status(self):
|
def queue_status(self):
|
||||||
|
reconciler = (
|
||||||
|
self._reconciler.get_status()
|
||||||
|
if self._reconciler
|
||||||
|
else {"enabled": False, "alive": False, "last_run": {}}
|
||||||
|
)
|
||||||
|
peer_sync = (
|
||||||
|
self._peer_syncer.get_peer_status()
|
||||||
|
if self._peer_syncer
|
||||||
|
else {"enabled": False, "alive": False, "peers": [], "total": 0, "healthy": 0, "degraded": 0}
|
||||||
|
)
|
||||||
return {
|
return {
|
||||||
"save_queue_size": self.save_queue.qsize(),
|
"save_queue_size": self.save_queue.qsize(),
|
||||||
"delete_queue_size": self.delete_queue.qsize(),
|
"delete_queue_size": self.delete_queue.qsize(),
|
||||||
"retry_queue_size": self.retry_queue.qsize(),
|
"retry_queue_size": self.retry_queue.qsize(),
|
||||||
"save_worker_alive": self._save_thread and self._save_thread.is_alive(),
|
"dead_letters": self._dead_letter_count,
|
||||||
"delete_worker_alive": self._delete_thread
|
"save_worker_alive": bool(self._save_thread and self._save_thread.is_alive()),
|
||||||
and self._delete_thread.is_alive(),
|
"delete_worker_alive": bool(self._delete_thread and self._delete_thread.is_alive()),
|
||||||
"retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(),
|
"retry_worker_alive": bool(self._retry_thread and self._retry_thread.is_alive()),
|
||||||
"reconciler_alive": (
|
"reconciler": reconciler,
|
||||||
self._reconciler.is_alive if self._reconciler else False
|
"peer_sync": peer_sync,
|
||||||
),
|
|
||||||
"peer_syncer_alive": (
|
|
||||||
self._peer_syncer.is_alive if self._peer_syncer else False
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
|
|||||||
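The `queue_status()` change above wraps each thread-liveness check in `bool(...)`. A minimal standalone sketch (not project code) of why that matters when the thread attribute is still `None`:

```python
import json

# When a worker thread was never started, the attribute is None, and
# `None and thread.is_alive()` short-circuits to None rather than False.
save_thread = None

raw = save_thread and save_thread.is_alive()            # short-circuits to None
coerced = bool(save_thread and save_thread.is_alive())  # always a real bool

# None serializes as JSON null, which surprises consumers expecting a
# boolean "alive" flag in the status payload.
print(json.dumps({"save_worker_alive": raw}))      # {"save_worker_alive": null}
print(json.dumps({"save_worker_alive": coerced}))  # {"save_worker_alive": false}
```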
@@ -20,6 +20,9 @@ dependencies = [
     "requests (>=2.32.0,<3.0.0)",
 ]
 
+[project.scripts]
+dadns = "directdnsonly.__main__:run"
+
 [tool.poetry]
 package-mode = true
@@ -1,12 +1,33 @@
-CREATE TABLE IF NOT EXISTS `records` (
-  `id` int(11) NOT NULL AUTO_INCREMENT,
-  `zone` varchar(255) NOT NULL,
-  `name` varchar(255) NOT NULL,
-  `ttl` int(11) DEFAULT NULL,
-  `type` varchar(10) NOT NULL,
-  `data` text NOT NULL,
+-- DirectDNSOnly — CoreDNS MySQL schema
+-- Compatible with cybercinch/coredns_mysql_extend
+--
+-- managed_by values:
+--   'directadmin'  zone is managed via directdnsonly / DirectAdmin push
+--   'direct'       zone was created directly (not via DA)
+--   NULL           legacy row created before this column was added
+
+CREATE TABLE IF NOT EXISTS `zones` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `zone_name` varchar(255) NOT NULL,
+  `managed_by` varchar(255) DEFAULT NULL,
   PRIMARY KEY (`id`),
-  KEY `idx_zone` (`zone`),
-  KEY `idx_name` (`name`),
-  KEY `idx_type` (`type`)
+  UNIQUE KEY `uq_zone_name` (`zone_name`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+CREATE TABLE IF NOT EXISTS `records` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `zone_id` int(11) NOT NULL,
+  `hostname` varchar(255) NOT NULL,
+  `type` varchar(10) NOT NULL,
+  `data` text NOT NULL,
+  `ttl` int(11) DEFAULT NULL,
+  `online` tinyint(1) NOT NULL DEFAULT 0,
+  PRIMARY KEY (`id`),
+  KEY `idx_zone_id` (`zone_id`),
+  KEY `idx_hostname` (`hostname`),
+  CONSTRAINT `fk_records_zone` FOREIGN KEY (`zone_id`) REFERENCES `zones` (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+-- Migration: add managed_by to an existing installation
+-- ALTER TABLE `zones` ADD COLUMN `managed_by` varchar(255) DEFAULT NULL;
+-- UPDATE `zones` SET `managed_by` = 'directadmin' WHERE `managed_by` IS NULL;
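The `managed_by` column in the schema above separates DA-pushed zones from directly created ones, and the migration comments claim legacy NULL rows for DirectAdmin. A toy in-memory sketch (assumed behavior, mirroring the `test_write_zone_migrates_null_managed_by` test further down, not the real SQLAlchemy code):

```python
# Toy model of the managed_by rule: a zone row whose managed_by is
# NULL (None) is claimed on the next write; an explicit owner is kept.
zones = {"example.com.": {"managed_by": None}}  # legacy pre-migration row

def write_zone(zone_name, owner="directadmin"):
    row = zones.setdefault(zone_name, {"managed_by": None})
    if row["managed_by"] is None:  # legacy or brand-new row
        row["managed_by"] = owner  # claim it
    return row

# Legacy row gets claimed on the next DirectAdmin push.
assert write_zone("example.com.")["managed_by"] == "directadmin"
# A zone created directly keeps its own marker.
assert write_zone("manual.example.", owner="direct")["managed_by"] == "direct"
```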
@@ -38,4 +38,5 @@ def patch_connect(db_session, monkeypatch):
     monkeypatch.setattr("directdnsonly.app.utils.connect", _factory)
     monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory)
     monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory)
+    monkeypatch.setattr("directdnsonly.app.api.status.connect", _factory)
     return db_session
@@ -165,3 +165,37 @@ def test_reconcile_no_changes_when_zone_matches(mysql_backend):
     success, removed = mysql_backend.reconcile_zone_records("example.com", ZONE_DATA)
     assert success
     assert removed == 0
+
+
+# ---------------------------------------------------------------------------
+# managed_by field
+# ---------------------------------------------------------------------------
+
+
+def test_write_zone_sets_managed_by_directadmin(mysql_backend):
+    mysql_backend.write_zone("example.com", ZONE_DATA)
+    session = mysql_backend.Session()
+    zone = session.execute(
+        select(Zone).filter_by(zone_name="example.com.")
+    ).scalar_one_or_none()
+    assert zone.managed_by == "directadmin"
+    session.close()
+
+
+def test_write_zone_migrates_null_managed_by(mysql_backend):
+    """Zones that pre-exist without managed_by get it set on next write."""
+    session = mysql_backend.Session()
+    zone = Zone(zone_name="example.com.", managed_by=None)
+    session.add(zone)
+    session.commit()
+    session.close()
+
+    mysql_backend.write_zone("example.com", ZONE_DATA)
+
+    session = mysql_backend.Session()
+    zone = session.execute(
+        select(Zone).filter_by(zone_name="example.com.")
+    ).scalar_one_or_none()
+    assert zone.managed_by == "directadmin"
+    session.close()
@@ -24,7 +24,9 @@ def _make_json_response(domains_list, total_pages=1):
 
 
 def _client():
-    return DirectAdminClient("da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True)
+    return DirectAdminClient(
+        "da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True
+    )
 
 
 # ---------------------------------------------------------------------------
@@ -105,7 +107,9 @@ def test_html_response_returns_none():
 
 
 def test_connection_error_returns_none():
-    with patch("requests.get", side_effect=requests.exceptions.ConnectionError("refused")):
+    with patch(
+        "requests.get", side_effect=requests.exceptions.ConnectionError("refused")
+    ):
         result = _client().list_domains()
 
     assert result is None
@@ -119,7 +123,9 @@ def test_timeout_returns_none():
 
 
 def test_ssl_error_returns_none():
-    with patch("requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")):
+    with patch(
+        "requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")
+    ):
         result = _client().list_domains()
 
     assert result is None
@@ -131,12 +137,16 @@ def test_ssl_error_returns_none():
 
 
 def test_parse_standard_querystring():
-    result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com&list[]=test.com")
+    result = DirectAdminClient._parse_legacy_domain_list(
+        "list[]=example.com&list[]=test.com"
+    )
     assert result == {"example.com", "test.com"}
 
 
 def test_parse_newline_separated():
-    result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com\nlist[]=test.com")
+    result = DirectAdminClient._parse_legacy_domain_list(
+        "list[]=example.com\nlist[]=test.com"
+    )
     assert result == {"example.com", "test.com"}
 
 
@@ -190,3 +200,171 @@ def test_login_returns_false_on_exception():
     result = client._login()
 
     assert result is False
+
+
+# ---------------------------------------------------------------------------
+# get_extra_dns_servers
+# ---------------------------------------------------------------------------
+
+
+def _multi_server_get_resp(servers=None):
+    mock = MagicMock()
+    mock.status_code = 200
+    mock.is_redirect = False
+    mock.headers = {"Content-Type": "application/json"}
+    mock.json.return_value = {"CLUSTER_ON": "yes", "servers": servers or {}}
+    mock.raise_for_status = MagicMock()
+    return mock
+
+
+def test_get_extra_dns_servers_returns_servers_dict():
+    servers = {
+        "1.2.3.4": {"dns": "yes", "domain_check": "yes", "port": "2222", "ssl": "no"}
+    }
+    with patch("requests.get", return_value=_multi_server_get_resp(servers)):
+        result = _client().get_extra_dns_servers()
+
+    assert "1.2.3.4" in result
+    assert result["1.2.3.4"]["dns"] == "yes"
+
+
+def test_get_extra_dns_servers_returns_empty_on_http_error():
+    mock_resp = MagicMock()
+    mock_resp.status_code = 500
+    with patch("requests.get", return_value=mock_resp):
+        result = _client().get_extra_dns_servers()
+
+    assert result == {}
+
+
+def test_get_extra_dns_servers_returns_empty_on_connection_error():
+    with patch(
+        "requests.get", side_effect=requests.exceptions.ConnectionError("refused")
+    ):
+        result = _client().get_extra_dns_servers()
+
+    assert result == {}
+
+
+# ---------------------------------------------------------------------------
+# add_extra_dns_server
+# ---------------------------------------------------------------------------
+
+
+def test_add_extra_dns_server_returns_true_on_success():
+    mock_resp = MagicMock()
+    mock_resp.status_code = 200
+    mock_resp.json.return_value = {"result": "", "success": "Connection Added"}
+
+    with patch("requests.post", return_value=mock_resp):
+        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
+
+    assert result is True
+
+
+def test_add_extra_dns_server_returns_false_on_da_error():
+    mock_resp = MagicMock()
+    mock_resp.status_code = 200
+    mock_resp.json.return_value = {"result": "Server already exists", "success": ""}
+
+    with patch("requests.post", return_value=mock_resp):
+        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
+
+    assert result is False
+
+
+def test_add_extra_dns_server_returns_false_on_connection_error():
+    with patch(
+        "requests.post", side_effect=requests.exceptions.ConnectionError("refused")
+    ):
+        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
+
+    assert result is False
+
+
+# ---------------------------------------------------------------------------
+# ensure_extra_dns_server
+# ---------------------------------------------------------------------------
+
+
+def _add_success_resp():
+    mock = MagicMock()
+    mock.status_code = 200
+    mock.json.return_value = {"result": "", "success": "Connection Added"}
+    return mock
+
+
+def _save_success_resp():
+    mock = MagicMock()
+    mock.status_code = 200
+    mock.json.return_value = {"result": "", "success": "Connections Saved"}
+    return mock
+
+
+def test_ensure_extra_dns_server_adds_and_configures_new_server():
+    """Server not yet registered — adds it, then saves dns+domain_check settings."""
+    with (
+        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
+        patch(
+            "requests.post",
+            side_effect=[_add_success_resp(), _save_success_resp()],
+        ),
+    ):
+        result = _client().ensure_extra_dns_server(
+            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
+        )
+
+    assert result is True
+
+
+def test_ensure_extra_dns_server_skips_add_when_already_present():
+    """Server already registered — no add call, only saves settings."""
+    existing = {
+        "1.2.3.4": {"dns": "no", "domain_check": "no", "port": "2222", "ssl": "no"}
+    }
+    with (
+        patch("requests.get", return_value=_multi_server_get_resp(servers=existing)),
+        patch("requests.post", return_value=_save_success_resp()) as mock_post,
+    ):
+        result = _client().ensure_extra_dns_server(
+            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
+        )
+
+    assert result is True
+    assert mock_post.call_count == 1  # save only, no add
+
+
+def test_ensure_extra_dns_server_returns_false_when_add_fails():
+    fail_resp = MagicMock()
+    fail_resp.status_code = 200
+    fail_resp.json.return_value = {"result": "error", "success": ""}
+
+    with (
+        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
+        patch("requests.post", return_value=fail_resp),
+    ):
+        result = _client().ensure_extra_dns_server(
+            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
+        )
+
+    assert result is False
+
+
+def test_ensure_extra_dns_server_returns_false_when_save_fails():
+    """Add succeeds but the subsequent settings save fails."""
+    fail_save = MagicMock()
+    fail_save.status_code = 200
+    fail_save.json.return_value = {"result": "error", "success": ""}
+
+    with (
+        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
+        patch(
+            "requests.post",
+            side_effect=[_add_success_resp(), fail_save],
+        ),
+    ):
+        result = _client().ensure_extra_dns_server(
+            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
+        )
+
+    assert result is False
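The `ensure_extra_dns_server` tests above pin down a three-step flow: fetch the current cluster server list, add the server only when it is missing, then save the dns/domain_check settings. A standalone sketch of that control flow (the callables here are hypothetical stand-ins, not the real client API):

```python
# Control-flow sketch of the ensure-server logic the tests exercise.
def ensure_extra_dns_server(get_servers, add_server, save_settings, ip):
    servers = get_servers()        # step 1: current cluster servers
    if ip not in servers:          # step 2: register only if missing
        if not add_server(ip):     # add failed -> give up early
            return False
    return save_settings(ip)       # step 3: enable dns + domain_check

# Already-present server: add_server must NOT be called, only save.
calls = []
ok = ensure_extra_dns_server(
    get_servers=lambda: {"1.2.3.4": {}},
    add_server=lambda ip: calls.append("add") or True,
    save_settings=lambda ip: calls.append("save") or True,
    ip="1.2.3.4",
)
assert ok is True
assert calls == ["save"]
```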
@@ -58,6 +58,139 @@ def test_peers_stored():
     assert worker.peers[0]["url"] == "http://ddo-2:2222"
 
 
+def test_peer_from_env_var(monkeypatch):
+    """DADNS_PEER_SYNC_PEER_URL adds a peer without a config file."""
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_URL", "http://ddo-env:2222")
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_USERNAME", "admin")
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_PASSWORD", "secret")
+    worker = PeerSyncWorker({"enabled": True})
+    assert len(worker.peers) == 1
+    assert worker.peers[0]["url"] == "http://ddo-env:2222"
+    assert worker.peers[0]["username"] == "admin"
+    assert worker.peers[0]["password"] == "secret"
+
+
+def test_env_peer_not_duplicated_when_also_in_config(monkeypatch):
+    """Env var peer is not added if it already appears in the config file peers list."""
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_URL", "http://ddo-2:2222")
+    worker = PeerSyncWorker(BASE_CONFIG)
+    # BASE_CONFIG already has http://ddo-2:2222 — must remain exactly one entry
+    urls = [p["url"] for p in worker.peers]
+    assert urls.count("http://ddo-2:2222") == 1
+
+
+def test_numbered_env_peers(monkeypatch):
+    """DADNS_PEER_SYNC_PEER_1_URL and _2_URL add multiple peers."""
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://node-a:2222")
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_USERNAME", "peersync")
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_PASSWORD", "s3cr3t")
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_2_URL", "http://node-b:2222")
+    worker = PeerSyncWorker({"enabled": True})
+    urls = [p["url"] for p in worker.peers]
+    assert "http://node-a:2222" in urls
+    assert "http://node-b:2222" in urls
+    assert len(urls) == 2
+
+
+def test_numbered_env_peers_not_duplicated(monkeypatch):
+    """Numbered env var peers are deduplicated against the config file list."""
+    monkeypatch.setenv("DADNS_PEER_SYNC_PEER_1_URL", "http://ddo-2:2222")
+    worker = PeerSyncWorker(BASE_CONFIG)
+    urls = [p["url"] for p in worker.peers]
+    assert urls.count("http://ddo-2:2222") == 1
+
+
+def test_get_peer_urls():
+    worker = PeerSyncWorker(BASE_CONFIG)
+    assert worker.get_peer_urls() == ["http://ddo-2:2222"]
+
+
+# ---------------------------------------------------------------------------
+# Health tracking
+# ---------------------------------------------------------------------------
+
+
+def test_peer_health_starts_healthy():
+    worker = PeerSyncWorker(BASE_CONFIG)
+    h = worker._health("http://ddo-2:2222")
+    assert h["healthy"] is True
+    assert h["consecutive_failures"] == 0
+
+
+def test_record_failure_increments_count():
+    worker = PeerSyncWorker(BASE_CONFIG)
+    worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
+    assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 1
+    assert worker._health("http://ddo-2:2222")["healthy"] is True
+
+
+def test_record_failure_marks_degraded_at_threshold():
+    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
+
+    worker = PeerSyncWorker(BASE_CONFIG)
+    for _ in range(FAILURE_THRESHOLD):
+        worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
+    assert worker._health("http://ddo-2:2222")["healthy"] is False
+
+
+def test_record_success_resets_health():
+    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
+
+    worker = PeerSyncWorker(BASE_CONFIG)
+    for _ in range(FAILURE_THRESHOLD):
+        worker._record_failure("http://ddo-2:2222", ConnectionError("down"))
+    assert not worker._health("http://ddo-2:2222")["healthy"]
+    worker._record_success("http://ddo-2:2222")
+    assert worker._health("http://ddo-2:2222")["healthy"] is True
+    assert worker._health("http://ddo-2:2222")["consecutive_failures"] == 0
+
+
+# ---------------------------------------------------------------------------
+# Peer discovery (_discover_peers_from)
+# ---------------------------------------------------------------------------
+
+
+def test_discover_peers_adds_new_peer(monkeypatch):
+    """New peer URL returned by /internal/peers is added to the peer list."""
+    worker = PeerSyncWorker(BASE_CONFIG)
+
+    def mock_get(url, auth=None, timeout=10, params=None):
+        resp = MagicMock()
+        resp.status_code = 200
+        resp.json.return_value = ["http://node-c:2222"]
+        return resp
+
+    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
+    worker._discover_peers_from(BASE_CONFIG["peers"][0])
+    urls = [p["url"] for p in worker.peers]
+    assert "http://node-c:2222" in urls
+
+
+def test_discover_peers_skips_known(monkeypatch):
+    """Already-known peer URLs are not re-added."""
+    worker = PeerSyncWorker(BASE_CONFIG)
+
+    def mock_get(url, auth=None, timeout=10, params=None):
+        resp = MagicMock()
+        resp.status_code = 200
+        resp.json.return_value = ["http://ddo-2:2222"]  # already known
+        return resp
+
+    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
+    worker._discover_peers_from(BASE_CONFIG["peers"][0])
+    assert len(worker.peers) == 1  # unchanged
+
+
+def test_discover_peers_tolerates_failure(monkeypatch):
+    """Network error during discovery does not propagate."""
+    worker = PeerSyncWorker(BASE_CONFIG)
+
+    def mock_get(*args, **kwargs):
+        raise ConnectionError("peer down")
+
+    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
+    # Should not raise
+    worker._discover_peers_from(BASE_CONFIG["peers"][0])
+
+
 def test_start_skips_when_disabled(caplog):
     worker = PeerSyncWorker({"enabled": False})
     worker.start()
@@ -261,3 +394,53 @@ def test_sync_empty_peer_list(patch_connect, monkeypatch):
     monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
 
     worker._sync_from_peer(_make_peer())
+
+
+# ---------------------------------------------------------------------------
+# get_peer_status
+# ---------------------------------------------------------------------------
+
+
+def test_get_peer_status_no_contact_yet():
+    worker = PeerSyncWorker(BASE_CONFIG)
+    status = worker.get_peer_status()
+
+    assert status["enabled"] is True
+    assert status["total"] == 1
+    assert status["healthy"] == 1
+    assert status["degraded"] == 0
+    assert status["peers"][0]["url"] == "http://ddo-2:2222"
+    assert status["peers"][0]["healthy"] is True
+    assert status["peers"][0]["last_seen"] is None
+
+
+def test_get_peer_status_after_success():
+    worker = PeerSyncWorker(BASE_CONFIG)
+    worker._record_success("http://ddo-2:2222")
+    status = worker.get_peer_status()
+
+    assert status["healthy"] == 1
+    assert status["degraded"] == 0
+    assert status["peers"][0]["last_seen"] is not None
+
+
+def test_get_peer_status_after_degraded():
+    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
+
+    worker = PeerSyncWorker(BASE_CONFIG)
+    for _ in range(FAILURE_THRESHOLD):
+        worker._record_failure("http://ddo-2:2222", Exception("timeout"))
+
+    status = worker.get_peer_status()
+    assert status["healthy"] == 0
+    assert status["degraded"] == 1
+    assert status["peers"][0]["healthy"] is False
+
+
+def test_get_peer_status_disabled():
+    worker = PeerSyncWorker({})
+    status = worker.get_peer_status()
+
+    assert status["enabled"] is False
+    assert status["total"] == 0
+    assert status["peers"] == []
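The health-tracking tests above encode a simple rule: a peer stays healthy until `FAILURE_THRESHOLD` consecutive failures, and a single success fully resets it. A minimal sketch under that assumption (the threshold value of 3 is illustrative; the real constant lives in `directdnsonly.app.peer_sync`):

```python
FAILURE_THRESHOLD = 3  # assumed value, for illustration only

health = {"healthy": True, "consecutive_failures": 0}

def record_failure(h):
    # One more consecutive failure; degrade only at the threshold.
    h["consecutive_failures"] += 1
    if h["consecutive_failures"] >= FAILURE_THRESHOLD:
        h["healthy"] = False

def record_success(h):
    # Any success resets the counter and restores health.
    h["consecutive_failures"] = 0
    h["healthy"] = True

for _ in range(FAILURE_THRESHOLD):
    record_failure(health)
assert health["healthy"] is False     # degraded at the threshold

record_success(health)
assert health == {"healthy": True, "consecutive_failures": 0}
```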
@@ -55,7 +55,9 @@ DA_CLIENT_PATH = "directdnsonly.app.reconciler.DirectAdminClient"
 
 def _patch_da(return_value):
     """Patch DirectAdminClient so list_domains returns a fixed value."""
-    return patch(DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value})
+    return patch(
+        DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value}
+    )
 
 
 # ---------------------------------------------------------------------------
@@ -233,7 +235,12 @@ def test_heal_skips_domains_without_zone_data(delete_queue, patch_connect):
     registry, _ = _make_backend_registry(zone_exists_return=False)
 
     patch_connect.add(
-        Domain(domain="nodata.com", hostname="da1.example.com", username="admin", zone_data=None)
+        Domain(
+            domain="nodata.com",
+            hostname="da1.example.com",
+            username="admin",
+            zone_data=None,
+        )
     )
     patch_connect.commit()
 
@@ -310,3 +317,83 @@ def test_heal_skipped_when_no_registry(delete_queue, patch_connect):
     w._reconcile_all()
 
     assert save_queue.empty()
+
+
+# ---------------------------------------------------------------------------
+# get_status — last-run state
+# ---------------------------------------------------------------------------
+
+
+def test_get_status_before_any_run(worker):
+    status = worker.get_status()
+    assert status["enabled"] is True
+    assert status["alive"] is False
+    assert status["last_run"] == {}
+
+
+def test_get_status_after_run(worker, patch_connect):
+    with _patch_da(set()):
+        worker._reconcile_all()
+
+    s = worker.get_status()
+    assert s["enabled"] is True
+    lr = s["last_run"]
+    assert lr["status"] == "ok"
+    assert "started_at" in lr
+    assert "completed_at" in lr
+    assert "duration_seconds" in lr
+    assert lr["da_servers_polled"] == 1
+    assert lr["da_servers_unreachable"] == 0
+    assert lr["dry_run"] is False
+
+
+def test_get_status_counts_unreachable_server(worker, patch_connect):
+    with _patch_da(None):
+        worker._reconcile_all()
+
+    lr = worker.get_status()["last_run"]
+    assert lr["da_servers_polled"] == 1
+    assert lr["da_servers_unreachable"] == 1
+
+
+def test_get_status_counts_orphans(worker, delete_queue, patch_connect):
+    patch_connect.add(
+        Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
+    )
+    patch_connect.commit()
+
+    with _patch_da(set()):
+        worker._reconcile_all()
+
+    lr = worker.get_status()["last_run"]
+    assert lr["orphans_found"] == 1
+    assert lr["orphans_queued"] == 1
+
+
+def test_get_status_dry_run_orphans_not_queued_in_stats(dry_run_worker, patch_connect):
+    patch_connect.add(
+        Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
+    )
+    patch_connect.commit()
+
+    with _patch_da(set()):
+        dry_run_worker._reconcile_all()
+
+    lr = dry_run_worker.get_status()["last_run"]
+    assert lr["dry_run"] is True
+    assert lr["orphans_found"] == 1
+    assert lr["orphans_queued"] == 0
+
+
+def test_get_status_zones_in_db_counted(worker, patch_connect):
+    for d in ["a.com", "b.com", "c.com"]:
+        patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
+    patch_connect.commit()
+
+    with _patch_da({"a.com", "b.com", "c.com"}):
+        worker._reconcile_all()
+
+    lr = worker.get_status()["last_run"]
+    assert lr["zones_in_db"] == 3
+    assert lr["zones_in_da"] == 3
+    assert lr["orphans_found"] == 0
162
tests/test_status_api.py
Normal file
162
tests/test_status_api.py
Normal file
@@ -0,0 +1,162 @@
"""Tests for directdnsonly.app.api.status — StatusAPI."""

import json
from unittest.mock import MagicMock

import cherrypy
import pytest

from directdnsonly.app.api.status import StatusAPI
from directdnsonly.app.db.models import Domain


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

_RECONCILER_OK = {
    "enabled": True,
    "alive": True,
    "dry_run": False,
    "interval_minutes": 60,
    "last_run": {},
}

_PEER_SYNC_OFF = {
    "enabled": False,
    "alive": False,
    "peers": [],
    "total": 0,
    "healthy": 0,
    "degraded": 0,
}


def _qs(**overrides):
    base = {
        "save_queue_size": 0,
        "delete_queue_size": 0,
        "retry_queue_size": 0,
        "dead_letters": 0,
        "save_worker_alive": True,
        "delete_worker_alive": True,
        "retry_worker_alive": True,
        "reconciler": _RECONCILER_OK,
        "peer_sync": _PEER_SYNC_OFF,
    }
    base.update(overrides)
    return base


def _api(qs=None):
    wm = MagicMock()
    wm.queue_status.return_value = qs or _qs()
    return StatusAPI(wm)


# ---------------------------------------------------------------------------
# _compute_overall
# ---------------------------------------------------------------------------


def test_overall_ok_all_healthy():
    assert StatusAPI._compute_overall(_qs()) == "ok"


def test_overall_error_save_worker_dead():
    assert StatusAPI._compute_overall(_qs(save_worker_alive=False)) == "error"


def test_overall_error_delete_worker_dead():
    assert StatusAPI._compute_overall(_qs(delete_worker_alive=False)) == "error"


def test_overall_degraded_retries_pending():
    assert StatusAPI._compute_overall(_qs(retry_queue_size=3)) == "degraded"


def test_overall_degraded_dead_letters():
    assert StatusAPI._compute_overall(_qs(dead_letters=1)) == "degraded"


def test_overall_degraded_peer_unhealthy():
    ps = {**_PEER_SYNC_OFF, "degraded": 1}
    assert StatusAPI._compute_overall(_qs(peer_sync=ps)) == "degraded"


def test_overall_error_takes_priority_over_degraded():
    """error > degraded when both conditions are true."""
    assert (
        StatusAPI._compute_overall(
            _qs(save_worker_alive=False, retry_queue_size=5)
        )
        == "error"
    )


# ---------------------------------------------------------------------------
# _build — structure and zone count
# ---------------------------------------------------------------------------


def test_build_structure(patch_connect):
    api = _api()
    result = api._build()

    assert "status" in result
    assert "queues" in result
    assert "workers" in result
    assert "reconciler" in result
    assert "peer_sync" in result
    assert "zones" in result


def test_build_zone_count_zero(patch_connect):
    api = _api()
    result = api._build()
    assert result["zones"]["total"] == 0


def test_build_zone_count_with_domains(patch_connect):
    for d in ["a.com", "b.com", "c.com"]:
        patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
    patch_connect.commit()

    api = _api()
    result = api._build()
    assert result["zones"]["total"] == 3


def test_build_queues_forwarded(patch_connect):
    api = _api(_qs(save_queue_size=2, delete_queue_size=1, retry_queue_size=3, dead_letters=1))
    result = api._build()

    assert result["queues"]["save"] == 2
    assert result["queues"]["delete"] == 1
    assert result["queues"]["retry"] == 3
    assert result["queues"]["dead_letters"] == 1


def test_build_workers_forwarded(patch_connect):
    api = _api()
    result = api._build()

    assert result["workers"]["save"] is True
    assert result["workers"]["delete"] is True
    assert result["workers"]["retry_drain"] is True


# ---------------------------------------------------------------------------
# index — JSON encoding
# ---------------------------------------------------------------------------


def test_index_returns_valid_json(patch_connect):
    api = _api()
    with MagicMock() as mock_resp:
        cherrypy.response = mock_resp
        cherrypy.response.headers = {}
        body = api.index()

    data = json.loads(body)
    assert data["status"] == "ok"
    assert isinstance(data["zones"]["total"], int)