You've already forked directdnsonly
chore: complete SQLAlchemy 2.0 migration in coredns_mysql backend and tests ⬆️
Migrate remaining session.query() calls in coredns_mysql.py to select()/session.execute() style; update bulk delete to delete() construct and count to func.count(); drop sessionmaker(bind=). Update test fixtures and assertions to match. Zero session.query() calls remaining across the entire codebase.
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
"""Tests for the CoreDNS MySQL backend (run against in-memory SQLite)."""
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import create_engine, select
|
||||
from sqlalchemy.orm import scoped_session, sessionmaker
|
||||
|
||||
from directdnsonly.app.backends.coredns_mysql import (
|
||||
@@ -28,7 +28,7 @@ def mysql_backend():
|
||||
self.config = {}
|
||||
self.instance_name = "test"
|
||||
self.engine = engine
|
||||
self.Session = scoped_session(sessionmaker(bind=engine))
|
||||
self.Session = scoped_session(sessionmaker(engine))
|
||||
|
||||
yield _TestBackend()
|
||||
engine.dispose()
|
||||
@@ -84,8 +84,8 @@ def test_write_zone_removes_stale_records(mysql_backend):
|
||||
mysql_backend.write_zone("example.com", reduced)
|
||||
|
||||
session = mysql_backend.Session()
|
||||
zone = session.query(Zone).filter_by(zone_name="example.com.").first()
|
||||
records = session.query(Record).filter_by(zone_id=zone.id, type="AAAA").all()
|
||||
zone = session.execute(select(Zone).filter_by(zone_name="example.com.")).scalar_one_or_none()
|
||||
records = session.execute(select(Record).filter_by(zone_id=zone.id, type="AAAA")).scalars().all()
|
||||
assert records == []
|
||||
session.close()
|
||||
|
||||
@@ -141,7 +141,7 @@ def test_reconcile_removes_extra_records(mysql_backend):
|
||||
|
||||
# Inject a phantom record directly into the DB
|
||||
session = mysql_backend.Session()
|
||||
zone = session.query(Zone).filter_by(zone_name="example.com.").first()
|
||||
zone = session.execute(select(Zone).filter_by(zone_name="example.com.")).scalar_one_or_none()
|
||||
session.add(
|
||||
Record(
|
||||
zone_id=zone.id,
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import datetime
|
||||
import json
|
||||
import pytest
|
||||
from sqlalchemy import select, func
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from directdnsonly.app.peer_sync import PeerSyncWorker
|
||||
@@ -123,7 +124,9 @@ def test_sync_creates_new_local_record(patch_connect, monkeypatch):
|
||||
|
||||
worker._sync_from_peer(_make_peer())
|
||||
|
||||
record = session.query(Domain).filter_by(domain="example.com").first()
|
||||
record = session.execute(
|
||||
select(Domain).filter_by(domain="example.com")
|
||||
).scalar_one_or_none()
|
||||
assert record is not None
|
||||
assert record.zone_data == ZONE_DATA
|
||||
assert record.zone_updated_at == NOW
|
||||
@@ -152,7 +155,6 @@ def test_sync_updates_older_local_record(patch_connect, monkeypatch):
|
||||
|
||||
worker._sync_from_peer(_make_peer())
|
||||
|
||||
from sqlalchemy import select
|
||||
record = session.execute(
|
||||
select(Domain).filter_by(domain="example.com")
|
||||
).scalar_one_or_none()
|
||||
@@ -187,7 +189,9 @@ def test_sync_skips_when_local_is_newer(patch_connect, monkeypatch):
|
||||
|
||||
# zone_data fetch should not have been called
|
||||
assert not fetch_calls
|
||||
record = session.query(Domain).filter_by(domain="example.com").first()
|
||||
record = session.execute(
|
||||
select(Domain).filter_by(domain="example.com")
|
||||
).scalar_one_or_none()
|
||||
assert record.zone_data == "newer local"
|
||||
|
||||
|
||||
@@ -219,7 +223,7 @@ def test_sync_skips_peer_with_bad_status(patch_connect, monkeypatch):
|
||||
worker._sync_from_peer(_make_peer())
|
||||
|
||||
# No records should have been created
|
||||
assert session.query(Domain).count() == 0
|
||||
assert session.execute(select(func.count()).select_from(Domain)).scalar() == 0
|
||||
|
||||
|
||||
def test_sync_skips_missing_zone_data_in_response(patch_connect, monkeypatch):
|
||||
@@ -241,7 +245,7 @@ def test_sync_skips_missing_zone_data_in_response(patch_connect, monkeypatch):
|
||||
|
||||
worker._sync_from_peer(_make_peer())
|
||||
|
||||
assert session.query(Domain).count() == 0
|
||||
assert session.execute(select(func.count()).select_from(Domain)).scalar() == 0
|
||||
|
||||
|
||||
def test_sync_empty_peer_list(patch_connect, monkeypatch):
|
||||
|
||||
Reference in New Issue
Block a user