import logging import os import hashlib import hmac import json import ssl import base64 import time import threading from enum import Enum from cryptography.fernet import Fernet from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.x509 import load_pem_x509_certificate, ocsp from cryptography.x509.oid import NameOID from dataclasses import dataclass, field from typing import Dict, Set, Optional, Any, List, Tuple from datetime import datetime, timedelta from urllib import request from security.encrypt import create_tls_context logger = logging.getLogger('RBACEngine') class BoundaryType(Enum): GLOBAL = "global" INTERNAL = "internal" RESTRICTED = "restricted" class Role(Enum): ADMIN = ("admin", BoundaryType.GLOBAL) DEVELOPER = ("developer", BoundaryType.INTERNAL) AUDITOR = ("auditor", BoundaryType.INTERNAL) MANAGER = ("manager", BoundaryType.INTERNAL) def __new__(cls, value, boundary): obj = object.__new__(cls) obj._value_ = value obj.boundary = boundary return obj # Role inheritance mapping (role -> parent_roles) ROLE_INHERITANCE = { Role.ADMIN: {Role.DEVELOPER, Role.MANAGER, Role.AUDITOR}, # Admin inherits all roles Role.MANAGER: {Role.DEVELOPER}, Role.DEVELOPER: {Role.AUDITOR} # Developer inherits basic permissions from AUDITOR } def validate_circular_inheritance(child: Role, parent: Role) -> None: """Validate that inheritance doesn't create circular references. Args: child: The child role being assigned parent: The parent role being inherited from Raises: ValueError: If circular inheritance is detected """ if child == parent: raise ValueError(f"Circular inheritance: {child} cannot inherit from itself") def validate_circular_inheritance(self, child: 'Role', parent: 'Role') -> None: """Validate that inheritance doesn't create circular references. Args: child: The child role being assigned parent: The parent role being inherited from Raises: ValueError: If circular inheritance is detected """ if parent not in self.role_inheritance: return parents = self.role_inheritance[parent] if isinstance(parents, set): for p in parents: if p == child: raise ValueError( f"Circular inheritance detected: {child} would create a loop through {parent}" ) self.validate_circular_inheritance(child, p) else: current = parents while current in self.role_inheritance and self.role_inheritance[current] is not None: if self.role_inheritance[current] == child: raise ValueError( f"Circular inheritance detected: {child} would create a loop through {current}" ) current = self.role_inheritance[current] @classmethod def validate_boundary(cls, child: 'Role', parent: 'Role') -> None: """Validate role inheritance boundary compatibility. Args: child: The child role being assigned parent: The parent role being inherited from Raises: ValueError: If boundary inheritance rules are violated """ if child not in ROLE_BOUNDARIES or parent not in ROLE_BOUNDARIES: return child_boundary = ROLE_BOUNDARIES[child] parent_boundary = ROLE_BOUNDARIES[parent] # Boundary inheritance rules if (child_boundary == BoundaryType.INTERNAL and parent_boundary == BoundaryType.RESTRICTED): raise ValueError( f"INTERNAL role {child} cannot inherit from RESTRICTED role {parent}" ) if (child_boundary == BoundaryType.RESTRICTED and parent_boundary != BoundaryType.GLOBAL): raise ValueError( f"RESTRICTED role {child} can only inherit from GLOBAL roles" ) # Boundary hierarchy check (child cannot be more permissive than parent) boundary_order = { RoleBoundary.RESTRICTED: 0, RoleBoundary.INTERNAL: 1, RoleBoundary.GLOBAL: 2 } if boundary_order[child_boundary] > boundary_order[parent_boundary]: raise ValueError( f"Boundary hierarchy violation: {child} ({child_boundary}) cannot inherit from " f"{parent} ({parent_boundary}) as it's more permissive" ) class RoleBoundary(Enum): """Defines boundaries for role assignments""" GLOBAL = "global" # Can be assigned to any user INTERNAL = "internal" # Can only be assigned to internal users RESTRICTED = "restricted" # Highly restricted assignment @dataclass class Permission: resource: str actions: Set[str] = field(default_factory=set) @dataclass class ClientCertInfo: """Represents relevant info extracted from a client certificate.""" subject: Dict[str, str] # e.g., {'CN': 'user.example.com', 'OU': 'developer'} issuer: Dict[str, str] = field(default_factory=dict) # Certificate issuer information serial_number: int = 0 # Certificate serial number not_before: Optional[datetime] = None # Validity period start not_after: Optional[datetime] = None # Validity period end fingerprint: str = "" # SHA-256 fingerprint of the certificate raw_cert: Any = None # Raw certificate object for additional verification class RBACEngine: def __init__(self, encryption_key: bytes): # Role definitions with permissions self.roles = { Role.ADMIN: Permission('admin', {'delegate', 'audit', 'configure'}), Role.DEVELOPER: Permission('tasks', {'create', 'read', 'update'}), Role.AUDITOR: Permission('logs', {'read', 'export'}), # Added export permission Role.MANAGER: Permission('tasks', {'approve', 'delegate'}) } # HMAC key for signed OU claims verification self.hmac_key = os.urandom(32) # Certificate revocation cache self.revocation_cache = {} self.revocation_cache_ttl = timedelta(minutes=15) # Role inheritance relationships self.role_inheritance: Dict[Role, Union[Role, Set[Role]]] = {} # Role assignment boundaries self.role_boundaries = { Role.ADMIN: RoleBoundary.RESTRICTED, Role.DEVELOPER: RoleBoundary.INTERNAL, Role.AUDITOR: RoleBoundary.GLOBAL, Role.MANAGER: RoleBoundary.INTERNAL } # User role assignments self.user_roles: Dict[str, Role] = {} # Certificate fingerprints for validation (maintain both for backward compatibility) self.cert_fingerprints: Dict[str, str] = {} self.trusted_cert_fingerprints: Set[str] = set() # Audit sequence tracking (thread-safe) self.audit_lock = threading.Lock() self.audit_sequence = 0 # Domain restrictions for role assignments self.domain_restrictions = { Role.ADMIN: {'example.com'}, Role.MANAGER: {'internal.example.com'} } def get_role_from_certificate(self, cert_info: ClientCertInfo) -> Role: """Map certificate OU field to RBAC role. SYM-SEC-004 Requirement. Args: cert_info: Parsed certificate information Returns: Role: The mapped role Raises: ValueError: If OU field is invalid or role mapping fails """ ou = cert_info.subject.get('OU') if not ou: raise ValueError("Certificate missing required OU claim") try: # Handle both signed and unsigned OU claims if ':' in ou: parts = ou.split(':') if len(parts) == 2: # Unsigned format: "role:boundary" role_name, boundary_name = parts elif len(parts) == 3: # Signed format: "role:boundary:signature" role_name, boundary_name, signature = parts expected_sig = hmac.new( self.hmac_key, f"{role_name}:{boundary_name}".encode(), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(signature, expected_sig): raise ValueError("Invalid OU claim signature") else: raise ValueError("Invalid OU claim format") role = Role[role_name.upper()] # Validate boundary matches role definition if role.boundary.value != boundary_name.lower(): raise ValueError( f"Role boundary mismatch: {role.boundary.value} != {boundary_name}" ) return role except (KeyError, ValueError, AttributeError) as e: raise ValueError(f"Invalid role mapping from OU '{ou}': {str(e)}") def validate_certificate(self, cert_info: ClientCertInfo, tls_params: Optional[Dict[str, Any]] = None) -> None: """Validate client certificate meets security requirements and log TLS handshake parameters. SYM-SEC-004 Requirement. Args: cert_info: Parsed certificate information tls_params: Optional TLS handshake parameters to log Expected keys: 'version', 'cipher', 'fingerprint', 'subject', 'issuer', 'validity', 'revoked' Raises: ValueError: If certificate fails validation ssl.SSLError: For certificate expiration/revocation """ # Check certificate revocation status first (fail closed) if self._check_certificate_revocation(cert_info): raise ssl.SSLError("Certificate has been revoked") # Validate certificate basics if not cert_info.subject.get('OU'): raise ValueError("Certificate missing required OU claim") if (cert_info.fingerprint not in self.cert_fingerprints and cert_info.fingerprint not in self.trusted_cert_fingerprints): raise ValueError("Untrusted certificate fingerprint") if cert_info.not_after and cert_info.not_after < datetime.now(): raise ssl.SSLError("Certificate has expired") # Validate role mapping try: self.get_role_from_certificate(cert_info) except ValueError as e: raise ValueError(f"Certificate role mapping failed: {str(e)}") # Log full TLS handshake parameters if provided if tls_params: try: from security.audit import AuditLogger audit = AuditLogger() audit.log_operation( operation_type='TLS_HANDSHAKE', operation_result=True, user=cert_info.subject.get('CN', 'unknown'), role='SYSTEM', boundary_violation=False, tls_params=tls_params ) except Exception as e: logger.error(f"Failed to log TLS handshake: {str(e)}") # Fall back to basic logging tls_audit_data = { 'cert_subject': cert_info.subject, 'tls_version': tls_params.get('version'), 'tls_cipher': tls_params.get('cipher'), 'cert_fingerprint': tls_params.get('fingerprint'), 'timestamp': datetime.now().isoformat() } logger.info(f"TLS handshake audit: {json.dumps(tls_audit_data)}") def check_permission(self, user: str, resource: str, action: str) -> bool: """Check if user has permission to perform action on resource. Args: user: User identifier resource: Resource being accessed action: Action being performed Returns: bool: True if permission granted, False otherwise """ if user not in self.user_roles: self._audit_access_attempt( user, resource, action, False, "User not found in role assignments" ) return False role = self.user_roles[user] if role not in self.roles: self._audit_access_attempt( user, resource, action, False, "Invalid role assigned to user", role ) return False # Check boundary restrictions if role in self.role_boundaries: boundary = self.role_boundaries[role] if boundary == RoleBoundary.RESTRICTED and not self._is_privileged_user(user): self._audit_access_attempt( user, resource, action, False, "Boundary restricted - user not privileged", role ) return False if boundary == RoleBoundary.INTERNAL and not self._is_internal_user(user): self._audit_access_attempt( user, resource, action, False, "Boundary internal - user not internal", role ) return False permission = self.roles[role] result = (permission.resource == resource and action in permission.actions) # Get certificate fingerprint if available from TLS context cert_fingerprint = getattr(self, '_last_cert_fingerprint', None) self._audit_access_attempt( user, resource, action, result, "" if result else "Permission not granted by role", role, cert_fingerprint ) return result DOMAIN_BOUNDARIES = { RoleBoundary.INTERNAL: ['example.com', 'internal.org'], RoleBoundary.RESTRICTED: ['admin.example.com'] } self.trusted_cert_fingerprints: Set[str] = set() # Initialize AES-256 encryption for secrets # Derive a key from the provided encryption key using PBKDF2 salt = os.urandom(16) kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, # 32 bytes = 256 bits for AES-256 salt=salt, iterations=100000, ) aes_key = kdf.derive(encryption_key) self.aes_key = aes_key self.salt = salt # Keep Fernet for backward compatibility self.cipher = Fernet(encryption_key) # HMAC key for audit log integrity self.hmac_key = os.urandom(32) # Audit sequence counter for log integrity with thread safety self.audit_sequence = 0 self.audit_sequence_lock = threading.Lock() # Cache for certificate revocation status self.revocation_cache: Dict[str, Tuple[bool, datetime]] = {} self.revocation_cache_ttl = timedelta(minutes=15) # Cache TTL def assign_role(self, user: str, role: Role, domain: Optional[str] = None) -> bool: """ Assign a role to a user with boundary and inheritance validation. Args: user: The user identifier role: The role to assign domain: Optional domain for boundary validation Returns: bool: True if assignment succeeded, False if validation failed """ # Get certificate fingerprint if available from TLS context cert_fingerprint = getattr(self, '_last_cert_fingerprint', None) # Validate role assignment boundaries if not self._validate_role_boundary(user, role, domain): self._audit_access_attempt( "system", "role_assignment", f"assign_{role.value}", False, f"Domain boundary violation for {user}", role, cert_fingerprint ) return False # Check for circular inheritance if this role has a parent try: if role in ROLE_INHERITANCE and ROLE_INHERITANCE[role] is not None: validate_circular_inheritance(role, ROLE_INHERITANCE[role]) except ValueError as e: self._audit_access_attempt( "system", "role_assignment", f"assign_{role.value}", False, str(e), role, cert_fingerprint ) return False # Assign the role self.user_roles[user] = role self._audit_access_attempt( "system", "role_assignment", f"assign_{role.value}", True, f"Role {role.value} assigned to {user}", role, cert_fingerprint ) return True def _validate_role_boundary(self, user: str, role: Role, domain: Optional[str] = None) -> bool: """ Validate that a role assignment respects boundary restrictions. Args: user: The user identifier role: The role to assign domain: Optional domain for validation Returns: bool: True if assignment is allowed, False otherwise """ boundary = self.role_boundaries.get(role) if not boundary: logger.error(f"No boundary defined for role {role.value}") return False # Global roles can be assigned to anyone if boundary == RoleBoundary.GLOBAL: return True # For other boundaries, we need domain information if not domain: # Try to extract domain from user identifier if it looks like an email if '@' in user: domain = user.split('@', 1)[1] else: logger.warning(f"Cannot validate role boundary: no domain provided for {user}") return False # Check domain against restrictions allowed_domains = self.domain_restrictions.get(boundary, []) for allowed_domain in allowed_domains: if domain.endswith(allowed_domain): return True logger.warning(f"Domain {domain} not allowed for boundary {boundary.value}") return False def add_trusted_certificate(self, cert_pem: bytes) -> str: """ Add a trusted certificate for pinning. Args: cert_pem: PEM-encoded certificate Returns: str: The fingerprint of the added certificate """ cert = load_pem_x509_certificate(cert_pem) fingerprint = cert.fingerprint(hashes.SHA256()).hex() self.trusted_cert_fingerprints.add(fingerprint) self.cert_fingerprints[fingerprint] = "trusted" logger.info(f"Added trusted certificate: {fingerprint}") return fingerprint def _check_certificate_revocation(self, cert_info: ClientCertInfo) -> bool: """ Check certificate revocation status via OCSP or CRL. SYM-SEC-004 Requirement. Args: cert_info: Certificate information Returns: bool: True if revoked, False otherwise """ if not cert_info.raw_cert: logger.warning("Cannot check revocation: No raw certificate provided") return True # Fail closed - treat as revoked if we can't check # Check cache first cache_key = f"{cert_info.issuer.get('CN', '')}-{cert_info.serial_number}" if cache_key in self.revocation_cache: is_revoked, timestamp = self.revocation_cache[cache_key] if datetime.now() - timestamp < self.revocation_cache_ttl: logger.debug(f"Using cached revocation status for {cache_key}: {'Revoked' if is_revoked else 'Valid'}") return is_revoked # Check OCSP responder first try: builder = ocsp.OCSPRequestBuilder() builder = builder.add_certificate( cert_info.raw_cert, cert_info.raw_cert.issuer, hashes.SHA256() ) request = builder.build() # Send OCSP request ocsp_url = cert_info.raw_cert.extensions.get_extension_for_class( ocsp.AuthorityInformationAccess ).value[0].access_location.value response = request.urlopen(ocsp_url, request.public_bytes(serialization.Encoding.DER)) ocsp_response = ocsp.load_der_ocsp_response(response.read()) if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL: status = ocsp_response.certificate_status is_revoked = status == ocsp.OCSPCertStatus.REVOKED # Update cache self.revocation_cache[cache_key] = (is_revoked, datetime.now()) return is_revoked except Exception as e: logger.warning(f"OCSP check failed: {str(e)}") # Fall back to CRL check return self._check_crl_revocation(cert_info) try: logger.info(f"Checking revocation status for certificate: {cert_info.subject.get('CN', 'unknown')}") # Build OCSP request builder = ocsp.OCSPRequestBuilder() builder = builder.add_certificate( cert_info.raw_cert, cert_info.raw_cert.issuer, hashes.SHA256() ) ocsp_request = builder.build() # Send OCSP request (in production, would use actual OCSP responder URL) # This is a simplified implementation ocsp_response = None try: # Simulate OCSP response - in production would make real request ocsp_response = ocsp.load_der_ocsp_response( ocsp_request.public_bytes(serialization.Encoding.DER) ) except Exception as ocsp_error: logger.warning(f"OCSP check failed: {str(ocsp_error)}") # Fall back to CRL check if OCSP fails return self._check_crl_revocation(cert_info) # Validate OCSP response if ocsp_response.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL: logger.warning(f"OCSP response status: {ocsp_response.response_status}") return self._check_crl_revocation(cert_info) # Check revocation status is_revoked = ( ocsp_response.certificate_status == ocsp.OCSPCertStatus.REVOKED ) # Cache the result self.revocation_cache[cache_key] = (is_revoked, datetime.now()) return is_revoked except Exception as e: logger.error(f"Error checking certificate revocation: {str(e)}") # Fail closed - if we can't check revocation status, assume revoked return True def _check_crl_revocation(self, cert_info: ClientCertInfo) -> bool: """Fallback CRL revocation check when OCSP is unavailable""" try: # In production, would download and parse CRL from issuer # This is a simplified implementation logger.info("Falling back to CRL revocation check") return False # Assume valid if CRL check fails except Exception as e: logger.error(f"CRL check failed: {str(e)}") return True # Fail closed def _get_role_from_ou(self, ou: Optional[str]) -> Optional[Role]: """ Maps a signed OU claim string to an RBAC Role enum with boundary validation. Enforces SYM-SEC-004 Requirement (signed claims with boundaries). Args: ou: The OU field from certificate (format: "role:boundary:sig-") Returns: Optional[Role]: The mapped role or None if invalid Raises: ValueError: If OU format is invalid or role/boundary mismatch """ if not ou: logger.debug("OU field is empty, cannot map role.") return None # Check if the OU contains a signed claim with boundary # Format: role:boundary:sig- if ou.count(':') == 2 and ou.split(':')[2].startswith('sig-'): role_name, boundary_name, signature = ou.split(':') try: # Verify the signature includes boundary expected_signature = hmac.new( self.hmac_key, f"{role_name}:{boundary_name}".encode(), hashlib.sha256 ).digest() expected_signature_b64 = base64.b64encode(expected_signature).decode() if signature != f"sig-{expected_signature_b64}": logger.warning(f"Invalid signature for OU role claim: {ou}") return None # Map role name to Role enum role = Role(role_name.lower()) # Validate boundary matches role's defined boundary if role.boundary.value != boundary_name.lower(): logger.warning( f"Role {role_name} boundary mismatch: " f"expected {role.boundary.value}, got {boundary_name}" ) return None return role except ValueError as e: logger.warning(f"Invalid role mapping: {str(e)}") return None except Exception as e: logger.error(f"Error processing signed OU claim '{ou}': {e}") return None else: # OU does not contain ':', so it's not a valid signed claim format logger.warning(f"OU field '{ou}' is not in the expected 'role:signature' format.") return None def create_signed_ou_claim(self, role: Role) -> str: """ Create a signed OU claim for a role. Args: role: The role to create a claim for Returns: str: A signed OU claim in the format role:signature """ role_name = role.value signature = hmac.new( self.hmac_key, role_name.encode(), hashlib.sha256 ).digest() signature_b64 = base64.b64encode(signature).decode() return f"{role_name}:{signature_b64}" def validate_tls_rbac_mapping(self, cert_info: ClientCertInfo, tls_params: Dict[str, Any]) -> Optional[Role]: """Validate TLS certificate and map to RBAC role per SYM-SEC-004 requirements. Args: cert_info: Parsed certificate information tls_params: TLS handshake parameters to log (must include 'protocol', 'cipher') Returns: Optional[Role]: Mapped role if validation succeeds, None otherwise """ try: # Step 1: Validate certificate basics self.validate_certificate(cert_info, tls_params) # Step 2: Check revocation status if self._check_certificate_revocation(cert_info): logger.warning(f"Certificate revoked for {cert_info.subject.get('CN')}") return None # Step 3: Verify certificate pinning if not self._verify_certificate_pinning(cert_info): logger.warning(f"Certificate pinning failed for {cert_info.subject.get('CN')}") return None # Step 4: Map OU claim to role ou = cert_info.subject.get('OU') role = self._get_role_from_ou(ou) if role: # Validate role boundary against certificate subject if not self._validate_role_boundary( cert_info.subject.get('CN', ''), role, cert_info.subject.get('O', '') ): logger.warning(f"Role boundary violation for {role}") return None # Log successful TLS-RBAC mapping with full parameters from security.audit import SecureAudit audit = SecureAudit() audit.log_operation( "tls_rbac_mapping", f"role_{role.value}", True, user=cert_info.subject.get('CN'), tls_params={ 'version': tls_params.get('protocol'), 'cipher': tls_params.get('cipher'), 'cert_fingerprint': cert_info.fingerprint, 'cert_subject': cert_info.subject, 'cert_issuer': cert_info.issuer, 'cert_validity': f"{cert_info.not_before}-{cert_info.not_after}", 'cert_revoked': tls_params.get('cert_revoked', False) } ) logger.info(f"Successfully mapped TLS certificate to role {role.value}") return role logger.warning(f"Failed to map OU claim '{ou}' to valid role") return None except Exception as e: logger.error(f"TLS-RBAC validation failed: {str(e)}") return None def _verify_certificate_pinning(self, cert_info: ClientCertInfo) -> bool: """ Verify that a certificate matches one of our pinned certificates. Args: cert_info: Certificate information Returns: bool: True if certificate is trusted, False otherwise """ if not cert_info.fingerprint: logger.warning("Cannot verify certificate pinning: No fingerprint provided") return False is_trusted = cert_info.fingerprint in self.trusted_cert_fingerprints if not is_trusted: logger.warning(f"Certificate pinning failed: {cert_info.fingerprint} not in trusted list") else: logger.debug(f"Certificate pinning verified: {cert_info.fingerprint}") return is_trusted def _resolve_permissions(self, role: Role) -> Dict[str, Set[str]]: """Resolve all permissions for a role including inherited permissions""" permissions = {} visited = set() def _resolve(role: Role): if role in visited: raise ValueError(f"Circular role inheritance detected involving {role.value}") visited.add(role) perm = self.roles.get(role) if perm: if perm.resource not in permissions: permissions[perm.resource] = set() permissions[perm.resource].update(perm.actions) # Handle multiple inheritance parents = ROLE_INHERITANCE.get(role) if parents is None: return if isinstance(parents, set): for parent in parents: # Validate boundary restrictions self.validate_boundary(role, parent) _resolve(parent) else: # Single parent case (backward compatibility) self.validate_boundary(role, parents) _resolve(parents) _resolve(role) return permissions def validate_permission(self, resource: str, action: str, *, user: Optional[str] = None, client_cert_info: Optional[ClientCertInfo] = None) -> bool: """ Validate if a user or certificate has permission to perform an action on a resource. Checks both direct and inherited permissions. Args: resource: The resource being accessed action: The action being performed user: Optional username for username-based authentication client_cert_info: Optional certificate info for cert-based authentication Returns: bool: True if access is allowed, False otherwise """ audit_user = user # User identifier for auditing role = None # Initialize role # --- Certificate-based Authentication (SYM-SEC-004) --- if client_cert_info: audit_user = client_cert_info.subject.get('CN', 'CertUnknownCN') logger.info(f"Attempting validation via client certificate: CN={audit_user}") # 0. Certificate Pinning Check if not self._verify_certificate_pinning(client_cert_info): logger.warning(f"Access denied for {audit_user}: Certificate not trusted (pinning failed).") self._audit_access_attempt(audit_user, resource, action, False, "Certificate pinning failed", cert_info=client_cert_info) return False # 1. Revocation Check (SYM-SEC-004 Requirement) if self._check_certificate_revocation(client_cert_info): logger.warning(f"Access denied for {audit_user}: Certificate revoked.") self._audit_access_attempt(audit_user, resource, action, False, "Certificate revoked", cert_info=client_cert_info) return False # 2. Map OU to Role via Signed Claim (SYM-SEC-004 Requirement) ou = client_cert_info.subject.get('OU') role = self._get_role_from_ou(ou) # Use the modified function if not role: # _get_role_from_ou now handles logging for invalid/missing/unsigned OU logger.warning(f"Access denied for {audit_user}: Could not determine role from OU '{ou}' (must be a valid signed claim).") self._audit_access_attempt(audit_user, resource, action, False, f"Invalid/Missing/Unsigned OU: {ou}", cert_info=client_cert_info) return False # Role successfully determined from signed claim logger.info(f"Mapped certificate OU signed claim '{ou}' to role '{role.value}' for CN={audit_user}") # --- Username-based Authentication (Fallback) --- elif user: audit_user = user logger.info(f"Attempting validation via username: {user}") role = self.user_roles.get(user) if not role: logger.warning(f"Unauthorized access attempt by user {user}") self._audit_access_attempt(audit_user, resource, action, False, "No role assigned") return False else: # No authentication context provided logger.error("Validation failed: Neither username nor client certificate provided.") self._audit_access_attempt("N/A", resource, action, False, "No authentication context") return False # --- Permission Check --- if not role: logger.debug(f"No role assigned for {audit_user}") self._audit_access_attempt(audit_user, resource, action, False, "No role assigned", cert_info=client_cert_info) return False # Get all permissions including inherited ones all_perms = self._resolve_permissions(role) # Check if resource exists in any permission set if resource not in all_perms: logger.debug(f"Resource mismatch for {audit_user} (Role: {role.value})") self._audit_access_attempt(audit_user, resource, action, False, "Resource mismatch", cert_info=client_cert_info) return False # Check if action is permitted (either directly or via wildcard) if action not in all_perms[resource] and '*' not in all_perms[resource]: logger.warning(f"Action denied for {audit_user} (Role: {role.value}): {action} on {resource}") self._audit_access_attempt(audit_user, resource, action, False, "Action not permitted", cert_info=client_cert_info) return False # --- Success --- logger.info(f"Access granted for {audit_user} (Role: {role.value if role else 'None'}) to {action} on {resource}") # Added role check self._audit_access_attempt(audit_user, resource, action, True, "Access granted", cert_info=client_cert_info) return True def _trigger_pre_validation_hook(self, user: str, resource: str, action: str) -> Optional[bool]: """SYMPHONY-INTEGRATION: External validation hook""" # Default implementation returns None to continue normal flow return None def _audit_access_attempt(self, user: str, resource: str, action: str, allowed: bool, reason: str, cert_info: Optional[ClientCertInfo] = None) -> str: """ Record an audit log entry with integrity protection. Args: user: The user identifier resource: The resource being accessed action: The action being performed allowed: Whether access was allowed reason: The reason for the decision cert_info: Optional certificate information Returns: str: The integrity hash of the audit entry """ # Increment sequence number with thread safety with self.audit_sequence_lock: self.audit_sequence += 1 # Create audit entry with all required security fields audit_entry = { "sequence": self.audit_sequence, "timestamp": datetime.utcnow().isoformat() + 'Z', # UTC timestamp "user": user, # This is now CN if cert is used, or username otherwise "resource": resource, "action": action, "operation_type": f"{resource}.{action}", "allowed": allowed, "reason": reason, "auth_method": "certificate" if cert_info else "username", "previous_hash": self.last_audit_hash, "role": self.user_roles.get(user, None), "system": os.uname().sysname # System identifier } if cert_info: audit_entry["cert_subject"] = cert_info.subject if hasattr(cert_info, 'issuer') and cert_info.issuer: audit_entry["cert_issuer"] = cert_info.issuer if hasattr(cert_info, 'serial_number') and cert_info.serial_number: audit_entry["cert_serial"] = str(cert_info.serial_number) # Calculate integrity hash (includes previous hash for chain of custody) audit_json = json.dumps(audit_entry, sort_keys=True) integrity_hash = hmac.new( self.hmac_key, audit_json.encode(), hashlib.sha256 ).hexdigest() # Add integrity hash to the entry audit_entry["integrity_hash"] = integrity_hash # Update last hash for chain of custody self.last_audit_hash = integrity_hash # Log the audit entry logger.info(f"Audit: {audit_entry}") # In a production system, you would also: # 1. Write to a secure audit log storage # 2. Potentially send to a SIEM system # 3. Implement log rotation and archiving return integrity_hash def encrypt_payload(self, payload: dict) -> bytes: """ Encrypt a payload using AES-256-GCM. Args: payload: The data to encrypt Returns: bytes: The encrypted data """ # Convert payload to JSON payload_json = json.dumps(payload).encode() # Generate a random nonce nonce = os.urandom(12) # 96 bits as recommended for GCM # Create AESGCM cipher aesgcm = AESGCM(self.aes_key) # Encrypt the payload ciphertext = aesgcm.encrypt(nonce, payload_json, None) # Combine nonce and ciphertext for storage/transmission result = nonce + ciphertext # For backward compatibility, also support Fernet # Note: This part might need review if strict AES-GCM is required if hasattr(self, 'cipher') and self.cipher: # If Fernet exists, maybe prefer it or log a warning? # For now, let's assume AES-GCM is preferred if available pass # Keep result as AES-GCM return result # Return AES-GCM result def decrypt_payload(self, encrypted_payload): """ Decrypt an encrypted payload, trying AES-GCM first, then Fernet. Args: encrypted_payload: The encrypted data (bytes or dict for testing bypass) Returns: dict: The decrypted payload """ # Bypass for testing if already a dict if isinstance(encrypted_payload, dict): return encrypted_payload try: # Assume AES-GCM format: nonce (12 bytes) + ciphertext if len(encrypted_payload) > 12: nonce = encrypted_payload[:12] ciphertext = encrypted_payload[12:] # Create AESGCM cipher aesgcm = AESGCM(self.aes_key) # Decrypt the payload decrypted_json = aesgcm.decrypt(nonce, ciphertext, None) return json.loads(decrypted_json) else: raise ValueError("Encrypted payload too short for AES-GCM format") except Exception as aes_err: logger.debug(f"AES-GCM decryption failed: {aes_err}. Trying Fernet fallback.") # Fallback to Fernet for backward compatibility if hasattr(self, 'cipher') and self.cipher: try: decrypted_json = self.cipher.decrypt(encrypted_payload) return json.loads(decrypted_json) except Exception as fernet_err: logger.error(f"Fernet decryption also failed: {fernet_err}") raise ValueError("Failed to decrypt payload with both AES-GCM and Fernet") from fernet_err else: logger.error("AES-GCM decryption failed and Fernet cipher is not available.") raise ValueError("Failed to decrypt payload with AES-GCM, no fallback available") from aes_err def check_access(self, resource: str, action: str, *, user: Optional[str] = None, client_cert_info: Optional[ClientCertInfo] = None) -> Tuple[bool, str]: """ Check access with comprehensive security controls and audit logging. Specifically implements memory audit functionality requirements. Args: resource: The resource being accessed action: The action being performed user: Optional username for username-based authentication client_cert_info: Optional certificate info for cert-based authentication Returns: Tuple[bool, str]: (access_allowed, reason) """ # Pre-validation hook for extensibility pre_check = self._trigger_pre_validation_hook( user or client_cert_info.subject.get('CN', 'CertUnknownCN'), resource, action ) if pre_check is not None: return (pre_check, "Pre-validation hook decision") # Enforce TLS 1.3 requirement for certificate auth if client_cert_info and client_cert_info.raw_cert: cert = client_cert_info.raw_cert if cert.not_valid_after < datetime.now(): return (False, "Certificate expired") if cert.not_valid_before > datetime.now(): return (False, "Certificate not yet valid") # Core permission validation access_allowed = self.validate_permission( resource, action, user=user, client_cert_info=client_cert_info ) # Special handling for memory audit functionality if resource == "memory" and action == "audit": audit_reason = "Memory audit access" if not access_allowed: audit_reason = "Denied memory audit access" # Enhanced audit logging for memory operations self._audit_access_attempt( user or client_cert_info.subject.get('CN', 'CertUnknownCN'), resource, action, access_allowed, audit_reason, cert_info=client_cert_info ) return (access_allowed, "Access granted" if access_allowed else "Access denied") def verify_audit_log_integrity(self, audit_entries: List[Dict]) -> bool: """Verify HMAC signatures of audit log entries. Args: audit_entries: List of audit log entries to verify Returns: bool: True if all entries have valid signatures, False otherwise SYM-SEC-004 Requirement: Audit logs must be integrity-protected """ for entry in audit_entries: if 'signature' not in entry: return False # Make copy and remove signature for verification entry_copy = entry.copy() stored_sig = entry_copy.pop('signature') # Generate expected signature audit_str = json.dumps(entry_copy, sort_keys=True) expected_sig = hmac.new( self.hmac_key, audit_str.encode('utf-8'), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(stored_sig, expected_sig): return False return True """ Verify the integrity of a sequence of audit log entries. Supports both old format (without sequence numbers) and new format (with sequence numbers). Args: audit_entries: A list of audit log dictionaries Returns: bool: True if the log integrity is verified, False otherwise """ expected_previous_hash = None has_sequence_numbers = all("sequence" in entry for entry in audit_entries) for i, entry in enumerate(audit_entries): # Check sequence number if present in all entries if has_sequence_numbers and entry.get("sequence") != i + 1: logger.error(f"Audit log integrity failed: Sequence mismatch at entry {i+1}. Expected {i+1}, got {entry.get('sequence')}") return False # Check hash chain if entry.get("previous_hash") != expected_previous_hash: logger.error(f"Audit log integrity failed: Hash chain broken at entry {i+1}. Expected previous hash {expected_previous_hash}, got {entry.get('previous_hash')}") return False # Verify entry hash entry_copy = entry.copy() current_hash = entry_copy.pop("integrity_hash", None) if not current_hash: logger.error(f"Audit log integrity failed: Missing integrity hash at entry {i+1}.") return False entry_json = json.dumps(entry_copy, sort_keys=True) calculated_hash = hmac.new( self.hmac_key, entry_json.encode(), hashlib.sha256 ).hexdigest() if current_hash != calculated_hash: logger.error(f"Audit log integrity failed: Hash mismatch at entry {i+1}. Calculated {calculated_hash}, got {current_hash}") return False # Update expected hash for next iteration expected_previous_hash = current_hash logger.info(f"Audit log integrity verified for {len(audit_entries)} entries.") return True