# ai-agent/security/memory.py
#
# 79 lines
# No EOL
# 2.8 KiB
# Python

import hashlib
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Optional, Union

from security.encrypt import AES256GCM
from security.rbac_engine import validate_permission
class MemoryInterface(ABC):
    """Abstract base class for encrypted, permission-checked memory operations.

    The public ``create``/``read``/``update``/``delete`` methods are concrete
    template methods. Each one writes an audit log entry, checks the caller's
    permission with ``validate_permission``, encrypts or decrypts the payload
    with ``self.encryptor`` and then delegates to the matching ``_*_impl``
    hook.

    Only the four ``_*_impl`` hooks are abstract. Keeping the public methods
    concrete means a storage backend cannot accidentally drop the audit log,
    permission check or encryption by overriding them.

    Subclasses that define their own ``__init__`` must call
    ``super().__init__()`` so that ``self.encryptor`` and ``self.logger``
    exist.
    """

    def __init__(self):
        self.encryptor = AES256GCM()
        self.logger = logging.getLogger('memory_interface')

    def create(self, key: str, value: bytes, user: str) -> bool:
        """Encrypt ``value`` and store it under ``key`` on behalf of ``user``.

        Returns:
            Whatever ``_create_impl`` returns for the encrypted payload.

        Raises:
            PermissionError: if ``user`` lacks 'write' permission on 'memory'.
        """
        # Log before the permission check so denied attempts are audited too.
        self._log_operation('create', key, user)
        if not validate_permission('memory', 'write', user=user):
            raise PermissionError('Access denied')
        encrypted = self.encryptor.encrypt(value)
        return self._create_impl(key, encrypted)

    @abstractmethod
    def _create_impl(self, key: str, encrypted: bytes) -> bool:
        """Implementation-specific create logic for an encrypted payload."""

    def read(self, key: str, user: str) -> Optional[bytes]:
        """Retrieve and decrypt the value stored under ``key``.

        Returns:
            The decrypted value, or ``None`` when ``_read_impl`` returns
            ``None`` for ``key``.

        Raises:
            PermissionError: if ``user`` lacks 'read' permission on 'memory'.
        """
        self._log_operation('read', key, user)
        if not validate_permission('memory', 'read', user=user):
            raise PermissionError('Access denied')
        encrypted = self._read_impl(key)
        if encrypted is None:
            return None
        return self.encryptor.decrypt(encrypted)

    @abstractmethod
    def _read_impl(self, key: str) -> Optional[bytes]:
        """Implementation-specific read logic; return ``None`` if absent."""

    def update(self, key: str, value: bytes, user: str) -> bool:
        """Encrypt ``value`` and replace the value stored under ``key``.

        Returns:
            Whatever ``_update_impl`` returns for the encrypted payload.

        Raises:
            PermissionError: if ``user`` lacks 'write' permission on 'memory'.
        """
        self._log_operation('update', key, user)
        if not validate_permission('memory', 'write', user=user):
            raise PermissionError('Access denied')
        encrypted = self.encryptor.encrypt(value)
        return self._update_impl(key, encrypted)

    @abstractmethod
    def _update_impl(self, key: str, encrypted: bytes) -> bool:
        """Implementation-specific update logic for an encrypted payload."""

    def delete(self, key: str, user: str) -> bool:
        """Remove ``key`` and its encrypted value.

        Returns:
            Whatever ``_delete_impl`` returns.

        Raises:
            PermissionError: if ``user`` lacks 'delete' permission on 'memory'.
        """
        self._log_operation('delete', key, user)
        if not validate_permission('memory', 'delete', user=user):
            raise PermissionError('Access denied')
        return self._delete_impl(key)

    @abstractmethod
    def _delete_impl(self, key: str) -> bool:
        """Implementation-specific delete logic."""

    def _log_operation(self, op_type: str, key: str, user: str) -> None:
        """Log a memory operation for auditing.

        The key is logged as its SHA-256 hex digest, not in clear text. The
        built-in ``hash()`` is unsuitable here because string hashes are
        salted per process, so the same key would get a different identifier
        on every run and entries could not be correlated.
        """
        # 'surrogatepass' keeps logging from raising on keys that contain
        # lone surrogates; auditing must not abort the operation itself.
        key_digest = hashlib.sha256(
            key.encode('utf-8', 'surrogatepass')
        ).hexdigest()
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
        # returns a naive value.
        timestamp = datetime.now(timezone.utc).isoformat()
        self.logger.info(
            "%s | Operation: %s | Key: %s | User: %s",
            timestamp,
            op_type,
            key_digest,
            user,
        )