# ai-agent/tests/security/test_rbac_engine.py
#
# 1181 lines
# No EOL
# 53 KiB
# Python

import unittest
import time
import json
import base64
import hmac
import hashlib
from unittest.mock import patch, MagicMock
from security.rbac_engine import RBACEngine, Role, ClientCertInfo, RoleBoundary, Permission
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class TestRBACEngine(unittest.TestCase):
def setUp(self):
    """Create a fresh engine, three role-bearing users, and trusted placeholder fingerprints."""
    self.encryption_key = Fernet.generate_key()
    self.rbac = RBACEngine(self.encryption_key)

    # Every assignment passes the user's domain so boundary validation has input.
    self.rbac.assign_role("admin_user@admin.example.com", Role.ADMIN, "admin.example.com")
    self.rbac.assign_role("dev_user@example.com", Role.DEVELOPER, "example.com")
    self.rbac.assign_role("audit_user@external.org", Role.AUDITOR, "external.org")

    # (CN, OU) pairs for the certificates the tests refer to. No real
    # certificates are generated; only the CN is used, to derive a placeholder
    # fingerprint that is registered as trusted.
    cert_subjects = (
        ("cert_admin", "admin"),
        ("cert_dev", "developer"),
        ("cert_audit", "auditor"),
        ("cert_manager", "manager"),
        ("cert_invalid", "unknown_group"),
        ("cert_no_ou", None),
        ("cert_revoked", "developer"),
        ("audit_cert_dev", "developer"),
        ("audit_cert_manager", "manager"),
        ("audit_cert_invalid", "bad_ou"),
    )
    self.cert_fingerprints = {}
    for common_name, _org_unit in cert_subjects:
        placeholder = f"test_fingerprint_{common_name}"
        self.cert_fingerprints[common_name] = placeholder
        self.rbac.trusted_cert_fingerprints.add(placeholder)
def test_role_assignments(self):
    """Each user registered in setUp is stored in user_roles under its assigned role."""
    expected_roles = (
        ("admin_user@admin.example.com", Role.ADMIN),
        ("dev_user@example.com", Role.DEVELOPER),
        ("audit_user@external.org", Role.AUDITOR),
    )
    for user_id, role in expected_roles:
        self.assertEqual(self.rbac.user_roles[user_id], role)
def test_admin_permissions_correct(self):
    """Admin may delegate/audit/configure on "admin"; read there and other resources are denied."""
    admin = "admin_user@admin.example.com"
    cases = (
        # Allowed actions on the admin resource.
        ("admin", "delegate", True),
        ("admin", "audit", True),
        ("admin", "configure", True),
        # Denied action on the admin resource.
        ("admin", "read", False),
        # Denied access to other resources.
        ("tasks", "create", False),
        ("logs", "read", False),
    )
    for resource, action, expected in cases:
        verdict = self.rbac.validate_permission(user=admin, resource=resource, action=action)
        if expected:
            self.assertTrue(verdict)
        else:
            self.assertFalse(verdict)
def test_developer_permissions(self):
    """Developer can create tasks and read logs, but cannot delete tasks or export logs."""
    def can(resource, action):
        return self.rbac.validate_permission(
            user="dev_user@example.com", resource=resource, action=action)

    self.assertTrue(can("tasks", "create"))
    self.assertFalse(can("tasks", "delete"))
    # Developer inherits read from AUDITOR but not export.
    self.assertTrue(can("logs", "read"))
    self.assertFalse(can("logs", "export"))
def test_manager_permissions(self):
    """Manager may approve and delegate tasks; create, admin configure, and an unassigned user are denied."""
    manager = "manager_user@example.com"
    self.rbac.assign_role(manager, Role.MANAGER, "example.com")

    expectations = (
        # Allowed manager actions.
        (manager, "tasks", "approve", True),
        (manager, "tasks", "delegate", True),
        # Denied actions (manager doesn't inherit create/update by default).
        (manager, "tasks", "create", False),
        (manager, "admin", "configure", False),
        # Boundary enforcement (INTERNAL): same request from example.com vs. external.org.
        (manager, "tasks", "approve", True),
        ("manager_user@external.org", "tasks", "approve", False),
    )
    for user_id, resource, action, allowed in expectations:
        outcome = self.rbac.validate_permission(user=user_id, resource=resource, action=action)
        if allowed:
            self.assertTrue(outcome)
        else:
            self.assertFalse(outcome)
def test_manager_inheritance(self):
    """Manager is granted task read/update (expected via developer) but not log read."""
    manager = "manager_user@example.com"
    self.rbac.assign_role(manager, Role.MANAGER, "example.com")

    def check(resource, action):
        return self.rbac.validate_permission(user=manager, resource=resource, action=action)

    # Permissions the manager is expected to inherit from developer.
    for inherited_action in ("read", "update"):
        self.assertTrue(check("tasks", inherited_action))

    # Boundary is still enforced.
    self.assertFalse(check("logs", "read"))
def test_role_inheritance_username(self):
    """Username auth: ADMIN configured to inherit DEVELOPER, MANAGER and AUDITOR gains their task permissions."""
    self.rbac.role_inheritance[Role.ADMIN] = [Role.DEVELOPER, Role.MANAGER, Role.AUDITOR]
    admin = "admin_user@admin.example.com"

    def permitted(resource, action):
        return self.rbac.validate_permission(user=admin, resource=resource, action=action)

    # Inherited task permissions.
    self.assertTrue(permitted("tasks", "create"))
    self.assertTrue(permitted("tasks", "approve"))
    # Boundary still enforced: admin can't read logs even though auditor can.
    self.assertFalse(permitted("logs", "read"))

    # parent_role must be unaffected by the role_inheritance mapping.
    for role in (Role.ADMIN, Role.DEVELOPER):
        self.assertEqual(role.parent_role, None)
def test_role_inheritance_certificate(self):
    """Certificate auth: an admin-OU cert gains developer permissions once ADMIN inherits DEVELOPER."""
    self.rbac.role_inheritance[Role.ADMIN] = [Role.DEVELOPER]

    admin_cert = ClientCertInfo(
        subject=dict(CN='cert_admin', OU='admin'),
        fingerprint=self.cert_fingerprints['cert_admin'],
        raw_cert=object(),
    )

    # Creating tasks is a developer permission, reached here through the cert.
    granted = self.rbac.validate_permission(
        resource="tasks", action="create", client_cert_info=admin_cert)
    self.assertTrue(granted)
def test_auditor_permissions(self):
    """Auditor can read and export logs but cannot create tasks."""
    auditor = "audit_user@external.org"
    for resource, action, allowed in (
        ("logs", "read", True),
        ("logs", "export", True),
        ("tasks", "create", False),
    ):
        outcome = self.rbac.validate_permission(user=auditor, resource=resource, action=action)
        (self.assertTrue if allowed else self.assertFalse)(outcome)
def test_circular_inheritance_prevention(self):
    """Closing the loop ADMIN -> DEVELOPER -> MANAGER -> ADMIN must raise ValueError."""
    inheritance = self.rbac.role_inheritance
    # The first two links are legal on their own.
    inheritance[Role.ADMIN] = [Role.DEVELOPER]
    inheritance[Role.DEVELOPER] = [Role.MANAGER]

    # The third link would complete the cycle.
    with self.assertRaises(ValueError) as raised:
        inheritance[Role.MANAGER] = [Role.ADMIN]

    error_text = str(raised.exception)
    self.assertIn("Circular role inheritance detected", error_text)
def test_boundary_restrictions_with_inheritance(self):
    """Inherited permissions apply, but a request for a different resource_domain is denied."""
    self.rbac.role_inheritance[Role.ADMIN] = [Role.DEVELOPER]

    # Admin assigned under a domain other than the ones used in setUp.
    restricted_admin = "admin2@restricted.org"
    self.rbac.assign_role(restricted_admin, Role.ADMIN, "restricted.org")

    request = {"user": restricted_admin, "resource": "tasks", "action": "create"}

    # Developer permission is inherited.
    self.assertTrue(self.rbac.validate_permission(**request))

    # Same request against a domain different from the assigned one is rejected.
    self.assertFalse(self.rbac.validate_permission(**request, resource_domain="example.com"))
def test_parent_role_with_inheritance(self):
    """Test parent_role works alongside role_inheritance.

    ADMIN gets MANAGER as parent_role and MANAGER inherits DEVELOPER through
    role_inheritance; the admin user must then be able to create tasks while
    log reads stay denied.
    """
    # Role.ADMIN is shared by every test in the process. Restore its
    # parent_role afterwards so this change cannot leak into other tests
    # (test_role_inheritance_username expects Role.ADMIN.parent_role to be None).
    self.addCleanup(setattr, Role.ADMIN, "parent_role", Role.ADMIN.parent_role)
    Role.ADMIN.parent_role = Role.MANAGER

    # role_inheritance lives on the per-test engine, so it needs no cleanup.
    self.rbac.role_inheritance[Role.MANAGER] = [Role.DEVELOPER]

    self.rbac.assign_role("admin3@example.com", Role.ADMIN, "example.com")

    # Admin -> manager (parent_role) -> developer (role_inheritance).
    self.assertTrue(self.rbac.validate_permission(
        user="admin3@example.com",
        resource="tasks",
        action="create"))

    # Boundary is still enforced.
    self.assertFalse(self.rbac.validate_permission(
        user="admin3@example.com",
        resource="logs",
        action="read"))
def test_multiple_inheritance_chains(self):
    """ADMIN inheriting DEVELOPER and MANAGER (which inherits AUDITOR) gains all three permission sets."""
    self.rbac.role_inheritance[Role.ADMIN] = [Role.DEVELOPER, Role.MANAGER]
    self.rbac.role_inheritance[Role.MANAGER] = [Role.AUDITOR]

    multi_admin = "admin4@multi.org"
    self.rbac.assign_role(multi_admin, Role.ADMIN, "multi.org")

    inherited = (
        ("tasks", "create"),   # From DEVELOPER
        ("tasks", "approve"),  # From MANAGER
        ("logs", "read"),      # From AUDITOR via MANAGER
    )
    for resource, action in inherited:
        self.assertTrue(self.rbac.validate_permission(
            user=multi_admin, resource=resource, action=action))

    # A request scoped to a different domain is rejected.
    cross_domain = self.rbac.validate_permission(
        user=multi_admin,
        resource="tasks",
        action="create",
        resource_domain="other.org",
    )
    self.assertFalse(cross_domain)
def test_parent_role_inheritance(self):
    """Roles built with parent_role expose the chain user -> developer -> admin."""
    root = Role("admin")
    middle = Role("developer", parent_role=root)
    leaf = Role("user", parent_role=middle)

    # Walk the chain from the leaf upwards.
    for child, parent_name in ((leaf, "developer"), (middle, "admin")):
        self.assertEqual(child.parent_role.name, parent_name)
    self.assertIsNone(root.parent_role)
def test_role_inheritance_boundary(self):
    """Test inheritance respects role boundaries.

    ADMIN is configured to inherit DEVELOPER and MANAGER. The admin user must
    gain their task permissions, must still be denied log reads, and the
    developer user must still be denied admin configuration.
    """
    self.rbac.role_inheritance[Role.ADMIN] = [Role.DEVELOPER, Role.MANAGER]
    admin = "admin_user@admin.example.com"

    # Inherited permissions are granted.
    self.assertTrue(self.rbac.validate_permission(
        user=admin,
        resource="tasks",
        action="create"))
    self.assertTrue(self.rbac.validate_permission(
        user=admin,
        resource="tasks",
        action="approve"))

    # Boundary still enforced - admin can't access logs even though auditor can.
    self.assertFalse(self.rbac.validate_permission(
        user=admin,
        resource="logs",
        action="read"))

    # Inheritance flows one way only: the developer user gains nothing from ADMIN.
    # (An unused Role("developer", parent_role=Role("admin")) local was built
    # here previously; it never took part in the assertion and was removed.)
    self.assertFalse(self.rbac.validate_permission(
        user="dev_user@example.com",
        resource="admin",
        action="configure"))
def test_encryption_decryption(self):
    """A payload survives an encrypt_payload / decrypt_payload round trip unchanged."""
    original = {"key": "value"}
    ciphertext = self.rbac.encrypt_payload(original)
    self.assertEqual(self.rbac.decrypt_payload(ciphertext), original)
def test_encryption_decryption_aes_gcm(self):
    """Round trip still works when the engine has no Fernet cipher (intended to force AES-GCM)."""
    engine = RBACEngine(Fernet.generate_key())
    # Dropping the Fernet cipher is meant to push the engine onto its AES-GCM path.
    engine.cipher = None

    original = {"key": "value"}
    round_tripped = engine.decrypt_payload(engine.encrypt_payload(original))
    self.assertEqual(round_tripped, original)
def test_decryption_aes_gcm_exception(self):
    """Undecodable input is handed to the (mocked) Fernet decrypt exactly once."""
    engine = RBACEngine(Fernet.generate_key())

    # Bytes that are not valid ciphertext, so the AES-GCM attempt is expected to fail.
    bogus_ciphertext = b'invalid_encrypted_data'

    # Stand-in for Fernet decryption that always yields a valid JSON document.
    engine.cipher.decrypt = MagicMock(return_value=b'{"key": "value"}')

    # decrypt_payload should fall back to Fernet and return the parsed JSON.
    outcome = engine.decrypt_payload(bogus_ciphertext)

    self.assertEqual(outcome, {"key": "value"})
    engine.cipher.decrypt.assert_called_once_with(bogus_ciphertext)
def test_unauthorized_access_username(self):
    """A user with no role assignment is denied."""
    verdict = self.rbac.validate_permission(
        user="unknown_user@example.com",
        resource="tasks",
        action="read",
    )
    self.assertFalse(verdict)
def test_unauthorized_access_no_context(self):
    """Validation fails when neither a user nor a client certificate is supplied."""
    verdict = self.rbac.validate_permission(resource="tasks", action="read")
    self.assertFalse(verdict)
def test_pre_validation_hook_override(self):
    """Test SYMPHONY-INTEGRATION-POINT: Pre-validation hook override.

    A hook that returns False must veto a request that would otherwise be
    granted. The admin "delegate" request is used because it is allowed
    without the hook, so its denial can only come from the hook.
    """
    # Separate instance to avoid test isolation issues.
    test_rbac = RBACEngine(Fernet.generate_key())
    hook_user = "hook_test_user@admin.example.com"
    test_rbac.assign_role(hook_user, Role.ADMIN, "admin.example.com")

    # Baseline: granted while the default hook is in place.
    self.assertTrue(test_rbac.validate_permission(
        user=hook_user, resource="admin", action="delegate"))

    # Override hook to block all access.
    def block_all_hook(user, resource, action):
        return False
    test_rbac._trigger_pre_validation_hook = block_all_hook

    # The previously granted request is now vetoed by the hook.
    self.assertFalse(test_rbac.validate_permission(
        user=hook_user, resource="admin", action="delegate"))
    # Original check kept: admin has no access to tasks with or without the hook,
    # so this alone would not demonstrate that the hook ran.
    self.assertFalse(test_rbac.validate_permission(
        user=hook_user, resource="tasks", action="read"))
def test_pre_validation_hook_default(self):
    """The default pre-validation hook, called directly, returns None."""
    engine = RBACEngine(Fernet.generate_key())
    hook_args = ("user", "resource", "action")
    self.assertIsNone(engine._trigger_pre_validation_hook(*hook_args))
@patch('security.rbac_engine.logger')
def test_audit_logging_username(self, mock_logger):
    """SYMPHONY-INTEGRATION-POINT: audit entries are logged for username-based validations.

    Runs one resource-mismatch denial, one grant and one action-mismatch
    denial, then looks for the matching audit lines in logger.info calls.
    """
    engine = RBACEngine(Fernet.generate_key())
    subject_user = "audit_test_user@example.com"
    engine.assign_role(subject_user, Role.DEVELOPER, "example.com")

    # Denied (resource mismatch), allowed, denied (action mismatch) - in that order.
    for resource, action in (("logs", "read"), ("tasks", "read"), ("tasks", "delete")):
        engine.validate_permission(user=subject_user, resource=resource, action=action)

    # At least 6 info entries are expected (1 assign + 3 validations with internal logs).
    self.assertGreaterEqual(mock_logger.info.call_count, 6,
                            "Expected at least 6 info log calls (assign + 3 validations with internal logs)")

    # Stringify the first positional argument of every info() call.
    log_messages = [str(call.args[0]) for call in mock_logger.info.call_args_list]

    def entry_logged(*fragments):
        """Return True when one logged line contains every fragment."""
        return any(all(piece in line for piece in fragments) for line in log_messages)

    # Role assignment line.
    self.assertTrue(any(
        f"Assigned {Role.DEVELOPER.value} role to audit_test_user@example.com" in line
        for line in log_messages))

    # Denied: resource mismatch.
    self.assertTrue(
        entry_logged(
            "Audit:", "'allowed': False", "'reason': 'Resource mismatch'",
            "'user': 'audit_test_user@example.com'", "'resource': 'logs'",
            "'auth_method': 'username'"),
        "Missing specific denied audit log (Resource mismatch)")

    # Allowed.
    self.assertTrue(
        entry_logged(
            "Audit:", "'allowed': True", "'reason': 'Access granted'",
            "'user': 'audit_test_user@example.com'", "'resource': 'tasks'",
            "'auth_method': 'username'"),
        "Missing specific allowed audit log")

    # Denied: action mismatch.
    self.assertTrue(
        entry_logged(
            "Audit:", "'allowed': False", "'reason': 'Action not permitted'",
            "'user': 'audit_test_user@example.com'", "'resource': 'tasks'",
            "'action': 'delete'", "'auth_method': 'username'"),
        "Missing specific denied audit log (Action mismatch)")
def test_decrypt_payload_dict_bypass(self):
    """decrypt_payload given a dict returns that very dict (test-helper bypass)."""
    plain = {"test": "data"}
    # A dict goes straight through: no decryption error, nothing changed.
    returned = self.rbac.decrypt_payload(plain)
    self.assertEqual(returned, plain)
    # Identity, not just equality: no copy is made.
    self.assertIs(returned, plain)
# --- TLS Client Certificate RBAC Integration Tests ---
def test_cert_validation_admin_allowed(self):
    """A trusted cert with OU "admin" may delegate on the admin resource."""
    admin_cert = ClientCertInfo(
        subject=dict(CN='cert_admin', OU='admin'),
        fingerprint=self.cert_fingerprints['cert_admin'],
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )
    granted = self.rbac.validate_permission(
        resource="admin", action="delegate", client_cert_info=admin_cert)
    self.assertTrue(granted)
def test_cert_validation_developer_allowed(self):
    """A trusted cert with OU "developer" may create tasks."""
    developer_cert = ClientCertInfo(
        subject=dict(CN='cert_dev', OU='developer'),
        fingerprint=self.cert_fingerprints['cert_dev'],
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )
    granted = self.rbac.validate_permission(
        resource="tasks", action="create", client_cert_info=developer_cert)
    self.assertTrue(granted)
def test_cert_validation_manager_allowed(self):
    """A trusted cert with OU "manager" may approve/delegate tasks but not configure admin."""
    manager_cert = ClientCertInfo(
        subject=dict(CN='cert_manager', OU='manager'),
        fingerprint=self.cert_fingerprints['cert_manager'],
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )

    def via_cert(resource, action):
        return self.rbac.validate_permission(
            resource=resource, action=action, client_cert_info=manager_cert)

    self.assertTrue(via_cert("tasks", "approve"))
    self.assertTrue(via_cert("tasks", "delegate"))
    # Boundary enforcement: admin configuration stays out of reach.
    self.assertFalse(via_cert("admin", "configure"))
def test_cert_validation_auditor_allowed(self):
    """A trusted cert with OU "auditor" may read logs."""
    auditor_cert = ClientCertInfo(
        subject=dict(CN='cert_audit', OU='auditor'),
        fingerprint=self.cert_fingerprints['cert_audit'],
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )
    granted = self.rbac.validate_permission(
        resource="logs", action="read", client_cert_info=auditor_cert)
    self.assertTrue(granted)
def test_cert_validation_denied_wrong_resource(self):
    """A developer cert is denied on a resource outside its role (admin)."""
    developer_cert = ClientCertInfo(
        subject=dict(CN='cert_dev', OU='developer'),
        fingerprint=self.cert_fingerprints['cert_dev'],
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )
    verdict = self.rbac.validate_permission(
        resource="admin", action="delegate", client_cert_info=developer_cert)
    self.assertFalse(verdict)
def test_cert_validation_denied_wrong_action(self):
    """A developer cert is denied an action its role lacks on its own resource (delete tasks)."""
    developer_cert = ClientCertInfo(
        subject=dict(CN='cert_dev', OU='developer'),
        fingerprint=self.cert_fingerprints['cert_dev'],
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )
    verdict = self.rbac.validate_permission(
        resource="tasks", action="delete", client_cert_info=developer_cert)
    self.assertFalse(verdict)
def test_cert_validation_invalid_ou(self):
    """A cert whose OU does not map to any role is denied."""
    unmapped_cert = ClientCertInfo(
        subject=dict(CN='cert_invalid', OU='unknown_group'),
        fingerprint=self.cert_fingerprints['cert_invalid'],
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )
    verdict = self.rbac.validate_permission(
        resource="tasks", action="create", client_cert_info=unmapped_cert)
    self.assertFalse(verdict)
def test_cert_validation_missing_ou(self):
    """A cert whose subject has no OU entry is denied."""
    subject_without_ou = dict(CN='cert_no_ou')  # deliberately no OU key
    ou_less_cert = ClientCertInfo(
        subject=subject_without_ou,
        fingerprint=self.cert_fingerprints['cert_no_ou'],
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )
    verdict = self.rbac.validate_permission(
        resource="tasks", action="create", client_cert_info=ou_less_cert)
    self.assertFalse(verdict)
# NOTE: a duplicate setUp used to be defined here; it overrode _check_certificate_revocation
# to always return False (not revoked). It was removed because setUp is already defined at the beginning of the class.
def test_cert_validation_revoked(self):
    """A trusted cert is denied when the revocation check reports it as revoked."""
    engine = RBACEngine(Fernet.generate_key())
    revoked_fingerprint = self.cert_fingerprints['cert_revoked']
    engine.trusted_cert_fingerprints.add(revoked_fingerprint)

    # Force the revocation check to report "revoked" for every certificate.
    def always_revoked(cert_info):
        return True
    engine._check_certificate_revocation = always_revoked

    revoked_cert = ClientCertInfo(
        subject=dict(CN='cert_revoked', OU='developer'),
        fingerprint=revoked_fingerprint,
        raw_cert=object(),  # dummy stand-in for a real certificate object
    )

    # The developer role would normally allow this; revocation must win.
    verdict = engine.validate_permission(
        resource="tasks", action="create", client_cert_info=revoked_cert)
    self.assertFalse(verdict)
def test_cert_validation_no_raw_cert(self):
    """A trusted cert without a raw certificate object is denied."""
    engine = RBACEngine(Fernet.generate_key())
    trusted = "test_fingerprint_no_raw_cert"
    engine.trusted_cert_fingerprints.add(trusted)

    # raw_cert is None, so there is nothing to run the revocation check against.
    bare_cert = ClientCertInfo(
        subject=dict(CN='cert_no_raw', OU='developer'),
        fingerprint=trusted,
        raw_cert=None,
    )

    verdict = engine.validate_permission(
        resource="tasks", action="create", client_cert_info=bare_cert)
    self.assertFalse(verdict)
def test_check_access_memory_audit(self):
    """check_access grants memory audit to the auditor and denies it to the developer."""
    # Auditor: allowed, with the "Access granted" reason.
    auditor_outcome = self.rbac.check_access(
        resource="memory", action="audit", user="audit_user@external.org")
    self.assertTrue(auditor_outcome[0], "Memory audit should be allowed for auditors")
    self.assertEqual(auditor_outcome[1], "Access granted")

    # Developer: denied, with the "Access denied" reason.
    developer_outcome = self.rbac.check_access(
        resource="memory", action="audit", user="dev_user@example.com")
    self.assertFalse(developer_outcome[0], "Memory audit should be denied for non-auditors")
    self.assertEqual(developer_outcome[1], "Access denied")
def test_check_access_cert_validation(self):
    """Test certificate validation through check_access().

    A trusted auditor certificate may read logs; once its raw certificate
    reports a not_valid_after date in the past, the same request must be
    rejected with the reason "Certificate expired".
    """
    # Imported locally because the module-level imports do not provide
    # datetime; without this the expiry setup below raises NameError.
    from datetime import datetime

    # Create valid certificate info
    cert_info = ClientCertInfo(
        subject={'CN': 'cert_audit', 'OU': 'auditor'},
        fingerprint=self.cert_fingerprints['cert_audit'],
        raw_cert=object()
    )

    # Test allowed access via cert
    result = self.rbac.check_access(resource="logs", action="read",
                                    client_cert_info=cert_info)
    self.assertTrue(result[0], "Log read should be allowed for auditor certs")
    self.assertEqual(result[1], "Access granted")

    # Test expired certificate: swap in a raw cert whose validity ended in 2020.
    expired_cert = MagicMock()
    expired_cert.not_valid_after = datetime(2020, 1, 1)
    cert_info.raw_cert = expired_cert

    result = self.rbac.check_access(resource="logs", action="read",
                                    client_cert_info=cert_info)
    self.assertFalse(result[0], "Should reject expired certificates")
    self.assertEqual(result[1], "Certificate expired")
def test_check_access_pre_validation_hook(self):
    """check_access reports "Pre-validation hook decision" when the hook returns False."""
    engine = RBACEngine(Fernet.generate_key())

    # Replace the hook with one that rejects every request.
    engine._trigger_pre_validation_hook = lambda user, resource, action: False

    outcome = engine.check_access(
        resource="tasks", action="read", user="test_user@example.com")

    self.assertFalse(outcome[0], "Pre-validation hook should block access")
    self.assertEqual(outcome[1], "Pre-validation hook decision")
def test_verify_audit_log_integrity_empty(self):
    """An empty audit log passes the integrity check."""
    engine = RBACEngine(Fernet.generate_key())
    no_entries = []
    self.assertTrue(engine.verify_audit_log_integrity(no_entries))
@patch('security.rbac_engine.logger')
def test_audit_logging_cert_auth(self, mock_logger):
    """Test audit logging specifically for certificate authentication.

    Validates one request with a developer certificate (expected to be
    granted) and one with a certificate whose OU maps to no role (expected to
    be denied), then checks the outcomes and that the granted request left a
    certificate-auth audit entry in logger.info.
    """
    test_rbac = RBACEngine(Fernet.generate_key())  # Use separate instance
    # Override the certificate revocation check to always return False (not revoked)
    test_rbac._check_certificate_revocation = lambda cert_info: False
    # Add certificates to trusted list
    test_rbac.trusted_cert_fingerprints.add("test_fingerprint_audit_cert_dev")
    test_rbac.trusted_cert_fingerprints.add("test_fingerprint_audit_cert_invalid")

    # Allowed access via cert
    cert_info_dev = ClientCertInfo(
        subject={'CN': 'audit_cert_dev', 'OU': 'developer'},
        fingerprint="test_fingerprint_audit_cert_dev",
        raw_cert=object()  # Provide a dummy object for raw_cert
    )
    allowed = test_rbac.validate_permission(
        resource="tasks", action="read", client_cert_info=cert_info_dev)
    self.assertTrue(allowed, "Developer certificate should be allowed to read tasks")

    # Denied access via cert (invalid OU)
    cert_info_invalid = ClientCertInfo(
        subject={'CN': 'audit_cert_invalid', 'OU': 'bad_ou'},
        fingerprint="test_fingerprint_audit_cert_invalid",
        raw_cert=object()  # Provide a dummy object for raw_cert
    )
    denied = test_rbac.validate_permission(
        resource="tasks", action="read", client_cert_info=cert_info_invalid)
    self.assertFalse(denied, "Certificate with an unmapped OU should be denied")

    # The granted request must appear in the audit log, attributed to the
    # certificate CN and the certificate auth method (same entry shape the
    # metadata audit test checks for).
    log_messages = [str(call.args[0]) for call in mock_logger.info.call_args_list]
    allowed_fragments = (
        "Audit:", "'allowed': True", "'reason': 'Access granted'",
        "'user': 'audit_cert_dev'", "'resource': 'tasks'",
        "'auth_method': 'certificate'",
    )
    self.assertTrue(
        any(all(fragment in msg for fragment in allowed_fragments) for msg in log_messages),
        "Missing specific allowed audit log for cert auth")
@patch('security.rbac_engine.logger')
def test_audit_logging_cert_auth_with_metadata(self, mock_logger):
    """Audit entries for cert auth carry issuer/serial metadata and the granted decision."""
    engine = RBACEngine(Fernet.generate_key())
    # Treat every certificate as not revoked.
    engine._check_certificate_revocation = lambda cert_info: False

    trusted = "test_fingerprint_cert_metadata"
    engine.trusted_cert_fingerprints.add(trusted)

    # Certificate info that also carries issuer and serial number.
    cert_with_metadata = ClientCertInfo(
        subject=dict(CN='cert_metadata', OU='developer'),
        fingerprint=trusted,
        raw_cert=object(),
        issuer=dict(CN='Test CA', O='Test Organization'),
        serial_number=12345,
    )

    # Validation is what triggers the audit logging under test.
    engine.validate_permission(
        resource="tasks", action="read", client_cert_info=cert_with_metadata)

    log_messages = [str(call.args[0]) for call in mock_logger.info.call_args_list]

    def entry_logged(*fragments):
        """Return True when one logged line contains every fragment."""
        return any(all(piece in line for piece in fragments) for line in log_messages)

    # Issuer and serial number must be present in an audit line.
    self.assertTrue(
        entry_logged("Audit:", "'cert_issuer'", "'cert_serial': '12345'"),
        "Missing certificate metadata in audit log")

    # The granted decision must be logged for certificate auth (key parts only).
    self.assertTrue(
        entry_logged(
            "Audit:", "'allowed': True", "'reason': 'Access granted'",
            "'user': 'cert_metadata'", "'resource': 'tasks'",
            "'auth_method': 'certificate'"),
        "Missing specific allowed audit log for cert auth")
# --- Unit Tests for All RBAC Functions ---
def test_validate_role_boundary_global(self):
    """A role with a GLOBAL boundary validates for users of any domain."""
    engine = RBACEngine(Fernet.generate_key())
    engine.role_boundaries[Role.AUDITOR] = RoleBoundary.GLOBAL

    any_domain_users = (
        ("user@example.com", "example.com"),
        ("user@external.org", "external.org"),
        ("user@random.net", "random.net"),
    )
    for user_id, domain in any_domain_users:
        self.assertTrue(engine._validate_role_boundary(user_id, Role.AUDITOR, domain))
def test_validate_role_boundary_internal(self):
    """A role with an INTERNAL boundary validates only for the listed internal domains (and a subdomain)."""
    engine = RBACEngine(Fernet.generate_key())
    engine.role_boundaries[Role.DEVELOPER] = RoleBoundary.INTERNAL
    engine.domain_restrictions[RoleBoundary.INTERNAL] = ['example.com', 'internal.org']

    def within_boundary(user_id, domain):
        return engine._validate_role_boundary(user_id, Role.DEVELOPER, domain)

    # Internal domains, including a subdomain of one of them, are accepted.
    self.assertTrue(within_boundary("user@example.com", "example.com"))
    self.assertTrue(within_boundary("user@sub.example.com", "sub.example.com"))
    self.assertTrue(within_boundary("user@internal.org", "internal.org"))

    # Domains outside the list are rejected.
    self.assertFalse(within_boundary("user@external.org", "external.org"))
    self.assertFalse(within_boundary("user@random.net", "random.net"))
def test_validate_role_boundary_restricted(self):
    """A role with a RESTRICTED boundary validates only for the explicitly listed domain."""
    engine = RBACEngine(Fernet.generate_key())
    engine.role_boundaries[Role.ADMIN] = RoleBoundary.RESTRICTED
    engine.domain_restrictions[RoleBoundary.RESTRICTED] = ['admin.example.com']

    cases = (
        # The restricted domain itself is accepted.
        ("user@admin.example.com", "admin.example.com", True),
        # Every other domain is rejected, internal-looking ones included.
        ("user@example.com", "example.com", False),
        ("user@internal.org", "internal.org", False),
    )
    for user_id, domain, expected in cases:
        verdict = engine._validate_role_boundary(user_id, Role.ADMIN, domain)
        if expected:
            self.assertTrue(verdict)
        else:
            self.assertFalse(verdict)
def test_validate_role_boundary_no_domain(self):
    """Without an explicit domain, validation depends on the domain in the user's e-mail address."""
    engine = RBACEngine(Fernet.generate_key())
    engine.role_boundaries[Role.DEVELOPER] = RoleBoundary.INTERNAL
    engine.domain_restrictions[RoleBoundary.INTERNAL] = ['example.com']

    def validates(user_id):
        return engine._validate_role_boundary(user_id, Role.DEVELOPER)

    # Domain is taken from the e-mail address.
    self.assertTrue(validates("user@example.com"))
    self.assertFalse(validates("user@external.org"))
    # No "@domain" part to extract, so validation fails.
    self.assertFalse(validates("username"))
def test_validate_role_boundary_undefined_boundary(self):
    """A role with no boundary definition never validates."""
    engine = RBACEngine(Fernet.generate_key())
    # Drop the DEVELOPER boundary if one is configured.
    engine.role_boundaries.pop(Role.DEVELOPER, None)

    verdict = engine._validate_role_boundary("user@example.com", Role.DEVELOPER, "example.com")
    self.assertFalse(verdict)
def test_add_trusted_certificate(self):
    """add_trusted_certificate returns a fingerprint that ends up in the trusted set."""
    engine = RBACEngine(Fernet.generate_key())

    # Stand-in certificate whose fingerprint() yields fixed bytes.
    fake_cert = MagicMock()
    fake_cert.fingerprint.return_value = b'mock_fingerprint'

    # Make PEM loading inside the engine hand back the stand-in certificate.
    loader_patch = patch('security.rbac_engine.load_pem_x509_certificate', return_value=fake_cert)
    with loader_patch:
        returned = engine.add_trusted_certificate(b'mock_cert_pem')

    self.assertIn(returned, engine.trusted_cert_fingerprints)
def test_create_signed_ou_claim(self):
    """A signed OU claim is "<role value>:<base64 HMAC-SHA256 of the role value>"."""
    engine = RBACEngine(Fernet.generate_key())
    signed = engine.create_signed_ou_claim(Role.ADMIN)

    # Format: role name and signature separated by the first colon.
    self.assertIn(':', signed)
    claimed_role, _, encoded_signature = signed.partition(':')
    self.assertEqual(claimed_role, Role.ADMIN.value)

    # Recompute the signature with the engine's HMAC key and compare.
    mac = hmac.new(engine.hmac_key, claimed_role.encode(), hashlib.sha256)
    recomputed = base64.b64encode(mac.digest()).decode()
    self.assertEqual(encoded_signature, recomputed)
def test_get_role_from_ou_signed_claim(self):
    """_get_role_from_ou accepts a valid signed claim and rejects a bad signature or unknown role."""
    engine = RBACEngine(Fernet.generate_key())

    # Valid claim resolves to the role it was created for.
    valid_claim = engine.create_signed_ou_claim(Role.ADMIN)
    self.assertEqual(engine._get_role_from_ou(valid_claim), Role.ADMIN)

    # Right role name, wrong signature.
    forged_claim = f"{Role.ADMIN.value}:invalid_signature"
    self.assertIsNone(engine._get_role_from_ou(forged_claim))

    # Correctly signed with the engine's key, but the name is not a role.
    unknown_role = "invalid_role"
    mac = hmac.new(engine.hmac_key, unknown_role.encode(), hashlib.sha256)
    encoded_signature = base64.b64encode(mac.digest()).decode()
    unknown_role_claim = f"{unknown_role}:{encoded_signature}"
    self.assertIsNone(engine._get_role_from_ou(unknown_role_claim))
# --- Integration Tests for TLS Certificate Mapping ---
def test_cert_validation_with_signed_ou_claim(self):
    """Check permission validation for a certificate with a signed ADMIN OU."""
    engine = RBACEngine(Fernet.generate_key())

    # Register the fingerprint as trusted before building the certificate.
    pinned = "test_fingerprint_signed_ou"
    engine.trusted_cert_fingerprints.add(pinned)

    # The OU field carries a claim signed by this same engine instance.
    admin_cert = ClientCertInfo(
        subject={
            'CN': 'cert_signed_ou',
            'OU': engine.create_signed_ou_claim(Role.ADMIN),
        },
        fingerprint=pinned,
        raw_cert=object(),
    )

    # Expected outcome: "delegate" on "admin" allowed, "create" on "tasks"
    # denied.
    delegate_allowed = engine.validate_permission(
        resource="admin", action="delegate", client_cert_info=admin_cert
    )
    self.assertTrue(delegate_allowed)
    task_create_allowed = engine.validate_permission(
        resource="tasks", action="create", client_cert_info=admin_cert
    )
    self.assertFalse(task_create_allowed)
def test_cert_validation_with_tampered_ou_claim(self):
    """Test that an OU claim whose role was swapped after signing is rejected.

    A claim is signed for DEVELOPER, then its role prefix is replaced with
    ADMIN while the original signature is kept. A certificate carrying that
    claim must not be granted the admin "delegate" permission.
    """
    test_rbac = RBACEngine(Fernet.generate_key())
    # Add certificate to trusted list so the fingerprint itself is known.
    fingerprint = "test_fingerprint_tampered_ou"
    test_rbac.trusted_cert_fingerprints.add(fingerprint)

    # Create a signed OU claim for the lower-privileged role.
    signed_claim = test_rbac.create_signed_ou_claim(Role.DEVELOPER)

    # Tamper with the role prefix only. A blanket str.replace() would also
    # rewrite any accidental match inside the base64 signature and would
    # silently do nothing if the role text were absent.
    self.assertIn(':', signed_claim)
    _, signature = signed_claim.split(':', 1)
    tampered_claim = f"{Role.ADMIN.value}:{signature}"
    # Guard against a vacuous test: the claim must really have changed.
    self.assertNotEqual(tampered_claim, signed_claim)

    # Create certificate info with tampered claim
    cert_info = ClientCertInfo(
        subject={'CN': 'cert_tampered_ou', 'OU': tampered_claim},
        fingerprint=fingerprint,
        raw_cert=object(),
    )
    # Verify permission validation fails
    self.assertFalse(
        test_rbac.validate_permission(
            resource="admin", action="delegate", client_cert_info=cert_info
        )
    )
# --- Negative Test Cases for Boundary Violations ---
def test_assign_role_boundary_violation(self):
    """Check that ADMIN can only be assigned inside its restricted domain."""
    engine = RBACEngine(Fernet.generate_key())

    # Restrict ADMIN to a single allowed domain.
    engine.role_boundaries[Role.ADMIN] = RoleBoundary.RESTRICTED
    engine.domain_restrictions[RoleBoundary.RESTRICTED] = ['admin.example.com']

    # A user from a different domain is refused and not recorded.
    outsider = "user@example.com"
    self.assertFalse(engine.assign_role(outsider, Role.ADMIN, "example.com"))
    self.assertNotIn(outsider, engine.user_roles)

    # A user from the allowed domain is accepted and recorded.
    insider = "admin@admin.example.com"
    self.assertTrue(engine.assign_role(insider, Role.ADMIN, "admin.example.com"))
    self.assertIn(insider, engine.user_roles)
def test_cert_validation_pinning_failure(self):
    """Check that a certificate with an unregistered fingerprint is denied."""
    engine = RBACEngine(Fernet.generate_key())

    # No fingerprint is registered on this engine, so this one is unknown.
    unpinned_cert = ClientCertInfo(
        subject={'CN': 'cert_unknown', 'OU': 'admin'},
        fingerprint="unknown_fingerprint",
        raw_cert=object(),
    )

    granted = engine.validate_permission(
        resource="admin", action="delegate", client_cert_info=unpinned_cert
    )
    self.assertFalse(granted)
def test_cert_validation_missing_fingerprint(self):
    """Check that a certificate with an empty fingerprint is denied."""
    engine = RBACEngine(Fernet.generate_key())

    # The fingerprint is deliberately the empty string.
    blank_fp_cert = ClientCertInfo(
        subject={'CN': 'cert_no_fingerprint', 'OU': 'admin'},
        fingerprint="",
        raw_cert=object(),
    )

    granted = engine.validate_permission(
        resource="admin", action="delegate", client_cert_info=blank_fp_cert
    )
    self.assertFalse(granted)
# --- Audit Log Verification Tests ---
def test_verify_audit_log_integrity_empty(self):
    """Check that an audit log with no entries is reported as intact."""
    engine = RBACEngine(Fernet.generate_key())
    no_entries = []
    self.assertTrue(engine.verify_audit_log_integrity(no_entries))
def test_verify_audit_log_integrity_valid(self):
    """Check that a correctly signed and linked audit chain verifies.

    Five entries are built by hand. Each is HMAC-SHA256 signed with the
    engine's key over its sorted-key JSON form and records the previous
    entry's digest in ``previous_hash`` (``None`` for the first entry).
    """
    engine = RBACEngine(Fernet.generate_key())

    chain = []
    prior_digest = None
    for seq in range(1, 6):
        record = {
            "sequence": seq,
            "timestamp": "2025-05-02T12:00:00",
            "user": f"user{seq - 1}",
            "resource": "test",
            "action": "read",
            "allowed": True,
            "reason": "Test",
            "auth_method": "username",
            "previous_hash": prior_digest,
        }
        # Sign the record before the digest field itself is attached.
        canonical = json.dumps(record, sort_keys=True).encode()
        prior_digest = hmac.new(
            engine.hmac_key, canonical, hashlib.sha256
        ).hexdigest()
        record["integrity_hash"] = prior_digest
        chain.append(record)

    self.assertTrue(engine.verify_audit_log_integrity(chain))
def test_verify_audit_log_integrity_tampered(self):
    """Check that editing a signed entry's content fails verification.

    A five-entry chain is built and signed, then the ``allowed`` field of
    the third entry is flipped without re-signing it.
    """
    engine = RBACEngine(Fernet.generate_key())

    chain = []
    prior_digest = None
    for seq in range(1, 6):
        record = {
            "sequence": seq,
            "timestamp": "2025-05-02T12:00:00",
            "user": f"user{seq - 1}",
            "resource": "test",
            "action": "read",
            "allowed": True,
            "reason": "Test",
            "auth_method": "username",
            "previous_hash": prior_digest,
        }
        # Sign the record before the digest field itself is attached.
        canonical = json.dumps(record, sort_keys=True).encode()
        prior_digest = hmac.new(
            engine.hmac_key, canonical, hashlib.sha256
        ).hexdigest()
        record["integrity_hash"] = prior_digest
        chain.append(record)

    # Change signed content so the stored digest no longer matches it.
    chain[2]["allowed"] = False

    self.assertFalse(engine.verify_audit_log_integrity(chain))
def test_verify_audit_log_integrity_broken_chain(self):
    """Check that a chain whose back-link was overwritten fails verification.

    A five-entry chain is built and signed, then the ``previous_hash`` of
    the fourth entry is replaced with a value that matches no digest.

    NOTE(review): ``previous_hash`` is part of the JSON that was signed, so
    overwriting it also leaves that entry's stored ``integrity_hash`` stale.
    This case therefore overlaps with the tampered-entry test; consider
    re-signing the entry if the link check should be exercised in isolation.
    """
    engine = RBACEngine(Fernet.generate_key())

    chain = []
    prior_digest = None
    for seq in range(1, 6):
        record = {
            "sequence": seq,
            "timestamp": "2025-05-02T12:00:00",
            "user": f"user{seq - 1}",
            "resource": "test",
            "action": "read",
            "allowed": True,
            "reason": "Test",
            "auth_method": "username",
            "previous_hash": prior_digest,
        }
        # Sign the record before the digest field itself is attached.
        canonical = json.dumps(record, sort_keys=True).encode()
        prior_digest = hmac.new(
            engine.hmac_key, canonical, hashlib.sha256
        ).hexdigest()
        record["integrity_hash"] = prior_digest
        chain.append(record)

    # Point the fourth entry at a digest that no entry carries.
    chain[3]["previous_hash"] = "invalid_hash"

    self.assertFalse(engine.verify_audit_log_integrity(chain))
def test_verify_audit_log_integrity_missing_hash(self):
    """Check that an entry lacking ``integrity_hash`` fails verification.

    A five-entry chain is built and signed, then the ``integrity_hash`` key
    is deleted from the third entry.
    """
    engine = RBACEngine(Fernet.generate_key())

    chain = []
    prior_digest = None
    for seq in range(1, 6):
        record = {
            "sequence": seq,
            "timestamp": "2025-05-02T12:00:00",
            "user": f"user{seq - 1}",
            "resource": "test",
            "action": "read",
            "allowed": True,
            "reason": "Test",
            "auth_method": "username",
            "previous_hash": prior_digest,
        }
        # Sign the record before the digest field itself is attached.
        canonical = json.dumps(record, sort_keys=True).encode()
        prior_digest = hmac.new(
            engine.hmac_key, canonical, hashlib.sha256
        ).hexdigest()
        record["integrity_hash"] = prior_digest
        chain.append(record)

    # Strip the digest from one entry entirely.
    del chain[2]["integrity_hash"]

    self.assertFalse(engine.verify_audit_log_integrity(chain))
# --- Performance Benchmark Tests ---
def test_permission_validation_performance(self):
    """Test throughput of username-based permission validation.

    Runs 1000 ``validate_permission`` calls for a DEVELOPER user, prints the
    measured rate, and asserts it exceeds 100 operations per second.
    """
    test_rbac = RBACEngine(Fernet.generate_key())
    test_rbac.assign_role("test_user", Role.DEVELOPER)

    # Measure time for 1000 permission validations.
    # perf_counter() is monotonic and high-resolution; time.time() is a wall
    # clock that can jump or tick coarsely, which skews short intervals.
    iterations = 1000
    start_time = time.perf_counter()
    for _ in range(iterations):
        test_rbac.validate_permission(user="test_user", resource="tasks", action="create")
    elapsed_time = time.perf_counter() - start_time

    # Calculate operations per second; a zero interval means the loop ran
    # faster than the clock can resolve, so treat it as unboundedly fast
    # instead of dividing by zero.
    ops_per_second = iterations / elapsed_time if elapsed_time > 0 else float("inf")

    # Log performance metrics
    print(f"\nPermission validation performance: {ops_per_second:.2f} ops/sec")
    print(f"Average validation time: {(elapsed_time / iterations) * 1000:.2f} ms")

    # Assert reasonable performance (adjust threshold as needed)
    self.assertGreater(ops_per_second, 100, "Permission validation performance below threshold")
def test_encryption_decryption_performance(self):
    """Test throughput of payload encryption/decryption round trips.

    Runs 100 ``encrypt_payload`` / ``decrypt_payload`` cycles on a small
    nested dict, checks every round trip returns the original payload,
    prints the measured rate, and asserts it exceeds 10 cycles per second.
    """
    test_rbac = RBACEngine(Fernet.generate_key())
    test_payload = {"key": "value", "nested": {"data": [1, 2, 3, 4, 5]}}

    # Measure time for 100 encryption/decryption cycles.
    # perf_counter() is monotonic and high-resolution; time.time() is a wall
    # clock that can jump or tick coarsely, which skews short intervals.
    iterations = 100
    start_time = time.perf_counter()
    for _ in range(iterations):
        encrypted = test_rbac.encrypt_payload(test_payload)
        decrypted = test_rbac.decrypt_payload(encrypted)
        # Correctness is checked on every cycle, inside the timed region.
        self.assertEqual(decrypted, test_payload)
    elapsed_time = time.perf_counter() - start_time

    # Calculate operations per second; a zero interval means the loop ran
    # faster than the clock can resolve, so treat it as unboundedly fast
    # instead of dividing by zero.
    ops_per_second = iterations / elapsed_time if elapsed_time > 0 else float("inf")

    # Log performance metrics
    print(f"\nEncryption/decryption performance: {ops_per_second:.2f} cycles/sec")
    print(f"Average cycle time: {(elapsed_time / iterations) * 1000:.2f} ms")

    # Assert reasonable performance (adjust threshold as needed)
    self.assertGreater(ops_per_second, 10, "Encryption/decryption performance below threshold")
def test_certificate_validation_performance(self):
    """Test throughput of certificate-based permission validation.

    Runs 1000 ``validate_permission`` calls with a trusted developer
    certificate, prints the measured rate, and asserts it exceeds 100
    operations per second.
    """
    test_rbac = RBACEngine(Fernet.generate_key())

    # Add certificate to trusted list
    fingerprint = "test_fingerprint_perf"
    test_rbac.trusted_cert_fingerprints.add(fingerprint)

    # Create certificate info
    cert_info = ClientCertInfo(
        subject={'CN': 'cert_perf', 'OU': 'developer'},
        fingerprint=fingerprint,
        raw_cert=object(),
    )

    # Measure time for 1000 certificate validations.
    # perf_counter() is monotonic and high-resolution; time.time() is a wall
    # clock that can jump or tick coarsely, which skews short intervals.
    iterations = 1000
    start_time = time.perf_counter()
    for _ in range(iterations):
        test_rbac.validate_permission(resource="tasks", action="create", client_cert_info=cert_info)
    elapsed_time = time.perf_counter() - start_time

    # Calculate operations per second; a zero interval means the loop ran
    # faster than the clock can resolve, so treat it as unboundedly fast
    # instead of dividing by zero.
    ops_per_second = iterations / elapsed_time if elapsed_time > 0 else float("inf")

    # Log performance metrics
    print(f"\nCertificate validation performance: {ops_per_second:.2f} ops/sec")
    print(f"Average validation time: {(elapsed_time / iterations) * 1000:.2f} ms")

    # Assert reasonable performance (adjust threshold as needed)
    self.assertGreater(ops_per_second, 100, "Certificate validation performance below threshold")
# Run the suite when this file is executed directly rather than imported.
# NOTE(review): unittest.main() exits the interpreter by default, so the test
# definitions that follow this guard are not reached in that mode - confirm
# whether they were meant to sit inside TestRBACEngine above this point.
if __name__ == "__main__":
    unittest.main()
def test_certificate_validation(self):
    """Exercise validate_certificate with one good and three bad certificates.

    The good certificate has an OU, a fingerprint registered in
    ``self.cert_fingerprints`` and a ``not_after`` one day in the future; it
    must validate without raising. The bad ones (no OU, unknown fingerprint,
    ``not_after`` one day in the past) must each raise ``ValueError``.

    NOTE(review): this definition appears after the ``__main__`` guard and
    relies on ``self.rbac`` / ``self.cert_fingerprints``; confirm it is
    actually collected as a TestRBACEngine method.
    """
    from datetime import datetime, timedelta

    one_day = timedelta(days=1)

    # Trusted, unexpired, and carrying an OU: expected to pass silently.
    accepted = ClientCertInfo(
        subject={'OU': 'admin'},
        fingerprint=self.cert_fingerprints['cert_admin'],
        not_after=datetime.now() + one_day,
    )
    self.rbac.validate_certificate(accepted)

    # Each spec is (subject, fingerprint, offset added to "now" for not_after)
    # and differs from the accepted certificate in exactly one respect.
    rejected_specs = (
        # Missing OU claim
        ({}, self.cert_fingerprints['cert_no_ou'], one_day),
        # Untrusted fingerprint
        ({'OU': 'admin'}, 'untrusted_fingerprint', one_day),
        # Expired certificate
        ({'OU': 'admin'}, self.cert_fingerprints['cert_admin'], -one_day),
    )
    for subject, fingerprint, offset in rejected_specs:
        rejected = ClientCertInfo(
            subject=subject,
            fingerprint=fingerprint,
            not_after=datetime.now() + offset,
        )
        with self.assertRaises(ValueError):
            self.rbac.validate_certificate(rejected)
def test_boundary_enforcement(self):
    """Check check_permission outcomes for the admin, developer and auditor users.

    Each row below is (user, resource, action, expected result); rows are
    evaluated in order and the first mismatch fails the test.
    """
    expectations = (
        # Admin should have full access
        ("admin_user@admin.example.com", "sensitive_data", "read", True),
        # Developer should be restricted from admin functions
        ("dev_user@example.com", "admin_console", "access", False),
        # Auditor should only have read access
        ("audit_user@external.org", "audit_logs", "read", True),
        ("audit_user@external.org", "audit_logs", "delete", False),
    )
    for user, resource, action, should_allow in expectations:
        verdict = self.rbac.check_permission(user, resource, action)
        if should_allow:
            self.assertTrue(verdict)
        else:
            self.assertFalse(verdict)