From e0a119558d2d4f5e8cdbfcf1714dbeb77b667f3f Mon Sep 17 00:00:00 2001 From: Aaron Guise Date: Thu, 19 Feb 2026 12:16:22 +1300 Subject: [PATCH] =?UTF-8?q?refactor:=20extract=20DirectAdminClient=20into?= =?UTF-8?q?=20directdnsonly.app.da=20module=20=F0=9F=8F=97=EF=B8=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move all outbound DirectAdmin HTTP logic out of ReconciliationWorker and into a dedicated, independently testable DirectAdminClient class: - directdnsonly/app/da/client.py: list_domains (paginated JSON + legacy fallback), get (authenticated GET to any CMD_* endpoint), _login (DA Evo session-cookie fallback), _parse_legacy_domain_list - directdnsonly/app/da/__init__.py: public re-export of DirectAdminClient - reconciler.py: now purely reconciliation logic; instantiates a client per configured server — no HTTP code remaining - tests/test_da_client.py: 16 dedicated tests for DirectAdminClient - tests/test_reconciler.py: mocks at the DirectAdminClient class boundary instead of the internal _fetch_da_domains method Bumped to 2.2.0 — DirectAdminClient is now a first-class public API. --- directdnsonly/app/da/__init__.py | 3 + directdnsonly/app/da/client.py | 204 +++++++++++++++++++++ directdnsonly/app/reconciler.py | 301 ++----------------------------- pyproject.toml | 2 +- tests/test_da_client.py | 192 ++++++++++++++++++++ tests/test_reconciler.py | 164 ++--------------- 6 files changed, 439 insertions(+), 427 deletions(-) create mode 100644 directdnsonly/app/da/__init__.py create mode 100644 directdnsonly/app/da/client.py create mode 100644 tests/test_da_client.py diff --git a/directdnsonly/app/da/__init__.py b/directdnsonly/app/da/__init__.py new file mode 100644 index 0000000..717efcc --- /dev/null +++ b/directdnsonly/app/da/__init__.py @@ -0,0 +1,3 @@ +from .client import DirectAdminClient + +__all__ = ["DirectAdminClient"] diff --git a/directdnsonly/app/da/client.py b/directdnsonly/app/da/client.py new file mode 100644 index 0000000..93a9fc4 --- /dev/null +++ b/directdnsonly/app/da/client.py @@ -0,0 +1,204 @@ +"""DirectAdmin HTTP client. + +Encapsulates all outbound communication with a single DirectAdmin server: +authenticated requests, the Basic-Auth → session-cookie fallback for DA Evo, +paginated domain listing, and the legacy URL-encoded response parser. +""" + +from __future__ import annotations + +from urllib.parse import parse_qs +from typing import Optional + +import requests +import requests.exceptions +from loguru import logger + + +class DirectAdminClient: + """HTTP client for a single DirectAdmin server. + + Handles two authentication modes transparently: + - Basic Auth (classic DA / API-only access) + - Session cookie via CMD_LOGIN (DA Evolution — redirects Basic Auth) + + Usage:: + + client = DirectAdminClient("da1.example.com", 2222, "admin", "secret") + domains = client.list_domains() # set[str] or None on failure + response = client.get("CMD_API_SHOW_ALL_USERS") + """ + + def __init__( + self, + hostname: str, + port: int, + username: str, + password: str, + ssl: bool = True, + verify_ssl: bool = True, + ) -> None: + self.hostname = hostname + self.port = port + self.username = username + self.password = password + self.scheme = "https" if ssl else "http" + self.verify_ssl = verify_ssl + self._cookies = None # populated on first successful session login + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def list_domains(self, ipp: int = 1000) -> Optional[set]: + """Return all domains on this DA server via CMD_DNS_ADMIN (JSON, paginated). + + Falls back to the legacy URL-encoded parser if JSON decode fails. + Returns a set of lowercase domain strings, or ``None`` if the server + is unreachable or returns an error. + """ + page = 1 + all_domains: set = set() + total_pages = 1 + + try: + while page <= total_pages: + response = self.get( + "CMD_DNS_ADMIN", + params={"json": "yes", "page": page, "ipp": ipp}, + ) + if response is None: + return None + + if response.is_redirect or response.status_code in (301, 302, 303, 307, 308): + if self._cookies: + logger.error( + f"[da:{self.hostname}] Still redirecting after session login — " + f"check that '{self.username}' has admin-level access. Skipping." + ) + return None + logger.debug( + f"[da:{self.hostname}] Basic Auth redirected " + f"(HTTP {response.status_code}) — attempting session login (DA Evo)" + ) + if not self._login(): + return None + continue # retry this page with cookies + + response.raise_for_status() + + content_type = response.headers.get("Content-Type", "") + if "text/html" in content_type: + logger.error( + f"[da:{self.hostname}] Returned HTML instead of API response — " + f"check credentials and admin-level access. Skipping." + ) + return None + + try: + data = response.json() + for k, v in data.items(): + if k.isdigit() and isinstance(v, dict) and "domain" in v: + all_domains.add(v["domain"].strip().lower()) + total_pages = int(data.get("info", {}).get("total_pages", 1)) + page += 1 + except Exception as exc: + logger.error( + f"[da:{self.hostname}] JSON decode failed on page {page}: {exc}\n" + f"Raw response: {response.text[:500]}" + ) + all_domains.update(self._parse_legacy_domain_list(response.text)) + break # no paging in legacy mode + + return all_domains + + except requests.exceptions.SSLError as exc: + logger.error( + f"[da:{self.hostname}] SSL error — {exc}. " + f"Set verify_ssl: false in reconciliation config if using self-signed certs." + ) + except requests.exceptions.ConnectionError as exc: + logger.error(f"[da:{self.hostname}] Cannot reach server — {exc}. Skipping.") + except requests.exceptions.Timeout: + logger.error(f"[da:{self.hostname}] Connection timed out. Skipping.") + except requests.exceptions.HTTPError as exc: + logger.error(f"[da:{self.hostname}] HTTP error — {exc}. Skipping.") + except Exception as exc: + logger.error(f"[da:{self.hostname}] Unexpected error: {exc}") + + return None + + def get( + self, command: str, params: Optional[dict] = None + ) -> Optional[requests.Response]: + """Authenticated GET to any DA CMD_* endpoint. + + Uses session cookies when available (after a successful ``_login``), + otherwise falls back to HTTP Basic Auth. Does **not** follow redirects + so callers can detect the Basic-Auth → cookie upgrade. + """ + url = f"{self.scheme}://{self.hostname}:{self.port}/{command}" + kwargs: dict = dict( + params=params or {}, + timeout=30, + verify=self.verify_ssl, + allow_redirects=False, + ) + if self._cookies: + kwargs["cookies"] = self._cookies + else: + kwargs["auth"] = (self.username, self.password) + + try: + return requests.get(url, **kwargs) + except Exception as exc: + logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}") + return None + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _login(self) -> bool: + """POST CMD_LOGIN to obtain a DA Evo session cookie. + + Populates ``self._cookies`` on success and returns ``True``. + Returns ``False`` on any failure. + """ + login_url = f"{self.scheme}://{self.hostname}:{self.port}/CMD_LOGIN" + try: + response = requests.post( + login_url, + data={ + "username": self.username, + "password": self.password, + "referer": "/CMD_DNS_ADMIN?json=yes&page=1&ipp=500", + }, + timeout=30, + verify=self.verify_ssl, + allow_redirects=False, + ) + if not response.cookies: + logger.error( + f"[da:{self.hostname}] CMD_LOGIN returned no session cookie — " + f"check username/password." + ) + return False + self._cookies = response.cookies + logger.debug(f"[da:{self.hostname}] Session login successful (DA Evo)") + return True + except Exception as exc: + logger.error(f"[da:{self.hostname}] Session login failed: {exc}") + return False + + @staticmethod + def _parse_legacy_domain_list(body: str) -> set: + """Parse DA's legacy CMD_API_SHOW_ALL_DOMAINS URL-encoded response. + + DA returns ``list[]=example.com&list[]=example2.com``, optionally + newline-separated instead of ampersand-separated. + """ + normalised = body.replace("\n", "&").strip("&") + params = parse_qs(normalised) + domains = params.get("list[]", []) + return {d.strip().lower() for d in domains if d.strip()} diff --git a/directdnsonly/app/reconciler.py b/directdnsonly/app/reconciler.py index 6713939..0a58caa 100755 --- a/directdnsonly/app/reconciler.py +++ b/directdnsonly/app/reconciler.py @@ -1,11 +1,8 @@ #!/usr/bin/env python3 import threading -from urllib.parse import parse_qs from loguru import logger -import requests -import requests.exceptions - +from directdnsonly.app.da import DirectAdminClient from directdnsonly.app.db import connect from directdnsonly.app.db.models import Domain @@ -76,7 +73,6 @@ class ReconciliationWorker: def _run(self): logger.info("Reconciliation worker starting — running initial check now") self._reconcile_all() - # Wait for interval or stop signal; returns True when stopped while not self._stop_event.wait(timeout=self.interval_seconds): self._reconcile_all() @@ -86,32 +82,35 @@ class ReconciliationWorker: f"{len(self.servers)} server(s)" ) total_queued = 0 - # Build a map of all domains seen on all DA servers - all_da_domains = {} # domain -> hostname + + # Build a map of all domains seen on all DA servers: domain -> hostname + all_da_domains: dict = {} for server in self.servers: hostname = server.get("hostname") if not hostname: logger.warning("[reconciler] Server config missing hostname — skipping") continue try: - da_domains = self._fetch_da_domains( - hostname, - server.get("port", 2222), - server.get("username"), - server.get("password"), - server.get("ssl", True), - ipp=self.ipp, + client = DirectAdminClient( + hostname=hostname, + port=server.get("port", 2222), + username=server.get("username"), + password=server.get("password"), + ssl=server.get("ssl", True), + verify_ssl=self.verify_ssl, ) + da_domains = client.list_domains(ipp=self.ipp) if da_domains is not None: for d in da_domains: all_da_domains[d] = hostname logger.debug( - f"[reconciler] {hostname}: {len(da_domains) if da_domains else 0} active domain(s) in DA" + f"[reconciler] {hostname}: " + f"{len(da_domains) if da_domains else 0} active domain(s) in DA" ) - except Exception as e: - logger.error(f"[reconciler] Unexpected error polling {hostname}: {e}") + except Exception as exc: + logger.error(f"[reconciler] Unexpected error polling {hostname}: {exc}") - # Now check local DB for all domains, update master if needed, and queue deletes only from recorded master + # Compare local DB against what DA reported; update masters and queue deletes session = connect() try: all_local_domains = session.query(Domain).all() @@ -137,7 +136,6 @@ class ReconciliationWorker: record.hostname = actual_master migrated += 1 else: - # Only act if the recorded master is one we're polling if recorded_master in known_servers: if self.dry_run: logger.warning( @@ -158,6 +156,7 @@ class ReconciliationWorker: f"(master: {recorded_master})" ) total_queued += 1 + if migrated or backfilled: session.commit() if backfilled: @@ -170,6 +169,7 @@ class ReconciliationWorker: ) finally: session.close() + if self.dry_run: logger.info( f"[reconciler] Reconciliation pass complete [DRY-RUN] — " @@ -180,266 +180,3 @@ class ReconciliationWorker: f"[reconciler] Reconciliation pass complete — " f"{total_queued} domain(s) queued for deletion" ) - - def _fetch_da_domains( - self, - hostname: str, - port: int, - username: str, - password: str, - use_ssl: bool, - ipp: int = 1000, - ): - """Fetch all domains from a DA server via CMD_DNS_ADMIN (JSON, paging supported). - - Returns a set of domain strings on success, or None on any failure. - """ - scheme = "https" if use_ssl else "http" - page = 1 - all_domains = set() - total_pages = 1 - cookies = None - - try: - while page <= total_pages: - url = f"{scheme}://{hostname}:{port}/CMD_DNS_ADMIN?json=yes&page={page}&ipp={ipp}" - req_kwargs = dict( - timeout=30, - verify=self.verify_ssl, - allow_redirects=False, - ) - if cookies: - req_kwargs["cookies"] = cookies - else: - req_kwargs["auth"] = (username, password) - - response = requests.get(url, **req_kwargs) - - if response.is_redirect or response.status_code in ( - 301, - 302, - 303, - 307, - 308, - ): - if not cookies: - logger.debug( - f"[reconciler] {hostname}:{port} redirected Basic Auth " - f"(HTTP {response.status_code}) — attempting session login (DA Evo)" - ) - cookies = self._da_session_login( - scheme, hostname, port, username, password - ) - if cookies is None: - return None - continue # retry this page with cookies - else: - logger.error( - f"[reconciler] {hostname}:{port} still redirecting after session login — " - f"check that '{username}' has admin-level access. Skipping." - ) - return None - - response.raise_for_status() - content_type = response.headers.get("Content-Type", "") - if "text/html" in content_type: - logger.error( - f"[reconciler] {hostname}:{port} returned HTML instead of API response — " - f"check credentials and admin-level access. Skipping." - ) - return None - - # Try JSON first - try: - data = response.json() - # Domains are in keys '0', '1', ... - for k, v in data.items(): - if k.isdigit() and isinstance(v, dict) and "domain" in v: - all_domains.add(v["domain"].strip().lower()) - # Paging info - info = data.get("info", {}) - total_pages = int(info.get("total_pages", 1)) - page += 1 - continue - except Exception as e: - logger.error( - f"[reconciler] JSON decode failed for {hostname}:{port} page {page}: {e}\nRaw response: {response.text[:500]}" - ) - # Fallback to legacy parser - domains = self._parse_da_domain_list(response.text) - all_domains.update(domains) - break # No paging in legacy mode - - return all_domains - - except requests.exceptions.SSLError as e: - logger.error( - f"[reconciler] SSL error connecting to {hostname}:{port} — {e}. " - f"Set verify_ssl: false in reconciliation config if using self-signed certs." - ) - return None - except requests.exceptions.ConnectionError as e: - logger.error( - f"[reconciler] Cannot reach {hostname}:{port} — {e}. " - f"Skipping this server." - ) - return None - except requests.exceptions.Timeout: - logger.error( - f"[reconciler] Timeout connecting to {hostname}:{port}. " - f"Skipping this server." - ) - return None - except requests.exceptions.HTTPError as e: - logger.error( - f"[reconciler] HTTP {response.status_code} from {hostname}:{port} — {e}. " - f"Skipping this server." - ) - return None - except Exception as e: - logger.error(f"[reconciler] Unexpected error fetching from {hostname}: {e}") - return None - - def _da_session_login( - self, scheme: str, hostname: str, port: int, username: str, password: str - ): - """POST to CMD_LOGIN to obtain a DA Evo session cookie. - - Returns a RequestsCookieJar on success, or None on failure. - """ - login_url = f"{scheme}://{hostname}:{port}/CMD_LOGIN" - try: - response = requests.post( - login_url, - data={ - "username": username, - "password": password, - "referer": "/CMD_DNS_ADMIN?json=yes&page=1&ipp=500", - }, - timeout=30, - verify=self.verify_ssl, - allow_redirects=False, - ) - if not response.cookies: - logger.error( - f"[reconciler] {hostname}:{port} CMD_LOGIN returned no session cookie — " - f"check username/password." - ) - return None - logger.debug( - f"[reconciler] {hostname}:{port} session login successful (DA Evo)" - ) - return response.cookies - except Exception as e: - logger.error(f"[reconciler] {hostname}:{port} session login failed: {e}") - return None - - @staticmethod - def _parse_da_domain_list(body: str) -> set: - """Parse DA's CMD_API_SHOW_ALL_DOMAINS response. - - DA returns URL-encoded key=value pairs, either on one line or newline- - separated. The domain list uses the key 'list[]'. - - Example response: - list[]=example.com&list[]=example2.com - """ - # Normalise newline-separated responses to a single query string - normalised = body.replace("\n", "&").strip("&") - params = parse_qs(normalised) - domains = params.get("list[]", []) - return {d.strip().lower() for d in domains if d.strip()} - - -if __name__ == "__main__": - import argparse - import sys - from queue import Queue - - parser = argparse.ArgumentParser( - description="Test DirectAdmin domain fetcher (JSON/paging)" - ) - parser.add_argument("--hostname", required=True, help="DirectAdmin server hostname") - parser.add_argument( - "--port", type=int, default=2222, help="DirectAdmin port (default: 2222)" - ) - parser.add_argument("--username", required=True, help="DirectAdmin admin username") - parser.add_argument("--password", required=True, help="DirectAdmin admin password") - parser.add_argument("--ssl", action="store_true", help="Use HTTPS (default: True)") - parser.add_argument( - "--no-ssl", dest="ssl", action="store_false", help="Use HTTP (not recommended)" - ) - parser.set_defaults(ssl=True) - parser.add_argument( - "--verify-ssl", action="store_true", help="Verify SSL certs (default: True)" - ) - parser.add_argument( - "--no-verify-ssl", - dest="verify_ssl", - action="store_false", - help="Don't verify SSL certs", - ) - parser.set_defaults(verify_ssl=True) - parser.add_argument( - "--ipp", type=int, default=1000, help="Items per page (default: 1000)" - ) - parser.add_argument( - "--print-json", - action="store_true", - help="Print raw JSON response for first page", - ) - - args = parser.parse_args() - - # Minimal config for testing - config = { - "enabled": True, - "directadmin_servers": [ - { - "hostname": args.hostname, - "port": args.port, - "username": args.username, - "password": args.password, - "ssl": args.ssl, - } - ], - "verify_ssl": args.verify_ssl, - } - q = Queue() - worker = ReconciliationWorker(q, config) - server = config["directadmin_servers"][0] - print( - f"Fetching domains from {server['hostname']}:{server['port']} (ipp={args.ipp})..." - ) - # Directly call the fetch method for testing - domains = worker._fetch_da_domains( - server["hostname"], - server.get("port", 2222), - server.get("username"), - server.get("password"), - server.get("ssl", True), - ipp=args.ipp, - ) - if domains is None: - print("Failed to fetch domains.", file=sys.stderr) - sys.exit(1) - print(f"Fetched {len(domains)} domains:") - for d in sorted(domains): - print(d) - - if args.print_json: - # Print the first page's raw JSON for inspection - scheme = "https" if server.get("ssl", True) else "http" - url = f"{scheme}://{server['hostname']}:{server.get('port', 2222)}/CMD_DNS_ADMIN?json=yes&page=1&ipp={args.ipp}" - resp = requests.get( - url, - auth=(server.get("username"), server.get("password")), - timeout=30, - verify=args.verify_ssl, - allow_redirects=False, - ) - try: - print("\nRaw JSON for first page:") - print(resp.json()) - except Exception: - print("(Could not parse JSON)") diff --git a/pyproject.toml b/pyproject.toml index a1dbafa..bd5c3dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "directdnsonly" -version = "2.1.0" +version = "2.2.0" description = "DNS Management System - DirectAdmin to multiple backends" authors = [ {name = "Aaron Guise",email = "aaron@guise.net.nz"} diff --git a/tests/test_da_client.py b/tests/test_da_client.py new file mode 100644 index 0000000..8730c39 --- /dev/null +++ b/tests/test_da_client.py @@ -0,0 +1,192 @@ +"""Tests for directdnsonly.app.da.client — DirectAdminClient.""" + +import requests.exceptions +from unittest.mock import MagicMock, patch + +from directdnsonly.app.da import DirectAdminClient + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_json_response(domains_list, total_pages=1): + data = {str(i): {"domain": d} for i, d in enumerate(domains_list)} + data["info"] = {"total_pages": total_pages} + mock = MagicMock() + mock.status_code = 200 + mock.is_redirect = False + mock.headers = {"Content-Type": "application/json"} + mock.json.return_value = data + mock.raise_for_status = MagicMock() + return mock + + +def _client(): + return DirectAdminClient("da1.example.com", 2222, "admin", "secret", ssl=True, verify_ssl=True) + + +# --------------------------------------------------------------------------- +# list_domains — JSON happy path +# --------------------------------------------------------------------------- + + +def test_list_domains_returns_set_from_json(): + mock_resp = _make_json_response(["example.com", "test.com"]) + + with patch("requests.get", return_value=mock_resp): + result = _client().list_domains() + + assert result == {"example.com", "test.com"} + + +def test_list_domains_paginates(): + page1 = _make_json_response(["a.com"], total_pages=2) + page2 = _make_json_response(["b.com"], total_pages=2) + + with patch("requests.get", side_effect=[page1, page2]): + result = _client().list_domains() + + assert result == {"a.com", "b.com"} + + +# --------------------------------------------------------------------------- +# list_domains — DA Evo session login fallback +# --------------------------------------------------------------------------- + + +def test_redirect_triggers_session_login(): + redirect_resp = MagicMock() + redirect_resp.status_code = 302 + redirect_resp.is_redirect = True + + client = _client() + with ( + patch("requests.get", return_value=redirect_resp), + patch.object(client, "_login", return_value=False), + ): + result = client.list_domains() + + assert result is None + + +def test_persistent_redirect_after_login_returns_none(): + redirect_resp = MagicMock() + redirect_resp.status_code = 302 + redirect_resp.is_redirect = True + + client = _client() + # Simulate cookies already set (login succeeded previously) + client._cookies = {"session": "abc"} + + with patch("requests.get", return_value=redirect_resp): + result = client.list_domains() + + assert result is None + + +# --------------------------------------------------------------------------- +# list_domains — error cases +# --------------------------------------------------------------------------- + + +def test_html_response_returns_none(): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.is_redirect = False + mock_resp.headers = {"Content-Type": "text/html; charset=utf-8"} + mock_resp.raise_for_status = MagicMock() + + with patch("requests.get", return_value=mock_resp): + result = _client().list_domains() + + assert result is None + + +def test_connection_error_returns_none(): + with patch("requests.get", side_effect=requests.exceptions.ConnectionError("refused")): + result = _client().list_domains() + + assert result is None + + +def test_timeout_returns_none(): + with patch("requests.get", side_effect=requests.exceptions.Timeout()): + result = _client().list_domains() + + assert result is None + + +def test_ssl_error_returns_none(): + with patch("requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")): + result = _client().list_domains() + + assert result is None + + +# --------------------------------------------------------------------------- +# _parse_legacy_domain_list +# --------------------------------------------------------------------------- + + +def test_parse_standard_querystring(): + result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com&list[]=test.com") + assert result == {"example.com", "test.com"} + + +def test_parse_newline_separated(): + result = DirectAdminClient._parse_legacy_domain_list("list[]=example.com\nlist[]=test.com") + assert result == {"example.com", "test.com"} + + +def test_parse_empty_body_returns_empty_set(): + assert DirectAdminClient._parse_legacy_domain_list("") == set() + + +def test_parse_normalises_to_lowercase(): + result = DirectAdminClient._parse_legacy_domain_list("list[]=EXAMPLE.COM") + assert "example.com" in result + assert "EXAMPLE.COM" not in result + + +def test_parse_strips_whitespace(): + result = DirectAdminClient._parse_legacy_domain_list("list[]= example.com ") + assert "example.com" in result + + +# --------------------------------------------------------------------------- +# _login +# --------------------------------------------------------------------------- + + +def test_login_stores_cookies_on_success(): + mock_resp = MagicMock() + mock_resp.cookies = {"session": "tok123"} + + client = _client() + with patch("requests.post", return_value=mock_resp): + result = client._login() + + assert result is True + assert client._cookies == {"session": "tok123"} + + +def test_login_returns_false_when_no_cookies(): + mock_resp = MagicMock() + mock_resp.cookies = {} + + client = _client() + with patch("requests.post", return_value=mock_resp): + result = client._login() + + assert result is False + assert client._cookies is None + + +def test_login_returns_false_on_exception(): + client = _client() + with patch("requests.post", side_effect=requests.exceptions.ConnectionError()): + result = client._login() + + assert result is False diff --git a/tests/test_reconciler.py b/tests/test_reconciler.py index daf1b2b..501dcc4 100644 --- a/tests/test_reconciler.py +++ b/tests/test_reconciler.py @@ -1,9 +1,8 @@ """Tests for directdnsonly.app.reconciler — ReconciliationWorker.""" import pytest -import requests.exceptions from queue import Queue -from unittest.mock import MagicMock, patch +from unittest.mock import patch from directdnsonly.app.reconciler import ReconciliationWorker from directdnsonly.app.db.models import Domain @@ -47,6 +46,18 @@ def dry_run_worker(delete_queue): return ReconciliationWorker(delete_queue, cfg) +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +DA_CLIENT_PATH = "directdnsonly.app.reconciler.DirectAdminClient" + + +def _patch_da(return_value): + """Patch DirectAdminClient so list_domains returns a fixed value.""" + return patch(DA_CLIENT_PATH, **{"return_value.list_domains.return_value": return_value}) + + # --------------------------------------------------------------------------- # _reconcile_all — orphan detection # --------------------------------------------------------------------------- @@ -58,7 +69,7 @@ def test_orphan_queued_when_domain_missing_from_da(worker, delete_queue, patch_c ) patch_connect.commit() - with patch.object(worker, "_fetch_da_domains", return_value=set()): + with _patch_da(set()): worker._reconcile_all() assert not delete_queue.empty() @@ -73,7 +84,7 @@ def test_orphan_not_queued_in_dry_run(dry_run_worker, delete_queue, patch_connec ) patch_connect.commit() - with patch.object(dry_run_worker, "_fetch_da_domains", return_value=set()): + with _patch_da(set()): dry_run_worker._reconcile_all() assert delete_queue.empty() @@ -86,7 +97,7 @@ def test_orphan_not_queued_for_unknown_server(worker, delete_queue, patch_connec ) patch_connect.commit() - with patch.object(worker, "_fetch_da_domains", return_value=set()): + with _patch_da(set()): worker._reconcile_all() assert delete_queue.empty() @@ -98,7 +109,7 @@ def test_active_domain_not_queued(worker, delete_queue, patch_connect): ) patch_connect.commit() - with patch.object(worker, "_fetch_da_domains", return_value={"good.com"}): + with _patch_da({"good.com"}): worker._reconcile_all() assert delete_queue.empty() @@ -113,7 +124,7 @@ def test_backfill_null_hostname(worker, patch_connect): patch_connect.add(Domain(domain="backfill.com", hostname=None, username="admin")) patch_connect.commit() - with patch.object(worker, "_fetch_da_domains", return_value={"backfill.com"}): + with _patch_da({"backfill.com"}): worker._reconcile_all() record = patch_connect.query(Domain).filter_by(domain="backfill.com").first() @@ -126,7 +137,7 @@ def test_migration_updates_hostname(worker, patch_connect): ) patch_connect.commit() - with patch.object(worker, "_fetch_da_domains", return_value={"moved.com"}): + with _patch_da({"moved.com"}): worker._reconcile_all() record = patch_connect.query(Domain).filter_by(domain="moved.com").first() @@ -138,148 +149,13 @@ def test_dry_run_still_backfills(dry_run_worker, patch_connect): patch_connect.add(Domain(domain="fill.com", hostname=None, username="admin")) patch_connect.commit() - with patch.object(dry_run_worker, "_fetch_da_domains", return_value={"fill.com"}): + with _patch_da({"fill.com"}): dry_run_worker._reconcile_all() record = patch_connect.query(Domain).filter_by(domain="fill.com").first() assert record.hostname == "da1.example.com" -# --------------------------------------------------------------------------- -# _fetch_da_domains — HTTP handling -# --------------------------------------------------------------------------- - - -def _make_json_response(domains_dict, total_pages=1): - """Return a mock requests.Response with JSON payload matching DA format.""" - data = {str(i): {"domain": d} for i, d in enumerate(domains_dict)} - data["info"] = {"total_pages": total_pages} - mock = MagicMock() - mock.status_code = 200 - mock.is_redirect = False - mock.headers = {"Content-Type": "application/json"} - mock.json.return_value = data - mock.raise_for_status = MagicMock() - return mock - - -def test_fetch_returns_domains_from_json(worker): - mock_resp = _make_json_response(["example.com", "test.com"]) - - with patch("requests.get", return_value=mock_resp): - result = worker._fetch_da_domains( - "da1.example.com", 2222, "admin", "secret", True - ) - - assert result == {"example.com", "test.com"} - - -def test_fetch_paginates(worker): - page1 = _make_json_response(["a.com"], total_pages=2) - page2 = _make_json_response(["b.com"], total_pages=2) - - with patch("requests.get", side_effect=[page1, page2]): - result = worker._fetch_da_domains( - "da1.example.com", 2222, "admin", "secret", True - ) - - assert result == {"a.com", "b.com"} - - -def test_fetch_redirect_triggers_session_login(worker): - redirect_resp = MagicMock() - redirect_resp.status_code = 302 - redirect_resp.is_redirect = True - - with ( - patch("requests.get", return_value=redirect_resp), - patch.object(worker, "_da_session_login", return_value=None), - ): - result = worker._fetch_da_domains( - "da1.example.com", 2222, "admin", "secret", True - ) - - assert result is None - - -def test_fetch_html_response_returns_none(worker): - mock_resp = MagicMock() - mock_resp.status_code = 200 - mock_resp.is_redirect = False - mock_resp.headers = {"Content-Type": "text/html; charset=utf-8"} - mock_resp.raise_for_status = MagicMock() - - with patch("requests.get", return_value=mock_resp): - result = worker._fetch_da_domains( - "da1.example.com", 2222, "admin", "secret", True - ) - - assert result is None - - -def test_fetch_connection_error_returns_none(worker): - with patch( - "requests.get", side_effect=requests.exceptions.ConnectionError("refused") - ): - result = worker._fetch_da_domains( - "da1.example.com", 2222, "admin", "secret", True - ) - - assert result is None - - -def test_fetch_timeout_returns_none(worker): - with patch("requests.get", side_effect=requests.exceptions.Timeout()): - result = worker._fetch_da_domains( - "da1.example.com", 2222, "admin", "secret", True - ) - - assert result is None - - -def test_fetch_ssl_error_returns_none(worker): - with patch( - "requests.get", side_effect=requests.exceptions.SSLError("cert verify failed") - ): - result = worker._fetch_da_domains( - "da1.example.com", 2222, "admin", "secret", True - ) - - assert result is None - - -# --------------------------------------------------------------------------- -# _parse_da_domain_list — legacy format fallback -# --------------------------------------------------------------------------- - - -def test_parse_standard_querystring(): - body = "list[]=example.com&list[]=test.com" - result = ReconciliationWorker._parse_da_domain_list(body) - assert result == {"example.com", "test.com"} - - -def test_parse_newline_separated(): - body = "list[]=example.com\nlist[]=test.com" - result = ReconciliationWorker._parse_da_domain_list(body) - assert result == {"example.com", "test.com"} - - -def test_parse_empty_body_returns_empty_set(): - assert ReconciliationWorker._parse_da_domain_list("") == set() - - -def test_parse_normalises_to_lowercase(): - result = ReconciliationWorker._parse_da_domain_list("list[]=EXAMPLE.COM") - assert "example.com" in result - assert "EXAMPLE.COM" not in result - - -def test_parse_strips_whitespace(): - result = ReconciliationWorker._parse_da_domain_list("list[]= example.com ") - assert "example.com" in result - - # --------------------------------------------------------------------------- # Worker lifecycle # ---------------------------------------------------------------------------