import hashlib
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Optional, Union

from security.encrypt import AES256GCM
from security.rbac_engine import validate_permission


class MemoryInterface(ABC):
    """Abstract base class for encrypted memory operations.

    Each public operation (``create``, ``read``, ``update``, ``delete``)
    records an audit log entry, checks the caller's permission through
    ``validate_permission`` and then delegates storage to the matching
    ``_*_impl`` hook.  Values are passed through ``self.encryptor`` before
    being handed to a hook and after being read back from one.

    Every method except ``__init__`` and ``_log_operation`` is declared
    abstract, so a concrete subclass has to override all eight of them.
    NOTE: an override of a public operation that does not delegate to
    ``super()`` skips the logging, permission check and encryption
    implemented here.
    """

    def __init__(self) -> None:
        """Set up the encryptor and the ``'memory_interface'`` logger."""
        self.encryptor = AES256GCM()
        self.logger = logging.getLogger('memory_interface')

    @abstractmethod
    def create(self, key: str, value: bytes, user: str) -> bool:
        """Encrypt and store value with key.

        Args:
            key: Identifier under which the value is stored.
            value: Plaintext bytes to encrypt.
            user: Identity checked for ``'write'`` permission on ``'memory'``.

        Returns:
            Whatever ``_create_impl`` returns.

        Raises:
            PermissionError: If ``validate_permission`` rejects ``user``.
        """
        # The attempt is logged before the permission check, so denied
        # requests still leave an audit entry.
        self._log_operation('create', key, user)
        if not validate_permission('memory', 'write', user=user):
            raise PermissionError('Access denied')
        encrypted = self.encryptor.encrypt(value)
        return self._create_impl(key, encrypted)

    @abstractmethod
    def _create_impl(self, key: str, encrypted: bytes) -> bool:
        """Implementation-specific create logic.

        Receives the already-encrypted payload produced by ``create``.
        """
        pass

    @abstractmethod
    def read(self, key: str, user: str) -> Optional[bytes]:
        """Retrieve and decrypt value for key.

        Args:
            key: Identifier of the value to fetch.
            user: Identity checked for ``'read'`` permission on ``'memory'``.

        Returns:
            The decrypted bytes, or ``None`` when ``_read_impl`` returns
            ``None``.

        Raises:
            PermissionError: If ``validate_permission`` rejects ``user``.
        """
        self._log_operation('read', key, user)
        if not validate_permission('memory', 'read', user=user):
            raise PermissionError('Access denied')
        encrypted = self._read_impl(key)
        if encrypted is None:
            return None
        return self.encryptor.decrypt(encrypted)

    @abstractmethod
    def _read_impl(self, key: str) -> Optional[bytes]:
        """Implementation-specific read logic.

        Returns the stored encrypted payload; ``read`` treats ``None`` as
        "nothing to decrypt" and passes it straight back to the caller.
        """
        pass

    @abstractmethod
    def update(self, key: str, value: bytes, user: str) -> bool:
        """Update encrypted value for existing key.

        Args:
            key: Identifier of the value to replace.
            value: New plaintext bytes to encrypt.
            user: Identity checked for ``'write'`` permission on ``'memory'``.

        Returns:
            Whatever ``_update_impl`` returns.

        Raises:
            PermissionError: If ``validate_permission`` rejects ``user``.
        """
        self._log_operation('update', key, user)
        if not validate_permission('memory', 'write', user=user):
            raise PermissionError('Access denied')
        encrypted = self.encryptor.encrypt(value)
        return self._update_impl(key, encrypted)

    @abstractmethod
    def _update_impl(self, key: str, encrypted: bytes) -> bool:
        """Implementation-specific update logic.

        Receives the already-encrypted payload produced by ``update``.
        """
        pass

    @abstractmethod
    def delete(self, key: str, user: str) -> bool:
        """Remove key and encrypted value.

        Args:
            key: Identifier of the value to remove.
            user: Identity checked for ``'delete'`` permission on
                ``'memory'``.

        Returns:
            Whatever ``_delete_impl`` returns.

        Raises:
            PermissionError: If ``validate_permission`` rejects ``user``.
        """
        self._log_operation('delete', key, user)
        if not validate_permission('memory', 'delete', user=user):
            raise PermissionError('Access denied')
        return self._delete_impl(key)

    @abstractmethod
    def _delete_impl(self, key: str) -> bool:
        """Implementation-specific delete logic."""
        pass

    def _log_operation(self, op_type: str, key: str, user: str) -> None:
        """Log memory operation for auditing.

        The key is written as a SHA-256 hex digest rather than in clear text.
        The builtin ``hash()`` is not used because string hashes are salted
        per interpreter process, which would make the same key appear under
        a different number after every restart.  The timestamp is
        timezone-aware UTC, so its ISO form carries an explicit ``+00:00``.

        Args:
            op_type: Name of the operation being performed.
            key: Key the operation targets.
            user: Identity performing the operation.
        """
        # Tolerate non-str keys and lone surrogates so that writing the audit
        # entry never fails on input the storage operation itself would accept.
        if isinstance(key, bytes):
            raw_key = key
        else:
            raw_key = str(key).encode('utf-8', 'surrogatepass')
        key_digest = hashlib.sha256(raw_key).hexdigest()
        timestamp = datetime.now(timezone.utc).isoformat()
        self.logger.info(
            "%s | Operation: %s | Key: %s | User: %s",
            timestamp,
            op_type,
            key_digest,
            user,
        )