refactor: extract DirectAdminClient into directdnsonly.app.da module 🏗️

Move all outbound DirectAdmin HTTP logic out of ReconciliationWorker and
into a dedicated, independently testable DirectAdminClient class:

- directdnsonly/app/da/client.py: list_domains (paginated JSON + legacy
  fallback), get (authenticated GET to any CMD_* endpoint), _login
  (DA Evo session-cookie fallback), _parse_legacy_domain_list
- directdnsonly/app/da/__init__.py: public re-export of DirectAdminClient
- reconciler.py: now purely reconciliation logic; instantiates a client
  per configured server — no HTTP code remaining
- tests/test_da_client.py: 16 dedicated tests for DirectAdminClient
- tests/test_reconciler.py: mocks at the DirectAdminClient class boundary
  instead of the internal _fetch_da_domains method

Bumped to 2.2.0 — DirectAdminClient is now a first-class public API.
This commit is contained in:
2026-02-19 12:16:22 +13:00
parent ae1e89a236
commit e0a119558d
6 changed files with 439 additions and 427 deletions

View File

@@ -0,0 +1,3 @@
# Public package surface: re-export the client so callers can write
# `from directdnsonly.app.da import DirectAdminClient`.
from .client import DirectAdminClient
__all__ = ["DirectAdminClient"]

View File

@@ -0,0 +1,204 @@
"""DirectAdmin HTTP client.
Encapsulates all outbound communication with a single DirectAdmin server:
authenticated requests, the Basic-Auth → session-cookie fallback for DA Evo,
paginated domain listing, and the legacy URL-encoded response parser.
"""
from __future__ import annotations
from urllib.parse import parse_qs
from typing import Optional
import requests
import requests.exceptions
from loguru import logger
class DirectAdminClient:
    """HTTP client for a single DirectAdmin server.

    Handles two authentication modes transparently:

    - Basic Auth (classic DA / API-only access)
    - Session cookie via CMD_LOGIN (DA Evolution — redirects Basic Auth)

    Usage::

        client = DirectAdminClient("da1.example.com", 2222, "admin", "secret")
        domains = client.list_domains()  # set[str] or None on failure
        response = client.get("CMD_API_SHOW_ALL_USERS")
    """

    def __init__(
        self,
        hostname: str,
        port: int,
        username: str,
        password: str,
        ssl: bool = True,
        verify_ssl: bool = True,
    ) -> None:
        """Store connection settings; no network I/O happens here.

        :param hostname: DA server hostname (no scheme, no port).
        :param port: DA port (conventionally 2222).
        :param username: admin-level DA username.
        :param password: password for ``username``.
        :param ssl: use https when True, plain http when False.
        :param verify_ssl: verify the server certificate (False for self-signed).
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password
        self.scheme = "https" if ssl else "http"
        self.verify_ssl = verify_ssl
        # Cookie jar populated on first successful session login;
        # None means "keep using Basic Auth".
        self._cookies = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def list_domains(self, ipp: int = 1000) -> Optional[set]:
        """Return all domains on this DA server via CMD_DNS_ADMIN (JSON, paginated).

        Falls back to the legacy URL-encoded parser if JSON decode fails.

        :param ipp: items per page requested from DA.
        :returns: set of lowercase domain strings, or ``None`` if the server
            is unreachable or returns an error.
        """
        page = 1
        all_domains: set = set()
        total_pages = 1
        try:
            while page <= total_pages:
                response = self.get(
                    "CMD_DNS_ADMIN",
                    params={"json": "yes", "page": page, "ipp": ipp},
                )
                if response is None:
                    # get() already logged the unexpected failure.
                    return None
                if response.is_redirect or response.status_code in (301, 302, 303, 307, 308):
                    if self._cookies:
                        # Already authenticated via CMD_LOGIN and still being
                        # redirected: the account lacks admin-level access.
                        logger.error(
                            f"[da:{self.hostname}] Still redirecting after session login — "
                            f"check that '{self.username}' has admin-level access. Skipping."
                        )
                        return None
                    logger.debug(
                        f"[da:{self.hostname}] Basic Auth redirected "
                        f"(HTTP {response.status_code}) — attempting session login (DA Evo)"
                    )
                    if not self._login():
                        return None
                    continue  # retry this page with cookies
                response.raise_for_status()
                content_type = response.headers.get("Content-Type", "")
                if "text/html" in content_type:
                    # DA serves its login/GUI pages as HTML; a real API
                    # response is never text/html.
                    logger.error(
                        f"[da:{self.hostname}] Returned HTML instead of API response — "
                        f"check credentials and admin-level access. Skipping."
                    )
                    return None
                try:
                    data = response.json()
                    # Domains live under numeric string keys: "0", "1", ...
                    for k, v in data.items():
                        if k.isdigit() and isinstance(v, dict) and "domain" in v:
                            all_domains.add(v["domain"].strip().lower())
                    total_pages = int(data.get("info", {}).get("total_pages", 1))
                    page += 1
                except Exception as exc:
                    logger.error(
                        f"[da:{self.hostname}] JSON decode failed on page {page}: {exc}\n"
                        f"Raw response: {response.text[:500]}"
                    )
                    all_domains.update(self._parse_legacy_domain_list(response.text))
                    break  # no paging in legacy mode
            return all_domains
        except requests.exceptions.SSLError as exc:
            logger.error(
                f"[da:{self.hostname}] SSL error — {exc}. "
                f"Set verify_ssl: false in reconciliation config if using self-signed certs."
            )
        except requests.exceptions.ConnectionError as exc:
            logger.error(f"[da:{self.hostname}] Cannot reach server — {exc}. Skipping.")
        except requests.exceptions.Timeout:
            logger.error(f"[da:{self.hostname}] Connection timed out. Skipping.")
        except requests.exceptions.HTTPError as exc:
            logger.error(f"[da:{self.hostname}] HTTP error — {exc}. Skipping.")
        except Exception as exc:
            logger.error(f"[da:{self.hostname}] Unexpected error: {exc}")
        return None

    def get(
        self, command: str, params: Optional[dict] = None
    ) -> Optional[requests.Response]:
        """Authenticated GET to any DA CMD_* endpoint.

        Uses session cookies when available (after a successful ``_login``),
        otherwise falls back to HTTP Basic Auth. Does **not** follow redirects
        so callers can detect the Basic-Auth → cookie upgrade.

        :raises requests.exceptions.RequestException: subclasses (SSLError,
            ConnectionError, Timeout, ...) are propagated so callers can log
            targeted diagnostics.
        :returns: the response, or ``None`` on a non-requests failure.
        """
        url = f"{self.scheme}://{self.hostname}:{self.port}/{command}"
        kwargs: dict = dict(
            params=params or {},
            timeout=30,
            verify=self.verify_ssl,
            allow_redirects=False,
        )
        if self._cookies:
            kwargs["cookies"] = self._cookies
        else:
            kwargs["auth"] = (self.username, self.password)
        try:
            return requests.get(url, **kwargs)
        except requests.exceptions.RequestException:
            # Bug fix: these were previously swallowed into a generic log +
            # None, which made the SSL/connection/timeout handlers in
            # list_domains — and their remediation hints (e.g. "set
            # verify_ssl: false") — unreachable. Propagate instead.
            raise
        except Exception as exc:
            logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}")
            return None

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------
    def _login(self) -> bool:
        """POST CMD_LOGIN to obtain a DA Evo session cookie.

        Populates ``self._cookies`` on success and returns ``True``.
        Returns ``False`` on any failure.
        """
        login_url = f"{self.scheme}://{self.hostname}:{self.port}/CMD_LOGIN"
        try:
            response = requests.post(
                login_url,
                data={
                    "username": self.username,
                    "password": self.password,
                    "referer": "/CMD_DNS_ADMIN?json=yes&page=1&ipp=500",
                },
                timeout=30,
                verify=self.verify_ssl,
                allow_redirects=False,
            )
            if not response.cookies:
                logger.error(
                    f"[da:{self.hostname}] CMD_LOGIN returned no session cookie — "
                    f"check username/password."
                )
                return False
            self._cookies = response.cookies
            logger.debug(f"[da:{self.hostname}] Session login successful (DA Evo)")
            return True
        except Exception as exc:
            logger.error(f"[da:{self.hostname}] Session login failed: {exc}")
            return False

    @staticmethod
    def _parse_legacy_domain_list(body: str) -> set:
        """Parse DA's legacy CMD_API_SHOW_ALL_DOMAINS URL-encoded response.

        DA returns ``list[]=example.com&list[]=example2.com``, optionally
        newline-separated instead of ampersand-separated.
        """
        # Normalise newline-separated responses to a single query string.
        normalised = body.replace("\n", "&").strip("&")
        params = parse_qs(normalised)
        domains = params.get("list[]", [])
        return {d.strip().lower() for d in domains if d.strip()}

View File

@@ -1,11 +1,8 @@
#!/usr/bin/env python3
import threading
from urllib.parse import parse_qs
from loguru import logger
import requests
import requests.exceptions
from directdnsonly.app.da import DirectAdminClient
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain
@@ -76,7 +73,6 @@ class ReconciliationWorker:
def _run(self):
    """Worker-thread body: reconcile once immediately, then on a fixed interval.

    ``_stop_event.wait`` doubles as the sleep between passes; it returns True
    when stop is signalled, which ends the loop promptly.
    """
    logger.info("Reconciliation worker starting — running initial check now")
    self._reconcile_all()
    # Wait for interval or stop signal; returns True when stopped
    while not self._stop_event.wait(timeout=self.interval_seconds):
        self._reconcile_all()
@@ -86,32 +82,35 @@ class ReconciliationWorker:
f"{len(self.servers)} server(s)"
)
total_queued = 0
# Build a map of all domains seen on all DA servers
all_da_domains = {} # domain -> hostname
# Build a map of all domains seen on all DA servers: domain -> hostname
all_da_domains: dict = {}
for server in self.servers:
hostname = server.get("hostname")
if not hostname:
logger.warning("[reconciler] Server config missing hostname — skipping")
continue
try:
da_domains = self._fetch_da_domains(
hostname,
server.get("port", 2222),
server.get("username"),
server.get("password"),
server.get("ssl", True),
ipp=self.ipp,
client = DirectAdminClient(
hostname=hostname,
port=server.get("port", 2222),
username=server.get("username"),
password=server.get("password"),
ssl=server.get("ssl", True),
verify_ssl=self.verify_ssl,
)
da_domains = client.list_domains(ipp=self.ipp)
if da_domains is not None:
for d in da_domains:
all_da_domains[d] = hostname
logger.debug(
f"[reconciler] {hostname}: {len(da_domains) if da_domains else 0} active domain(s) in DA"
f"[reconciler] {hostname}: "
f"{len(da_domains) if da_domains else 0} active domain(s) in DA"
)
except Exception as e:
logger.error(f"[reconciler] Unexpected error polling {hostname}: {e}")
except Exception as exc:
logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}")
# Now check local DB for all domains, update master if needed, and queue deletes only from recorded master
# Compare local DB against what DA reported; update masters and queue deletes
session = connect()
try:
all_local_domains = session.query(Domain).all()
@@ -137,7 +136,6 @@ class ReconciliationWorker:
record.hostname = actual_master
migrated += 1
else:
# Only act if the recorded master is one we're polling
if recorded_master in known_servers:
if self.dry_run:
logger.warning(
@@ -158,6 +156,7 @@ class ReconciliationWorker:
f"(master: {recorded_master})"
)
total_queued += 1
if migrated or backfilled:
session.commit()
if backfilled:
@@ -170,6 +169,7 @@ class ReconciliationWorker:
)
finally:
session.close()
if self.dry_run:
logger.info(
f"[reconciler] Reconciliation pass complete [DRY-RUN] — "
@@ -180,266 +180,3 @@ class ReconciliationWorker:
f"[reconciler] Reconciliation pass complete — "
f"{total_queued} domain(s) queued for deletion"
)
def _fetch_da_domains(
    self,
    hostname: str,
    port: int,
    username: str,
    password: str,
    use_ssl: bool,
    ipp: int = 1000,
):
    """Fetch all domains from a DA server via CMD_DNS_ADMIN (JSON, paging supported).

    Tries Basic Auth first; if DA Evolution redirects it, performs a session
    login (CMD_LOGIN) and retries the page with cookies. Falls back to the
    legacy URL-encoded parser when the response is not JSON.

    Returns a set of lowercase domain strings on success, or None on any failure.
    """
    scheme = "https" if use_ssl else "http"
    page = 1
    all_domains = set()
    total_pages = 1
    cookies = None  # set after a successful DA Evo session login
    try:
        while page <= total_pages:
            url = f"{scheme}://{hostname}:{port}/CMD_DNS_ADMIN?json=yes&page={page}&ipp={ipp}"
            req_kwargs = dict(
                timeout=30,
                verify=self.verify_ssl,
                allow_redirects=False,
            )
            if cookies:
                req_kwargs["cookies"] = cookies
            else:
                req_kwargs["auth"] = (username, password)
            response = requests.get(url, **req_kwargs)
            if response.is_redirect or response.status_code in (
                301,
                302,
                303,
                307,
                308,
            ):
                if not cookies:
                    logger.debug(
                        f"[reconciler] {hostname}:{port} redirected Basic Auth "
                        f"(HTTP {response.status_code}) — attempting session login (DA Evo)"
                    )
                    cookies = self._da_session_login(
                        scheme, hostname, port, username, password
                    )
                    if cookies is None:
                        return None
                    continue  # retry this page with cookies
                else:
                    logger.error(
                        f"[reconciler] {hostname}:{port} still redirecting after session login — "
                        f"check that '{username}' has admin-level access. Skipping."
                    )
                    return None
            response.raise_for_status()
            content_type = response.headers.get("Content-Type", "")
            if "text/html" in content_type:
                logger.error(
                    f"[reconciler] {hostname}:{port} returned HTML instead of API response — "
                    f"check credentials and admin-level access. Skipping."
                )
                return None
            # Try JSON first
            try:
                data = response.json()
                # Domains are in keys '0', '1', ...
                for k, v in data.items():
                    if k.isdigit() and isinstance(v, dict) and "domain" in v:
                        all_domains.add(v["domain"].strip().lower())
                # Paging info
                info = data.get("info", {})
                total_pages = int(info.get("total_pages", 1))
                page += 1
                continue
            except Exception as e:
                logger.error(
                    f"[reconciler] JSON decode failed for {hostname}:{port} page {page}: {e}\n"
                    f"Raw response: {response.text[:500]}"
                )
                # Fallback to legacy parser
                domains = self._parse_da_domain_list(response.text)
                all_domains.update(domains)
                break  # No paging in legacy mode
        return all_domains
    except requests.exceptions.SSLError as e:
        # Fix: the original f-string ran host:port and the error together
        # ("...:2222cert verify failed") — missing separator added.
        logger.error(
            f"[reconciler] SSL error connecting to {hostname}:{port} — {e}. "
            f"Set verify_ssl: false in reconciliation config if using self-signed certs."
        )
        return None
    except requests.exceptions.ConnectionError as e:
        # Fix: same missing separator between {port} and {e}.
        logger.error(
            f"[reconciler] Cannot reach {hostname}:{port} — {e}. "
            f"Skipping this server."
        )
        return None
    except requests.exceptions.Timeout:
        logger.error(
            f"[reconciler] Timeout connecting to {hostname}:{port}. "
            f"Skipping this server."
        )
        return None
    except requests.exceptions.HTTPError as e:
        # response is always bound here: HTTPError only originates from
        # raise_for_status() above. Fix: missing separator before {e}.
        logger.error(
            f"[reconciler] HTTP {response.status_code} from {hostname}:{port} — {e}. "
            f"Skipping this server."
        )
        return None
    except Exception as e:
        logger.error(f"[reconciler] Unexpected error fetching from {hostname}: {e}")
        return None
def _da_session_login(
    self, scheme: str, hostname: str, port: int, username: str, password: str
):
    """POST to CMD_LOGIN to obtain a DA Evo session cookie.

    Returns a RequestsCookieJar on success, or None on failure.
    """
    target = f"{scheme}://{hostname}:{port}/CMD_LOGIN"
    credentials = {
        "username": username,
        "password": password,
        "referer": "/CMD_DNS_ADMIN?json=yes&page=1&ipp=500",
    }
    try:
        reply = requests.post(
            target,
            data=credentials,
            timeout=30,
            verify=self.verify_ssl,
            allow_redirects=False,
        )
    except Exception as e:
        logger.error(f"[reconciler] {hostname}:{port} session login failed: {e}")
        return None
    if not reply.cookies:
        logger.error(
            f"[reconciler] {hostname}:{port} CMD_LOGIN returned no session cookie — "
            f"check username/password."
        )
        return None
    logger.debug(
        f"[reconciler] {hostname}:{port} session login successful (DA Evo)"
    )
    return reply.cookies
@staticmethod
def _parse_da_domain_list(body: str) -> set:
    """Parse DA's CMD_API_SHOW_ALL_DOMAINS response.

    DA returns URL-encoded key=value pairs, either on one line or newline-
    separated; the domain list uses the key 'list[]'. Example response:

        list[]=example.com&list[]=example2.com
    """
    # Newline-separated variants become one long query string.
    query = body.replace("\n", "&").strip("&")
    entries = parse_qs(query).get("list[]", [])
    found = set()
    for entry in entries:
        cleaned = entry.strip()
        if cleaned:
            found.add(cleaned.lower())
    return found
if __name__ == "__main__":
    # Manual smoke-test CLI: fetch the domain list from a real DA server using
    # the same code path as the reconciliation worker. Not used in production.
    import argparse
    import sys
    from queue import Queue

    parser = argparse.ArgumentParser(
        description="Test DirectAdmin domain fetcher (JSON/paging)"
    )
    parser.add_argument("--hostname", required=True, help="DirectAdmin server hostname")
    parser.add_argument(
        "--port", type=int, default=2222, help="DirectAdmin port (default: 2222)"
    )
    parser.add_argument("--username", required=True, help="DirectAdmin admin username")
    parser.add_argument("--password", required=True, help="DirectAdmin admin password")
    parser.add_argument("--ssl", action="store_true", help="Use HTTPS (default: True)")
    parser.add_argument(
        "--no-ssl", dest="ssl", action="store_false", help="Use HTTP (not recommended)"
    )
    parser.set_defaults(ssl=True)
    parser.add_argument(
        "--verify-ssl", action="store_true", help="Verify SSL certs (default: True)"
    )
    parser.add_argument(
        "--no-verify-ssl",
        dest="verify_ssl",
        action="store_false",
        help="Don't verify SSL certs",
    )
    parser.set_defaults(verify_ssl=True)
    parser.add_argument(
        "--ipp", type=int, default=1000, help="Items per page (default: 1000)"
    )
    parser.add_argument(
        "--print-json",
        action="store_true",
        help="Print raw JSON response for first page",
    )
    args = parser.parse_args()
    # Minimal config for testing
    config = {
        "enabled": True,
        "directadmin_servers": [
            {
                "hostname": args.hostname,
                "port": args.port,
                "username": args.username,
                "password": args.password,
                "ssl": args.ssl,
            }
        ],
        "verify_ssl": args.verify_ssl,
    }
    # Dummy delete-queue; nothing is consumed from it in this smoke test.
    q = Queue()
    worker = ReconciliationWorker(q, config)
    server = config["directadmin_servers"][0]
    print(
        f"Fetching domains from {server['hostname']}:{server['port']} (ipp={args.ipp})..."
    )
    # Directly call the fetch method for testing
    domains = worker._fetch_da_domains(
        server["hostname"],
        server.get("port", 2222),
        server.get("username"),
        server.get("password"),
        server.get("ssl", True),
        ipp=args.ipp,
    )
    if domains is None:
        print("Failed to fetch domains.", file=sys.stderr)
        sys.exit(1)
    print(f"Fetched {len(domains)} domains:")
    for d in sorted(domains):
        print(d)
    if args.print_json:
        # Print the first page's raw JSON for inspection
        scheme = "https" if server.get("ssl", True) else "http"
        url = f"{scheme}://{server['hostname']}:{server.get('port', 2222)}/CMD_DNS_ADMIN?json=yes&page=1&ipp={args.ipp}"
        resp = requests.get(
            url,
            auth=(server.get("username"), server.get("password")),
            timeout=30,
            verify=args.verify_ssl,
            allow_redirects=False,
        )
        try:
            print("\nRaw JSON for first page:")
            print(resp.json())
        except Exception:
            print("(Could not parse JSON)")

View File

@@ -1,6 +1,6 @@
[project]
name = "directdnsonly"
version = "2.1.0"
version = "2.2.0"
description = "DNS Management System - DirectAdmin to multiple backends"
authors = [
{name = "Aaron Guise",email = "aaron@guise.net.nz"}

192
tests/test_da_client.py Normal file
View File

@@ -0,0 +1,192 @@
"""Tests for directdnsonly.app.da.client — DirectAdminClient."""
import requests.exceptions
from unittest.mock import MagicMock, patch
from directdnsonly.app.da import DirectAdminClient
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_json_response(domains_list, total_pages=1):
    """Mock requests.Response whose JSON body matches DA's CMD_DNS_ADMIN shape."""
    payload = {"info": {"total_pages": total_pages}}
    for idx, name in enumerate(domains_list):
        payload[str(idx)] = {"domain": name}
    fake = MagicMock()
    fake.status_code = 200
    fake.is_redirect = False
    fake.headers = {"Content-Type": "application/json"}
    fake.json.return_value = payload
    fake.raise_for_status = MagicMock()
    return fake
def _client():
    """Fresh client pointed at the fixed fake DA endpoint used throughout."""
    return DirectAdminClient(
        "da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True
    )
# ---------------------------------------------------------------------------
# list_domains — JSON happy path
# ---------------------------------------------------------------------------
def test_list_domains_returns_set_from_json():
    """A single JSON page yields exactly its domains as a set."""
    fake = _make_json_response(["example.com", "test.com"])
    with patch("requests.get", return_value=fake):
        assert _client().list_domains() == {"example.com", "test.com"}
def test_list_domains_paginates():
    """Domains from every page are merged into one set."""
    pages = [
        _make_json_response(["a.com"], total_pages=2),
        _make_json_response(["b.com"], total_pages=2),
    ]
    with patch("requests.get", side_effect=pages):
        assert _client().list_domains() == {"a.com", "b.com"}
# ---------------------------------------------------------------------------
# list_domains — DA Evo session login fallback
# ---------------------------------------------------------------------------
def test_redirect_triggers_session_login():
    """A redirected Basic Auth attempt falls back to _login; a failed login aborts."""
    bounced = MagicMock(status_code=302, is_redirect=True)
    client = _client()
    with patch("requests.get", return_value=bounced):
        with patch.object(client, "_login", return_value=False):
            assert client.list_domains() is None
def test_persistent_redirect_after_login_returns_none():
    """Still redirecting while holding a session cookie means no admin access."""
    bounced = MagicMock(status_code=302, is_redirect=True)
    client = _client()
    # Simulate cookies already set (login succeeded previously)
    client._cookies = {"session": "abc"}
    with patch("requests.get", return_value=bounced):
        assert client.list_domains() is None
# ---------------------------------------------------------------------------
# list_domains — error cases
# ---------------------------------------------------------------------------
def test_html_response_returns_none():
    """An HTML body (DA's GUI/login page) is treated as a failure."""
    page = MagicMock(status_code=200, is_redirect=False)
    page.headers = {"Content-Type": "text/html; charset=utf-8"}
    page.raise_for_status = MagicMock()
    with patch("requests.get", return_value=page):
        assert _client().list_domains() is None
def test_connection_error_returns_none():
    boom = requests.exceptions.ConnectionError("refused")
    with patch("requests.get", side_effect=boom):
        assert _client().list_domains() is None
def test_timeout_returns_none():
    with patch("requests.get", side_effect=requests.exceptions.Timeout()):
        outcome = _client().list_domains()
    assert outcome is None
def test_ssl_error_returns_none():
    boom = requests.exceptions.SSLError("cert verify failed")
    with patch("requests.get", side_effect=boom):
        assert _client().list_domains() is None
# ---------------------------------------------------------------------------
# _parse_legacy_domain_list
# ---------------------------------------------------------------------------
def test_parse_standard_querystring():
    parsed = DirectAdminClient._parse_legacy_domain_list(
        "list[]=example.com&list[]=test.com"
    )
    assert parsed == {"example.com", "test.com"}
def test_parse_newline_separated():
    parsed = DirectAdminClient._parse_legacy_domain_list(
        "list[]=example.com\nlist[]=test.com"
    )
    assert parsed == {"example.com", "test.com"}
def test_parse_empty_body_returns_empty_set():
    parsed = DirectAdminClient._parse_legacy_domain_list("")
    assert parsed == set()
def test_parse_normalises_to_lowercase():
    parsed = DirectAdminClient._parse_legacy_domain_list("list[]=EXAMPLE.COM")
    assert "example.com" in parsed
    assert "EXAMPLE.COM" not in parsed
def test_parse_strips_whitespace():
    parsed = DirectAdminClient._parse_legacy_domain_list("list[]= example.com ")
    assert "example.com" in parsed
# ---------------------------------------------------------------------------
# _login
# ---------------------------------------------------------------------------
def test_login_stores_cookies_on_success():
    reply = MagicMock()
    reply.cookies = {"session": "tok123"}
    client = _client()
    with patch("requests.post", return_value=reply):
        assert client._login() is True
    assert client._cookies == {"session": "tok123"}
def test_login_returns_false_when_no_cookies():
    reply = MagicMock()
    reply.cookies = {}
    client = _client()
    with patch("requests.post", return_value=reply):
        assert client._login() is False
    assert client._cookies is None
def test_login_returns_false_on_exception():
    client = _client()
    boom = requests.exceptions.ConnectionError()
    with patch("requests.post", side_effect=boom):
        assert client._login() is False

View File

@@ -1,9 +1,8 @@
"""Tests for directdnsonly.app.reconciler — ReconciliationWorker."""
import pytest
import requests.exceptions
from queue import Queue
from unittest.mock import MagicMock, patch
from unittest.mock import patch
from directdnsonly.app.reconciler import ReconciliationWorker
from directdnsonly.app.db.models import Domain
@@ -47,6 +46,18 @@ def dry_run_worker(delete_queue):
return ReconciliationWorker(delete_queue, cfg)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
DA_CLIENT_PATH = "directdnsonly.app.reconciler.DirectAdminClient"


def _patch_da(return_value):
    """Patch DirectAdminClient so list_domains returns a fixed value."""
    behaviour = {"return_value.list_domains.return_value": return_value}
    return patch(DA_CLIENT_PATH, **behaviour)
# ---------------------------------------------------------------------------
# _reconcile_all — orphan detection
# ---------------------------------------------------------------------------
@@ -58,7 +69,7 @@ def test_orphan_queued_when_domain_missing_from_da(worker, delete_queue, patch_c
)
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value=set()):
with _patch_da(set()):
worker._reconcile_all()
assert not delete_queue.empty()
@@ -73,7 +84,7 @@ def test_orphan_not_queued_in_dry_run(dry_run_worker, delete_queue, patch_connec
)
patch_connect.commit()
with patch.object(dry_run_worker, "_fetch_da_domains", return_value=set()):
with _patch_da(set()):
dry_run_worker._reconcile_all()
assert delete_queue.empty()
@@ -86,7 +97,7 @@ def test_orphan_not_queued_for_unknown_server(worker, delete_queue, patch_connec
)
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value=set()):
with _patch_da(set()):
worker._reconcile_all()
assert delete_queue.empty()
@@ -98,7 +109,7 @@ def test_active_domain_not_queued(worker, delete_queue, patch_connect):
)
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value={"good.com"}):
with _patch_da({"good.com"}):
worker._reconcile_all()
assert delete_queue.empty()
@@ -113,7 +124,7 @@ def test_backfill_null_hostname(worker, patch_connect):
patch_connect.add(Domain(domain="backfill.com", hostname=None, username="admin"))
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value={"backfill.com"}):
with _patch_da({"backfill.com"}):
worker._reconcile_all()
record = patch_connect.query(Domain).filter_by(domain="backfill.com").first()
@@ -126,7 +137,7 @@ def test_migration_updates_hostname(worker, patch_connect):
)
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value={"moved.com"}):
with _patch_da({"moved.com"}):
worker._reconcile_all()
record = patch_connect.query(Domain).filter_by(domain="moved.com").first()
@@ -138,148 +149,13 @@ def test_dry_run_still_backfills(dry_run_worker, patch_connect):
patch_connect.add(Domain(domain="fill.com", hostname=None, username="admin"))
patch_connect.commit()
with patch.object(dry_run_worker, "_fetch_da_domains", return_value={"fill.com"}):
with _patch_da({"fill.com"}):
dry_run_worker._reconcile_all()
record = patch_connect.query(Domain).filter_by(domain="fill.com").first()
assert record.hostname == "da1.example.com"
# ---------------------------------------------------------------------------
# _fetch_da_domains — HTTP handling
# ---------------------------------------------------------------------------
def _make_json_response(domains_dict, total_pages=1):
    """Return a mock requests.Response with JSON payload matching DA format."""
    payload = {"info": {"total_pages": total_pages}}
    for idx, name in enumerate(domains_dict):
        payload[str(idx)] = {"domain": name}
    fake = MagicMock()
    fake.status_code = 200
    fake.is_redirect = False
    fake.headers = {"Content-Type": "application/json"}
    fake.json.return_value = payload
    fake.raise_for_status = MagicMock()
    return fake
def test_fetch_returns_domains_from_json(worker):
    fake = _make_json_response(["example.com", "test.com"])
    with patch("requests.get", return_value=fake):
        fetched = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert fetched == {"example.com", "test.com"}
def test_fetch_paginates(worker):
    responses = [
        _make_json_response(["a.com"], total_pages=2),
        _make_json_response(["b.com"], total_pages=2),
    ]
    with patch("requests.get", side_effect=responses):
        fetched = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert fetched == {"a.com", "b.com"}
def test_fetch_redirect_triggers_session_login(worker):
    bounced = MagicMock(status_code=302, is_redirect=True)
    with patch("requests.get", return_value=bounced):
        with patch.object(worker, "_da_session_login", return_value=None):
            fetched = worker._fetch_da_domains(
                "da1.example.com", 2222, "admin", "secret", True
            )
    assert fetched is None
def test_fetch_html_response_returns_none(worker):
    page = MagicMock(status_code=200, is_redirect=False)
    page.headers = {"Content-Type": "text/html; charset=utf-8"}
    page.raise_for_status = MagicMock()
    with patch("requests.get", return_value=page):
        fetched = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert fetched is None
def test_fetch_connection_error_returns_none(worker):
    boom = requests.exceptions.ConnectionError("refused")
    with patch("requests.get", side_effect=boom):
        fetched = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert fetched is None
def test_fetch_timeout_returns_none(worker):
    with patch("requests.get", side_effect=requests.exceptions.Timeout()):
        fetched = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert fetched is None
def test_fetch_ssl_error_returns_none(worker):
    boom = requests.exceptions.SSLError("cert verify failed")
    with patch("requests.get", side_effect=boom):
        fetched = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert fetched is None
# ---------------------------------------------------------------------------
# _parse_da_domain_list — legacy format fallback
# ---------------------------------------------------------------------------
def test_parse_standard_querystring():
    parsed = ReconciliationWorker._parse_da_domain_list(
        "list[]=example.com&list[]=test.com"
    )
    assert parsed == {"example.com", "test.com"}
def test_parse_newline_separated():
    parsed = ReconciliationWorker._parse_da_domain_list(
        "list[]=example.com\nlist[]=test.com"
    )
    assert parsed == {"example.com", "test.com"}
def test_parse_empty_body_returns_empty_set():
    parsed = ReconciliationWorker._parse_da_domain_list("")
    assert parsed == set()
def test_parse_normalises_to_lowercase():
    parsed = ReconciliationWorker._parse_da_domain_list("list[]=EXAMPLE.COM")
    assert "example.com" in parsed
    assert "EXAMPLE.COM" not in parsed
def test_parse_strips_whitespace():
    parsed = ReconciliationWorker._parse_da_domain_list("list[]= example.com ")
    assert "example.com" in parsed
# ---------------------------------------------------------------------------
# Worker lifecycle
# ---------------------------------------------------------------------------