feat: add CMD_MULTI_SERVER methods to DirectAdminClient 🔌

Adds get_extra_dns_servers(), add_extra_dns_server(), and the
high-level ensure_extra_dns_server() which registers a node and
enforces dns=yes + domain_check=yes in a single call.  Also adds
the generic post() helper.  10 new tests, 141 total.
This commit is contained in:
2026-02-25 16:29:21 +13:00
parent 3f6a061ffe
commit 0f417da204
2 changed files with 298 additions and 0 deletions

View File

@@ -161,6 +161,136 @@ class DirectAdminClient:
logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}") logger.error(f"[da:{self.hostname}] GET {command} failed: {exc}")
return None return None
def post(
self, command: str, data: Optional[dict] = None
) -> Optional[requests.Response]:
"""Authenticated POST to any DA CMD_* endpoint."""
url = f"{self.scheme}://{self.hostname}:{self.port}/{command}"
kwargs: dict = dict(
data=data or {},
timeout=30,
verify=self.verify_ssl,
allow_redirects=False,
)
if self._cookies:
kwargs["cookies"] = self._cookies
else:
kwargs["auth"] = (self.username, self.password)
try:
return requests.post(url, **kwargs)
except Exception as exc:
logger.error(f"[da:{self.hostname}] POST {command} failed: {exc}")
return None
def get_extra_dns_servers(self) -> dict:
"""Return the Extra DNS server map from CMD_MULTI_SERVER (GET).
Returns a dict keyed by server hostname/IP, each value being the
per-server settings dict (dns, domain_check, port, user, ssl, …).
Returns ``{}`` on any error.
"""
resp = self.get("CMD_MULTI_SERVER", params={"json": "yes"})
if resp is None or resp.status_code != 200:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER GET failed")
return {}
try:
return resp.json().get("servers", {})
except Exception as exc:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER parse error: {exc}")
return {}
def add_extra_dns_server(
self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
) -> bool:
"""Register a new Extra DNS server via CMD_MULTI_SERVER action=add.
Returns ``True`` if DA reports success, ``False`` otherwise.
"""
resp = self.post(
"CMD_MULTI_SERVER",
data={
"action": "add",
"json": "yes",
"ip": ip,
"port": str(port),
"user": user,
"passwd": passwd,
"ssl": "yes" if ssl else "no",
},
)
if resp is None or resp.status_code != 200:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add failed for {ip}")
return False
try:
result = resp.json()
if result.get("success"):
logger.info(f"[da:{self.hostname}] Added Extra DNS server {ip}")
return True
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER add error: {result.get('result', result)}"
)
return False
except Exception as exc:
logger.error(f"[da:{self.hostname}] CMD_MULTI_SERVER add parse error: {exc}")
return False
def ensure_extra_dns_server(
self, ip: str, port: int, user: str, passwd: str, ssl: bool = False
) -> bool:
"""Add (if absent) and configure a directdnsonly Extra DNS server.
Ensures the server is registered with ``dns=yes`` and
``domain_check=yes`` so DirectAdmin pushes zone updates to it.
Returns ``True`` if fully configured, ``False`` on any failure.
"""
servers = self.get_extra_dns_servers()
if ip not in servers:
if not self.add_extra_dns_server(ip, port, user, passwd, ssl):
return False
ssl_str = "yes" if ssl else "no"
resp = self.post(
"CMD_MULTI_SERVER",
data={
"action": "multiple",
"save": "yes",
"json": "yes",
"passwd": "",
"select0": ip,
f"port-{ip}": str(port),
f"user-{ip}": user,
f"ssl-{ip}": ssl_str,
f"dns-{ip}": "yes",
f"domain_check-{ip}": "yes",
f"user_check-{ip}": "no",
f"email-{ip}": "no",
f"show_all_users-{ip}": "no",
},
)
if resp is None or resp.status_code != 200:
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save failed for {ip}"
)
return False
try:
result = resp.json()
if result.get("success"):
logger.info(
f"[da:{self.hostname}] Extra DNS server {ip} configured "
f"(dns=yes domain_check=yes)"
)
return True
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save error: {result.get('result', result)}"
)
return False
except Exception as exc:
logger.error(
f"[da:{self.hostname}] CMD_MULTI_SERVER save parse error: {exc}"
)
return False
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Internal # Internal
# ------------------------------------------------------------------ # ------------------------------------------------------------------

View File

@@ -200,3 +200,171 @@ def test_login_returns_false_on_exception():
result = client._login() result = client._login()
assert result is False assert result is False
# ---------------------------------------------------------------------------
# get_extra_dns_servers
# ---------------------------------------------------------------------------
def _multi_server_get_resp(servers=None):
mock = MagicMock()
mock.status_code = 200
mock.is_redirect = False
mock.headers = {"Content-Type": "application/json"}
mock.json.return_value = {"CLUSTER_ON": "yes", "servers": servers or {}}
mock.raise_for_status = MagicMock()
return mock
def test_get_extra_dns_servers_returns_servers_dict():
servers = {
"1.2.3.4": {"dns": "yes", "domain_check": "yes", "port": "2222", "ssl": "no"}
}
with patch("requests.get", return_value=_multi_server_get_resp(servers)):
result = _client().get_extra_dns_servers()
assert "1.2.3.4" in result
assert result["1.2.3.4"]["dns"] == "yes"
def test_get_extra_dns_servers_returns_empty_on_http_error():
mock_resp = MagicMock()
mock_resp.status_code = 500
with patch("requests.get", return_value=mock_resp):
result = _client().get_extra_dns_servers()
assert result == {}
def test_get_extra_dns_servers_returns_empty_on_connection_error():
with patch(
"requests.get", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().get_extra_dns_servers()
assert result == {}
# ---------------------------------------------------------------------------
# add_extra_dns_server
# ---------------------------------------------------------------------------
def test_add_extra_dns_server_returns_true_on_success():
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {"result": "", "success": "Connection Added"}
with patch("requests.post", return_value=mock_resp):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is True
def test_add_extra_dns_server_returns_false_on_da_error():
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {"result": "Server already exists", "success": ""}
with patch("requests.post", return_value=mock_resp):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is False
def test_add_extra_dns_server_returns_false_on_connection_error():
with patch(
"requests.post", side_effect=requests.exceptions.ConnectionError("refused")
):
result = _client().add_extra_dns_server("1.2.3.4", 2222, "ddnsonly", "s3cr3t")
assert result is False
# ---------------------------------------------------------------------------
# ensure_extra_dns_server
# ---------------------------------------------------------------------------
def _add_success_resp():
mock = MagicMock()
mock.status_code = 200
mock.json.return_value = {"result": "", "success": "Connection Added"}
return mock
def _save_success_resp():
mock = MagicMock()
mock.status_code = 200
mock.json.return_value = {"result": "", "success": "Connections Saved"}
return mock
def test_ensure_extra_dns_server_adds_and_configures_new_server():
"""Server not yet registered — adds it, then saves dns+domain_check settings."""
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch(
"requests.post",
side_effect=[_add_success_resp(), _save_success_resp()],
),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is True
def test_ensure_extra_dns_server_skips_add_when_already_present():
"""Server already registered — no add call, only saves settings."""
existing = {
"1.2.3.4": {"dns": "no", "domain_check": "no", "port": "2222", "ssl": "no"}
}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers=existing)),
patch("requests.post", return_value=_save_success_resp()) as mock_post,
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is True
assert mock_post.call_count == 1 # save only, no add
def test_ensure_extra_dns_server_returns_false_when_add_fails():
fail_resp = MagicMock()
fail_resp.status_code = 200
fail_resp.json.return_value = {"result": "error", "success": ""}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch("requests.post", return_value=fail_resp),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is False
def test_ensure_extra_dns_server_returns_false_when_save_fails():
"""Add succeeds but the subsequent settings save fails."""
fail_save = MagicMock()
fail_save.status_code = 200
fail_save.json.return_value = {"result": "error", "success": ""}
with (
patch("requests.get", return_value=_multi_server_get_resp(servers={})),
patch(
"requests.post",
side_effect=[_add_success_resp(), fail_save],
),
):
result = _client().ensure_extra_dns_server(
"1.2.3.4", 2222, "ddnsonly", "s3cr3t"
)
assert result is False