From 0b31b7578984c1bcec01cebd2fad98f0c0e40aa7 Mon Sep 17 00:00:00 2001 From: Aaron Guise Date: Wed, 25 Feb 2026 15:43:08 +1300 Subject: [PATCH] =?UTF-8?q?fix:=20correct=20RDATA=20encoding=20and=20batch?= =?UTF-8?q?=20processing=20in=20CoreDNS=20MySQL=20backend=20=F0=9F=90=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix dnspython silently relativizing in-zone FQDN targets to '@' by calling rdata.to_text(origin=origin, relativize=False); CoreDNS MySQL requires absolute FQDNs in RDATA and was serving '.' for any CNAME/MX pointing to the zone apex - Reorder write_zone to delete stale records before inserting new ones so a brief NXDOMAIN is preferred over briefly serving duplicate records - Rework save-queue batch loop: keep batch open until queue is empty rather than closing after a fixed timeout, so sequential DA zone pushes accumulate into a single batch - Add managed_by='directadmin' to _ensure_zone_exists for new and legacy NULL rows --- directdnsonly/app/backends/coredns_mysql.py | 122 +++++---------- directdnsonly/app/da/client.py | 8 +- directdnsonly/app/db/models/__init__.py | 4 +- directdnsonly/worker.py | 161 ++++++++++---------- tests/test_coredns_mysql.py | 87 ----------- tests/test_da_client.py | 20 ++- tests/test_reconciler.py | 11 +- 7 files changed, 149 insertions(+), 264 deletions(-) diff --git a/directdnsonly/app/backends/coredns_mysql.py b/directdnsonly/app/backends/coredns_mysql.py index c6b92b0..3cab234 100644 --- a/directdnsonly/app/backends/coredns_mysql.py +++ b/directdnsonly/app/backends/coredns_mysql.py @@ -91,10 +91,34 @@ class CoreDNSMySQLBackend(DNSBackend): zone_name, zone_data ) - # Track changes - current_records = set() + # Pre-compute the set of (hostname, type, data) keys that should + # remain after this update, so we can identify stale records upfront. + incoming_keys = { + (name, rtype, data) for name, rtype, data, _ in source_records + } + changes = {"added": 0, "updated": 0, "removed": 0} + # --- 1. Remove stale records first --- + # Deleting before inserting means a brief NXDOMAIN is preferable + # to briefly serving both old and new records simultaneously. + for key, record in existing_records.items(): + if key not in incoming_keys: + logger.debug( + f"Removed record: {record.hostname} {record.type} {record.data}" + ) + session.delete(record) + changes["removed"] += 1 + + # Handle SOA removal if needed + if existing_soa and not source_soa: + logger.debug( + f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}" + ) + session.delete(existing_soa) + changes["removed"] += 1 + + # --- 2. Add / update incoming records --- # Handle SOA record if source_soa: soa_name, soa_content, soa_ttl = source_soa @@ -124,7 +148,6 @@ class CoreDNSMySQLBackend(DNSBackend): # Process all non-SOA records for record_name, record_type, record_content, record_ttl in source_records: key = (record_name, record_type, record_content) - current_records.add(key) if key in existing_records: # Update existing record if TTL changed @@ -152,23 +175,6 @@ class CoreDNSMySQLBackend(DNSBackend): f"Added new record: {record_name} {record_type} {record_content}" ) - # Remove records that no longer exist in the source zone - for key, record in existing_records.items(): - if key not in current_records: - logger.debug( - f"Removed record: {record.hostname} {record.type} {record.data}" - ) - session.delete(record) - changes["removed"] += 1 - - # Handle SOA removal if needed - if existing_soa and not source_soa: - logger.debug( - f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}" - ) - session.delete(existing_soa) - changes["removed"] += 1 - session.commit() total_changes = changes["added"] + changes["updated"] + changes["removed"] if total_changes > 0: @@ -258,64 +264,10 @@ class CoreDNSMySQLBackend(DNSBackend): zone.managed_by = "directadmin" return zone - def _relativize_name(self, zone_name: str, name: str) -> str: - """Normalise a DNS hostname for CoreDNS MySQL storage. - - CoreDNS MySQL (cybercinch fork) expects: - ``@`` — zone apex - ``sub`` — in-zone hostname (relative, no trailing dot) - ``other.domain.`` — out-of-zone FQDN (trailing dot) - - When a zone file lacks a ``$ORIGIN`` directive dnspython cannot - relativize names and returns absolute FQDNs. This method converts - both the already-relative form (from dnspython) and the absolute FQDN - form into the format CoreDNS MySQL understands. - - Storing the zone FQDN as-is (e.g. ``ithome.net.nz.``) causes CoreDNS - to strip the zone suffix and serve ``MX 0 .`` / ``CNAME .`` instead of - the correct apex target — hence the conversion to ``@``. - """ - if name in ("@", "."): - return "@" - zone_fqdn = self.dot_fqdn(zone_name) - if name == zone_fqdn: - return "@" - suffix = "." + zone_fqdn - if name.endswith(suffix): - return name[: -len(suffix)] - return name - - def _normalize_cname_data(self, zone_name: str, record_content: str) -> str: - return self._relativize_name(zone_name, record_content) - - def _normalize_mx_data(self, zone_name: str, record_content: str) -> str: - """Normalize MX RDATA: relativize the exchange hostname. - - ``rdata.to_text()`` returns ``"priority exchange"``. When the exchange - is the zone apex it may arrive as ``@`` (dnspython-relativized) or as - the full FQDN (no ``$ORIGIN`` in zone). Both are converted to ``@``. - """ - parts = record_content.split(None, 1) - if len(parts) == 2: - priority, exchange = parts - return f"{priority} {self._relativize_name(zone_name, exchange)}" - return record_content - - def _normalize_ns_data(self, zone_name: str, record_content: str) -> str: - return self._relativize_name(zone_name, record_content) - - def _normalize_srv_data(self, zone_name: str, record_content: str) -> str: - """Normalize SRV RDATA: relativize the target (last field).""" - parts = record_content.rsplit(None, 1) - if len(parts) == 2: - prefix, target = parts - return f"{prefix} {self._relativize_name(zone_name, target)}" - return record_content - def _parse_zone_to_record_set( self, zone_name: str, zone_data: str ) -> Tuple[Set[Tuple[str, str, str, int]], Optional[Tuple[str, str, int]]]: - """Parse a BIND zone file into a set of normalised record keys. + """Parse a BIND zone file into a set of record keys. Returns: Tuple of: @@ -326,27 +278,27 @@ class CoreDNSMySQLBackend(DNSBackend): records: Set[Tuple[str, str, str, int]] = set() soa = None + # Use the zone origin (if available) to expand relative names in RDATA + # back to absolute FQDNs. Without this, dnspython's default relativize=True + # behaviour turns in-zone targets like `wvvcc.co.nz.` into `@` in the + # stored data, which CoreDNS then serves incorrectly. + origin = dns_zone.origin + for name, ttl, rdata in dns_zone.iterate_rdatas(): if rdata.rdclass != IN: continue record_name = str(name) record_type = rdata.rdtype.name - record_content = rdata.to_text() + if origin is not None: + record_content = rdata.to_text(origin=origin, relativize=False) + else: + record_content = rdata.to_text() if record_type == "SOA": soa = (record_name, record_content, ttl) continue - if record_type == "CNAME": - record_content = self._normalize_cname_data(zone_name, record_content) - elif record_type == "MX": - record_content = self._normalize_mx_data(zone_name, record_content) - elif record_type == "NS": - record_content = self._normalize_ns_data(zone_name, record_content) - elif record_type == "SRV": - record_content = self._normalize_srv_data(zone_name, record_content) - records.add((record_name, record_type, record_content, ttl)) return records, soa diff --git a/directdnsonly/app/da/client.py b/directdnsonly/app/da/client.py index 93a9fc4..d333f6f 100644 --- a/directdnsonly/app/da/client.py +++ b/directdnsonly/app/da/client.py @@ -70,7 +70,13 @@ class DirectAdminClient: if response is None: return None - if response.is_redirect or response.status_code in (301, 302, 303, 307, 308): + if response.is_redirect or response.status_code in ( + 301, + 302, + 303, + 307, + 308, + ): if self._cookies: logger.error( f"[da:{self.hostname}] Still redirecting after session login — " diff --git a/directdnsonly/app/db/models/__init__.py b/directdnsonly/app/db/models/__init__.py index 35e8a8f..f644836 100644 --- a/directdnsonly/app/db/models/__init__.py +++ b/directdnsonly/app/db/models/__init__.py @@ -25,8 +25,8 @@ class Domain(Base): domain = Column(String(255), unique=True) hostname = Column(String(255)) username = Column(String(255)) - zone_data = Column(Text, nullable=True) # last known zone file from DA - zone_updated_at = Column(DateTime, nullable=True) # when zone_data was last stored + zone_data = Column(Text, nullable=True) # last known zone file from DA + zone_updated_at = Column(DateTime, nullable=True) # when zone_data was last stored def __repr__(self): return "" % ( diff --git a/directdnsonly/worker.py b/directdnsonly/worker.py index e9e6c84..89ca2fa 100644 --- a/directdnsonly/worker.py +++ b/directdnsonly/worker.py @@ -62,93 +62,90 @@ class WorkerManager: logger.info("Save queue worker started") session = connect() - batch_start = None - batch_processed = 0 - batch_failed = 0 - while self._running: + # Block until at least one item is available try: item = self.save_queue.get(block=True, timeout=5) - - if batch_start is None: - batch_start = time.monotonic() - batch_processed = 0 - batch_failed = 0 - pending = self.save_queue.qsize() - logger.info( - f"📥 Batch started — {pending + 1} zone(s) queued for processing" - ) - - domain = item.get("domain", "unknown") - is_retry = item.get("source") in ("retry", "reconciler_heal") - target_backends = item.get("failed_backends") # None = all backends - - logger.debug( - f"Processing zone update for {domain}" - + (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "") - + (f" [backends: {target_backends}]" if target_backends else "") - ) - - if not is_retry and not check_zone_exists(domain): - put_zone_index(domain, item.get("hostname"), item.get("username")) - - if not all(k in item for k in ["domain", "zone_file"]): - logger.error(f"Invalid queue item: {item}") - self.save_queue.task_done() - batch_failed += 1 - continue - - backends = self.backend_registry.get_available_backends() - if target_backends: - backends = { - k: v for k, v in backends.items() if k in target_backends - } - if not backends: - logger.warning("No target backends available for this item!") - self.save_queue.task_done() - batch_failed += 1 - continue - - if len(backends) > 1: - failed = self._process_backends_parallel(backends, item, session) - else: - failed = set() - for backend_name, backend in backends.items(): - if not self._process_single_backend( - backend_name, backend, item, session - ): - failed.add(backend_name) - - if failed: - self._schedule_retry(item, failed) - batch_failed += 1 - else: - # Successful write — persist zone_data for Option C healing - self._store_zone_data(session, domain, item["zone_file"]) - batch_processed += 1 - - self.save_queue.task_done() - logger.debug(f"Completed processing for {domain}") - except Empty: - if batch_start is not None: - elapsed = time.monotonic() - batch_start - total = batch_processed + batch_failed - rate = batch_processed / elapsed if elapsed > 0 else 0 - logger.success( - f"📦 Batch complete — {batch_processed}/{total} zone(s) " - f"processed successfully in {elapsed:.1f}s " - f"({rate:.1f} zones/sec)" - + (f", {batch_failed} failed" if batch_failed else "") - ) - batch_start = None - batch_processed = 0 - batch_failed = 0 continue - except Exception as e: - logger.error(f"Unexpected worker error: {e}") - batch_failed += 1 - time.sleep(1) + + # Open a batch and keep processing until the queue is empty + batch_start = time.monotonic() + batch_processed = 0 + batch_failed = 0 + logger.info("📥 Batch started") + + while True: + try: + domain = item.get("domain", "unknown") + is_retry = item.get("source") in ("retry", "reconciler_heal") + target_backends = item.get("failed_backends") # None = all backends + + logger.debug( + f"Processing zone update for {domain}" + + (f" [retry #{item.get('retry_count', 0)}]" if is_retry else "") + + (f" [backends: {target_backends}]" if target_backends else "") + ) + + if not is_retry and not check_zone_exists(domain): + put_zone_index(domain, item.get("hostname"), item.get("username")) + + if not all(k in item for k in ["domain", "zone_file"]): + logger.error(f"Invalid queue item: {item}") + self.save_queue.task_done() + batch_failed += 1 + else: + backends = self.backend_registry.get_available_backends() + if target_backends: + backends = { + k: v for k, v in backends.items() if k in target_backends + } + if not backends: + logger.warning("No target backends available for this item!") + self.save_queue.task_done() + batch_failed += 1 + else: + if len(backends) > 1: + failed = self._process_backends_parallel(backends, item, session) + else: + failed = set() + for backend_name, backend in backends.items(): + if not self._process_single_backend( + backend_name, backend, item, session + ): + failed.add(backend_name) + + if failed: + self._schedule_retry(item, failed) + batch_failed += 1 + else: + self._store_zone_data(session, domain, item["zone_file"]) + batch_processed += 1 + + self.save_queue.task_done() + logger.debug(f"Completed processing for {domain}") + + except Exception as e: + logger.error(f"Unexpected worker error processing {item.get('domain', '?')}: {e}") + batch_failed += 1 + time.sleep(1) + + # Check immediately for the next item — keep batch open while + # more work is queued; close it only when the queue is empty. + try: + item = self.save_queue.get_nowait() + except Empty: + break + + elapsed = time.monotonic() - batch_start + total = batch_processed + batch_failed + rate = batch_processed / elapsed if elapsed > 0 else 0 + logger.success( + f"📦 Batch complete — {batch_processed}/{total} zone(s) " + f"processed successfully in {elapsed:.1f}s " + f"({rate:.1f} zones/sec)" + + (f", {batch_failed} failed" if batch_failed else "") + ) def _process_single_backend(self, backend_name, backend, item, session) -> bool: """Write a zone to one backend. Returns True on success, False on failure.""" diff --git a/tests/test_coredns_mysql.py b/tests/test_coredns_mysql.py index de2f609..e922960 100644 --- a/tests/test_coredns_mysql.py +++ b/tests/test_coredns_mysql.py @@ -199,90 +199,3 @@ def test_write_zone_migrates_null_managed_by(mysql_backend): assert zone.managed_by == "directadmin" session.close() - -# --------------------------------------------------------------------------- -# _relativize_name — apex/in-zone/external normalisation for CoreDNS MySQL -# --------------------------------------------------------------------------- - - -def test_relativize_apex_symbol(mysql_backend): - assert mysql_backend._relativize_name("example.com", "@") == "@" - - -def test_relativize_dot(mysql_backend): - assert mysql_backend._relativize_name("example.com", ".") == "@" - - -def test_relativize_zone_fqdn_to_apex(mysql_backend): - """Full zone FQDN must become '@' — storing it as-is causes CoreDNS to serve '.'.""" - assert mysql_backend._relativize_name("example.com", "example.com.") == "@" - - -def test_relativize_in_zone_subdomain(mysql_backend): - assert mysql_backend._relativize_name("example.com", "mail.example.com.") == "mail" - - -def test_relativize_external_fqdn_unchanged(mysql_backend): - assert mysql_backend._relativize_name("example.com", "mail.google.com.") == "mail.google.com." - - -def test_relativize_already_relative_unchanged(mysql_backend): - assert mysql_backend._relativize_name("example.com", "mail") == "mail" - - -# --------------------------------------------------------------------------- -# MX record normalization via write_zone -# --------------------------------------------------------------------------- - -MX_APEX_ZONE = """\ -$ORIGIN example.com. -$TTL 300 -example.com. 300 IN SOA ns.example.com. admin.example.com. (2023 3600 1800 604800 86400) -example.com. 300 IN MX 0 example.com. -example.com. 300 IN MX 10 mail.google.com. -""" - -MX_RELATIVE_ZONE = """\ -$ORIGIN example.com. -$TTL 300 -example.com. 300 IN SOA ns.example.com. admin.example.com. (2023 3600 1800 604800 86400) -example.com. 300 IN MX 0 @ -example.com. 300 IN MX 10 mail.google.com. -""" - - -def _get_mx_data(mysql_backend, zone_name="example.com"): - session = mysql_backend.Session() - zone = session.execute( - select(Zone).filter_by(zone_name=zone_name + ".") - ).scalar_one_or_none() - records = ( - session.execute( - select(Record).filter_by(zone_id=zone.id, type="MX") - ).scalars().all() - ) - result = {r.data for r in records} - session.close() - return result - - -def test_mx_apex_fqdn_stored_as_at_symbol(mysql_backend): - """MX pointing to zone FQDN must be stored as '0 @'.""" - mysql_backend.write_zone("example.com", MX_APEX_ZONE) - mx_data = _get_mx_data(mysql_backend) - assert "0 @" in mx_data - assert not any("example.com" in d for d in mx_data) - - -def test_mx_apex_at_symbol_stored_as_at_symbol(mysql_backend): - """MX '0 @' (already relative) must remain '0 @'.""" - mysql_backend.write_zone("example.com", MX_RELATIVE_ZONE) - mx_data = _get_mx_data(mysql_backend) - assert "0 @" in mx_data - - -def test_mx_external_fqdn_stored_unchanged(mysql_backend): - """External MX target must be stored as absolute FQDN.""" - mysql_backend.write_zone("example.com", MX_APEX_ZONE) - mx_data = _get_mx_data(mysql_backend) - assert "10 mail.google.com." in mx_data diff --git a/tests/test_da_client.py b/tests/test_da_client.py index 8730c39..a1f5a96 100644 --- a/tests/test_da_client.py +++ b/tests/test_da_client.py @@ -24,7 +24,9 @@ def _make_json_response(domains_list, total_pages=1): def _client(): - return DirectAdminClient("da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True) + return DirectAdminClient( + "da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True + ) # --------------------------------------------------------------------------- @@ -105,7 +107,9 @@ def test_html_response_returns_none(): def test_connection_error_returns_none(): - with patch("requests.get", side_effect=requests.exceptions.ConnectionError("refused")): + with patch( + "requests.get", side_effect=requests.exceptions.ConnectionError("refused") + ): result = _client().list_domains() assert result is None @@ -119,7 +123,9 @@ def test_timeout_returns_none(): def test_ssl_error_returns_none(): - with patch("requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")): + with patch( + "requests.get", side_effect=requests.exceptions.SSLError("cert verify failed") + ): result = _client().list_domains() assert result is None @@ -131,12 +137,16 @@ def test_ssl_error_returns_none(): def test_parse_standard_querystring(): - result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com&list[]=test.com") + result = DirectAdminClient._parse_legacy_domain_list( + "list[]=example.com&list[]=test.com" + ) assert result == {"example.com", "test.com"} def test_parse_newline_separated(): - result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com\nlist[]=test.com") + result = DirectAdminClient._parse_legacy_domain_list( + "list[]=example.com\nlist[]=test.com" + ) assert result == {"example.com", "test.com"} diff --git a/tests/test_reconciler.py b/tests/test_reconciler.py index 0a73dd9..4f50272 100644 --- a/tests/test_reconciler.py +++ b/tests/test_reconciler.py @@ -55,7 +55,9 @@ DA_CLIENT_PATH = "directdnsonly.app.reconciler.DirectAdminClient" def _patch_da(return_value): """Patch DirectAdminClient so list_domains returns a fixed value.""" - return patch(DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value}) + return patch( + DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value} + ) # --------------------------------------------------------------------------- @@ -233,7 +235,12 @@ def test_heal_skips_domains_without_zone_data(delete_queue, patch_connect): registry, _ = _make_backend_registry(zone_exists_return=False) patch_connect.add( - Domain(domain="nodata.com", hostname="da1.example.com", username="admin", zone_data=None) + Domain( + domain="nodata.com", + hostname="da1.example.com", + username="admin", + zone_data=None, + ) ) patch_connect.commit()