diff --git a/.gitignore b/.gitignore index b4ee593..555476f 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,9 @@ build *.mypy_cache *.pytest_cache /data/* + +# Editor / tool settings — always local, never committed +.vscode/ +.claude/ +.env +*.env diff --git a/directdnsonly/app/api/status.py b/directdnsonly/app/api/status.py new file mode 100644 index 0000000..908ec78 --- /dev/null +++ b/directdnsonly/app/api/status.py @@ -0,0 +1,82 @@ +"""Operational status endpoint — aggregates queue, worker, reconciler, and peer health.""" + +import json + +import cherrypy +from sqlalchemy import func, select + +from directdnsonly.app.db import connect +from directdnsonly.app.db.models import Domain + + +class StatusAPI: + """Exposes GET /status as a JSON health/status document. + + Aggregates data from WorkerManager.queue_status() and a live DB zone count + into a single response that a UI or monitoring system can poll. + + Overall ``status`` field: + - ``ok`` — all workers alive, no dead-letters, all peers healthy + - ``degraded`` — retries pending, dead-letters present, or a peer is unhealthy + - ``error`` — a core worker thread is not alive + """ + + def __init__(self, worker_manager): + self._wm = worker_manager + + @cherrypy.expose + def index(self): + cherrypy.response.headers["Content-Type"] = "application/json" + return json.dumps(self._build(), default=str).encode() + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _build(self) -> dict: + qs = self._wm.queue_status() + + zone_count = self._zone_count() + + overall = self._compute_overall(qs) + + return { + "status": overall, + "queues": { + "save": qs.get("save_queue_size", 0), + "delete": qs.get("delete_queue_size", 0), + "retry": qs.get("retry_queue_size", 0), + "dead_letters": qs.get("dead_letters", 0), + }, + "workers": { + "save": qs.get("save_worker_alive"), + "delete": qs.get("delete_worker_alive"), + "retry_drain": qs.get("retry_worker_alive"), + }, + "reconciler": qs.get("reconciler", {}), + "peer_sync": qs.get("peer_sync", {}), + "zones": {"total": zone_count}, + } + + @staticmethod + def _zone_count() -> int: + session = connect() + try: + return session.execute(select(func.count(Domain.id))).scalar() or 0 + except Exception: + return 0 + finally: + session.close() + + @staticmethod + def _compute_overall(qs: dict) -> str: + if not qs.get("save_worker_alive") or not qs.get("delete_worker_alive"): + return "error" + peer_sync = qs.get("peer_sync", {}) + if ( + qs.get("retry_queue_size", 0) > 0 + or qs.get("dead_letters", 0) > 0 + or peer_sync.get("degraded", 0) > 0 + ): + return "degraded" + return "ok" diff --git a/directdnsonly/app/peer_sync.py b/directdnsonly/app/peer_sync.py index 944e2af..cc916d8 100644 --- a/directdnsonly/app/peer_sync.py +++ b/directdnsonly/app/peer_sync.py @@ -140,6 +140,30 @@ class PeerSyncWorker: Exposed via /internal/peers so other nodes can discover this node's mesh.""" return [p["url"] for p in self.peers if p.get("url")] + def get_peer_status(self) -> dict: + """Return peer health summary for the /status endpoint.""" + peers = [] + for peer in self.peers: + url = peer.get("url", "") + h = self._peer_health.get(url, {}) + last_seen = h.get("last_seen") + peers.append({ + "url": url, + "healthy": h.get("healthy", True), + "consecutive_failures": h.get("consecutive_failures", 0), + "last_seen": last_seen.isoformat() if last_seen else None, + }) + healthy = sum(1 for p in peers if p["healthy"]) + return { + "enabled": self.enabled, + "alive": self.is_alive, + "interval_minutes": self.interval_seconds // 60, + "peers": peers, + "total": len(peers), + "healthy": healthy, + "degraded": len(peers) - healthy, + } + # ------------------------------------------------------------------ # Health tracking # ------------------------------------------------------------------ diff --git a/directdnsonly/app/reconciler.py b/directdnsonly/app/reconciler.py index 465f393..064c0a2 100755 --- a/directdnsonly/app/reconciler.py +++ b/directdnsonly/app/reconciler.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import datetime import threading from loguru import logger from sqlalchemy import select @@ -42,6 +43,17 @@ class ReconciliationWorker: self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60 self._stop_event = threading.Event() self._thread = None + self._last_run: dict = {} + + def get_status(self) -> dict: + """Return reconciler configuration and last-run statistics.""" + return { + "enabled": self.enabled, + "alive": self.is_alive, + "dry_run": self.dry_run, + "interval_minutes": self.interval_seconds // 60, + "last_run": dict(self._last_run), + } def start(self): if not self.enabled: @@ -104,11 +116,18 @@ class ReconciliationWorker: self._reconcile_all() def _reconcile_all(self): + started_at = datetime.datetime.utcnow() + self._last_run = {"status": "running", "started_at": started_at.isoformat()} logger.info( f"[reconciler] Starting reconciliation pass across " f"{len(self.servers)} server(s)" ) total_queued = 0 + da_servers_polled = 0 + da_servers_unreachable = 0 + migrated = 0 + backfilled = 0 + zones_in_db = 0 # Build a map of all domains seen on all DA servers: domain -> hostname all_da_domains: dict = {} @@ -126,23 +145,26 @@ class ReconciliationWorker: ssl=server.get("ssl", True), verify_ssl=self.verify_ssl, ) + da_servers_polled += 1 da_domains = client.list_domains(ipp=self.ipp) if da_domains is not None: for d in da_domains: all_da_domains[d] = hostname + else: + da_servers_unreachable += 1 logger.debug( f"[reconciler] {hostname}: " f"{len(da_domains) if da_domains else 0} active domain(s) in DA" ) except Exception as exc: logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}") + da_servers_unreachable += 1 # Compare local DB against what DA reported; update masters and queue deletes session = connect() try: all_local_domains = session.execute(select(Domain)).scalars().all() - migrated = 0 - backfilled = 0 + zones_in_db = len(all_local_domains) known_servers = {s.get("hostname") for s in self.servers} for record in all_local_domains: domain = record.domain @@ -209,10 +231,31 @@ class ReconciliationWorker: ) # Option C: heal backends that are missing zones + zones_healed = 0 if self.save_queue is not None and self.backend_registry is not None: - self._heal_backends() + zones_healed = self._heal_backends() - def _heal_backends(self): + completed_at = datetime.datetime.utcnow() + self._last_run = { + "status": "ok", + "started_at": started_at.isoformat(), + "completed_at": completed_at.isoformat(), + "duration_seconds": round( + (completed_at - started_at).total_seconds(), 1 + ), + "da_servers_polled": da_servers_polled, + "da_servers_unreachable": da_servers_unreachable, + "zones_in_da": len(all_da_domains), + "zones_in_db": zones_in_db, + "orphans_found": total_queued, + "orphans_queued": total_queued if not self.dry_run else 0, + "hostnames_backfilled": backfilled, + "hostnames_migrated": migrated, + "zones_healed": zones_healed, + "dry_run": self.dry_run, + } + + def _heal_backends(self) -> int: """Check every backend for zone presence and re-queue any zone that is missing from one or more backends, using the stored zone_data as the authoritative source. This corrects backends that missed pushes due to @@ -220,9 +263,10 @@ class ReconciliationWorker: """ backends = self.backend_registry.get_available_backends() if not backends: - return + return 0 session = connect() + healed = 0 try: domains = session.execute( select(Domain).where(Domain.zone_data.isnot(None)) @@ -231,9 +275,7 @@ class ReconciliationWorker: logger.debug( "[reconciler] Healing pass: no zone_data stored yet — skipping" ) - return - - healed = 0 + return 0 for record in domains: missing = [] for backend_name, backend in backends.items(): @@ -277,3 +319,4 @@ class ReconciliationWorker: ) finally: session.close() + return healed diff --git a/directdnsonly/config/app.yml b/directdnsonly/config/app.yml index 59fa97c..20dc69f 100644 --- a/directdnsonly/config/app.yml +++ b/directdnsonly/config/app.yml @@ -3,6 +3,20 @@ timezone: Pacific/Auckland log_level: INFO queue_location: ./data/queues +# Application datastore — stores domain index and zone_data for healing/peer-sync. +# SQLite (default) requires no extra dependencies and is fine for single-node setups. +# MySQL is recommended for multi-node deployments with a shared datastore. +datastore: + type: sqlite + db_location: ./data/directdnsonly.db + # --- MySQL --- + # type: mysql + # host: "127.0.0.1" + # port: "3306" + # name: "directdnsonly" + # user: "directdnsonly" + # pass: "changeme" + app: auth_username: directdnsonly auth_password: changeme # Override via DADNS_APP_AUTH_PASSWORD env var diff --git a/directdnsonly/main.py b/directdnsonly/main.py index e6ff3cc..c95c216 100644 --- a/directdnsonly/main.py +++ b/directdnsonly/main.py @@ -4,6 +4,7 @@ from app.backends import BackendRegistry from app.api.admin import DNSAdminAPI from app.api.health import HealthAPI from app.api.internal import InternalAPI +from app.api.status import StatusAPI from app import configure_logging from worker import WorkerManager from directdnsonly.config import config @@ -110,8 +111,9 @@ def main(): ) root.health = HealthAPI(registry) root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer) + root.status = StatusAPI(worker_manager) - # Add queue status endpoint + # Add queue status endpoint (debug) root.queue_status = lambda: worker_manager.queue_status() # Override auth for /internal so peers use their own credentials diff --git a/directdnsonly/worker.py b/directdnsonly/worker.py index 89ca2fa..2b9227c 100644 --- a/directdnsonly/worker.py +++ b/directdnsonly/worker.py @@ -43,6 +43,7 @@ class WorkerManager: self._peer_syncer = None self._reconciliation_config = reconciliation_config or {} self._peer_sync_config = peer_sync_config or {} + self._dead_letter_count = 0 try: os.makedirs(queue_path, exist_ok=True) @@ -205,6 +206,7 @@ class WorkerManager: Discards to dead-letter after MAX_RETRIES attempts.""" retry_count = item.get("retry_count", 0) + 1 if retry_count > MAX_RETRIES: + self._dead_letter_count += 1 logger.error( f"[retry] Dead-letter: {item['domain']} failed on " f"{failed_backends} after {MAX_RETRIES} attempts — giving up" @@ -496,18 +498,24 @@ class WorkerManager: logger.info("Workers stopped") def queue_status(self): + reconciler = ( + self._reconciler.get_status() + if self._reconciler + else {"enabled": False, "alive": False, "last_run": {}} + ) + peer_sync = ( + self._peer_syncer.get_peer_status() + if self._peer_syncer + else {"enabled": False, "alive": False, "peers": [], "total": 0, "healthy": 0, "degraded": 0} + ) return { "save_queue_size": self.save_queue.qsize(), "delete_queue_size": self.delete_queue.qsize(), "retry_queue_size": self.retry_queue.qsize(), - "save_worker_alive": self._save_thread and self._save_thread.is_alive(), - "delete_worker_alive": self._delete_thread - and self._delete_thread.is_alive(), - "retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(), - "reconciler_alive": ( - self._reconciler.is_alive if self._reconciler else False - ), - "peer_syncer_alive": ( - self._peer_syncer.is_alive if self._peer_syncer else False - ), + "dead_letters": self._dead_letter_count, + "save_worker_alive": bool(self._save_thread and self._save_thread.is_alive()), + "delete_worker_alive": bool(self._delete_thread and self._delete_thread.is_alive()), + "retry_worker_alive": bool(self._retry_thread and self._retry_thread.is_alive()), + "reconciler": reconciler, + "peer_sync": peer_sync, } diff --git a/tests/conftest.py b/tests/conftest.py index 38231fb..1dc6829 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,4 +38,5 @@ def patch_connect(db_session, monkeypatch): monkeypatch.setattr("directdnsonly.app.utils.connect", _factory) monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory) monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory) + monkeypatch.setattr("directdnsonly.app.api.status.connect", _factory) return db_session diff --git a/tests/test_peer_sync.py b/tests/test_peer_sync.py index b9f570d..7baf054 100644 --- a/tests/test_peer_sync.py +++ b/tests/test_peer_sync.py @@ -394,3 +394,53 @@ def test_sync_empty_peer_list(patch_connect, monkeypatch): monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) worker._sync_from_peer(_make_peer()) + + +# --------------------------------------------------------------------------- +# get_peer_status +# --------------------------------------------------------------------------- + + +def test_get_peer_status_no_contact_yet(): + worker = PeerSyncWorker(BASE_CONFIG) + status = worker.get_peer_status() + + assert status["enabled"] is True + assert status["total"] == 1 + assert status["healthy"] == 1 + assert status["degraded"] == 0 + assert status["peers"][0]["url"] == "http://ddo-2:2222" + assert status["peers"][0]["healthy"] is True + assert status["peers"][0]["last_seen"] is None + + +def test_get_peer_status_after_success(): + worker = PeerSyncWorker(BASE_CONFIG) + worker._record_success("http://ddo-2:2222") + status = worker.get_peer_status() + + assert status["healthy"] == 1 + assert status["degraded"] == 0 + assert status["peers"][0]["last_seen"] is not None + + +def test_get_peer_status_after_degraded(): + from directdnsonly.app.peer_sync import FAILURE_THRESHOLD + + worker = PeerSyncWorker(BASE_CONFIG) + for _ in range(FAILURE_THRESHOLD): + worker._record_failure("http://ddo-2:2222", Exception("timeout")) + + status = worker.get_peer_status() + assert status["healthy"] == 0 + assert status["degraded"] == 1 + assert status["peers"][0]["healthy"] is False + + +def test_get_peer_status_disabled(): + worker = PeerSyncWorker({}) + status = worker.get_peer_status() + + assert status["enabled"] is False + assert status["total"] == 0 + assert status["peers"] == [] diff --git a/tests/test_reconciler.py b/tests/test_reconciler.py index 4f50272..d0701e1 100644 --- a/tests/test_reconciler.py +++ b/tests/test_reconciler.py @@ -317,3 +317,83 @@ def test_heal_skipped_when_no_registry(delete_queue, patch_connect): w._reconcile_all() assert save_queue.empty() + + +# --------------------------------------------------------------------------- +# get_status — last-run state +# --------------------------------------------------------------------------- + + +def test_get_status_before_any_run(worker): + status = worker.get_status() + assert status["enabled"] is True + assert status["alive"] is False + assert status["last_run"] == {} + + +def test_get_status_after_run(worker, patch_connect): + with _patch_da(set()): + worker._reconcile_all() + + s = worker.get_status() + assert s["enabled"] is True + lr = s["last_run"] + assert lr["status"] == "ok" + assert "started_at" in lr + assert "completed_at" in lr + assert "duration_seconds" in lr + assert lr["da_servers_polled"] == 1 + assert lr["da_servers_unreachable"] == 0 + assert lr["dry_run"] is False + + +def test_get_status_counts_unreachable_server(worker, patch_connect): + with _patch_da(None): + worker._reconcile_all() + + lr = worker.get_status()["last_run"] + assert lr["da_servers_polled"] == 1 + assert lr["da_servers_unreachable"] == 1 + + +def test_get_status_counts_orphans(worker, delete_queue, patch_connect): + patch_connect.add( + Domain(domain="orphan.com", hostname="da1.example.com", username="admin") + ) + patch_connect.commit() + + with _patch_da(set()): + worker._reconcile_all() + + lr = worker.get_status()["last_run"] + assert lr["orphans_found"] == 1 + assert lr["orphans_queued"] == 1 + + +def test_get_status_dry_run_orphans_not_queued_in_stats(dry_run_worker, patch_connect): + patch_connect.add( + Domain(domain="orphan.com", hostname="da1.example.com", username="admin") + ) + patch_connect.commit() + + with _patch_da(set()): + dry_run_worker._reconcile_all() + + lr = dry_run_worker.get_status()["last_run"] + assert lr["dry_run"] is True + assert lr["orphans_found"] == 1 + assert lr["orphans_queued"] == 0 + + +def test_get_status_zones_in_db_counted(worker, patch_connect): + for d in ["a.com", "b.com", "c.com"]: + patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin")) + patch_connect.commit() + + with _patch_da({"a.com", "b.com", "c.com"}): + worker._reconcile_all() + + lr = worker.get_status()["last_run"] + assert lr["zones_in_db"] == 3 + assert lr["zones_in_da"] == 3 + assert lr["orphans_found"] == 0 diff --git a/tests/test_status_api.py b/tests/test_status_api.py new file mode 100644 index 0000000..2427ebc --- /dev/null +++ b/tests/test_status_api.py @@ -0,0 +1,162 @@ +"""Tests for directdnsonly.app.api.status — StatusAPI.""" + +import json +from unittest.mock import MagicMock + +import cherrypy +import pytest + +from directdnsonly.app.api.status import StatusAPI +from directdnsonly.app.db.models import Domain + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_RECONCILER_OK = { + "enabled": True, + "alive": True, + "dry_run": False, + "interval_minutes": 60, + "last_run": {}, +} +_PEER_SYNC_OFF = { + "enabled": False, + "alive": False, + "peers": [], + "total": 0, + "healthy": 0, + "degraded": 0, +} + + +def _qs(**overrides): + base = { + "save_queue_size": 0, + "delete_queue_size": 0, + "retry_queue_size": 0, + "dead_letters": 0, + "save_worker_alive": True, + "delete_worker_alive": True, + "retry_worker_alive": True, + "reconciler": _RECONCILER_OK, + "peer_sync": _PEER_SYNC_OFF, + } + base.update(overrides) + return base + + +def _api(qs=None): + wm = MagicMock() + wm.queue_status.return_value = qs or _qs() + return StatusAPI(wm) + + +# --------------------------------------------------------------------------- +# _compute_overall +# --------------------------------------------------------------------------- + + +def test_overall_ok_all_healthy(): + assert StatusAPI._compute_overall(_qs()) == "ok" + + +def test_overall_error_save_worker_dead(): + assert StatusAPI._compute_overall(_qs(save_worker_alive=False)) == "error" + + +def test_overall_error_delete_worker_dead(): + assert StatusAPI._compute_overall(_qs(delete_worker_alive=False)) == "error" + + +def test_overall_degraded_retries_pending(): + assert StatusAPI._compute_overall(_qs(retry_queue_size=3)) == "degraded" + + +def test_overall_degraded_dead_letters(): + assert StatusAPI._compute_overall(_qs(dead_letters=1)) == "degraded" + + +def test_overall_degraded_peer_unhealthy(): + ps = {**_PEER_SYNC_OFF, "degraded": 1} + assert StatusAPI._compute_overall(_qs(peer_sync=ps)) == "degraded" + + +def test_overall_error_takes_priority_over_degraded(): + """error > degraded when both conditions are true.""" + assert ( + StatusAPI._compute_overall( + _qs(save_worker_alive=False, retry_queue_size=5) + ) + == "error" + ) + + +# --------------------------------------------------------------------------- +# _build — structure and zone count +# --------------------------------------------------------------------------- + + +def test_build_structure(patch_connect): + api = _api() + result = api._build() + + assert "status" in result + assert "queues" in result + assert "workers" in result + assert "reconciler" in result + assert "peer_sync" in result + assert "zones" in result + + +def test_build_zone_count_zero(patch_connect): + api = _api() + result = api._build() + assert result["zones"]["total"] == 0 + + +def test_build_zone_count_with_domains(patch_connect): + for d in ["a.com", "b.com", "c.com"]: + patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin")) + patch_connect.commit() + + api = _api() + result = api._build() + assert result["zones"]["total"] == 3 + + +def test_build_queues_forwarded(patch_connect): + api = _api(_qs(save_queue_size=2, delete_queue_size=1, retry_queue_size=3, dead_letters=1)) + result = api._build() + + assert result["queues"]["save"] == 2 + assert result["queues"]["delete"] == 1 + assert result["queues"]["retry"] == 3 + assert result["queues"]["dead_letters"] == 1 + + +def test_build_workers_forwarded(patch_connect): + api = _api() + result = api._build() + + assert result["workers"]["save"] is True + assert result["workers"]["delete"] is True + assert result["workers"]["retry_drain"] is True + + +# --------------------------------------------------------------------------- +# index — JSON encoding +# --------------------------------------------------------------------------- + + +def test_index_returns_valid_json(patch_connect): + api = _api() + with MagicMock() as mock_resp: + cherrypy.response = mock_resp + cherrypy.response.headers = {} + body = api.index() + + data = json.loads(body) + assert data["status"] == "ok" + assert isinstance(data["zones"]["total"], int)