You've already forked transaction-tracker
161 lines
5.7 KiB
Python
161 lines
5.7 KiB
Python
import os
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
import shutil
|
|
import logging
|
|
from functools import wraps
|
|
from .exceptions import TransactionAlreadyProcessedError
|
|
|
|
|
|
class TransactionTracker:
|
|
"""Track processed transactions with automatic 3-day TTL."""
|
|
|
|
def __init__(self, storage_dir="transaction_records", ttl_days=3):
|
|
"""Initialize with storage directory path and TTL in days."""
|
|
self.storage_dir = storage_dir
|
|
self.ttl_days = ttl_days
|
|
self._ensure_storage_exists()
|
|
self.logger = logging.getLogger(__name__)
|
|
# Run cleanup at initialization
|
|
self.cleanup_expired()
|
|
|
|
def _ensure_storage_exists(self):
|
|
"""Create storage directory if it doesn't exist."""
|
|
if not os.path.exists(self.storage_dir):
|
|
os.makedirs(self.storage_dir)
|
|
|
|
def mark_processed(self, transaction_id):
|
|
"""Mark a transaction as processed."""
|
|
# Create record file with timestamp
|
|
record_path = os.path.join(self.storage_dir, f"{transaction_id}.json")
|
|
record_data = {
|
|
"transaction_id": str(transaction_id), # Convert to string for non-string IDs
|
|
"processed_at": datetime.now().isoformat(),
|
|
"expires_at": (datetime.now() + timedelta(days=self.ttl_days)).isoformat()
|
|
}
|
|
|
|
with open(record_path, "w") as f:
|
|
json.dump(record_data, f)
|
|
|
|
self.logger.debug(f"Transaction {transaction_id} marked as processed")
|
|
|
|
def is_processed(self, transaction_id):
|
|
"""Check if transaction was already processed."""
|
|
record_path = os.path.join(self.storage_dir, f"{transaction_id}.json")
|
|
return os.path.exists(record_path)
|
|
|
|
def require_unique_transaction(self, id_arg='transaction_id'):
|
|
"""
|
|
Decorator factory to ensure a function is only run once per transaction ID.
|
|
|
|
Args:
|
|
id_arg: The argument name containing the transaction ID
|
|
|
|
Usage:
|
|
@tracker.require_unique_transaction()
|
|
def process_payment(transaction_id, amount):
|
|
# Process payment logic
|
|
return "Success"
|
|
"""
|
|
|
|
def decorator(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
# Extract transaction_id from args or kwargs
|
|
if id_arg in kwargs:
|
|
transaction_id = kwargs[id_arg]
|
|
else:
|
|
# Find the position of transaction_id in the function signature
|
|
import inspect
|
|
sig = inspect.signature(func)
|
|
param_names = list(sig.parameters.keys())
|
|
try:
|
|
idx = param_names.index(id_arg)
|
|
if idx < len(args):
|
|
transaction_id = args[idx]
|
|
else:
|
|
raise ValueError(f"Could not find {id_arg} in arguments")
|
|
except ValueError:
|
|
raise ValueError(f"Could not find {id_arg} in arguments")
|
|
|
|
# Always convert to string for consistent handling
|
|
str_transaction_id = str(transaction_id)
|
|
|
|
# Check if transaction was already processed
|
|
if self.is_processed(str_transaction_id):
|
|
raise TransactionAlreadyProcessedError(
|
|
f"Transaction {transaction_id} was already processed"
|
|
)
|
|
|
|
# Process the transaction
|
|
result = func(*args, **kwargs)
|
|
|
|
# Mark as processed
|
|
self.mark_processed(str_transaction_id)
|
|
|
|
return result
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
def cleanup_expired(self):
|
|
"""Remove records older than the TTL period."""
|
|
now = datetime.now()
|
|
count = 0
|
|
for filename in os.listdir(self.storage_dir):
|
|
if not filename.endswith('.json'):
|
|
continue
|
|
|
|
file_path = os.path.join(self.storage_dir, filename)
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
record = json.load(f)
|
|
|
|
expires_at = datetime.fromisoformat(record.get('expires_at'))
|
|
if now >= expires_at:
|
|
os.remove(file_path)
|
|
count += 1
|
|
except (json.JSONDecodeError, KeyError, ValueError):
|
|
# If file is corrupted, remove it
|
|
os.remove(file_path)
|
|
count += 1
|
|
|
|
if count > 0:
|
|
self.logger.info(f"Cleaned up {count} expired transaction records")
|
|
|
|
def clear_all(self):
|
|
"""Remove all transaction records."""
|
|
if os.path.exists(self.storage_dir):
|
|
shutil.rmtree(self.storage_dir)
|
|
self._ensure_storage_exists()
|
|
self.logger.info("Cleared all transaction records")
|
|
|
|
|
|
class TransactionProcessor:
|
|
"""Process transactions with duplicate detection."""
|
|
|
|
def __init__(self, storage_dir="transaction_records", ttl_days=3):
|
|
"""Initialize with a TransactionTracker."""
|
|
self.tracker = TransactionTracker(storage_dir, ttl_days)
|
|
|
|
def unique_transaction(self, id_arg='transaction_id'):
|
|
"""
|
|
Decorator to ensure a transaction is processed only once.
|
|
|
|
Usage:
|
|
@processor.unique_transaction()
|
|
def process_payment(transaction_id, amount):
|
|
# Your processing logic
|
|
return "Success"
|
|
"""
|
|
return self.tracker.require_unique_transaction(id_arg)
|
|
|
|
def cleanup(self):
|
|
"""Clean up expired transactions."""
|
|
self.tracker.cleanup_expired()
|
|
|
|
def reset(self):
|
|
"""Clear all transaction records."""
|
|
self.tracker.clear_all()
|