You've already forked directdnsonly
feat: add initial_delay_minutes to reconciler for LB stagger 🕐
Configurable startup delay before the first reconciliation pass so that multiple receivers behind a load balancer can be offset without relying on container start order (which is lost on reboot). Set to half the interval on the secondary receiver — e.g. interval 60m → delay 30m. Default is 0 (no change to existing behaviour). Stop event is respected during the delay so the worker shuts down cleanly even mid-wait.
This commit is contained in:
@@ -38,6 +38,7 @@ class ReconciliationWorker:
|
|||||||
self.verify_ssl = reconciliation_config.get("verify_ssl", True)
|
self.verify_ssl = reconciliation_config.get("verify_ssl", True)
|
||||||
self.ipp = int(reconciliation_config.get("ipp", 1000))
|
self.ipp = int(reconciliation_config.get("ipp", 1000))
|
||||||
self.dry_run = bool(reconciliation_config.get("dry_run", False))
|
self.dry_run = bool(reconciliation_config.get("dry_run", False))
|
||||||
|
self._initial_delay = reconciliation_config.get("initial_delay_minutes", 0) * 60
|
||||||
self._stop_event = threading.Event()
|
self._stop_event = threading.Event()
|
||||||
self._thread = None
|
self._thread = None
|
||||||
|
|
||||||
@@ -58,9 +59,13 @@ class ReconciliationWorker:
|
|||||||
self._thread.start()
|
self._thread.start()
|
||||||
server_names = [s.get("hostname", "?") for s in self.servers]
|
server_names = [s.get("hostname", "?") for s in self.servers]
|
||||||
mode = "DRY-RUN" if self.dry_run else "LIVE"
|
mode = "DRY-RUN" if self.dry_run else "LIVE"
|
||||||
|
delay_str = (
|
||||||
|
f", initial_delay: {self._initial_delay // 60}m" if self._initial_delay else ""
|
||||||
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Reconciliation poller started [{mode}] — "
|
f"Reconciliation poller started [{mode}] — "
|
||||||
f"interval: {self.interval_seconds // 60}m, "
|
f"interval: {self.interval_seconds // 60}m"
|
||||||
|
f"{delay_str}, "
|
||||||
f"servers: {server_names}"
|
f"servers: {server_names}"
|
||||||
)
|
)
|
||||||
if self.dry_run:
|
if self.dry_run:
|
||||||
@@ -83,6 +88,13 @@ class ReconciliationWorker:
|
|||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
def _run(self):
|
def _run(self):
|
||||||
|
if self._initial_delay > 0:
|
||||||
|
logger.info(
|
||||||
|
f"[reconciler] Initial delay {self._initial_delay // 60}m — "
|
||||||
|
f"first reconciliation pass deferred"
|
||||||
|
)
|
||||||
|
if self._stop_event.wait(timeout=self._initial_delay):
|
||||||
|
return # stopped cleanly during the initial delay
|
||||||
logger.info("Reconciliation worker starting — running initial check now")
|
logger.info("Reconciliation worker starting — running initial check now")
|
||||||
self._reconcile_all()
|
self._reconcile_all()
|
||||||
while not self._stop_event.wait(timeout=self.interval_seconds):
|
while not self._stop_event.wait(timeout=self.interval_seconds):
|
||||||
|
|||||||
@@ -14,6 +14,8 @@ app:
|
|||||||
# enabled: true
|
# enabled: true
|
||||||
# dry_run: true # log orphans but do NOT queue deletes — safe first-run mode
|
# dry_run: true # log orphans but do NOT queue deletes — safe first-run mode
|
||||||
# interval_minutes: 60
|
# interval_minutes: 60
|
||||||
|
# initial_delay_minutes: 0 # stagger first run when running multiple receivers behind a LB
|
||||||
|
# # e.g. receiver-1: 0, receiver-2: 30 (half the interval)
|
||||||
# verify_ssl: true # set false for self-signed DA certs
|
# verify_ssl: true # set false for self-signed DA certs
|
||||||
# ipp: 1000 # items per page when polling DA (default 1000)
|
# ipp: 1000 # items per page when polling DA (default 1000)
|
||||||
# directadmin_servers:
|
# directadmin_servers:
|
||||||
|
|||||||
@@ -175,6 +175,17 @@ def test_no_servers_does_not_start(delete_queue):
|
|||||||
assert not w.is_alive
|
assert not w.is_alive
|
||||||
|
|
||||||
|
|
||||||
|
def test_initial_delay_stored(delete_queue):
|
||||||
|
cfg = {**BASE_CONFIG, "initial_delay_minutes": 30}
|
||||||
|
w = ReconciliationWorker(delete_queue, cfg)
|
||||||
|
assert w._initial_delay == 30 * 60
|
||||||
|
|
||||||
|
|
||||||
|
def test_zero_initial_delay_by_default(delete_queue):
|
||||||
|
w = ReconciliationWorker(delete_queue, BASE_CONFIG)
|
||||||
|
assert w._initial_delay == 0
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# _heal_backends — Option C backend healing
|
# _heal_backends — Option C backend healing
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user