You've already forked transaction-tracker
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 59c0bbde0c |
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "transaction-tracker"
|
||||
version = "0.1.0"
|
||||
version = "0.1.3"
|
||||
description = ""
|
||||
authors = [
|
||||
{name = "Aaron Guise",email = "aaron@guise.net.nz"}
|
||||
|
||||
65
scripts/migrate_ttl.py
Normal file
65
scripts/migrate_ttl.py
Normal file
@@ -0,0 +1,65 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Migrate existing transaction records to a new TTL.
|
||||
|
||||
Reads all .json record files in the given storage directory and rewrites
|
||||
expires_at to processed_at + ttl_days. Records that have already expired
|
||||
are left untouched (they will be cleaned up on next startup).
|
||||
|
||||
Usage:
|
||||
python migrate_ttl.py <storage_dir> <ttl_days>
|
||||
|
||||
Example (run on the server):
|
||||
python migrate_ttl.py /opt/data/transaction_records 14
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
def migrate(storage_dir: str, ttl_days: int) -> None:
|
||||
if not os.path.isdir(storage_dir):
|
||||
print(f"Directory not found: {storage_dir}")
|
||||
sys.exit(1)
|
||||
|
||||
now = datetime.now()
|
||||
updated = skipped = errors = 0
|
||||
|
||||
for filename in os.listdir(storage_dir):
|
||||
if not filename.endswith(".json"):
|
||||
continue
|
||||
path = os.path.join(storage_dir, filename)
|
||||
try:
|
||||
with open(path) as f:
|
||||
record = json.load(f)
|
||||
|
||||
processed_at = datetime.fromisoformat(record["processed_at"])
|
||||
current_expires = datetime.fromisoformat(record["expires_at"])
|
||||
|
||||
if current_expires <= now:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
new_expires = processed_at + timedelta(days=ttl_days)
|
||||
if new_expires <= current_expires:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
record["expires_at"] = new_expires.isoformat()
|
||||
with open(path, "w") as f:
|
||||
json.dump(record, f)
|
||||
updated += 1
|
||||
|
||||
except (json.JSONDecodeError, KeyError, ValueError, OSError) as e:
|
||||
print(f" ERROR {filename}: {e}")
|
||||
errors += 1
|
||||
|
||||
print(f"Done: {updated} updated, {skipped} skipped (already expired or TTL already sufficient), {errors} errors")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 3:
|
||||
print(__doc__)
|
||||
sys.exit(1)
|
||||
migrate(sys.argv[1], int(sys.argv[2]))
|
||||
@@ -215,6 +215,43 @@ class TestTransactionProcessor:
|
||||
with pytest.raises(TransactionAlreadyProcessedError):
|
||||
multi_decorated("multi_1", 100.00)
|
||||
|
||||
def test_is_processed_respects_expiry_without_cleanup(self, temp_dir):
|
||||
"""Expired record must not block re-processing even if cleanup hasn't run.
|
||||
|
||||
This is the regression test for the bug where is_processed() only checked
|
||||
file existence. cleanup_expired() runs at startup and deletes expired files,
|
||||
but if the expired file is deleted AFTER a duplicate arrives in the same run
|
||||
the duplicate would slip through. The fix: is_processed() reads expires_at
|
||||
from the JSON directly.
|
||||
"""
|
||||
import json
|
||||
|
||||
processor = TransactionProcessor(storage_dir=temp_dir, ttl_days=1)
|
||||
|
||||
@processor.unique_transaction()
|
||||
def pay(transaction_id, amount):
|
||||
return "ok"
|
||||
|
||||
pay("ttl_check", 50.0)
|
||||
|
||||
record_path = os.path.join(temp_dir, "ttl_check.json")
|
||||
assert os.path.exists(record_path)
|
||||
|
||||
# Manually backdate expires_at so the record is expired
|
||||
with open(record_path) as f:
|
||||
record = json.load(f)
|
||||
record["expires_at"] = (datetime.now() - timedelta(seconds=1)).isoformat()
|
||||
with open(record_path, "w") as f:
|
||||
json.dump(record, f)
|
||||
|
||||
# is_processed must return False — the file still exists but it's expired
|
||||
assert not processor.tracker.is_processed("ttl_check")
|
||||
|
||||
def test_is_processed_returns_true_for_valid_record(self, processor):
|
||||
"""is_processed returns True for a record that exists and has not expired."""
|
||||
processor.tracker.mark_processed("valid_tx")
|
||||
assert processor.tracker.is_processed("valid_tx")
|
||||
|
||||
def test_non_string_transaction_id(self, processor):
|
||||
"""Test handling of non-string transaction IDs."""
|
||||
|
||||
|
||||
@@ -40,9 +40,17 @@ class TransactionTracker:
|
||||
self.logger.debug(f"Transaction {transaction_id} marked as processed")
|
||||
|
||||
def is_processed(self, transaction_id):
|
||||
"""Check if transaction was already processed."""
|
||||
"""Check if transaction was already processed and its record has not expired."""
|
||||
record_path = os.path.join(self.storage_dir, f"{transaction_id}.json")
|
||||
return os.path.exists(record_path)
|
||||
if not os.path.exists(record_path):
|
||||
return False
|
||||
try:
|
||||
with open(record_path, "r") as f:
|
||||
record = json.load(f)
|
||||
expires_at = datetime.fromisoformat(record["expires_at"])
|
||||
return datetime.now() < expires_at
|
||||
except (json.JSONDecodeError, KeyError, ValueError, TypeError):
|
||||
return False
|
||||
|
||||
def require_unique_transaction(self, id_arg='transaction_id'):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user