style: apply black formatting across codebase 🎨

No logic changes — pure reformatting of line lengths, dict literals,
method-chain line breaks, and trailing newlines to satisfy black's style.
This commit is contained in:
2026-02-18 22:53:09 +13:00
parent 807d6271f1
commit 74c5f4012e
12 changed files with 291 additions and 164 deletions

View File

@@ -113,19 +113,23 @@ class DNSAdminAPI:
if domain_exists:
record = get_domain_record(domain)
return urlencode({
"error": 0,
"exists": 1,
"details": f"Domain exists on {record.hostname}",
})
return urlencode(
{
"error": 0,
"exists": 1,
"details": f"Domain exists on {record.hostname}",
}
)
# parent match only
parent_record = get_parent_domain_record(domain)
return urlencode({
"error": 0,
"exists": 2,
"details": f"Parent Domain exists on {parent_record.hostname}",
})
return urlencode(
{
"error": 0,
"exists": 2,
"details": f"Parent Domain exists on {parent_record.hostname}",
}
)
def _handle_rawsave(self, domain: str, params: dict):
"""Process zone file saves"""

View File

@@ -119,9 +119,7 @@ class CoreDNSMySQLBackend(DNSBackend):
)
session.add(existing_soa)
changes["added"] += 1
logger.debug(
f"Added SOA record: {soa_name} SOA {soa_content}"
)
logger.debug(f"Added SOA record: {soa_name} SOA {soa_content}")
# Process all non-SOA records
for record_name, record_type, record_content, record_ttl in source_records:
@@ -172,7 +170,7 @@ class CoreDNSMySQLBackend(DNSBackend):
changes["removed"] += 1
session.commit()
total_changes = changes['added'] + changes['updated'] + changes['removed']
total_changes = changes["added"] + changes["updated"] + changes["removed"]
if total_changes > 0:
logger.info(
f"[{self.instance_name}] Zone {zone_name} updated: "
@@ -180,9 +178,7 @@ class CoreDNSMySQLBackend(DNSBackend):
f"{changes['removed']} removed"
)
else:
logger.debug(
f"[{self.instance_name}] Zone {zone_name}: no changes"
)
logger.debug(f"[{self.instance_name}] Zone {zone_name}: no changes")
return True
except Exception as e:
@@ -196,7 +192,11 @@ class CoreDNSMySQLBackend(DNSBackend):
session = self.Session()
try:
# First find the zone
zone = session.query(Zone).filter_by(zone_name=self.dot_fqdn(zone_name)).first()
zone = (
session.query(Zone)
.filter_by(zone_name=self.dot_fqdn(zone_name))
.first()
)
if not zone:
logger.warning(f"Zone {zone_name} not found for deletion")
return False
@@ -230,7 +230,9 @@ class CoreDNSMySQLBackend(DNSBackend):
session = self.Session()
try:
exists = (
session.query(Zone).filter_by(zone_name=self.dot_fqdn(zone_name)).first()
session.query(Zone)
.filter_by(zone_name=self.dot_fqdn(zone_name))
.first()
is not None
)
logger.debug(f"Zone existence check for {zone_name}: {exists}")
@@ -266,17 +268,11 @@ class CoreDNSMySQLBackend(DNSBackend):
The normalized CNAME target string
"""
if record_content.startswith("@"):
logger.debug(
f"CNAME target starts with '@', replacing with zone FQDN"
)
logger.debug(f"CNAME target starts with '@', replacing with zone FQDN")
record_content = self.dot_fqdn(zone_name)
elif not record_content.endswith("."):
logger.debug(
f"CNAME target {record_content} is relative, appending zone"
)
record_content = ".".join(
[record_content, self.dot_fqdn(zone_name)]
)
logger.debug(f"CNAME target {record_content} is relative, appending zone")
record_content = ".".join([record_content, self.dot_fqdn(zone_name)])
return record_content
def _parse_zone_to_record_set(
@@ -306,9 +302,7 @@ class CoreDNSMySQLBackend(DNSBackend):
continue
if record_type == "CNAME":
record_content = self._normalize_cname_data(
zone_name, record_content
)
record_content = self._normalize_cname_data(zone_name, record_content)
records.add((record_name, record_type, record_content, ttl))
@@ -341,9 +335,7 @@ class CoreDNSMySQLBackend(DNSBackend):
)
return False, 0
actual_count = (
session.query(Record).filter_by(zone_id=zone.id).count()
)
actual_count = session.query(Record).filter_by(zone_id=zone.id).count()
matches = actual_count == expected_count
if not matches:
@@ -409,14 +401,11 @@ class CoreDNSMySQLBackend(DNSBackend):
)
# Build lookup keys (without TTL) matching write_zone's key format
expected_keys: Set[Tuple[str, str, str]] = {
(hostname, rtype, data)
for hostname, rtype, data, _ in source_records
(hostname, rtype, data) for hostname, rtype, data, _ in source_records
}
# Query all records currently in the backend for this zone
db_records = (
session.query(Record).filter_by(zone_id=zone.id).all()
)
db_records = session.query(Record).filter_by(zone_id=zone.id).all()
removed = 0
for record in db_records:

View File

@@ -109,9 +109,7 @@ class ReconciliationWorker:
f"[reconciler] {hostname}: {len(da_domains) if da_domains else 0} active domain(s) in DA"
)
except Exception as e:
logger.error(
f"[reconciler] Unexpected error polling {hostname}: {e}"
)
logger.error(f"[reconciler] Unexpected error polling {hostname}: {e}")
# Now check local DB for all domains, update master if needed, and queue deletes only from recorded master
session = connect()
@@ -147,12 +145,14 @@ class ReconciliationWorker:
f"(master: {recorded_master})"
)
else:
self.delete_queue.put({
"domain": record.domain,
"hostname": record.hostname,
"username": record.username or "",
"source": "reconciler",
})
self.delete_queue.put(
{
"domain": record.domain,
"hostname": record.hostname,
"username": record.username or "",
"source": "reconciler",
}
)
logger.debug(
f"[reconciler] Queued delete for orphan: {record.domain} "
f"(master: {recorded_master})"
@@ -161,9 +161,13 @@ class ReconciliationWorker:
if migrated or backfilled:
session.commit()
if backfilled:
logger.info(f"[reconciler] {backfilled} domain(s) had missing hostname backfilled.")
logger.info(
f"[reconciler] {backfilled} domain(s) had missing hostname backfilled."
)
if migrated:
logger.info(f"[reconciler] {migrated} domain(s) migrated to new master.")
logger.info(
f"[reconciler] {migrated} domain(s) migrated to new master."
)
finally:
session.close()
if self.dry_run:
@@ -178,7 +182,13 @@ class ReconciliationWorker:
)
def _fetch_da_domains(
self, hostname: str, port: int, username: str, password: str, use_ssl: bool, ipp: int = 1000
self,
hostname: str,
port: int,
username: str,
password: str,
use_ssl: bool,
ipp: int = 1000,
):
"""Fetch all domains from a DA server via CMD_DNS_ADMIN (JSON, paging supported).
@@ -205,13 +215,21 @@ class ReconciliationWorker:
response = requests.get(url, **req_kwargs)
if response.is_redirect or response.status_code in (301, 302, 303, 307, 308):
if response.is_redirect or response.status_code in (
301,
302,
303,
307,
308,
):
if not cookies:
logger.debug(
f"[reconciler] {hostname}:{port} redirected Basic Auth "
f"(HTTP {response.status_code}) — attempting session login (DA Evo)"
)
cookies = self._da_session_login(scheme, hostname, port, username, password)
cookies = self._da_session_login(
scheme, hostname, port, username, password
)
if cookies is None:
return None
continue # retry this page with cookies
@@ -279,9 +297,7 @@ class ReconciliationWorker:
)
return None
except Exception as e:
logger.error(
f"[reconciler] Unexpected error fetching from {hostname}: {e}"
)
logger.error(f"[reconciler] Unexpected error fetching from {hostname}: {e}")
return None
def _da_session_login(
@@ -310,12 +326,12 @@ class ReconciliationWorker:
f"check username/password."
)
return None
logger.debug(f"[reconciler] {hostname}:{port} session login successful (DA Evo)")
logger.debug(
f"[reconciler] {hostname}:{port} session login successful (DA Evo)"
)
return response.cookies
except Exception as e:
logger.error(
f"[reconciler] {hostname}:{port} session login failed: {e}"
)
logger.error(f"[reconciler] {hostname}:{port} session login failed: {e}")
return None
@staticmethod
@@ -334,24 +350,44 @@ class ReconciliationWorker:
domains = params.get("list[]", [])
return {d.strip().lower() for d in domains if d.strip()}
if __name__ == "__main__":
import argparse
import sys
from queue import Queue
parser = argparse.ArgumentParser(description="Test DirectAdmin domain fetcher (JSON/paging)")
parser = argparse.ArgumentParser(
description="Test DirectAdmin domain fetcher (JSON/paging)"
)
parser.add_argument("--hostname", required=True, help="DirectAdmin server hostname")
parser.add_argument("--port", type=int, default=2222, help="DirectAdmin port (default: 2222)")
parser.add_argument(
"--port", type=int, default=2222, help="DirectAdmin port (default: 2222)"
)
parser.add_argument("--username", required=True, help="DirectAdmin admin username")
parser.add_argument("--password", required=True, help="DirectAdmin admin password")
parser.add_argument("--ssl", action="store_true", help="Use HTTPS (default: True)")
parser.add_argument("--no-ssl", dest="ssl", action="store_false", help="Use HTTP (not recommended)")
parser.add_argument(
"--no-ssl", dest="ssl", action="store_false", help="Use HTTP (not recommended)"
)
parser.set_defaults(ssl=True)
parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certs (default: True)")
parser.add_argument("--no-verify-ssl", dest="verify_ssl", action="store_false", help="Don't verify SSL certs")
parser.add_argument(
"--verify-ssl", action="store_true", help="Verify SSL certs (default: True)"
)
parser.add_argument(
"--no-verify-ssl",
dest="verify_ssl",
action="store_false",
help="Don't verify SSL certs",
)
parser.set_defaults(verify_ssl=True)
parser.add_argument("--ipp", type=int, default=1000, help="Items per page (default: 1000)")
parser.add_argument("--print-json", action="store_true", help="Print raw JSON response for first page")
parser.add_argument(
"--ipp", type=int, default=1000, help="Items per page (default: 1000)"
)
parser.add_argument(
"--print-json",
action="store_true",
help="Print raw JSON response for first page",
)
args = parser.parse_args()
@@ -372,7 +408,9 @@ if __name__ == "__main__":
q = Queue()
worker = ReconciliationWorker(q, config)
server = config["directadmin_servers"][0]
print(f"Fetching domains from {server['hostname']}:{server['port']} (ipp={args.ipp})...")
print(
f"Fetching domains from {server['hostname']}:{server['port']} (ipp={args.ipp})..."
)
# Directly call the fetch method for testing
domains = worker._fetch_da_domains(
server["hostname"],
@@ -380,7 +418,7 @@ if __name__ == "__main__":
server.get("username"),
server.get("password"),
server.get("ssl", True),
ipp=args.ipp
ipp=args.ipp,
)
if domains is None:
print("Failed to fetch domains.", file=sys.stderr)

View File

@@ -59,9 +59,7 @@ def count_zone_records(zone_data: str, domain_name: str) -> int:
if rdata.rdclass == IN:
count += 1
logger.debug(
f"Source zone {domain_name} contains {count} records"
)
logger.debug(f"Source zone {domain_name} contains {count} records")
return count
except DNSException as e:

View File

@@ -50,7 +50,9 @@ def main():
# Configure CherryPy
user_password_dict = {
config.get_string("app.auth_username"): config.get_string("app.auth_password")
config.get_string("app.auth_username"): config.get_string(
"app.auth_password"
)
}
check_password = cherrypy.lib.auth_basic.checkpassword_dict(user_password_dict)

View File

@@ -14,7 +14,9 @@ from directdnsonly.app.reconciler import ReconciliationWorker
class WorkerManager:
def __init__(self, queue_path: str, backend_registry, reconciliation_config: dict = None):
def __init__(
self, queue_path: str, backend_registry, reconciliation_config: dict = None
):
self.queue_path = queue_path
self.backend_registry = backend_registry
self._running = False
@@ -86,9 +88,7 @@ class WorkerManager:
f"{len(backends)} backends concurrently: "
f"{', '.join(backends.keys())}"
)
self._process_backends_parallel(
backends, item, session
)
self._process_backends_parallel(backends, item, session)
else:
# Single backend, no need for thread overhead
for backend_name, backend in backends.items():
@@ -126,9 +126,7 @@ class WorkerManager:
try:
logger.debug(f"Using backend: {backend_name}")
if backend.write_zone(item["domain"], item["zone_file"]):
logger.debug(
f"Successfully updated {item['domain']} in {backend_name}"
)
logger.debug(f"Successfully updated {item['domain']} in {backend_name}")
if backend.get_name() == "bind":
# Need to update the named.conf
backend.update_named_conf(
@@ -144,9 +142,7 @@ class WorkerManager:
backend_name, backend, item["domain"], item["zone_file"]
)
else:
logger.error(
f"Failed to update {item['domain']} in {backend_name}"
)
logger.error(f"Failed to update {item['domain']} in {backend_name}")
except Exception as e:
logger.error(f"Error in {backend_name}: {str(e)}")
@@ -165,9 +161,7 @@ class WorkerManager:
record = session.query(Domain).filter_by(domain=domain).first()
if not record:
logger.warning(
f"Domain {domain} not found in DB — skipping delete"
)
logger.warning(f"Domain {domain} not found in DB — skipping delete")
self.delete_queue.task_done()
continue
@@ -194,16 +188,29 @@ class WorkerManager:
elif len(backends) > 1:
# Parallel delete, track failures
results = []
def delete_backend_wrapper(backend_name, backend, domain, remaining_domains):
def delete_backend_wrapper(
backend_name, backend, domain, remaining_domains
):
try:
return backend.delete_zone(domain)
except Exception as e:
logger.error(f"Error deleting {domain} from {backend_name}: {e}")
logger.error(
f"Error deleting {domain} from {backend_name}: {e}"
)
return False
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=len(backends)) as executor:
futures = {
executor.submit(delete_backend_wrapper, backend_name, backend, domain, remaining_domains): backend_name
executor.submit(
delete_backend_wrapper,
backend_name,
backend,
domain,
remaining_domains,
): backend_name
for backend_name, backend in backends.items()
}
for future in as_completed(futures):
@@ -212,9 +219,13 @@ class WorkerManager:
result = future.result()
results.append(result)
if not result:
logger.error(f"Failed to delete {domain} from {backend_name}")
logger.error(
f"Failed to delete {domain} from {backend_name}"
)
except Exception as e:
logger.error(f"Unhandled error deleting from {backend_name}: {e}")
logger.error(
f"Unhandled error deleting from {backend_name}: {e}"
)
results.append(False)
delete_success = all(results)
else:
@@ -223,10 +234,14 @@ class WorkerManager:
try:
result = backend.delete_zone(domain)
if not result:
logger.error(f"Failed to delete {domain} from {backend_name}")
logger.error(
f"Failed to delete {domain} from {backend_name}"
)
delete_success = False
except Exception as e:
logger.error(f"Error deleting {domain} from {backend_name}: {e}")
logger.error(
f"Error deleting {domain} from {backend_name}: {e}"
)
delete_success = False
if delete_success:
@@ -236,7 +251,9 @@ class WorkerManager:
self.delete_queue.task_done()
logger.success(f"Delete completed for {domain}")
else:
logger.error(f"Delete failed for {domain} on one or more backends — DB record retained")
logger.error(
f"Delete failed for {domain} on one or more backends — DB record retained"
)
self.delete_queue.task_done()
except Empty:
@@ -270,7 +287,10 @@ class WorkerManager:
futures = {
executor.submit(
self._delete_single_backend,
backend_name, backend, domain, remaining_domains
backend_name,
backend,
domain,
remaining_domains,
): backend_name
for backend_name, backend in backends.items()
}
@@ -279,9 +299,7 @@ class WorkerManager:
try:
future.result()
except Exception as e:
logger.error(
f"Unhandled error deleting from {backend_name}: {e}"
)
logger.error(f"Unhandled error deleting from {backend_name}: {e}")
elapsed = (time.monotonic() - start_time) * 1000
logger.debug(
f"Parallel delete of {domain} across "
@@ -292,13 +310,11 @@ class WorkerManager:
"""Process zone updates across multiple backends in parallel"""
start_time = time.monotonic()
with ThreadPoolExecutor(
max_workers=len(backends),
thread_name_prefix="backend"
max_workers=len(backends), thread_name_prefix="backend"
) as executor:
futures = {
executor.submit(
self._process_single_backend,
backend_name, backend, item, session
self._process_single_backend, backend_name, backend, item, session
): backend_name
for backend_name, backend in backends.items()
}
@@ -317,9 +333,7 @@ class WorkerManager:
f"{len(backends)} backends completed in {elapsed:.0f}ms"
)
def _verify_backend_record_count(
self, backend_name, backend, zone_name, zone_data
):
def _verify_backend_record_count(self, backend_name, backend, zone_name, zone_data):
"""Verify and reconcile the backend record count against the
authoritative BIND zone from DirectAdmin.
@@ -344,9 +358,7 @@ class WorkerManager:
)
return
matches, actual = backend.verify_zone_record_count(
zone_name, expected
)
matches, actual = backend.verify_zone_record_count(zone_name, expected)
if matches:
return # All good
@@ -357,9 +369,7 @@ class WorkerManager:
f"record(s) for {zone_name} — reconciling against "
f"DirectAdmin source zone"
)
success, removed = backend.reconcile_zone_records(
zone_name, zone_data
)
success, removed = backend.reconcile_zone_records(zone_name, zone_data)
if success and removed > 0:
# Verify again after reconciliation
matches, new_count = backend.verify_zone_record_count(
@@ -437,6 +447,9 @@ class WorkerManager:
"save_queue_size": self.save_queue.qsize(),
"delete_queue_size": self.delete_queue.qsize(),
"save_worker_alive": self._save_thread and self._save_thread.is_alive(),
"delete_worker_alive": self._delete_thread and self._delete_thread.is_alive(),
"reconciler_alive": self._reconciler.is_alive if self._reconciler else False,
"delete_worker_alive": self._delete_thread
and self._delete_thread.is_alive(),
"reconciler_alive": (
self._reconciler.is_alive if self._reconciler else False
),
}

View File

@@ -1,10 +1,14 @@
"""Shared test fixtures for directdnsonly test suite."""
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from directdnsonly.app.db import Base
from directdnsonly.app.db.models import Domain, Key # noqa: F401 — registers models with Base
from directdnsonly.app.db.models import (
Domain,
Key,
) # noqa: F401 — registers models with Base
@pytest.fixture

View File

@@ -1,4 +1,5 @@
"""Tests for directdnsonly.app.api.admin — DNSAdminAPI handler methods."""
import pytest
from unittest.mock import MagicMock, patch
from urllib.parse import parse_qs
@@ -60,8 +61,12 @@ def test_handle_exists_unsupported_action_returns_error(api):
def test_handle_exists_domain_not_found(api):
with patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False), \
patch("directdnsonly.app.api.admin.check_parent_domain_owner", return_value=False):
with (
patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False),
patch(
"directdnsonly.app.api.admin.check_parent_domain_owner", return_value=False
),
):
result = api._handle_exists({"action": "exists", "domain": "example.com"})
parsed = parse_qs(result)
@@ -73,8 +78,10 @@ def test_handle_exists_domain_found(api):
record = MagicMock()
record.hostname = "da1.example.com"
with patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True), \
patch("directdnsonly.app.api.admin.get_domain_record", return_value=record):
with (
patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True),
patch("directdnsonly.app.api.admin.get_domain_record", return_value=record),
):
result = api._handle_exists({"action": "exists", "domain": "example.com"})
parsed = parse_qs(result)
@@ -87,14 +94,22 @@ def test_handle_exists_parent_found(api):
parent = MagicMock()
parent.hostname = "da2.example.com"
with patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False), \
patch("directdnsonly.app.api.admin.check_parent_domain_owner", return_value=True), \
patch("directdnsonly.app.api.admin.get_parent_domain_record", return_value=parent):
result = api._handle_exists({
"action": "exists",
"domain": "sub.example.com",
"check_for_parent_domain": "1",
})
with (
patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False),
patch(
"directdnsonly.app.api.admin.check_parent_domain_owner", return_value=True
),
patch(
"directdnsonly.app.api.admin.get_parent_domain_record", return_value=parent
),
):
result = api._handle_exists(
{
"action": "exists",
"domain": "sub.example.com",
"check_for_parent_domain": "1",
}
)
parsed = parse_qs(result)
assert parsed["error"] == ["0"]
@@ -107,9 +122,11 @@ def test_handle_exists_no_parent_check_when_flag_absent(api):
record = MagicMock()
record.hostname = "da1.example.com"
with patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True), \
patch("directdnsonly.app.api.admin.check_parent_domain_owner") as mock_parent, \
patch("directdnsonly.app.api.admin.get_domain_record", return_value=record):
with (
patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True),
patch("directdnsonly.app.api.admin.check_parent_domain_owner") as mock_parent,
patch("directdnsonly.app.api.admin.get_domain_record", return_value=record),
):
api._handle_exists({"action": "exists", "domain": "example.com"})
mock_parent.assert_not_called()
@@ -123,14 +140,21 @@ SAMPLE_ZONE = "$ORIGIN example.com.\n$TTL 300\nexample.com. 300 IN A 1.2.3.4\n"
def test_rawsave_enqueues_item(api, save_queue):
with patch("directdnsonly.app.api.admin.validate_and_normalize_zone",
return_value=SAMPLE_ZONE), \
patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))):
result = api._handle_rawsave("example.com", {
"zone_file": SAMPLE_ZONE,
"hostname": "da1.example.com",
"username": "admin",
})
with (
patch(
"directdnsonly.app.api.admin.validate_and_normalize_zone",
return_value=SAMPLE_ZONE,
),
patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))),
):
result = api._handle_rawsave(
"example.com",
{
"zone_file": SAMPLE_ZONE,
"hostname": "da1.example.com",
"username": "admin",
},
)
save_queue.put.assert_called_once()
item = save_queue.put.call_args[0][0]
@@ -150,9 +174,13 @@ def test_rawsave_missing_zone_file_raises(api):
def test_rawsave_invalid_zone_raises(api):
with patch("directdnsonly.app.api.admin.validate_and_normalize_zone",
side_effect=ValueError("Invalid zone data: bad record")), \
patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))):
with (
patch(
"directdnsonly.app.api.admin.validate_and_normalize_zone",
side_effect=ValueError("Invalid zone data: bad record"),
),
patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))),
):
with pytest.raises(ValueError, match="Invalid zone data"):
api._handle_rawsave("example.com", {"zone_file": "garbage"})
@@ -164,10 +192,13 @@ def test_rawsave_invalid_zone_raises(api):
def test_delete_enqueues_item(api, delete_queue):
with patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="10.0.0.1"))):
result = api._handle_delete("example.com", {
"hostname": "da1.example.com",
"username": "admin",
})
result = api._handle_delete(
"example.com",
{
"hostname": "da1.example.com",
"username": "admin",
},
)
delete_queue.put.assert_called_once()
item = delete_queue.put.call_args[0][0]

View File

@@ -1,4 +1,5 @@
"""Tests for the CoreDNS MySQL backend (run against in-memory SQLite)."""
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
@@ -141,8 +142,16 @@ def test_reconcile_removes_extra_records(mysql_backend):
# Inject a phantom record directly into the DB
session = mysql_backend.Session()
zone = session.query(Zone).filter_by(zone_name="example.com.").first()
session.add(Record(zone_id=zone.id, hostname="phantom", type="A",
data="10.0.0.99", ttl=300, online=True))
session.add(
Record(
zone_id=zone.id,
hostname="phantom",
type="A",
data="10.0.0.99",
ttl=300,
online=True,
)
)
session.commit()
session.close()

View File

@@ -1,4 +1,5 @@
"""Tests for directdnsonly.app.reconciler — ReconciliationWorker."""
import pytest
import requests.exceptions
from queue import Queue
@@ -12,7 +13,13 @@ from directdnsonly.app.db.models import Domain
# Fixtures
# ---------------------------------------------------------------------------
SERVER = {"hostname": "da1.example.com", "port": 2222, "username": "admin", "password": "secret", "ssl": True}
SERVER = {
"hostname": "da1.example.com",
"port": 2222,
"username": "admin",
"password": "secret",
"ssl": True,
}
BASE_CONFIG = {
"enabled": True,
@@ -46,7 +53,9 @@ def dry_run_worker(delete_queue):
def test_orphan_queued_when_domain_missing_from_da(worker, delete_queue, patch_connect):
patch_connect.add(Domain(domain="orphan.com", hostname="da1.example.com", username="admin"))
patch_connect.add(
Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value=set()):
@@ -59,7 +68,9 @@ def test_orphan_queued_when_domain_missing_from_da(worker, delete_queue, patch_c
def test_orphan_not_queued_in_dry_run(dry_run_worker, delete_queue, patch_connect):
patch_connect.add(Domain(domain="orphan.com", hostname="da1.example.com", username="admin"))
patch_connect.add(
Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with patch.object(dry_run_worker, "_fetch_da_domains", return_value=set()):
@@ -70,7 +81,9 @@ def test_orphan_not_queued_in_dry_run(dry_run_worker, delete_queue, patch_connec
def test_orphan_not_queued_for_unknown_server(worker, delete_queue, patch_connect):
"""Domains whose recorded master is NOT in our configured servers are skipped."""
patch_connect.add(Domain(domain="other.com", hostname="da99.unknown.com", username="admin"))
patch_connect.add(
Domain(domain="other.com", hostname="da99.unknown.com", username="admin")
)
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value=set()):
@@ -80,7 +93,9 @@ def test_orphan_not_queued_for_unknown_server(worker, delete_queue, patch_connec
def test_active_domain_not_queued(worker, delete_queue, patch_connect):
patch_connect.add(Domain(domain="good.com", hostname="da1.example.com", username="admin"))
patch_connect.add(
Domain(domain="good.com", hostname="da1.example.com", username="admin")
)
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value={"good.com"}):
@@ -106,7 +121,9 @@ def test_backfill_null_hostname(worker, patch_connect):
def test_migration_updates_hostname(worker, patch_connect):
patch_connect.add(Domain(domain="moved.com", hostname="da-old.example.com", username="admin"))
patch_connect.add(
Domain(domain="moved.com", hostname="da-old.example.com", username="admin")
)
patch_connect.commit()
with patch.object(worker, "_fetch_da_domains", return_value={"moved.com"}):
@@ -150,7 +167,9 @@ def test_fetch_returns_domains_from_json(worker):
mock_resp = _make_json_response(["example.com", "test.com"])
with patch("requests.get", return_value=mock_resp):
result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True)
result = worker._fetch_da_domains(
"da1.example.com", 2222, "admin", "secret", True
)
assert result == {"example.com", "test.com"}
@@ -160,7 +179,9 @@ def test_fetch_paginates(worker):
page2 = _make_json_response(["b.com"], total_pages=2)
with patch("requests.get", side_effect=[page1, page2]):
result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True)
result = worker._fetch_da_domains(
"da1.example.com", 2222, "admin", "secret", True
)
assert result == {"a.com", "b.com"}
@@ -170,9 +191,13 @@ def test_fetch_redirect_triggers_session_login(worker):
redirect_resp.status_code = 302
redirect_resp.is_redirect = True
with patch("requests.get", return_value=redirect_resp), \
patch.object(worker, "_da_session_login", return_value=None):
result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True)
with (
patch("requests.get", return_value=redirect_resp),
patch.object(worker, "_da_session_login", return_value=None),
):
result = worker._fetch_da_domains(
"da1.example.com", 2222, "admin", "secret", True
)
assert result is None
@@ -185,28 +210,40 @@ def test_fetch_html_response_returns_none(worker):
mock_resp.raise_for_status = MagicMock()
with patch("requests.get", return_value=mock_resp):
result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True)
result = worker._fetch_da_domains(
"da1.example.com", 2222, "admin", "secret", True
)
assert result is None
def test_fetch_connection_error_returns_none(worker):
with patch("requests.get", side_effect=requests.exceptions.ConnectionError("refused")):
result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True)
with patch(
"requests.get", side_effect=requests.exceptions.ConnectionError("refused")
):
result = worker._fetch_da_domains(
"da1.example.com", 2222, "admin", "secret", True
)
assert result is None
def test_fetch_timeout_returns_none(worker):
with patch("requests.get", side_effect=requests.exceptions.Timeout()):
result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True)
result = worker._fetch_da_domains(
"da1.example.com", 2222, "admin", "secret", True
)
assert result is None
def test_fetch_ssl_error_returns_none(worker):
with patch("requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")):
result = worker._fetch_da_domains("da1.example.com", 2222, "admin", "secret", True)
with patch(
"requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")
):
result = worker._fetch_da_domains(
"da1.example.com", 2222, "admin", "secret", True
)
assert result is None

View File

@@ -1,4 +1,5 @@
"""Tests for directdnsonly.app.utils — zone index helper functions."""
import pytest
from directdnsonly.app.db.models import Domain

View File

@@ -1,4 +1,5 @@
"""Tests for directdnsonly.app.utils.zone_parser."""
import pytest
from dns.exception import DNSException