From 74c5f4012e4ea11b6b5b87fdab69ff591ebd59a4 Mon Sep 17 00:00:00 2001 From: Aaron Guise Date: Wed, 18 Feb 2026 22:53:09 +1300 Subject: [PATCH] =?UTF-8?q?style:=20apply=20black=20formatting=20across=20?= =?UTF-8?q?codebase=20=F0=9F=8E=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No logic changes — pure reformatting of line lengths, dict literals, method-chain line breaks, and trailing newlines to satisfy black's style. --- directdnsonly/app/api/admin.py | 24 +++-- directdnsonly/app/backends/coredns_mysql.py | 47 ++++----- directdnsonly/app/reconciler.py | 100 ++++++++++++++------ directdnsonly/app/utils/zone_parser.py | 4 +- directdnsonly/main.py | 4 +- directdnsonly/worker.py | 93 ++++++++++-------- tests/conftest.py | 6 +- tests/test_admin_api.py | 91 ++++++++++++------ tests/test_coredns_mysql.py | 13 ++- tests/test_reconciler.py | 71 ++++++++++---- tests/test_utils.py | 1 + tests/test_zone_parser.py | 1 + 12 files changed, 291 insertions(+), 164 deletions(-) diff --git a/directdnsonly/app/api/admin.py b/directdnsonly/app/api/admin.py index 304d3b2..9386dba 100644 --- a/directdnsonly/app/api/admin.py +++ b/directdnsonly/app/api/admin.py @@ -113,19 +113,23 @@ class DNSAdminAPI: if domain_exists: record = get_domain_record(domain) - return urlencode({ - "error": 0, - "exists": 1, - "details": f"Domain exists on {record.hostname}", - }) + return urlencode( + { + "error": 0, + "exists": 1, + "details": f"Domain exists on {record.hostname}", + } + ) # parent match only parent_record = get_parent_domain_record(domain) - return urlencode({ - "error": 0, - "exists": 2, - "details": f"Parent Domain exists on {parent_record.hostname}", - }) + return urlencode( + { + "error": 0, + "exists": 2, + "details": f"Parent Domain exists on {parent_record.hostname}", + } + ) def _handle_rawsave(self, domain: str, params: dict): """Process zone file saves""" diff --git a/directdnsonly/app/backends/coredns_mysql.py b/directdnsonly/app/backends/coredns_mysql.py index 9d457db..0e493f1 100644 --- a/directdnsonly/app/backends/coredns_mysql.py +++ b/directdnsonly/app/backends/coredns_mysql.py @@ -119,9 +119,7 @@ class CoreDNSMySQLBackend(DNSBackend): ) session.add(existing_soa) changes["added"] += 1 - logger.debug( - f"Added SOA record: {soa_name} SOA {soa_content}" - ) + logger.debug(f"Added SOA record: {soa_name} SOA {soa_content}") # Process all non-SOA records for record_name, record_type, record_content, record_ttl in source_records: @@ -172,7 +170,7 @@ class CoreDNSMySQLBackend(DNSBackend): changes["removed"] += 1 session.commit() - total_changes = changes['added'] + changes['updated'] + changes['removed'] + total_changes = changes["added"] + changes["updated"] + changes["removed"] if total_changes > 0: logger.info( f"[{self.instance_name}] Zone {zone_name} updated: " @@ -180,9 +178,7 @@ class CoreDNSMySQLBackend(DNSBackend): f"{changes['removed']} removed" ) else: - logger.debug( - f"[{self.instance_name}] Zone {zone_name}: no changes" - ) + logger.debug(f"[{self.instance_name}] Zone {zone_name}: no changes") return True except Exception as e: @@ -196,7 +192,11 @@ class CoreDNSMySQLBackend(DNSBackend): session = self.Session() try: # First find the zone - zone = session.query(Zone).filter_by(zone_name=self.dot_fqdn(zone_name)).first() + zone = ( + session.query(Zone) + .filter_by(zone_name=self.dot_fqdn(zone_name)) + .first() + ) if not zone: logger.warning(f"Zone {zone_name} not found for deletion") return False @@ -230,7 +230,9 @@ class CoreDNSMySQLBackend(DNSBackend): session = self.Session() try: exists = ( - session.query(Zone).filter_by(zone_name=self.dot_fqdn(zone_name)).first() + session.query(Zone) + .filter_by(zone_name=self.dot_fqdn(zone_name)) + .first() is not None ) logger.debug(f"Zone existence check for {zone_name}: {exists}") @@ -266,17 +268,11 @@ class CoreDNSMySQLBackend(DNSBackend): The normalized CNAME target string """ if record_content.startswith("@"): - logger.debug( - f"CNAME target starts with '@', replacing with zone FQDN" - ) + logger.debug(f"CNAME target starts with '@', replacing with zone FQDN") record_content = self.dot_fqdn(zone_name) elif not record_content.endswith("."): - logger.debug( - f"CNAME target {record_content} is relative, appending zone" - ) - record_content = ".".join( - [record_content, self.dot_fqdn(zone_name)] - ) + logger.debug(f"CNAME target {record_content} is relative, appending zone") + record_content = ".".join([record_content, self.dot_fqdn(zone_name)]) return record_content def _parse_zone_to_record_set( @@ -306,9 +302,7 @@ class CoreDNSMySQLBackend(DNSBackend): continue if record_type == "CNAME": - record_content = self._normalize_cname_data( - zone_name, record_content - ) + record_content = self._normalize_cname_data(zone_name, record_content) records.add((record_name, record_type, record_content, ttl)) @@ -341,9 +335,7 @@ class CoreDNSMySQLBackend(DNSBackend): ) return False, 0 - actual_count = ( - session.query(Record).filter_by(zone_id=zone.id).count() - ) + actual_count = session.query(Record).filter_by(zone_id=zone.id).count() matches = actual_count == expected_count if not matches: @@ -409,14 +401,11 @@ class CoreDNSMySQLBackend(DNSBackend): ) # Build lookup keys (without TTL) matching write_zone's key format expected_keys: Set[Tuple[str, str, str]] = { - (hostname, rtype, data) - for hostname, rtype, data, _ in source_records + (hostname, rtype, data) for hostname, rtype, data, _ in source_records } # Query all records currently in the backend for this zone - db_records = ( - session.query(Record).filter_by(zone_id=zone.id).all() - ) + db_records = session.query(Record).filter_by(zone_id=zone.id).all() removed = 0 for record in db_records: diff --git a/directdnsonly/app/reconciler.py b/directdnsonly/app/reconciler.py index baa693d..6713939 100755 --- a/directdnsonly/app/reconciler.py +++ b/directdnsonly/app/reconciler.py @@ -109,9 +109,7 @@ class ReconciliationWorker: f"[reconciler] {hostname}: {len(da_domains) if da_domains else 0} active domain(s) in DA" ) except Exception as e: - logger.error( - f"[reconciler] Unexpected error polling {hostname}: {e}" - ) + logger.error(f"[reconciler] Unexpected error polling {hostname}: {e}") # Now check local DB for all domains, update master if needed, and queue deletes only from recorded master session = connect() @@ -147,12 +145,14 @@ class ReconciliationWorker: f"(master: {recorded_master})" ) else: - self.delete_queue.put({ - "domain": record.domain, - "hostname": record.hostname, - "username": record.username or "", - "source": "reconciler", - }) + self.delete_queue.put( + { + "domain": record.domain, + "hostname": record.hostname, + "username": record.username or "", + "source": "reconciler", + } + ) logger.debug( f"[reconciler] Queued delete for orphan: {record.domain} " f"(master: {recorded_master})" @@ -161,9 +161,13 @@ class ReconciliationWorker: if migrated or backfilled: session.commit() if backfilled: - logger.info(f"[reconciler] {backfilled} domain(s) had missing hostname backfilled.") + logger.info( + f"[reconciler] {backfilled} domain(s) had missing hostname backfilled." + ) if migrated: - logger.info(f"[reconciler] {migrated} domain(s) migrated to new master.") + logger.info( + f"[reconciler] {migrated} domain(s) migrated to new master." + ) finally: session.close() if self.dry_run: @@ -178,7 +182,13 @@ class ReconciliationWorker: ) def _fetch_da_domains( - self, hostname: str, port: int, username: str, password: str, use_ssl: bool, ipp: int = 1000 + self, + hostname: str, + port: int, + username: str, + password: str, + use_ssl: bool, + ipp: int = 1000, ): """Fetch all domains from a DA server via CMD_DNS_ADMIN (JSON, paging supported). @@ -205,13 +215,21 @@ class ReconciliationWorker: response = requests.get(url, **req_kwargs) - if response.is_redirect or response.status_code in (301, 302, 303, 307, 308): + if response.is_redirect or response.status_code in ( + 301, + 302, + 303, + 307, + 308, + ): if not cookies: logger.debug( f"[reconciler] {hostname}:{port} redirected Basic Auth " f"(HTTP {response.status_code}) — attempting session login (DA Evo)" ) - cookies = self._da_session_login(scheme, hostname, port, username, password) + cookies = self._da_session_login( + scheme, hostname, port, username, password + ) if cookies is None: return None continue # retry this page with cookies @@ -279,9 +297,7 @@ class ReconciliationWorker: ) return None except Exception as e: - logger.error( - f"[reconciler] Unexpected error fetching from {hostname}: {e}" - ) + logger.error(f"[reconciler] Unexpected error fetching from {hostname}: {e}") return None def _da_session_login( @@ -310,12 +326,12 @@ class ReconciliationWorker: f"check username/password." ) return None - logger.debug(f"[reconciler] {hostname}:{port} session login successful (DA Evo)") + logger.debug( + f"[reconciler] {hostname}:{port} session login successful (DA Evo)" + ) return response.cookies except Exception as e: - logger.error( - f"[reconciler] {hostname}:{port} session login failed: {e}" - ) + logger.error(f"[reconciler] {hostname}:{port} session login failed: {e}") return None @staticmethod @@ -334,24 +350,44 @@ class ReconciliationWorker: domains = params.get("list[]", []) return {d.strip().lower() for d in domains if d.strip()} + if __name__ == "__main__": import argparse import sys from queue import Queue - parser = argparse.ArgumentParser(description="Test DirectAdmin domain fetcher (JSON/paging)") + parser = argparse.ArgumentParser( + description="Test DirectAdmin domain fetcher (JSON/paging)" + ) parser.add_argument("--hostname", required=True, help="DirectAdmin server hostname") - parser.add_argument("--port", type=int, default=2222, help="DirectAdmin port (default: 2222)") + parser.add_argument( + "--port", type=int, default=2222, help="DirectAdmin port (default: 2222)" + ) parser.add_argument("--username", required=True, help="DirectAdmin admin username") parser.add_argument("--password", required=True, help="DirectAdmin admin password") parser.add_argument("--ssl", action="store_true", help="Use HTTPS (default: True)") - parser.add_argument("--no-ssl", dest="ssl", action="store_false", help="Use HTTP (not recommended)") + parser.add_argument( + "--no-ssl", dest="ssl", action="store_false", help="Use HTTP (not recommended)" + ) parser.set_defaults(ssl=True) - parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certs (default: True)") - parser.add_argument("--no-verify-ssl", dest="verify_ssl", action="store_false", help="Don't verify SSL certs") + parser.add_argument( + "--verify-ssl", action="store_true", help="Verify SSL certs (default: True)" + ) + parser.add_argument( + "--no-verify-ssl", + dest="verify_ssl", + action="store_false", + help="Don't verify SSL certs", + ) parser.set_defaults(verify_ssl=True) - parser.add_argument("--ipp", type=int, default=1000, help="Items per page (default: 1000)") - parser.add_argument("--print-json", action="store_true", help="Print raw JSON response for first page") + parser.add_argument( + "--ipp", type=int, default=1000, help="Items per page (default: 1000)" + ) + parser.add_argument( + "--print-json", + action="store_true", + help="Print raw JSON response for first page", + ) args = parser.parse_args() @@ -372,7 +408,9 @@ if __name__ == "__main__": q = Queue() worker = ReconciliationWorker(q, config) server = config["directadmin_servers"][0] - print(f"Fetching domains from {server['hostname']}:{server['port']} (ipp={args.ipp})...") + print( + f"Fetching domains from {server['hostname']}:{server['port']} (ipp={args.ipp})..." + ) # Directly call the fetch method for testing domains = worker._fetch_da_domains( server["hostname"], @@ -380,7 +418,7 @@ if __name__ == "__main__": server.get("username"), server.get("password"), server.get("ssl", True), - ipp=args.ipp + ipp=args.ipp, ) if domains is None: print("Failed to fetch domains.", file=sys.stderr) @@ -404,4 +442,4 @@ if __name__ == "__main__": print("\nRaw JSON for first page:") print(resp.json()) except Exception: - print("(Could not parse JSON)") \ No newline at end of file + print("(Could not parse JSON)") diff --git a/directdnsonly/app/utils/zone_parser.py b/directdnsonly/app/utils/zone_parser.py index c124ce7..f7f7b8e 100644 --- a/directdnsonly/app/utils/zone_parser.py +++ b/directdnsonly/app/utils/zone_parser.py @@ -59,9 +59,7 @@ def count_zone_records(zone_data: str, domain_name: str) -> int: if rdata.rdclass == IN: count += 1 - logger.debug( - f"Source zone {domain_name} contains {count} records" - ) + logger.debug(f"Source zone {domain_name} contains {count} records") return count except DNSException as e: diff --git a/directdnsonly/main.py b/directdnsonly/main.py index 89c13fa..1a8e633 100644 --- a/directdnsonly/main.py +++ b/directdnsonly/main.py @@ -50,7 +50,9 @@ def main(): # Configure CherryPy user_password_dict = { - config.get_string("app.auth_username"): config.get_string("app.auth_password") + config.get_string("app.auth_username"): config.get_string( + "app.auth_password" + ) } check_password = cherrypy.lib.auth_basic.checkpassword_dict(user_password_dict) diff --git a/directdnsonly/worker.py b/directdnsonly/worker.py index b738b9d..7c933d8 100644 --- a/directdnsonly/worker.py +++ b/directdnsonly/worker.py @@ -14,7 +14,9 @@ from directdnsonly.app.reconciler import ReconciliationWorker class WorkerManager: - def __init__(self, queue_path: str, backend_registry, reconciliation_config: dict = None): + def __init__( + self, queue_path: str, backend_registry, reconciliation_config: dict = None + ): self.queue_path = queue_path self.backend_registry = backend_registry self._running = False @@ -86,9 +88,7 @@ class WorkerManager: f"{len(backends)} backends concurrently: " f"{', '.join(backends.keys())}" ) - self._process_backends_parallel( - backends, item, session - ) + self._process_backends_parallel(backends, item, session) else: # Single backend, no need for thread overhead for backend_name, backend in backends.items(): @@ -126,9 +126,7 @@ class WorkerManager: try: logger.debug(f"Using backend: {backend_name}") if backend.write_zone(item["domain"], item["zone_file"]): - logger.debug( - f"Successfully updated {item['domain']} in {backend_name}" - ) + logger.debug(f"Successfully updated {item['domain']} in {backend_name}") if backend.get_name() == "bind": # Need to update the named.conf backend.update_named_conf( @@ -144,9 +142,7 @@ class WorkerManager: backend_name, backend, item["domain"], item["zone_file"] ) else: - logger.error( - f"Failed to update {item['domain']} in {backend_name}" - ) + logger.error(f"Failed to update {item['domain']} in {backend_name}") except Exception as e: logger.error(f"Error in {backend_name}: {str(e)}") @@ -165,9 +161,7 @@ class WorkerManager: record = session.query(Domain).filter_by(domain=domain).first() if not record: - logger.warning( - f"Domain {domain} not found in DB — skipping delete" - ) + logger.warning(f"Domain {domain} not found in DB — skipping delete") self.delete_queue.task_done() continue @@ -194,16 +188,29 @@ class WorkerManager: elif len(backends) > 1: # Parallel delete, track failures results = [] - def delete_backend_wrapper(backend_name, backend, domain, remaining_domains): + + def delete_backend_wrapper( + backend_name, backend, domain, remaining_domains + ): try: return backend.delete_zone(domain) except Exception as e: - logger.error(f"Error deleting {domain} from {backend_name}: {e}") + logger.error( + f"Error deleting {domain} from {backend_name}: {e}" + ) return False + from concurrent.futures import ThreadPoolExecutor, as_completed + with ThreadPoolExecutor(max_workers=len(backends)) as executor: futures = { - executor.submit(delete_backend_wrapper, backend_name, backend, domain, remaining_domains): backend_name + executor.submit( + delete_backend_wrapper, + backend_name, + backend, + domain, + remaining_domains, + ): backend_name for backend_name, backend in backends.items() } for future in as_completed(futures): @@ -212,9 +219,13 @@ class WorkerManager: result = future.result() results.append(result) if not result: - logger.error(f"Failed to delete {domain} from {backend_name}") + logger.error( + f"Failed to delete {domain} from {backend_name}" + ) except Exception as e: - logger.error(f"Unhandled error deleting from {backend_name}: {e}") + logger.error( + f"Unhandled error deleting from {backend_name}: {e}" + ) results.append(False) delete_success = all(results) else: @@ -223,10 +234,14 @@ class WorkerManager: try: result = backend.delete_zone(domain) if not result: - logger.error(f"Failed to delete {domain} from {backend_name}") + logger.error( + f"Failed to delete {domain} from {backend_name}" + ) delete_success = False except Exception as e: - logger.error(f"Error deleting {domain} from {backend_name}: {e}") + logger.error( + f"Error deleting {domain} from {backend_name}: {e}" + ) delete_success = False if delete_success: @@ -236,7 +251,9 @@ class WorkerManager: self.delete_queue.task_done() logger.success(f"Delete completed for {domain}") else: - logger.error(f"Delete failed for {domain} on one or more backends — DB record retained") + logger.error( + f"Delete failed for {domain} on one or more backends — DB record retained" + ) self.delete_queue.task_done() except Empty: @@ -270,7 +287,10 @@ class WorkerManager: futures = { executor.submit( self._delete_single_backend, - backend_name, backend, domain, remaining_domains + backend_name, + backend, + domain, + remaining_domains, ): backend_name for backend_name, backend in backends.items() } @@ -279,9 +299,7 @@ class WorkerManager: try: future.result() except Exception as e: - logger.error( - f"Unhandled error deleting from {backend_name}: {e}" - ) + logger.error(f"Unhandled error deleting from {backend_name}: {e}") elapsed = (time.monotonic() - start_time) * 1000 logger.debug( f"Parallel delete of {domain} across " @@ -292,13 +310,11 @@ class WorkerManager: """Process zone updates across multiple backends in parallel""" start_time = time.monotonic() with ThreadPoolExecutor( - max_workers=len(backends), - thread_name_prefix="backend" + max_workers=len(backends), thread_name_prefix="backend" ) as executor: futures = { executor.submit( - self._process_single_backend, - backend_name, backend, item, session + self._process_single_backend, backend_name, backend, item, session ): backend_name for backend_name, backend in backends.items() } @@ -317,9 +333,7 @@ class WorkerManager: f"{len(backends)} backends completed in {elapsed:.0f}ms" ) - def _verify_backend_record_count( - self, backend_name, backend, zone_name, zone_data - ): + def _verify_backend_record_count(self, backend_name, backend, zone_name, zone_data): """Verify and reconcile the backend record count against the authoritative BIND zone from DirectAdmin. @@ -344,9 +358,7 @@ class WorkerManager: ) return - matches, actual = backend.verify_zone_record_count( - zone_name, expected - ) + matches, actual = backend.verify_zone_record_count(zone_name, expected) if matches: return # All good @@ -357,9 +369,7 @@ class WorkerManager: f"record(s) for {zone_name} — reconciling against " f"DirectAdmin source zone" ) - success, removed = backend.reconcile_zone_records( - zone_name, zone_data - ) + success, removed = backend.reconcile_zone_records(zone_name, zone_data) if success and removed > 0: # Verify again after reconciliation matches, new_count = backend.verify_zone_record_count( @@ -437,6 +447,9 @@ class WorkerManager: "save_queue_size": self.save_queue.qsize(), "delete_queue_size": self.delete_queue.qsize(), "save_worker_alive": self._save_thread and self._save_thread.is_alive(), - "delete_worker_alive": self._delete_thread and self._delete_thread.is_alive(), - "reconciler_alive": self._reconciler.is_alive if self._reconciler else False, + "delete_worker_alive": self._delete_thread + and self._delete_thread.is_alive(), + "reconciler_alive": ( + self._reconciler.is_alive if self._reconciler else False + ), } diff --git a/tests/conftest.py b/tests/conftest.py index 421d3aa..e65b5cf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,14 @@ """Shared test fixtures for directdnsonly test suite.""" + import pytest from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from directdnsonly.app.db import Base -from directdnsonly.app.db.models import Domain, Key # noqa: F401 — registers models with Base +from directdnsonly.app.db.models import ( + Domain, + Key, +) # noqa: F401 — registers models with Base @pytest.fixture diff --git a/tests/test_admin_api.py b/tests/test_admin_api.py index 8820eef..1edc2da 100644 --- a/tests/test_admin_api.py +++ b/tests/test_admin_api.py @@ -1,4 +1,5 @@ """Tests for directdnsonly.app.api.admin — DNSAdminAPI handler methods.""" + import pytest from unittest.mock import MagicMock, patch from urllib.parse import parse_qs @@ -60,8 +61,12 @@ def test_handle_exists_unsupported_action_returns_error(api): def test_handle_exists_domain_not_found(api): - with patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False), \ - patch("directdnsonly.app.api.admin.check_parent_domain_owner", return_value=False): + with ( + patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False), + patch( + "directdnsonly.app.api.admin.check_parent_domain_owner", return_value=False + ), + ): result = api._handle_exists({"action": "exists", "domain": "example.com"}) parsed = parse_qs(result) @@ -73,8 +78,10 @@ def test_handle_exists_domain_found(api): record = MagicMock() record.hostname = "da1.example.com" - with patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True), \ - patch("directdnsonly.app.api.admin.get_domain_record", return_value=record): + with ( + patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True), + patch("directdnsonly.app.api.admin.get_domain_record", return_value=record), + ): result = api._handle_exists({"action": "exists", "domain": "example.com"}) parsed = parse_qs(result) @@ -87,14 +94,22 @@ def test_handle_exists_parent_found(api): parent = MagicMock() parent.hostname = "da2.example.com" - with patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False), \ - patch("directdnsonly.app.api.admin.check_parent_domain_owner", return_value=True), \ - patch("directdnsonly.app.api.admin.get_parent_domain_record", return_value=parent): - result = api._handle_exists({ - "action": "exists", - "domain": "sub.example.com", - "check_for_parent_domain": "1", - }) + with ( + patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False), + patch( + "directdnsonly.app.api.admin.check_parent_domain_owner", return_value=True + ), + patch( + "directdnsonly.app.api.admin.get_parent_domain_record", return_value=parent + ), + ): + result = api._handle_exists( + { + "action": "exists", + "domain": "sub.example.com", + "check_for_parent_domain": "1", + } + ) parsed = parse_qs(result) assert parsed["error"] == ["0"] @@ -107,9 +122,11 @@ def test_handle_exists_no_parent_check_when_flag_absent(api): record = MagicMock() record.hostname = "da1.example.com" - with patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True), \ - patch("directdnsonly.app.api.admin.check_parent_domain_owner") as mock_parent, \ - patch("directdnsonly.app.api.admin.get_domain_record", return_value=record): + with ( + patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True), + patch("directdnsonly.app.api.admin.check_parent_domain_owner") as mock_parent, + patch("directdnsonly.app.api.admin.get_domain_record", return_value=record), + ): api._handle_exists({"action": "exists", "domain": "example.com"}) mock_parent.assert_not_called() @@ -123,14 +140,21 @@ SAMPLE_ZONE = "$ORIGIN example.com.\n$TTL 300\nexample.com. 300 IN A 1.2.3.4\n" def test_rawsave_enqueues_item(api, save_queue): - with patch("directdnsonly.app.api.admin.validate_and_normalize_zone", - return_value=SAMPLE_ZONE), \ - patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))): - result = api._handle_rawsave("example.com", { - "zone_file": SAMPLE_ZONE, - "hostname": "da1.example.com", - "username": "admin", - }) + with ( + patch( + "directdnsonly.app.api.admin.validate_and_normalize_zone", + return_value=SAMPLE_ZONE, + ), + patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))), + ): + result = api._handle_rawsave( + "example.com", + { + "zone_file": SAMPLE_ZONE, + "hostname": "da1.example.com", + "username": "admin", + }, + ) save_queue.put.assert_called_once() item = save_queue.put.call_args[0][0] @@ -150,9 +174,13 @@ def test_rawsave_missing_zone_file_raises(api): def test_rawsave_invalid_zone_raises(api): - with patch("directdnsonly.app.api.admin.validate_and_normalize_zone", - side_effect=ValueError("Invalid zone data: bad record")), \ - patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))): + with ( + patch( + "directdnsonly.app.api.admin.validate_and_normalize_zone", + side_effect=ValueError("Invalid zone data: bad record"), + ), + patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))), + ): with pytest.raises(ValueError, match="Invalid zone data"): api._handle_rawsave("example.com", {"zone_file": "garbage"}) @@ -164,10 +192,13 @@ def test_rawsave_invalid_zone_raises(api): def test_delete_enqueues_item(api, delete_queue): with patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="10.0.0.1"))): - result = api._handle_delete("example.com", { - "hostname": "da1.example.com", - "username": "admin", - }) + result = api._handle_delete( + "example.com", + { + "hostname": "da1.example.com", + "username": "admin", + }, + ) delete_queue.put.assert_called_once() item = delete_queue.put.call_args[0][0] diff --git a/tests/test_coredns_mysql.py b/tests/test_coredns_mysql.py index cb38088..977bb5e 100644 --- a/tests/test_coredns_mysql.py +++ b/tests/test_coredns_mysql.py @@ -1,4 +1,5 @@ """Tests for the CoreDNS MySQL backend (run against in-memory SQLite).""" + import pytest from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker @@ -141,8 +142,16 @@ def test_reconcile_removes_extra_records(mysql_backend): # Inject a phantom record directly into the DB session = mysql_backend.Session() zone = session.query(Zone).filter_by(zone_name="example.com.").first() - session.add(Record(zone_id=zone.id, hostname="phantom", type="A", - data="10.0.0.99", ttl=300, online=True)) + session.add( + Record( + zone_id=zone.id, + hostname="phantom", + type="A", + data="10.0.0.99", + ttl=300, + online=True, + ) + ) session.commit() session.close() diff --git a/tests/test_reconciler.py b/tests/test_reconciler.py index 14b5f91..daf1b2b 100644 --- a/tests/test_reconciler.py +++ b/tests/test_reconciler.py @@ -1,4 +1,5 @@ """Tests for directdnsonly.app.reconciler — ReconciliationWorker.""" + import pytest import requests.exceptions from queue import Queue @@ -12,7 +13,13 @@ from directdnsonly.app.db.models import Domain # Fixtures # --------------------------------------------------------------------------- -SERVER = {"hostname": "da1.example.com", "port": 2222, "username": "admin", "password": "secret", "ssl": True} +SERVER = { + "hostname": "da1.example.com", + "port": 2222, + "username": "admin", + "password": "secret", + "ssl": True, +} BASE_CONFIG = { "enabled": True, @@ -46,7 +53,9 @@ def dry_run_worker(delete_queue): def test_orphan_queued_when_domain_missing_from_da(worker, delete_queue, patch_connect): - patch_connect.add(Domain(domain="orphan.com", hostname="da1.example.com", username="admin")) + patch_connect.add( + Domain(domain="orphan.com", hostname="da1.example.com", username="admin") + ) patch_connect.commit() with patch.object(worker, "_fetch_da_domains", return_value=set()): @@ -59,7 +68,9 @@ def test_orphan_queued_when_domain_missing_from_da(worker, delete_queue, patch_c def test_orphan_not_queued_in_dry_run(dry_run_worker, delete_queue, patch_connect): - patch_connect.add(Domain(domain="orphan.com", hostname="da1.example.com", username="admin")) + patch_connect.add( + Domain(domain="orphan.com", hostname="da1.example.com", username="admin") + ) patch_connect.commit() with patch.object(dry_run_worker, "_fetch_da_domains", return_value=set()): @@ -70,7 +81,9 @@ def test_orphan_not_queued_in_dry_run(dry_run_worker, delete_queue, patch_connec def test_orphan_not_queued_for_unknown_server(worker, delete_queue, patch_connect): """Domains whose recorded master is NOT in our configured servers are skipped.""" - patch_connect.add(Domain(domain="other.com", hostname="da99.unknown.com", username="admin")) + patch_connect.add( + Domain(domain="other.com", hostname="da99.unknown.com", username="admin") + ) patch_connect.commit() with patch.object(worker, "_fetch_da_domains", return_value=set()): @@ -80,7 +93,9 @@ def test_orphan_not_queued_for_unknown_server(worker, delete_queue, patch_connec def test_active_domain_not_queued(worker, delete_queue, patch_connect): - patch_connect.add(Domain(domain="good.com", hostname="da1.example.com", username="admin")) + patch_connect.add( + Domain(domain="good.com", hostname="da1.example.com", username="admin") + ) patch_connect.commit() with patch.object(worker, "_fetch_da_domains", return_value={"good.com"}): @@ -106,7 +121,9 @@ def test_backfill_null_hostname(worker, patch_connect): def test_migration_updates_hostname(worker, patch_connect): - patch_connect.add(Domain(domain="moved.com", hostname="da-old.example.com", username="admin")) + patch_connect.add( + Domain(domain="moved.com", hostname="da-old.example.com", username="admin") + ) patch_connect.commit() with patch.object(worker, "_fetch_da_domains", return_value={"moved.com"}): @@ -150,7 +167,9 @@ def test_fetch_returns_domains_from_json(worker): mock_resp = _make_json_response(["example.com", "test.com"]) with patch("requests.get", return_value=mock_resp): - result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True) + result = worker._fetch_da_domains( + "da1.example.com", 2222, "admin", "secret", True + ) assert result == {"example.com", "test.com"} @@ -160,7 +179,9 @@ def test_fetch_paginates(worker): page2 = _make_json_response(["b.com"], total_pages=2) with patch("requests.get", side_effect=[page1, page2]): - result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True) + result = worker._fetch_da_domains( + "da1.example.com", 2222, "admin", "secret", True + ) assert result == {"a.com", "b.com"} @@ -170,9 +191,13 @@ def test_fetch_redirect_triggers_session_login(worker): redirect_resp.status_code = 302 redirect_resp.is_redirect = True - with patch("requests.get", return_value=redirect_resp), \ - patch.object(worker, "_da_session_login", return_value=None): - result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True) + with ( + patch("requests.get", return_value=redirect_resp), + patch.object(worker, "_da_session_login", return_value=None), + ): + result = worker._fetch_da_domains( + "da1.example.com", 2222, "admin", "secret", True + ) assert result is None @@ -185,28 +210,40 @@ def test_fetch_html_response_returns_none(worker): mock_resp.raise_for_status = MagicMock() with patch("requests.get", return_value=mock_resp): - result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True) + result = worker._fetch_da_domains( + "da1.example.com", 2222, "admin", "secret", True + ) assert result is None def test_fetch_connection_error_returns_none(worker): - with patch("requests.get", side_effect=requests.exceptions.ConnectionError("refused")): - result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True) + with patch( + "requests.get", side_effect=requests.exceptions.ConnectionError("refused") + ): + result = worker._fetch_da_domains( + "da1.example.com", 2222, "admin", "secret", True + ) assert result is None def test_fetch_timeout_returns_none(worker): with patch("requests.get", side_effect=requests.exceptions.Timeout()): - result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True) + result = worker._fetch_da_domains( + "da1.example.com", 2222, "admin", "secret", True + ) assert result is None def test_fetch_ssl_error_returns_none(worker): - with patch("requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")): - result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True) + with patch( + "requests.get", side_effect=requests.exceptions.SSLError("cert verify failed") + ): + result = worker._fetch_da_domains( + "da1.example.com", 2222, "admin", "secret", True + ) assert result is None diff --git a/tests/test_utils.py b/tests/test_utils.py index 756ca67..235b60c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,4 +1,5 @@ """Tests for directdnsonly.app.utils — zone index helper functions.""" + import pytest from directdnsonly.app.db.models import Domain diff --git a/tests/test_zone_parser.py b/tests/test_zone_parser.py index 1a745d2..e7888ed 100644 --- a/tests/test_zone_parser.py +++ b/tests/test_zone_parser.py @@ -1,4 +1,5 @@ """Tests for directdnsonly.app.utils.zone_parser.""" + import pytest from dns.exception import DNSException