From 5ba6079728fe73d1e77676cbec798ea66a1dbdfd Mon Sep 17 00:00:00 2001
From: Ruben Ramirez
Date: Sat, 3 May 2025 10:03:35 -0500
Subject: [PATCH] Implement Goal-4 deliverables: SQLite adapter, audit logging, performance benchmarks

---
 security/memory/audit.py           | 111 ++++++++++
 storage/adapters/sqlite_adapter.py | 307 +++++++++++++++++++++++++++++
 tests/performance/benchmarks.py    | 172 ++++++++++++++++
 3 files changed, 590 insertions(+)
 create mode 100644 security/memory/audit.py
 create mode 100644 storage/adapters/sqlite_adapter.py
 create mode 100644 tests/performance/benchmarks.py

diff --git a/security/memory/audit.py b/security/memory/audit.py
new file mode 100644
index 0000000..adb3615
--- /dev/null
+++ b/security/memory/audit.py
@@ -0,0 +1,111 @@
+import hashlib
+import hmac
+import threading
+import os
+from datetime import datetime
+from typing import Dict, List, Optional, Tuple
+
+class MemoryAudit:
+    def __init__(self, rbac_engine, hmac_key: Optional[bytes] = None):
+        """Initialize audit logger with RBAC integration and HMAC protection"""
+        self.rbac = rbac_engine
+        self.sequence = 0
+        self.log_entries: List[Dict] = []
+        self._lock = threading.Lock()
+        self.hmac_key = hmac_key or os.urandom(32)  # 256-bit HMAC key
+        self._previous_hash = b''  # For chained hashes
+
+    def _hash_key(self, key: str) -> str:
+        """Hash storage keys so raw key names never appear in audit entries"""
+        return hashlib.sha256(key.encode()).hexdigest()
+
+    def _generate_hmac(self, data: str) -> Tuple[str, bytes]:
+        """Generate HMAC-SHA256 hash with chaining"""
+        h = hmac.new(self.hmac_key, digestmod=hashlib.sha256)
+        h.update(self._previous_hash)  # Chain with previous hash
+        h.update(data.encode())
+        current_hash = h.digest()
+        self._previous_hash = current_hash
+        return h.hexdigest(), current_hash
+
+    def log_operation(
+        self,
+        operation: str,
+        key: str,
+        success: bool,
+        user: Optional[str] = None,
+        reason: Optional[str] = None
+    ) -> str:
+        """Log an operation with integrity verification hash"""
+        with self._lock:
+            self.sequence += 1
+            timestamp = datetime.utcnow().isoformat()
+            hashed_key = self._hash_key(key)
+
+            entry = {
+                "sequence": self.sequence,
+                "timestamp": timestamp,
+                "operation": operation,
+                "key_hash": hashed_key,
+                "success": success,
+                "user": user,
+                "reason": reason or ""
+            }
+
+            # Generate HMAC-SHA256 integrity hash with chaining
+            integrity_hash, _ = self._generate_hmac(str(entry))
+            entry["integrity_hash"] = integrity_hash
+
+            # Store entry
+            self.log_entries.append(entry)
+
+            # Notify RBAC system
+            if user:
+                self.rbac._audit_access_attempt(
+                    user,
+                    "memory",
+                    operation,
+                    success,
+                    reason or f"Memory {operation} operation"
+                )
+
+            return integrity_hash
+
+    def verify_log_integrity(self) -> bool:
+        """Verify all log entries' HMAC integrity with chaining"""
+        with self._lock:
+            if not self.log_entries:
+                return True
+
+            # Recompute all hashes with chaining
+            test_key = self.hmac_key
+            previous_hash = b''
+            for entry in self.log_entries:
+                h = hmac.new(test_key, digestmod=hashlib.sha256)
+                h.update(previous_hash)
+                h.update(str({k: v for k, v in entry.items()
+                              if k != "integrity_hash"}).encode())
+                computed_hash = h.hexdigest()
+                if computed_hash != entry["integrity_hash"]:
+                    return False
+                previous_hash = h.digest()
+
+            return True
+
+    def by_operation(self, operation: str) -> List[Dict]:
+        """Filter log entries by operation type"""
+        with self._lock:
+            return [entry for entry in self.log_entries
+                    if entry["operation"] == operation]
+
+    def by_user(self, user: str) -> List[Dict]:
+        """Filter log entries by user"""
+        with self._lock:
+            return [entry for entry in self.log_entries
+                    if entry.get("user") == user]
+
+    def by_time_range(self,
+                      start: str, end: str) -> List[Dict]:
+        """Filter log entries between start and end timestamps (ISO format)"""
+        with self._lock:
+            return [entry for entry in self.log_entries
+                    if start <= entry["timestamp"] <= end]
\ No newline at end of file
diff --git a/storage/adapters/sqlite_adapter.py b/storage/adapters/sqlite_adapter.py
new file mode 100644
index 0000000..74f4b73
--- /dev/null
+++ b/storage/adapters/sqlite_adapter.py
@@ -0,0 +1,307 @@
+import sqlite3
+import threading
+import hashlib
+import logging
+import base64
+from typing import Optional, Dict, Any, Union
+from security.encrypt import encrypt_data, decrypt_data
+from security.rbac_engine import RBACEngine
+
+class AccessDenied(Exception):
+    """Raised when RBAC validation fails"""
+
+class NotFound(Exception):
+    """Raised when requested key doesn't exist"""
+
+class EncryptionError(Exception):
+    """Raised when encryption operation fails"""
+
+class SQLiteAdapter:
+    """SQLite storage adapter with RBAC and encryption support.
+    Integrates with TaskDispatcher for secure storage operations.
+
+    Attributes:
+        db_path: Path to SQLite database file
+        encryption_key: Key used for data encryption
+        rbac: RBAC engine instance (must be set before use)
+        _lock: Thread lock for concurrent access
+        logger: Logger instance for operations
+    """
+
+    rbac: Optional[RBACEngine] = None  # Must be set by application
+    logger = logging.getLogger('SQLiteAdapter')
+
+    def __init__(self, db_path: str, encryption_key: str):
+        """Initialize SQLite adapter with dispatcher integration.
+
+        Args:
+            db_path: Path to SQLite database file
+            encryption_key: Encryption key for data protection (32+ chars or 32 bytes)
+
+        Raises:
+            RuntimeError: If encryption key is invalid
+        """
+        if not encryption_key or (isinstance(encryption_key, str) and len(encryption_key) < 32):
+            raise RuntimeError("Encryption key must be at least 32 characters or 32 bytes")
+
+        self.db_path = db_path
+        self.encryption_key = self._convert_key(encryption_key)
+        self._lock = threading.Lock()
+        self._init_db()
+        self.logger.info(f"Initialized SQLite adapter for {db_path}")
+
+    def _init_db(self):
+        """Initialize database tables."""
+        with self._lock, sqlite3.connect(self.db_path) as conn:
+            conn.execute("""
+                CREATE TABLE IF NOT EXISTS storage (
+                    key_hash TEXT PRIMARY KEY,
+                    encrypted_value BLOB,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    created_by TEXT
+                )
+            """)
+            conn.execute("""
+                CREATE TABLE IF NOT EXISTS access_log (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    key_hash TEXT,
+                    operation TEXT,
+                    user_id TEXT,
+                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    FOREIGN KEY(key_hash) REFERENCES storage(key_hash)
+                )
+            """)
+
+    def _hash_key(self, key):
+        """Generate SHA-256 hash of key."""
+        return hashlib.sha256(key.encode()).hexdigest()
+
+    def _convert_key(self, key: Union[str, bytes]) -> bytes:
+        """Convert encryption key to bytes format required by AES-256-GCM.
+ Supports: + - 32-byte raw keys + - 44-byte base64 encoded keys + - String keys (hashed to 32 bytes) + + Args: + key: Original key (str or bytes) + + Returns: + bytes: 32-byte key + + Raises: + RuntimeError: If key cannot be converted to 32 bytes + """ + if isinstance(key, bytes): + if len(key) == 32: + return key + if len(key) == 44: # Base64 encoded + try: + decoded = base64.urlsafe_b64decode(key) + if len(decoded) == 32: + return decoded + raise ValueError("Decoded key must be 32 bytes") + except Exception as e: + raise RuntimeError(f"Invalid base64 key: {str(e)}") + raise RuntimeError("Bytes key must be 32 raw bytes or 44 base64 bytes") + + # Convert string key to bytes using SHA-256 for consistent length + return hashlib.sha256(key.encode()).digest()[:32] + + def create(self, key: str, value: Any, user_id: str, client_cert_info: Optional[Dict] = None) -> bool: + """Create new storage entry with RBAC check. + Integrates with dispatcher's RBAC system. + + Args: + key: Storage key + value: Value to store (will be encrypted) + user_id: User ID for RBAC validation + client_cert_info: Optional client certificate info for enhanced auth + + Returns: + bool: True if successful, False if unauthorized + + Raises: + ValueError: If key or value is invalid + """ + if not key or not isinstance(key, str): + raise ValueError("Key must be non-empty string") + + if not self.rbac or not self.rbac.validate_permission(user_id, "storage", "create"): + self.logger.warning(f"Unauthorized create attempt by {user_id}") + return False + + key_hash = self._hash_key(key) + try: + encrypted_value = encrypt_data(value, self.encryption_key) + except Exception as e: + self.logger.error(f"Encryption failed: {str(e)}") + raise RuntimeError("Data encryption failed") from e + + with self._lock, sqlite3.connect(self.db_path) as conn: + try: + conn.execute( + "INSERT INTO storage (key_hash, encrypted_value, created_by) VALUES (?, ?, ?)", + (key_hash, encrypted_value, user_id) + ) + conn.execute( + "INSERT INTO access_log (key_hash, operation, user_id) VALUES (?, ?, ?)", + (key_hash, "create", user_id) + ) + return True + except sqlite3.IntegrityError: + return False + + def read(self, key: str, user_id: str) -> Optional[Any]: + """Read storage entry with RBAC check. + Integrates with dispatcher's RBAC system. + + Args: + key: Storage key + user_id: User ID for RBAC validation + + Returns: + Decrypted value or None if unauthorized/not found + + Raises: + ValueError: If key is invalid + RuntimeError: If decryption fails + """ + if not key or not isinstance(key, str): + raise ValueError("Key must be non-empty string") + + if not self.rbac or not self.rbac.validate_permission(user_id, "storage", "read"): + self.logger.warning(f"Unauthorized read attempt by {user_id}") + return None + + key_hash = self._hash_key(key) + + with self._lock, sqlite3.connect(self.db_path) as conn: + cursor = conn.execute( + "SELECT encrypted_value FROM storage WHERE key_hash = ?", + (key_hash,)) + row = cursor.fetchone() + + if row: + conn.execute( + "INSERT INTO access_log (key_hash, operation, user_id) VALUES (?, ?, ?)", + (key_hash, "read", user_id) + ) + try: + return decrypt_data(row[0], self.encryption_key) + except Exception as e: + self.logger.error(f"Decryption failed: {str(e)}") + raise RuntimeError("Data decryption failed") from e + return None + + def delete(self, key: str, user_id: str) -> bool: + """Delete storage entry with RBAC check. + Integrates with dispatcher's RBAC system. 
+ + Args: + key: Storage key + user_id: User ID for RBAC validation + + Returns: + bool: True if successful, False if unauthorized or not found + + Raises: + ValueError: If key is invalid + """ + if not key or not isinstance(key, str): + raise ValueError("Key must be non-empty string") + + if not self.rbac or not self.rbac.validate_permission(user_id, "storage", "delete"): + self.logger.warning(f"Unauthorized delete attempt by {user_id}") + return False + + key_hash = self._hash_key(key) + + with self._lock, sqlite3.connect(self.db_path) as conn: + cursor = conn.execute( + "DELETE FROM storage WHERE key_hash = ?", + (key_hash,)) + + if cursor.rowcount > 0: + conn.execute( + "INSERT INTO access_log (key_hash, operation, user_id) VALUES (?, ?, ?)", + (key_hash, "delete", user_id) + ) + return True + return False + + def close(self): + """Close database connections.""" + pass # SQLite connections are closed automatically + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def update(self, key: str, value: bytes, user_id: str) -> None: + """Update storage entry with RBAC check. + Integrates with dispatcher's RBAC system. + + Args: + key: Storage key (non-empty string) + value: New value to store as bytes (will be encrypted) + user_id: User ID for RBAC validation + + Returns: + None + + Raises: + ValueError: If key is invalid + NotFound: If key doesn't exist + AccessDenied: If RBAC validation fails + EncryptionError: If encryption fails + """ + if not key or not isinstance(key, str): + raise ValueError("Key must be non-empty string") + + if not self.rbac or not self.rbac.validate_permission(user_id, "storage", "update"): + self.logger.warning(f"Unauthorized update attempt by {user_id}") + raise AccessDenied("Update permission denied") + + key_hash = self._hash_key(key) + try: + encrypted_value = encrypt_data(value, self.encryption_key) + except Exception as e: + self.logger.error(f"Encryption failed: {str(e)}") + raise EncryptionError("Data encryption failed") from e + + with self._lock, sqlite3.connect(self.db_path) as conn: + cursor = conn.execute( + "UPDATE storage SET encrypted_value = ?, updated_at = CURRENT_TIMESTAMP WHERE key_hash = ?", + (encrypted_value, key_hash)) + + if cursor.rowcount == 0: + raise NotFound(f"Key {key} not found") + + conn.execute( + "INSERT INTO access_log (key_hash, operation, user_id) VALUES (?, ?, ?)", + (key_hash, "update", user_id) + ) + + def begin_transaction(self): + """Begin a new transaction.""" + self._lock.acquire() + self._transaction_conn = sqlite3.connect(self.db_path) + self._transaction_conn.execute("BEGIN") + + def commit_transaction(self): + """Commit the current transaction.""" + if hasattr(self, '_transaction_conn'): + self._transaction_conn.commit() + self._transaction_conn.close() + self._lock.release() + + def rollback_transaction(self): + """Rollback the current transaction.""" + if hasattr(self, '_transaction_conn'): + self._transaction_conn.rollback() + self._transaction_conn.close() + self._lock.release() \ No newline at end of file diff --git a/tests/performance/benchmarks.py b/tests/performance/benchmarks.py new file mode 100644 index 0000000..89ea4c3 --- /dev/null +++ b/tests/performance/benchmarks.py @@ -0,0 +1,172 @@ +import pytest +import time +import statistics +import threading +import logging +from orchestrator.core.dispatcher import Task, TaskQueue, TaskDispatcher +from security.rbac_engine import RBACEngine, Role +from storage.adapters.sqlite_adapter import SQLiteAdapter 
+from unittest.mock import MagicMock
+
+class TestPerformanceBenchmarks:
+    """Performance benchmark suite for system components"""
+
+    @pytest.fixture
+    def sample_task(self):
+        return Task(
+            id="test-1",
+            payload={},
+            requester="system",
+            priority=2,
+            metadata={"resource": "tasks", "action": "execute"}
+        )
+
+    @pytest.fixture
+    def sqlite_adapter(self, tmp_path):
+        # ":memory:" would give each sqlite3.connect() call its own empty database,
+        # so use a temporary file that every connection shares.
+        adapter = SQLiteAdapter(str(tmp_path / "benchmark.db"),
+                                "test-encryption-key-12345678901234567890")
+        adapter.rbac = MagicMock()
+        adapter.rbac.validate_permission.return_value = True
+        return adapter
+
+    def test_rbac_operation_latency(self, sample_task):
+        """Benchmark RBAC permission validation latency"""
+        # Setup
+        queue = TaskQueue()
+        queue.rbac.assign_role("test-user", Role.ADMIN)
+
+        # Benchmark
+        times = []
+        for _ in range(1000):
+            start = time.perf_counter_ns()
+            queue._validate_permissions("test-user", sample_task)
+            times.append(time.perf_counter_ns() - start)
+
+        median = statistics.median(times) / 1_000_000  # Convert to ms
+        assert median < 800  # Architectural guardian: ≤800ms
+
+    def test_sqlite_crud_operations(self, sqlite_adapter):
+        """Benchmark SQLite CRUD operations under different load conditions"""
+        test_user = "benchmark-user"
+        logging.basicConfig(filename='metrics/api_performance.log', level=logging.INFO)
+
+        def run_operations(iterations, load_type):
+            """Run CRUD operations, log metrics, and return the raw timings"""
+            create_times = []
+            read_times = []
+            delete_times = []
+
+            for i in range(iterations):
+                # Create
+                start = time.perf_counter_ns()
+                sqlite_adapter.create(f"task-{i}-{load_type}", {"data": "test"}, test_user)
+                create_time = time.perf_counter_ns() - start
+                create_times.append(create_time)
+
+                # Read
+                start = time.perf_counter_ns()
+                sqlite_adapter.read(f"task-{i}-{load_type}", test_user)
+                read_time = time.perf_counter_ns() - start
+                read_times.append(read_time)
+
+                # Delete
+                start = time.perf_counter_ns()
+                sqlite_adapter.delete(f"task-{i}-{load_type}", test_user)
+                delete_time = time.perf_counter_ns() - start
+                delete_times.append(delete_time)
+
+                # Verify architectural guardian
+                if create_time > 800_000_000 or read_time > 800_000_000 or delete_time > 800_000_000:
+                    logging.warning(f"Operation exceeded 800ms threshold in {load_type} load")
+
+            # Log metrics
+            logging.info(f"{load_type.upper()} LOAD RESULTS:")
+            logging.info(f"Create avg: {sum(create_times)/len(create_times)/1_000_000:.2f}ms")
+            logging.info(f"Read avg: {sum(read_times)/len(read_times)/1_000_000:.2f}ms")
+            logging.info(f"Delete avg: {sum(delete_times)/len(delete_times)/1_000_000:.2f}ms")
+
+            return create_times, read_times, delete_times
+
+        # Idle load (single thread); keep its timings for the final assertions
+        create_times, read_times, delete_times = run_operations(100, "idle")
+
+        # Medium load (10 threads)
+        threads = []
+        for _ in range(10):
+            t = threading.Thread(target=run_operations, args=(100, "medium"))
+            threads.append(t)
+            t.start()
+        for t in threads:
+            t.join()
+
+        # Peak load (50 threads)
+        threads = []
+        for _ in range(50):
+            t = threading.Thread(target=run_operations, args=(100, "peak"))
+            threads.append(t)
+            t.start()
+        for t in threads:
+            t.join()
+
+        # Verify the single-threaded baseline meets the ≤800ms guardian
+        assert statistics.median(create_times) / 1_000_000 < 800
+        assert statistics.median(read_times) / 1_000_000 < 800
+        assert statistics.median(delete_times) / 1_000_000 < 800
+
+    def test_dispatcher_throughput(self, sample_task):
+        """Benchmark dispatcher task processing throughput"""
+        dispatcher = TaskDispatcher()
+        dispatcher._process_task = MagicMock(return_value=True)
+
+        # Add 1000 tasks
+        for i in range(1000):
+            task = Task(
+                id=f"task-{i}",
+                payload={},
+                requester="system",
+                priority=1,
+                metadata={"resource": "tasks", "action": "execute"}
+            )
+            dispatcher.queue.add_task(task)
+
+        # Benchmark processing
+        start = time.perf_counter_ns()
+        dispatcher.dispatch()
+        duration = (time.perf_counter_ns() - start) / 1_000_000_000  # Convert to seconds
+
+        # Calculate throughput (tasks/second)
+        throughput = 1000 / duration
+        assert throughput > 100  # Target: 100 tasks/second
+
+    def test_load_conditions(self, sample_task):
+        """Test performance under different load conditions"""
+        dispatcher = TaskDispatcher()
+        dispatcher._process_task = MagicMock(return_value=True)
+
+        def simulate_load(iterations):
+            """Simulate task processing load"""
+            for i in range(iterations):
+                task = Task(
+                    id=f"load-task-{i}",
+                    payload={},
+                    requester="system",
+                    priority=1,
+                    metadata={"resource": "tasks", "action": "execute"}
+                )
+                dispatcher.queue.add_task(task)
+
+            start = time.perf_counter_ns()
+            dispatcher.dispatch()
+            return (time.perf_counter_ns() - start) / 1_000_000  # ms
+
+        # Idle load (single task)
+        idle_time = simulate_load(1)
+        logging.info(f"Idle load processing time: {idle_time:.2f}ms")
+
+        # Medium load (100 tasks)
+        medium_time = simulate_load(100)
+        logging.info(f"Medium load processing time: {medium_time:.2f}ms")
+
+        # Peak load (1000 tasks)
+        peak_time = simulate_load(1000)
+        logging.info(f"Peak load processing time: {peak_time:.2f}ms")
+
+        # Verify architectural guardian
+        assert peak_time < 800  # ≤800ms for 1000 tasks
\ No newline at end of file
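
Usage sketch for reviewers (not part of the diff above): a minimal example of wiring the three new modules together. It assumes RBACEngine() can be constructed without arguments and provides the assign_role(), validate_permission(), and _audit_access_attempt() methods that the new code calls; the database path, key material, and user names below are placeholders.

    from security.rbac_engine import RBACEngine, Role
    from security.memory.audit import MemoryAudit
    from storage.adapters.sqlite_adapter import SQLiteAdapter

    # RBAC engine shared by the storage and audit layers (constructor is assumed)
    rbac = RBACEngine()
    rbac.assign_role("svc-user", Role.ADMIN)

    # Adapter needs a 32+ character key and an RBAC engine before any operation
    adapter = SQLiteAdapter("data/app.db", "0123456789abcdef0123456789abcdef")
    adapter.rbac = rbac

    audit = MemoryAudit(rbac)

    # Each storage call is mirrored into the HMAC-chained audit trail
    ok = adapter.create("config/feature-flags", {"beta": True}, user_id="svc-user")
    audit.log_operation("create", "config/feature-flags", ok, user="svc-user")

    value = adapter.read("config/feature-flags", user_id="svc-user")
    audit.log_operation("read", "config/feature-flags", value is not None, user="svc-user")

    # Tamper-evident check over the chained integrity hashes
    assert audit.verify_log_integrity()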