fix: correct RDATA encoding and batch processing in CoreDNS MySQL backend 🐛

- Fix dnspython silently relativizing in-zone FQDN targets to '@' by
  calling rdata.to_text(origin=origin, relativize=False); CoreDNS MySQL
  requires absolute FQDNs in RDATA and was serving '.' for any CNAME/MX
  pointing to the zone apex
- Reorder write_zone to delete stale records before inserting new ones
  so a brief NXDOMAIN is preferred over briefly serving duplicate records
- Rework save-queue batch loop: keep batch open until queue is empty
  rather than closing after a fixed timeout, so sequential DA zone pushes
  accumulate into a single batch
- Add managed_by='directadmin' to _ensure_zone_exists for new and
  legacy NULL rows
This commit is contained in:
2026-02-25 15:43:08 +13:00
parent 83fbb03cad
commit 0b31b75789
7 changed files with 149 additions and 264 deletions

View File

@@ -24,7 +24,9 @@ def _make_json_response(domains_list, total_pages=1):
def _client():
return DirectAdminClient("da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True)
return DirectAdminClient(
"da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True
)
# ---------------------------------------------------------------------------
@@ -105,7 +107,9 @@ def test_html_response_returns_none():
def test_connection_error_returns_none():
with patch("requests.get", side_effect=requests.exceptions.ConnectionError("refused")):
with patch(
"requests.get", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().list_domains()
assert result is None
@@ -119,7 +123,9 @@ def test_timeout_returns_none():
def test_ssl_error_returns_none():
with patch("requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")):
with patch(
"requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")
):
result = _client().list_domains()
assert result is None
@@ -131,12 +137,16 @@ def test_ssl_error_returns_none():
def test_parse_standard_querystring():
result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com&list[]=test.com")
result = DirectAdminClient._parse_legacy_domain_list(
"list[]=example.com&list[]=test.com"
)
assert result == {"example.com", "test.com"}
def test_parse_newline_separated():
result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com\nlist[]=test.com")
result = DirectAdminClient._parse_legacy_domain_list(
"list[]=example.com\nlist[]=test.com"
)
assert result == {"example.com", "test.com"}