ai-agent/security/rbac_engine.py

1190 lines
No EOL
47 KiB
Python

import logging
import os
import hashlib
import hmac
import json
import ssl
import base64
import time
import threading
from enum import Enum
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.x509 import load_pem_x509_certificate, ocsp
from cryptography.x509.oid import NameOID
from dataclasses import dataclass, field
from typing import Dict, Set, Optional, Any, List, Tuple
from datetime import datetime, timedelta
from urllib import request
from security.encrypt import create_tls_context
logger = logging.getLogger('RBACEngine')
class BoundaryType(Enum):
    """Boundary label attached to each ``Role`` member."""

    GLOBAL = "global"
    INTERNAL = "internal"
    RESTRICTED = "restricted"
class Role(Enum):
    """RBAC roles.

    Every member is declared as a ``(value, boundary)`` pair: the first item
    becomes the Enum value, the second is exposed as ``member.boundary``.
    """

    def __new__(cls, value, boundary):
        # Only ``value`` is registered as the Enum value, so ``Role("admin")``
        # works; the boundary rides along as a plain attribute.
        member = object.__new__(cls)
        member._value_ = value
        member.boundary = boundary
        return member

    ADMIN = ("admin", BoundaryType.GLOBAL)
    DEVELOPER = ("developer", BoundaryType.INTERNAL)
    AUDITOR = ("auditor", BoundaryType.INTERNAL)
    MANAGER = ("manager", BoundaryType.INTERNAL)
# Role inheritance mapping: role -> set of parent roles it inherits from.
ROLE_INHERITANCE = {
    # Admin inherits all roles
    Role.ADMIN: {Role.DEVELOPER, Role.MANAGER, Role.AUDITOR},
    Role.MANAGER: {Role.DEVELOPER},
    # Developer inherits basic permissions from AUDITOR
    Role.DEVELOPER: {Role.AUDITOR},
}
def validate_circular_inheritance(child: 'Role', parent: 'Role',
                                  inheritance: Optional[Dict[Any, Any]] = None) -> None:
    """Validate that inheritance doesn't create circular references.

    The check is transitive: it fails when ``child`` is reachable from
    ``parent`` by following the inheritance mapping, not only when the two
    are equal.

    Args:
        child: The child role being assigned
        parent: The parent role being inherited from. A set/list/tuple of
            parent roles is also accepted and each one is checked.
        inheritance: Optional ``role -> parent(s)`` mapping to walk.
            Defaults to the module-level ``ROLE_INHERITANCE``.
    Raises:
        ValueError: If circular inheritance is detected
    """
    graph = ROLE_INHERITANCE if inheritance is None else inheritance
    collections = (set, frozenset, list, tuple)

    direct_parents = parent if isinstance(parent, collections) else (parent,)
    # Each work item is (role to inspect, role through which it was reached).
    pending = [(candidate, None) for candidate in direct_parents]
    seen = set()

    while pending:
        current, via = pending.pop()
        if current == child:
            if via is None:
                raise ValueError(f"Circular inheritance: {child} cannot inherit from itself")
            raise ValueError(
                f"Circular inheritance detected: {child} would create a loop through {via}"
            )
        if current in seen:
            # Already explored; also guarantees termination on a cyclic graph.
            continue
        seen.add(current)

        ancestors = graph.get(current)
        if ancestors is None:
            continue
        if not isinstance(ancestors, collections):
            ancestors = (ancestors,)
        pending.extend((ancestor, current) for ancestor in ancestors)
def validate_circular_inheritance(self, child: 'Role', parent: 'Role') -> None:
    """Validate that inheritance doesn't create circular references.

    Walks ``self.role_inheritance`` starting at ``parent``. Entries may map a
    role either to a single parent or to a set of parents; both shapes are
    handled by the same traversal.

    Args:
        child: The child role being assigned
        parent: The parent role being inherited from
    Raises:
        ValueError: If circular inheritance is detected
    """
    if child == parent:
        raise ValueError(f"Circular inheritance: {child} cannot inherit from itself")

    graph = self.role_inheritance
    pending = [parent]
    seen = set()

    while pending:
        current = pending.pop()
        if current in seen:
            # Guards against endless traversal when the stored graph already
            # contains a loop that does not involve ``child``.
            continue
        seen.add(current)

        ancestors = graph.get(current)
        if ancestors is None:
            continue
        if not isinstance(ancestors, (set, frozenset)):
            ancestors = (ancestors,)  # single-parent entry

        for ancestor in ancestors:
            if ancestor == child:
                raise ValueError(
                    f"Circular inheritance detected: {child} would create a loop through {current}"
                )
            pending.append(ancestor)
@classmethod
def validate_boundary(cls, child: 'Role', parent: 'Role',
                      boundaries: Optional[Dict[Any, Any]] = None) -> None:
    """Validate role inheritance boundary compatibility.

    Boundaries are compared by their string value ("restricted", "internal",
    "global"), so the check works whether the table holds ``BoundaryType`` or
    ``RoleBoundary`` members (the two enums share the same values).

    Args:
        child: The child role being assigned
        parent: The parent role being inherited from
        boundaries: Optional ``role -> boundary`` table. Defaults to the
            module-level ``ROLE_BOUNDARIES``.
            NOTE(review): ``ROLE_BOUNDARIES`` is not defined in the part of
            the file reviewed here; confirm it exists.
    Raises:
        ValueError: If boundary inheritance rules are violated
    """
    table = ROLE_BOUNDARIES if boundaries is None else boundaries
    if child not in table or parent not in table:
        # Roles without a declared boundary are not validated.
        return

    def _level(boundary) -> str:
        # Enum member -> its value; plain strings pass through.
        return str(getattr(boundary, "value", boundary)).lower()

    child_boundary = table[child]
    parent_boundary = table[parent]
    child_level = _level(child_boundary)
    parent_level = _level(parent_boundary)

    # Boundary inheritance rules
    if child_level == "internal" and parent_level == "restricted":
        raise ValueError(
            f"INTERNAL role {child} cannot inherit from RESTRICTED role {parent}"
        )
    if child_level == "restricted" and parent_level != "global":
        raise ValueError(
            f"RESTRICTED role {child} can only inherit from GLOBAL roles"
        )

    # Boundary hierarchy check (child cannot be more permissive than parent)
    boundary_order = {"restricted": 0, "internal": 1, "global": 2}
    if boundary_order[child_level] > boundary_order[parent_level]:
        raise ValueError(
            f"Boundary hierarchy violation: {child} ({child_boundary}) cannot inherit from "
            f"{parent} ({parent_boundary}) as it's more permissive"
        )
class RoleBoundary(Enum):
    """Defines boundaries for role assignments"""

    # Can be assigned to any user
    GLOBAL = "global"
    # Can only be assigned to internal users
    INTERNAL = "internal"
    # Highly restricted assignment
    RESTRICTED = "restricted"
@dataclass
class Permission:
    """A resource name together with the actions permitted on it."""

    resource: str
    # default_factory gives every instance its own empty set
    actions: Set[str] = field(default_factory=set)
@dataclass
class ClientCertInfo:
    """Represents relevant info extracted from a client certificate."""

    # e.g., {'CN': 'user.example.com', 'OU': 'developer'}
    subject: Dict[str, str]
    # Certificate issuer information
    issuer: Dict[str, str] = field(default_factory=dict)
    # Certificate serial number
    serial_number: int = 0
    # Validity period start
    not_before: Optional[datetime] = None
    # Validity period end
    not_after: Optional[datetime] = None
    # SHA-256 fingerprint of the certificate
    fingerprint: str = ""
    # Raw certificate object for additional verification
    raw_cert: Any = None
class RBACEngine:
def __init__(self, encryption_key: bytes):
    """Set up role tables, certificate state, audit state and ciphers.

    Args:
        encryption_key: Key material. It is stretched with PBKDF2 into the
            AES-256 key used by ``encrypt_payload``/``decrypt_payload`` and,
            when it is a valid Fernet key, also used for the Fernet fallback.
    """
    # Role definitions with permissions
    self.roles = {
        Role.ADMIN: Permission('admin', {'delegate', 'audit', 'configure'}),
        Role.DEVELOPER: Permission('tasks', {'create', 'read', 'update'}),
        Role.AUDITOR: Permission('logs', {'read', 'export'}),
        Role.MANAGER: Permission('tasks', {'approve', 'delegate'}),
    }

    # HMAC key for signed OU claims and audit log integrity.
    # NOTE(review): generated fresh per instance, so signatures made by one
    # engine instance cannot be verified by another; confirm this is intended.
    self.hmac_key = os.urandom(32)

    # Certificate revocation cache: key -> (is_revoked, time of check)
    self.revocation_cache: Dict[str, Tuple[bool, datetime]] = {}
    self.revocation_cache_ttl = timedelta(minutes=15)

    # Role inheritance relationships (role -> single parent or set of parents)
    self.role_inheritance: Dict[Role, Any] = {}

    # Role assignment boundaries
    self.role_boundaries = {
        Role.ADMIN: RoleBoundary.RESTRICTED,
        Role.DEVELOPER: RoleBoundary.INTERNAL,
        Role.AUDITOR: RoleBoundary.GLOBAL,
        Role.MANAGER: RoleBoundary.INTERNAL,
    }

    # User role assignments
    self.user_roles: Dict[str, Role] = {}

    # Certificate fingerprints for validation (maintain both for backward compatibility)
    self.cert_fingerprints: Dict[str, str] = {}
    self.trusted_cert_fingerprints: Set[str] = set()

    # Audit sequence tracking (thread-safe). The audit writer refers to the
    # lock as ``audit_sequence_lock``; both names point at the same lock.
    self.audit_lock = threading.Lock()
    self.audit_sequence_lock = self.audit_lock
    self.audit_sequence = 0
    # Hash of the previous audit entry; None until the first entry is written.
    self.last_audit_hash = None

    # Domain restrictions for role assignments
    self.domain_restrictions = {
        Role.ADMIN: {'example.com'},
        Role.MANAGER: {'internal.example.com'},
    }

    # AES-256 key derived from the provided key with PBKDF2.
    # NOTE(review): the salt is random per instance, so the derived key
    # differs between instances; confirm payloads never outlive the engine.
    self.salt = os.urandom(16)
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,  # 32 bytes = 256 bits for AES-256
        salt=self.salt,
        iterations=100000,
    )
    self.aes_key = kdf.derive(encryption_key)

    # Keep Fernet for backward compatibility. Fernet only accepts 32
    # url-safe-base64-encoded bytes; any other key disables the fallback
    # instead of failing construction.
    try:
        self.cipher = Fernet(encryption_key)
    except (ValueError, TypeError):
        self.cipher = None
def get_role_from_certificate(self, cert_info: 'ClientCertInfo') -> 'Role':
    """Map certificate OU field to RBAC role.

    SYM-SEC-004 Requirement.

    Accepted OU formats are ``role:boundary`` (unsigned) and
    ``role:boundary:signature`` where the signature is the hex HMAC-SHA256 of
    ``role:boundary`` under ``self.hmac_key``.
    NOTE(review): the unsigned form is accepted without any proof of origin;
    confirm that is acceptable for SYM-SEC-004.

    Args:
        cert_info: Parsed certificate information
    Returns:
        Role: The mapped role
    Raises:
        ValueError: If OU field is invalid or role mapping fails
    """
    ou = cert_info.subject.get('OU')
    if not ou:
        raise ValueError("Certificate missing required OU claim")

    try:
        parts = ou.split(':')
        if len(parts) == 2:
            # Unsigned format: "role:boundary"
            role_name, boundary_name = parts
        elif len(parts) == 3:
            # Signed format: "role:boundary:signature"
            role_name, boundary_name, signature = parts
            expected_sig = hmac.new(
                self.hmac_key,
                f"{role_name}:{boundary_name}".encode(),
                hashlib.sha256
            ).hexdigest()
            # Compare as bytes: compare_digest rejects non-ASCII str input.
            if not hmac.compare_digest(signature.encode(), expected_sig.encode()):
                raise ValueError("Invalid OU claim signature")
        else:
            # Covers an OU with no ':' at all as well as too many fields.
            raise ValueError("Invalid OU claim format")

        role = Role[role_name.upper()]

        # Validate boundary matches role definition
        if role.boundary.value != boundary_name.lower():
            raise ValueError(
                f"Role boundary mismatch: {role.boundary.value} != {boundary_name}"
            )
        return role
    except (KeyError, ValueError, AttributeError) as e:
        raise ValueError(f"Invalid role mapping from OU '{ou}': {str(e)}") from e
def validate_certificate(self, cert_info: 'ClientCertInfo', tls_params: Optional[Dict[str, Any]] = None) -> None:
    """Validate client certificate meets security requirements and log TLS handshake parameters.

    SYM-SEC-004 Requirement.

    Args:
        cert_info: Parsed certificate information
        tls_params: Optional TLS handshake parameters to log
            Expected keys: 'version', 'cipher', 'fingerprint', 'subject', 'issuer', 'validity', 'revoked'
    Raises:
        ValueError: If certificate fails validation
        ssl.SSLError: For certificate expiration/revocation, or a certificate
            whose validity period has not started yet
    """
    # Check certificate revocation status first (fail closed)
    if self._check_certificate_revocation(cert_info):
        raise ssl.SSLError("Certificate has been revoked")

    # Validate certificate basics
    if not cert_info.subject.get('OU'):
        raise ValueError("Certificate missing required OU claim")
    fingerprint = cert_info.fingerprint
    if (fingerprint not in self.cert_fingerprints and
            fingerprint not in self.trusted_cert_fingerprints):
        raise ValueError("Untrusted certificate fingerprint")

    # Validity window. ``datetime.now(tz)`` yields a naive local time for
    # naive bounds and an aware time for aware bounds, so both compare cleanly.
    # NOTE(review): naive bounds are compared against local time; if they are
    # stored as naive UTC this is off by the UTC offset - confirm.
    not_before = cert_info.not_before
    if not_before is not None and not_before > datetime.now(not_before.tzinfo):
        raise ssl.SSLError("Certificate is not yet valid")
    not_after = cert_info.not_after
    if not_after and not_after < datetime.now(not_after.tzinfo):
        raise ssl.SSLError("Certificate has expired")

    # Validate role mapping
    try:
        self.get_role_from_certificate(cert_info)
    except ValueError as e:
        raise ValueError(f"Certificate role mapping failed: {str(e)}") from e

    # Log full TLS handshake parameters if provided
    if not tls_params:
        return
    try:
        from security.audit import AuditLogger
        audit = AuditLogger()
        audit.log_operation(
            operation_type='TLS_HANDSHAKE',
            operation_result=True,
            user=cert_info.subject.get('CN', 'unknown'),
            role='SYSTEM',
            boundary_violation=False,
            tls_params=tls_params
        )
    except Exception as e:
        # Audit logging is best-effort here: a logging failure must not
        # reject an otherwise valid certificate.
        logger.error(f"Failed to log TLS handshake: {str(e)}")
        # Fall back to basic logging
        tls_audit_data = {
            'cert_subject': cert_info.subject,
            'tls_version': tls_params.get('version'),
            'tls_cipher': tls_params.get('cipher'),
            'cert_fingerprint': tls_params.get('fingerprint'),
            'timestamp': datetime.now().isoformat()
        }
        logger.info(f"TLS handshake audit: {json.dumps(tls_audit_data)}")
def check_permission(self, user: str, resource: str, action: str) -> bool:
    """Check if user has permission to perform action on resource.

    Only the permission attached directly to the user's role in
    ``self.roles`` is consulted; inherited permissions are not considered
    here. Every decision is written to the audit log.

    Args:
        user: User identifier
        resource: Resource being accessed
        action: Action being performed
    Returns:
        bool: True if permission granted, False otherwise
    """
    def _deny(reason: str) -> bool:
        # The audit writer looks the role up from ``self.user_roles`` itself,
        # so only the five core fields are passed.
        self._audit_access_attempt(user, resource, action, False, reason)
        return False

    if user not in self.user_roles:
        return _deny("User not found in role assignments")

    role = self.user_roles[user]
    if role not in self.roles:
        return _deny("Invalid role assigned to user")

    # Check boundary restrictions
    if role in self.role_boundaries:
        boundary = self.role_boundaries[role]
        # NOTE(review): _is_privileged_user / _is_internal_user are not
        # defined in the part of the file reviewed here; confirm they exist.
        if boundary == RoleBoundary.RESTRICTED and not self._is_privileged_user(user):
            return _deny("Boundary restricted - user not privileged")
        if boundary == RoleBoundary.INTERNAL and not self._is_internal_user(user):
            return _deny("Boundary internal - user not internal")

    permission = self.roles[role]
    result = (permission.resource == resource and
              action in permission.actions)

    self._audit_access_attempt(
        user, resource, action, result,
        "" if result else "Permission not granted by role"
    )
    return result
def assign_role(self, user: str, role: 'Role', domain: Optional[str] = None) -> bool:
    """
    Assign a role to a user with boundary and inheritance validation.

    Each outcome (boundary rejection, inheritance rejection, success) is
    recorded through ``_audit_access_attempt`` with "system" as the actor.

    Args:
        user: The user identifier
        role: The role to assign
        domain: Optional domain for boundary validation
    Returns:
        bool: True if assignment succeeded, False if validation failed
    """
    audit_action = f"assign_{role.value}"

    def _audit(allowed: bool, reason: str) -> None:
        self._audit_access_attempt(
            "system", "role_assignment", audit_action, allowed, reason
        )

    # Validate role assignment boundaries
    if not self._validate_role_boundary(user, role, domain):
        _audit(False, f"Domain boundary violation for {user}")
        return False

    # Check for circular inheritance against every parent of this role.
    # ROLE_INHERITANCE maps a role to a *set* of parents, so each parent is
    # validated individually rather than passing the whole set as one role.
    try:
        parents = ROLE_INHERITANCE.get(role)
        if parents is not None:
            if not isinstance(parents, (set, frozenset, list, tuple)):
                parents = (parents,)
            for parent in parents:
                validate_circular_inheritance(role, parent)
    except ValueError as e:
        _audit(False, str(e))
        return False

    # Assign the role
    self.user_roles[user] = role
    _audit(True, f"Role {role.value} assigned to {user}")
    return True
def _validate_role_boundary(self, user: str, role: Role, domain: Optional[str] = None) -> bool:
    """
    Validate that a role assignment respects boundary restrictions.

    GLOBAL roles are always allowed. For any other boundary the user's domain
    (given explicitly, or taken from the part of ``user`` after '@') must
    equal, or be a sub-domain of, one of the domains allowed for the role.

    Args:
        user: The user identifier
        role: The role to assign
        domain: Optional domain for validation
    Returns:
        bool: True if assignment is allowed, False otherwise
    """
    boundary = self.role_boundaries.get(role)
    if not boundary:
        logger.error(f"No boundary defined for role {role.value}")
        return False

    # Global roles can be assigned to anyone
    if boundary == RoleBoundary.GLOBAL:
        return True

    # For other boundaries, we need domain information
    if not domain:
        # Try to extract domain from user identifier if it looks like an email
        if '@' not in user:
            logger.warning(f"Cannot validate role boundary: no domain provided for {user}")
            return False
        domain = user.split('@', 1)[1]

    # ``domain_restrictions`` is keyed by role, so look the role up first;
    # a boundary-keyed entry is still honoured as a fallback.
    allowed_domains = self.domain_restrictions.get(role)
    if allowed_domains is None:
        allowed_domains = self.domain_restrictions.get(boundary, [])

    # Match on whole DNS labels: a bare suffix test would accept
    # "evilexample.com" for "example.com".
    candidate = domain.lower().rstrip('.')
    for allowed_domain in allowed_domains:
        allowed = allowed_domain.lower()
        if candidate == allowed or candidate.endswith('.' + allowed):
            return True

    logger.warning(f"Domain {domain} not allowed for boundary {boundary.value}")
    return False
def add_trusted_certificate(self, cert_pem: bytes) -> str:
    """
    Pin a certificate by recording its SHA-256 fingerprint as trusted.

    The fingerprint is stored both in ``trusted_cert_fingerprints`` and, with
    the label "trusted", in ``cert_fingerprints``.

    Args:
        cert_pem: PEM-encoded certificate
    Returns:
        str: The fingerprint of the added certificate
    """
    pinned = load_pem_x509_certificate(cert_pem).fingerprint(hashes.SHA256()).hex()
    self.cert_fingerprints[pinned] = "trusted"
    self.trusted_cert_fingerprints.add(pinned)
    logger.info(f"Added trusted certificate: {pinned}")
    return pinned
def _check_certificate_revocation(self, cert_info: ClientCertInfo) -> bool:
    """
    Check certificate revocation status via OCSP or CRL.

    SYM-SEC-004 Requirement.

    Order of checks: cached result (within ``revocation_cache_ttl``), then an
    OCSP query to the responder named in the certificate, then
    ``_check_crl_revocation`` when OCSP yields no usable answer.

    Args:
        cert_info: Certificate information
    Returns:
        bool: True if revoked, False otherwise
    """
    # Imported here because the module's import block does not provide them.
    from cryptography import x509
    from cryptography.hazmat.primitives import serialization
    from cryptography.x509.oid import AuthorityInformationAccessOID

    raw_cert = cert_info.raw_cert
    if not raw_cert:
        logger.warning("Cannot check revocation: No raw certificate provided")
        return True  # Fail closed - treat as revoked if we can't check

    # Check cache first
    cache_key = f"{cert_info.issuer.get('CN', '')}-{cert_info.serial_number}"
    cached = self.revocation_cache.get(cache_key)
    if cached is not None:
        is_revoked, timestamp = cached
        if datetime.now() - timestamp < self.revocation_cache_ttl:
            logger.debug(f"Using cached revocation status for {cache_key}: {'Revoked' if is_revoked else 'Valid'}")
            return is_revoked

    # Check OCSP responder first
    try:
        logger.info(f"Checking revocation status for certificate: {cert_info.subject.get('CN', 'unknown')}")
        # NOTE(review): cryptography documents the second argument of
        # add_certificate as the issuer *certificate*; raw_cert.issuer is a
        # Name. The issuer certificate is not available here - confirm and
        # supply it, otherwise this call fails and the CRL fallback is used.
        builder = ocsp.OCSPRequestBuilder().add_certificate(
            raw_cert,
            raw_cert.issuer,
            hashes.SHA256()
        )
        # Named ocsp_request so the urllib ``request`` module stays usable.
        ocsp_request = builder.build()

        aia = raw_cert.extensions.get_extension_for_class(
            x509.AuthorityInformationAccess
        ).value
        ocsp_urls = [
            description.access_location.value
            for description in aia
            if description.access_method == AuthorityInformationAccessOID.OCSP
        ]
        if not ocsp_urls:
            raise ValueError("Certificate names no OCSP responder")
        ocsp_url = ocsp_urls[0]
        # The URL comes from the certificate; refuse non-HTTP schemes
        # (urlopen would otherwise also follow e.g. file:// URLs).
        if not ocsp_url.lower().startswith(("http://", "https://")):
            raise ValueError(f"Unsupported OCSP responder URL: {ocsp_url}")

        http_request = request.Request(
            ocsp_url,
            data=ocsp_request.public_bytes(serialization.Encoding.DER),
            headers={"Content-Type": "application/ocsp-request"},
        )
        with request.urlopen(http_request, timeout=10) as response:
            ocsp_response = ocsp.load_der_ocsp_response(response.read())

        if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL:
            # NOTE(review): the responder's signature is not verified here.
            is_revoked = ocsp_response.certificate_status == ocsp.OCSPCertStatus.REVOKED
            # Update cache
            self.revocation_cache[cache_key] = (is_revoked, datetime.now())
            return is_revoked
        logger.warning(f"OCSP response status: {ocsp_response.response_status}")
    except Exception as e:
        logger.warning(f"OCSP check failed: {str(e)}")

    # Fall back to CRL check
    try:
        return self._check_crl_revocation(cert_info)
    except Exception as e:
        logger.error(f"Error checking certificate revocation: {str(e)}")
        # Fail closed - if we can't check revocation status, assume revoked
        return True
def _check_crl_revocation(self, cert_info: ClientCertInfo) -> bool:
    """Fallback CRL revocation check when OCSP is unavailable

    NOTE(review): this is a placeholder. No CRL is downloaded or parsed; the
    method logs and reports "not revoked", i.e. it fails open. Only an
    exception raised while logging makes it report "revoked".
    """
    try:
        # In production, would download and parse CRL from issuer
        # This is a simplified implementation
        logger.info("Falling back to CRL revocation check")
    except Exception as crl_error:
        logger.error(f"CRL check failed: {str(crl_error)}")
        return True  # Fail closed
    else:
        return False  # Assume valid if CRL check fails
def _get_role_from_ou(self, ou: Optional[str]) -> Optional[Role]:
    """
    Maps a signed OU claim string to an RBAC Role enum with boundary validation.

    Enforces SYM-SEC-004 Requirement (signed claims with boundaries).
    The signature is the base64 HMAC-SHA256 of ``role:boundary`` under
    ``self.hmac_key``. Every failure is logged and reported as ``None``;
    this method does not raise.

    Args:
        ou: The OU field from certificate (format: "role:boundary:sig-<signature>")
    Returns:
        Optional[Role]: The mapped role or None if invalid
    """
    if not ou:
        logger.debug("OU field is empty, cannot map role.")
        return None

    # Format: role:boundary:sig-<signature>
    parts = ou.split(':')
    if len(parts) != 3 or not parts[2].startswith('sig-'):
        logger.warning(f"OU field '{ou}' is not in the expected 'role:boundary:sig-<signature>' format.")
        return None
    role_name, boundary_name, signature = parts

    try:
        # Verify the signature includes boundary
        digest = hmac.new(
            self.hmac_key,
            f"{role_name}:{boundary_name}".encode(),
            hashlib.sha256
        ).digest()
        expected = f"sig-{base64.b64encode(digest).decode()}"
        # Constant-time comparison on bytes (compare_digest rejects
        # non-ASCII str input).
        if not hmac.compare_digest(signature.encode(), expected.encode()):
            logger.warning(f"Invalid signature for OU role claim: {ou}")
            return None

        # Map role name to Role enum
        role = Role(role_name.lower())

        # Validate boundary matches role's defined boundary
        if role.boundary.value != boundary_name.lower():
            logger.warning(
                f"Role {role_name} boundary mismatch: "
                f"expected {role.boundary.value}, got {boundary_name}"
            )
            return None
        return role
    except ValueError as e:
        # Raised by Role(...) for an unknown role name.
        logger.warning(f"Invalid role mapping: {str(e)}")
        return None
    except Exception as e:
        logger.error(f"Error processing signed OU claim '{ou}': {e}")
        return None
def create_signed_ou_claim(self, role: 'Role') -> str:
    """
    Create a signed OU claim for a role.

    The claim carries the role's own boundary and a base64 HMAC-SHA256 over
    ``role:boundary`` under ``self.hmac_key``, which is the form
    ``_get_role_from_ou`` verifies.

    Args:
        role: The role to create a claim for
    Returns:
        str: A signed OU claim in the format role:boundary:sig-<signature>
    """
    role_name = role.value
    boundary_name = role.boundary.value
    digest = hmac.new(
        self.hmac_key,
        f"{role_name}:{boundary_name}".encode(),
        hashlib.sha256
    ).digest()
    signature_b64 = base64.b64encode(digest).decode()
    return f"{role_name}:{boundary_name}:sig-{signature_b64}"
def validate_tls_rbac_mapping(self, cert_info: ClientCertInfo, tls_params: Dict[str, Any]) -> Optional[Role]:
    """Validate TLS certificate and map to RBAC role per SYM-SEC-004 requirements.

    Runs, in order: certificate validation, revocation check, pinning check,
    OU-to-role mapping and role boundary validation. Any failed step, and any
    exception raised along the way, is logged and reported as ``None``.

    Args:
        cert_info: Parsed certificate information
        tls_params: TLS handshake parameters to log (must include 'protocol', 'cipher')
    Returns:
        Optional[Role]: Mapped role if validation succeeds, None otherwise
    """
    try:
        # Step 1: Validate certificate basics
        self.validate_certificate(cert_info, tls_params)

        # Step 2: Check revocation status
        revoked = self._check_certificate_revocation(cert_info)
        if revoked:
            logger.warning(f"Certificate revoked for {cert_info.subject.get('CN')}")
            return None

        # Step 3: Verify certificate pinning
        pinned = self._verify_certificate_pinning(cert_info)
        if not pinned:
            logger.warning(f"Certificate pinning failed for {cert_info.subject.get('CN')}")
            return None

        # Step 4: Map OU claim to role
        ou = cert_info.subject.get('OU')
        role = self._get_role_from_ou(ou)
        if not role:
            logger.warning(f"Failed to map OU claim '{ou}' to valid role")
            return None

        # Validate role boundary against certificate subject
        common_name = cert_info.subject.get('CN', '')
        organisation = cert_info.subject.get('O', '')
        if not self._validate_role_boundary(common_name, role, organisation):
            logger.warning(f"Role boundary violation for {role}")
            return None

        # Log successful TLS-RBAC mapping with full parameters
        from security.audit import SecureAudit
        handshake_details = {
            'version': tls_params.get('protocol'),
            'cipher': tls_params.get('cipher'),
            'cert_fingerprint': cert_info.fingerprint,
            'cert_subject': cert_info.subject,
            'cert_issuer': cert_info.issuer,
            'cert_validity': f"{cert_info.not_before}-{cert_info.not_after}",
            'cert_revoked': tls_params.get('cert_revoked', False)
        }
        SecureAudit().log_operation(
            "tls_rbac_mapping",
            f"role_{role.value}",
            True,
            user=cert_info.subject.get('CN'),
            tls_params=handshake_details
        )
        logger.info(f"Successfully mapped TLS certificate to role {role.value}")
        return role
    except Exception as e:
        logger.error(f"TLS-RBAC validation failed: {str(e)}")
        return None
def _verify_certificate_pinning(self, cert_info: ClientCertInfo) -> bool:
    """
    Tell whether the certificate's fingerprint is one of the pinned ones.

    Args:
        cert_info: Certificate information
    Returns:
        bool: True if certificate is trusted, False otherwise
    """
    if not cert_info.fingerprint:
        logger.warning("Cannot verify certificate pinning: No fingerprint provided")
        return False

    if cert_info.fingerprint in self.trusted_cert_fingerprints:
        logger.debug(f"Certificate pinning verified: {cert_info.fingerprint}")
        return True

    logger.warning(f"Certificate pinning failed: {cert_info.fingerprint} not in trusted list")
    return False
def _resolve_permissions(self, role: Role) -> Dict[str, Set[str]]:
    """Resolve all permissions for a role including inherited permissions

    Walks ``ROLE_INHERITANCE`` depth-first from ``role`` and merges the
    actions of every role reached, grouped by resource. A role reachable
    through several parents is processed once.

    Args:
        role: The role whose effective permissions are wanted
    Returns:
        Dict[str, Set[str]]: resource name -> permitted actions
    Raises:
        ValueError: On a genuine inheritance cycle, or when
            ``validate_boundary`` rejects a parent/child pair
    """
    permissions: Dict[str, Set[str]] = {}
    in_progress = set()  # roles on the current inheritance path
    resolved = set()     # roles whose permissions are already merged

    def _resolve(current: Role) -> None:
        if current in in_progress:
            # Only a role that is its own ancestor is a cycle; a role merely
            # reached twice through different parents is not.
            raise ValueError(f"Circular role inheritance detected involving {current.value}")
        if current in resolved:
            return
        in_progress.add(current)

        perm = self.roles.get(current)
        if perm:
            permissions.setdefault(perm.resource, set()).update(perm.actions)

        parents = ROLE_INHERITANCE.get(current)
        if parents is not None:
            if not isinstance(parents, set):
                # Single parent case (backward compatibility)
                parents = (parents,)
            for parent in parents:
                # Validate boundary restrictions
                self.validate_boundary(current, parent)
                _resolve(parent)

        in_progress.discard(current)
        resolved.add(current)

    _resolve(role)
    return permissions
def validate_permission(self, resource: str, action: str, *,
                        user: Optional[str] = None,
                        client_cert_info: Optional[ClientCertInfo] = None) -> bool:
    """
    Validate if a user or certificate has permission to perform an action on a resource.

    Checks both direct and inherited permissions. A client certificate, when
    given, takes precedence over ``user``. Every outcome is audited, and any
    failure (including an error while resolving inherited permissions) denies
    access.

    Args:
        resource: The resource being accessed
        action: The action being performed
        user: Optional username for username-based authentication
        client_cert_info: Optional certificate info for cert-based authentication
    Returns:
        bool: True if access is allowed, False otherwise
    """
    audit_user = user  # User identifier for auditing
    role = None

    def _deny(reason: str, with_cert: bool = True) -> bool:
        # ``with_cert=False`` reproduces audit calls made without cert_info.
        if with_cert:
            self._audit_access_attempt(audit_user, resource, action, False,
                                       reason, cert_info=client_cert_info)
        else:
            self._audit_access_attempt(audit_user, resource, action, False, reason)
        return False

    # --- Certificate-based Authentication (SYM-SEC-004) ---
    if client_cert_info:
        audit_user = client_cert_info.subject.get('CN', 'CertUnknownCN')
        logger.info(f"Attempting validation via client certificate: CN={audit_user}")

        # 0. Certificate Pinning Check
        if not self._verify_certificate_pinning(client_cert_info):
            logger.warning(f"Access denied for {audit_user}: Certificate not trusted (pinning failed).")
            return _deny("Certificate pinning failed")

        # 1. Revocation Check (SYM-SEC-004 Requirement)
        if self._check_certificate_revocation(client_cert_info):
            logger.warning(f"Access denied for {audit_user}: Certificate revoked.")
            return _deny("Certificate revoked")

        # 2. Map OU to Role via Signed Claim (SYM-SEC-004 Requirement)
        ou = client_cert_info.subject.get('OU')
        role = self._get_role_from_ou(ou)
        if not role:
            logger.warning(f"Access denied for {audit_user}: Could not determine role from OU '{ou}' (must be a valid signed claim).")
            return _deny(f"Invalid/Missing/Unsigned OU: {ou}")

        logger.info(f"Mapped certificate OU signed claim '{ou}' to role '{role.value}' for CN={audit_user}")

    # --- Username-based Authentication (Fallback) ---
    elif user:
        logger.info(f"Attempting validation via username: {user}")
        role = self.user_roles.get(user)
        if not role:
            logger.warning(f"Unauthorized access attempt by user {user}")
            return _deny("No role assigned", with_cert=False)

    else:
        # No authentication context provided
        logger.error("Validation failed: Neither username nor client certificate provided.")
        audit_user = "N/A"
        return _deny("No authentication context", with_cert=False)

    # --- Permission Check ---
    # Get all permissions including inherited ones. Resolution raises
    # ValueError on inheritance/boundary problems; deny and audit rather than
    # letting the error escape an access check.
    try:
        all_perms = self._resolve_permissions(role)
    except ValueError as e:
        logger.error(f"Permission resolution failed for {audit_user} (Role: {role.value}): {e}")
        return _deny("Permission resolution failed")

    # Check if resource exists in any permission set
    if resource not in all_perms:
        logger.debug(f"Resource mismatch for {audit_user} (Role: {role.value})")
        return _deny("Resource mismatch")

    # Check if action is permitted (either directly or via wildcard)
    granted_actions = all_perms[resource]
    if action not in granted_actions and '*' not in granted_actions:
        logger.warning(f"Action denied for {audit_user} (Role: {role.value}): {action} on {resource}")
        return _deny("Action not permitted")

    # --- Success ---
    logger.info(f"Access granted for {audit_user} (Role: {role.value if role else 'None'}) to {action} on {resource}")
    self._audit_access_attempt(audit_user, resource, action, True, "Access granted", cert_info=client_cert_info)
    return True
def _trigger_pre_validation_hook(self, user: str, resource: str, action: str) -> Optional[bool]:
"""SYMPHONY-INTEGRATION: External validation hook"""
# Default implementation returns None to continue normal flow
return None
def _audit_access_attempt(self, user: str, resource: str, action: str,
                          allowed: bool, reason: str,
                          cert_info: Optional[ClientCertInfo] = None,
                          cert_fingerprint: Optional[str] = None) -> str:
    """
    Record an audit log entry with integrity protection.

    Each entry carries a sequence number and the hash of the previous entry,
    and is signed with HMAC-SHA256 under ``self.hmac_key``.

    Args:
        user: The user identifier
        resource: The resource being accessed
        action: The action being performed
        allowed: Whether access was allowed
        reason: The reason for the decision
        cert_info: Optional certificate information. For compatibility with
            callers that pass a role in this position, a value without a
            ``subject`` attribute is recorded as the role instead.
        cert_fingerprint: Optional certificate fingerprint to record
    Returns:
        str: The integrity hash of the audit entry
    """
    import platform
    from datetime import timezone

    # Tolerate the (…, reason, role, fingerprint) calling convention.
    role = None
    if cert_info is not None and not hasattr(cert_info, 'subject'):
        role, cert_info = cert_info, None
    if role is None:
        role = self.user_roles.get(user, None)
    # Enum members are not JSON serialisable; store the role's value.
    role_label = getattr(role, 'value', role)

    # The lock has been created under either name; accept both.
    lock = getattr(self, 'audit_sequence_lock', None) or self.audit_lock

    # Sequence number, previous-hash read and last-hash update stay under the
    # lock so concurrent writers cannot break the hash chain.
    with lock:
        self.audit_sequence += 1

        # Create audit entry with all required security fields
        audit_entry = {
            "sequence": self.audit_sequence,
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + 'Z',
            "user": user,  # CN if a certificate is used, username otherwise
            "resource": resource,
            "action": action,
            "operation_type": f"{resource}.{action}",
            "allowed": allowed,
            "reason": reason,
            "auth_method": "certificate" if cert_info else "username",
            # None for the first entry of the chain
            "previous_hash": getattr(self, 'last_audit_hash', None),
            "role": role_label,
            "system": platform.system()  # System identifier
        }
        if cert_info:
            audit_entry["cert_subject"] = cert_info.subject
            if hasattr(cert_info, 'issuer') and cert_info.issuer:
                audit_entry["cert_issuer"] = cert_info.issuer
            if hasattr(cert_info, 'serial_number') and cert_info.serial_number:
                audit_entry["cert_serial"] = str(cert_info.serial_number)
        if cert_fingerprint:
            audit_entry["cert_fingerprint"] = cert_fingerprint

        # Calculate integrity hash (includes previous hash for chain of custody)
        audit_json = json.dumps(audit_entry, sort_keys=True)
        integrity_hash = hmac.new(
            self.hmac_key,
            audit_json.encode(),
            hashlib.sha256
        ).hexdigest()

        # Add integrity hash to the entry
        audit_entry["integrity_hash"] = integrity_hash
        # Update last hash for chain of custody
        self.last_audit_hash = integrity_hash

    # Log the audit entry
    logger.info(f"Audit: {audit_entry}")
    # In a production system, you would also:
    # 1. Write to a secure audit log storage
    # 2. Potentially send to a SIEM system
    # 3. Implement log rotation and archiving
    return integrity_hash
def encrypt_payload(self, payload: dict) -> bytes:
    """
    Serialise a payload to JSON and encrypt it with AES-256-GCM.

    The output is the 12-byte random nonce followed by the ciphertext. The
    Fernet cipher kept for backward compatibility is not used for encryption.

    Args:
        payload: The data to encrypt
    Returns:
        bytes: The encrypted data
    """
    plaintext = json.dumps(payload).encode()
    # 96 bits as recommended for GCM
    nonce = os.urandom(12)
    sealed = AESGCM(self.aes_key).encrypt(nonce, plaintext, None)
    # Nonce is prepended so decryption can recover it
    return nonce + sealed
def decrypt_payload(self, encrypted_payload):
    """
    Decrypt an encrypted payload, trying AES-GCM first, then Fernet.

    Args:
        encrypted_payload: The encrypted data (bytes or dict for testing bypass)
    Returns:
        dict: The decrypted payload
    Raises:
        ValueError: If neither AES-GCM nor the Fernet fallback can decrypt
    """
    # Bypass for testing if already a dict
    if isinstance(encrypted_payload, dict):
        return encrypted_payload

    try:
        # Assume AES-GCM format: nonce (12 bytes) + ciphertext
        if len(encrypted_payload) <= 12:
            raise ValueError("Encrypted payload too short for AES-GCM format")
        nonce, ciphertext = encrypted_payload[:12], encrypted_payload[12:]
        plaintext = AESGCM(self.aes_key).decrypt(nonce, ciphertext, None)
        return json.loads(plaintext)
    except Exception as aes_err:
        logger.debug(f"AES-GCM decryption failed: {aes_err}. Trying Fernet fallback.")
        # Fallback to Fernet for backward compatibility
        fernet_available = hasattr(self, 'cipher') and self.cipher
        if not fernet_available:
            logger.error("AES-GCM decryption failed and Fernet cipher is not available.")
            raise ValueError("Failed to decrypt payload with AES-GCM, no fallback available") from aes_err
        try:
            plaintext = self.cipher.decrypt(encrypted_payload)
            return json.loads(plaintext)
        except Exception as fernet_err:
            logger.error(f"Fernet decryption also failed: {fernet_err}")
            raise ValueError("Failed to decrypt payload with both AES-GCM and Fernet") from fernet_err
def check_access(self, resource: str, action: str, *,
                 user: Optional[str] = None,
                 client_cert_info: Optional["ClientCertInfo"] = None) -> Tuple[bool, str]:
    """
    Decide whether a principal may perform ``action`` on ``resource``.

    Steps, in order:
      1. Deny when neither ``user`` nor ``client_cert_info`` is supplied.
      2. Run the pre-validation hook; a non-None result is returned as-is.
      3. When a raw certificate is attached, reject it if the current UTC
         time falls outside its validity window.
      4. Delegate to ``self.validate_permission``.
      5. For ``resource == "memory"`` and ``action == "audit"``, record the
         attempt through ``self._audit_access_attempt``.

    Args:
        resource: The resource being accessed
        action: The action being performed
        user: Optional username for username-based authentication
        client_cert_info: Optional certificate info for cert-based authentication

    Returns:
        Tuple[bool, str]: (access_allowed, reason)
    """
    # Function-scope import: the file-level import only provides datetime/timedelta.
    from datetime import timezone

    # Fail closed when no identity is supplied at all; previously this crashed
    # with AttributeError while dereferencing client_cert_info.subject.
    if not user and client_cert_info is None:
        return (False, "No user or client certificate provided")

    # Principal used for hooks and audit records: the username when given,
    # otherwise the certificate subject's CN.
    if user:
        principal = user
    else:
        subject = getattr(client_cert_info, 'subject', None) or {}
        principal = subject.get('CN', 'CertUnknownCN')

    # Pre-validation hook for extensibility
    pre_check = self._trigger_pre_validation_hook(principal, resource, action)
    if pre_check is not None:
        return (pre_check, "Pre-validation hook decision")

    # Enforce the certificate validity window for certificate auth
    if client_cert_info and client_cert_info.raw_cert:
        cert = client_cert_info.raw_cert
        now_utc = datetime.now(timezone.utc)

        def _bound_utc(name):
            # Prefer the timezone-aware ``*_utc`` attribute when it is a real
            # datetime; otherwise use the legacy attribute, whose naive value
            # is expressed in UTC (not local time).
            moment = getattr(cert, name + '_utc', None)
            if not isinstance(moment, datetime):
                moment = getattr(cert, name)
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            return moment

        if _bound_utc('not_valid_after') < now_utc:
            return (False, "Certificate expired")
        if _bound_utc('not_valid_before') > now_utc:
            return (False, "Certificate not yet valid")

    # Core permission validation
    access_allowed = self.validate_permission(
        resource, action,
        user=user,
        client_cert_info=client_cert_info
    )

    # Special handling for memory audit functionality: both granted and
    # denied attempts are recorded.
    if resource == "memory" and action == "audit":
        audit_reason = "Memory audit access" if access_allowed else "Denied memory audit access"
        self._audit_access_attempt(
            principal,
            resource,
            action,
            access_allowed,
            audit_reason,
            cert_info=client_cert_info
        )

    return (access_allowed, "Access granted" if access_allowed else "Access denied")
def verify_audit_log_integrity(self, audit_entries: List[Dict]) -> bool:
    """Verify HMAC signatures of audit log entries.

    Each entry must carry a ``signature`` field holding the hex HMAC-SHA256
    (keyed with ``self.hmac_key``) of the entry's remaining fields serialised
    as JSON with sorted keys.

    Args:
        audit_entries: List of audit log entries to verify

    Returns:
        bool: True if all entries have valid signatures, False otherwise.
            An empty list verifies as True. Malformed entries (missing or
            non-string signature, fields that cannot be serialised) yield
            False rather than raising.

    SYM-SEC-004 Requirement: Audit logs must be integrity-protected
    """
    for entry in audit_entries:
        if 'signature' not in entry:
            return False
        stored_sig = entry['signature']
        # A signature that is not text cannot match a hex digest; reject it
        # instead of letting compare_digest raise TypeError.
        if not isinstance(stored_sig, str):
            return False
        # Recompute the signature over everything except the signature itself.
        unsigned = {key: value for key, value in entry.items() if key != 'signature'}
        try:
            audit_str = json.dumps(unsigned, sort_keys=True)
        except (TypeError, ValueError):
            # An entry that cannot be serialised can never have been signed.
            return False
        expected_sig = hmac.new(
            self.hmac_key,
            audit_str.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        # Compare as bytes: compare_digest rejects non-ASCII str operands.
        if not hmac.compare_digest(stored_sig.encode('utf-8'), expected_sig.encode('ascii')):
            return False
    return True


def _verify_audit_hash_chain(self, audit_entries: List[Dict]) -> bool:
    """Verify a hash-chained sequence of audit log entries.

    This logic previously sat after the final ``return`` of
    ``verify_audit_log_integrity`` and was therefore unreachable. It is kept
    here as a separate helper and is NOT called by that method.
    TODO(review): confirm which format the audit writer emits before wiring
    this in.

    Checks, per entry and in order:
      * ``sequence`` equals the 1-based position (only when every entry has
        a ``sequence`` field),
      * ``previous_hash`` equals the preceding entry's ``integrity_hash``
        (None for the first entry),
      * ``integrity_hash`` equals the hex HMAC-SHA256 (keyed with
        ``self.hmac_key``) of the entry's other fields as sorted-key JSON.

    Args:
        audit_entries: A list of audit log dictionaries

    Returns:
        bool: True if the log integrity is verified, False otherwise
    """
    expected_previous_hash = None
    has_sequence_numbers = all("sequence" in entry for entry in audit_entries)
    for position, entry in enumerate(audit_entries, start=1):
        # Check sequence number if present in all entries
        if has_sequence_numbers and entry.get("sequence") != position:
            logger.error(f"Audit log integrity failed: Sequence mismatch at entry {position}. Expected {position}, got {entry.get('sequence')}")
            return False
        # Check hash chain
        if entry.get("previous_hash") != expected_previous_hash:
            logger.error(f"Audit log integrity failed: Hash chain broken at entry {position}. Expected previous hash {expected_previous_hash}, got {entry.get('previous_hash')}")
            return False
        # Verify entry hash
        entry_copy = entry.copy()
        current_hash = entry_copy.pop("integrity_hash", None)
        if not current_hash:
            logger.error(f"Audit log integrity failed: Missing integrity hash at entry {position}.")
            return False
        entry_json = json.dumps(entry_copy, sort_keys=True)
        calculated_hash = hmac.new(
            self.hmac_key,
            entry_json.encode(),
            hashlib.sha256
        ).hexdigest()
        # Constant-time comparison; the calculated HMAC is deliberately kept
        # out of the log so a valid tag for tampered data is never disclosed.
        if not isinstance(current_hash, str) or not hmac.compare_digest(
            current_hash.encode('utf-8'), calculated_hash.encode('ascii')
        ):
            logger.error(f"Audit log integrity failed: Hash mismatch at entry {position}. Got {current_hash}")
            return False
        # Update expected hash for next iteration
        expected_previous_hash = current_hash
    logger.info(f"Audit log integrity verified for {len(audit_entries)} entries.")
    return True