feat: retry queue, backend healing, and zone_data persistence 🔁

- worker.py: adds a third persistent queue for retries, with exponential backoff
  (30s→30m, max 5 attempts); failed backends are tracked per item so retries
  target only the failing nodes; zone_data is stored in the DB after every
  successful write
- Domain model: zone_data TEXT + zone_updated_at DATETIME columns; additive
  migration applied on startup so existing deployments upgrade in place
- ReconciliationWorker: Option C healing pass — checks every configured backend
  for zone presence after each reconciliation cycle and re-queues any zone
  missing from a backend using stored zone_data, enabling automatic recovery
  from prolonged backend outages without waiting for DirectAdmin to re-push
- 82 tests, all passing
2026-02-19 14:05:22 +13:00
parent 0e044b7dc2
commit b523b17f30
7 changed files with 476 additions and 179 deletions

View File

@@ -28,6 +28,13 @@ DirectAdmin Multi-Server
**Each instance is completely independent** — no shared state, no cross-talk. Redundancy comes from DA pushing to both. If one container goes down, DA continues to push to the other.
> **DNS consistency note:** DirectAdmin pushes to each Extra DNS server sequentially, not atomically. Two brief consistency windows exist:
>
> - **Transient gap** — between the first and second push completing, the two instances will return different answers. This is typically sub-second and resolves on its own.
> - **Permanent drift** — if the push to one instance fails permanently (network outage, container down), that instance will serve stale or missing zone data until DA retries or the zone is updated again. The built-in reconciliation poller detects *orphaned zones* (present in our DB but deleted from DA) but does **not** compare zone content between instances.
>
> For workloads where split-brain DNS is unacceptable, use Topology B (single write path → multiple MySQL replicas) instead.
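A quick way to check for drift in practice is to compare the SOA serial each instance serves. The sketch below uses `dnspython`, which is not part of this project; the instance IPs and zone name are placeholders:

```python
# Drift probe: query the SOA serial from each Extra DNS instance and compare.
# Assumes `pip install dnspython`; replace the IPs with your instance addresses.
import dns.resolver

INSTANCES = ["192.0.2.10", "192.0.2.20"]  # placeholder instance IPs


def soa_serial(server, zone):
    """Return the SOA serial the given server answers with, or None."""
    resolver = dns.resolver.Resolver(configure=False)
    resolver.nameservers = [server]
    try:
        return resolver.resolve(zone, "SOA")[0].serial
    except Exception:
        return None  # unreachable, or zone missing on this instance


serials = {ip: soa_serial(ip, "example.com") for ip in INSTANCES}
if len(set(serials.values())) > 1:
    print(f"DRIFT detected: {serials}")
else:
    print(f"consistent: {serials}")
```

Run it per zone (or loop over your zone list) from a host that can reach both instances.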
#### `config/app.yml` — instance 1
```yaml
@@ -134,6 +141,30 @@ Adding a third data centre is a single stanza in the config — no code changes
---
## CoreDNS MySQL Backend — Required Fork
The `coredns_mysql` backend writes zones to a MySQL database that CoreDNS reads
at query time. **Vanilla CoreDNS with a stock MySQL plugin is not sufficient**:
out of the box it does not act as a fully authoritative server, does not return
NS records in the additional section, does not set the AA flag, and does not
handle wildcard records.
This project is designed to work with a patched fork that resolves all of those
issues:
**[cybercinch/coredns_mysql_extend](https://github.com/cybercinch/coredns_mysql_extend)**
Key differences from the upstream plugin:
- Fully authoritative responses — correct AA flag and NXDOMAIN on misses
- Wildcard record support (`*` entries served correctly)
- NS records returned in the additional section
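
To confirm a running instance actually behaves authoritatively (rather than like a stock plugin build), you can query it directly and inspect the AA flag. A minimal sketch using `dnspython`; the server IP and zone are placeholders:

```python
# AA-flag probe: a stock MySQL plugin build typically answers without AA set;
# the patched fork should set it. Assumes `pip install dnspython`.
import dns.flags
import dns.message
import dns.query

SERVER = "192.0.2.10"  # placeholder: your CoreDNS instance

query = dns.message.make_query("example.com", "A")
response = dns.query.udp(query, SERVER, timeout=5)

if response.flags & dns.flags.AA:
    print("AA set: authoritative answer")
else:
    print("AA not set: looks like stock plugin behaviour")
```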
Use the BIND backend if you want a zero-dependency setup with no custom CoreDNS
build required.
---
## Features
- Multi-backend DNS management (BIND, CoreDNS MySQL)
- Parallel backend dispatch — all enabled backends updated simultaneously

View File

@@ -1,13 +1,32 @@
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from vyper import v
from loguru import logger
import datetime
Base = declarative_base()
def _migrate(engine):
"""Apply additive schema migrations for columns added after initial release."""
migrations = [
("domains", "zone_data", "ALTER TABLE domains ADD COLUMN zone_data TEXT"),
("domains", "zone_updated_at", "ALTER TABLE domains ADD COLUMN zone_updated_at DATETIME"),
]
with engine.connect() as conn:
for table, column, ddl in migrations:
try:
conn.execute(text(f"SELECT {column} FROM {table} LIMIT 1"))
except Exception:
try:
conn.execute(text(ddl))
logger.info(f"[db] Migration applied: added {table}.{column}")
except Exception as exc:
logger.warning(f"[db] Migration skipped ({table}.{column}): {exc}")
def connect(dbtype="sqlite", **kwargs):
if dbtype == "sqlite":
# Start SQLite engine
@@ -19,6 +38,7 @@ def connect(dbtype="sqlite", **kwargs):
"sqlite:///" + db_location, connect_args={"check_same_thread": False} "sqlite:///" + db_location, connect_args={"check_same_thread": False}
) )
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
_migrate(engine)
return sessionmaker(bind=engine)()
elif dbtype == "mysql":
# Start a MySQL engine
@@ -50,6 +70,7 @@ def connect(dbtype="sqlite", **kwargs):
+ db_name
)
Base.metadata.create_all(engine)
_migrate(engine)
return sessionmaker(bind=engine)()
else:
raise Exception("Unknown/unimplemented database type: {}".format(dbtype))

View File

@@ -1,5 +1,5 @@
from directdnsonly.app.db import Base
from sqlalchemy import Column, Integer, String, DateTime, Text
class Key(Base):
@@ -25,6 +25,8 @@ class Domain(Base):
domain = Column(String(255), unique=True)
hostname = Column(String(255))
username = Column(String(255))
zone_data = Column(Text, nullable=True) # last known zone file from DA
zone_updated_at = Column(DateTime, nullable=True) # when zone_data was last stored
def __repr__(self):
return "<Domain(id='%s', domain='%s', hostname='%s', username='%s')>" % (

View File

@@ -11,6 +11,10 @@ class ReconciliationWorker:
"""Periodically polls configured DirectAdmin servers and queues deletes """Periodically polls configured DirectAdmin servers and queues deletes
for any zones in our DB that no longer exist in DirectAdmin. for any zones in our DB that no longer exist in DirectAdmin.
Also runs an Option C backend healing pass: for each zone with stored
zone_data, checks every backend for presence and re-queues any that are
missing (e.g. after a prolonged backend outage).
Safety rules:
- If a DA server is unreachable, skip it entirely — never delete on uncertainty
- Only touches domains registered via DaDNS (present in our `domains` table)
@@ -18,8 +22,16 @@ class ReconciliationWorker:
- Pushes to the existing delete_queue so the full delete path is exercised
"""
def __init__(
self,
delete_queue,
reconciliation_config: dict,
save_queue=None,
backend_registry=None,
):
self.delete_queue = delete_queue
self.save_queue = save_queue
self.backend_registry = backend_registry
self.enabled = reconciliation_config.get("enabled", False)
self.interval_seconds = reconciliation_config.get("interval_minutes", 60) * 60
self.servers = reconciliation_config.get("directadmin_servers") or []
@@ -180,3 +192,73 @@ class ReconciliationWorker:
f"[reconciler] Reconciliation pass complete — " f"[reconciler] Reconciliation pass complete — "
f"{total_queued} domain(s) queued for deletion" f"{total_queued} domain(s) queued for deletion"
) )
# Option C: heal backends that are missing zones
if self.save_queue is not None and self.backend_registry is not None:
self._heal_backends()
def _heal_backends(self):
"""Check every backend for zone presence and re-queue any zone that is
missing from one or more backends, using the stored zone_data as the
authoritative source. This corrects backends that missed pushes due to
downtime without waiting for DirectAdmin to re-send the zone.
"""
backends = self.backend_registry.get_available_backends()
if not backends:
return
session = connect()
try:
domains = (
session.query(Domain)
.filter(Domain.zone_data.isnot(None))
.all()
)
if not domains:
logger.debug("[reconciler] Healing pass: no zone_data stored yet — skipping")
return
healed = 0
for record in domains:
missing = []
for backend_name, backend in backends.items():
try:
if not backend.zone_exists(record.domain):
missing.append(backend_name)
except Exception as exc:
logger.warning(
f"[reconciler] heal: zone_exists check failed for "
f"{record.domain} on {backend_name}: {exc}"
)
if missing:
mode = "[DRY-RUN] Would heal" if self.dry_run else "Healing"
logger.warning(
f"[reconciler] {mode}{record.domain} missing from "
f"{missing}; re-queuing with stored zone_data"
)
if not self.dry_run:
self.save_queue.put(
{
"domain": record.domain,
"hostname": record.hostname or "",
"username": record.username or "",
"zone_file": record.zone_data,
"failed_backends": missing,
"retry_count": 0,
"source": "reconciler_heal",
}
)
healed += 1
if healed:
logger.info(
f"[reconciler] Healing pass complete — "
f"{healed} zone(s) re-queued for backend recovery"
)
else:
logger.debug(
"[reconciler] Healing pass complete — all backends consistent"
)
finally:
session.close()

View File

@@ -1,3 +1,4 @@
import datetime
import os
import threading
import time
@@ -12,6 +13,15 @@ from directdnsonly.app.db.models import Domain
from directdnsonly.app.db import connect
from directdnsonly.app.reconciler import ReconciliationWorker
# ---------------------------------------------------------------------------
# Retry configuration
# ---------------------------------------------------------------------------
MAX_RETRIES = 5
# Seconds to wait before each retry attempt (exponential-ish backoff)
BACKOFF_SECONDS = [30, 120, 300, 900, 1800] # 30s, 2m, 5m, 15m, 30m
RETRY_DRAIN_INTERVAL = 30 # how often the retry drain thread wakes
class WorkerManager:
def __init__(
@@ -22,26 +32,28 @@ class WorkerManager:
self._running = False
self._save_thread = None
self._delete_thread = None
self._retry_thread = None
self._reconciler = None
self._reconciliation_config = reconciliation_config or {}
# Initialize queues with error handling
try:
os.makedirs(queue_path, exist_ok=True)
self.save_queue = Queue(f"{queue_path}/save")
self.delete_queue = Queue(f"{queue_path}/delete")
self.retry_queue = Queue(f"{queue_path}/retry")
logger.success(f"Initialized queues at {queue_path}") logger.success(f"Initialized queues at {queue_path}")
except Exception as e: except Exception as e:
logger.critical(f"Failed to initialize queues: {e}") logger.critical(f"Failed to initialize queues: {e}")
raise raise
# ------------------------------------------------------------------
# Save queue worker
# ------------------------------------------------------------------
def _process_save_queue(self):
"""Main worker loop for processing save requests"""
logger.info("Save queue worker started") logger.info("Save queue worker started")
# Get DB Connection
session = connect()
# Batch tracking
batch_start = None
batch_processed = 0
batch_failed = 0
@@ -50,58 +62,65 @@ class WorkerManager:
try:
item = self.save_queue.get(block=True, timeout=5)
# Start a new batch timer on the first item
if batch_start is None:
batch_start = time.monotonic()
batch_processed = 0
batch_failed = 0
pending = self.save_queue.qsize()
logger.info(
f"📥 Batch started — {pending + 1} zone(s) queued for processing"
)
domain = item.get("domain", "unknown")
is_retry = item.get("source") in ("retry", "reconciler_heal")
target_backends = item.get("failed_backends") # None = all backends
logger.debug(
f"Processing zone update for {domain}"
+ (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "")
+ (f" [backends: {target_backends}]" if target_backends else "")
)
if not is_retry and not check_zone_exists(domain):
put_zone_index(
domain, item.get("hostname"), item.get("username")
)
# Validate item structure
if not all(k in item for k in ["domain", "zone_file"]):
logger.error(f"Invalid queue item: {item}")
self.save_queue.task_done()
batch_failed += 1
continue
# Process with all available backends
backends = self.backend_registry.get_available_backends()
if target_backends:
backends = {k: v for k, v in backends.items() if k in target_backends}
if not backends:
logger.warning("No target backends available for this item!")
self.save_queue.task_done()
batch_failed += 1
continue
if len(backends) > 1:
failed = self._process_backends_parallel(backends, item, session)
else:
failed = set()
for backend_name, backend in backends.items():
if not self._process_single_backend(backend_name, backend, item, session):
failed.add(backend_name)
if failed:
self._schedule_retry(item, failed)
batch_failed += 1
else:
# Successful write — persist zone_data for Option C healing
self._store_zone_data(session, domain, item["zone_file"])
batch_processed += 1
self.save_queue.task_done()
logger.debug(f"Completed processing for {domain}")
except Empty:
# Queue is empty — if we were in a batch, log the summary
if batch_start is not None:
elapsed = time.monotonic() - batch_start
total = batch_processed + batch_failed
@@ -119,35 +138,144 @@ class WorkerManager:
except Exception as e:
logger.error(f"Unexpected worker error: {e}")
batch_failed += 1
time.sleep(1)
def _process_single_backend(self, backend_name, backend, item, session) -> bool:
"""Write a zone to one backend. Returns True on success, False on failure."""
try:
logger.debug(f"Using backend: {backend_name}")
if backend.write_zone(item["domain"], item["zone_file"]):
logger.debug(f"Successfully updated {item['domain']} in {backend_name}")
if backend.get_name() == "bind":
# Need to update the named.conf
backend.update_named_conf(
[d.domain for d in session.query(Domain).all()]
)
# Reload all zones
backend.reload_zone()
else:
backend.reload_zone(zone_name=item["domain"])
# Verify record count matches the source zone from DirectAdmin
self._verify_backend_record_count(
backend_name, backend, item["domain"], item["zone_file"]
)
return True
else:
logger.error(f"Failed to update {item['domain']} in {backend_name}")
return False
except Exception as e:
logger.error(f"Error in {backend_name}: {str(e)}")
return False
def _process_backends_parallel(self, backends, item, session) -> set:
"""Write a zone to multiple backends concurrently.
Returns a set of backend names that failed."""
start_time = time.monotonic()
failed = set()
with ThreadPoolExecutor(
max_workers=len(backends), thread_name_prefix="backend"
) as executor:
futures = {
executor.submit(
self._process_single_backend, backend_name, backend, item, session
): backend_name
for backend_name, backend in backends.items()
}
for future in as_completed(futures):
backend_name = futures[future]
try:
success = future.result()
if not success:
failed.add(backend_name)
except Exception as e:
logger.error(f"Unhandled error in backend {backend_name}: {e}")
failed.add(backend_name)
elapsed = (time.monotonic() - start_time) * 1000
logger.debug(
f"Parallel processing of {item['domain']} across "
f"{len(backends)} backends completed in {elapsed:.0f}ms"
)
return failed
def _schedule_retry(self, item: dict, failed_backends: set):
"""Push a failed write onto the retry queue with exponential backoff.
Discards to dead-letter after MAX_RETRIES attempts."""
retry_count = item.get("retry_count", 0) + 1
if retry_count > MAX_RETRIES:
logger.error(
f"[retry] Dead-letter: {item['domain']} failed on "
f"{failed_backends} after {MAX_RETRIES} attempts — giving up"
)
return
delay = BACKOFF_SECONDS[min(retry_count - 1, len(BACKOFF_SECONDS) - 1)]
retry_item = {
**item,
"failed_backends": list(failed_backends),
"retry_count": retry_count,
"retry_after": time.time() + delay,
"source": "retry",
}
self.retry_queue.put(retry_item)
logger.warning(
f"[retry] {item['domain']}{list(failed_backends)} "
f"scheduled for retry #{retry_count} in {delay}s"
)
def _store_zone_data(self, session, domain: str, zone_file: str):
"""Persist the latest zone file content to the domain DB record."""
try:
record = session.query(Domain).filter_by(domain=domain).first()
if record:
record.zone_data = zone_file
record.zone_updated_at = datetime.datetime.utcnow()
session.commit()
except Exception as exc:
logger.warning(f"[worker] Could not store zone_data for {domain}: {exc}")
# ------------------------------------------------------------------
# Retry drain worker
# ------------------------------------------------------------------
def _process_retry_queue(self):
"""Periodically drain the retry queue and re-feed ready items to the
save queue. Items not yet due are put back onto the retry queue."""
logger.info("Retry drain worker started")
while self._running:
time.sleep(RETRY_DRAIN_INTERVAL)
now = time.time()
pending = []
# Drain all current retry items into memory
while True:
try:
pending.append(self.retry_queue.get_nowait())
self.retry_queue.task_done()
except Empty:
break
if not pending:
continue
ready = [i for i in pending if i.get("retry_after", 0) <= now]
not_ready = [i for i in pending if i.get("retry_after", 0) > now]
for item in not_ready:
self.retry_queue.put(item)
for item in ready:
logger.info(
f"[retry] Re-queuing {item['domain']}"
f"{item.get('failed_backends')} "
f"(attempt #{item.get('retry_count', '?')})"
)
self.save_queue.put(item)
if ready:
logger.debug(
f"[retry] Drain: {len(ready)} item(s) ready, "
f"{len(not_ready)} still pending"
)
# ------------------------------------------------------------------
# Delete queue worker
# ------------------------------------------------------------------
def _process_delete_queue(self):
"""Worker loop for processing zone deletion requests"""
logger.info("Delete queue worker started") logger.info("Delete queue worker started")
session = connect() session = connect()
@@ -181,47 +309,25 @@ class WorkerManager:
backends = self.backend_registry.get_available_backends()
remaining_domains = [d.domain for d in session.query(Domain).all()]
delete_success = True
if not backends:
logger.warning(
f"No active backends — {domain} will be removed from DB only"
)
elif len(backends) > 1:
# Parallel delete, track failures
results = []
with ThreadPoolExecutor(max_workers=len(backends)) as executor:
futures = {
executor.submit(
self._delete_single_backend,
backend_name, backend, domain, remaining_domains
): backend_name
for backend_name, backend in backends.items()
}
for future in as_completed(futures):
backend_name = futures[future]
try:
results.append(future.result())
except Exception as e:
logger.error(
f"Unhandled error deleting from {backend_name}: {e}"
@@ -229,32 +335,22 @@ class WorkerManager:
results.append(False)
delete_success = all(results)
else:
# Single backend
for backend_name, backend in backends.items():
if not self._delete_single_backend(
backend_name, backend, domain, remaining_domains
):
delete_success = False
if delete_success:
session.delete(record)
session.commit()
logger.info(f"Removed {domain} from database")
self.delete_queue.task_done()
logger.success(f"Delete completed for {domain}") logger.success(f"Delete completed for {domain}")
else: else:
logger.error( logger.error(
f"Delete failed for {domain} on one or more backends — DB record retained" f"Delete failed for {domain} on one or more backends — "
f"DB record retained"
) )
self.delete_queue.task_done() self.delete_queue.task_done()
except Empty:
continue
@@ -262,8 +358,10 @@ class WorkerManager:
logger.error(f"Unexpected delete worker error: {e}") logger.error(f"Unexpected delete worker error: {e}")
time.sleep(1) time.sleep(1)
def _delete_single_backend(
self, backend_name, backend, domain, remaining_domains
) -> bool:
"""Delete a zone from one backend. Returns True on success."""
try:
if backend.delete_zone(domain):
logger.debug(f"Deleted {domain} from {backend_name}")
@@ -272,83 +370,19 @@ class WorkerManager:
backend.reload_zone()
else:
backend.reload_zone(zone_name=domain)
return True
else:
logger.error(f"Failed to delete {domain} from {backend_name}")
return False
except Exception as e:
logger.error(f"Error deleting {domain} from {backend_name}: {e}")
return False
# ------------------------------------------------------------------
# Record count verification
# ------------------------------------------------------------------
def _verify_backend_record_count(self, backend_name, backend, zone_name, zone_data):
"""Verify and reconcile the backend record count against the
authoritative BIND zone from DirectAdmin.
After a successful write, this method checks whether the number of
records stored in the backend matches the number of records parsed
from the source zone file. If there are **extra** records in the
backend (e.g. from replication drift or stale data) they are
automatically removed via the backend's reconcile method.
Args:
backend_name: Display name of the backend instance
backend: The backend instance
zone_name: The zone that was just written
zone_data: The raw BIND zone file content (authoritative source)
"""
try:
expected = count_zone_records(zone_data, zone_name)
if expected < 0:
@@ -359,46 +393,40 @@ class WorkerManager:
return
matches, actual = backend.verify_zone_record_count(zone_name, expected)
if matches:
return
if actual > expected:
logger.warning(
f"[{backend_name}] Backend has {actual - expected} extra "
f"record(s) for {zone_name} — reconciling"
)
success, removed = backend.reconcile_zone_records(zone_name, zone_data)
if success and removed > 0:
# Verify again after reconciliation
matches, new_count = backend.verify_zone_record_count(
zone_name, expected
)
if matches:
logger.success(
f"[{backend_name}] Reconciliation successful for "
f"{zone_name}: removed {removed} extra record(s)"
)
else:
logger.error(
f"[{backend_name}] Reconciliation for {zone_name} "
f"removed {removed} record(s) but count still mismatched: "
f"expected {expected}, got {new_count}"
)
else:
logger.warning(
f"[{backend_name}] Backend has fewer records than source "
f"for {zone_name} (expected {expected}, got {actual}) — "
f"next zone push from DirectAdmin should correct this"
)
except NotImplementedError:
logger.debug(
f"[{backend_name}] Record count verification not supported — skipping"
)
except Exception as e:
logger.error(
@@ -406,49 +434,56 @@ class WorkerManager:
f"for {zone_name}: {e}" f"for {zone_name}: {e}"
) )
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def start(self):
"""Start background workers"""
if self._running:
return
self._running = True
self._save_thread = threading.Thread(
target=self._process_save_queue, daemon=True, name="save_queue_worker"
)
self._delete_thread = threading.Thread(
target=self._process_delete_queue, daemon=True, name="delete_queue_worker"
)
self._retry_thread = threading.Thread(
target=self._process_retry_queue, daemon=True, name="retry_drain_worker"
)
self._save_thread.start()
self._delete_thread.start()
self._retry_thread.start()
logger.info(
f"Started worker threads: save, delete, retry_drain"
)
self._reconciler = ReconciliationWorker(
delete_queue=self.delete_queue,
save_queue=self.save_queue,
backend_registry=self.backend_registry,
reconciliation_config=self._reconciliation_config,
)
self._reconciler.start()
def stop(self):
"""Stop background workers gracefully"""
self._running = False
if self._reconciler:
self._reconciler.stop()
for thread in (self._save_thread, self._delete_thread, self._retry_thread):
if thread:
thread.join(timeout=5)
logger.info("Workers stopped")
def queue_status(self):
"""Return current queue status"""
return {
"save_queue_size": self.save_queue.qsize(),
"delete_queue_size": self.delete_queue.qsize(),
"retry_queue_size": self.retry_queue.qsize(),
"save_worker_alive": self._save_thread and self._save_thread.is_alive(), "save_worker_alive": self._save_thread and self._save_thread.is_alive(),
"delete_worker_alive": self._delete_thread "delete_worker_alive": self._delete_thread and self._delete_thread.is_alive(),
and self._delete_thread.is_alive(), "retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(),
"reconciler_alive": ( "reconciler_alive": (
self._reconciler.is_alive if self._reconciler else False self._reconciler.is_alive if self._reconciler else False
), ),

View File

@@ -1,6 +1,6 @@
[project]
name = "directdnsonly"
version = "2.3.0"
description = "DNS Management System - DirectAdmin to multiple backends"
authors = [
{name = "Aaron Guise",email = "aaron@guise.net.nz"}

View File

@@ -2,7 +2,7 @@
import pytest
from queue import Queue
from unittest.mock import patch, MagicMock
from directdnsonly.app.reconciler import ReconciliationWorker
from directdnsonly.app.db.models import Domain
@@ -173,3 +173,129 @@ def test_no_servers_does_not_start(delete_queue):
w = ReconciliationWorker(delete_queue, cfg)
w.start()
assert not w.is_alive
# ---------------------------------------------------------------------------
# _heal_backends — Option C backend healing
# ---------------------------------------------------------------------------
def _make_backend_registry(zone_exists_return: bool):
"""Build a mock backend_registry with one backend whose zone_exists returns
the given value."""
backend = MagicMock()
backend.zone_exists.return_value = zone_exists_return
registry = MagicMock()
registry.get_available_backends.return_value = {"coredns": backend}
return registry, backend
def test_heal_queues_zone_missing_from_backend(delete_queue, patch_connect):
save_queue = Queue()
registry, backend = _make_backend_registry(zone_exists_return=False)
patch_connect.add(
Domain(
domain="missing.com",
hostname="da1.example.com",
username="admin",
zone_data="; zone file",
)
)
patch_connect.commit()
w = ReconciliationWorker(
delete_queue, BASE_CONFIG, save_queue=save_queue, backend_registry=registry
)
w._heal_backends()
assert not save_queue.empty()
item = save_queue.get_nowait()
assert item["domain"] == "missing.com"
assert item["failed_backends"] == ["coredns"]
assert item["source"] == "reconciler_heal"
assert item["zone_file"] == "; zone file"
def test_heal_skips_domains_without_zone_data(delete_queue, patch_connect):
save_queue = Queue()
registry, _ = _make_backend_registry(zone_exists_return=False)
patch_connect.add(
Domain(domain="nodata.com", hostname="da1.example.com", username="admin", zone_data=None)
)
patch_connect.commit()
w = ReconciliationWorker(
delete_queue, BASE_CONFIG, save_queue=save_queue, backend_registry=registry
)
w._heal_backends()
assert save_queue.empty()
def test_heal_skips_when_all_backends_have_zone(delete_queue, patch_connect):
save_queue = Queue()
registry, _ = _make_backend_registry(zone_exists_return=True)
patch_connect.add(
Domain(
domain="present.com",
hostname="da1.example.com",
username="admin",
zone_data="; zone file",
)
)
patch_connect.commit()
w = ReconciliationWorker(
delete_queue, BASE_CONFIG, save_queue=save_queue, backend_registry=registry
)
w._heal_backends()
assert save_queue.empty()
def test_heal_dry_run_does_not_queue(delete_queue, patch_connect):
save_queue = Queue()
registry, _ = _make_backend_registry(zone_exists_return=False)
patch_connect.add(
Domain(
domain="dry.com",
hostname="da1.example.com",
username="admin",
zone_data="; zone file",
)
)
patch_connect.commit()
cfg = {**BASE_CONFIG, "dry_run": True}
w = ReconciliationWorker(
delete_queue, cfg, save_queue=save_queue, backend_registry=registry
)
w._heal_backends()
assert save_queue.empty()
def test_heal_skipped_when_no_registry(delete_queue, patch_connect):
"""_heal_backends should not run when backend_registry is None."""
save_queue = Queue()
patch_connect.add(
Domain(
domain="noregistry.com",
hostname="da1.example.com",
username="admin",
zone_data="; zone file",
)
)
patch_connect.commit()
w = ReconciliationWorker(delete_queue, BASE_CONFIG, save_queue=save_queue)
# Should not raise; healing is silently skipped
with _patch_da({"noregistry.com"}):
w._reconcile_all()
assert save_queue.empty()