Files
transaction-tracker/tests/test_transact_cache.py
Aaron Guise 59c0bbde0c
Some checks failed
create-release / build (push) Successful in 35s
CI / release (release) Failing after 1m31s
fix(tracker): is_processed now checks expires_at, not just file existence 🐛
Previously is_processed() returned True for any record file that existed,
relying entirely on cleanup_expired() (called at __init__) to delete stale
files. Because cleanup runs at container startup — before Akahu transactions
are fetched — any record that expired exactly on that startup would be deleted
and then immediately missed, letting the duplicate through.

Fix: is_processed() reads the expires_at field from the JSON and returns False
if the record has expired, regardless of whether cleanup has run.

Also adds migrate_ttl.py script to retroactively extend expires_at on existing
records that were written under a shorter TTL, and bumps version to 0.1.3.
2026-06-15 22:20:47 +12:00

273 lines
9.9 KiB
Python

import os
import shutil
import tempfile
import time
from datetime import datetime, timedelta
import pytest
# from unittest.mock import patch, MagicMock
from functools import wraps
# Import the code to test
from transaction_tracker import TransactionProcessor, TransactionAlreadyProcessedError
class TestTransactionProcessor:
"""Test suite for TransactionProcessor class."""
@pytest.fixture
def temp_dir(self):
"""Create a temporary directory for testing."""
temp_dir = tempfile.mkdtemp()
yield temp_dir
# Clean up after the test
shutil.rmtree(temp_dir)
@pytest.fixture
def processor(self, temp_dir):
"""Create a TransactionProcessor instance for testing."""
return TransactionProcessor(storage_dir=temp_dir, ttl_days=3)
def test_unique_transaction_decorator(self, processor):
"""Test the unique transaction decorator works."""
# Define a function with the decorator
@processor.unique_transaction()
def sample_transaction(transaction_id, amount):
return {"status": "success", "amount": amount}
# First call should succeed
result = sample_transaction("test_tx_1", 100.00)
assert result == {"status": "success", "amount": 100.00}
# Second call with same ID should raise exception
with pytest.raises(TransactionAlreadyProcessedError):
sample_transaction("test_tx_1", 200.00)
# Different ID should work
result = sample_transaction("test_tx_2", 300.00)
assert result == {"status": "success", "amount": 300.00}
def test_custom_id_arg(self, processor):
"""Test decorator with custom transaction ID argument name."""
@processor.unique_transaction(id_arg='custom_id')
def custom_transaction(custom_id, amount):
return {"status": "success", "amount": amount, "id": custom_id}
# First call should succeed
result = custom_transaction("custom_1", 100.00)
assert result == {"status": "success", "amount": 100.00, "id": "custom_1"}
# Second call with same ID should raise exception
with pytest.raises(TransactionAlreadyProcessedError):
custom_transaction("custom_1", 200.00)
def test_keyword_arguments(self, processor):
"""Test decorator with keyword arguments."""
@processor.unique_transaction()
def kw_transaction(transaction_id, amount):
return {"status": "success", "amount": amount}
# Using keyword arguments
result = kw_transaction(transaction_id="kw_1", amount=100.00)
assert result == {"status": "success", "amount": 100.00}
# Second call should fail
with pytest.raises(TransactionAlreadyProcessedError):
kw_transaction(transaction_id="kw_1", amount=200.00)
def test_cleanup_expired(self, temp_dir):
"""Test cleanup of expired transactions."""
# Create processor with short TTL for testing
processor = TransactionProcessor(storage_dir=temp_dir, ttl_days=0.01) # ~15 minutes
# Define a test function
@processor.unique_transaction()
def expired_transaction(transaction_id, amount):
return {"status": "success", "amount": amount}
# Process a transaction
expired_transaction("expire_1", 100.00)
# Verify record exists
record_path = os.path.join(temp_dir, "expire_1.json")
assert os.path.exists(record_path)
# Manually modify the expiration time to make it expire immediately
import json
with open(record_path, 'r') as f:
record = json.load(f)
# Set expires_at to 1 second ago
record['expires_at'] = (datetime.now() - timedelta(seconds=1)).isoformat()
with open(record_path, 'w') as f:
json.dump(record, f)
# Run cleanup
processor.cleanup()
# Verify record was removed
assert not os.path.exists(record_path)
def test_reset(self, processor):
"""Test resetting all transaction records."""
@processor.unique_transaction()
def reset_transaction(transaction_id, amount):
return {"status": "success", "amount": amount}
# Process multiple transactions
reset_transaction("reset_1", 100.00)
reset_transaction("reset_2", 200.00)
# Reset all records
processor.reset()
# Should be able to process the same transactions again
result1 = reset_transaction("reset_1", 100.00)
result2 = reset_transaction("reset_2", 200.00)
assert result1 == {"status": "success", "amount": 100.00}
assert result2 == {"status": "success", "amount": 200.00}
def test_missing_transaction_id(self, processor):
"""Test error handling when transaction ID argument is missing."""
@processor.unique_transaction(id_arg='missing_id')
def missing_transaction(transaction_id, amount):
return {"status": "success", "amount": amount}
# Should raise ValueError because 'missing_id' is not in the function signature
with pytest.raises(ValueError):
missing_transaction("tx_1", 100.00)
def test_corrupt_file_handling(self, temp_dir, processor):
"""Test handling of corrupt transaction record files."""
# Manually create a corrupt record file
corrupt_path = os.path.join(temp_dir, "corrupt_tx.json")
with open(corrupt_path, 'w') as f:
f.write("This is not valid JSON")
# Cleanup should remove the corrupt file without errors
processor.cleanup()
# File should be gone
assert not os.path.exists(corrupt_path)
@pytest.mark.slow
def test_real_ttl_expiration(self, temp_dir):
"""Test actual TTL expiration (takes longer to run)."""
# Create processor with very short TTL
processor = TransactionProcessor(storage_dir=temp_dir, ttl_days=0.0007) # ~1 minute
@processor.unique_transaction()
def quick_expire_transaction(transaction_id, amount):
return {"status": "success", "amount": amount}
# Process a transaction
quick_expire_transaction("quick_expire", 100.00)
# Verify we can't process it again immediately
with pytest.raises(TransactionAlreadyProcessedError):
quick_expire_transaction("quick_expire", 100.00)
# Wait for TTL to expire (slightly over 1 minute)
time.sleep(65)
# Run cleanup
processor.cleanup()
# Should be able to process it again
result = quick_expire_transaction("quick_expire", 100.00)
assert result == {"status": "success", "amount": 100.00}
def test_multiple_decorators(self, processor):
"""Test stacking multiple decorators."""
calls = []
def another_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
calls.append("another_decorator")
return func(*args, **kwargs)
return wrapper
@another_decorator
@processor.unique_transaction()
def multi_decorated(transaction_id, amount):
calls.append("original_function")
return {"status": "success", "amount": amount}
# Call the function
result = multi_decorated("multi_1", 100.00)
# Check result
assert result == {"status": "success", "amount": 100.00}
# Check call order
assert calls == ["another_decorator", "original_function"]
# Second call should still be caught by our transaction decorator
with pytest.raises(TransactionAlreadyProcessedError):
multi_decorated("multi_1", 100.00)
def test_is_processed_respects_expiry_without_cleanup(self, temp_dir):
"""Expired record must not block re-processing even if cleanup hasn't run.
This is the regression test for the bug where is_processed() only checked
file existence. cleanup_expired() runs at startup and deletes expired files,
but if the expired file is deleted AFTER a duplicate arrives in the same run
the duplicate would slip through. The fix: is_processed() reads expires_at
from the JSON directly.
"""
import json
processor = TransactionProcessor(storage_dir=temp_dir, ttl_days=1)
@processor.unique_transaction()
def pay(transaction_id, amount):
return "ok"
pay("ttl_check", 50.0)
record_path = os.path.join(temp_dir, "ttl_check.json")
assert os.path.exists(record_path)
# Manually backdate expires_at so the record is expired
with open(record_path) as f:
record = json.load(f)
record["expires_at"] = (datetime.now() - timedelta(seconds=1)).isoformat()
with open(record_path, "w") as f:
json.dump(record, f)
# is_processed must return False — the file still exists but it's expired
assert not processor.tracker.is_processed("ttl_check")
def test_is_processed_returns_true_for_valid_record(self, processor):
"""is_processed returns True for a record that exists and has not expired."""
processor.tracker.mark_processed("valid_tx")
assert processor.tracker.is_processed("valid_tx")
def test_non_string_transaction_id(self, processor):
"""Test handling of non-string transaction IDs."""
@processor.unique_transaction()
def int_id_transaction(transaction_id, amount):
return {"status": "success", "amount": amount}
# Use an integer as transaction_id
result = int_id_transaction(12345, 100.00)
assert result == {"status": "success", "amount": 100.00}
# Try again with same ID
with pytest.raises(TransactionAlreadyProcessedError):
int_id_transaction(12345, 200.00)
# The string representation should also be caught
with pytest.raises(TransactionAlreadyProcessedError):
int_id_transaction("12345", 300.00)