"""File-backed duplicate-transaction detection with time-limited records."""

import inspect
import json
import logging
import os
import shutil
from datetime import datetime, timedelta
from functools import wraps
from urllib.parse import quote

from .exceptions import TransactionAlreadyProcessedError


class TransactionTracker:
    """Track processed transactions on disk, expiring each record after a TTL.

    Every processed transaction is stored as one small JSON file inside
    ``storage_dir``. A record stops counting as "processed" once its
    ``expires_at`` timestamp has passed; the file itself is deleted the next
    time :meth:`cleanup_expired` runs (which happens on construction).

    Timestamps are naive local times produced by ``datetime.now()``.
    """

    def __init__(self, storage_dir="transaction_records", ttl_days=3):
        """Initialize with storage directory path and TTL in days.

        Args:
            storage_dir: Directory holding the record files; created if it
                does not exist.
            ttl_days: Number of days a record stays valid (default 3).
        """
        self.storage_dir = storage_dir
        self.ttl_days = ttl_days
        self._ensure_storage_exists()
        self.logger = logging.getLogger(__name__)

        # Purge records that expired while no tracker was running.
        self.cleanup_expired()

    def _ensure_storage_exists(self) -> None:
        """Create the storage directory if it doesn't exist."""
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.storage_dir, exist_ok=True)

    def _record_path(self, transaction_id) -> str:
        """Return the record file path for ``transaction_id``.

        The ID is percent-encoded so that path separators or other special
        characters cannot make the path point outside ``storage_dir``.
        Letters, digits and ``_ . - ~`` are left untouched by the encoding.
        """
        safe_name = quote(str(transaction_id), safe="")
        return os.path.join(self.storage_dir, f"{safe_name}.json")

    @staticmethod
    def _read_expiry(record_path) -> datetime:
        """Load a record file and return its ``expires_at`` as a datetime.

        Raises:
            OSError: The file could not be opened or read.
            ValueError: The file is not valid JSON, is not a JSON object,
                or has a missing or malformed ``expires_at`` field.
        """
        with open(record_path, "r", encoding="utf-8") as f:
            record = json.load(f)  # JSONDecodeError is a ValueError subclass
        try:
            return datetime.fromisoformat(record["expires_at"])
        except (KeyError, TypeError) as exc:
            # KeyError: field missing. TypeError: payload is not a dict, or
            # the field is not a string.
            raise ValueError(f"Malformed transaction record: {record_path}") from exc

    def mark_processed(self, transaction_id) -> None:
        """Mark a transaction as processed.

        Writes (or overwrites) the record file for ``transaction_id`` with
        the processing time and the expiry time ``ttl_days`` later.
        """
        # Take a single timestamp so both fields are exactly ttl_days apart.
        now = datetime.now()
        record_data = {
            "transaction_id": str(transaction_id),  # IDs may be non-strings
            "processed_at": now.isoformat(),
            "expires_at": (now + timedelta(days=self.ttl_days)).isoformat(),
        }

        with open(self._record_path(transaction_id), "w", encoding="utf-8") as f:
            json.dump(record_data, f)

        self.logger.debug("Transaction %s marked as processed", transaction_id)

    def is_processed(self, transaction_id) -> bool:
        """Check if transaction was already processed and has not expired.

        Returns:
            False when no record exists or the record's TTL has elapsed.
            True when a live record exists. Also True when a record exists
            but cannot be read or parsed: failing closed is the safer choice
            for duplicate detection.
        """
        try:
            expires_at = self._read_expiry(self._record_path(transaction_id))
        except FileNotFoundError:
            return False
        except (OSError, ValueError):
            return True
        return datetime.now() < expires_at

    def require_unique_transaction(self, id_arg='transaction_id'):
        """
        Decorator factory to ensure a function is only run once per transaction ID.

        The wrapped function runs only if its transaction ID has no live
        record. The ID is recorded after the function returns, so a call
        that raises is not marked and may be retried. The check and the
        mark are separate steps, not an atomic operation.

        Args:
            id_arg: The argument name containing the transaction ID

        Raises (from the wrapped call):
            ValueError: The transaction ID was passed neither by keyword nor
                at the positional slot of ``id_arg``.
            TransactionAlreadyProcessedError: The ID was already processed.

        Usage:
            @tracker.require_unique_transaction()
            def process_payment(transaction_id, amount):
                # Process payment logic
                return "Success"
        """
        def decorator(func):
            # Resolve the positional slot of id_arg once, at decoration time,
            # instead of re-inspecting the signature on every call.
            try:
                id_index = list(inspect.signature(func).parameters).index(id_arg)
            except (ValueError, TypeError):
                # Parameter absent or signature unavailable: the ID can then
                # only be supplied by keyword.
                id_index = None

            @wraps(func)
            def wrapper(*args, **kwargs):
                if id_arg in kwargs:
                    transaction_id = kwargs[id_arg]
                elif id_index is not None and id_index < len(args):
                    transaction_id = args[id_index]
                else:
                    raise ValueError(f"Could not find {id_arg} in arguments")

                # Always convert to string for consistent handling
                str_transaction_id = str(transaction_id)

                if self.is_processed(str_transaction_id):
                    raise TransactionAlreadyProcessedError(
                        f"Transaction {transaction_id} was already processed"
                    )

                result = func(*args, **kwargs)

                # Only reached when func succeeded.
                self.mark_processed(str_transaction_id)
                return result

            return wrapper

        return decorator

    def cleanup_expired(self) -> None:
        """Remove records older than the TTL period.

        Scans every ``*.json`` file in the storage directory and deletes
        those whose ``expires_at`` has passed, as well as files that cannot
        be parsed as a valid record. Logs the number removed when non-zero.
        """
        now = datetime.now()
        removed = 0

        for filename in os.listdir(self.storage_dir):
            if not filename.endswith('.json'):
                continue

            file_path = os.path.join(self.storage_dir, filename)
            try:
                stale = now >= self._read_expiry(file_path)
            except ValueError:
                # Corrupted or malformed record: remove it.
                stale = True

            # The file is already closed here, so removal also works on
            # platforms that refuse to delete open files.
            if stale:
                os.remove(file_path)
                removed += 1

        if removed > 0:
            self.logger.info("Cleaned up %d expired transaction records", removed)

    def clear_all(self) -> None:
        """Remove all transaction records and recreate an empty storage directory."""
        if os.path.exists(self.storage_dir):
            shutil.rmtree(self.storage_dir)
        self._ensure_storage_exists()
        self.logger.info("Cleared all transaction records")


class TransactionProcessor:
    """Process transactions with duplicate detection.

    Thin facade over a :class:`TransactionTracker` stored in ``self.tracker``.
    """

    def __init__(self, storage_dir="transaction_records", ttl_days=3):
        """Initialize with a TransactionTracker built from the given settings."""
        self.tracker = TransactionTracker(storage_dir, ttl_days)

    def unique_transaction(self, id_arg='transaction_id'):
        """
        Decorator to ensure a transaction is processed only once.

        Delegates to ``TransactionTracker.require_unique_transaction``.

        Args:
            id_arg: The argument name containing the transaction ID

        Usage:
            @processor.unique_transaction()
            def process_payment(transaction_id, amount):
                # Your processing logic
                return "Success"
        """
        return self.tracker.require_unique_transaction(id_arg)

    def cleanup(self) -> None:
        """Clean up expired transactions."""
        self.tracker.cleanup_expired()

    def reset(self) -> None:
        """Clear all transaction records."""
        self.tracker.clear_all()