# ai-agent/tests/security/test_rbac_engine.py
#
# Listing metadata from the file viewer: 151 lines, no EOL, 7.8 KiB, Python

import unittest
from unittest.mock import patch
from security.rbac_engine import RBACEngine, Role
from cryptography.fernet import Fernet
class TestRBACEngine(unittest.TestCase):
    """Tests for ``RBACEngine``.

    Covers role assignment, per-role permission checks, payload
    encryption/decryption, the pre-validation hook, audit logging and
    permission checks performed with a TLS certificate context.
    """

    def setUp(self):
        """Create a fresh engine with one user per role before each test."""
        self.encryption_key = Fernet.generate_key()
        self.rbac = RBACEngine(self.encryption_key)
        self.rbac.assign_role("admin_user", Role.ADMIN)
        self.rbac.assign_role("dev_user", Role.DEVELOPER)
        self.rbac.assign_role("audit_user", Role.AUDITOR)

    @staticmethod
    def _has_message_with(messages, fragments):
        """Return True if any message contains every one of ``fragments``."""
        return any(
            all(fragment in message for fragment in fragments)
            for message in messages
        )

    def test_role_assignments(self):
        """Roles assigned in ``setUp`` are stored in ``user_roles``."""
        self.assertEqual(self.rbac.user_roles["admin_user"], Role.ADMIN)
        self.assertEqual(self.rbac.user_roles["dev_user"], Role.DEVELOPER)
        self.assertEqual(self.rbac.user_roles["audit_user"], Role.AUDITOR)

    def test_admin_permissions_correct(self):
        """ADMIN may delegate/audit/configure on ``admin`` and nothing else."""
        # Allowed actions on the correct resource.
        for action in ("delegate", "audit", "configure"):
            with self.subTest(resource="admin", action=action):
                self.assertTrue(
                    self.rbac.validate_permission("admin_user", "admin", action)
                )

        # Denied action on the correct resource.
        self.assertFalse(
            self.rbac.validate_permission("admin_user", "admin", "read")
        )

        # Denied access to other resources.
        for resource, action in (("tasks", "create"), ("logs", "read")):
            with self.subTest(resource=resource, action=action):
                self.assertFalse(
                    self.rbac.validate_permission("admin_user", resource, action)
                )

    def test_developer_permissions(self):
        """DEVELOPER may create tasks but not delete them or read logs."""
        self.assertTrue(
            self.rbac.validate_permission("dev_user", "tasks", "create")
        )
        self.assertFalse(
            self.rbac.validate_permission("dev_user", "tasks", "delete")
        )
        self.assertFalse(
            self.rbac.validate_permission("dev_user", "logs", "read")
        )

    def test_auditor_permissions(self):
        """AUDITOR may read logs but not create tasks."""
        self.assertTrue(
            self.rbac.validate_permission("audit_user", "logs", "read")
        )
        self.assertFalse(
            self.rbac.validate_permission("audit_user", "tasks", "create")
        )

    def test_encryption_decryption(self):
        """A payload survives an encrypt/decrypt round trip unchanged."""
        test_payload = {"key": "value"}
        encrypted = self.rbac.encrypt_payload(test_payload)
        decrypted = self.rbac.decrypt_payload(encrypted)
        self.assertEqual(decrypted, test_payload)

    def test_unauthorized_access(self):
        """A user with no assigned role is denied."""
        self.assertFalse(
            self.rbac.validate_permission("unknown_user", "tasks", "read")
        )

    def test_pre_validation_hook_override(self):
        """Test SYMPHONY-INTEGRATION-POINT: Pre-validation hook override"""
        # Separate instance so the overridden hook cannot leak into other tests.
        test_rbac = RBACEngine(Fernet.generate_key())
        test_rbac.assign_role("hook_test_user", Role.ADMIN)

        # Baseline: the request must be granted before the hook is replaced.
        # Checking a request that the role is denied anyway (e.g. ADMIN on
        # "tasks") would pass even if the hook were never consulted.
        self.assertTrue(
            test_rbac.validate_permission("hook_test_user", "admin", "delegate"),
            "Baseline permission should be granted before overriding the hook",
        )

        # Override hook to block all access.
        def block_all_hook(user, resource, action):
            return False

        test_rbac._trigger_pre_validation_hook = block_all_hook
        self.assertFalse(
            test_rbac.validate_permission("hook_test_user", "admin", "delegate"),
            "Overridden pre-validation hook should deny an otherwise "
            "granted request",
        )

    @patch('security.rbac_engine.logger')
    def test_audit_logging(self, mock_logger):
        """Test SYMPHONY-INTEGRATION-POINT: Audit logging callback"""
        test_rbac = RBACEngine(Fernet.generate_key())
        test_rbac.assign_role("audit_test_user", Role.DEVELOPER)

        # Denied access, then allowed access.
        test_rbac.validate_permission("audit_test_user", "logs", "read")
        test_rbac.validate_permission("audit_test_user", "tasks", "read")

        # Assign role + 2 validations.
        self.assertEqual(mock_logger.info.call_count, 3)

        # More specific checks on the logged messages.
        log_messages = [
            logged.args[0] for logged in mock_logger.info.call_args_list
        ]

        # Assignment log.
        self.assertTrue(
            any(
                f"Assigned {Role.DEVELOPER.value} role to audit_test_user" in msg
                for msg in log_messages
            )
        )

        # Denied log entry for resource mismatch.
        denied_fragments = (
            "Audit entry:",
            "'allowed': False",
            "'reason': 'Resource mismatch'",
            "'user': 'audit_test_user'",
            "'resource': 'logs'",
        )
        self.assertTrue(
            self._has_message_with(log_messages, denied_fragments),
            "Missing specific denied audit log (Resource mismatch)",
        )

        # Allowed log entry.
        allowed_fragments = (
            "Audit entry:",
            "'allowed': True",
            "'reason': 'Access granted'",
            "'user': 'audit_test_user'",
            "'resource': 'tasks'",
        )
        self.assertTrue(
            self._has_message_with(log_messages, allowed_fragments),
            "Missing specific allowed audit log",
        )

    def test_decrypt_payload_dict_bypass(self):
        """Test that decrypt_payload bypasses decryption for dict input (test helper)."""
        test_payload = {"test": "data"}
        # Pass a dict directly; it should come back unchanged, with no
        # decryption error.
        decrypted = self.rbac.decrypt_payload(test_payload)
        self.assertEqual(decrypted, test_payload)
        self.assertIs(decrypted, test_payload)  # Same object, not a copy.

    # --- Placeholder for TLS-RBAC Integration Tests ---

    # Mock the cert extraction.
    @patch('security.rbac_engine.RBACEngine._get_user_role_from_tls_cert')
    def test_permission_validation_with_tls_cert_role_admin(self, mock_get_role):
        """Verify permission check uses role extracted from TLS cert context (Admin)."""
        # The mocked lookup returns the ADMIN role for any cert context.
        mock_get_role.return_value = Role.ADMIN
        # Simulate a TLS context object or relevant identifier.
        mock_tls_context = {"subject_dn": "CN=admin_user,OU=AdminGroup"}
        # Use a user identifier derived from the cert context.
        cert_user_id = "tls:CN=admin_user,OU=AdminGroup"

        # Assign role based on cert (though the mock handles the lookup here).
        # In a real scenario, assign_role might not be needed if lookup is
        # dynamic.
        self.rbac.assign_role(cert_user_id, Role.ADMIN)

        # Permissions expected for the ADMIN role derived from TLS.
        self.assertTrue(
            self.rbac.validate_permission(
                cert_user_id, "admin", "delegate", tls_context=mock_tls_context
            )
        )
        self.assertFalse(
            self.rbac.validate_permission(
                cert_user_id, "tasks", "create", tls_context=mock_tls_context
            )
        )
        # NOTE(review): two validations precede this assertion, so it only
        # holds if the engine resolves the certificate role once for both
        # calls -- confirm against RBACEngine.validate_permission.
        mock_get_role.assert_called_once_with(mock_tls_context)

    # Mock the cert extraction.
    @patch('security.rbac_engine.RBACEngine._get_user_role_from_tls_cert')
    def test_permission_validation_with_tls_cert_role_developer(self, mock_get_role):
        """Verify permission check uses role extracted from TLS cert context (Developer)."""
        mock_get_role.return_value = Role.DEVELOPER
        mock_tls_context = {"subject_dn": "CN=dev_user,OU=DevGroup"}
        cert_user_id = "tls:CN=dev_user,OU=DevGroup"
        self.rbac.assign_role(cert_user_id, Role.DEVELOPER)

        self.assertTrue(
            self.rbac.validate_permission(
                cert_user_id, "tasks", "create", tls_context=mock_tls_context
            )
        )
        self.assertFalse(
            self.rbac.validate_permission(
                cert_user_id, "admin", "delegate", tls_context=mock_tls_context
            )
        )
        # NOTE(review): same single-lookup assumption as the Admin variant
        # of this test -- confirm against RBACEngine.validate_permission.
        mock_get_role.assert_called_once_with(mock_tls_context)

    # TODO: add tests for other roles (Auditor) and scenarios like
    # missing/invalid certs, revocation checks (if applicable before RBAC),
    # and OU mapping logic.
    # The _get_user_role_from_tls_cert method itself would need separate
    # unit tests.

    # Removed test_admin_cant_access_wildcard_resources and
    # test_action_wildcards_rejected_in_definition as they were based on
    # incorrect assumptions and non-existent methods.
    # The core logic is tested by test_admin_permissions_correct and the
    # validation implementation.
# Run the suite when this file is executed directly instead of via a runner.
if __name__ == '__main__':
    unittest.main()