Compare commits

...

2 Commits

Author SHA1 Message Date
db60d808de feat: operational status endpoint + reconciler/peer state tracking 📊
- ReconciliationWorker._last_run stores per-pass stats (da_servers_polled,
  zones_in_da/db, orphans_found/queued, hostnames_backfilled/migrated,
  zones_healed, duration_seconds, dry_run flag)
- ReconciliationWorker.get_status() exposes state for API/UI consumption
- _heal_backends() now returns healed count
- PeerSyncWorker.get_peer_status() serialises _peer_health to JSON-safe dict
  (url, healthy, consecutive_failures, last_seen) with summary totals
- WorkerManager tracks dead-letter count; queue_status() now returns nested
  reconciler/peer_sync dicts replacing flat reconciler_alive/peer_syncer_alive
- New GET /status endpoint (StatusAPI) aggregates queue depths, worker liveness,
  reconciler last-run, peer health, and live zone count; computes ok/degraded/error
- .gitignore: exclude .claude/, .vscode/, .env (always local)
- app.yml: add documented datastore section (SQLite default + MySQL commented)
- 164 tests passing (23 new tests added)
2026-02-25 18:51:56 +13:00
0f417da204 feat: add CMD_MULTI_SERVER methods to DirectAdminClient 🔌
Adds get_extra_dns_servers(), add_extra_dns_server(), and the
high-level ensure_extra_dns_server() which registers a node and
enforces dns=yes + domain_check=yes in a single call.  Also adds
the generic post() helper.  10 new tests, 141 total.
2026-02-25 16:29:21 +13:00
13 changed files with 789 additions and 19 deletions

6
.gitignore vendored
View File

@@ -26,3 +26,9 @@ build
*.mypy_cache
*.pytest_cache
/data/*

# Editor / tool settings — always local, never committed
.vscode/
.claude/
.env
*.env

View File

@@ -0,0 +1,82 @@
"""Operational status endpoint — aggregates queue, worker, reconciler, and peer health."""
import json
import cherrypy
from sqlalchemy import func, select
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain
class StatusAPI:
"""Exposes GET /status as a JSON health/status document.
Aggregates data from WorkerManager.queue_status() and a live DB zone count
into a single response that a UI or monitoring system can poll.
Overall ``status`` field:
- ``ok`` — all workers alive, no dead-letters, all peers healthy
- ``degraded`` — retries pending, dead-letters present, or a peer is unhealthy
- ``error`` — a core worker thread is not alive
"""
def __init__(self, worker_manager):
self._wm = worker_manager
@cherrypy.expose
def index(self):
cherrypy.response.headers["Content-Type"] = "application/json"
return json.dumps(self._build(), default=str).encode()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _build(self) -> dict:
qs = self._wm.queue_status()
zone_count = self._zone_count()
overall = self._compute_overall(qs)
return {
"status": overall,
"queues": {
"save": qs.get("save_queue_size", 0),
"delete": qs.get("delete_queue_size", 0),
"retry": qs.get("retry_queue_size", 0),
"dead_letters": qs.get("dead_letters", 0),
},
"workers": {
"save": qs.get("save_worker_alive"),
"delete": qs.get("delete_worker_alive"),
"retry_drain": qs.get("retry_worker_alive"),
},
"reconciler": qs.get("reconciler", {}),
"peer_sync": qs.get("peer_sync", {}),
"zones": {"total": zone_count},
}
@staticmethod
def _zone_count() -> int:
session = connect()
try:
return session.execute(select(func.count(Domain.id))).scalar() or 0
except Exception:
return 0
finally:
session.close()
@staticmethod
def _compute_overall(qs: dict) -> str:
if not qs.get("save_worker_alive") or not qs.get("delete_worker_alive"):
return "error"
peer_sync = qs.get("peer_sync", {})
if (
qs.get("retry_queue_size", 0) > 0
or qs.get("dead_letters", 0) > 0
or peer_sync.get("degraded", 0) > 0
):
return "degraded"
return "ok"

View File

@@ -161,6 +161,136 @@ class DirectAdminClient:
logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}") logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}")
return None return None
def post(
self, command: str, data: Optional[dict] = None
) -> Optional[requests.Response]:
"""Authenticated POST to any DA CMD_* endpoint."""
url = f"{self.scheme}://{self.hostname}:{self.port}/{command}"
kwargs: dict = dict(
data=data or {},
timeout=30,
verify=self.verify_ssl,
allow_redirects=False,
)
if self._cookies:
kwargs["cookies"] = self._cookies
else:
kwargs["auth"] = (self.username, self.password)
try:
return requests.post(url, **kwargs)
except Exception as exc:
logger.error(f"[da:{self.hostname}] POST {command} failed: {exc}")
return None
def get_extra_dns_servers(self) -> dict:
"""Return the Extra DNS server map from CMD_MULTI_SERVER (GET).
Returns a dict keyed by server hostname/IP, each value being the
per-server settings dict (dns, domain_check, port, user, ssl, …).
Returns ``{}`` on any error.
"""
resp = self.get("CMD_MULTI_SERVER", params={"json": "yes"})
if resp is None or resp.status_code != 200:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER GET failed")
return {}
try:
return resp.json().get("servers", {})
except Exception as exc:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER parse error: {exc}")
return {}
def add_extra_dns_server(
self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
) -> bool:
"""Register a new Extra DNS server via CMD_MULTI_SERVER action=add.
Returns ``True`` if DA reports success, ``False`` otherwise.
"""
resp = self.post(
"CMD_MULTI_SERVER",
data={
"action": "add",
"json": "yes",
"ip": ip,
"port": str(port),
"user": user,
"passwd": passwd,
"ssl": "yes" if ssl else "no",
},
)
if resp is None or resp.status_code != 200:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add failed for {ip}")
return False
try:
result = resp.json()
if result.get("success"):
logger.info(f"[da:{self.hostname}] Added Extra DNS server {ip}")
return True
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER add error: {result.get('result', result)}"
)
return False
except Exception as exc:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add parse error: {exc}")
return False
def ensure_extra_dns_server(
self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
) -> bool:
"""Add (if absent) and configure a directdnsonly Extra DNS server.
Ensures the server is registered with ``dns=yes`` and
``domain_check=yes`` so DirectAdmin pushes zone updates to it.
Returns ``True`` if fully configured, ``False`` on any failure.
"""
servers = self.get_extra_dns_servers()
if ip not in servers:
if not self.add_extra_dns_server(ip, port, user, passwd, ssl):
return False
ssl_str = "yes" if ssl else "no"
resp = self.post(
"CMD_MULTI_SERVER",
data={
"action": "multiple",
"save": "yes",
"json": "yes",
"passwd": "",
"select0": ip,
f"port-{ip}": str(port),
f"user-{ip}": user,
f"ssl-{ip}": ssl_str,
f"dns-{ip}": "yes",
f"domain_check-{ip}": "yes",
f"user_check-{ip}": "no",
f"email-{ip}": "no",
f"show_all_users-{ip}": "no",
},
)
if resp is None or resp.status_code != 200:
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save failed for {ip}"
)
return False
try:
result = resp.json()
if result.get("success"):
logger.info(
f"[da:{self.hostname}] Extra DNS server {ip} configured "
f"(dns=yes domain_check=yes)"
)
return True
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save error: {result.get('result', result)}"
)
return False
except Exception as exc:
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save parse error: {exc}"
)
return False
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Internal # Internal
# ------------------------------------------------------------------ # ------------------------------------------------------------------
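
A usage sketch for the new client methods, not part of this diff. The DirectAdminClient constructor arguments are assumed from how the reconciler builds clients elsewhere in this compare; hostnames, credentials, and the import path are placeholders.

from directdnsonly.da_client import DirectAdminClient  # import path assumed

client = DirectAdminClient(
    hostname="da1.example.com",   # ctor kwargs assumed, not shown in this diff
    port=2222,
    username="admin",
    password="s3cr3t",
    ssl=True,
    verify_ssl=False,
)

# Map of already-registered Extra DNS servers, keyed by hostname/IP,
# e.g. {"1.2.3.4": {"dns": "yes", "domain_check": "yes", "port": "2222", ...}}
servers = client.get_extra_dns_servers()

# Idempotent: adds the node only if absent, then enforces dns=yes + domain_check=yes.
if not client.ensure_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t"):
    raise SystemExit("Extra DNS server registration failed; see client log")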

View File

@@ -140,6 +140,30 @@ class PeerSyncWorker:
        Exposed via /internal/peers so other nodes can discover this node's mesh."""
        return [p["url"] for p in self.peers if p.get("url")]

    def get_peer_status(self) -> dict:
        """Return peer health summary for the /status endpoint."""
        peers = []
        for peer in self.peers:
            url = peer.get("url", "")
            h = self._peer_health.get(url, {})
            last_seen = h.get("last_seen")
            peers.append({
                "url": url,
                "healthy": h.get("healthy", True),
                "consecutive_failures": h.get("consecutive_failures", 0),
                "last_seen": last_seen.isoformat() if last_seen else None,
            })
        healthy = sum(1 for p in peers if p["healthy"])
        return {
            "enabled": self.enabled,
            "alive": self.is_alive,
            "interval_minutes": self.interval_seconds // 60,
            "peers": peers,
            "total": len(peers),
            "healthy": healthy,
            "degraded": len(peers) - healthy,
        }

    # ------------------------------------------------------------------
    # Health tracking
    # ------------------------------------------------------------------
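
For reference, get_peer_status() on a one-peer mesh returns a dict shaped like the sketch below (keys taken from the code above; the values, including the interval, are illustrative):

example = {
    "enabled": True,
    "alive": True,
    "interval_minutes": 15,   # illustrative
    "peers": [
        {
            "url": "http://ddo-2:2222",
            "healthy": True,
            "consecutive_failures": 0,
            "last_seen": "2026-02-25T05:51:00",   # None until first contact
        },
    ],
    "total": 1,
    "healthy": 1,
    "degraded": 0,
}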

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import datetime
import threading
from loguru import logger
from sqlalchemy import select
@@ -42,6 +43,17 @@ class ReconciliationWorker:
        self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60
        self._stop_event = threading.Event()
        self._thread = None
        self._last_run: dict = {}

    def get_status(self) -> dict:
        """Return reconciler configuration and last-run statistics."""
        return {
            "enabled": self.enabled,
            "alive": self.is_alive,
            "dry_run": self.dry_run,
            "interval_minutes": self.interval_seconds // 60,
            "last_run": dict(self._last_run),
        }

    def start(self):
        if not self.enabled:
@@ -104,11 +116,18 @@ class ReconciliationWorker:
            self._reconcile_all()

    def _reconcile_all(self):
        started_at = datetime.datetime.utcnow()
        self._last_run = {"status": "running", "started_at": started_at.isoformat()}
        logger.info(
            f"[reconciler] Starting reconciliation pass across "
            f"{len(self.servers)} server(s)"
        )
        total_queued = 0
        da_servers_polled = 0
        da_servers_unreachable = 0
        migrated = 0
        backfilled = 0
        zones_in_db = 0

        # Build a map of all domains seen on all DA servers: domain -> hostname
        all_da_domains: dict = {}
@@ -126,23 +145,26 @@ class ReconciliationWorker:
                    ssl=server.get("ssl", True),
                    verify_ssl=self.verify_ssl,
                )
                da_servers_polled += 1
                da_domains = client.list_domains(ipp=self.ipp)
                if da_domains is not None:
                    for d in da_domains:
                        all_da_domains[d] = hostname
                else:
                    da_servers_unreachable += 1
                logger.debug(
                    f"[reconciler] {hostname}: "
                    f"{len(da_domains) if da_domains else 0} active domain(s) in DA"
                )
            except Exception as exc:
                logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}")
                da_servers_unreachable += 1

        # Compare local DB against what DA reported; update masters and queue deletes
        session = connect()
        try:
            all_local_domains = session.execute(select(Domain)).scalars().all()
            zones_in_db = len(all_local_domains)
            known_servers = {s.get("hostname") for s in self.servers}
            for record in all_local_domains:
                domain = record.domain
@@ -209,10 +231,31 @@ class ReconciliationWorker:
            )

        # Option C: heal backends that are missing zones
        zones_healed = 0
        if self.save_queue is not None and self.backend_registry is not None:
            zones_healed = self._heal_backends()

        completed_at = datetime.datetime.utcnow()
        self._last_run = {
            "status": "ok",
            "started_at": started_at.isoformat(),
            "completed_at": completed_at.isoformat(),
            "duration_seconds": round(
                (completed_at - started_at).total_seconds(), 1
            ),
            "da_servers_polled": da_servers_polled,
            "da_servers_unreachable": da_servers_unreachable,
            "zones_in_da": len(all_da_domains),
            "zones_in_db": zones_in_db,
            "orphans_found": total_queued,
            "orphans_queued": total_queued if not self.dry_run else 0,
            "hostnames_backfilled": backfilled,
            "hostnames_migrated": migrated,
            "zones_healed": zones_healed,
            "dry_run": self.dry_run,
        }

    def _heal_backends(self) -> int:
        """Check every backend for zone presence and re-queue any zone that is
        missing from one or more backends, using the stored zone_data as the
        authoritative source. This corrects backends that missed pushes due to
@@ -220,9 +263,10 @@ class ReconciliationWorker:
""" """
backends = self.backend_registry.get_available_backends() backends = self.backend_registry.get_available_backends()
if not backends: if not backends:
return return 0
session = connect() session = connect()
healed = 0
try: try:
domains = session.execute( domains = session.execute(
select(Domain).where(Domain.zone_data.isnot(None)) select(Domain).where(Domain.zone_data.isnot(None))
@@ -231,9 +275,7 @@ class ReconciliationWorker:
                logger.debug(
                    "[reconciler] Healing pass: no zone_data stored yet — skipping"
                )
                return 0
            for record in domains:
                missing = []
                for backend_name, backend in backends.items():
@@ -277,3 +319,4 @@ class ReconciliationWorker:
            )
        finally:
            session.close()
        return healed
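
After a completed pass, get_status()["last_run"] carries the per-pass stats. An illustrative example (keys from the code above, values hypothetical):

example_last_run = {
    "status": "ok",
    "started_at": "2026-02-25T05:00:00",
    "completed_at": "2026-02-25T05:00:12",
    "duration_seconds": 12.3,
    "da_servers_polled": 2,
    "da_servers_unreachable": 0,
    "zones_in_da": 148,
    "zones_in_db": 150,
    "orphans_found": 2,
    "orphans_queued": 2,      # forced to 0 when dry_run is true
    "hostnames_backfilled": 0,
    "hostnames_migrated": 1,
    "zones_healed": 0,
    "dry_run": False,
}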

View File

@@ -3,6 +3,20 @@ timezone: Pacific/Auckland
log_level: INFO
queue_location: ./data/queues

# Application datastore — stores domain index and zone_data for healing/peer-sync.
# SQLite (default) requires no extra dependencies and is fine for single-node setups.
# MySQL is recommended for multi-node deployments with a shared datastore.
datastore:
  type: sqlite
  db_location: ./data/directdnsonly.db
  # --- MySQL ---
  # type: mysql
  # host: "127.0.0.1"
  # port: "3306"
  # name: "directdnsonly"
  # user: "directdnsonly"
  # pass: "changeme"

app:
  auth_username: directdnsonly
  auth_password: changeme  # Override via DADNS_APP_AUTH_PASSWORD env var
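
The code that consumes this section is not part of the diff; as a rough sketch, a datastore block like this typically maps to a SQLAlchemy URL along these lines (PyMySQL driver assumed for the MySQL case):

def datastore_url(ds: dict) -> str:
    # Sketch only; the real mapping lives in the project's db layer, not shown here.
    if ds.get("type") == "mysql":
        return (
            f"mysql+pymysql://{ds['user']}:{ds['pass']}"
            f"@{ds['host']}:{ds['port']}/{ds['name']}"
        )
    return f"sqlite:///{ds.get('db_location', './data/directdnsonly.db')}"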

View File

@@ -4,6 +4,7 @@ from app.backends import BackendRegistry
from app.api.admin import DNSAdminAPI
from app.api.health import HealthAPI
from app.api.internal import InternalAPI
from app.api.status import StatusAPI
from app import configure_logging
from worker import WorkerManager
from directdnsonly.config import config
@@ -110,8 +111,9 @@ def main():
    )
    root.health = HealthAPI(registry)
    root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer)
    root.status = StatusAPI(worker_manager)

    # Add queue status endpoint (debug)
    root.queue_status = lambda: worker_manager.queue_status()

    # Override auth for /internal so peers use their own credentials

View File

@@ -43,6 +43,7 @@ class WorkerManager:
        self._peer_syncer = None
        self._reconciliation_config = reconciliation_config or {}
        self._peer_sync_config = peer_sync_config or {}
        self._dead_letter_count = 0

        try:
            os.makedirs(queue_path, exist_ok=True)
@@ -205,6 +206,7 @@ class WorkerManager:
        Discards to dead-letter after MAX_RETRIES attempts."""
        retry_count = item.get("retry_count", 0) + 1
        if retry_count > MAX_RETRIES:
            self._dead_letter_count += 1
            logger.error(
                f"[retry] Dead-letter: {item['domain']} failed on "
                f"{failed_backends} after {MAX_RETRIES} attempts — giving up"
@@ -496,18 +498,24 @@ class WorkerManager:
logger.info("Workers stopped") logger.info("Workers stopped")
def queue_status(self): def queue_status(self):
reconciler = (
self._reconciler.get_status()
if self._reconciler
else {"enabled": False, "alive": False, "last_run": {}}
)
peer_sync = (
self._peer_syncer.get_peer_status()
if self._peer_syncer
else {"enabled": False, "alive": False, "peers": [], "total": 0, "healthy": 0, "degraded": 0}
)
return { return {
"save_queue_size": self.save_queue.qsize(), "save_queue_size": self.save_queue.qsize(),
"delete_queue_size": self.delete_queue.qsize(), "delete_queue_size": self.delete_queue.qsize(),
"retry_queue_size": self.retry_queue.qsize(), "retry_queue_size": self.retry_queue.qsize(),
"save_worker_alive": self._save_thread and self._save_thread.is_alive(), "dead_letters": self._dead_letter_count,
"delete_worker_alive": self._delete_thread "save_worker_alive": bool(self._save_thread and self._save_thread.is_alive()),
and self._delete_thread.is_alive(), "delete_worker_alive": bool(self._delete_thread and self._delete_thread.is_alive()),
"retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(), "retry_worker_alive": bool(self._retry_thread and self._retry_thread.is_alive()),
"reconciler_alive": ( "reconciler": reconciler,
self._reconciler.is_alive if self._reconciler else False "peer_sync": peer_sync,
),
"peer_syncer_alive": (
self._peer_syncer.is_alive if self._peer_syncer else False
),
} }
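
Note for anything that consumed the old flat keys: reconciler_alive and peer_syncer_alive are gone, and liveness now lives inside the nested dicts. For an external script the change looks like this:

# worker_manager is the running WorkerManager instance.
qs = worker_manager.queue_status()
reconciler_alive = qs["reconciler"]["alive"]    # was qs["reconciler_alive"]
peer_syncer_alive = qs["peer_sync"]["alive"]    # was qs["peer_syncer_alive"]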

View File

@@ -38,4 +38,5 @@ def patch_connect(db_session, monkeypatch):
monkeypatch.setattr("directdnsonly.app.utils.connect", _factory) monkeypatch.setattr("directdnsonly.app.utils.connect", _factory)
monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory) monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory)
monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory) monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory)
monkeypatch.setattr("directdnsonly.app.api.status.connect", _factory)
return db_session return db_session

View File

@@ -200,3 +200,171 @@ def test_login_returns_false_on_exception():
    result = client._login()
    assert result is False


# ---------------------------------------------------------------------------
# get_extra_dns_servers
# ---------------------------------------------------------------------------


def _multi_server_get_resp(servers=None):
    mock = MagicMock()
    mock.status_code = 200
    mock.is_redirect = False
    mock.headers = {"Content-Type": "application/json"}
    mock.json.return_value = {"CLUSTER_ON": "yes", "servers": servers or {}}
    mock.raise_for_status = MagicMock()
    return mock


def test_get_extra_dns_servers_returns_servers_dict():
    servers = {
        "1.2.3.4": {"dns": "yes", "domain_check": "yes", "port": "2222", "ssl": "no"}
    }
    with patch("requests.get", return_value=_multi_server_get_resp(servers)):
        result = _client().get_extra_dns_servers()
    assert "1.2.3.4" in result
    assert result["1.2.3.4"]["dns"] == "yes"


def test_get_extra_dns_servers_returns_empty_on_http_error():
    mock_resp = MagicMock()
    mock_resp.status_code = 500
    with patch("requests.get", return_value=mock_resp):
        result = _client().get_extra_dns_servers()
    assert result == {}


def test_get_extra_dns_servers_returns_empty_on_connection_error():
    with patch(
        "requests.get", side_effect=requests.exceptions.ConnectionError("refused")
    ):
        result = _client().get_extra_dns_servers()
    assert result == {}


# ---------------------------------------------------------------------------
# add_extra_dns_server
# ---------------------------------------------------------------------------


def test_add_extra_dns_server_returns_true_on_success():
    mock_resp = MagicMock()
    mock_resp.status_code = 200
    mock_resp.json.return_value = {"result": "", "success": "Connection Added"}
    with patch("requests.post", return_value=mock_resp):
        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
    assert result is True


def test_add_extra_dns_server_returns_false_on_da_error():
    mock_resp = MagicMock()
    mock_resp.status_code = 200
    mock_resp.json.return_value = {"result": "Server already exists", "success": ""}
    with patch("requests.post", return_value=mock_resp):
        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
    assert result is False


def test_add_extra_dns_server_returns_false_on_connection_error():
    with patch(
        "requests.post", side_effect=requests.exceptions.ConnectionError("refused")
    ):
        result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
    assert result is False


# ---------------------------------------------------------------------------
# ensure_extra_dns_server
# ---------------------------------------------------------------------------


def _add_success_resp():
    mock = MagicMock()
    mock.status_code = 200
    mock.json.return_value = {"result": "", "success": "Connection Added"}
    return mock


def _save_success_resp():
    mock = MagicMock()
    mock.status_code = 200
    mock.json.return_value = {"result": "", "success": "Connections Saved"}
    return mock


def test_ensure_extra_dns_server_adds_and_configures_new_server():
    """Server not yet registered — adds it, then saves dns+domain_check settings."""
    with (
        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
        patch(
            "requests.post",
            side_effect=[_add_success_resp(), _save_success_resp()],
        ),
    ):
        result = _client().ensure_extra_dns_server(
            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
        )
    assert result is True


def test_ensure_extra_dns_server_skips_add_when_already_present():
    """Server already registered — no add call, only saves settings."""
    existing = {
        "1.2.3.4": {"dns": "no", "domain_check": "no", "port": "2222", "ssl": "no"}
    }
    with (
        patch("requests.get", return_value=_multi_server_get_resp(servers=existing)),
        patch("requests.post", return_value=_save_success_resp()) as mock_post,
    ):
        result = _client().ensure_extra_dns_server(
            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
        )
    assert result is True
    assert mock_post.call_count == 1  # save only, no add


def test_ensure_extra_dns_server_returns_false_when_add_fails():
    fail_resp = MagicMock()
    fail_resp.status_code = 200
    fail_resp.json.return_value = {"result": "error", "success": ""}
    with (
        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
        patch("requests.post", return_value=fail_resp),
    ):
        result = _client().ensure_extra_dns_server(
            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
        )
    assert result is False


def test_ensure_extra_dns_server_returns_false_when_save_fails():
    """Add succeeds but the subsequent settings save fails."""
    fail_save = MagicMock()
    fail_save.status_code = 200
    fail_save.json.return_value = {"result": "error", "success": ""}
    with (
        patch("requests.get", return_value=_multi_server_get_resp(servers={})),
        patch(
            "requests.post",
            side_effect=[_add_success_resp(), fail_save],
        ),
    ):
        result = _client().ensure_extra_dns_server(
            "1.2.3.4", 2222, "ddnsonly", "s3cr3t"
        )
    assert result is False

View File

@@ -394,3 +394,53 @@ def test_sync_empty_peer_list(patch_connect, monkeypatch):
    monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
    worker._sync_from_peer(_make_peer())


# ---------------------------------------------------------------------------
# get_peer_status
# ---------------------------------------------------------------------------


def test_get_peer_status_no_contact_yet():
    worker = PeerSyncWorker(BASE_CONFIG)
    status = worker.get_peer_status()
    assert status["enabled"] is True
    assert status["total"] == 1
    assert status["healthy"] == 1
    assert status["degraded"] == 0
    assert status["peers"][0]["url"] == "http://ddo-2:2222"
    assert status["peers"][0]["healthy"] is True
    assert status["peers"][0]["last_seen"] is None


def test_get_peer_status_after_success():
    worker = PeerSyncWorker(BASE_CONFIG)
    worker._record_success("http://ddo-2:2222")
    status = worker.get_peer_status()
    assert status["healthy"] == 1
    assert status["degraded"] == 0
    assert status["peers"][0]["last_seen"] is not None


def test_get_peer_status_after_degraded():
    from directdnsonly.app.peer_sync import FAILURE_THRESHOLD

    worker = PeerSyncWorker(BASE_CONFIG)
    for _ in range(FAILURE_THRESHOLD):
        worker._record_failure("http://ddo-2:2222", Exception("timeout"))
    status = worker.get_peer_status()
    assert status["healthy"] == 0
    assert status["degraded"] == 1
    assert status["peers"][0]["healthy"] is False


def test_get_peer_status_disabled():
    worker = PeerSyncWorker({})
    status = worker.get_peer_status()
    assert status["enabled"] is False
    assert status["total"] == 0
    assert status["peers"] == []

View File

@@ -317,3 +317,83 @@ def test_heal_skipped_when_no_registry(delete_queue, patch_connect):
    w._reconcile_all()
    assert save_queue.empty()


# ---------------------------------------------------------------------------
# get_status — last-run state
# ---------------------------------------------------------------------------


def test_get_status_before_any_run(worker):
    status = worker.get_status()
    assert status["enabled"] is True
    assert status["alive"] is False
    assert status["last_run"] == {}


def test_get_status_after_run(worker, patch_connect):
    with _patch_da(set()):
        worker._reconcile_all()
    s = worker.get_status()
    assert s["enabled"] is True
    lr = s["last_run"]
    assert lr["status"] == "ok"
    assert "started_at" in lr
    assert "completed_at" in lr
    assert "duration_seconds" in lr
    assert lr["da_servers_polled"] == 1
    assert lr["da_servers_unreachable"] == 0
    assert lr["dry_run"] is False


def test_get_status_counts_unreachable_server(worker, patch_connect):
    with _patch_da(None):
        worker._reconcile_all()
    lr = worker.get_status()["last_run"]
    assert lr["da_servers_polled"] == 1
    assert lr["da_servers_unreachable"] == 1


def test_get_status_counts_orphans(worker, delete_queue, patch_connect):
    patch_connect.add(
        Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    with _patch_da(set()):
        worker._reconcile_all()
    lr = worker.get_status()["last_run"]
    assert lr["orphans_found"] == 1
    assert lr["orphans_queued"] == 1


def test_get_status_dry_run_orphans_not_queued_in_stats(dry_run_worker, patch_connect):
    patch_connect.add(
        Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    with _patch_da(set()):
        dry_run_worker._reconcile_all()
    lr = dry_run_worker.get_status()["last_run"]
    assert lr["dry_run"] is True
    assert lr["orphans_found"] == 1
    assert lr["orphans_queued"] == 0


def test_get_status_zones_in_db_counted(worker, patch_connect):
    for d in ["a.com", "b.com", "c.com"]:
        patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
    patch_connect.commit()
    with _patch_da({"a.com", "b.com", "c.com"}):
        worker._reconcile_all()
    lr = worker.get_status()["last_run"]
    assert lr["zones_in_db"] == 3
    assert lr["zones_in_da"] == 3
    assert lr["orphans_found"] == 0

162
tests/test_status_api.py Normal file
View File

@@ -0,0 +1,162 @@
"""Tests for directdnsonly.app.api.status — StatusAPI."""
import json
from unittest.mock import MagicMock
import cherrypy
import pytest
from directdnsonly.app.api.status import StatusAPI
from directdnsonly.app.db.models import Domain
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_RECONCILER_OK = {
"enabled": True,
"alive": True,
"dry_run": False,
"interval_minutes": 60,
"last_run": {},
}
_PEER_SYNC_OFF = {
"enabled": False,
"alive": False,
"peers": [],
"total": 0,
"healthy": 0,
"degraded": 0,
}
def _qs(**overrides):
base = {
"save_queue_size": 0,
"delete_queue_size": 0,
"retry_queue_size": 0,
"dead_letters": 0,
"save_worker_alive": True,
"delete_worker_alive": True,
"retry_worker_alive": True,
"reconciler": _RECONCILER_OK,
"peer_sync": _PEER_SYNC_OFF,
}
base.update(overrides)
return base
def _api(qs=None):
wm = MagicMock()
wm.queue_status.return_value = qs or _qs()
return StatusAPI(wm)
# ---------------------------------------------------------------------------
# _compute_overall
# ---------------------------------------------------------------------------
def test_overall_ok_all_healthy():
assert StatusAPI._compute_overall(_qs()) == "ok"
def test_overall_error_save_worker_dead():
assert StatusAPI._compute_overall(_qs(save_worker_alive=False)) == "error"
def test_overall_error_delete_worker_dead():
assert StatusAPI._compute_overall(_qs(delete_worker_alive=False)) == "error"
def test_overall_degraded_retries_pending():
assert StatusAPI._compute_overall(_qs(retry_queue_size=3)) == "degraded"
def test_overall_degraded_dead_letters():
assert StatusAPI._compute_overall(_qs(dead_letters=1)) == "degraded"
def test_overall_degraded_peer_unhealthy():
ps = {**_PEER_SYNC_OFF, "degraded": 1}
assert StatusAPI._compute_overall(_qs(peer_sync=ps)) == "degraded"
def test_overall_error_takes_priority_over_degraded():
"""error > degraded when both conditions are true."""
assert (
StatusAPI._compute_overall(
_qs(save_worker_alive=False, retry_queue_size=5)
)
== "error"
)
# ---------------------------------------------------------------------------
# _build — structure and zone count
# ---------------------------------------------------------------------------
def test_build_structure(patch_connect):
api = _api()
result = api._build()
assert "status" in result
assert "queues" in result
assert "workers" in result
assert "reconciler" in result
assert "peer_sync" in result
assert "zones" in result
def test_build_zone_count_zero(patch_connect):
api = _api()
result = api._build()
assert result["zones"]["total"] == 0
def test_build_zone_count_with_domains(patch_connect):
for d in ["a.com", "b.com", "c.com"]:
patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
patch_connect.commit()
api = _api()
result = api._build()
assert result["zones"]["total"] == 3
def test_build_queues_forwarded(patch_connect):
api = _api(_qs(save_queue_size=2, delete_queue_size=1, retry_queue_size=3, dead_letters=1))
result = api._build()
assert result["queues"]["save"] == 2
assert result["queues"]["delete"] == 1
assert result["queues"]["retry"] == 3
assert result["queues"]["dead_letters"] == 1
def test_build_workers_forwarded(patch_connect):
api = _api()
result = api._build()
assert result["workers"]["save"] is True
assert result["workers"]["delete"] is True
assert result["workers"]["retry_drain"] is True
# ---------------------------------------------------------------------------
# index — JSON encoding
# ---------------------------------------------------------------------------
def test_index_returns_valid_json(patch_connect):
api = _api()
with MagicMock() as mock_resp:
cherrypy.response = mock_resp
cherrypy.response.headers = {}
body = api.index()
data = json.loads(body)
assert data["status"] == "ok"
assert isinstance(data["zones"]["total"], int)