From 59c0bbde0c7105b340288a33aa4252826570f67a Mon Sep 17 00:00:00 2001 From: Aaron Guise Date: Mon, 15 Jun 2026 22:20:47 +1200 Subject: [PATCH] fix(tracker): is_processed now checks expires_at, not just file existence :bug: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously is_processed() returned True for any record file that existed, relying entirely on cleanup_expired() (called at __init__) to delete stale files. Because cleanup runs at container startup — before Akahu transactions are fetched — any record that expired exactly on that startup would be deleted and then immediately missed, letting the duplicate through. Fix: is_processed() reads the expires_at field from the JSON and returns False if the record has expired, regardless of whether cleanup has run. Also adds migrate_ttl.py script to retroactively extend expires_at on existing records that were written under a shorter TTL, and bumps version to 0.1.3. --- pyproject.toml | 2 +- scripts/migrate_ttl.py | 65 +++++++++++++++++++++++++++++++++ tests/test_transact_cache.py | 37 +++++++++++++++++++ transaction_tracker/__init__.py | 12 +++++- 4 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 scripts/migrate_ttl.py diff --git a/pyproject.toml b/pyproject.toml index ea1e34a..2c4508d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "transaction-tracker" -version = "0.1.0" +version = "0.1.3" description = "" authors = [ {name = "Aaron Guise",email = "aaron@guise.net.nz"} diff --git a/scripts/migrate_ttl.py b/scripts/migrate_ttl.py new file mode 100644 index 0000000..d3c8872 --- /dev/null +++ b/scripts/migrate_ttl.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +""" +Migrate existing transaction records to a new TTL. + +Reads all .json record files in the given storage directory and rewrites +expires_at to processed_at + ttl_days. Records that have already expired +are left untouched (they will be cleaned up on next startup). + +Usage: + python migrate_ttl.py + +Example (run on the server): + python migrate_ttl.py /opt/data/transaction_records 14 +""" +import json +import os +import sys +from datetime import datetime, timedelta + + +def migrate(storage_dir: str, ttl_days: int) -> None: + if not os.path.isdir(storage_dir): + print(f"Directory not found: {storage_dir}") + sys.exit(1) + + now = datetime.now() + updated = skipped = errors = 0 + + for filename in os.listdir(storage_dir): + if not filename.endswith(".json"): + continue + path = os.path.join(storage_dir, filename) + try: + with open(path) as f: + record = json.load(f) + + processed_at = datetime.fromisoformat(record["processed_at"]) + current_expires = datetime.fromisoformat(record["expires_at"]) + + if current_expires <= now: + skipped += 1 + continue + + new_expires = processed_at + timedelta(days=ttl_days) + if new_expires <= current_expires: + skipped += 1 + continue + + record["expires_at"] = new_expires.isoformat() + with open(path, "w") as f: + json.dump(record, f) + updated += 1 + + except (json.JSONDecodeError, KeyError, ValueError, OSError) as e: + print(f" ERROR {filename}: {e}") + errors += 1 + + print(f"Done: {updated} updated, {skipped} skipped (already expired or TTL already sufficient), {errors} errors") + + +if __name__ == "__main__": + if len(sys.argv) != 3: + print(__doc__) + sys.exit(1) + migrate(sys.argv[1], int(sys.argv[2])) diff --git a/tests/test_transact_cache.py b/tests/test_transact_cache.py index 3a3de7f..2f32bd1 100644 --- a/tests/test_transact_cache.py +++ b/tests/test_transact_cache.py @@ -215,6 +215,43 @@ class TestTransactionProcessor: with pytest.raises(TransactionAlreadyProcessedError): multi_decorated("multi_1", 100.00) + def test_is_processed_respects_expiry_without_cleanup(self, temp_dir): + """Expired record must not block re-processing even if cleanup hasn't run. + + This is the regression test for the bug where is_processed() only checked + file existence. cleanup_expired() runs at startup and deletes expired files, + but if the expired file is deleted AFTER a duplicate arrives in the same run + the duplicate would slip through. The fix: is_processed() reads expires_at + from the JSON directly. + """ + import json + + processor = TransactionProcessor(storage_dir=temp_dir, ttl_days=1) + + @processor.unique_transaction() + def pay(transaction_id, amount): + return "ok" + + pay("ttl_check", 50.0) + + record_path = os.path.join(temp_dir, "ttl_check.json") + assert os.path.exists(record_path) + + # Manually backdate expires_at so the record is expired + with open(record_path) as f: + record = json.load(f) + record["expires_at"] = (datetime.now() - timedelta(seconds=1)).isoformat() + with open(record_path, "w") as f: + json.dump(record, f) + + # is_processed must return False — the file still exists but it's expired + assert not processor.tracker.is_processed("ttl_check") + + def test_is_processed_returns_true_for_valid_record(self, processor): + """is_processed returns True for a record that exists and has not expired.""" + processor.tracker.mark_processed("valid_tx") + assert processor.tracker.is_processed("valid_tx") + def test_non_string_transaction_id(self, processor): """Test handling of non-string transaction IDs.""" diff --git a/transaction_tracker/__init__.py b/transaction_tracker/__init__.py index 6ef5b8b..390b864 100644 --- a/transaction_tracker/__init__.py +++ b/transaction_tracker/__init__.py @@ -40,9 +40,17 @@ class TransactionTracker: self.logger.debug(f"Transaction {transaction_id} marked as processed") def is_processed(self, transaction_id): - """Check if transaction was already processed.""" + """Check if transaction was already processed and its record has not expired.""" record_path = os.path.join(self.storage_dir, f"{transaction_id}.json") - return os.path.exists(record_path) + if not os.path.exists(record_path): + return False + try: + with open(record_path, "r") as f: + record = json.load(f) + expires_at = datetime.fromisoformat(record["expires_at"]) + return datetime.now() < expires_at + except (json.JSONDecodeError, KeyError, ValueError, TypeError): + return False def require_unique_transaction(self, id_arg='transaction_id'): """