Compare commits

..

10 Commits

Author SHA1 Message Date
0903d78458 fix: update .gitignore to include dist/ and modify build command in justfile 🐛 2026-02-18 23:04:41 +13:00
74c5f4012e style: apply black formatting across codebase 🎨
No logic changes — pure reformatting of line lengths, dict literals,
method-chain line breaks, and trailing newlines to satisfy black's style.
2026-02-18 22:53:09 +13:00
807d6271f1 chore: rewrite justfile for pyenv + poetry dev workflow 🔧
Replaces outdated PyInstaller-only recipe with full task runner:
install, test, coverage, coverage-html, test-one, fmt, fmt-check, ci, run,
build, clean. PATH export wires in pyenv shims and poetry automatically.
2026-02-18 22:46:18 +13:00
bd46227364 feat: add test suite, fix backend bugs, remove legacy artifacts 🧪
- Add 73-test suite across conftest, utils, admin API, reconciler, zone parser,
  and CoreDNS MySQL backend (all green, ~0.5s)
- Fix zone_exists filter using wrong column name (name → zone_name)
- Fix delete_zone missing dot_fqdn normalization on lookup
- Remove spurious unused `from config import config` in coredns_mysql.py
- Fix config loader to search module-relative path so tests find app.yml
  without needing a root-level config/ directory
- Remove legacy v1 Flask prototype (app.py), empty config.json, and
  duplicate root config/app.yml
2026-02-18 22:03:04 +13:00
b8f12d0208 feat: Update dependencies in poetry.lock and pyproject.toml
- Added `certifi` version 2026.1.4 and `charset-normalizer` version 3.4.4 to poetry.lock.
- Introduced `idna` version 3.11 to poetry.lock.
- Updated `requests` to version 2.32.5 in poetry.lock and added it as a dependency in pyproject.toml.
- Updated `urllib3` to version 2.6.3 in poetry.lock.
- Added extras for `requests` and `urllib3` in poetry.lock.
2026-02-18 07:18:44 +13:00
5c8bc2653c feat: enhance README with detailed concurrent multi-backend processing architecture and usage instructions 2026-02-17 16:19:51 +13:00
02536cd448 feat: update Dockerfile for improved BIND configuration and application setup 2026-02-17 16:16:01 +13:00
24877be037 chore: add .gitkeep to logs directory for empty directory preservation 2026-02-17 16:14:24 +13:00
6445cf49c0 feat: migrate to Poetry and implement multi-backend DNS management
- Migrated from setuptools to Poetry; added pyproject.toml, poetry.lock,
  poetry.toml and .python-version (Python 3.11.12)
- Built out full directdnsonly Python package with BIND and CoreDNS MySQL
  backends, CherryPy REST API, persist-queue worker, and vyper-based config
- Auth credentials now read from config/env (app.auth_username/password)
  rather than hardcoded; override via DADNS_APP_AUTH_PASSWORD env var
- Added Dockerfile.deepseek: Python 3.11 slim + BIND9 + Poetry install
- Rewrote docker-compose.yml for local dev stack (MySQL + dadns services)
- Added SQL schema, docker/ BIND configs, justfile, tests, and README
- Expanded .gitignore for Poetry/Python project artifacts
2026-02-17 16:12:46 +13:00
1d1c12b661 chore: Clean out previous version of directdnsonly 🔥 2025-05-28 09:50:35 +12:00
55 changed files with 5305 additions and 3582 deletions

24
.gitignore vendored
View File

@@ -1,6 +1,28 @@
*.db
dist/
venv/
.venv
.idea
build
!build/.gitkeep
!build/.gitkeep
**/__pycache__/
*.pyc
*.pyo
*.pyd
*.egg-info
*.egg
*.log
*.DS_Store
*.swp
*.swo
*.bak
*.tmp
*.orig
*.coverage
*.cover
*.tox
*.dist-info
*.egg-info
*.mypy_cache
*.pytest_cache
/data/*

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.11.12

View File

@@ -1,11 +1,53 @@
FROM pypy:slim-buster
RUN mkdir -p /opt/apikeyhandler/config
VOLUME /opt/apikeyhandler/config
COPY ./src/ /opt/apikeyhandler
WORKDIR /opt/apikeyhandler
RUN pip install -r requirements.txt
CMD pypy3 main.py
FROM python:3.11.12-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
bind9 \
bind9utils \
dnsutils \
gcc \
python3-dev \
default-libmysqlclient-dev \
&& rm -rf /var/lib/apt/lists/*
# Configure BIND
RUN mkdir -p /etc/named/zones && \
chown -R bind:bind /etc/named && \
chmod 755 /etc/named/zones
COPY docker/named.conf.local /etc/bind/
COPY docker/named.conf.options /etc/bind/
RUN chown root:bind /etc/bind/named.conf.*
# Install Python dependencies
WORKDIR /app
COPY pyproject.toml poetry.lock README.md ./
# Install specific Poetry version that matches your lock file
RUN pip install "poetry==2.1.2" # Adjust version to match your lock file
# Copy application files
COPY directdnsonly ./directdnsonly
COPY config ./config
COPY schema ./schema
RUN poetry config virtualenvs.create false && \
poetry install
# Create data directories
RUN mkdir -p /app/data/queues && \
mkdir -p /app/data/zones && \
mkdir -p /app/logs && \
chmod -R 755 /app/data
# Configure BIND zone directory to match app config
#RUN ln -s /app/data/zones /etc/named/zones/dadns
# Start script
COPY docker/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 2222 53/udp
CMD ["/entrypoint.sh"]

View File

@@ -1,16 +0,0 @@
FROM centos:latest
ENV APP_NAME=rpmbuild
ENV VERSION=latest
RUN mkdir -p /tmp/build/rpm
WORKDIR /tmp/build/rpm
RUN dnf install -y --allowerasing gcc rpm-build rpm-devel \
rpmlint make bash coreutils \
diffutils patch rpmdevtools && \
dnf clean all && \
rm -Rf /var/dnf/cache && \
rpmdev-setuptree
VOLUME /tmp/build/rpm
CMD ["rpmbuild", "--define version ${VERSION}", "-bb", "${APP_NAME}.spec"]

View File

@@ -1,55 +0,0 @@
FROM python:3.7.9 as builder
# Allow Passing Version from CI
ARG VERSION
ENV LC_ALL=en_NZ.utf8
ENV LANG=en_NZ.utf8
ENV APP_NAME="directdnsonly"
RUN mkdir -p /tmp/build && apt-get update && \
apt-get install -y libgcc1-dbg libssl-dev
COPY src/ /tmp/build/
COPY requirements.txt /tmp/build
WORKDIR /tmp/build
WORKDIR /tmp/src
RUN wget https://github.com/NixOS/patchelf/releases/download/0.12/patchelf-0.12.tar.bz2 && \
tar xvf patchelf-0.12.tar.bz2 && \
cd /tmp/src/patchelf-0.12* && \
./configure --prefix="/usr" && \
make install
WORKDIR /tmp/build
RUN pip3 install -r requirements.txt && \
pyinstaller \
--hidden-import=json \
--hidden-import=pyopenssl \
--hidden-import=jaraco \
--hidden-import=cheroot \
--hidden-import=cheroot.ssl.pyopenssl \
--hidden-import=cheroot.ssl.builtin \
--noconfirm --onefile ${APP_NAME}.py && \
cd /tmp/build/dist && \
staticx ${APP_NAME} ./${APP_NAME}_static
RUN mkdir -p /tmp/approot && \
mkdir -p /tmp/approot/app && \
mkdir -p /tmp/approot/app/config && \
mkdir -p /tmp/approot/etc && \
mkdir -p /tmp/approot/tmp && \
mkdir -p /tmp/approot/data && \
mkdir -p /tmp/approot/lib/x86_64-linux-gnu && \
cp /tmp/build/config/app.yml /tmp/approot/app/config/app.yml && \
cp /tmp/build/dist/${APP_NAME}_static /tmp/approot/app/${APP_NAME} && \
cp /usr/lib/gcc/x86_64-linux-gnu/8/libgcc_s.so.1 /tmp/approot/lib/x86_64-linux-gnu/libgcc_s.so.1
FROM scratch
COPY --from=builder /tmp/approot /
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
ENV TZ=Pacific/Auckland
WORKDIR /app
VOLUME /app/config /data
CMD ["/app/directdnsonly"]

14
Pipfile
View File

@@ -1,14 +0,0 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
cherrypy = "==18.6.0"
sqlalchemy = "==1.3.19"
pyyaml = "==5.3.1"
[requires]
python_version = "3.8"

188
Pipfile.lock generated
View File

@@ -1,188 +0,0 @@
{
"_meta": {
"hash": {
"sha256": "046c8d93fda36d9f83c939c4d6474b700962db5b1e4cb774c4ea5b76f0af50af"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.8"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"cheroot": {
"hashes": [
"sha256:ab342666c8e565a55cd2baf2648be9b379269a89d47e60862a087cff9d8b33ce",
"sha256:b6c18caf5f79cdae668c35fc8309fc88ea4a964cce9e2ca8504fab13bcf57301"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==8.4.5"
},
"cherrypy": {
"hashes": [
"sha256:56608edd831ad00991ae585625e0206ed61cf1a0850e4b2cc48489fb2308c499",
"sha256:c0a7283f02a384c112a0a18404fd3abd849fc7fd4bec19378067150a2573d2e4"
],
"index": "pypi",
"version": "==18.6.0"
},
"jaraco.classes": {
"hashes": [
"sha256:116429c2047953f525afdcae165475c4589c7b14870e78b2d068ecb01018827e",
"sha256:c38698ff8ef932eb33d91c0e8fc192ad7c44ecee03f7f585afd4f35aeaef7aab"
],
"markers": "python_version >= '3.6'",
"version": "==3.1.0"
},
"jaraco.collections": {
"hashes": [
"sha256:a7889f28c80c4875bd6256d9924e8526dacfef22cd7b80ff8469b4d312f9f144",
"sha256:be570ef4f2e7290b757449395238fa63d70a9255574624e73c5ff9f1ee554721"
],
"markers": "python_version >= '3.6'",
"version": "==3.0.0"
},
"jaraco.functools": {
"hashes": [
"sha256:9fedc4be3117512ca3e03e1b2ffa7a6a6ffa589bfb7d02bfb324e55d493b94f4",
"sha256:d3dc9f6c1a1d45d7f59682a3bf77aceb685c1a60891606c7e4161e72ecc399ad"
],
"markers": "python_version >= '3.6'",
"version": "==3.0.1"
},
"jaraco.text": {
"hashes": [
"sha256:c87569c9afae14f71b2e1c57f316770ab6981ab675d9c602be1c7981161bacdd",
"sha256:e5078b1126cc0f166c7859aa75103a56c0d0f39ebcafc21695615472e0f810ec"
],
"markers": "python_version >= '2.7'",
"version": "==3.2.0"
},
"more-itertools": {
"hashes": [
"sha256:6f83822ae94818eae2612063a5101a7311e68ae8002005b5e05f03fd74a86a20",
"sha256:9b30f12df9393f0d28af9210ff8efe48d10c94f73e5daf886f10c4b0b0b4f03c"
],
"markers": "python_version >= '3.5'",
"version": "==8.5.0"
},
"portend": {
"hashes": [
"sha256:600dd54175e17e9347e5f3d4217aa8bcf4bf4fa5ffbc4df034e5ec1ba7cdaff5",
"sha256:62dd00b94a6a55fbf0320365fbdeba37f0d1fe14d613841037dc4780bedfda8f"
],
"markers": "python_version >= '2.7'",
"version": "==2.6"
},
"pytz": {
"hashes": [
"sha256:a494d53b6d39c3c6e44c3bec237336e14305e4f29bbf800b599253057fbb79ed",
"sha256:c35965d010ce31b23eeb663ed3cc8c906275d6be1a34393a1d73a41febf4a048"
],
"version": "==2020.1"
},
"pywin32": {
"hashes": [
"sha256:00eaf43dbd05ba6a9b0080c77e161e0b7a601f9a3f660727a952e40140537de7",
"sha256:11cb6610efc2f078c9e6d8f5d0f957620c333f4b23466931a247fb945ed35e89",
"sha256:1f45db18af5d36195447b2cffacd182fe2d296849ba0aecdab24d3852fbf3f80",
"sha256:37dc9935f6a383cc744315ae0c2882ba1768d9b06700a70f35dc1ce73cd4ba9c",
"sha256:6e38c44097a834a4707c1b63efa9c2435f5a42afabff634a17f563bc478dfcc8",
"sha256:8319bafdcd90b7202c50d6014efdfe4fde9311b3ff15fd6f893a45c0868de203",
"sha256:9b3466083f8271e1a5eb0329f4e0d61925d46b40b195a33413e0905dccb285e8",
"sha256:a60d795c6590a5b6baeacd16c583d91cce8038f959bd80c53bd9a68f40130f2d",
"sha256:af40887b6fc200eafe4d7742c48417529a8702dcc1a60bf89eee152d1d11209f",
"sha256:ec16d44b49b5f34e99eb97cf270806fdc560dff6f84d281eb2fcb89a014a56a9",
"sha256:ed74b72d8059a6606f64842e7917aeee99159ebd6b8d6261c518d002837be298",
"sha256:fa6ba028909cfc64ce9e24bcf22f588b14871980d9787f1e2002c99af8f1850c"
],
"markers": "sys_platform == 'win32'",
"version": "==228"
},
"pyyaml": {
"hashes": [
"sha256:06a0d7ba600ce0b2d2fe2e78453a470b5a6e000a985dd4a4e54e436cc36b0e97",
"sha256:240097ff019d7c70a4922b6869d8a86407758333f02203e0fc6ff79c5dcede76",
"sha256:4f4b913ca1a7319b33cfb1369e91e50354d6f07a135f3b901aca02aa95940bd2",
"sha256:69f00dca373f240f842b2931fb2c7e14ddbacd1397d57157a9b005a6a9942648",
"sha256:73f099454b799e05e5ab51423c7bcf361c58d3206fa7b0d555426b1f4d9a3eaf",
"sha256:74809a57b329d6cc0fdccee6318f44b9b8649961fa73144a98735b0aaf029f1f",
"sha256:7739fc0fa8205b3ee8808aea45e968bc90082c10aef6ea95e855e10abf4a37b2",
"sha256:95f71d2af0ff4227885f7a6605c37fd53d3a106fcab511b8860ecca9fcf400ee",
"sha256:b8eac752c5e14d3eca0e6dd9199cd627518cb5ec06add0de9d32baeee6fe645d",
"sha256:cc8955cfbfc7a115fa81d85284ee61147059a753344bc51098f3ccd69b0d7e0c",
"sha256:d13155f591e6fcc1ec3b30685d50bf0711574e2c0dfffd7644babf8b5102ca1a"
],
"index": "pypi",
"version": "==5.3.1"
},
"six": {
"hashes": [
"sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259",
"sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==1.15.0"
},
"sqlalchemy": {
"hashes": [
"sha256:072766c3bd09294d716b2d114d46ffc5ccf8ea0b714a4e1c48253014b771c6bb",
"sha256:107d4af989831d7b091e382d192955679ec07a9209996bf8090f1f539ffc5804",
"sha256:15c0bcd3c14f4086701c33a9e87e2c7ceb3bcb4a246cd88ec54a49cf2a5bd1a6",
"sha256:26c5ca9d09f0e21b8671a32f7d83caad5be1f6ff45eef5ec2f6fd0db85fc5dc0",
"sha256:276936d41111a501cf4a1a0543e25449108d87e9f8c94714f7660eaea89ae5fe",
"sha256:3292a28344922415f939ee7f4fc0c186f3d5a0bf02192ceabd4f1129d71b08de",
"sha256:33d29ae8f1dc7c75b191bb6833f55a19c932514b9b5ce8c3ab9bc3047da5db36",
"sha256:3bba2e9fbedb0511769780fe1d63007081008c5c2d7d715e91858c94dbaa260e",
"sha256:465c999ef30b1c7525f81330184121521418a67189053bcf585824d833c05b66",
"sha256:51064ee7938526bab92acd049d41a1dc797422256086b39c08bafeffb9d304c6",
"sha256:5a49e8473b1ab1228302ed27365ea0fadd4bf44bc0f9e73fe38e10fdd3d6b4fc",
"sha256:618db68745682f64cedc96ca93707805d1f3a031747b5a0d8e150cfd5055ae4d",
"sha256:6547b27698b5b3bbfc5210233bd9523de849b2bb8a0329cd754c9308fc8a05ce",
"sha256:6557af9e0d23f46b8cd56f8af08eaac72d2e3c632ac8d5cf4e20215a8dca7cea",
"sha256:73a40d4fcd35fdedce07b5885905753d5d4edf413fbe53544dd871f27d48bd4f",
"sha256:8280f9dae4adb5889ce0bb3ec6a541bf05434db5f9ab7673078c00713d148365",
"sha256:83469ad15262402b0e0974e612546bc0b05f379b5aa9072ebf66d0f8fef16bea",
"sha256:860d0fe234922fd5552b7f807fbb039e3e7ca58c18c8d38aa0d0a95ddf4f6c23",
"sha256:883c9fb62cebd1e7126dd683222b3b919657590c3e2db33bdc50ebbad53e0338",
"sha256:8afcb6f4064d234a43fea108859942d9795c4060ed0fbd9082b0f280181a15c1",
"sha256:96f51489ac187f4bab588cf51f9ff2d40b6d170ac9a4270ffaed535c8404256b",
"sha256:9e865835e36dfbb1873b65e722ea627c096c11b05f796831e3a9b542926e979e",
"sha256:aa0554495fe06172b550098909be8db79b5accdf6ffb59611900bea345df5eba",
"sha256:b595e71c51657f9ee3235db8b53d0b57c09eee74dfb5b77edff0e46d2218dc02",
"sha256:b6ff91356354b7ff3bd208adcf875056d3d886ed7cef90c571aef2ab8a554b12",
"sha256:b70bad2f1a5bd3460746c3fb3ab69e4e0eb5f59d977a23f9b66e5bdc74d97b86",
"sha256:c7adb1f69a80573698c2def5ead584138ca00fff4ad9785a4b0b2bf927ba308d",
"sha256:c898b3ebcc9eae7b36bd0b4bbbafce2d8076680f6868bcbacee2d39a7a9726a7",
"sha256:e49947d583fe4d29af528677e4f0aa21f5e535ca2ae69c48270ebebd0d8843c0",
"sha256:eb1d71643e4154398b02e88a42fc8b29db8c44ce4134cf0f4474bfc5cb5d4dac",
"sha256:f2e8a9c0c8813a468aa659a01af6592f71cd30237ec27c4cc0683f089f90dcfc",
"sha256:fe7fe11019fc3e6600819775a7d55abc5446dda07e9795f5954fdbf8a49e1c37"
],
"index": "pypi",
"version": "==1.3.19"
},
"tempora": {
"hashes": [
"sha256:599a3a910b377f2b544c7b221582ecf4cb049b017c994b37f2b1a9ed1099716e",
"sha256:9f46de767be7dd21d9602a8a5b0978fd55abc70af3e2a7814c85c00d7a8fffa3"
],
"markers": "python_version >= '3.6'",
"version": "==4.0.0"
},
"zc.lockfile": {
"hashes": [
"sha256:307ad78227e48be260e64896ec8886edc7eae22d8ec53e4d528ab5537a83203b",
"sha256:cc33599b549f0c8a248cb72f3bf32d77712de1ff7ee8814312eb6456b42c015f"
],
"version": "==2.0"
}
},
"develop": {}
}

123
README.md Normal file
View File

@@ -0,0 +1,123 @@
# DaDNS - DNS Management System
## Features
- Multi-backend DNS management (BIND, CoreDNS MySQL)
- Parallel backend dispatch — all enabled backends updated simultaneously
- Persistent queue — zone updates survive restarts
- Automatic record-count verification and drift reconciliation
- Thread-safe operations
- Loguru-based logging
## Installation
```bash
poetry install
poetry run dadns
```
## Concurrent Multi-Backend Processing
DaDNS propagates every zone update to all enabled backends in parallel using a
queue-based worker architecture.
### Architecture
```
DirectAdmin zone push
Persistent Queue (persist-queue, survives restarts)
save_queue_worker (single daemon thread, sequential dequeue)
├─ 1 backend enabled ──▶ direct call (no thread overhead)
└─ N backends enabled ──▶ ThreadPoolExecutor(max_workers=N)
┌─────┴─────┐
▼ ▼
bind coredns_dc1 ...
(concurrent, as_completed)
```
### How it works
1. **Queue consumer** — A single background thread drains the persistent save
queue. Items are processed one zone at a time, in order.
2. **Single-backend path** — When only one backend is enabled, the zone is
written directly with no extra thread spawning.
3. **Parallel-backend path** — When two or more backends are enabled, a
`ThreadPoolExecutor` with one thread per backend dispatches all writes
simultaneously. Results are collected with `as_completed`, so a slow or
failing backend does not block the others.
4. **Record verification** — After each successful write, the backend's stored
record count is compared against the authoritative count parsed from the
source zone file (the DirectAdmin zone). Mismatches trigger automatic
reconciliation: extra records are removed and the count is re-verified.
5. **Batch telemetry** — The worker tracks batch start time and emits a summary
log on queue drain, including zones processed, failures, elapsed time, and
throughput (zones/sec).
### Log output (example)
```
INFO | 📥 Batch started — 12 zone(s) queued for processing
DEBUG | Processing example.com across 2 backends concurrently: bind, coredns_dc1
DEBUG | Parallel processing of example.com across 2 backends completed in 43ms
SUCCESS | 📦 Batch complete — 12/12 zone(s) processed successfully in 1.8s (6.7 zones/sec)
```
### Adding backends
Enable additional backends in `config/app.yml`. Each enabled backend is
automatically included in the parallel dispatch — no code changes required.
```yaml
dns:
backends:
bind:
enabled: true
coredns_dc1:
enabled: true
host: "mysql-dc1"
coredns_dc2:
enabled: true # adds a third parallel worker automatically
host: "mysql-dc2"
```
## Configuration
Edit `config/app.yml` for backend settings. Credentials can be overridden via
environment variables using the `DADNS_` prefix (e.g.
`DADNS_APP_AUTH_PASSWORD`).
### Config Files
#### `config/app.yml`
```yaml
timezone: Pacific/Auckland
log_level: INFO
queue_location: ./data/queues
app:
auth_username: directdnsonly
auth_password: changeme # override with DADNS_APP_AUTH_PASSWORD
dns:
default_backend: bind
backends:
bind:
enabled: true
zones_dir: ./data/zones
named_conf: ./data/named.conf.include
coredns_mysql:
enabled: true
host: "127.0.0.1"
port: 3306
database: "coredns"
username: "coredns"
password: "password"

105
app.py
View File

@@ -1,105 +0,0 @@
from flask import Flask, request
import mmap
import re
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hello World!'
@app.route('/CMD_API_LOGIN_TEST')
def login_test():
multi_dict = request.values
for key in multi_dict:
print(multi_dict.get(key))
print(multi_dict.getlist(key))
# print(request.values)
print(request.headers)
print(request.authorization)
return 'error=0&text=Login OK&details=none'
@app.route('/CMD_API_DNS_ADMIN', methods=['GET', 'POST'])
def domain_admin():
print(str(request.data, encoding="utf-8"))
print(request.values.get('action'))
action = request.values.get('action')
if action == 'exists':
# DirectAdmin is checking whether the domain is in the cluster
return 'result: exists=1'
if action == 'delete':
# Domain is being removed from the DNS
hostname = request.values.get('hostname')
username = request.values.get('username')
domain = request.values.get('select0')
if action == 'rawsave':
# DirectAdmin wants to add/update a domain
hostname = request.values.get('hostname')
username = request.values.get('username')
domain = request.values.get('domain')
if not check_zone_exists(str(domain)):
put_zone_index(str(domain))
write_zone_file(str(domain), request.data.decode("utf-8"))
else:
# Domain already exists
write_zone_file(str(domain), request.data.decode("utf-8"))
def create_zone_index():
# Create an index of all zones present from zone definitions
regex = r"(?<=\")(?P<domain>.*)(?=\"\s)"
with open(zone_index_file, 'w+') as f:
with open(named_conf, 'r') as named_file:
while True:
# read line
line = named_file.readline()
if not line:
# Reached end of file
break
print(line)
hosted_domain = re.search(regex, line).group(0)
f.write(hosted_domain + "\n")
def put_zone_index(zone_name):
# add a new zone to index
with open(zone_index_file, 'a+') as f:
# We are using append mode
f.write(zone_name)
def write_zone_file(zone_name, data):
# Write the zone to file
with open(zones_dir + '/' + zone_name + '.db', 'w') as f:
f.write(data)
def check_zone_exists(zone_name):
# Check if zone is present in the index
with open(zone_index_file, 'r') as f:
try:
s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
if s.find(bytes(zone_name, encoding='utf8')) != -1:
return True
else:
return False
except ValueError as e:
# File Empty?
return False
if __name__ == '__main__':
zones_dir = "/etc/pdns/zones"
zone_index_file = "/etc/pdns/zones/.index"
named_conf = "/etc/pdns/named.conf"
create_zone_index()
app.run(host="0.0.0.0")

View File

@@ -0,0 +1 @@
# Package initialization

View File

@@ -0,0 +1,18 @@
from loguru import logger
import sys
from directdnsonly.config import config
def configure_logging():
logger.remove()
logger.add(
sys.stderr,
level=config.get("log_level"),
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
)
logger.add(
"logs/directdnsonly_{time}.log",
rotation="10 MB",
retention="30 days",
level="DEBUG",
)

View File

@@ -0,0 +1 @@
# Package initialization

View File

@@ -0,0 +1,188 @@
import cherrypy
from urllib.parse import urlencode, parse_qs
from loguru import logger
from directdnsonly.app.utils import (
check_zone_exists,
check_parent_domain_owner,
get_domain_record,
get_parent_domain_record,
)
from directdnsonly.app.utils.zone_parser import validate_and_normalize_zone
class DNSAdminAPI:
def __init__(self, save_queue, delete_queue, backend_registry):
self.save_queue = save_queue
self.delete_queue = delete_queue
self.backend_registry = backend_registry
@cherrypy.expose
def index(self):
return "DNS Admin API - Available endpoints: /CMD_API_DNS_ADMIN"
@cherrypy.expose
def CMD_API_LOGIN_TEST(self):
"""DirectAdmin login test — confirms credentials are valid"""
return urlencode({"error": 0, "text": "Login OK"})
@cherrypy.expose
def CMD_API_DNS_ADMIN(self, **params):
"""Handle both DirectAdmin-style API calls and raw zone file uploads"""
try:
if cherrypy.request.method == "GET":
return self._handle_exists(params)
if cherrypy.request.method != "POST":
cherrypy.response.status = 405
return urlencode({"error": 1, "text": "Method not allowed"})
# Parse parameters from both query string and body
body_params = {}
if cherrypy.request.body:
content_type = cherrypy.request.headers.get("Content-Type", "")
if "application/x-www-form-urlencoded" in content_type:
raw_body = cherrypy.request.body.read()
if raw_body:
body_params = parse_qs(raw_body.decode("utf-8"))
body_params = {
k: v[0] if len(v) == 1 else v
for k, v in body_params.items()
}
elif "text/plain" in content_type:
body_params = {
"zone_file": cherrypy.request.body.read().decode("utf-8")
}
# Combine parameters (body overrides query)
all_params = {**params, **body_params}
logger.debug(f"Request parameters: {all_params}")
if "zone_file" not in all_params:
logger.debug(
"No zone file provided. Maybe in body as DirectAdmin does?"
)
# Grab from body
all_params["zone_file"] = str(cherrypy.request.body.read(), "utf-8")
logger.debug("Read zone file from body :)")
# Required parameters
action = all_params.get("action")
domain = all_params.get("domain")
if not action:
# DirectAdmin sends an initial request without an action
# parameter as a connectivity check — respond with success
logger.debug("Received request with no action — connectivity check")
return urlencode({"error": 0, "text": "OK"})
if not domain:
raise ValueError("Missing 'domain' parameter")
# Handle different actions
if action == "rawsave":
return self._handle_rawsave(domain, all_params)
elif action == "delete":
return self._handle_delete(domain, all_params)
else:
raise ValueError(f"Unsupported action: {action}")
except Exception as e:
logger.error(f"API error: {str(e)}")
cherrypy.response.status = 400
return urlencode({"error": 1, "text": str(e)})
def _handle_exists(self, params: dict):
"""Handle GET action=exists — domain and optional parent domain lookup"""
action = params.get("action")
if action != "exists":
cherrypy.response.status = 400
return urlencode({"error": 1, "text": f"Unsupported GET action: {action}"})
domain = params.get("domain")
if not domain:
cherrypy.response.status = 400
return urlencode({"error": 1, "text": "Missing 'domain' parameter"})
check_parent = bool(params.get("check_for_parent_domain"))
domain_exists = check_zone_exists(domain)
parent_exists = check_parent_domain_owner(domain) if check_parent else False
if not domain_exists and not parent_exists:
return urlencode({"error": 0, "exists": 0})
if domain_exists:
record = get_domain_record(domain)
return urlencode(
{
"error": 0,
"exists": 1,
"details": f"Domain exists on {record.hostname}",
}
)
# parent match only
parent_record = get_parent_domain_record(domain)
return urlencode(
{
"error": 0,
"exists": 2,
"details": f"Parent Domain exists on {parent_record.hostname}",
}
)
def _handle_rawsave(self, domain: str, params: dict):
"""Process zone file saves"""
zone_data = params.get("zone_file")
if not zone_data:
raise ValueError("Missing zone file content")
normalized_zone = validate_and_normalize_zone(zone_data, domain)
logger.info(f"Validated zone for {domain}")
self.save_queue.put(
{
"domain": domain,
"zone_file": normalized_zone,
"hostname": params.get("hostname", ""),
"username": params.get("username", ""),
"client_ip": cherrypy.request.remote.ip,
}
)
logger.success(f"Queued zone update for {domain}")
return urlencode({"error": 0})
def _handle_delete(self, domain: str, params: dict):
"""Process zone deletions"""
self.delete_queue.put(
{
"domain": domain,
"hostname": params.get("hostname", ""),
"username": params.get("username", ""),
"client_ip": cherrypy.request.remote.ip,
}
)
logger.success(f"Queued deletion for {domain}")
return urlencode({"error": 0})
@cherrypy.expose
def queue_status(self):
"""Debug endpoint for queue monitoring"""
return {
"save_queue_size": self.save_queue.qsize(),
"delete_queue_size": self.delete_queue.qsize(),
"last_save_item": self._get_last_item(self.save_queue),
"last_delete_item": self._get_last_item(self.delete_queue),
}
@staticmethod
def _get_last_item(queue):
"""Helper to safely get last queue item"""
try:
if hasattr(queue, "last_item"):
return queue.last_item
return "Last item tracking not available"
except Exception:
return "Error retrieving last item"

View File

@@ -0,0 +1,24 @@
import cherrypy
from loguru import logger
class HealthAPI:
def __init__(self, backend_registry):
self.registry = backend_registry
@cherrypy.expose
def health(self):
status = {"status": "OK", "backends": []}
for name, backend in self.registry.get_available_backends().items():
status["backends"].append(
{
"name": name,
"status": (
"active" if backend().zone_exists("test") else "unavailable"
),
}
)
logger.debug("Health check performed")
return status

View File

@@ -0,0 +1,89 @@
from typing import Dict, Type, Optional
from .base import DNSBackend
from .bind import BINDBackend
from .coredns_mysql import CoreDNSMySQLBackend
from directdnsonly.config import config
from loguru import logger
class BackendRegistry:
def __init__(self):
self._backend_types = {
"bind": BINDBackend,
"coredns_mysql": CoreDNSMySQLBackend,
}
self._backend_instances: Dict[str, DNSBackend] = {}
self._initialized = False
def _initialize_backends(self):
"""Initialize and cache all enabled backend instances"""
if self._initialized:
return
try:
logger.debug("Attempting to load backend configurations")
backend_configs = config.get("dns")
if not backend_configs:
logger.warning("No 'dns' configuration found")
self._initialized = True
return
backend_configs = backend_configs.get("backends")
if not backend_configs:
logger.warning("No 'dns.backends' configuration found")
self._initialized = True
return
logger.debug(f"Found backend configs: {backend_configs}")
for instance_name, instance_config in backend_configs.items():
logger.debug(f"Processing backend instance: {instance_name}")
backend_type = instance_config.get("type")
if not backend_type:
logger.warning(
f"No type specified for backend instance: {instance_name}"
)
continue
if backend_type not in self._backend_types:
logger.warning(
f"Unknown backend type '{backend_type}' for instance: {instance_name}"
)
continue
backend_class = self._backend_types[backend_type]
if not backend_class.is_available():
logger.warning(
f"Backend {backend_type} is not available for instance: {instance_name}"
)
continue
enabled = instance_config.get("enabled", False)
if not enabled:
logger.debug(f"Backend instance {instance_name} is disabled")
continue
logger.debug(
f"Initializing backend instance {instance_name} of type {backend_type}"
)
try:
backend = backend_class(instance_config)
self._backend_instances[instance_name] = backend
logger.info(
f"Successfully initialized backend instance: {instance_name}"
)
except Exception as e:
logger.error(
f"Failed to initialize backend instance {instance_name}: {e}"
)
except Exception as e:
logger.error(f"Error loading backend configurations: {e}")
self._initialized = True
def get_available_backends(self) -> Dict[str, DNSBackend]:
"""Return cached backend instances, initializing on first call"""
self._initialize_backends()
return self._backend_instances

View File

@@ -0,0 +1,75 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, Tuple
class DNSBackend(ABC):
def __init__(self, config: Dict[str, Any]):
self.config = config
self.instance_name = config.get("instance_name", self.get_name())
@classmethod
@abstractmethod
def get_name(cls) -> str:
"""Return the backend type name"""
pass
@property
def instance_id(self) -> str:
"""Return the unique instance identifier"""
return self.instance_name
@classmethod
@abstractmethod
def is_available(cls) -> bool:
pass
@abstractmethod
def write_zone(self, zone_name: str, zone_data: str) -> bool:
pass
@abstractmethod
def delete_zone(self, zone_name: str) -> bool:
pass
@abstractmethod
def reload_zone(self, zone_name: Optional[str] = None) -> bool:
pass
@abstractmethod
def zone_exists(self, zone_name: str) -> bool:
pass
def verify_zone_record_count(
self, zone_name: str, expected_count: int
) -> Tuple[bool, int]:
"""Verify the record count in this backend matches the expected count
from the source zone file.
Args:
zone_name: The zone to verify
expected_count: The number of records parsed from the source zone
Returns:
Tuple of (matches: bool, actual_count: int)
"""
raise NotImplementedError(
f"Backend {self.get_name()} does not implement record count verification"
)
def reconcile_zone_records(
self, zone_name: str, zone_data: str
) -> Tuple[bool, int]:
"""Reconcile backend records against the authoritative BIND zone from
DirectAdmin. Any records in the backend that are not present in the
source zone will be removed.
Args:
zone_name: The zone to reconcile
zone_data: The raw BIND zone file content (authoritative source)
Returns:
Tuple of (success: bool, records_removed: int)
"""
raise NotImplementedError(
f"Backend {self.get_name()} does not implement zone reconciliation"
)

View File

@@ -0,0 +1,124 @@
import os
import subprocess
from loguru import logger
from pathlib import Path
from typing import Dict, List, Optional
from .base import DNSBackend
class BINDBackend(DNSBackend):
    """DNS backend that maintains on-disk BIND zone files (`<zone>.db` in
    `zones_dir`) and reloads the running `named` via `rndc`."""

    @classmethod
    def get_name(cls) -> str:
        """Return the backend type identifier used in configuration."""
        return "bind"

    @classmethod
    def is_available(cls) -> bool:
        """Check whether the `named` binary is present and runnable."""
        try:
            result = subprocess.run(["named", "-v"], capture_output=True, text=True)
            if result.returncode == 0:
                # Guard against empty stdout so logging cannot IndexError.
                version = result.stdout.splitlines()[0] if result.stdout else "unknown"
                logger.info(f"BIND available: {version}")
                return True
            return False
        except FileNotFoundError:
            logger.warning("BIND/named not found in PATH")
            return False

    def __init__(self, config: Dict):
        """Prepare the zones directory and named.conf.

        Args:
            config: Backend config with `zones_dir` and `named_conf` paths.

        Raises:
            Exception: re-raised if the zones directory cannot be set up.
        """
        # Run the shared base-class initialization (sets instance_name,
        # which the base's instance_id() reads). This call was previously
        # missing here even though the other backends perform it.
        super().__init__(config)
        self.zones_dir = Path(config["zones_dir"])
        self.named_conf = Path(config["named_conf"])
        # Safe directory creation handling
        try:
            # Check if it's a symlink first — don't mkdir over a symlink
            if self.zones_dir.is_symlink():
                logger.debug(f"{self.zones_dir} is already a symlink")
            elif not self.zones_dir.exists():
                self.zones_dir.mkdir(parents=True, mode=0o755)
                logger.debug(f"Created zones directory: {self.zones_dir}")
            else:
                logger.debug(f"Directory already exists: {self.zones_dir}")
            # Ensure proper permissions even on a pre-existing directory
            os.chmod(self.zones_dir, 0o755)
            logger.debug(f"Using zones directory: {self.zones_dir}")
        except FileExistsError:
            # Race with another creator — the directory exists; carry on.
            logger.debug(f"Directory already exists (safe to ignore): {self.zones_dir}")
        except Exception as e:
            logger.error(f"Failed to setup zones directory: {e}")
            raise
        # Verify named.conf exists; create an empty one so later writes work
        if not self.named_conf.exists():
            logger.warning(f"named.conf not found at {self.named_conf}")
            self.named_conf.touch()
            logger.info(f"Created empty named.conf at {self.named_conf}")
        logger.success(f"BIND backend initialized for {self.zones_dir}")

    def write_zone(self, zone_name: str, zone_data: str) -> bool:
        """Write the zone file `<zone_name>.db`; True on success."""
        zone_file = self.zones_dir / f"{zone_name}.db"
        try:
            with open(zone_file, "w") as f:
                f.write(zone_data)
            logger.debug(f"Wrote zone file: {zone_file}")
            return True
        except IOError as e:
            logger.error(f"Failed to write zone file {zone_file}: {e}")
            return False

    def delete_zone(self, zone_name: str) -> bool:
        """Delete the zone file; False if it was absent or removal failed."""
        zone_file = self.zones_dir / f"{zone_name}.db"
        try:
            if zone_file.exists():
                zone_file.unlink()
                logger.debug(f"Deleted zone file: {zone_file}")
                return True
            logger.warning(f"Zone file not found: {zone_file}")
            return False
        except IOError as e:
            logger.error(f"Failed to delete zone file {zone_file}: {e}")
            return False

    def reload_zone(self, zone_name: Optional[str] = None) -> bool:
        """Reload one zone (or all zones when zone_name is None) via rndc."""
        try:
            if zone_name:
                cmd = ["rndc", "reload", zone_name]
                logger.debug(f"Reloading single zone: {zone_name}")
            else:
                cmd = ["rndc", "reload"]
                logger.debug("Reloading all zones")
            result = subprocess.run(
                cmd,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            logger.debug(f"BIND reload successful: {result.stdout}")
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"BIND reload failed: {e.stderr}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error during BIND reload: {e}")
            return False

    def zone_exists(self, zone_name: str) -> bool:
        """Return True if the on-disk zone file exists."""
        zone_file = self.zones_dir / f"{zone_name}.db"
        exists = zone_file.exists()
        logger.debug(f"Zone existence check for {zone_name}: {exists}")
        return exists

    def update_named_conf(self, zones: List[str]) -> bool:
        """Rewrite named.conf with one master-zone stanza per zone name."""
        try:
            with open(self.named_conf, "w") as f:
                for zone in zones:
                    zone_file = self.zones_dir / f"{zone}.db"
                    f.write(f'zone "{zone}" {{ type master; file "{zone_file}"; }};\n')
            logger.debug(f"Updated named.conf: {self.named_conf}")
            return True
        except IOError as e:
            logger.error(f"Failed to update named.conf: {e}")
            return False

View File

@@ -0,0 +1,448 @@
from typing import Optional, Dict, Set, Tuple, Any
from sqlalchemy import create_engine, Column, String, Integer, Text, ForeignKey, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session, relationship
from dns import zone as dns_zone_module
from dns.rdataclass import IN
from loguru import logger
from .base import DNSBackend
Base = declarative_base()
class Zone(Base):
    """ORM model for the CoreDNS `zones` table (one row per hosted zone)."""

    __tablename__ = "zones"
    id = Column(Integer, primary_key=True)
    # Stored in dotted-FQDN form (e.g. "example.com.") — see dot_fqdn().
    zone_name = Column(String(255), nullable=False, index=True, unique=True)
class Record(Base):
    """ORM model for the CoreDNS `records` table (one row per resource record)."""

    __tablename__ = "records"
    id = Column(Integer, primary_key=True)
    zone_id = Column(Integer, ForeignKey("zones.id"), nullable=False)
    hostname = Column(String(255), nullable=False, index=True)
    type = Column(String(10), nullable=False)  # RR type, e.g. "A", "MX", "SOA"
    data = Column(Text, nullable=False)  # rdata in zone-file text form
    ttl = Column(Integer, nullable=True)
    # NOTE(review): presumably gates whether the plugin serves the record —
    # confirm against coredns_mysql_extend's schema semantics.
    online = Column(Boolean, nullable=False, default=False)
    zone = relationship("Zone", backref="records")
class CoreDNSMySQLBackend(DNSBackend):
    """Backend that syncs zones into the MySQL tables read by the
    coredns_mysql_extend CoreDNS plugin."""

    def __init__(self, config: Dict[str, Any]):
        """Open the MySQL connection pool and ensure the schema exists.

        Args:
            config: Backend settings — host, port, database, username,
                password (plus whatever the base class consumes).
        """
        super().__init__(config)
        self.host = config.get("host", "localhost")
        self.port = config.get("port", 3306)
        self.database = config.get("database", "coredns")
        self.username = config.get("username")
        self.password = config.get("password")
        # Pooled engine; pre-ping discards dead connections before use.
        self.engine = create_engine(
            f"mysql+pymysql://{self.username}:{self.password}@"
            f"{self.host}:{self.port}/{self.database}",
            pool_pre_ping=True,
            pool_size=5,
            max_overflow=10,
        )
        self.Session = scoped_session(sessionmaker(bind=self.engine))
        # Create the zones/records tables if they do not already exist.
        Base.metadata.create_all(self.engine)
        logger.info(
            f"Initialized CoreDNS MySQL backend '{self.instance_name}' "
            f"for {self.database}@{self.host}:{self.port}"
        )
@staticmethod
def dot_fqdn(zone_name):
    """Return the zone name in dotted-FQDN form (trailing dot guaranteed)."""
    if zone_name.endswith("."):
        return zone_name
    return f"{zone_name}."
@classmethod
def get_name(cls) -> str:
    """Return the backend type identifier used in configuration."""
    return "coredns_mysql"
@classmethod
def is_available(cls) -> bool:
    """Return True if the PyMySQL driver can be imported."""
    try:
        # Imported only to probe driver availability.
        import pymysql

        return True
    except ImportError:
        logger.warning("PyMySQL not available - CoreDNS MySQL backend disabled")
        return False
def write_zone(self, zone_name: str, zone_data: str) -> bool:
    """Synchronise one zone's rows in MySQL with the given BIND zone text.

    Performs an incremental diff: adds missing records, refreshes TTLs,
    deletes rows no longer present in the source, and manages the SOA
    row separately (at most one per zone). Commits once at the end and
    rolls back on any error.

    Args:
        zone_name: Zone to write (with or without trailing dot).
        zone_data: Raw BIND zone file content (authoritative source).

    Returns:
        True on success, False if parsing or the DB transaction failed.
    """
    session = self.Session()
    try:
        # Ensure zone exists
        zone = self._ensure_zone_exists(session, zone_name)
        # Get existing records for this zone but track SOA records separately
        existing_records = {}
        existing_soa = None
        for r in session.query(Record).filter_by(zone_id=zone.id).all():
            if r.type == "SOA":
                existing_soa = r
            else:
                # Keyed by (hostname, type, data) so members of the same
                # RRset are tracked individually; TTL is compared separately.
                existing_records[(r.hostname, r.type, r.data)] = r
        # Parse the zone data into a normalised record set
        source_records, source_soa = self._parse_zone_to_record_set(
            zone_name, zone_data
        )
        # Track changes
        current_records = set()
        changes = {"added": 0, "updated": 0, "removed": 0}
        # Handle SOA record: only a well-formed 7-field SOA (mname rname
        # serial refresh retry expire minimum) is accepted.
        if source_soa:
            soa_name, soa_content, soa_ttl = source_soa
            soa_parts = soa_content.split()
            if len(soa_parts) == 7:
                if existing_soa:
                    existing_soa.data = soa_content
                    existing_soa.ttl = soa_ttl
                    existing_soa.online = True
                    changes["updated"] += 1
                    logger.debug(
                        f"Updated SOA record: {soa_name} SOA {soa_content}"
                    )
                else:
                    existing_soa = Record(
                        zone_id=zone.id,
                        hostname=soa_name,
                        type="SOA",
                        data=soa_content,
                        ttl=soa_ttl,
                        online=True,
                    )
                    session.add(existing_soa)
                    changes["added"] += 1
                    logger.debug(f"Added SOA record: {soa_name} SOA {soa_content}")
        # Process all non-SOA records
        for record_name, record_type, record_content, record_ttl in source_records:
            key = (record_name, record_type, record_content)
            current_records.add(key)
            if key in existing_records:
                # Update existing record if TTL changed
                record = existing_records[key]
                if record.ttl != record_ttl:
                    record.ttl = record_ttl
                    record.online = True
                    changes["updated"] += 1
                    logger.debug(
                        f"Updated TTL for record: {record_name} {record_type} {record_content}"
                    )
            else:
                # Add new record
                new_record = Record(
                    zone_id=zone.id,
                    hostname=record_name,
                    type=record_type,
                    data=record_content,
                    ttl=record_ttl,
                    online=True,
                )
                session.add(new_record)
                changes["added"] += 1
                logger.debug(
                    f"Added new record: {record_name} {record_type} {record_content}"
                )
        # Remove records that no longer exist in the source zone
        for key, record in existing_records.items():
            if key not in current_records:
                logger.debug(
                    f"Removed record: {record.hostname} {record.type} {record.data}"
                )
                session.delete(record)
                changes["removed"] += 1
        # Handle SOA removal if needed (source zone lost its SOA entirely)
        if existing_soa and not source_soa:
            logger.debug(
                f"Removed SOA record: {existing_soa.hostname} SOA {existing_soa.data}"
            )
            session.delete(existing_soa)
            changes["removed"] += 1
        session.commit()
        total_changes = changes["added"] + changes["updated"] + changes["removed"]
        if total_changes > 0:
            logger.info(
                f"[{self.instance_name}] Zone {zone_name} updated: "
                f"{changes['added']} added, {changes['updated']} updated, "
                f"{changes['removed']} removed"
            )
        else:
            logger.debug(f"[{self.instance_name}] Zone {zone_name}: no changes")
        return True
    except Exception as e:
        logger.error(f"Error writing zone {zone_name}: {e}")
        session.rollback()
        return False
    finally:
        session.close()
def delete_zone(self, zone_name: str) -> bool:
    """Remove a zone row and all of its records; False if absent or on error."""
    session = self.Session()
    try:
        fqdn = self.dot_fqdn(zone_name)
        zone = session.query(Zone).filter_by(zone_name=fqdn).first()
        if zone is None:
            logger.warning(f"Zone {zone_name} not found for deletion")
            return False
        # Bulk-delete the child rows first, then the zone itself.
        removed = session.query(Record).filter_by(zone_id=zone.id).delete()
        session.delete(zone)
        session.commit()
        logger.info(f"Deleted zone {zone_name} with {removed} records")
        return True
    except Exception as e:
        session.rollback()
        logger.error(f"Zone deletion failed for {zone_name}: {e}")
        return False
    finally:
        session.close()
def reload_zone(self, zone_name: Optional[str] = None) -> bool:
    """Log-only no-op: coredns_mysql_extend reloads automatically when it
    detects database changes, so nothing needs to be triggered here."""
    if zone_name:
        target = f"zone {zone_name}"
    else:
        target = "all zones"
    logger.debug(f"CoreDNS reload triggered for {target}")
    return True
def zone_exists(self, zone_name: str) -> bool:
    """Return True if a row for this zone is present in the database."""
    session = self.Session()
    try:
        match = (
            session.query(Zone)
            .filter_by(zone_name=self.dot_fqdn(zone_name))
            .first()
        )
        exists = match is not None
        logger.debug(f"Zone existence check for {zone_name}: {exists}")
        return exists
    except Exception as e:
        logger.error(f"Zone existence check failed for {zone_name}: {e}")
        return False
    finally:
        session.close()
def _ensure_zone_exists(self, session, zone_name: str) -> Zone:
    """Fetch the Zone row for zone_name, creating it when missing."""
    fqdn = self.dot_fqdn(zone_name)
    zone = session.query(Zone).filter_by(zone_name=fqdn).first()
    if zone is None:
        logger.debug(f"Creating new zone: {fqdn}")
        zone = Zone(zone_name=fqdn)
        session.add(zone)
        # Flush so zone.id is populated for the caller's record inserts.
        session.flush()
    return zone
def _normalize_cname_data(self, zone_name: str, record_content: str) -> str:
    """Expand a parsed CNAME target into deterministic FQDN form.

    CNAME targets are always stored fully qualified so that comparisons
    between the BIND zone source and the database never depend on how
    the zone file spelled the name.

    Args:
        zone_name: The zone name for relative-name expansion
        record_content: The raw CNAME target from the parsed zone

    Returns:
        The normalized CNAME target string
    """
    target = record_content
    if target.startswith("@"):
        # "@" means the zone apex.
        logger.debug("CNAME target starts with '@', replacing with zone FQDN")
        return self.dot_fqdn(zone_name)
    if not target.endswith("."):
        # Relative name: qualify it with the zone's dotted FQDN.
        logger.debug(f"CNAME target {target} is relative, appending zone")
        return f"{target}.{self.dot_fqdn(zone_name)}"
    return target
def _parse_zone_to_record_set(
    self, zone_name: str, zone_data: str
) -> Tuple[Set[Tuple[str, str, str, int]], Optional[Tuple[str, str, int]]]:
    """Parse a BIND zone file into a set of normalised record keys.

    Only IN-class records are kept; CNAME targets are expanded to FQDN
    form for deterministic comparison. Owner names are rendered by
    dnspython's str() — assumes zone_data carries its own $ORIGIN or
    uses names this rendering keeps stable (TODO confirm with a sample
    DirectAdmin zone).

    Returns:
        Tuple of:
        - set of (hostname, type, data, ttl) tuples for non-SOA records
        - (hostname, soa_data, ttl) tuple for the SOA record, or None
    """
    dns_zone = dns_zone_module.from_text(zone_data, check_origin=False)
    records: Set[Tuple[str, str, str, int]] = set()
    soa = None
    for name, ttl, rdata in dns_zone.iterate_rdatas():
        # Skip non-Internet classes (CH/HS etc.).
        if rdata.rdclass != IN:
            continue
        record_name = str(name)
        # rdtype is an IntEnum in dnspython 2.x, so .name is "A", "MX", ...
        record_type = rdata.rdtype.name
        record_content = rdata.to_text()
        if record_type == "SOA":
            # Keep the SOA aside; the last one seen wins (zones have one).
            soa = (record_name, record_content, ttl)
            continue
        if record_type == "CNAME":
            record_content = self._normalize_cname_data(zone_name, record_content)
        records.add((record_name, record_type, record_content, ttl))
    return records, soa
def verify_zone_record_count(
    self, zone_name: str, expected_count: int
) -> Tuple[bool, int]:
    """Verify the record count in this backend matches the expected count
    from the source (DirectAdmin) zone file.

    NOTE(review): the DB count includes the SOA row — confirm that
    callers compute expected_count the same way.

    Args:
        zone_name: The zone to verify
        expected_count: The number of records parsed from the source BIND zone

    Returns:
        Tuple of (matches: bool, actual_count: int); (False, 0) when the
        zone is missing and (False, -1) on a query error.
    """
    session = self.Session()
    try:
        zone = (
            session.query(Zone)
            .filter_by(zone_name=self.dot_fqdn(zone_name))
            .first()
        )
        if not zone:
            logger.warning(
                f"[{self.instance_name}] Zone {zone_name} not found "
                f"during record count verification"
            )
            return False, 0
        actual_count = session.query(Record).filter_by(zone_id=zone.id).count()
        matches = actual_count == expected_count
        if not matches:
            logger.warning(
                f"[{self.instance_name}] Record count mismatch for "
                f"{zone_name}: source zone has {expected_count} records, "
                f"backend has {actual_count} records "
                f"(difference: {actual_count - expected_count:+d})"
            )
        else:
            logger.debug(
                f"[{self.instance_name}] Record count verified for "
                f"{zone_name}: {actual_count} records match source"
            )
        return matches, actual_count
    except Exception as e:
        logger.error(
            f"[{self.instance_name}] Error verifying record count "
            f"for {zone_name}: {e}"
        )
        return False, -1
    finally:
        session.close()
def reconcile_zone_records(
    self, zone_name: str, zone_data: str
) -> Tuple[bool, int]:
    """Reconcile backend records against the authoritative BIND zone from
    DirectAdmin. Any records in the backend that are **not** present in
    the source zone will be deleted.

    This is the post-write safety net: even though ``write_zone`` already
    removes stale records during normal processing, this method catches
    any extras that may have crept in via race conditions, manual edits,
    or replication drift between MySQL nodes.

    Args:
        zone_name: The zone to reconcile
        zone_data: The raw BIND zone file content (authoritative source)

    Returns:
        Tuple of (success: bool, records_removed: int)
    """
    session = self.Session()
    try:
        zone = (
            session.query(Zone)
            .filter_by(zone_name=self.dot_fqdn(zone_name))
            .first()
        )
        if not zone:
            logger.warning(
                f"[{self.instance_name}] Zone {zone_name} not found "
                f"during reconciliation"
            )
            return False, 0
        # Build the expected record set from the source BIND zone.
        # source_soa is intentionally unused — SOA rows are left in place.
        source_records, source_soa = self._parse_zone_to_record_set(
            zone_name, zone_data
        )
        # Build lookup keys (without TTL) matching write_zone's key format
        expected_keys: Set[Tuple[str, str, str]] = {
            (hostname, rtype, data) for hostname, rtype, data, _ in source_records
        }
        # Query all records currently in the backend for this zone
        db_records = session.query(Record).filter_by(zone_id=zone.id).all()
        removed = 0
        for record in db_records:
            # SOA records are managed separately — skip them
            if record.type == "SOA":
                continue
            key = (record.hostname, record.type, record.data)
            if key not in expected_keys:
                logger.debug(
                    f"[{self.instance_name}] Reconcile: removing extra "
                    f"record from {zone_name}: "
                    f"{record.hostname} {record.type} {record.data}"
                )
                session.delete(record)
                removed += 1
        if removed > 0:
            session.commit()
            logger.info(
                f"[{self.instance_name}] Reconciliation for {zone_name}: "
                f"removed {removed} extra record(s) not in source zone"
            )
        else:
            logger.debug(
                f"[{self.instance_name}] Reconciliation for {zone_name}: "
                f"all records match source zone — no action needed"
            )
        return True, removed
    except Exception as e:
        logger.error(
            f"[{self.instance_name}] Error reconciling records "
            f"for {zone_name}: {e}"
        )
        session.rollback()
        return False, 0
    finally:
        session.close()

View File

@@ -0,0 +1,332 @@
from typing import Optional, Dict, Set, Tuple, List
from sqlalchemy import (
create_engine,
Column,
String,
Integer,
Text,
Boolean,
DateTime,
func,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from loguru import logger
from .base import DNSBackend
from config import config
import time
Base = declarative_base()
class Domain(Base):
    """ORM model mirroring PowerDNS's `domains` table (gmysql schema)."""

    __tablename__ = "domains"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False, index=True, unique=True)
    master = Column(String(128), nullable=True)
    last_check = Column(Integer, nullable=True)
    # Only "NATIVE" is written by this backend (see _ensure_domain_exists).
    type = Column(String(6), nullable=False, default="NATIVE")
    notified_serial = Column(Integer, nullable=True)
    account = Column(String(40), nullable=True)
class Record(Base):
    """ORM model mirroring PowerDNS's `records` table (gmysql schema)."""

    __tablename__ = "records"
    id = Column(Integer, primary_key=True)
    # Plain integer reference — no declared FK constraint, matching the
    # stock PowerDNS schema.
    domain_id = Column(Integer, nullable=False, index=True)
    name = Column(String(255), nullable=False, index=True)
    type = Column(String(10), nullable=False)
    content = Column(Text, nullable=False)
    ttl = Column(Integer, nullable=True)
    prio = Column(Integer, nullable=True)  # MX/SRV priority (see write_zone)
    change_date = Column(Integer, nullable=True)  # unix timestamp of last change
    disabled = Column(Boolean, nullable=False, default=False)
    ordername = Column(String(255), nullable=True)  # DNSSEC ordering (unused here)
    auth = Column(Boolean, nullable=False, default=True)
class PowerDNSMySQLBackend(DNSBackend):
    """Backend that syncs zones directly into a PowerDNS gmysql database."""

    @classmethod
    def get_name(cls) -> str:
        """Return the backend type identifier used in configuration."""
        return "powerdns_mysql"
@classmethod
def is_available(cls) -> bool:
    """Return True if the PyMySQL driver can be imported."""
    try:
        # Imported only to probe driver availability.
        import pymysql

        return True
    except ImportError:
        logger.warning("PyMySQL not available - PowerDNS MySQL backend disabled")
        return False
@staticmethod
def ensure_fqdn(name: str, zone_name: str) -> str:
    """Qualify a record name for PowerDNS storage (dot-less FQDN form).

    - "@" or "" map to the zone apex
    - absolute names have trailing dot(s) stripped
      (note: rstrip removes *all* trailing dots, not just one)
    - the zone name itself passes through unchanged
    - anything else is treated as relative and suffixed with the zone
    """
    if name == "@" or name == "":
        return zone_name
    elif name.endswith("."):
        return name.rstrip(".")
    elif name == zone_name:
        return name
    else:
        return f"{name}.{zone_name}"
def __init__(self, config: dict = None):
    """Initialize the backend's engine, session factory, and schema.

    Args:
        config: Backend settings dict (host, port, database, username,
            password). When None, settings are loaded from the application
            config under ``dns.backends.powerdns_mysql``.
    """
    if config is None:
        # Bug fix: the parameter `config` shadows the module-level
        # `config` object, so the previous `config or config.get(...)`
        # fallback raised AttributeError whenever called without an
        # argument. Re-import the app config locally under another name.
        from config import config as app_config

        c = app_config.get("dns.backends.powerdns_mysql")
    else:
        c = config
    # NOTE(review): unlike CoreDNSMySQLBackend, this never calls
    # super().__init__(), so instance_name/instance_id() are unset here —
    # confirm whether that is intentional.
    self.engine = create_engine(
        f"mysql+pymysql://{c['username']}:{c['password']}@"
        f"{c['host']}:{c['port']}/{c['database']}",
        pool_pre_ping=True,
    )
    self.Session = scoped_session(sessionmaker(bind=self.engine))
    Base.metadata.create_all(self.engine)
    logger.info(f"Initialized PowerDNS MySQL backend for {c['database']}")
def _ensure_domain_exists(self, session, zone_name: str) -> Domain:
    """Look up the Domain row for zone_name, creating a NATIVE one if absent."""
    domain = session.query(Domain).filter_by(name=zone_name).first()
    if domain is None:
        domain = Domain(name=zone_name, type="NATIVE")
        session.add(domain)
        # Flush so domain.id is available to the caller immediately.
        session.flush()
        logger.info(f"Created new domain: {zone_name}")
    return domain
def _parse_soa_content(self, soa_content: str) -> Dict[str, str]:
    """Split SOA rdata text into its seven named fields; {} when malformed."""
    field_names = (
        "primary_ns",
        "hostmaster",
        "serial",
        "refresh",
        "retry",
        "expire",
        "minimum",
    )
    parts = soa_content.split()
    if len(parts) < 7:
        return {}
    # zip truncates at seven fields, ignoring any trailing tokens.
    return dict(zip(field_names, parts))
def write_zone(self, zone_name: str, zone_data: str) -> bool:
    """Synchronise a zone's rows in the PowerDNS schema from BIND zone text.

    Diffs parsed records against existing rows keyed by (name, type),
    updating content/TTL/priority in place and deleting rows absent from
    the source. MX/SRV priorities are split into the `prio` column.

    NOTE(review): keying on (name, type) collapses multi-record RRsets —
    two A records for the same name end up as a single row (the last one
    parsed wins). The CoreDNS backend keys on (name, type, data) instead;
    confirm whether this is acceptable for PowerDNS.

    Args:
        zone_name: Zone being written.
        zone_data: Raw BIND zone file content (authoritative source).

    Returns:
        True on success, False if parsing or the transaction failed.
    """
    from dns import zone as dns_zone_module
    from dns.rdataclass import IN

    session = self.Session()
    try:
        # Ensure domain exists
        domain = self._ensure_domain_exists(session, zone_name)
        # Get existing records for this domain
        existing_records = {
            (r.name, r.type): r
            for r in session.query(Record).filter_by(domain_id=domain.id).all()
        }
        # Parse the zone data
        dns_zone = dns_zone_module.from_text(zone_data, check_origin=False)
        # Track records we process
        current_records: Set[Tuple[str, str]] = set()
        changes = {"added": 0, "updated": 0, "removed": 0}
        current_time = int(time.time())
        # Process all records
        for name, ttl, rdata in dns_zone.iterate_rdatas():
            # Skip non-Internet classes.
            if rdata.rdclass != IN:
                continue
            record_name = self.ensure_fqdn(str(name), zone_name)
            record_type = rdata.rdtype.name
            record_content = rdata.to_text()
            record_ttl = ttl
            record_prio = None
            # Handle MX records priority ("<prio> <target>")
            if record_type == "MX":
                parts = record_content.split(" ", 1)
                if len(parts) == 2:
                    record_prio = int(parts[0])
                    record_content = parts[1]
            # Handle SRV records priority ("<prio> <weight> <port> <target>")
            elif record_type == "SRV":
                parts = record_content.split(" ", 3)
                if len(parts) == 4:
                    record_prio = int(parts[0])
                    record_content = f"{parts[1]} {parts[2]} {parts[3]}"
            # Ensure CNAME and other records have proper FQDN format
            if record_type in ["CNAME", "MX", "NS"]:
                if not record_content.endswith(".") and record_content != "@":
                    # NOTE(review): this inner "@" branch is unreachable —
                    # the guard above already excludes "@". Dead code.
                    if record_content == "@":
                        record_content = zone_name
                    elif "." not in record_content:
                        # NOTE(review): relative names containing a dot
                        # (e.g. "mail.sub") are NOT qualified — confirm.
                        record_content = f"{record_content}.{zone_name}"
            key = (record_name, record_type)
            current_records.add(key)
            if key in existing_records:
                # Update existing record if needed
                record = existing_records[key]
                if (
                    record.content != record_content
                    or record.ttl != record_ttl
                    or record.prio != record_prio
                ):
                    record.content = record_content
                    record.ttl = record_ttl
                    record.prio = record_prio
                    record.change_date = current_time
                    record.disabled = False
                    changes["updated"] += 1
            else:
                # Add new record
                new_record = Record(
                    domain_id=domain.id,
                    name=record_name,
                    type=record_type,
                    content=record_content,
                    ttl=record_ttl,
                    prio=record_prio,
                    change_date=current_time,
                    disabled=False,
                    auth=True,
                )
                session.add(new_record)
                changes["added"] += 1
        # Remove deleted records
        for key in set(existing_records.keys()) - current_records:
            session.delete(existing_records[key])
            changes["removed"] += 1
        session.commit()
        logger.success(
            f"Zone {zone_name} updated: "
            f"+{changes['added']} ~{changes['updated']} -{changes['removed']}"
        )
        return True
    except Exception as e:
        session.rollback()
        logger.error(f"Zone update failed for {zone_name}: {e}")
        return False
    finally:
        session.close()
def delete_zone(self, zone_name: str) -> bool:
    """Drop a domain and every record attached to it; False if absent/error."""
    session = self.Session()
    try:
        domain = session.query(Domain).filter_by(name=zone_name).first()
        if domain is None:
            logger.warning(f"Domain {zone_name} not found for deletion")
            return False
        # Bulk-delete child rows first, then the domain itself.
        removed = session.query(Record).filter_by(domain_id=domain.id).delete()
        session.delete(domain)
        session.commit()
        logger.info(f"Deleted domain {zone_name} with {removed} records")
        return True
    except Exception as e:
        session.rollback()
        logger.error(f"Domain deletion failed for {zone_name}: {e}")
        return False
    finally:
        session.close()
def reload_zone(self, zone_name: Optional[str] = None) -> bool:
    """Log-only no-op: PowerDNS reads MySQL live, so no reload is required."""
    if zone_name:
        target = f"zone {zone_name}"
    else:
        target = "all zones"
    logger.debug(f"PowerDNS reload triggered for {target}")
    # pdns_control reload / reload-zones could be shelled out here if a
    # hard reload ever becomes necessary.
    return True
def zone_exists(self, zone_name: str) -> bool:
    """Return True if a Domain row exists for zone_name."""
    session = self.Session()
    try:
        found = session.query(Domain).filter_by(name=zone_name).first()
        exists = found is not None
        logger.debug(f"Zone existence check for {zone_name}: {exists}")
        return exists
    except Exception as e:
        logger.error(f"Zone existence check failed for {zone_name}: {e}")
        return False
    finally:
        session.close()
def get_zone_records(self, zone_name: str) -> List[Dict]:
    """Return all records for a zone as plain dicts (debug/inspection aid);
    [] when the zone is unknown or the query fails."""
    session = self.Session()
    try:
        domain = session.query(Domain).filter_by(name=zone_name).first()
        if domain is None:
            return []
        rows = session.query(Record).filter_by(domain_id=domain.id).all()
        result = []
        for row in rows:
            result.append(
                {
                    "name": row.name,
                    "type": row.type,
                    "content": row.content,
                    "ttl": row.ttl,
                    "prio": row.prio,
                    "disabled": row.disabled,
                }
            )
        return result
    except Exception as e:
        logger.error(f"Failed to get records for {zone_name}: {e}")
        return []
    finally:
        session.close()
def set_record_status(
    self, zone_name: str, record_name: str, record_type: str, disabled: bool
) -> bool:
    """Enable or disable one record identified by name and type.

    Returns False when the domain or record cannot be found, or when the
    update fails; True once the new status is committed.
    """
    session = self.Session()
    try:
        domain = session.query(Domain).filter_by(name=zone_name).first()
        if domain is None:
            logger.warning(f"Domain {zone_name} not found")
            return False
        full_name = self.ensure_fqdn(record_name, zone_name)
        record = (
            session.query(Record)
            .filter_by(domain_id=domain.id, name=full_name, type=record_type)
            .first()
        )
        if record is None:
            logger.warning(
                f"Record {full_name} {record_type} not found in {zone_name}"
            )
            return False
        record.disabled = disabled
        # Stamp the change so PowerDNS/operators can see when it happened.
        record.change_date = int(time.time())
        session.commit()
        status = "disabled" if disabled else "enabled"
        logger.info(f"Record {full_name} {record_type} {status} in {zone_name}")
        return True
    except Exception as e:
        session.rollback()
        logger.error(f"Failed to set record status: {e}")
        return False
    finally:
        session.close()

View File

@@ -0,0 +1,55 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from vyper import v
import datetime
Base = declarative_base()
def connect(dbtype="sqlite", **kwargs):
    """Build a SQLAlchemy session for the configured datastore.

    Args:
        dbtype: "sqlite" (default) or "mysql"; settings are read from the
            vyper config under the "datastore.*" keys.
        **kwargs: Accepted for call-site compatibility; currently unused.

    Returns:
        A new Session bound to the created engine.

    Raises:
        Exception: when required settings are missing or dbtype is unknown.
    """
    if dbtype == "sqlite":
        db_location = v.get("datastore.db_location")
        if db_location == -1:
            raise Exception("DB Type is sqlite but db_location is not defined")
        # check_same_thread=False lets the session be used across threads.
        engine = create_engine(
            f"sqlite:///{db_location}", connect_args={"check_same_thread": False}
        )
    elif dbtype == "mysql":
        db_user = v.get_string("datastore.user")
        db_host = v.get_string("datastore.host")
        db_name = v.get_string("datastore.name")
        db_pass = v.get_string("datastore.pass")
        db_port = v.get_string("datastore.port")
        required = ("datastore.user", "datastore.name", "datastore.pass", "datastore.host")
        if not all(v.is_set(key) for key in required):
            raise Exception(
                "DB Type is mysql but db_(host,name,and pass) are not populated"
            )
        engine = create_engine(
            f"mysql+pymysql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
        )
    else:
        raise Exception("Unknown/unimplemented database type: {}".format(dbtype))
    # Create any missing tables before handing the session back.
    Base.metadata.create_all(engine)
    return sessionmaker(bind=engine)()

View File

@@ -1,28 +1,35 @@
from lib.db import Base
from sqlalchemy import Column, Integer, String, DateTime
class Key(Base):
    """API key record (legacy model: String columns carry no length, which
    MySQL rejects — superseded by the length-annotated version below)."""

    __tablename__ = "keys"
    id = Column(Integer, primary_key=True)
    key = Column(String, unique=True)  # the API key value
    name = Column(String)  # human-readable label
    expires = Column(DateTime)  # expiry timestamp
    service = Column(String)  # owning service

    def __repr__(self):
        return "<Key(key='%s', name='%s', expires='%s', service='%s')>" % (
            self.key, self.name, self.expires, self.service)
class Domain(Base):
    """Registered domain record (legacy model: String columns carry no
    length — superseded by the length-annotated version below)."""

    __tablename__ = "domains"
    id = Column(Integer, primary_key=True)
    domain = Column(String, unique=True)  # zone name
    hostname = Column(String)  # DirectAdmin master server
    username = Column(String)  # DA account owning the domain

    def __repr__(self):
        return "<Domain(id='%s', domain='%s', hostname='%s', username='%s')>" % (
            self.id, self.domain, self.hostname, self.username
        )
from directdnsonly.app.db import Base
from sqlalchemy import Column, Integer, String, DateTime
class Key(Base):
    """API key record; String lengths are specified for MySQL compatibility."""

    __tablename__ = "keys"
    id = Column(Integer, primary_key=True)
    key = Column(String(255), unique=True)  # the API key value
    name = Column(String(255))  # human-readable label
    expires = Column(DateTime)  # expiry timestamp
    service = Column(String(255))  # owning service

    def __repr__(self):
        return "<Key(key='%s', name='%s', expires='%s', service='%s')>" % (
            self.key,
            self.name,
            self.expires,
            self.service,
        )
class Domain(Base):
    """Registered domain record; String lengths specified for MySQL."""

    __tablename__ = "domains"
    id = Column(Integer, primary_key=True)
    domain = Column(String(255), unique=True)  # zone name
    hostname = Column(String(255))  # DirectAdmin master server
    username = Column(String(255))  # DA account owning the domain

    def __repr__(self):
        return "<Domain(id='%s', domain='%s', hostname='%s', username='%s')>" % (
            self.id,
            self.domain,
            self.hostname,
            self.username,
        )

445
directdnsonly/app/reconciler.py Executable file
View File

@@ -0,0 +1,445 @@
#!/usr/bin/env python3
import threading
from urllib.parse import parse_qs
from loguru import logger
import requests
import requests.exceptions
from directdnsonly.app.db import connect
from directdnsonly.app.db.models import Domain
class ReconciliationWorker:
    """Periodically polls configured DirectAdmin servers and queues deletes
    for any zones in our DB that no longer exist in DirectAdmin.
    Safety rules:
    - If a DA server is unreachable, skip it entirely — never delete on uncertainty
    - Only touches domains registered via DaDNS (present in our `domains` table)
    - Domains in CoreDNS but NOT in our DB are not our zones; left untouched
    - Pushes to the existing delete_queue so the full delete path is exercised
    """

    def __init__(self, delete_queue, reconciliation_config: dict):
        """Capture settings; the thread is not started until start() runs.

        Args:
            delete_queue: Queue consumed by the normal delete pipeline.
            reconciliation_config: Dict with enabled, interval_minutes,
                directadmin_servers, verify_ssl, ipp, and dry_run keys.
        """
        self.delete_queue = delete_queue
        self.enabled = reconciliation_config.get("enabled", False)
        # Config is expressed in minutes; the wait loop uses seconds.
        self.interval_seconds = reconciliation_config.get("interval_minutes", 60) * 60
        self.servers = reconciliation_config.get("directadmin_servers") or []
        self.verify_ssl = reconciliation_config.get("verify_ssl", True)
        # DA paging size (items per page) for CMD_DNS_ADMIN.
        self.ipp = int(reconciliation_config.get("ipp", 1000))
        self.dry_run = bool(reconciliation_config.get("dry_run", False))
        self._stop_event = threading.Event()
        self._thread = None
def start(self):
    """Launch the background reconciliation thread if enabled and configured."""
    if not self.enabled:
        logger.info("Reconciliation poller disabled — skipping")
        return
    if not self.servers:
        logger.warning(
            "Reconciliation enabled but no directadmin_servers configured"
        )
        return
    self._stop_event.clear()
    worker = threading.Thread(
        target=self._run, daemon=True, name="reconciliation_worker"
    )
    self._thread = worker
    worker.start()
    names = [server.get("hostname", "?") for server in self.servers]
    mode = "DRY-RUN" if self.dry_run else "LIVE"
    logger.info(
        f"Reconciliation poller started [{mode}] — "
        f"interval: {self.interval_seconds // 60}m, "
        f"servers: {names}"
    )
    if self.dry_run:
        logger.warning(
            "[reconciler] DRY-RUN mode active — orphans will be logged but NOT queued for deletion"
        )
def stop(self):
    """Signal the worker to exit and wait briefly for the thread to finish."""
    self._stop_event.set()
    if self._thread is not None:
        # Bounded join: do not hang shutdown on a stuck pass.
        self._thread.join(timeout=10)
    logger.info("Reconciliation poller stopped")
@property
def is_alive(self):
    """True while the background reconciliation thread is running."""
    return self._thread is not None and self._thread.is_alive()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _run(self):
    """Worker loop: one pass immediately, then one per interval until stopped."""
    logger.info("Reconciliation worker starting — running initial check now")
    self._reconcile_all()
    while True:
        # Event.wait returns True once stop() sets the event, ending the loop.
        if self._stop_event.wait(timeout=self.interval_seconds):
            break
        self._reconcile_all()
def _reconcile_all(self):
    """Run one reconciliation pass over every configured DA server.

    Builds a merged domain->master map from all servers that answered,
    then walks the local DB to backfill/repair each domain's recorded
    master and queue deletes for domains no longer present in DA.

    Safety fix: orphan deletion is now gated on ``reachable_servers``
    (servers that actually returned a domain list this pass). The
    previous check used the set of all *configured* servers, so zones
    whose recorded master was merely unreachable looked orphaned and
    were queued for deletion — violating the class's "never delete on
    uncertainty" rule.
    """
    logger.info(
        f"[reconciler] Starting reconciliation pass across "
        f"{len(self.servers)} server(s)"
    )
    total_queued = 0
    # Build a map of all domains seen on all DA servers
    all_da_domains = {}  # domain -> hostname
    # Servers that answered successfully this pass; only their domains
    # may be treated as authoritative absences.
    reachable_servers = set()
    for server in self.servers:
        hostname = server.get("hostname")
        if not hostname:
            logger.warning("[reconciler] Server config missing hostname — skipping")
            continue
        try:
            da_domains = self._fetch_da_domains(
                hostname,
                server.get("port", 2222),
                server.get("username"),
                server.get("password"),
                server.get("ssl", True),
                ipp=self.ipp,
            )
            # None signals a fetch failure — contribute nothing for safety.
            if da_domains is not None:
                reachable_servers.add(hostname)
                for d in da_domains:
                    all_da_domains[d] = hostname
            logger.debug(
                f"[reconciler] {hostname}: {len(da_domains) if da_domains else 0} active domain(s) in DA"
            )
        except Exception as e:
            logger.error(f"[reconciler] Unexpected error polling {hostname}: {e}")
    # Now check local DB for all domains, update master if needed, and
    # queue deletes only for domains whose recorded master answered.
    session = connect()
    try:
        all_local_domains = session.query(Domain).all()
        migrated = 0
        backfilled = 0
        for record in all_local_domains:
            domain = record.domain
            recorded_master = record.hostname
            actual_master = all_da_domains.get(domain)
            if actual_master:
                if not recorded_master:
                    logger.info(
                        f"[reconciler] Domain '{domain}' hostname backfilled: '{actual_master}'"
                    )
                    record.hostname = actual_master
                    backfilled += 1
                elif actual_master != recorded_master:
                    logger.warning(
                        f"[reconciler] Domain '{domain}' migrated: "
                        f"'{recorded_master}' -> '{actual_master}'. Updating local DB."
                    )
                    record.hostname = actual_master
                    migrated += 1
            else:
                # Only act if the recorded master answered this pass — an
                # unreachable master means we cannot know the zone is gone.
                if recorded_master in reachable_servers:
                    if self.dry_run:
                        logger.warning(
                            f"[reconciler] [DRY-RUN] Would delete orphan: {record.domain} "
                            f"(master: {recorded_master})"
                        )
                    else:
                        self.delete_queue.put(
                            {
                                "domain": record.domain,
                                "hostname": record.hostname,
                                "username": record.username or "",
                                "source": "reconciler",
                            }
                        )
                        logger.debug(
                            f"[reconciler] Queued delete for orphan: {record.domain} "
                            f"(master: {recorded_master})"
                        )
                    total_queued += 1
        if migrated or backfilled:
            session.commit()
            if backfilled:
                logger.info(
                    f"[reconciler] {backfilled} domain(s) had missing hostname backfilled."
                )
            if migrated:
                logger.info(
                    f"[reconciler] {migrated} domain(s) migrated to new master."
                )
    finally:
        session.close()
    if self.dry_run:
        logger.info(
            f"[reconciler] Reconciliation pass complete [DRY-RUN] — "
            f"{total_queued} orphan(s) identified (none deleted)"
        )
    else:
        logger.info(
            f"[reconciler] Reconciliation pass complete — "
            f"{total_queued} domain(s) queued for deletion"
        )
def _fetch_da_domains(
    self,
    hostname: str,
    port: int,
    username: str,
    password: str,
    use_ssl: bool,
    ipp: int = 1000,
):
    """Fetch all domains from a DA server via CMD_DNS_ADMIN (JSON, paging supported).

    Authentication: HTTP Basic first; if the server answers with a redirect
    (DA Evo rejects Basic Auth this way) a session-cookie login is attempted
    once via _da_session_login() and the same page is retried with cookies.

    Args:
        hostname: DA server host.
        port: DA server port.
        username: admin-level DA username.
        password: password for `username`.
        use_ssl: request over https when True, http otherwise.
        ipp: items per page for DA's paging (default 1000).

    Returns:
        A set of lower-cased domain strings on success, or None on any failure.
    """
    scheme = "https" if use_ssl else "http"
    page = 1
    all_domains = set()
    total_pages = 1  # refined from each JSON page's info.total_pages
    cookies = None  # becomes a cookie jar after a DA Evo session login
    try:
        while page <= total_pages:
            url = f"{scheme}://{hostname}:{port}/CMD_DNS_ADMIN?json=yes&page={page}&ipp={ipp}"
            req_kwargs = dict(
                timeout=30,
                verify=self.verify_ssl,
                allow_redirects=False,  # a redirect means Basic Auth was rejected (DA Evo)
            )
            if cookies:
                req_kwargs["cookies"] = cookies
            else:
                req_kwargs["auth"] = (username, password)
            response = requests.get(url, **req_kwargs)
            if response.is_redirect or response.status_code in (
                301,
                302,
                303,
                307,
                308,
            ):
                if not cookies:
                    logger.debug(
                        f"[reconciler] {hostname}:{port} redirected Basic Auth "
                        f"(HTTP {response.status_code}) — attempting session login (DA Evo)"
                    )
                    cookies = self._da_session_login(
                        scheme, hostname, port, username, password
                    )
                    if cookies is None:
                        return None
                    continue  # retry this page with cookies
                else:
                    logger.error(
                        f"[reconciler] {hostname}:{port} still redirecting after session login — "
                        f"check that '{username}' has admin-level access. Skipping."
                    )
                    return None
            response.raise_for_status()
            content_type = response.headers.get("Content-Type", "")
            if "text/html" in content_type:
                # DA serves a login page (HTML) when credentials/ACLs are wrong
                logger.error(
                    f"[reconciler] {hostname}:{port} returned HTML instead of API response — "
                    f"check credentials and admin-level access. Skipping."
                )
                return None
            # Try JSON first
            try:
                data = response.json()
                # Domains are in keys '0', '1', ...
                for k, v in data.items():
                    if k.isdigit() and isinstance(v, dict) and "domain" in v:
                        all_domains.add(v["domain"].strip().lower())
                # Paging info
                info = data.get("info", {})
                total_pages = int(info.get("total_pages", 1))
                page += 1
                continue
            except Exception as e:
                logger.error(
                    f"[reconciler] JSON decode failed for {hostname}:{port} page {page}: {e}\nRaw response: {response.text[:500]}"
                )
            # Fallback to legacy parser
            domains = self._parse_da_domain_list(response.text)
            all_domains.update(domains)
            break  # No paging in legacy mode
        return all_domains
    except requests.exceptions.SSLError as e:
        # Fixed log format: host:port and the exception were previously
        # concatenated with no separator ("...:2222SSLError(...)")
        logger.error(
            f"[reconciler] SSL error connecting to {hostname}:{port}: {e}. "
            f"Set verify_ssl: false in reconciliation config if using self-signed certs."
        )
        return None
    except requests.exceptions.ConnectionError as e:
        logger.error(
            f"[reconciler] Cannot reach {hostname}:{port}: {e}. "
            f"Skipping this server."
        )
        return None
    except requests.exceptions.Timeout:
        logger.error(
            f"[reconciler] Timeout connecting to {hostname}:{port}. "
            f"Skipping this server."
        )
        return None
    except requests.exceptions.HTTPError as e:
        # `response` is always bound here: HTTPError only comes from raise_for_status()
        logger.error(
            f"[reconciler] HTTP {response.status_code} from {hostname}:{port}: {e}. "
            f"Skipping this server."
        )
        return None
    except Exception as e:
        logger.error(f"[reconciler] Unexpected error fetching from {hostname}: {e}")
        return None
def _da_session_login(
    self, scheme: str, hostname: str, port: int, username: str, password: str
):
    """POST to CMD_LOGIN to obtain a DA Evo session cookie.

    Used as a fallback when Basic Auth is redirected by DA Evo.
    Returns a RequestsCookieJar on success, or None on failure.
    """
    endpoint = f"{scheme}://{hostname}:{port}/CMD_LOGIN"
    payload = {
        "username": username,
        "password": password,
        "referer": "/CMD_DNS_ADMIN?json=yes&page=1&ipp=500",
    }
    try:
        resp = requests.post(
            endpoint,
            data=payload,
            timeout=30,
            verify=self.verify_ssl,
            allow_redirects=False,
        )
    except Exception as e:
        logger.error(f"[reconciler] {hostname}:{port} session login failed: {e}")
        return None
    # A successful Evo login always sets at least one session cookie
    if resp.cookies:
        logger.debug(
            f"[reconciler] {hostname}:{port} session login successful (DA Evo)"
        )
        return resp.cookies
    logger.error(
        f"[reconciler] {hostname}:{port} CMD_LOGIN returned no session cookie — "
        f"check username/password."
    )
    return None
@staticmethod
def _parse_da_domain_list(body: str) -> set:
"""Parse DA's CMD_API_SHOW_ALL_DOMAINS response.
DA returns URL-encoded key=value pairs, either on one line or newline-
separated. The domain list uses the key 'list[]'.
Example response:
list[]=example.com&list[]=example2.com
"""
# Normalise newline-separated responses to a single query string
normalised = body.replace("\n", "&").strip("&")
params = parse_qs(normalised)
domains = params.get("list[]", [])
return {d.strip().lower() for d in domains if d.strip()}
if __name__ == "__main__":
import argparse
import sys
from queue import Queue
parser = argparse.ArgumentParser(
description="Test DirectAdmin domain fetcher (JSON/paging)"
)
parser.add_argument("--hostname", required=True, help="DirectAdmin server hostname")
parser.add_argument(
"--port", type=int, default=2222, help="DirectAdmin port (default: 2222)"
)
parser.add_argument("--username", required=True, help="DirectAdmin admin username")
parser.add_argument("--password", required=True, help="DirectAdmin admin password")
parser.add_argument("--ssl", action="store_true", help="Use HTTPS (default: True)")
parser.add_argument(
"--no-ssl", dest="ssl", action="store_false", help="Use HTTP (not recommended)"
)
parser.set_defaults(ssl=True)
parser.add_argument(
"--verify-ssl", action="store_true", help="Verify SSL certs (default: True)"
)
parser.add_argument(
"--no-verify-ssl",
dest="verify_ssl",
action="store_false",
help="Don't verify SSL certs",
)
parser.set_defaults(verify_ssl=True)
parser.add_argument(
"--ipp", type=int, default=1000, help="Items per page (default: 1000)"
)
parser.add_argument(
"--print-json",
action="store_true",
help="Print raw JSON response for first page",
)
args = parser.parse_args()
# Minimal config for testing
config = {
"enabled": True,
"directadmin_servers": [
{
"hostname": args.hostname,
"port": args.port,
"username": args.username,
"password": args.password,
"ssl": args.ssl,
}
],
"verify_ssl": args.verify_ssl,
}
q = Queue()
worker = ReconciliationWorker(q, config)
server = config["directadmin_servers"][0]
print(
f"Fetching domains from {server['hostname']}:{server['port']} (ipp={args.ipp})..."
)
# Directly call the fetch method for testing
domains = worker._fetch_da_domains(
server["hostname"],
server.get("port", 2222),
server.get("username"),
server.get("password"),
server.get("ssl", True),
ipp=args.ipp,
)
if domains is None:
print("Failed to fetch domains.", file=sys.stderr)
sys.exit(1)
print(f"Fetched {len(domains)} domains:")
for d in sorted(domains):
print(d)
if args.print_json:
# Print the first page's raw JSON for inspection
scheme = "https" if server.get("ssl", True) else "http"
url = f"{scheme}://{server['hostname']}:{server.get('port', 2222)}/CMD_DNS_ADMIN?json=yes&page=1&ipp={args.ipp}"
resp = requests.get(
url,
auth=(server.get("username"), server.get("password")),
timeout=30,
verify=args.verify_ssl,
allow_redirects=False,
)
try:
print("\nRaw JSON for first page:")
print(resp.json())
except Exception:
print("(Could not parse JSON)")

View File

@@ -0,0 +1,50 @@
from loguru import logger
from directdnsonly.app.db.models import *
from directdnsonly.app.db import connect
def check_zone_exists(zone_name):
    """Return True if zone_name is already present in the local Domain index.

    Args:
        zone_name: exact domain string to look up (no normalization applied).

    Returns:
        bool: True when a matching Domain row exists.
    """
    # Check if zone is present in the index
    session = connect()
    logger.debug("Checking if {} is present in the DB".format(zone_name))
    # EXISTS-style lookup: select only the id column, first row or None
    domain_exists = bool(session.query(Domain.id).filter_by(domain=zone_name).first())
    logger.debug("Returned from query: {}".format(domain_exists))
    # Simplified: the value is already a bool — no if/else needed
    return domain_exists
def put_zone_index(zone_name, host_name, user_name):
    """Insert a new zone into the local Domain index and commit immediately."""
    session = connect()
    logger.debug("Placed zone into database.. {}".format(str(zone_name)))
    new_record = Domain(domain=zone_name, hostname=host_name, username=user_name)
    session.add(new_record)
    session.commit()
def get_domain_record(zone_name):
    """Return the Domain record for zone_name, or None if not found"""
    db = connect()
    query = db.query(Domain).filter_by(domain=zone_name)
    return query.first()
def check_parent_domain_owner(zone_name):
    """Return True if the immediate parent domain of zone_name exists in the DB"""
    # Drop the leftmost label: "sub.example.com" -> "example.com"
    labels = zone_name.split(".")
    parent_domain = ".".join(labels[1:])
    if parent_domain == "":
        # zone_name had a single label — no parent to look up
        return False
    session = connect()
    logger.debug("Checking if parent domain {} exists in DB".format(parent_domain))
    match = session.query(Domain.id).filter_by(domain=parent_domain).first()
    return match is not None
def get_parent_domain_record(zone_name):
    """Return the Domain record for the parent of zone_name, or None"""
    # Drop the leftmost label to obtain the immediate parent
    parent_domain = ".".join(zone_name.split(".")[1:])
    if parent_domain == "":
        return None
    db = connect()
    return db.query(Domain).filter_by(domain=parent_domain).first()

View File

@@ -0,0 +1,67 @@
from dns import zone, name
from dns.rdataclass import IN
from dns.exception import DNSException
from loguru import logger
def validate_and_normalize_zone(zone_data: str, domain_name: str) -> str:
    """
    Normalize zone file content and ensure proper origin handling.

    Prepends $ORIGIN and $TTL directives when absent, then parses the
    result with dnspython to validate it.

    Returns normalized zone data.
    Raises ValueError on validation failure (the underlying DNSException is
    logged and re-wrapped — callers should catch ValueError, not DNSException).
    """
    # Ensure domain ends with dot
    if not domain_name.endswith("."):
        domain_name = f"{domain_name}."
    # Add $ORIGIN if missing
    # NOTE(review): plain substring check — any occurrence of "$ORIGIN"
    # (even in a comment) suppresses insertion; fine for DA-generated zones
    if "$ORIGIN" not in zone_data:
        zone_data = f"$ORIGIN {domain_name}\n{zone_data}"
    # Add $TTL if missing
    if "$TTL" not in zone_data:
        zone_data = f"$TTL 300\n{zone_data}"
    # Validate the zone
    try:
        # check_origin=False: partial zones without an SOA/NS at origin still parse
        zone.from_text(
            zone_data, origin=name.from_text(domain_name), check_origin=False
        )
        return zone_data
    except DNSException as e:
        logger.error(f"Zone validation failed: {e}")
        raise ValueError(f"Invalid zone data: {str(e)}")
def count_zone_records(zone_data: str, domain_name: str) -> int:
    """Count the number of individual DNS records in a parsed BIND zone file.

    Counts every individual resource record (each A, AAAA, MX, TXT, etc.)
    the same way the CoreDNS MySQL backend stores them — one row per record.

    Args:
        zone_data: The raw or normalized BIND zone file content
        domain_name: The domain name for the zone

    Returns:
        The total number of individual records in the zone, or -1 when the
        zone cannot be parsed.
    """
    if not domain_name.endswith("."):
        domain_name = f"{domain_name}."
    try:
        parsed = zone.from_text(
            zone_data, origin=name.from_text(domain_name), check_origin=False
        )
        # Only IN-class records are counted (matches backend storage)
        count = sum(
            1 for _, _, rdata in parsed.iterate_rdatas() if rdata.rdclass == IN
        )
        logger.debug(f"Source zone {domain_name} contains {count} records")
        return count
    except DNSException as e:
        logger.error(f"Failed to count records for {domain_name}: {e}")
        return -1

View File

@@ -0,0 +1,74 @@
from vyper import v, Vyper
from loguru import logger
# from vyper.config import Config
import os
from pathlib import Path
from typing import Any, Dict
def load_config() -> Vyper:
    """Build and return the application's Vyper configuration.

    Search order for app.yaml/app.yml: the directory of this module
    (bundled default), then ./, then ./config. Environment variables
    prefixed DADNS_ override file values (underscores map to dots, e.g.
    DADNS_APP_LISTEN_PORT -> app.listen_port). Falls back to the defaults
    below when no config file is found.
    """
    # Initialize Vyper
    v.set_config_name("app")  # Looks for app.yaml/app.yml
    # Bundled config colocated with this module (always present in the package)
    v.add_config_path(str(Path(__file__).parent))
    v.add_config_path(".")  # Search in current directory
    v.add_config_path("./config")
    v.set_env_prefix("DADNS")
    v.set_env_key_replacer("_", ".")
    v.automatic_env()
    # Set defaults for all required parameters
    v.set_default("log_level", "info")
    v.set_default("queue_location", "./data/queues")
    # Fixed: timezone default was declared twice, the first with a
    # "Pacific/Aucland" typo — single correct declaration now
    v.set_default("timezone", "Pacific/Auckland")
    # Set defaults for app
    v.set_default("app.listen_port", 2222)  # deduplicated (was declared twice)
    v.set_default("app.proxy_support", True)
    v.set_default("app.proxy_support_base", "http://127.0.0.1")
    v.set_default("app.log_level", "debug")
    v.set_default("app.log_to", "file")
    v.set_default("app.ssl_enable", "false")
    v.set_default("app.token_valid_for_days", 30)
    v.set_default("app.queue_location", "conf/queues")
    v.set_default("app.auth_username", "directdnsonly")
    v.set_default("app.auth_password", "changeme")
    # DNS backend defaults
    v.set_default("dns.backends.bind.enabled", False)
    v.set_default("dns.backends.bind.zones_dir", "/etc/named/zones")
    v.set_default("dns.backends.bind.named_conf", "/etc/named.conf.local")
    v.set_default("dns.backends.coredns_mysql.enabled", False)
    v.set_default("dns.backends.coredns_mysql.host", "localhost")
    v.set_default("dns.backends.coredns_mysql.port", 3306)
    v.set_default("dns.backends.coredns_mysql.database", "coredns")
    v.set_default("dns.backends.coredns_mysql.username", "coredns")
    v.set_default("dns.backends.coredns_mysql.password", "")
    v.set_default("dns.backends.coredns_mysql.table_name", "records")
    # Set Defaults Datastore
    v.set_default("datastore.type", "sqlite")
    v.set_default("datastore.port", 3306)
    v.set_default("datastore.db_location", "data/directdns.db")
    # Reconciliation poller defaults
    v.set_default("reconciliation.enabled", False)
    v.set_default("reconciliation.dry_run", False)
    v.set_default("reconciliation.interval_minutes", 60)
    v.set_default("reconciliation.verify_ssl", True)
    # Read configuration — a missing config file is non-fatal (defaults apply)
    try:
        if not v.read_in_config():
            logger.warning("No config file found, using defaults")
    except Exception:
        logger.warning("No config file found, using defaults")
    return v
# Global config instance
config = load_config()

View File

@@ -0,0 +1,56 @@
---
timezone: Pacific/Auckland
log_level: INFO
queue_location: ./data/queues
app:
auth_username: directdnsonly
auth_password: changeme # Override via DADNS_APP_AUTH_PASSWORD env var
# Reconciliation poller — queries each DA server and removes orphaned zones
# Disabled by default. Only touches zones registered via DaDNS (in our DB).
# If a DA server is unreachable, that server is skipped entirely.
#reconciliation:
# enabled: true
# dry_run: true # log orphans but do NOT queue deletes — safe first-run mode
# interval_minutes: 60
# verify_ssl: true # set false for self-signed DA certs
# ipp: 1000 # items per page when polling DA (default 1000)
# directadmin_servers:
# - hostname: da1.example.com
# port: 2222
# username: admin
# password: secret
# ssl: true
# - hostname: da2.example.com
# port: 2222
# username: admin
# password: secret
# ssl: true
dns:
default_backend: bind
backends:
bind:
type: bind
enabled: true
zones_dir: ./data/zones
named_conf: ./data/named.conf.include
coredns_dc1:
type: coredns_mysql
enabled: true
host: "mysql-dc1"
port: 3306
database: "coredns"
username: "coredns"
password: "coredns123"
table_name: "records"
coredns_dc2:
type: coredns_mysql
enabled: true
host: "mysql-dc2"
port: 3306
database: "coredns"
username: "coredns"
password: "coredns123"
table_name: "records"

119
directdnsonly/main.py Normal file
View File

@@ -0,0 +1,119 @@
from loguru import logger
import cherrypy
from app.backends import BackendRegistry
from app.api.admin import DNSAdminAPI
from app.api.health import HealthAPI
from app import configure_logging
from worker import WorkerManager
from directdnsonly.config import config
from directdnsonly.app.db import connect
import importlib.metadata
app_version = importlib.metadata.version("directdnsonly")
class Root:
    """Minimal placeholder object; not served at runtime — main() mounts a
    DNSAdminAPI instance as the actual CherryPy root handler."""

    pass
def main():
    """Application entry point: wire logging, DB, workers, and CherryPy.

    Startup order matters: logging first, then backend discovery, then the
    DB connection (fatal if unavailable), then the queue workers, and
    finally the HTTP server. On any startup failure the workers are stopped
    before the exception propagates.
    """
    try:
        # Initialize logging
        configure_logging()
        logger.info("Starting DaDNS server initialization")
        # Initialize backend registry
        registry = BackendRegistry()
        available_backends = registry.get_available_backends()
        logger.info(f"Available backend instances: {list(available_backends.keys())}")
        # NOTE(review): module-global session — confirm whether anything
        # else imports it before refactoring the global away.
        global session
        try:
            session = connect(config.get("datastore.type"))
        except Exception as e:
            logger.error(str(e))
            print("ERROR: " + str(e))
            # Explicit SystemExit instead of the site-provided exit() helper,
            # which may be absent in embedded/frozen interpreters
            raise SystemExit(1)
        logger.info("Database Connected!")
        # Setup worker manager
        reconciliation_config = config.get("reconciliation") or {}
        worker_manager = WorkerManager(
            queue_path=config.get("queue_location"),
            backend_registry=registry,
            reconciliation_config=reconciliation_config,
        )
        worker_manager.start()
        logger.info(
            f"Worker manager started with queue path: {config.get('queue_location')}"
        )
        # Configure CherryPy — basic-auth credentials come from config
        user_password_dict = {
            config.get_string("app.auth_username"): config.get_string(
                "app.auth_password"
            )
        }
        check_password = cherrypy.lib.auth_basic.checkpassword_dict(user_password_dict)
        cherrypy.config.update(
            {
                "server.socket_host": "0.0.0.0",
                "server.socket_port": config.get_int("app.listen_port"),
                "tools.proxy.on": config.get_bool("app.proxy_support"),
                "tools.proxy.base": config.get_string("app.proxy_support_base"),
                "tools.auth_basic.on": True,
                "tools.auth_basic.realm": "dadns",
                "tools.auth_basic.checkpassword": check_password,
                "tools.response_headers.on": True,
                "tools.response_headers.headers": [
                    ("Server", "DirectDNS v" + app_version)
                ],
                "environment": config.get("environment"),
            }
        )
        if config.get_bool("app.ssl_enable"):
            cherrypy.config.update(
                {
                    "server.ssl_module": "builtin",
                    "server.ssl_certificate": config.get("app.ssl_cert"),
                    "server.ssl_private_key": config.get("app.ssl_key"),
                    "server.ssl_certificate_chain": config.get("ssl_bundle"),
                }
            )
        # cherrypy.log.error_log.propagate = False
        if config.get_string("app.log_level").upper() != "DEBUG":
            cherrypy.log.access_log.propagate = False
        # Mount applications — DNSAdminAPI is the root handler.
        # (Removed a dead `root = Root()` assignment that was immediately
        # overwritten by the DNSAdminAPI instance.)
        root = DNSAdminAPI(
            save_queue=worker_manager.save_queue,
            delete_queue=worker_manager.delete_queue,
            backend_registry=registry,
        )
        root.health = HealthAPI(registry)
        # Add queue status endpoint
        root.queue_status = lambda: worker_manager.queue_status()
        cherrypy.tree.mount(root, "/")
        cherrypy.engine.start()
        logger.success(f"Server started on port {config.get_int('app.listen_port')}")
        # Add shutdown handler so workers drain before the process exits
        cherrypy.engine.subscribe("stop", worker_manager.stop)
        cherrypy.engine.block()
    except Exception as e:
        logger.critical(f"Server startup failed: {e}")
        if "worker_manager" in locals():
            worker_manager.stop()
        raise


if __name__ == "__main__":
    main()

455
directdnsonly/worker.py Normal file
View File

@@ -0,0 +1,455 @@
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from loguru import logger
from persistqueue import Queue
from persistqueue.exceptions import Empty
from app.utils import check_zone_exists, put_zone_index
from app.utils.zone_parser import count_zone_records
from directdnsonly.app.db.models import Domain
from directdnsonly.app.db import connect
from directdnsonly.app.reconciler import ReconciliationWorker
class WorkerManager:
    """Owns the persistent save/delete queues and the background threads
    that apply zone changes to every active DNS backend.

    Threads started by start():
      * save_queue_worker   — applies zone writes (_process_save_queue)
      * delete_queue_worker — applies zone deletions (_process_delete_queue)
      * a ReconciliationWorker that feeds the delete queue with orphans
    """

    def __init__(
        self, queue_path: str, backend_registry, reconciliation_config: dict = None
    ):
        """Create the on-disk persist-queues under queue_path.

        Raises if the directory cannot be created or either queue fails to
        initialize — the service cannot run without its queues.
        """
        self.queue_path = queue_path
        self.backend_registry = backend_registry
        self._running = False
        self._save_thread = None
        self._delete_thread = None
        self._reconciler = None
        self._reconciliation_config = reconciliation_config or {}
        # Initialize queues with error handling
        try:
            os.makedirs(queue_path, exist_ok=True)
            self.save_queue = Queue(f"{queue_path}/save")
            self.delete_queue = Queue(f"{queue_path}/delete")
            logger.success(f"Initialized queues at {queue_path}")
        except Exception as e:
            logger.critical(f"Failed to initialize queues: {e}")
            raise

    def _process_save_queue(self):
        """Main worker loop for processing save requests.

        Blocks on the save queue with a 5s timeout; a timeout (Empty) marks
        the end of a batch and triggers the batch summary log.
        """
        logger.info("Save queue worker started")
        # Get DB Connection
        session = connect()
        # Batch tracking
        batch_start = None
        batch_processed = 0
        batch_failed = 0
        while self._running:
            try:
                item = self.save_queue.get(block=True, timeout=5)
                # Start a new batch timer on the first item
                if batch_start is None:
                    batch_start = time.monotonic()
                    batch_processed = 0
                    batch_failed = 0
                    pending = self.save_queue.qsize()
                    logger.info(
                        f"📥 Batch started — {pending + 1} zone(s) queued "
                        f"for processing"
                    )
                logger.debug(
                    f"Processing zone update for {item.get('domain', 'unknown')}"
                )
                # NOTE(review): the zone is indexed before the structure check
                # below — a malformed item (e.g. missing zone_file) still gets
                # a Domain index row; consider validating first.
                if not check_zone_exists(item.get("domain")):
                    put_zone_index(
                        item.get("domain"), item.get("hostname"), item.get("username")
                    )
                # Validate item structure
                if not all(k in item for k in ["domain", "zone_file"]):
                    logger.error(f"Invalid queue item: {item}")
                    self.save_queue.task_done()
                    batch_failed += 1
                    continue
                # Process with all available backends
                backends = self.backend_registry.get_available_backends()
                if not backends:
                    logger.warning("No active backends available!")
                if len(backends) > 1:
                    # Process backends in parallel for faster sync
                    logger.debug(
                        f"Processing {item['domain']} across "
                        f"{len(backends)} backends concurrently: "
                        f"{', '.join(backends.keys())}"
                    )
                    self._process_backends_parallel(backends, item, session)
                else:
                    # Single backend, no need for thread overhead
                    for backend_name, backend in backends.items():
                        self._process_single_backend(
                            backend_name, backend, item, session
                        )
                self.save_queue.task_done()
                batch_processed += 1
                logger.debug(f"Completed processing for {item['domain']}")
            except Empty:
                # Queue is empty — if we were in a batch, log the summary
                if batch_start is not None:
                    elapsed = time.monotonic() - batch_start
                    total = batch_processed + batch_failed
                    rate = batch_processed / elapsed if elapsed > 0 else 0
                    logger.success(
                        f"📦 Batch complete — {batch_processed}/{total} zone(s) "
                        f"processed successfully in {elapsed:.1f}s "
                        f"({rate:.1f} zones/sec)"
                        + (f", {batch_failed} failed" if batch_failed else "")
                    )
                    batch_start = None
                    batch_processed = 0
                    batch_failed = 0
                continue
            except Exception as e:
                logger.error(f"Unexpected worker error: {e}")
                batch_failed += 1
                time.sleep(1)  # Prevent tight error loops

    def _process_single_backend(self, backend_name, backend, item, session):
        """Process a zone update for a single backend.

        Writes the zone, reloads the backend (BIND additionally gets its
        named.conf regenerated from the full Domain table), then verifies
        the stored record count against the source zone.
        """
        try:
            logger.debug(f"Using backend: {backend_name}")
            if backend.write_zone(item["domain"], item["zone_file"]):
                logger.debug(f"Successfully updated {item['domain']} in {backend_name}")
                if backend.get_name() == "bind":
                    # Need to update the named.conf
                    backend.update_named_conf(
                        [d.domain for d in session.query(Domain).all()]
                    )
                    # Reload all zones
                    backend.reload_zone()
                else:
                    backend.reload_zone(zone_name=item["domain"])
                # Verify record count matches the source zone from DirectAdmin
                self._verify_backend_record_count(
                    backend_name, backend, item["domain"], item["zone_file"]
                )
            else:
                logger.error(f"Failed to update {item['domain']} in {backend_name}")
        except Exception as e:
            logger.error(f"Error in {backend_name}: {str(e)}")

    def _process_delete_queue(self):
        """Worker loop for processing zone deletion requests.

        A delete is only honored when the requesting hostname matches the
        hostname the zone was registered from (or none was recorded). The
        DB row is removed only when every backend delete succeeded.
        """
        logger.info("Delete queue worker started")
        session = connect()
        while self._running:
            try:
                item = self.delete_queue.get(block=True, timeout=5)
                domain = item.get("domain")
                hostname = item.get("hostname", "")
                logger.debug(f"Processing delete for {domain}")
                record = session.query(Domain).filter_by(domain=domain).first()
                if not record:
                    logger.warning(f"Domain {domain} not found in DB — skipping delete")
                    self.delete_queue.task_done()
                    continue
                # Ownership check: only the registering server may delete
                if record.hostname and record.hostname != hostname:
                    logger.warning(
                        f"Hostname mismatch for {domain}: registered on "
                        f"{record.hostname}, delete requested from {hostname} — rejected"
                    )
                    self.delete_queue.task_done()
                    continue
                if not record.hostname:
                    # NOTE(review): these two f-strings concatenate without a
                    # separator — the message renders as
                    # "...for <domain>skipping..."; a " — " is missing.
                    logger.warning(
                        f"No origin hostname stored for {domain}"
                        f"skipping ownership check, proceeding with delete"
                    )
                backends = self.backend_registry.get_available_backends()
                # NOTE(review): remaining_domains is passed to the wrapper
                # below but never used there, and this path never rewrites
                # BIND's named.conf (unlike _delete_single_backend) — confirm
                # this is intentional.
                remaining_domains = [d.domain for d in session.query(Domain).all()]
                delete_success = True
                if not backends:
                    logger.warning(
                        f"No active backends — {domain} will be removed from DB only"
                    )
                elif len(backends) > 1:
                    # Parallel delete, track failures
                    results = []

                    def delete_backend_wrapper(
                        backend_name, backend, domain, remaining_domains
                    ):
                        # Returns the backend's success flag; an exception
                        # counts as failure
                        try:
                            return backend.delete_zone(domain)
                        except Exception as e:
                            logger.error(
                                f"Error deleting {domain} from {backend_name}: {e}"
                            )
                            return False

                    # Redundant local import — ThreadPoolExecutor/as_completed
                    # are already imported at module level
                    from concurrent.futures import ThreadPoolExecutor, as_completed

                    with ThreadPoolExecutor(max_workers=len(backends)) as executor:
                        futures = {
                            executor.submit(
                                delete_backend_wrapper,
                                backend_name,
                                backend,
                                domain,
                                remaining_domains,
                            ): backend_name
                            for backend_name, backend in backends.items()
                        }
                        for future in as_completed(futures):
                            backend_name = futures[future]
                            try:
                                result = future.result()
                                results.append(result)
                                if not result:
                                    logger.error(
                                        f"Failed to delete {domain} from {backend_name}"
                                    )
                            except Exception as e:
                                logger.error(
                                    f"Unhandled error deleting from {backend_name}: {e}"
                                )
                                results.append(False)
                    delete_success = all(results)
                else:
                    # Single backend
                    for backend_name, backend in backends.items():
                        try:
                            result = backend.delete_zone(domain)
                            if not result:
                                logger.error(
                                    f"Failed to delete {domain} from {backend_name}"
                                )
                                delete_success = False
                        except Exception as e:
                            logger.error(
                                f"Error deleting {domain} from {backend_name}: {e}"
                            )
                            delete_success = False
                if delete_success:
                    session.delete(record)
                    session.commit()
                    logger.info(f"Removed {domain} from database")
                    self.delete_queue.task_done()
                    logger.success(f"Delete completed for {domain}")
                else:
                    # DB row kept so ownership info survives for a retry
                    logger.error(
                        f"Delete failed for {domain} on one or more backends — DB record retained"
                    )
                    self.delete_queue.task_done()
            except Empty:
                continue
            except Exception as e:
                logger.error(f"Unexpected delete worker error: {e}")
                time.sleep(1)

    def _delete_single_backend(self, backend_name, backend, domain, remaining_domains):
        """Delete a zone from a single backend.

        NOTE(review): not referenced by _process_delete_queue (which inlines
        its own wrapper) — appears unused within this class; confirm there
        are no external callers before removing.
        """
        try:
            if backend.delete_zone(domain):
                logger.debug(f"Deleted {domain} from {backend_name}")
                if backend.get_name() == "bind":
                    backend.update_named_conf(remaining_domains)
                    backend.reload_zone()
                else:
                    backend.reload_zone(zone_name=domain)
            else:
                logger.error(f"Failed to delete {domain} from {backend_name}")
        except Exception as e:
            logger.error(f"Error deleting {domain} from {backend_name}: {e}")

    def _process_backends_delete_parallel(self, backends, domain, remaining_domains):
        """Delete a zone from multiple backends in parallel.

        NOTE(review): also not referenced by _process_delete_queue — appears
        unused within this class; confirm before removing.
        """
        start_time = time.monotonic()
        with ThreadPoolExecutor(
            max_workers=len(backends),
            thread_name_prefix="backend_del",
        ) as executor:
            futures = {
                executor.submit(
                    self._delete_single_backend,
                    backend_name,
                    backend,
                    domain,
                    remaining_domains,
                ): backend_name
                for backend_name, backend in backends.items()
            }
            for future in as_completed(futures):
                backend_name = futures[future]
                try:
                    future.result()
                except Exception as e:
                    logger.error(f"Unhandled error deleting from {backend_name}: {e}")
        elapsed = (time.monotonic() - start_time) * 1000
        logger.debug(
            f"Parallel delete of {domain} across "
            f"{len(backends)} backends completed in {elapsed:.0f}ms"
        )

    def _process_backends_parallel(self, backends, item, session):
        """Process zone updates across multiple backends in parallel.

        Failures are logged per-backend; one backend failing does not stop
        the others (each future's exception is caught individually).
        """
        start_time = time.monotonic()
        with ThreadPoolExecutor(
            max_workers=len(backends), thread_name_prefix="backend"
        ) as executor:
            futures = {
                executor.submit(
                    self._process_single_backend, backend_name, backend, item, session
                ): backend_name
                for backend_name, backend in backends.items()
            }
            for future in as_completed(futures):
                backend_name = futures[future]
                try:
                    future.result()
                except Exception as e:
                    logger.error(
                        f"Unhandled error processing backend "
                        f"{backend_name}: {str(e)}"
                    )
        elapsed = (time.monotonic() - start_time) * 1000
        logger.debug(
            f"Parallel processing of {item['domain']} across "
            f"{len(backends)} backends completed in {elapsed:.0f}ms"
        )

    def _verify_backend_record_count(self, backend_name, backend, zone_name, zone_data):
        """Verify and reconcile the backend record count against the
        authoritative BIND zone from DirectAdmin.

        After a successful write, this method checks whether the number of
        records stored in the backend matches the number of records parsed
        from the source zone file. If there are **extra** records in the
        backend (e.g. from replication drift or stale data) they are
        automatically removed via the backend's reconcile method.

        Args:
            backend_name: Display name of the backend instance
            backend: The backend instance
            zone_name: The zone that was just written
            zone_data: The raw BIND zone file content (authoritative source)
        """
        try:
            expected = count_zone_records(zone_data, zone_name)
            if expected < 0:
                # count_zone_records returns -1 when the zone fails to parse
                logger.warning(
                    f"[{backend_name}] Could not parse source zone for "
                    f"{zone_name} — skipping record count verification"
                )
                return
            matches, actual = backend.verify_zone_record_count(zone_name, expected)
            if matches:
                return  # All good
            if actual > expected:
                logger.warning(
                    f"[{backend_name}] Backend has {actual - expected} extra "
                    f"record(s) for {zone_name} — reconciling against "
                    f"DirectAdmin source zone"
                )
                success, removed = backend.reconcile_zone_records(zone_name, zone_data)
                if success and removed > 0:
                    # Verify again after reconciliation
                    matches, new_count = backend.verify_zone_record_count(
                        zone_name, expected
                    )
                    if matches:
                        logger.success(
                            f"[{backend_name}] Reconciliation successful for "
                            f"{zone_name}: removed {removed} extra record(s), "
                            f"count now matches source ({new_count})"
                        )
                    else:
                        logger.error(
                            f"[{backend_name}] Reconciliation for {zone_name} "
                            f"removed {removed} record(s) but count still "
                            f"mismatched: expected {expected}, got {new_count}"
                        )
            else:
                logger.warning(
                    f"[{backend_name}] Backend has fewer records than source "
                    f"for {zone_name} (expected {expected}, got {actual}) — "
                    f"this may indicate a write failure; the next zone push "
                    f"from DirectAdmin should correct this"
                )
        except NotImplementedError:
            # Backends without count support opt out by raising
            logger.debug(
                f"[{backend_name}] Record count verification not "
                f"supported — skipping"
            )
        except Exception as e:
            logger.error(
                f"[{backend_name}] Error during record count verification "
                f"for {zone_name}: {e}"
            )

    def start(self):
        """Start background workers (idempotent — no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._save_thread = threading.Thread(
            target=self._process_save_queue, daemon=True, name="save_queue_worker"
        )
        self._delete_thread = threading.Thread(
            target=self._process_delete_queue, daemon=True, name="delete_queue_worker"
        )
        self._save_thread.start()
        self._delete_thread.start()
        logger.info(
            f"Started worker threads: {self._save_thread.name}, {self._delete_thread.name}"
        )
        # Reconciler feeds the delete queue with orphans found on DA servers
        self._reconciler = ReconciliationWorker(
            delete_queue=self.delete_queue,
            reconciliation_config=self._reconciliation_config,
        )
        self._reconciler.start()

    def stop(self):
        """Stop background workers gracefully (joins with a 5s timeout each)."""
        self._running = False
        if self._reconciler:
            self._reconciler.stop()
        if self._save_thread:
            self._save_thread.join(timeout=5)
        if self._delete_thread:
            self._delete_thread.join(timeout=5)
        logger.info("Workers stopped")

    def queue_status(self):
        """Return current queue status"""
        return {
            "save_queue_size": self.save_queue.qsize(),
            "delete_queue_size": self.delete_queue.qsize(),
            "save_worker_alive": self._save_thread and self._save_thread.is_alive(),
            "delete_worker_alive": self._delete_thread
            and self._delete_thread.is_alive(),
            # NOTE(review): `is_alive` is not called here — if
            # ReconciliationWorker inherits Thread's is_alive() method this
            # returns a bound method (always truthy); confirm it's a property.
            "reconciler_alive": (
                self._reconciler.is_alive if self._reconciler else False
            ),
        }

View File

@@ -1,48 +1,52 @@
version: '3.7'
services:
app:
image: registry.dockerprod.ultrafast.co.nz/uff/apikeyhandler:0.10
networks:
- traefik-net
volumes:
- /etc/localtime:/etc/localtime:ro # Mount Timezone config to container
- /data/swarm-vols/apikeyhandler:/opt/apikeyhandler/config # Store Config on Persistent drive shared between nodes
deploy:
mode: replicated
replicas: 1
placement:
constraints:
- node.role == worker # Place this service on Worker Nodes alternatively may specify manager if you want service on manager node.
labels:
- "traefik.http.routers.apikeyauth.rule=Host(`apiauth-internal.dockertest.ultrafast.co.nz`)" # This label creates a route Traefik will listen on
- "traefik.http.routers.apikeyauth.tls=true" # Enable TLS, in this example using default TLS cert
- "traefik.http.services.apikeyauth.loadbalancer.server.port=8080" # Set Port to proxy
- "traefik.enable=true" # This flag enables load balancing through Traefik :)
- "traefik.docker.network=traefik-net" # Set the network to connect to container on
- "traefik.http.middlewares.apikeyauth.forwardauth.address=https://apiauth-internal.dockertest.ultrafast.co.nz"
- "traefik.http.middlewares.apikeyauth.forwardauth.trustForwardHeader=true"
- "traefik.http.middlewares.apikeyauth.forwardauth.authResponseHeaders=X-Client-Id"
- "traefik.http.middlewares.apikeyauth.forwardauth.tls.insecureSkipVerify=true"
test_app:
image: containous/whoami
networks:
- traefik-net
volumes:
- /etc/localtime:/etc/localtime:ro # Mount Timezone config to container
deploy:
mode: replicated
replicas: 1
placement:
constraints:
- node.role == worker # Place this service on Worker Nodes alternatively may specify manager if you want service on manager node.
labels:
- "traefik.http.routers.testapp.rule=Host(`testapp.dockertest.ultrafast.co.nz`)" # This label creates a route Traefik will listen on
- "traefik.http.routers.testapp.tls=true" # Enable TLS, in this example using default TLS cert
- "traefik.http.routers.testapp.middlewares=apikeyauth"
- "traefik.http.services.testapp.loadbalancer.server.port=80" # Set Port to proxy
- "traefik.enable=true" # This flag enables load balancing through Traefik :)
- "traefik.docker.network=traefik-net" # Set the network to connect to container on
# Local development stack: MySQL (CoreDNS record store) plus the dadns app.
# NOTE(review): the credentials below are hardcoded dev-only values — do not
# reuse in production; inject real secrets via an env file or docker secrets.
version: '3.8'
networks:
  traefik-net:
    external: true  # pre-existing network; not created/managed by this file
services:
  mysql:
    image: mysql:8.0
    container_name: dadns_mysql
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: coredns
      MYSQL_USER: coredns
      MYSQL_PASSWORD: coredns123
    ports:
      - "3306:3306"
    volumes:
      # Schema runs automatically on FIRST container start only (entrypoint
      # init scripts are skipped once mysql_data is populated).
      - ./schema/coredns_mysql.sql:/docker-entrypoint-initdb.d/init.sql
      - mysql_data:/var/lib/mysql
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
      interval: 5s
      timeout: 5s
      retries: 5
  dadns:
    build:
      dockerfile: Dockerfile.deepseek
      context: .
      no_cache: false
    container_name: dadns_app
    depends_on:
      mysql:
        condition: service_healthy  # gate startup on the mysqladmin ping check
    ports:
      - "2222:2222"
    volumes:
      - ./config:/app/config
      - ./data:/app/data
      - ./logs:/app/logs
    environment:
      - TZ=Pacific/Auckland
      # Double-underscore keys presumably map to nested config sections
      # (dns.backends.bind.*, dns.backends.coredns_mysql.*) — TODO confirm
      # against the app's config loader.
      - DNS_BACKENDS__BIND__ENABLED=true
      - DNS_BACKENDS__BIND__ZONES_DIR=/etc/named/zones/dadns
      - DNS_BACKENDS__BIND__NAMED_CONF=/etc/bind/named.conf.local
      - DNS_BACKENDS__COREDNS_MYSQL__ENABLED=true
      - DNS_BACKENDS__COREDNS_MYSQL__HOST=mysql
      - DNS_BACKENDS__COREDNS_MYSQL__PORT=3306
      - DNS_BACKENDS__COREDNS_MYSQL__DATABASE=coredns
      - DNS_BACKENDS__COREDNS_MYSQL__USERNAME=coredns
      - DNS_BACKENDS__COREDNS_MYSQL__PASSWORD=coredns123
    restart: unless-stopped
volumes:
  mysql_data:

12
docker/entrypoint.sh Executable file
View File

@@ -0,0 +1,12 @@
#!/bin/bash
# Container entrypoint: run BIND alongside the dadns application.
# Fail fast on any unexpected error instead of silently continuing.
set -euo pipefail

# Start BIND in the background. -f keeps named in the foreground of its own
# process (no self-daemonizing), attached as a child of this shell.
/usr/sbin/named -u bind -f &

## Initialize MySQL schema if needed
#if [ -f /app/schema/coredns_mysql.sql ]; then
#    mysql -h mysql -u root -prootpassword coredns < /app/schema/coredns_mysql.sql
#fi

# Start the application. `exec` replaces this shell so the app becomes the
# container's main process and receives SIGTERM from `docker stop` directly
# (previously the shell swallowed signals and the app was killed ungracefully).
# NOTE(review): after exec, named will not get SIGTERM itself — consider an
# init such as tini if BIND needs a graceful shutdown too.
exec poetry run python directdnsonly/main.py

4
docker/named.conf.local Normal file
View File

@@ -0,0 +1,4 @@
// Local zone declarations for the dadns BIND backend.
// Zone files live under /etc/named/zones/dadns (see DNS_BACKENDS__BIND__ZONES_DIR).
zone "guise.nz" {
    type master;
    file "/etc/named/zones/dadns/guise.nz.db";
};

View File

@@ -0,0 +1,8 @@
// Global BIND options for an authoritative-only server.
options {
    directory "/var/cache/bind";
    allow-query { any; };     // authoritative data is public — answer anyone
    recursion no;             // serve only our own zones; never recurse
    dnssec-validation no;     // validation is a resolver concern, unused here
    listen-on { any; };
    listen-on-v6 { any; };
};

98
justfile Normal file
View File

@@ -0,0 +1,98 @@
#!/usr/bin/env just --justfile
# directdnsonly — developer task runner
# Requires: just, pyenv, poetry

APP_NAME := "directdnsonly"

# Ensure pyenv shims and common install locations are on PATH so that `python`
# resolves via pyenv (.python-version) and `poetry` is found without a full
# shell init in every recipe.
export PATH := env_var("HOME") + "/.pyenv/shims:" + env_var("HOME") + "/.pyenv/bin:" + env_var("HOME") + "/.local/bin:" + env_var("PATH")

# List available recipes (default)
default:
    @just --list

# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------

# Install all dependencies (including dev group)
install:
    poetry install

# Install only production dependencies
install-prod:
    poetry install --only main

# Show the Python interpreter that will be used
which-python:
    @poetry run python --version
    @poetry run python -c "import sys; print(sys.executable)"

# ---------------------------------------------------------------------------
# Testing
# ---------------------------------------------------------------------------

# Run the full test suite
test:
    poetry run pytest tests/ -v

# Run tests with terminal coverage report
coverage:
    poetry run pytest tests/ -v --cov=directdnsonly --cov-report=term-missing

# Run tests with HTML coverage report (opens in browser)
coverage-html:
    poetry run pytest tests/ --cov=directdnsonly --cov-report=html
    @echo "Coverage report: htmlcov/index.html"

# Run a single test file or pattern, e.g. just test-one test_reconciler
test-one target:
    poetry run pytest tests/ -v -k "{{target}}"

# ---------------------------------------------------------------------------
# Code quality
# ---------------------------------------------------------------------------

# Format all source and test files with black
fmt:
    poetry run black directdnsonly/ tests/

# Check formatting without making changes (CI-safe)
fmt-check:
    poetry run black --check directdnsonly/ tests/

# CI gate — run fmt-check then test, fail fast
ci: fmt-check test

# Start the application
run:
    poetry run python -m directdnsonly

# ---------------------------------------------------------------------------
# Build
# ---------------------------------------------------------------------------

# Build a standalone binary with PyInstaller
build:
    poetry run pyinstaller \
        --hidden-import=json \
        --hidden-import=pymysql \
        --hidden-import=cheroot \
        --hidden-import=cheroot.ssl.pyopenssl \
        --hidden-import=cheroot.ssl.builtin \
        --noconfirm --onefile \
        --name=directdnsonly \
        directdnsonly/main.py
    rm -f *.spec

# ---------------------------------------------------------------------------
# Clean
# ---------------------------------------------------------------------------

# Remove build artefacts, caches, and compiled bytecode.
# bugfix: previously removed `build/*.spec`, which matches nothing —
# PyInstaller writes .spec files to the project root (the build recipe
# already deletes those) and its work files into build/, so the build/
# tree was never cleaned. Remove build/ itself plus stray root .spec files.
clean:
    rm -rf dist/ build/ *.spec .coverage htmlcov/ .pytest_cache/
    find . -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
    find . -name "*.pyc" -delete 2>/dev/null || true

1263
poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

2
poetry.toml Normal file
View File

@@ -0,0 +1,2 @@
# Poetry local settings: keep the virtualenv inside the project (./.venv)
# so editors and the justfile recipes can locate it deterministically.
[virtualenvs]
in-project = true

35
pyproject.toml Normal file
View File

@@ -0,0 +1,35 @@
# Project metadata (PEP 621), managed with Poetry.
[project]
name = "directdnsonly"
version = "1.0.9"
description = "DNS Management System - DirectAdmin to multiple backends"
authors = [
    {name = "Aaron Guise",email = "aaron@guise.net.nz"}
]
license = {text = "MIT"}
readme = "README.md"
requires-python = ">=3.11,<3.14"
dependencies = [
    "vyper-config (>=1.2.1,<2.0.0)",
    "loguru (>=0.7.3,<0.8.0)",
    "persist-queue (>=1.0.0,<2.0.0)",
    "cherrypy (>=18.10.0,<19.0.0)",
    # NOTE(review): sqlalchemy capped below 2.0 — presumably the code uses
    # the 1.x query API; confirm before lifting the cap.
    "sqlalchemy (<2.0.0)",
    "pymysql (>=1.1.1,<2.0.0)",
    "dnspython (>=2.7.0,<3.0.0)",
    "pyyaml (>=6.0.2,<7.0.0)",
    "requests (>=2.32.0,<3.0.0)",
]

[tool.poetry]
package-mode = true

# Development-only tooling: formatting, binary packaging, test stack.
[tool.poetry.group.dev.dependencies]
black = "^25.1.0"
pyinstaller = "^6.13.0"
pytest = "^8.3.5"
pytest-cov = "^6.1.1"
pytest-mock = "^3.14.0"

[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

View File

@@ -1,9 +0,0 @@
cherrypy==18.6.1
pyyaml==5.3.1
python-json-logger
sqlalchemy==1.3.20
pyinstaller==4.5.1
patchelf-wrapper
staticx
pyopenssl
persistqueue

12
schema/coredns_mysql.sql Normal file
View File

@@ -0,0 +1,12 @@
-- Backing table for the CoreDNS MySQL backend: one row per DNS record.
CREATE TABLE IF NOT EXISTS `records` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `zone` varchar(255) NOT NULL,   -- zone the record belongs to
  `name` varchar(255) NOT NULL,   -- record owner name
  `ttl` int(11) DEFAULT NULL,     -- NULL presumably means "use zone default" — confirm in backend code
  `type` varchar(10) NOT NULL,    -- RR type, e.g. A, MX, TXT
  `data` text NOT NULL,           -- RR payload (rdata)
  PRIMARY KEY (`id`),
  KEY `idx_zone` (`zone`),
  KEY `idx_name` (`name`),
  KEY `idx_type` (`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View File

@@ -1,8 +0,0 @@
environment: ''
proxy_support: true
log_level: debug
log_to: file
proxy_support_base: http://127.0.0.1
server_port: 2222
token_valid_for_days: 30
timezone: Pacific/Auckland

View File

@@ -1,16 +0,0 @@
---
# Port that the server will run on.
server_port: 8080
# Turn Proxy support on. True if behind Traefik/Nginx for example
proxy_support: True
# Proxy support Base URI
proxy_support_base: http://127.0.0.1
# Values accepted are '', production, staging
environment: ''
api_keys:
- key: ZU7f3NogDxIzhfW5tsv9
name: super # Name of user
expires: never # Expiry never for superuser
- key: DhCIZ5yKjVN7ReKVh6Tl
name: jaydeep
expires: 21/11/2020, 10:56:24

File diff suppressed because it is too large Load Diff

View File

@@ -1,67 +0,0 @@
import re
from sqlalchemy import create_engine
def connect(hostname, sid, username, password, port=1521) -> object:
host = hostname
port = port
sid = sid
user = username
password = password
sid = cx_Oracle.makedsn(host, port, sid=sid)
connection_string = 'oracle://{user}:{password}@{sid}'.format(
user=user,
password=password,
sid=sid
)
engine = create_engine(
connection_string,
convert_unicode=False,
pool_recycle=10,
pool_size=50,
)
return engine
def make_address(flt_num: str, street_name: str, area: str, address_string: str) -> str:
"""
Take input parameters from GIS Data and returns Address as string
:rtype: str
"""
street_number = nv(flt_num).upper().strip() # Make the street number upper case and strip whitespace
street_name = nv(street_name).upper().strip() # Make the street name upper case and strip whitespace
town_name = nv(area)
if area != 'None' and \
area is not None and \
area != '':
town_name = nv(area).upper().strip() # Make the area upper case and strip whitespace
else:
town_name = nv(lookup_town_in_string(address_string)) # Get the town from address string
# Assemble the address string
full_address = street_number + " " + street_name + " " + town_name
return full_address
def lookup_town_in_string(address: str) -> str:
for p in street_type_lookup():
first_word = r"^(\w+)\s?((?!\\1)([\w]+)?)(?:\s+[\d]{4})" # Return First Words
try:
f = address.index(p)
size = len(p)
if f is not None:
m = re.search(first_word, address[f + size::].strip())
if m.group(1) is not None and m.group(2) is not None:
if m.group(1) != m.group(2):
return m.group(1) + ' ' + m.group(2)
else:
return m.group(1)
elif m.group(1) is not None and m.group(2) is None:
return m.group(1)
except ValueError:
pass

View File

@@ -1,370 +0,0 @@
import cherrypy
from cherrypy import request
from pythonjsonlogger import jsonlogger
from persistqueue import Queue, Empty
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import subprocess
import time
import sys
import yaml
import threading
import lib.common
import lib.db
import lib.db.models
import urllib.parse
class DaDNS(object):
@cherrypy.expose
def CMD_API_LOGIN_TEST(self):
return urllib.parse.urlencode({'error': 0,
'text': 'Login OK'})
@cherrypy.expose
def CMD_API_DNS_ADMIN(self):
applog.debug('Processing Method: '.format(request.method))
if request.method == 'POST':
action = request.params.get('action')
applog.debug('Action received via querystring: {}'.format(action))
body = str(request.body.read(), 'utf-8')
decoded_params = None
if action is None:
applog.debug('Action was not specified, check body')
decoded_params = decode_params(str(body))
applog.debug('Parameters decoded: {}'.format(decoded_params))
action = decoded_params['action']
zone_file = body
applog.debug(zone_file)
if action == 'delete':
# TODO: Support multiple domain deletion
# Domain is being removed from the DNS
queue_item('delete', {'hostname': decoded_params['hostname'],
'domain': decoded_params['select0']})
return urllib.parse.urlencode({'error': 0})
if action == 'rawsave':
# DirectAdmin wants to add/update a domain
queue_item('save', {'hostname': request.params.get('hostname'),
'username': request.params.get('username'),
'domain': request.params.get('domain'),
'zone_file': zone_file})
applog.info('Enqueued {} request for {}'.format('save', request.params.get('domain')))
return urllib.parse.urlencode({'error': 0})
elif request.method == 'GET':
applog.debug('Action Type: ' + request.params.get('action'))
action = request.params.get('action')
check_parent = bool(request.params.get('check_for_parent_domain'))
if action == 'exists' and check_parent:
domain_result = check_zone_exists(request.params.get('domain'))
applog.debug('Domain result: {}'.format(domain_result))
parent_result = check_parent_domain_owner(request.params.get('domain'))
applog.debug('Domain result: {}'.format(domain_result))
if not domain_result and not parent_result:
return urllib.parse.urlencode({'error': 0,
'exists': 0})
elif domain_result:
domain_record = session.query(lib.db.models.Domain).filter_by(
domain=request.params.get('domain')).one()
return urllib.parse.urlencode({'error': 0,
'exists': 1,
'details': 'Domain exists on {}'
.format(domain_record.hostname)
})
elif parent_result:
parent_domain = ".".join(request.params.get('domain').split('.')[1:])
domain_record = session.query(lib.db.models.Domain).filter_by(
domain=parent_domain).one()
return urllib.parse.urlencode({'error': 0,
'exists': 2,
'details': 'Parent Domain exists on {}'
.format(domain_record.hostname)
})
elif action == 'exists':
# DirectAdmin is checking whether the domain is in the cluster
if check_zone_exists(request.params.get('domain')):
domain_record = session.query(lib.db.models.Domain).filter_by(
domain=request.params.get('domain')).one()
return urllib.parse.urlencode({'error': 0,
'exists': 1,
'details': 'Domain exists on {}'
.format(domain_record.hostname)
})
else:
return urllib.parse.urlencode({'exists': 0})
def put_zone_index(zone_name, host_name, user_name):
# add a new zone to index
applog.debug('Placed zone into database.. {}'.format(str(zone_name)))
domain = lib.db.models.Domain(domain=zone_name, hostname=host_name, username=user_name)
session.add(domain)
session.commit()
def queue_item(action, data=None):
data = {'payload': data}
if action == 'save':
save_queue.put(data)
elif action == 'delete':
delete_queue.put(data)
def delete_zone_file(zone_name):
# Delete the zone file
applog.debug('Zone Name for delete: ' + zone_name)
os.remove(zones_dir + '/' + zone_name + '.db')
applog.debug('Zone deleted: {}'.format(zones_dir + '/' + zone_name + '.db'))
def write_zone_file(zone_name, data):
# Write the zone to file
applog.debug('Zone Name for write: ' + zone_name)
applog.debug('Zone file to write: \n' + data)
with open(zones_dir + '/' + zone_name + '.db', 'w') as f:
f.write(data)
applog.debug('Zone written to {}'.format(zones_dir + '/' + zone_name + '.db'))
def write_named_include():
applog.debug('Rewrite named zone include...')
domains = session.query(lib.db.models.Domain).all()
with open(named_conf, 'w') as f:
for domain in domains:
applog.debug('Writing zone {} to named.config'.format(domain.domain))
f.write('zone "' + domain.domain
+ '" { type master; file "' + zones_dir + '/'
+ domain.domain + '.db"; };\n')
def check_parent_domain_owner(zone_name):
applog.debug('Checking if {} exists in the DB'.format(zone_name))
# check try to find domain name
parent_domain = ".".join(zone_name.split('.')[1:])
domain_exists = session.query(session.query(lib.db.models.Domain).filter_by(domain=parent_domain).exists()).scalar()
if domain_exists:
# domain exists in the db
applog.debug('{} exists in db'.format(parent_domain))
domain_record = session.query(lib.db.models.Domain).filter_by(domain=parent_domain).one()
applog.debug(str(domain_record))
return True
else:
return False
def reconfigure_nameserver():
env = dict(os.environ) # make a copy of the environment
lp_key = 'LD_LIBRARY_PATH' # for Linux and *BSD
lp_orig = env.get(lp_key + '_ORIG') # pyinstaller >= 20160820
if lp_orig is not None:
env[lp_key] = lp_orig # restore the original
else:
env.pop(lp_key, None) # last resort: remove the env var
reconfigure = subprocess.run(['rndc', 'reconfig'],
capture_output=True,
universal_newlines=True,
env=env)
applog.debug("Stdout: {}".format(reconfigure.stdout))
applog.info('Reloaded bind')
def reload_nameserver(zone=None):
# Workaround for LD_LIBRARY_PATH/ LIBPATH issues
#
env = dict(os.environ) # make a copy of the environment
lp_key = 'LD_LIBRARY_PATH' # for Linux and *BSD
lp_orig = env.get(lp_key + '_ORIG') # pyinstaller >= 20160820
if lp_orig is not None:
env[lp_key] = lp_orig # restore the original
else:
env.pop(lp_key, None) # last resort: remove the env var
if zone is not None:
reload = subprocess.run(['rndc', 'reload', zone],
capture_output=True,
universal_newlines=True,
env=env)
applog.debug("Stdout: {}".format(reload.stdout))
applog.info('Reloaded bind for {}'.format(zone))
else:
reload = subprocess.run(['rndc', 'reload'],
capture_output=True,
universal_newlines=True,
env=env)
applog.debug("Stdout: {}".format(reload.stdout))
applog.info('Reloaded bind')
def check_zone_exists(zone_name):
# Check if zone is present in the index
applog.debug('Checking if {} is present in the DB'.format(zone_name))
domain_exists = bool(session.query(lib.db.models.Domain.id).filter_by(domain=zone_name).first())
applog.debug('Returned from query: {}'.format(domain_exists))
if domain_exists:
return True
else:
return False
def decode_params(payload):
from urllib.parse import parse_qs
response = parse_qs(payload)
params = dict()
for key, val in response.items():
params[key] = val[0]
return params
def background_thread(worker_type):
if worker_type == 'save':
applog.debug('Started worker thread for save action')
while True:
try:
item = save_queue.get(block=True, timeout=10)
data = item['payload']
applog.info('Processing save from queue for {}'.format(data['domain']))
applog.debug('Domain name to check: ' + data['domain'])
applog.debug('Does zone exist? ' + str(check_zone_exists(str(data['domain']))))
if not check_zone_exists(str(data['domain'])):
applog.debug('Zone is not present in db')
put_zone_index(str(data['domain']), str(data['hostname']), str(data['username']))
write_zone_file(str(data['domain']), data['zone_file'])
write_named_include()
reconfigure_nameserver()
reload_nameserver(str(data['domain']))
else:
# Domain already exists
applog.debug('Zone is present in db')
write_zone_file(str(data['domain']), data['zone_file'])
write_named_include()
reload_nameserver(str(data['domain']))
save_queue.task_done()
except Empty:
# Queue is empty
applog.debug('Save queue is empty')
elif worker_type == 'delete':
applog.debug('Started worker thread for delete action')
while True:
try:
item = delete_queue.get(block=True, timeout=10)
data = item['payload']
applog.info('Processing deletion from queue for {}'.format(data['domain']))
record = session.query(lib.db.models.Domain).filter_by(domain=data['domain']).one()
if record.hostname == data['hostname']:
applog.debug('Hostname matches the original host {}: Delete is allowed'.format(data['domain']))
session.delete(record)
session.commit()
applog.info('{} deleted from database'.format(data['domain']))
delete_zone_file(data['domain'])
write_named_include()
reload_nameserver()
delete_queue.task_done()
time.sleep(5)
except Empty:
# Queue is empty
applog.debug('Delete queue is empty')
except Exception as e:
applog.error(e)
def setup_logging():
os.environ['TZ'] = config['timezone']
time.tzset()
_applog = logging.getLogger()
_applog.setLevel(level=getattr(logging, config['log_level'].upper()))
if config['log_to'] == 'stdout':
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level=getattr(logging, config['log_level'].upper()))
formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(levelname)s %(message)s'
)
handler.setFormatter(formatter)
_applog.addHandler(handler)
elif config['log_to'] == 'file':
handler = TimedRotatingFileHandler(config['log_path'],
when='midnight',
backupCount=10)
handler.setLevel(level=getattr(logging, config['log_level'].upper()))
formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(levelname)s %(message)s'
)
handler.setFormatter(formatter)
_applog.addHandler(handler)
return _applog
if __name__ == '__main__':
app_version = "1.0.9"
if os.path.isfile("/lib/x86_64-linux-gnu/" + "libgcc_s.so.1"):
# Load local library
libgcc_s = ctypes.cdll.LoadLibrary("/lib/x86_64-linux-gnu/" + "libgcc_s.so.1")
# We are about to start our application
with open(r'conf/app.yml') as config_file:
config = yaml.load(config_file, Loader=yaml.SafeLoader)
applog = setup_logging()
applog.info('DirectDNS Starting')
applog.info('Timezone is {}'.format(config['timezone']))
applog.info('Get Database Connection')
session = lib.db.connect(config['db_location'])
applog.info('Database Connected!')
zones_dir = "/etc/named/directdnsonly"
named_conf = "/etc/named/directdnsonly.inc"
save_queue = Queue(config['queue_location'] + '/rawsave')
save_thread = threading.Thread(target=background_thread, args=('save',))
save_thread.daemon = True # Daemonize thread
save_thread.start() # Start the execution
delete_queue = Queue(config['queue_location'] + '/delete')
delete_thread = threading.Thread(target=background_thread, args=('delete',))
delete_thread.daemon = True # Daemonize thread
delete_thread.start() # Start the execution
cherrypy.__version__ = ''
cherrypy._cperror._HTTPErrorTemplate = cherrypy._cperror._HTTPErrorTemplate.replace(
'Powered by <a href="http://www.cherrypy.org">CherryPy %(version)s</a>\n', '%(version)s')
user_password_dict = {'test': 'test'}
check_password = cherrypy.lib.auth_basic.checkpassword_dict(user_password_dict)
cherrypy.config.update({
'server.socket_host': '0.0.0.0',
'server.socket_port': config['server_port'],
'tools.proxy.on': config['proxy_support'],
'tools.proxy.base': config['proxy_support_base'],
'tools.auth_basic.on': True,
'tools.auth_basic.realm': 'dadns',
'tools.auth_basic.checkpassword': check_password,
'tools.response_headers.on': True,
'tools.response_headers.headers': [('Server', 'DirectDNS v' + app_version)],
'environment': config['environment']
})
if bool(config['ssl_enable']):
cherrypy.config.update({
'server.ssl_module': 'builtin',
'server.ssl_certificate': config['ssl_cert'],
'server.ssl_private_key': config['ssl_key'],
'server.ssl_certificate_chain': config['ssl_bundle']
})
# cherrypy.log.error_log.propagate = False
if config['log_level'].upper() != 'DEBUG':
cherrypy.log.access_log.propagate = False
if not lib.common.check_if_super_user_exists(session):
password_str = lib.common.get_random_string(35)
applog.info('Creating superuser account: {}'.format('super'))
applog.info('Password: {}'.format(password_str))
superuser = lib.db.models.Key(key=password_str, name='super', service='*')
session.add(superuser)
session.commit()
else:
applog.info('Superuser account already exists: skipping creation')
cherrypy.quickstart(DaDNS())

View File

@@ -1,18 +0,0 @@
import random
import string
import lib.db.models
def check_if_super_user_exists(session):
exists = session.query(session.query(lib.db.models.Key).filter_by(name='super').exists()).scalar()
return exists
def check_if_domain_exists(session):
pass
def get_random_string(length):
letters_and_digits = string.ascii_letters + string.digits
result_str = ''.join(random.choice(letters_and_digits) for i in range(length))
return result_str

View File

@@ -1,15 +0,0 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import datetime
Base = declarative_base()
def connect(db_location):
# Start SQLite engine
engine = create_engine('sqlite:///' + db_location, connect_args={'check_same_thread': False})
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
return session

View File

@@ -1,6 +0,0 @@
import yaml
data = {'ZU7f3NogDxIzhfW5tsv9' : {'name': 'super',
'expires': '11/21/2020, 10:56:24'}}
print(yaml.dump(data))

View File

@@ -1,18 +0,0 @@
import datetime
import lib.common
import lib.db
import lib.db.models
new_expiry_date = datetime.datetime.now() + datetime.timedelta(int(10))
session = lib.db.connect()
if not lib.common.check_if_super_user_exists(session):
password_str = lib.common.get_random_string(20)
print('Creating superuser account: {}'.format('super'))
print('Password: {}'.format(password_str))
super = lib.db.models.Key(key=password_str, name='super', service='*')
session.add(super)
session.commit()
else:
print('Superuser account already exists: skipping creation')

40
tests/conftest.py Normal file
View File

@@ -0,0 +1,40 @@
"""Shared test fixtures for directdnsonly test suite."""
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from directdnsonly.app.db import Base
from directdnsonly.app.db.models import (
Domain,
Key,
) # noqa: F401 — registers models with Base
@pytest.fixture
def engine():
    """Provide a throwaway in-memory SQLite engine with all tables created."""
    test_engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(test_engine)
    yield test_engine
    test_engine.dispose()
@pytest.fixture
def db_session(engine):
    """Yield an ORM session bound to the in-memory engine; close on teardown."""
    factory = sessionmaker(bind=engine)
    session = factory()
    yield session
    session.close()
@pytest.fixture
def patch_connect(db_session, monkeypatch):
    """Patch connect() at every call-site, returning the shared test session.

    Modules that import connect() directly (e.g. utils, reconciler) are
    patched at their local name so the in-memory SQLite session is used
    instead of trying to read from vyper config.
    """

    # Plain def instead of an assigned lambda (PEP 8 / flake8 E731); this
    # also drops the noqa suppression the lambda needed.
    def _factory():
        return db_session

    monkeypatch.setattr("directdnsonly.app.utils.connect", _factory)
    monkeypatch.setattr("directdnsonly.app.reconciler.connect", _factory)
    return db_session

219
tests/test_admin_api.py Normal file
View File

@@ -0,0 +1,219 @@
"""Tests for directdnsonly.app.api.admin — DNSAdminAPI handler methods."""
import pytest
from unittest.mock import MagicMock, patch
from urllib.parse import parse_qs
import cherrypy
from directdnsonly.app.api.admin import DNSAdminAPI
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def save_queue():
return MagicMock()
@pytest.fixture
def delete_queue():
return MagicMock()
@pytest.fixture
def api(save_queue, delete_queue):
return DNSAdminAPI(save_queue, delete_queue, backend_registry=MagicMock())
# ---------------------------------------------------------------------------
# CMD_API_LOGIN_TEST
# ---------------------------------------------------------------------------
def test_login_test_returns_success(api):
    """CMD_API_LOGIN_TEST must report error=0 with the 'Login OK' text."""
    parsed = parse_qs(api.CMD_API_LOGIN_TEST())
    assert parsed["error"] == ["0"]
    assert parsed["text"] == ["Login OK"]
# ---------------------------------------------------------------------------
# _handle_exists — GET action=exists
# ---------------------------------------------------------------------------
def test_handle_exists_missing_domain_returns_error(api):
with patch.object(cherrypy, "response", MagicMock()):
result = api._handle_exists({"action": "exists"})
parsed = parse_qs(result)
assert parsed["error"] == ["1"]
def test_handle_exists_unsupported_action_returns_error(api):
with patch.object(cherrypy, "response", MagicMock()):
result = api._handle_exists({"action": "rawsave"})
parsed = parse_qs(result)
assert parsed["error"] == ["1"]
def test_handle_exists_domain_not_found(api):
with (
patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False),
patch(
"directdnsonly.app.api.admin.check_parent_domain_owner", return_value=False
),
):
result = api._handle_exists({"action": "exists", "domain": "example.com"})
parsed = parse_qs(result)
assert parsed["error"] == ["0"]
assert parsed["exists"] == ["0"]
def test_handle_exists_domain_found(api):
record = MagicMock()
record.hostname = "da1.example.com"
with (
patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True),
patch("directdnsonly.app.api.admin.get_domain_record", return_value=record),
):
result = api._handle_exists({"action": "exists", "domain": "example.com"})
parsed = parse_qs(result)
assert parsed["error"] == ["0"]
assert parsed["exists"] == ["1"]
assert "da1.example.com" in parsed["details"][0]
def test_handle_exists_parent_found(api):
parent = MagicMock()
parent.hostname = "da2.example.com"
with (
patch("directdnsonly.app.api.admin.check_zone_exists", return_value=False),
patch(
"directdnsonly.app.api.admin.check_parent_domain_owner", return_value=True
),
patch(
"directdnsonly.app.api.admin.get_parent_domain_record", return_value=parent
),
):
result = api._handle_exists(
{
"action": "exists",
"domain": "sub.example.com",
"check_for_parent_domain": "1",
}
)
parsed = parse_qs(result)
assert parsed["error"] == ["0"]
assert parsed["exists"] == ["2"]
assert "da2.example.com" in parsed["details"][0]
def test_handle_exists_no_parent_check_when_flag_absent(api):
"""check_parent_domain_owner should not be called if flag not set."""
record = MagicMock()
record.hostname = "da1.example.com"
with (
patch("directdnsonly.app.api.admin.check_zone_exists", return_value=True),
patch("directdnsonly.app.api.admin.check_parent_domain_owner") as mock_parent,
patch("directdnsonly.app.api.admin.get_domain_record", return_value=record),
):
api._handle_exists({"action": "exists", "domain": "example.com"})
mock_parent.assert_not_called()
# ---------------------------------------------------------------------------
# _handle_rawsave
# ---------------------------------------------------------------------------
SAMPLE_ZONE = "$ORIGIN example.com.\n$TTL 300\nexample.com. 300 IN A 1.2.3.4\n"
def test_rawsave_enqueues_item(api, save_queue):
with (
patch(
"directdnsonly.app.api.admin.validate_and_normalize_zone",
return_value=SAMPLE_ZONE,
),
patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))),
):
result = api._handle_rawsave(
"example.com",
{
"zone_file": SAMPLE_ZONE,
"hostname": "da1.example.com",
"username": "admin",
},
)
save_queue.put.assert_called_once()
item = save_queue.put.call_args[0][0]
assert item["domain"] == "example.com"
assert item["hostname"] == "da1.example.com"
assert item["username"] == "admin"
assert item["client_ip"] == "127.0.0.1"
parsed = parse_qs(result)
assert parsed["error"] == ["0"]
def test_rawsave_missing_zone_file_raises(api):
with patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))):
with pytest.raises(ValueError, match="Missing zone file"):
api._handle_rawsave("example.com", {})
def test_rawsave_invalid_zone_raises(api):
with (
patch(
"directdnsonly.app.api.admin.validate_and_normalize_zone",
side_effect=ValueError("Invalid zone data: bad record"),
),
patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))),
):
with pytest.raises(ValueError, match="Invalid zone data"):
api._handle_rawsave("example.com", {"zone_file": "garbage"})
# ---------------------------------------------------------------------------
# _handle_delete
# ---------------------------------------------------------------------------
def test_delete_enqueues_item(api, delete_queue):
with patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="10.0.0.1"))):
result = api._handle_delete(
"example.com",
{
"hostname": "da1.example.com",
"username": "admin",
},
)
delete_queue.put.assert_called_once()
item = delete_queue.put.call_args[0][0]
assert item["domain"] == "example.com"
assert item["hostname"] == "da1.example.com"
assert item["client_ip"] == "10.0.0.1"
parsed = parse_qs(result)
assert parsed["error"] == ["0"]
def test_delete_missing_params_uses_empty_strings(api, delete_queue):
    """Absent hostname/username params are recorded as empty strings, not None."""
    with patch.object(cherrypy, "request", MagicMock(remote=MagicMock(ip="127.0.0.1"))):
        api._handle_delete("example.com", {})
    item = delete_queue.put.call_args[0][0]
    assert item["hostname"] == ""
    assert item["username"] == ""

167
tests/test_coredns_mysql.py Normal file
View File

@@ -0,0 +1,167 @@
"""Tests for the CoreDNS MySQL backend (run against in-memory SQLite)."""
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from directdnsonly.app.backends.coredns_mysql import (
Base,
CoreDNSMySQLBackend,
Record,
Zone,
)
# ---------------------------------------------------------------------------
# Fixture — in-memory SQLite backend (bypasses real MySQL connection)
# ---------------------------------------------------------------------------
@pytest.fixture
def mysql_backend():
    """CoreDNS backend wired to a throwaway in-memory SQLite database.

    Subclasses CoreDNSMySQLBackend and overrides __init__ so the real
    MySQL create_engine call never runs; the schema is created up front
    via Base.metadata.create_all. The engine is disposed on teardown.
    """
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)

    class _TestBackend(CoreDNSMySQLBackend):
        def __init__(self):
            # Manually initialise without triggering the MySQL create_engine call
            self.config = {}
            self.instance_name = "test"
            self.engine = engine
            self.Session = scoped_session(sessionmaker(bind=engine))

    yield _TestBackend()
    engine.dispose()
# ---------------------------------------------------------------------------
# write_zone / zone_exists / delete_zone
# ---------------------------------------------------------------------------
# Two-record zone (SOA + A) used by most backend tests below; written
# via write_zone and parsed by the backend's zone parser.
ZONE_DATA = """\
$ORIGIN example.com.
$TTL 300
example.com. 300 IN SOA ns.example.com. admin.example.com. (2023 3600 1800 604800 86400)
example.com. 300 IN A 192.0.2.1
"""
def test_write_zone_creates_zone(mysql_backend):
    """write_zone returns truthy on the first write of a new zone."""
    assert mysql_backend.write_zone("example.com", ZONE_DATA)


def test_zone_exists_after_write(mysql_backend):
    """zone_exists reports True once the zone has been written."""
    mysql_backend.write_zone("example.com", ZONE_DATA)
    assert mysql_backend.zone_exists("example.com")


def test_zone_does_not_exist_before_write(mysql_backend):
    """zone_exists reports False for a zone that was never written."""
    assert not mysql_backend.zone_exists("missing.com")


def test_write_zone_idempotent(mysql_backend):
    """Writing the same zone twice succeeds both times."""
    assert mysql_backend.write_zone("example.com", ZONE_DATA)
    assert mysql_backend.write_zone("example.com", ZONE_DATA)
def test_write_zone_updates_records(mysql_backend):
    """A second write with a changed TTL and record set still succeeds."""
    mysql_backend.write_zone("example.com", ZONE_DATA)
    updated = """\
$ORIGIN example.com.
$TTL 300
example.com. 3600 IN A 192.0.2.1
example.com. 300 IN AAAA 2001:db8::1
"""
    assert mysql_backend.write_zone("example.com", updated)
def test_write_zone_removes_stale_records(mysql_backend):
    """Records dropped from a rewritten zone must be deleted from the table.

    ZONE_DATA contains SOA + A; after rewriting with only the A record the
    stale SOA row must be gone. The original AAAA-only assertion was
    vacuous — ZONE_DATA never contained an AAAA record — so it is kept
    but supplemented with a check on a record type that actually existed.
    """
    mysql_backend.write_zone("example.com", ZONE_DATA)
    reduced = "example.com. 300 IN A 192.0.2.1"
    mysql_backend.write_zone("example.com", reduced)
    session = mysql_backend.Session()
    # Zones are stored with a trailing dot (see other tests' zone_name filters).
    zone = session.query(Zone).filter_by(zone_name="example.com.").first()
    records = session.query(Record).filter_by(zone_id=zone.id, type="AAAA").all()
    assert records == []
    # Stronger check: the SOA from the original write is stale and must be gone.
    stale_soa = session.query(Record).filter_by(zone_id=zone.id, type="SOA").all()
    assert stale_soa == []
    session.close()
def test_delete_zone_removes_zone_and_records(mysql_backend):
    """delete_zone returns True and zone_exists goes False afterwards."""
    mysql_backend.write_zone("example.com", ZONE_DATA)
    assert mysql_backend.delete_zone("example.com")
    assert not mysql_backend.zone_exists("example.com")


def test_delete_nonexistent_zone_returns_false(mysql_backend):
    """Deleting an unknown zone reports False rather than raising."""
    assert not mysql_backend.delete_zone("ghost.com")


def test_reload_zone_returns_true(mysql_backend):
    """reload_zone accepts an optional zone name and is truthy in both call forms."""
    assert mysql_backend.reload_zone("example.com")
    assert mysql_backend.reload_zone()
# ---------------------------------------------------------------------------
# verify_zone_record_count
# ---------------------------------------------------------------------------
def test_verify_zone_record_count_match(mysql_backend):
    """Count matches expectation: returns (True, actual_count)."""
    mysql_backend.write_zone("example.com", ZONE_DATA)
    # SOA + A = 2 records total
    matches, count = mysql_backend.verify_zone_record_count("example.com", 2)
    assert matches
    assert count == 2


def test_verify_zone_record_count_mismatch(mysql_backend):
    """Count differs from expectation: returns (False, actual_count)."""
    mysql_backend.write_zone("example.com", ZONE_DATA)
    matches, count = mysql_backend.verify_zone_record_count("example.com", 99)
    assert not matches
    assert count == 2


def test_verify_zone_record_count_missing_zone(mysql_backend):
    """Unknown zone: returns (False, 0) rather than raising."""
    matches, count = mysql_backend.verify_zone_record_count("ghost.com", 0)
    assert not matches
    assert count == 0
# ---------------------------------------------------------------------------
# reconcile_zone_records
# ---------------------------------------------------------------------------
def test_reconcile_removes_extra_records(mysql_backend):
    """reconcile_zone_records deletes rows not present in the source zone.

    A phantom A record is injected straight into the records table; the
    reconciler should report success and exactly one removal.
    """
    mysql_backend.write_zone("example.com", ZONE_DATA)
    # Inject a phantom record directly into the DB
    session = mysql_backend.Session()
    zone = session.query(Zone).filter_by(zone_name="example.com.").first()
    session.add(
        Record(
            zone_id=zone.id,
            hostname="phantom",
            type="A",
            data="10.0.0.99",
            ttl=300,
            online=True,
        )
    )
    session.commit()
    session.close()
    success, removed = mysql_backend.reconcile_zone_records("example.com", ZONE_DATA)
    assert success
    assert removed == 1
def test_reconcile_no_changes_when_zone_matches(mysql_backend):
    """When DB and source zone agree, reconciliation succeeds with 0 removals."""
    mysql_backend.write_zone("example.com", ZONE_DATA)
    success, removed = mysql_backend.reconcile_zone_records("example.com", ZONE_DATA)
    assert success
    assert removed == 0

299
tests/test_reconciler.py Normal file
View File

@@ -0,0 +1,299 @@
"""Tests for directdnsonly.app.reconciler — ReconciliationWorker."""
import pytest
import requests.exceptions
from queue import Queue
from unittest.mock import MagicMock, patch
from directdnsonly.app.reconciler import ReconciliationWorker
from directdnsonly.app.db.models import Domain
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# Single DirectAdmin server entry used by every reconciler test.
SERVER = {
    "hostname": "da1.example.com",
    "port": 2222,
    "username": "admin",
    "password": "secret",
    "ssl": True,
}

# Baseline worker config; tests override individual keys via {**BASE_CONFIG, ...}.
BASE_CONFIG = {
    "enabled": True,
    "dry_run": False,
    "interval_minutes": 60,
    "verify_ssl": True,
    "ipp": 100,  # presumably items-per-page for DA list pagination — TODO confirm
    "directadmin_servers": [SERVER],
}
@pytest.fixture
def delete_queue():
    """Real (empty) Queue the worker pushes orphan-deletion items onto."""
    return Queue()


@pytest.fixture
def worker(delete_queue):
    """ReconciliationWorker with dry_run disabled (deletions are enqueued)."""
    return ReconciliationWorker(delete_queue, BASE_CONFIG)


@pytest.fixture
def dry_run_worker(delete_queue):
    """ReconciliationWorker with dry_run enabled — must not enqueue deletions."""
    cfg = {**BASE_CONFIG, "dry_run": True}
    return ReconciliationWorker(delete_queue, cfg)
# ---------------------------------------------------------------------------
# _reconcile_all — orphan detection
# ---------------------------------------------------------------------------
def test_orphan_queued_when_domain_missing_from_da(worker, delete_queue, patch_connect):
    """A DB-indexed domain absent from its DA server is queued for deletion.

    _fetch_da_domains is patched to return an empty set, so the indexed
    domain counts as an orphan; the queued item carries source=reconciler.
    """
    patch_connect.add(
        Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    with patch.object(worker, "_fetch_da_domains", return_value=set()):
        worker._reconcile_all()
    assert not delete_queue.empty()
    item = delete_queue.get_nowait()
    assert item["domain"] == "orphan.com"
    assert item["source"] == "reconciler"


def test_orphan_not_queued_in_dry_run(dry_run_worker, delete_queue, patch_connect):
    """dry_run mode must not enqueue a deletion even for a detected orphan."""
    patch_connect.add(
        Domain(domain="orphan.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    with patch.object(dry_run_worker, "_fetch_da_domains", return_value=set()):
        dry_run_worker._reconcile_all()
    assert delete_queue.empty()
def test_orphan_not_queued_for_unknown_server(worker, delete_queue, patch_connect):
    """Domains whose recorded master is NOT in our configured servers are skipped."""
    patch_connect.add(
        Domain(domain="other.com", hostname="da99.unknown.com", username="admin")
    )
    patch_connect.commit()
    with patch.object(worker, "_fetch_da_domains", return_value=set()):
        worker._reconcile_all()
    assert delete_queue.empty()


def test_active_domain_not_queued(worker, delete_queue, patch_connect):
    """A domain still present on its DA server is left alone."""
    patch_connect.add(
        Domain(domain="good.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    with patch.object(worker, "_fetch_da_domains", return_value={"good.com"}):
        worker._reconcile_all()
    assert delete_queue.empty()
# ---------------------------------------------------------------------------
# _reconcile_all — hostname backfill and migration
# ---------------------------------------------------------------------------
def test_backfill_null_hostname(worker, patch_connect):
    """A NULL hostname is backfilled with the server the domain was found on."""
    patch_connect.add(Domain(domain="backfill.com", hostname=None, username="admin"))
    patch_connect.commit()
    with patch.object(worker, "_fetch_da_domains", return_value={"backfill.com"}):
        worker._reconcile_all()
    record = patch_connect.query(Domain).filter_by(domain="backfill.com").first()
    assert record.hostname == "da1.example.com"


def test_migration_updates_hostname(worker, patch_connect):
    """A stale hostname is rewritten when the domain shows up on another server."""
    patch_connect.add(
        Domain(domain="moved.com", hostname="da-old.example.com", username="admin")
    )
    patch_connect.commit()
    with patch.object(worker, "_fetch_da_domains", return_value={"moved.com"}):
        worker._reconcile_all()
    record = patch_connect.query(Domain).filter_by(domain="moved.com").first()
    assert record.hostname == "da1.example.com"


def test_dry_run_still_backfills(dry_run_worker, patch_connect):
    """Backfill is a data-repair operation, applied even in dry-run mode."""
    patch_connect.add(Domain(domain="fill.com", hostname=None, username="admin"))
    patch_connect.commit()
    with patch.object(dry_run_worker, "_fetch_da_domains", return_value={"fill.com"}):
        dry_run_worker._reconcile_all()
    record = patch_connect.query(Domain).filter_by(domain="fill.com").first()
    assert record.hostname == "da1.example.com"
# ---------------------------------------------------------------------------
# _fetch_da_domains — HTTP handling
# ---------------------------------------------------------------------------
def _make_json_response(domains_dict, total_pages=1):
"""Return a mock requests.Response with JSON payload matching DA format."""
data = {str(i): {"domain": d} for i, d in enumerate(domains_dict)}
data["info"] = {"total_pages": total_pages}
mock = MagicMock()
mock.status_code = 200
mock.is_redirect = False
mock.headers = {"Content-Type": "application/json"}
mock.json.return_value = data
mock.raise_for_status = MagicMock()
return mock
def test_fetch_returns_domains_from_json(worker):
    """A single-page JSON response yields the set of domain names."""
    mock_resp = _make_json_response(["example.com", "test.com"])
    with patch("requests.get", return_value=mock_resp):
        result = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert result == {"example.com", "test.com"}


def test_fetch_paginates(worker):
    """total_pages=2 makes the fetcher request both pages and merge results."""
    page1 = _make_json_response(["a.com"], total_pages=2)
    page2 = _make_json_response(["b.com"], total_pages=2)
    with patch("requests.get", side_effect=[page1, page2]):
        result = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert result == {"a.com", "b.com"}
def test_fetch_redirect_triggers_session_login(worker):
    """A 302 redirect response makes the fetch return None.

    NOTE(review): despite the test's name, the patched _da_session_login
    mock is never asserted to have been called — only the None result is
    checked. Consider capturing the patch and asserting the call.
    """
    redirect_resp = MagicMock()
    redirect_resp.status_code = 302
    redirect_resp.is_redirect = True
    with (
        patch("requests.get", return_value=redirect_resp),
        patch.object(worker, "_da_session_login", return_value=None),
    ):
        result = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert result is None
def test_fetch_html_response_returns_none(worker):
    """An HTML (non-JSON) body is treated as a failed fetch → None."""
    mock_resp = MagicMock()
    mock_resp.status_code = 200
    mock_resp.is_redirect = False
    mock_resp.headers = {"Content-Type": "text/html; charset=utf-8"}
    mock_resp.raise_for_status = MagicMock()
    with patch("requests.get", return_value=mock_resp):
        result = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert result is None


def test_fetch_connection_error_returns_none(worker):
    """A refused connection yields None instead of propagating the exception."""
    with patch(
        "requests.get", side_effect=requests.exceptions.ConnectionError("refused")
    ):
        result = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert result is None


def test_fetch_timeout_returns_none(worker):
    """A request timeout yields None instead of propagating the exception."""
    with patch("requests.get", side_effect=requests.exceptions.Timeout()):
        result = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert result is None


def test_fetch_ssl_error_returns_none(worker):
    """A TLS verification failure yields None instead of propagating."""
    with patch(
        "requests.get", side_effect=requests.exceptions.SSLError("cert verify failed")
    ):
        result = worker._fetch_da_domains(
            "da1.example.com", 2222, "admin", "secret", True
        )
    assert result is None
# ---------------------------------------------------------------------------
# _parse_da_domain_list — legacy format fallback
# ---------------------------------------------------------------------------
def test_parse_standard_querystring():
    """Ampersand-separated list[]= entries are parsed into a set."""
    body = "list[]=example.com&list[]=test.com"
    result = ReconciliationWorker._parse_da_domain_list(body)
    assert result == {"example.com", "test.com"}


def test_parse_newline_separated():
    """Newline-separated list[]= entries parse to the same set."""
    body = "list[]=example.com\nlist[]=test.com"
    result = ReconciliationWorker._parse_da_domain_list(body)
    assert result == {"example.com", "test.com"}


def test_parse_empty_body_returns_empty_set():
    """An empty body yields an empty set, not None."""
    assert ReconciliationWorker._parse_da_domain_list("") == set()


def test_parse_normalises_to_lowercase():
    """Domain names are lower-cased during parsing."""
    result = ReconciliationWorker._parse_da_domain_list("list[]=EXAMPLE.COM")
    assert "example.com" in result
    assert "EXAMPLE.COM" not in result


def test_parse_strips_whitespace():
    """Whitespace surrounding a domain value is stripped."""
    result = ReconciliationWorker._parse_da_domain_list("list[]= example.com ")
    assert "example.com" in result
# ---------------------------------------------------------------------------
# Worker lifecycle
# ---------------------------------------------------------------------------
def test_disabled_worker_does_not_start(delete_queue):
    """start() is a no-op when the reconciler is disabled in config."""
    cfg = {**BASE_CONFIG, "enabled": False}
    w = ReconciliationWorker(delete_queue, cfg)
    w.start()
    # NOTE(review): assumes is_alive is a property/attribute on the worker;
    # if it were threading.Thread's bound method, `not w.is_alive` would
    # always be False — confirm against the worker implementation.
    assert not w.is_alive


def test_no_servers_does_not_start(delete_queue):
    """start() is a no-op when no DirectAdmin servers are configured."""
    cfg = {**BASE_CONFIG, "directadmin_servers": []}
    w = ReconciliationWorker(delete_queue, cfg)
    w.start()
    assert not w.is_alive

138
tests/test_utils.py Normal file
View File

@@ -0,0 +1,138 @@
"""Tests for directdnsonly.app.utils — zone index helper functions."""
import pytest
from directdnsonly.app.db.models import Domain
from directdnsonly.app.utils import (
check_zone_exists,
check_parent_domain_owner,
get_domain_record,
get_parent_domain_record,
put_zone_index,
)
# ---------------------------------------------------------------------------
# check_zone_exists
# ---------------------------------------------------------------------------
def test_check_zone_exists_not_found(patch_connect):
    """Empty index: an unknown domain reports False."""
    assert check_zone_exists("example.com") is False


def test_check_zone_exists_found(patch_connect):
    """A domain present in the index reports True."""
    patch_connect.add(
        Domain(domain="example.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    assert check_zone_exists("example.com") is True


def test_check_zone_exists_does_not_match_partial(patch_connect):
    """Lookup is exact-match: a subdomain must not match its parent's row."""
    patch_connect.add(
        Domain(domain="example.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    assert check_zone_exists("sub.example.com") is False
# ---------------------------------------------------------------------------
# put_zone_index
# ---------------------------------------------------------------------------
def test_put_zone_index_adds_record(patch_connect):
    """put_zone_index persists domain, hostname and username."""
    put_zone_index("new.com", "da1.example.com", "admin")
    record = patch_connect.query(Domain).filter_by(domain="new.com").first()
    assert record is not None
    assert record.hostname == "da1.example.com"
    assert record.username == "admin"


def test_put_zone_index_stores_domain_name(patch_connect):
    """A freshly indexed zone is visible through check_zone_exists."""
    put_zone_index("another.nz", "da2.example.com", "user1")
    assert check_zone_exists("another.nz") is True
# ---------------------------------------------------------------------------
# get_domain_record
# ---------------------------------------------------------------------------
def test_get_domain_record_returns_none_when_missing(patch_connect):
    """An unindexed domain yields None."""
    assert get_domain_record("missing.com") is None


def test_get_domain_record_returns_record(patch_connect):
    """An indexed domain yields its Domain row with stored fields intact."""
    patch_connect.add(
        Domain(domain="found.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    record = get_domain_record("found.com")
    assert record is not None
    assert record.domain == "found.com"
    assert record.hostname == "da1.example.com"
# ---------------------------------------------------------------------------
# check_parent_domain_owner
# ---------------------------------------------------------------------------
def test_check_parent_domain_owner_not_found(patch_connect):
    """No parent row indexed: reports False."""
    assert check_parent_domain_owner("sub.example.com") is False


def test_check_parent_domain_owner_found(patch_connect):
    """Immediate parent present in the index: reports True."""
    patch_connect.add(
        Domain(domain="example.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    assert check_parent_domain_owner("sub.example.com") is True


def test_check_parent_domain_owner_single_label_returns_false(patch_connect):
    # A single-label name like "com" has no parent
    assert check_parent_domain_owner("com") is False


def test_check_parent_domain_owner_ignores_grandparent(patch_connect):
    # Only the immediate parent is checked, not grandparents
    patch_connect.add(
        Domain(domain="example.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    # deep.sub.example.com's immediate parent is sub.example.com (not in DB)
    assert check_parent_domain_owner("deep.sub.example.com") is False
# ---------------------------------------------------------------------------
# get_parent_domain_record
# ---------------------------------------------------------------------------
def test_get_parent_domain_record_returns_none_when_missing(patch_connect):
    """No indexed parent: yields None."""
    assert get_parent_domain_record("sub.example.com") is None


def test_get_parent_domain_record_returns_parent(patch_connect):
    """Yields the Domain row of the immediate parent zone."""
    patch_connect.add(
        Domain(domain="example.com", hostname="da1.example.com", username="admin")
    )
    patch_connect.commit()
    parent = get_parent_domain_record("sub.example.com")
    assert parent is not None
    assert parent.domain == "example.com"


def test_get_parent_domain_record_single_label_returns_none(patch_connect):
    """Single-label names have no parent: yields None."""
    assert get_parent_domain_record("com") is None

101
tests/test_zone_parser.py Normal file
View File

@@ -0,0 +1,101 @@
"""Tests for directdnsonly.app.utils.zone_parser."""
import pytest
from dns.exception import DNSException
from directdnsonly.app.utils.zone_parser import (
count_zone_records,
validate_and_normalize_zone,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# Single A record with no $ORIGIN/$TTL — exercises the normaliser's
# directive-insertion paths.
MINIMAL_ZONE = "example.com. 300 IN A 1.2.3.4"

# Fully-formed five-record zone (SOA, NS, apex A, www A, MX).
FULL_ZONE = """\
$ORIGIN example.com.
$TTL 300
@ IN SOA ns1.example.com. admin.example.com. 2024010101 3600 900 604800 300
@ IN NS ns1.example.com.
@ IN A 1.2.3.4
www IN A 5.6.7.8
mail IN MX 10 mail.example.com.
"""
# ---------------------------------------------------------------------------
# validate_and_normalize_zone
# ---------------------------------------------------------------------------
def test_validate_adds_origin_when_missing():
    """A zone without $ORIGIN gains one for the given (dotted) domain."""
    result = validate_and_normalize_zone(MINIMAL_ZONE, "example.com")
    assert "$ORIGIN example.com." in result


def test_validate_adds_ttl_when_missing():
    """A zone without $TTL gains a $TTL directive."""
    result = validate_and_normalize_zone(MINIMAL_ZONE, "example.com")
    assert "$TTL" in result


def test_validate_does_not_duplicate_origin():
    """An existing $ORIGIN directive is kept, not duplicated."""
    zone = "$ORIGIN example.com.\nexample.com. 300 IN A 1.2.3.4"
    result = validate_and_normalize_zone(zone, "example.com")
    assert result.count("$ORIGIN") == 1


def test_validate_does_not_duplicate_ttl():
    """An existing $TTL directive is kept, not duplicated."""
    zone = "$TTL 300\nexample.com. 300 IN A 1.2.3.4"
    result = validate_and_normalize_zone(zone, "example.com")
    assert result.count("$TTL") == 1
def test_validate_appends_dot_to_domain():
    """A bare domain (no trailing dot) yields exactly one dotted $ORIGIN line.

    Previously this test repeated the substring check from
    test_validate_adds_origin_when_missing verbatim; pinning the full
    directive line makes the trailing-dot behaviour the thing actually
    asserted, and also guards against a doubled dot ("example.com..").
    """
    result = validate_and_normalize_zone(MINIMAL_ZONE, "example.com")
    origin_lines = [
        line.strip()
        for line in result.splitlines()
        if line.strip().startswith("$ORIGIN")
    ]
    assert origin_lines == ["$ORIGIN example.com."]
def test_validate_returns_string():
    """The normaliser returns a plain str."""
    result = validate_and_normalize_zone(MINIMAL_ZONE, "example.com")
    assert isinstance(result, str)


def test_validate_full_zone_passes():
    """A complete, well-formed zone validates without raising."""
    result = validate_and_normalize_zone(FULL_ZONE, "example.com")
    assert result is not None


def test_validate_raises_on_invalid_zone():
    """Garbage input raises ValueError carrying the 'Invalid zone data' prefix."""
    bad_zone = "this is not a zone file at all !!!"
    with pytest.raises(ValueError, match="Invalid zone data"):
        validate_and_normalize_zone(bad_zone, "example.com")
# ---------------------------------------------------------------------------
# count_zone_records
# ---------------------------------------------------------------------------
def test_count_records_simple_zone():
    """Two resource records (A + AAAA) count as 2."""
    zone = "$ORIGIN example.com.\n$TTL 300\n@ IN A 1.2.3.4\n@ IN AAAA ::1\n"
    count = count_zone_records(zone, "example.com")
    assert count == 2


def test_count_records_soa_included():
    """The SOA record is included in the count."""
    count = count_zone_records(FULL_ZONE, "example.com")
    # SOA + NS + A (apex) + A (www) + MX = 5
    assert count == 5


def test_count_records_returns_negative_on_bad_zone():
    """Unparseable zone text yields -1 as an error sentinel."""
    count = count_zone_records("not a valid zone", "example.com")
    assert count == -1


def test_count_records_empty_zone():
    """Directives only, no resource records: count of 0."""
    zone = "$ORIGIN example.com.\n$TTL 300\n"
    count = count_zone_records(zone, "example.com")
    assert count == 0