"""Integration tests for event framework.""" import unittest import time import threading from unittest.mock import patch, MagicMock from events.core import EventSystem, EventDispatcher from security.encrypt import AES256Cipher class TestEventFrameworkIntegration(unittest.TestCase): """Tests event framework integration points.""" def setUp(self): self.scheduler = MagicMock() self.system = EventSystem(self.scheduler) self.cipher = AES256Cipher() def test_encrypted_event_flow(self): """Test full encrypted event lifecycle.""" test_event = {'type': 'test', 'data': 'secret'} # Capture decrypted event received_event = None def handler(event): nonlocal received_event received_event = event self.system.subscribe('test', handler) self.system.publish(test_event) # Allow time for async processing time.sleep(0.1) self.assertEqual(received_event['data'], 'secret') self.assertTrue('encrypted' not in received_event) def test_concurrent_encrypted_events(self): """Test handling of concurrent encrypted events.""" results = [] lock = threading.Lock() def handler(event): with lock: results.append(event['data']) self.system.subscribe('concurrent', handler) threads = [] for i in range(10): t = threading.Thread( target=self.system.publish, args=({'type': 'concurrent', 'data': str(i)},) ) threads.append(t) t.start() def test_max_size_event_handling(self): """Test handling of maximum size encrypted events.""" max_size = 1024 * 1024 # 1MB large_data = 'x' * max_size start_time = time.time() received = None def handler(event): nonlocal received received = event self.system.subscribe('large', handler) self.system.publish({'type': 'large', 'data': large_data}) time.sleep(0.5) # Extra time for large payload elapsed = (time.time() - start_time) * 1000 # ms self.assertEqual(len(received['data']), max_size) self.assertLess(elapsed, 1000, f"Large event took {elapsed}ms (max 1000ms)") print(f"Max size event processed in {elapsed}ms") def test_malformed_encrypted_payloads(self): """Test handling of malformed encrypted payloads.""" test_cases = [ {'type': 'malformed', 'data': None}, {'type': 'malformed', 'data': {'nested': 'value'}}, {'type': 'malformed', 'data': b'invalid_bytes'} ] errors = [] def error_handler(event): errors.append(event) self.system.subscribe('malformed', error_handler) start_time = time.time() for case in test_cases: with self.assertRaises(ValueError): self.system.publish(case) elapsed = (time.time() - start_time) * 1000 / len(test_cases) self.assertLess(elapsed, 50, f"Malformed handling took {elapsed}ms/case (max 50ms)") print(f"Malformed payload handling: {elapsed}ms per case") def test_concurrent_large_events(self): """Test concurrent handling of large encrypted events.""" event_size = 512 * 1024 # 512KB event_count = 10 results = [] lock = threading.Lock() def handler(event): with lock: results.append(len(event['data'])) self.system.subscribe('concurrent_large', handler) start_time = time.time() threads = [] for i in range(event_count): t = threading.Thread( target=self.system.publish, args=({'type': 'concurrent_large', 'data': 'x' * event_size},) ) threads.append(t) t.start() for t in threads: t.join() elapsed = (time.time() - start_time) * 1000 # ms avg_time = elapsed / event_count self.assertEqual(len(results), event_count) self.assertLess(avg_time, 500, f"Avg large event took {avg_time}ms (max 500ms)") print(f"Concurrent large events: {avg_time}ms avg per event") def test_mixed_workload_performance(self): """Test performance with mixed event sizes and types.""" small_events = 100 large_events = 10 large_size = 256 * 1024 # 256KB start_time = time.time() # Small events for i in range(small_events): self.system.publish({'type': 'mixed', 'data': str(i)}) # Large events for i in range(large_events): self.system.publish({'type': 'mixed', 'data': 'x' * large_size}) elapsed = (time.time() - start_time) * 1000 # ms total_events = small_events + large_events avg_time = elapsed / total_events self.assertLess(avg_time, 20, f"Mixed workload avg {avg_time}ms/event (max 20ms)") print(f"Mixed workload performance: {avg_time}ms avg per event") for t in threads: t.join() time.sleep(0.2) # Allow processing self.assertEqual(len(results), 10) self.assertEqual(sorted(results), [str(i) for i in range(10)]) def test_event_priority_handling(self): """Test priority queue handling with encryption.""" results = [] def handler(event): results.append(event['priority']) self.system.subscribe('priority', handler) for i in range(5, 0, -1): self.system.publish( {'type': 'priority', 'priority': i}, priority=i ) time.sleep(0.1) self.assertEqual(results, [5,4,3,2,1]) @patch('security.encrypt.AES256Cipher.decrypt') def test_decryption_failure_handling(self, mock_decrypt): """Test graceful handling of decryption failures.""" mock_decrypt.side_effect = Exception("Invalid key") errors = [] def error_handler(event): errors.append(event) self.system.subscribe('error', error_handler) self.system.publish({'type': 'error', 'data': 'fail'}) time.sleep(0.1) self.assertEqual(len(errors), 1) def test_performance_metrics(self): """Test performance metric collection.""" for i in range(10): self.system.publish({'type': 'perf', 'data': str(i)}) stats = self.system.get_performance_stats() self.assertEqual(stats['total_events'], 10) self.assertLess(stats['avg_latency'], 0.1) if __name__ == '__main__': unittest.main()