"""Performance tests for event system.""" import time import threading import pytest from ..core import EventSystem from orchestrator.scheduler import Scheduler from orchestrator.core.dispatcher import Dispatcher @pytest.fixture def event_system(): """Test fixture for event system.""" dispatcher = Dispatcher() scheduler = Scheduler(dispatcher, test_mode=True) return EventSystem(scheduler) def test_event_throughput(event_system): """Test system can handle 100+ events per second.""" event_count = 1000 processed = 0 lock = threading.Lock() def handler(_): nonlocal processed with lock: processed += 1 # Subscribe to test events event_system.subscribe("perf_test", handler) # Start processing event_system.dispatcher.start() # Send events as fast as possible start_time = time.time() for i in range(event_count): event_system.publish({"type": "perf_test", "data": i}) # Wait for processing to complete while processed < event_count and time.time() - start_time < 10: time.sleep(0.1) elapsed = time.time() - start_time rate = event_count / elapsed # Cleanup event_system.dispatcher.stop() assert rate >= 100, f"Event rate {rate:.1f}/sec below required 100/sec" print(f"Processed {event_count} events in {elapsed:.3f} seconds ({rate:.1f}/sec)") def test_concurrent_publishers(event_system): """Test system handles concurrent publishers.""" event_count = 1000 processed = 0 lock = threading.Lock() def handler(_): nonlocal processed with lock: processed += 1 event_system.subscribe("concurrent_test", handler) event_system.dispatcher.start() def publisher_thread(): for _ in range(event_count // 10): event_system.publish({"type": "concurrent_test"}) start_time = time.time() threads = [threading.Thread(target=publisher_thread) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() while processed < event_count and time.time() - start_time < 10: time.sleep(0.1) elapsed = time.time() - start_time rate = event_count / elapsed event_system.dispatcher.stop() assert rate >= 100, f"Concurrent event rate {rate:.1f}/sec below required 100/sec" print(f"Processed {event_count} concurrent events in {elapsed:.3f} seconds ({rate:.1f}/sec)") def test_scheduled_events(event_system): """Test integration with scheduler for delayed events.""" processed = 0 lock = threading.Lock() def handler(_): nonlocal processed with lock: processed += 1 event_system.subscribe("scheduled_test", handler) event_system.dispatcher.start() # Schedule 100 events with 0.01s delay start_time = time.time() for i in range(100): event_system.dispatcher.schedule_event( {"type": "scheduled_test"}, 0.01 ) # Wait for processing while processed < 100 and time.time() - start_time < 2: time.sleep(0.1) elapsed = time.time() - start_time event_system.dispatcher.stop() assert processed == 100, f"Only processed {processed}/100 scheduled events" assert elapsed < 1.5, f"Scheduled events took too long ({elapsed:.2f}s)" print(f"Processed 100 scheduled events in {elapsed:.3f} seconds") def test_api_response_time(event_system): """Test API response time meets ≤800ms requirement.""" event_system.dispatcher.start() # Measure response time for critical API path start_time = time.time() event_system.publish({"type": "api_request", "path": "/critical"}) response = event_system.get_response("api_request") elapsed = (time.time() - start_time) * 1000 # Convert to ms event_system.dispatcher.stop() assert elapsed <= 800, f"API response time {elapsed:.1f}ms exceeds 800ms limit" print(f"API response time: {elapsed:.1f}ms") def test_encrypted_event_performance(event_system): """Test performance impact of encrypted events.""" event_count = 1000 processed = 0 lock = threading.Lock() def handler(_): nonlocal processed with lock: processed += 1 event_system.subscribe("encrypted_test", handler) event_system.dispatcher.start() # Send encrypted events start_time = time.time() for i in range(event_count): event = {"type": "encrypted_test", "data": i, "encrypted": True} event_system.publish(event) while processed < event_count and time.time() - start_time < 10: time.sleep(0.1) elapsed = time.time() - start_time rate = event_count / elapsed event_system.dispatcher.stop() assert rate >= 80, f"Encrypted event rate {rate:.1f}/sec below required 80/sec" print(f"Processed {event_count} encrypted events in {elapsed:.3f} seconds ({rate:.1f}/sec)") def test_key_rotation_performance(event_system): """Test performance impact of key rotation.""" start_time = time.time() event_system.rotate_keys() elapsed = (time.time() - start_time) * 1000 # Convert to ms assert elapsed <= 500, f"Key rotation took {elapsed:.1f}ms (max 500ms)" print(f"Key rotation completed in {elapsed:.1f}ms") def test_invalid_key_handling(event_system): """Test performance of invalid key detection.""" invalid_events = 100 start_time = time.time() for i in range(invalid_events): with pytest.raises(InvalidKeyError): event_system.publish({"type": "invalid_test", "key": "bad_key"}) elapsed = (time.time() - start_time) * 1000 / invalid_events assert elapsed <= 10, f"Invalid key handling took {elapsed:.1f}ms/event (max 10ms)" print(f"Invalid key handling: {elapsed:.1f}ms per event") def test_tamper_detection_performance(event_system): """Test performance of tamper detection.""" tampered_events = 100 start_time = time.time() for i in range(tampered_events): with pytest.raises(TamperDetectedError): event = {"type": "tampered_test", "data": i} event["_signature"] = "invalid_signature" event_system.publish(event) elapsed = (time.time() - start_time) * 1000 / tampered_events assert elapsed <= 15, f"Tamper detection took {elapsed:.1f}ms/event (max 15ms)" print(f"Tamper detection: {elapsed:.1f}ms per event") def test_audit_log_performance(event_system): """Test performance impact of audit logging.""" event_count = 1000 start_time = time.time() for i in range(event_count): event_system.publish({"type": "audit_test", "data": i}) elapsed = (time.time() - start_time) * 1000 / event_count assert elapsed <= 5, f"Audit logging took {elapsed:.1f}ms/event (max 5ms)" print(f"Audit logging: {elapsed:.1f}ms per event") def test_critical_path_coverage(event_system): """Test 100% critical path coverage timing.""" paths = [ "auth", "dispatch", "encrypt", "decrypt", "validate", "log" ] max_times = { "auth": 50, # ms "dispatch": 100, "encrypt": 150, "decrypt": 150, "validate": 75, "log": 20 } event_system.dispatcher.start() results = {} for path in paths: start_time = time.time() event_system.publish({"type": "timing_test", "path": path}) response = event_system.get_response("timing_test") elapsed = (time.time() - start_time) * 1000 results[path] = elapsed assert response["status"] == "ok" event_system.dispatcher.stop() for path, time_ms in results.items(): assert time_ms <= max_times[path], \ f"{path} path took {time_ms:.1f}ms (max {max_times[path]}ms)" print(f"{path} path: {time_ms:.1f}ms") def test_edge_case_handling(event_system): """Test edge case handling performance.""" test_cases = [ {"type": "edge_case", "data": None}, {"type": "edge_case", "data": ""}, {"type": "edge_case", "data": {}}, {"type": "edge_case", "data": []}, {"type": "edge_case", "data": "x"*10000} ] event_system.dispatcher.start() results = [] for case in test_cases: start_time = time.time() event_system.publish(case) response = event_system.get_response("edge_case") elapsed = (time.time() - start_time) * 1000 results.append(elapsed) assert response["status"] == "handled" event_system.dispatcher.stop() avg_time = sum(results) / len(results) assert avg_time <= 100, f"Edge case avg time {avg_time:.1f}ms > 100ms" print(f"Edge case avg handling time: {avg_time:.1f}ms") def test_high_priority_events(event_system): """Test high priority event timing.""" event_system.dispatcher.start() # Send mixed priority events start_time = time.time() for i in range(100): priority = "high" if i % 10 == 0 else "normal" event_system.publish({ "type": "priority_test", "priority": priority, "seq": i }) # Get timing for high priority events high_priority_times = [] for i in range(0, 100, 10): response = event_system.get_response("priority_test", filter_fn=lambda r: r["seq"] == i) elapsed = (time.time() - start_time) * 1000 high_priority_times.append(elapsed) assert response["priority"] == "high" event_system.dispatcher.stop() avg_high_priority_time = sum(high_priority_times) / len(high_priority_times) assert avg_high_priority_time <= 50, \ f"High priority avg time {avg_high_priority_time:.1f}ms > 50ms" print(f"High priority avg time: {avg_high_priority_time:.1f}ms")