Files
transaction-tracker/transaction_tracker/__init__.py
Aaron Guise 374d1019fa
Some checks failed
create-release / build (push) Successful in 11s
CI / release (release) Failing after 7m3s
Initial project 🎉
2025-05-16 16:57:20 +12:00

161 lines
5.7 KiB
Python

import os
import json
from datetime import datetime, timedelta
import shutil
import logging
from functools import wraps
from .exceptions import TransactionAlreadyProcessedError
class TransactionTracker:
"""Track processed transactions with automatic 3-day TTL."""
def __init__(self, storage_dir="transaction_records", ttl_days=3):
"""Initialize with storage directory path and TTL in days."""
self.storage_dir = storage_dir
self.ttl_days = ttl_days
self._ensure_storage_exists()
self.logger = logging.getLogger(__name__)
# Run cleanup at initialization
self.cleanup_expired()
def _ensure_storage_exists(self):
"""Create storage directory if it doesn't exist."""
if not os.path.exists(self.storage_dir):
os.makedirs(self.storage_dir)
def mark_processed(self, transaction_id):
"""Mark a transaction as processed."""
# Create record file with timestamp
record_path = os.path.join(self.storage_dir, f"{transaction_id}.json")
record_data = {
"transaction_id": str(transaction_id), # Convert to string for non-string IDs
"processed_at": datetime.now().isoformat(),
"expires_at": (datetime.now() + timedelta(days=self.ttl_days)).isoformat()
}
with open(record_path, "w") as f:
json.dump(record_data, f)
self.logger.debug(f"Transaction {transaction_id} marked as processed")
def is_processed(self, transaction_id):
"""Check if transaction was already processed."""
record_path = os.path.join(self.storage_dir, f"{transaction_id}.json")
return os.path.exists(record_path)
def require_unique_transaction(self, id_arg='transaction_id'):
"""
Decorator factory to ensure a function is only run once per transaction ID.
Args:
id_arg: The argument name containing the transaction ID
Usage:
@tracker.require_unique_transaction()
def process_payment(transaction_id, amount):
# Process payment logic
return "Success"
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Extract transaction_id from args or kwargs
if id_arg in kwargs:
transaction_id = kwargs[id_arg]
else:
# Find the position of transaction_id in the function signature
import inspect
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())
try:
idx = param_names.index(id_arg)
if idx < len(args):
transaction_id = args[idx]
else:
raise ValueError(f"Could not find {id_arg} in arguments")
except ValueError:
raise ValueError(f"Could not find {id_arg} in arguments")
# Always convert to string for consistent handling
str_transaction_id = str(transaction_id)
# Check if transaction was already processed
if self.is_processed(str_transaction_id):
raise TransactionAlreadyProcessedError(
f"Transaction {transaction_id} was already processed"
)
# Process the transaction
result = func(*args, **kwargs)
# Mark as processed
self.mark_processed(str_transaction_id)
return result
return wrapper
return decorator
def cleanup_expired(self):
"""Remove records older than the TTL period."""
now = datetime.now()
count = 0
for filename in os.listdir(self.storage_dir):
if not filename.endswith('.json'):
continue
file_path = os.path.join(self.storage_dir, filename)
try:
with open(file_path, 'r') as f:
record = json.load(f)
expires_at = datetime.fromisoformat(record.get('expires_at'))
if now >= expires_at:
os.remove(file_path)
count += 1
except (json.JSONDecodeError, KeyError, ValueError):
# If file is corrupted, remove it
os.remove(file_path)
count += 1
if count > 0:
self.logger.info(f"Cleaned up {count} expired transaction records")
def clear_all(self):
"""Remove all transaction records."""
if os.path.exists(self.storage_dir):
shutil.rmtree(self.storage_dir)
self._ensure_storage_exists()
self.logger.info("Cleared all transaction records")
class TransactionProcessor:
"""Process transactions with duplicate detection."""
def __init__(self, storage_dir="transaction_records", ttl_days=3):
"""Initialize with a TransactionTracker."""
self.tracker = TransactionTracker(storage_dir, ttl_days)
def unique_transaction(self, id_arg='transaction_id'):
"""
Decorator to ensure a transaction is processed only once.
Usage:
@processor.unique_transaction()
def process_payment(transaction_id, amount):
# Your processing logic
return "Success"
"""
return self.tracker.require_unique_transaction(id_arg)
def cleanup(self):
"""Clean up expired transactions."""
self.tracker.cleanup_expired()
def reset(self):
"""Clear all transaction records."""
self.tracker.clear_all()