feat: operational status endpoint + reconciler/peer state tracking 📊

- ReconciliationWorker._last_run stores per-pass stats (da_servers_polled,
  zones_in_da/db, orphans_found/queued, hostnames_backfilled/migrated,
  zones_healed, duration_seconds, dry_run flag)
- ReconciliationWorker.get_status() exposes state for API/UI consumption
- _heal_backends() now returns healed count
- PeerSyncWorker.get_peer_status() serialises _peer_health to JSON-safe dict
  (url, healthy, consecutive_failures, last_seen) with summary totals
- WorkerManager tracks dead-letter count; queue_status() now returns nested
  reconciler/peer_sync dicts replacing flat reconciler_alive/peer_syncer_alive
- New GET /status endpoint (StatusAPI) aggregates queue depths, worker liveness,
  reconciler last-run, peer health, and live zone count; computes ok/degraded/error
- .gitignore: exclude .claude/, .vscode/, .env (always local)
- app.yml: add documented datastore section (SQLite default + MySQL commented)
- 164 tests passing (23 new tests added)
This commit is contained in:
2026-02-25 18:51:56 +13:00
parent 0f417da204
commit db60d808de
11 changed files with 491 additions and 19 deletions

6
.gitignore vendored
View File

@@ -26,3 +26,9 @@ build
*.mypy_cache *.mypy_cache
*.pytest_cache *.pytest_cache
/data/* /data/*
# Editor / tool settings — always local, never committed
.vscode/
.claude/
.env
*.env

View File

@@ -0,0 +1,82 @@
"""Operational status endpoint — aggregates queue, worker, reconciler, and peer health."""
import json
import cherrypy
from sqlalchemy import func, select
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain
class StatusAPI:
"""Exposes GET /status as a JSON health/status document.
Aggregates data from WorkerManager.queue_status() and a live DB zone count
into a single response that a UI or monitoring system can poll.
Overall ``status`` field:
- ``ok`` — all workers alive, no dead-letters, all peers healthy
- ``degraded`` — retries pending, dead-letters present, or a peer is unhealthy
- ``error`` — a core worker thread is not alive
"""
def __init__(self, worker_manager):
self._wm = worker_manager
@cherrypy.expose
def index(self):
cherrypy.response.headers["Content-Type"] = "application/json"
return json.dumps(self._build(), default=str).encode()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _build(self) -> dict:
qs = self._wm.queue_status()
zone_count = self._zone_count()
overall = self._compute_overall(qs)
return {
"status": overall,
"queues": {
"save": qs.get("save_queue_size", 0),
"delete": qs.get("delete_queue_size", 0),
"retry": qs.get("retry_queue_size", 0),
"dead_letters": qs.get("dead_letters", 0),
},
"workers": {
"save": qs.get("save_worker_alive"),
"delete": qs.get("delete_worker_alive"),
"retry_drain": qs.get("retry_worker_alive"),
},
"reconciler": qs.get("reconciler", {}),
"peer_sync": qs.get("peer_sync", {}),
"zones": {"total": zone_count},
}
@staticmethod
def _zone_count() -> int:
session = connect()
try:
return session.execute(select(func.count(Domain.id))).scalar() or 0
except Exception:
return 0
finally:
session.close()
@staticmethod
def _compute_overall(qs: dict) -> str:
if not qs.get("save_worker_alive") or not qs.get("delete_worker_alive"):
return "error"
peer_sync = qs.get("peer_sync", {})
if (
qs.get("retry_queue_size", 0) > 0
or qs.get("dead_letters", 0) > 0
or peer_sync.get("degraded", 0) > 0
):
return "degraded"
return "ok"

View File

@@ -140,6 +140,30 @@ class PeerSyncWorker:
Exposed via /internal/peers so other nodes can discover this node's mesh.""" Exposed via /internal/peers so other nodes can discover this node's mesh."""
return [p["url"] for p in self.peers if p.get("url")] return [p["url"] for p in self.peers if p.get("url")]
def get_peer_status(self) -> dict:
"""Return peer health summary for the /status endpoint."""
peers = []
for peer in self.peers:
url = peer.get("url", "")
h = self._peer_health.get(url, {})
last_seen = h.get("last_seen")
peers.append({
"url": url,
"healthy": h.get("healthy", True),
"consecutive_failures": h.get("consecutive_failures", 0),
"last_seen": last_seen.isoformat() if last_seen else None,
})
healthy = sum(1 for p in peers if p["healthy"])
return {
"enabled": self.enabled,
"alive": self.is_alive,
"interval_minutes": self.interval_seconds // 60,
"peers": peers,
"total": len(peers),
"healthy": healthy,
"degraded": len(peers) - healthy,
}
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Health tracking # Health tracking
# ------------------------------------------------------------------ # ------------------------------------------------------------------

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import datetime
import threading import threading
from loguru import logger from loguru import logger
from sqlalchemy import select from sqlalchemy import select
@@ -42,6 +43,17 @@ class ReconciliationWorker:
self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60 self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._thread = None self._thread = None
self._last_run: dict = {}
def get_status(self) -> dict:
"""Return reconciler configuration and last-run statistics."""
return {
"enabled": self.enabled,
"alive": self.is_alive,
"dry_run": self.dry_run,
"interval_minutes": self.interval_seconds // 60,
"last_run": dict(self._last_run),
}
def start(self): def start(self):
if not self.enabled: if not self.enabled:
@@ -104,11 +116,18 @@ class ReconciliationWorker:
self._reconcile_all() self._reconcile_all()
def _reconcile_all(self): def _reconcile_all(self):
started_at = datetime.datetime.utcnow()
self._last_run = {"status": "running", "started_at": started_at.isoformat()}
logger.info( logger.info(
f"[reconciler] Starting reconciliation pass across " f"[reconciler] Starting reconciliation pass across "
f"{len(self.servers)} server(s)" f"{len(self.servers)} server(s)"
) )
total_queued = 0 total_queued = 0
da_servers_polled = 0
da_servers_unreachable = 0
migrated = 0
backfilled = 0
zones_in_db = 0
# Build a map of all domains seen on all DA servers: domain -> hostname # Build a map of all domains seen on all DA servers: domain -> hostname
all_da_domains: dict = {} all_da_domains: dict = {}
@@ -126,23 +145,26 @@ class ReconciliationWorker:
ssl=server.get("ssl", True), ssl=server.get("ssl", True),
verify_ssl=self.verify_ssl, verify_ssl=self.verify_ssl,
) )
da_servers_polled += 1
da_domains = client.list_domains(ipp=self.ipp) da_domains = client.list_domains(ipp=self.ipp)
if da_domains is not None: if da_domains is not None:
for d in da_domains: for d in da_domains:
all_da_domains[d] = hostname all_da_domains[d] = hostname
else:
da_servers_unreachable += 1
logger.debug( logger.debug(
f"[reconciler] {hostname}: " f"[reconciler] {hostname}: "
f"{len(da_domains) if da_domains else 0} active domain(s) in DA" f"{len(da_domains) if da_domains else 0} active domain(s) in DA"
) )
except Exception as exc: except Exception as exc:
logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}") logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}")
da_servers_unreachable += 1
# Compare local DB against what DA reported; update masters and queue deletes # Compare local DB against what DA reported; update masters and queue deletes
session = connect() session = connect()
try: try:
all_local_domains = session.execute(select(Domain)).scalars().all() all_local_domains = session.execute(select(Domain)).scalars().all()
migrated = 0 zones_in_db = len(all_local_domains)
backfilled = 0
known_servers = {s.get("hostname") for s in self.servers} known_servers = {s.get("hostname") for s in self.servers}
for record in all_local_domains: for record in all_local_domains:
domain = record.domain domain = record.domain
@@ -209,10 +231,31 @@ class ReconciliationWorker:
) )
# Option C: heal backends that are missing zones # Option C: heal backends that are missing zones
zones_healed = 0
if self.save_queue is not None and self.backend_registry is not None: if self.save_queue is not None and self.backend_registry is not None:
self._heal_backends() zones_healed = self._heal_backends()
def _heal_backends(self): completed_at = datetime.datetime.utcnow()
self._last_run = {
"status": "ok",
"started_at": started_at.isoformat(),
"completed_at": completed_at.isoformat(),
"duration_seconds": round(
(completed_at - started_at).total_seconds(), 1
),
"da_servers_polled": da_servers_polled,
"da_servers_unreachable": da_servers_unreachable,
"zones_in_da": len(all_da_domains),
"zones_in_db": zones_in_db,
"orphans_found": total_queued,
"orphans_queued": total_queued if not self.dry_run else 0,
"hostnames_backfilled": backfilled,
"hostnames_migrated": migrated,
"zones_healed": zones_healed,
"dry_run": self.dry_run,
}
def _heal_backends(self) -> int:
"""Check every backend for zone presence and re-queue any zone that is """Check every backend for zone presence and re-queue any zone that is
missing from one or more backends, using the stored zone_data as the missing from one or more backends, using the stored zone_data as the
authoritative source. This corrects backends that missed pushes due to authoritative source. This corrects backends that missed pushes due to
@@ -220,9 +263,10 @@ class ReconciliationWorker:
""" """
backends = self.backend_registry.get_available_backends() backends = self.backend_registry.get_available_backends()
if not backends: if not backends:
return return 0
session = connect() session = connect()
healed = 0
try: try:
domains = session.execute( domains = session.execute(
select(Domain).where(Domain.zone_data.isnot(None)) select(Domain).where(Domain.zone_data.isnot(None))
@@ -231,9 +275,7 @@ class ReconciliationWorker:
logger.debug( logger.debug(
"[reconciler] Healing pass: no zone_data stored yet — skipping" "[reconciler] Healing pass: no zone_data stored yet — skipping"
) )
return return 0
healed = 0
for record in domains: for record in domains:
missing = [] missing = []
for backend_name, backend in backends.items(): for backend_name, backend in backends.items():
@@ -277,3 +319,4 @@ class ReconciliationWorker:
) )
finally: finally:
session.close() session.close()
return healed

View File

@@ -3,6 +3,20 @@ timezone: Pacific/Auckland
log_level: INFO log_level: INFO
queue_location: ./data/queues queue_location: ./data/queues
# Application datastore — stores domain index and zone_data for healing/peer-sync.
# SQLite (default) requires no extra dependencies and is fine for single-node setups.
# MySQL is recommended for multi-node deployments with a shared datastore.
datastore:
type: sqlite
db_location: ./data/directdnsonly.db
# --- MySQL ---
# type: mysql
# host: "127.0.0.1"
# port: "3306"
# name: "directdnsonly"
# user: "directdnsonly"
# pass: "changeme"
app: app:
auth_username: directdnsonly auth_username: directdnsonly
auth_password: changeme # Override via DADNS_APP_AUTH_PASSWORD env var auth_password: changeme # Override via DADNS_APP_AUTH_PASSWORD env var

View File

@@ -4,6 +4,7 @@ from app.backends import BackendRegistry
from app.api.admin import DNSAdminAPI from app.api.admin import DNSAdminAPI
from app.api.health import HealthAPI from app.api.health import HealthAPI
from app.api.internal import InternalAPI from app.api.internal import InternalAPI
from app.api.status import StatusAPI
from app import configure_logging from app import configure_logging
from worker import WorkerManager from worker import WorkerManager
from directdnsonly.config import config from directdnsonly.config import config
@@ -110,8 +111,9 @@ def main():
) )
root.health = HealthAPI(registry) root.health = HealthAPI(registry)
root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer) root.internal = InternalAPI(peer_syncer=worker_manager._peer_syncer)
root.status = StatusAPI(worker_manager)
# Add queue status endpoint # Add queue status endpoint (debug)
root.queue_status = lambda: worker_manager.queue_status() root.queue_status = lambda: worker_manager.queue_status()
# Override auth for /internal so peers use their own credentials # Override auth for /internal so peers use their own credentials

View File

@@ -43,6 +43,7 @@ class WorkerManager:
self._peer_syncer = None self._peer_syncer = None
self._reconciliation_config = reconciliation_config or {} self._reconciliation_config = reconciliation_config or {}
self._peer_sync_config = peer_sync_config or {} self._peer_sync_config = peer_sync_config or {}
self._dead_letter_count = 0
try: try:
os.makedirs(queue_path, exist_ok=True) os.makedirs(queue_path, exist_ok=True)
@@ -205,6 +206,7 @@ class WorkerManager:
Discards to dead-letter after MAX_RETRIES attempts.""" Discards to dead-letter after MAX_RETRIES attempts."""
retry_count = item.get("retry_count", 0) + 1 retry_count = item.get("retry_count", 0) + 1
if retry_count > MAX_RETRIES: if retry_count > MAX_RETRIES:
self._dead_letter_count += 1
logger.error( logger.error(
f"[retry] Dead-letter: {item['domain']} failed on " f"[retry] Dead-letter: {item['domain']} failed on "
f"{failed_backends} after {MAX_RETRIES} attempts — giving up" f"{failed_backends} after {MAX_RETRIES} attempts — giving up"
@@ -496,18 +498,24 @@ class WorkerManager:
logger.info("Workers stopped") logger.info("Workers stopped")
def queue_status(self): def queue_status(self):
reconciler = (
self._reconciler.get_status()
if self._reconciler
else {"enabled": False, "alive": False, "last_run": {}}
)
peer_sync = (
self._peer_syncer.get_peer_status()
if self._peer_syncer
else {"enabled": False, "alive": False, "peers": [], "total": 0, "healthy": 0, "degraded": 0}
)
return { return {
"save_queue_size": self.save_queue.qsize(), "save_queue_size": self.save_queue.qsize(),
"delete_queue_size": self.delete_queue.qsize(), "delete_queue_size": self.delete_queue.qsize(),
"retry_queue_size": self.retry_queue.qsize(), "retry_queue_size": self.retry_queue.qsize(),
"save_worker_alive": self._save_thread and self._save_thread.is_alive(), "dead_letters": self._dead_letter_count,
"delete_worker_alive": self._delete_thread "save_worker_alive": bool(self._save_thread and self._save_thread.is_alive()),
and self._delete_thread.is_alive(), "delete_worker_alive": bool(self._delete_thread and self._delete_thread.is_alive()),
"retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(), "retry_worker_alive": bool(self._retry_thread and self._retry_thread.is_alive()),
"reconciler_alive": ( "reconciler": reconciler,
self._reconciler.is_alive if self._reconciler else False "peer_sync": peer_sync,
),
"peer_syncer_alive": (
self._peer_syncer.is_alive if self._peer_syncer else False
),
} }

View File

@@ -38,4 +38,5 @@ def patch_connect(db_session, monkeypatch):
monkeypatch.setattr("directdnsonly.app.utils.connect", _factory) monkeypatch.setattr("directdnsonly.app.utils.connect", _factory)
monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory) monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory)
monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory) monkeypatch.setattr("directdnsonly.app.peer_sync.connect", _factory)
monkeypatch.setattr("directdnsonly.app.api.status.connect", _factory)
return db_session return db_session

View File

@@ -394,3 +394,53 @@ def test_sync_empty_peer_list(patch_connect, monkeypatch):
monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get) monkeypatch.setattr("directdnsonly.app.peer_sync.requests.get", mock_get)
worker._sync_from_peer(_make_peer()) worker._sync_from_peer(_make_peer())
# ---------------------------------------------------------------------------
# get_peer_status
# ---------------------------------------------------------------------------
def test_get_peer_status_no_contact_yet():
worker = PeerSyncWorker(BASE_CONFIG)
status = worker.get_peer_status()
assert status["enabled"] is True
assert status["total"] == 1
assert status["healthy"] == 1
assert status["degraded"] == 0
assert status["peers"][0]["url"] == "http://ddo-2:2222"
assert status["peers"][0]["healthy"] is True
assert status["peers"][0]["last_seen"] is None
def test_get_peer_status_after_success():
worker = PeerSyncWorker(BASE_CONFIG)
worker._record_success("http://ddo-2:2222")
status = worker.get_peer_status()
assert status["healthy"] == 1
assert status["degraded"] == 0
assert status["peers"][0]["last_seen"] is not None
def test_get_peer_status_after_degraded():
from directdnsonly.app.peer_sync import FAILURE_THRESHOLD
worker = PeerSyncWorker(BASE_CONFIG)
for _ in range(FAILURE_THRESHOLD):
worker._record_failure("http://ddo-2:2222", Exception("timeout"))
status = worker.get_peer_status()
assert status["healthy"] == 0
assert status["degraded"] == 1
assert status["peers"][0]["healthy"] is False
def test_get_peer_status_disabled():
worker = PeerSyncWorker({})
status = worker.get_peer_status()
assert status["enabled"] is False
assert status["total"] == 0
assert status["peers"] == []

View File

@@ -317,3 +317,83 @@ def test_heal_skipped_when_no_registry(delete_queue, patch_connect):
w._reconcile_all() w._reconcile_all()
assert save_queue.empty() assert save_queue.empty()
# ---------------------------------------------------------------------------
# get_status — last-run state
# ---------------------------------------------------------------------------
def test_get_status_before_any_run(worker):
status = worker.get_status()
assert status["enabled"] is True
assert status["alive"] is False
assert status["last_run"] == {}
def test_get_status_after_run(worker, patch_connect):
with _patch_da(set()):
worker._reconcile_all()
s = worker.get_status()
assert s["enabled"] is True
lr = s["last_run"]
assert lr["status"] == "ok"
assert "started_at" in lr
assert "completed_at" in lr
assert "duration_seconds" in lr
assert lr["da_servers_polled"] == 1
assert lr["da_servers_unreachable"] == 0
assert lr["dry_run"] is False
def test_get_status_counts_unreachable_server(worker, patch_connect):
with _patch_da(None):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["da_servers_polled"] == 1
assert lr["da_servers_unreachable"] == 1
def test_get_status_counts_orphans(worker, delete_queue, patch_connect):
patch_connect.add(
Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with _patch_da(set()):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["orphans_found"] == 1
assert lr["orphans_queued"] == 1
def test_get_status_dry_run_orphans_not_queued_in_stats(dry_run_worker, patch_connect):
patch_connect.add(
Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with _patch_da(set()):
dry_run_worker._reconcile_all()
lr = dry_run_worker.get_status()["last_run"]
assert lr["dry_run"] is True
assert lr["orphans_found"] == 1
assert lr["orphans_queued"] == 0
def test_get_status_zones_in_db_counted(worker, patch_connect):
for d in ["a.com", "b.com", "c.com"]:
patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
patch_connect.commit()
with _patch_da({"a.com", "b.com", "c.com"}):
worker._reconcile_all()
lr = worker.get_status()["last_run"]
assert lr["zones_in_db"] == 3
assert lr["zones_in_da"] == 3
assert lr["orphans_found"] == 0

162
tests/test_status_api.py Normal file
View File

@@ -0,0 +1,162 @@
"""Tests for directdnsonly.app.api.status — StatusAPI."""
import json
from unittest.mock import MagicMock
import cherrypy
import pytest
from directdnsonly.app.api.status import StatusAPI
from directdnsonly.app.db.models import Domain
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_RECONCILER_OK = {
"enabled": True,
"alive": True,
"dry_run": False,
"interval_minutes": 60,
"last_run": {},
}
_PEER_SYNC_OFF = {
"enabled": False,
"alive": False,
"peers": [],
"total": 0,
"healthy": 0,
"degraded": 0,
}
def _qs(**overrides):
base = {
"save_queue_size": 0,
"delete_queue_size": 0,
"retry_queue_size": 0,
"dead_letters": 0,
"save_worker_alive": True,
"delete_worker_alive": True,
"retry_worker_alive": True,
"reconciler": _RECONCILER_OK,
"peer_sync": _PEER_SYNC_OFF,
}
base.update(overrides)
return base
def _api(qs=None):
wm = MagicMock()
wm.queue_status.return_value = qs or _qs()
return StatusAPI(wm)
# ---------------------------------------------------------------------------
# _compute_overall
# ---------------------------------------------------------------------------
def test_overall_ok_all_healthy():
assert StatusAPI._compute_overall(_qs()) == "ok"
def test_overall_error_save_worker_dead():
assert StatusAPI._compute_overall(_qs(save_worker_alive=False)) == "error"
def test_overall_error_delete_worker_dead():
assert StatusAPI._compute_overall(_qs(delete_worker_alive=False)) == "error"
def test_overall_degraded_retries_pending():
assert StatusAPI._compute_overall(_qs(retry_queue_size=3)) == "degraded"
def test_overall_degraded_dead_letters():
assert StatusAPI._compute_overall(_qs(dead_letters=1)) == "degraded"
def test_overall_degraded_peer_unhealthy():
ps = {**_PEER_SYNC_OFF, "degraded": 1}
assert StatusAPI._compute_overall(_qs(peer_sync=ps)) == "degraded"
def test_overall_error_takes_priority_over_degraded():
"""error > degraded when both conditions are true."""
assert (
StatusAPI._compute_overall(
_qs(save_worker_alive=False, retry_queue_size=5)
)
== "error"
)
# ---------------------------------------------------------------------------
# _build — structure and zone count
# ---------------------------------------------------------------------------
def test_build_structure(patch_connect):
api = _api()
result = api._build()
assert "status" in result
assert "queues" in result
assert "workers" in result
assert "reconciler" in result
assert "peer_sync" in result
assert "zones" in result
def test_build_zone_count_zero(patch_connect):
api = _api()
result = api._build()
assert result["zones"]["total"] == 0
def test_build_zone_count_with_domains(patch_connect):
for d in ["a.com", "b.com", "c.com"]:
patch_connect.add(Domain(domain=d, hostname="da1.example.com", username="admin"))
patch_connect.commit()
api = _api()
result = api._build()
assert result["zones"]["total"] == 3
def test_build_queues_forwarded(patch_connect):
api = _api(_qs(save_queue_size=2, delete_queue_size=1, retry_queue_size=3, dead_letters=1))
result = api._build()
assert result["queues"]["save"] == 2
assert result["queues"]["delete"] == 1
assert result["queues"]["retry"] == 3
assert result["queues"]["dead_letters"] == 1
def test_build_workers_forwarded(patch_connect):
api = _api()
result = api._build()
assert result["workers"]["save"] is True
assert result["workers"]["delete"] is True
assert result["workers"]["retry_drain"] is True
# ---------------------------------------------------------------------------
# index — JSON encoding
# ---------------------------------------------------------------------------
def test_index_returns_valid_json(patch_connect):
api = _api()
with MagicMock() as mock_resp:
cherrypy.response = mock_resp
cherrypy.response.headers = {}
body = api.index()
data = json.loads(body)
assert data["status"] == "ok"
assert isinstance(data["zones"]["total"], int)