chore: upgrade SQLAlchemy to 2.0 and bump all stale deps ⬆️

- SQLAlchemy 1.4 → 2.0.46: migrate all session.query() calls to
  select() / session.execute() style; move declarative_base import
  from ext.declarative to sqlalchemy.orm; explicit conn.commit()
  after DDL in _migrate(); drop sessionmaker(bind=) keyword
- persist-queue 1.0 → 1.1, pymysql 1.1.1 → 1.1.2,
  dnspython 2.7 → 2.8, pyyaml 6.0.2 → 6.0.3
- pytest 8.3 → 9.0.2, pytest-cov 6.1 → 7.0,
  pytest-mock 3.14 → 3.15.1, black 25.1 → 26.1

97 tests pass, zero deprecation warnings
2026-02-19 23:37:15 +13:00
parent 22e64498ce
commit 8c1c2b4abc
9 changed files with 782 additions and 483 deletions
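
The recurring 1.x → 2.0 query change, sketched against the repo's `Domain` model (the `name` variable here is illustrative only):

# Before (SQLAlchemy 1.x, removed in this commit):
record = session.query(Domain).filter_by(domain=name).first()

# After (SQLAlchemy 2.0):
from sqlalchemy import select
record = session.execute(
    select(Domain).filter_by(domain=name)
).scalar_one_or_none()  # mapped object or None; raises MultipleResultsFound on >1 row

One behavioral nuance worth noting while reviewing: the old `.first()` silently took the first of several matches, while `scalar_one_or_none()` raises if the query returns more than one row.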

View File

@@ -1,8 +1,7 @@
 from typing import Optional, Dict, Set, Tuple, Any
 from sqlalchemy import create_engine, Column, String, Integer, Text, ForeignKey, Boolean
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker, scoped_session, relationship
+from sqlalchemy.orm import sessionmaker, scoped_session, relationship, declarative_base

 from dns import zone as dns_zone_module
 from dns.rdataclass import IN
 from loguru import logger

View File

@@ -1,6 +1,5 @@
 from sqlalchemy import create_engine, text
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker, declarative_base

 from vyper import v
 from loguru import logger
@@ -13,7 +12,11 @@ def _migrate(engine):
     """Apply additive schema migrations for columns added after initial release."""
     migrations = [
         ("domains", "zone_data", "ALTER TABLE domains ADD COLUMN zone_data TEXT"),
-        ("domains", "zone_updated_at", "ALTER TABLE domains ADD COLUMN zone_updated_at DATETIME"),
+        (
+            "domains",
+            "zone_updated_at",
+            "ALTER TABLE domains ADD COLUMN zone_updated_at DATETIME",
+        ),
     ]
     with engine.connect() as conn:
         for table, column, ddl in migrations:
@@ -22,6 +25,7 @@ def _migrate(engine):
             except Exception:
                 try:
                     conn.execute(text(ddl))
+                    conn.commit()
                     logger.info(f"[db] Migration applied: added {table}.{column}")
                 except Exception as exc:
                     logger.warning(f"[db] Migration skipped ({table}.{column}): {exc}")
@@ -39,7 +43,7 @@ def connect(dbtype="sqlite", **kwargs):
         )
         Base.metadata.create_all(engine)
         _migrate(engine)
-        return sessionmaker(bind=engine)()
+        return sessionmaker(engine)()
     elif dbtype == "mysql":
         # Start a MySQL engine
         db_user = v.get_string("datastore.user")
@@ -71,6 +75,6 @@ def connect(dbtype="sqlite", **kwargs):
         )
         Base.metadata.create_all(engine)
         _migrate(engine)
-        return sessionmaker(bind=engine)()
+        return sessionmaker(engine)()
     else:
         raise Exception("Unknown/unimplemented database type: {}".format(dbtype))
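
The added `conn.commit()` is the load-bearing change here: SQLAlchemy 2.0 removed library-level autocommit, so statements executed inside `with engine.connect() as conn:` are rolled back when the block exits unless explicitly committed. A minimal sketch of the 2.0 behavior (in-memory SQLite purely for illustration):

from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")
with engine.connect() as conn:
    conn.execute(text("CREATE TABLE t (x INTEGER)"))
    conn.commit()  # without this, 2.0 rolls the DDL back at block exit

An equivalent alternative would be `with engine.begin() as conn:`, which commits automatically on clean exit.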

View File

@@ -22,6 +22,7 @@ import datetime
 import threading
 from loguru import logger
 import requests
+from sqlalchemy import select

 from directdnsonly.app.db import connect
 from directdnsonly.app.db.models import Domain
@@ -44,9 +45,7 @@ class PeerSyncWorker:
             logger.info("Peer sync disabled — skipping")
             return
         if not self.peers:
-            logger.warning(
-                "Peer sync enabled but no peers configured"
-            )
+            logger.warning("Peer sync enabled but no peers configured")
             return

         self._stop_event.clear()
@@ -82,9 +81,7 @@ class PeerSyncWorker:
             self._sync_all()

     def _sync_all(self):
-        logger.debug(
-            f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)"
-        )
+        logger.debug(f"[peer_sync] Starting sync pass across {len(self.peers)} peer(s)")
         for peer in self.peers:
             url = peer.get("url")
             if not url:
@@ -93,9 +90,7 @@ class PeerSyncWorker:
             try:
                 self._sync_from_peer(peer)
             except Exception as exc:
-                logger.warning(
-                    f"[peer_sync] Skipping unreachable peer {url}: {exc}"
-                )
+                logger.warning(f"[peer_sync] Skipping unreachable peer {url}: {exc}")

     def _sync_from_peer(self, peer: dict):
         url = peer.get("url", "").rstrip("/")
@@ -104,9 +99,7 @@ class PeerSyncWorker:
         auth = (username, password) if username else None
         # Fetch the peer's zone list
-        resp = requests.get(
-            f"{url}/internal/zones", auth=auth, timeout=10
-        )
+        resp = requests.get(f"{url}/internal/zones", auth=auth, timeout=10)
         if resp.status_code != 200:
             logger.warning(
                 f"[peer_sync] {url}: /internal/zones returned {resp.status_code}"
@@ -133,7 +126,9 @@ class PeerSyncWorker:
                     else None
                 )
-                local = session.query(Domain).filter_by(domain=domain).first()
+                local = session.execute(
+                    select(Domain).filter_by(domain=domain)
+                ).scalar_one_or_none()

                 needs_sync = (
                     local is None
@@ -183,16 +178,12 @@ class PeerSyncWorker:
                 else:
                     local.zone_data = zone_data
                     local.zone_updated_at = peer_ts
-                    logger.debug(
-                        f"[peer_sync] {url}: updated zone_data for {domain}"
-                    )
+                    logger.debug(f"[peer_sync] {url}: updated zone_data for {domain}")
                 synced += 1

             if synced:
                 session.commit()
-                logger.info(
-                    f"[peer_sync] Synced {synced} zone(s) from {url}"
-                )
+                logger.info(f"[peer_sync] Synced {synced} zone(s) from {url}")
             else:
                 logger.debug(f"[peer_sync] {url}: already up to date")
         finally:

View File

@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 import threading
 from loguru import logger
+from sqlalchemy import select

 from directdnsonly.app.da import DirectAdminClient
 from directdnsonly.app.db import connect
@@ -60,7 +61,9 @@ class ReconciliationWorker:
         server_names = [s.get("hostname", "?") for s in self.servers]
         mode = "DRY-RUN" if self.dry_run else "LIVE"
         delay_str = (
-            f", initial_delay: {self._initial_delay // 60}m" if self._initial_delay else ""
+            f", initial_delay: {self._initial_delay // 60}m"
+            if self._initial_delay
+            else ""
         )
         logger.info(
             f"Reconciliation poller started [{mode}] — "
@@ -137,7 +140,7 @@ class ReconciliationWorker:
         # Compare local DB against what DA reported; update masters and queue deletes
         session = connect()
         try:
-            all_local_domains = session.query(Domain).all()
+            all_local_domains = session.execute(select(Domain)).scalars().all()
             migrated = 0
             backfilled = 0
             known_servers = {s.get("hostname") for s in self.servers}
@@ -221,13 +224,13 @@ class ReconciliationWorker:
         session = connect()
         try:
-            domains = (
-                session.query(Domain)
-                .filter(Domain.zone_data.isnot(None))
-                .all()
-            )
+            domains = session.execute(
+                select(Domain).where(Domain.zone_data.isnot(None))
+            ).scalars().all()
             if not domains:
-                logger.debug("[reconciler] Healing pass: no zone_data stored yet — skipping")
+                logger.debug(
+                    "[reconciler] Healing pass: no zone_data stored yet — skipping"
+                )
                 return

             healed = 0

View File

@@ -1,4 +1,5 @@
 from loguru import logger
+from sqlalchemy import select

 from directdnsonly.app.db.models import *
 from directdnsonly.app.db import connect
@@ -8,12 +9,11 @@ def check_zone_exists(zone_name):
     # Check if zone is present in the index
     session = connect()
     logger.debug("Checking if {} is present in the DB".format(zone_name))
-    domain_exists = bool(session.query(Domain.id).filter_by(domain=zone_name).first())
+    domain_exists = bool(
+        session.execute(select(Domain.id).filter_by(domain=zone_name)).first()
+    )
     logger.debug("Returned from query: {}".format(domain_exists))
-    if domain_exists:
-        return True
-    else:
-        return False
+    return domain_exists


 def put_zone_index(zone_name, host_name, user_name):
@@ -28,7 +28,9 @@ def put_zone_index(zone_name, host_name, user_name):
 def get_domain_record(zone_name):
     """Return the Domain record for zone_name, or None if not found"""
     session = connect()
-    return session.query(Domain).filter_by(domain=zone_name).first()
+    return session.execute(
+        select(Domain).filter_by(domain=zone_name)
+    ).scalar_one_or_none()


 def check_parent_domain_owner(zone_name):
@@ -38,7 +40,9 @@ def check_parent_domain_owner(zone_name):
         return False
     session = connect()
     logger.debug("Checking if parent domain {} exists in DB".format(parent_domain))
-    return bool(session.query(Domain.id).filter_by(domain=parent_domain).first())
+    return bool(
+        session.execute(select(Domain.id).filter_by(domain=parent_domain)).first()
+    )


 def get_parent_domain_record(zone_name):
@@ -47,4 +51,6 @@ def get_parent_domain_record(zone_name):
     if not parent_domain:
         return None
     session = connect()
-    return session.query(Domain).filter_by(domain=parent_domain).first()
+    return session.execute(
+        select(Domain).filter_by(domain=parent_domain)
+    ).scalar_one_or_none()
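
For review, a quick reference on the three result-access patterns this file now uses (the domain string is illustrative):

# Single mapped object or None (raises MultipleResultsFound on >1 row):
record = session.execute(
    select(Domain).filter_by(domain="example.com")
).scalar_one_or_none()

# First Row tuple or None; sufficient for a bool(...) existence check:
exists = bool(
    session.execute(select(Domain.id).filter_by(domain="example.com")).first()
)

# List of mapped objects, unwrapping the single-element Row tuples:
domains = session.execute(select(Domain)).scalars().all()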

View File

@@ -6,6 +6,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 from loguru import logger
 from persistqueue import Queue
 from persistqueue.exceptions import Empty
+from sqlalchemy import select

 from app.utils import check_zone_exists, put_zone_index
 from app.utils.zone_parser import count_zone_records
@@ -89,9 +90,7 @@ class WorkerManager:
             )
             if not is_retry and not check_zone_exists(domain):
-                put_zone_index(
-                    domain, item.get("hostname"), item.get("username")
-                )
+                put_zone_index(domain, item.get("hostname"), item.get("username"))

             if not all(k in item for k in ["domain", "zone_file"]):
                 logger.error(f"Invalid queue item: {item}")
@@ -101,7 +100,9 @@ class WorkerManager:
             backends = self.backend_registry.get_available_backends()
             if target_backends:
-                backends = {k: v for k, v in backends.items() if k in target_backends}
+                backends = {
+                    k: v for k, v in backends.items() if k in target_backends
+                }
             if not backends:
                 logger.warning("No target backends available for this item!")
                 self.save_queue.task_done()
@@ -113,7 +114,9 @@ class WorkerManager:
             else:
                 failed = set()
                 for backend_name, backend in backends.items():
-                    if not self._process_single_backend(backend_name, backend, item, session):
+                    if not self._process_single_backend(
+                        backend_name, backend, item, session
+                    ):
                         failed.add(backend_name)

                 if failed:
@@ -154,7 +157,7 @@ class WorkerManager:
             logger.debug(f"Successfully updated {item['domain']} in {backend_name}")
             if backend.get_name() == "bind":
                 backend.update_named_conf(
-                    [d.domain for d in session.query(Domain).all()]
+                    [d.domain for d in session.execute(select(Domain)).scalars().all()]
                 )
                 backend.reload_zone()
             else:
@@ -227,7 +230,9 @@ class WorkerManager:
     def _store_zone_data(self, session, domain: str, zone_file: str):
         """Persist the latest zone file content to the domain DB record."""
         try:
-            record = session.query(Domain).filter_by(domain=domain).first()
+            record = session.execute(
+                select(Domain).filter_by(domain=domain)
+            ).scalar_one_or_none()
             if record:
                 record.zone_data = zone_file
                 record.zone_updated_at = datetime.datetime.utcnow()
@@ -294,7 +299,9 @@ class WorkerManager:
             logger.debug(f"Processing delete for {domain}")

-            record = session.query(Domain).filter_by(domain=domain).first()
+            record = session.execute(
+                select(Domain).filter_by(domain=domain)
+            ).scalar_one_or_none()
             if not record:
                 logger.warning(f"Domain {domain} not found in DB — skipping delete")
                 self.delete_queue.task_done()
@@ -314,7 +321,9 @@ class WorkerManager:
             )

             backends = self.backend_registry.get_available_backends()
-            remaining_domains = [d.domain for d in session.query(Domain).all()]
+            remaining_domains = [
+                d.domain for d in session.execute(select(Domain)).scalars().all()
+            ]

             delete_success = True
             if not backends:
@@ -327,7 +336,10 @@ class WorkerManager:
                 futures = {
                     executor.submit(
                         self._delete_single_backend,
-                        backend_name, backend, domain, remaining_domains
+                        backend_name,
+                        backend,
+                        domain,
+                        remaining_domains,
                     ): backend_name
                     for backend_name, backend in backends.items()
                 }
@@ -462,9 +474,7 @@ class WorkerManager:
         self._save_thread.start()
         self._delete_thread.start()
        self._retry_thread.start()
-        logger.info(
-            f"Started worker threads: save, delete, retry_drain"
-        )
+        logger.info(f"Started worker threads: save, delete, retry_drain")

         self._reconciler = ReconciliationWorker(
             delete_queue=self.delete_queue,
@@ -494,7 +504,8 @@ class WorkerManager:
             "delete_queue_size": self.delete_queue.qsize(),
             "retry_queue_size": self.retry_queue.qsize(),
             "save_worker_alive": self._save_thread and self._save_thread.is_alive(),
-            "delete_worker_alive": self._delete_thread and self._delete_thread.is_alive(),
+            "delete_worker_alive": self._delete_thread
+            and self._delete_thread.is_alive(),
             "retry_worker_alive": self._retry_thread and self._retry_thread.is_alive(),
             "reconciler_alive": (
                 self._reconciler.is_alive if self._reconciler else False

poetry.lock (generated; 1117 lines changed)

File diff suppressed because it is too large

View File

@@ -11,12 +11,12 @@ requires-python = ">=3.11,<3.14"
 dependencies = [
     "vyper-config (>=1.2.1,<2.0.0)",
     "loguru (>=0.7.3,<0.8.0)",
-    "persist-queue (>=1.0.0,<2.0.0)",
+    "persist-queue (>=1.1.0,<2.0.0)",
     "cherrypy (>=18.10.0,<19.0.0)",
-    "sqlalchemy (<2.0.0)",
-    "pymysql (>=1.1.1,<2.0.0)",
-    "dnspython (>=2.7.0,<3.0.0)",
-    "pyyaml (>=6.0.2,<7.0.0)",
+    "sqlalchemy (>=2.0.0,<3.0.0)",
+    "pymysql (>=1.1.2,<2.0.0)",
+    "dnspython (>=2.8.0,<3.0.0)",
+    "pyyaml (>=6.0.3,<7.0.0)",
     "requests (>=2.32.0,<3.0.0)",
 ]
@@ -24,11 +24,11 @@ dependencies = [
 package-mode = true

 [tool.poetry.group.dev.dependencies]
-black = "^25.1.0"
+black = "^26.1.0"
 pyinstaller = "^6.13.0"
-pytest = "^8.3.5"
-pytest-cov = "^6.1.1"
-pytest-mock = "^3.14.0"
+pytest = "^9.0.2"
+pytest-cov = "^7.0.0"
+pytest-mock = "^3.15.1"

 [build-system]
 requires = ["poetry-core>=2.0.0,<3.0.0"]

View File

@@ -21,7 +21,7 @@ def engine():

 @pytest.fixture
 def db_session(engine):
-    session = sessionmaker(bind=engine)()
+    session = sessionmaker(engine)()
     yield session
     session.close()
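
Side note on this fixture: `bind` is simply the first positional parameter of `sessionmaker`, and `sessionmaker(bind=engine)` still works in 2.0; the keyword was dropped for consistency with the app code, not out of necessity.

Session = sessionmaker(engine)  # identical to sessionmaker(bind=engine)
session = Session()             # 2.0 also supports: with Session() as session: ...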