ai-agent/events/core.py

179 lines
No EOL
6.3 KiB
Python

"""Event-driven framework core implementation."""
import threading
import time
import heapq
from typing import Callable, Dict, Any
from security.encrypt import encrypt_data, decrypt_data, AES256Cipher
from contextlib import contextmanager
class EventQueue:
"""Priority queue for event processing with thread safety."""
def __init__(self):
self._queue = []
self._lock = threading.RLock()
self._event = threading.Event()
def push(self, priority: int, event: Dict[str, Any]) -> None:
"""Add event to queue with priority."""
with self._lock:
heapq.heappush(self._queue, (priority, time.time(), event))
self._event.set()
def pop(self) -> Dict[str, Any]:
"""Get highest priority event."""
while True:
with self._lock:
if self._queue:
return heapq.heappop(self._queue)[2]
self._event.wait()
self._event.clear()
class EventDispatcher:
"""Core event routing and handling system."""
def __init__(self, scheduler, worker_count=4, cipher_pool_size=4):
self._handlers = {}
self._queue = EventQueue()
self._running = False
self._scheduler = scheduler
self._worker_threads = []
self._worker_count = worker_count
self._metrics = {
'events_processed': 0,
'errors': 0,
'last_event_time': None
}
self._cipher_pool = CipherPool(
size=cipher_pool_size,
algorithm='AES-256'
)
def register_handler(self, event_type: str, handler: Callable) -> None:
"""Register event handler for specific event type."""
with threading.RLock():
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
def dispatch(self, event: Dict[str, Any]) -> None:
"""Dispatch event to appropriate handlers."""
event_type = event.get('type')
if not event_type:
return
handlers = self._handlers.get(event_type, [])
for handler in handlers:
try:
handler(event)
except Exception as e:
print(f"Error in event handler: {str(e)}")
def start(self) -> None:
"""Start event processing loop."""
if self._running:
return
self._running = True
for i in range(self._worker_count):
thread = threading.Thread(
target=self._process_events,
daemon=True,
name=f"EventWorker-{i}"
)
thread.start()
self._worker_threads.append(thread)
def _process_events(self) -> None:
"""Main event processing loop."""
while self._running:
event = self._queue.pop()
with threading.RLock():
self._metrics['events_processed'] += 1
self._metrics['last_event_time'] = time.time()
try:
with self._cipher_pool.get_cipher() as cipher:
encrypted_event = {
'type': event.get('type'),
'timestamp': time.time(),
'data': cipher.encrypt(event)
}
self.dispatch(encrypted_event)
except Exception as e:
with threading.RLock():
self._metrics['errors'] += 1
self._metrics['last_error'] = str(e)
def stop(self) -> None:
"""Stop event processing."""
self._running = False
for thread in self._worker_threads:
thread.join()
def schedule_event(self, event: Dict[str, Any], delay: float) -> None:
"""Schedule delayed event execution."""
def delayed_dispatch():
time.sleep(delay)
self._queue.push(0, event)
self._scheduler.register_task(
f"delayed_{time.time()}",
f"* * * * *", # Will run immediately
delayed_dispatch
)
class EventSystem:
"""Main event system interface."""
def __init__(self, scheduler):
self.dispatcher = EventDispatcher(scheduler)
self.encryption_enabled = True
self._performance_stats = {
'min_latency': float('inf'),
'max_latency': 0,
'avg_latency': 0,
'total_events': 0
}
def publish(self, event: Dict[str, Any], priority: int = 0) -> None:
"""Publish event to system."""
if self.encryption_enabled:
event = {
'encrypted': True,
'data': encrypt_data(event)
}
start_time = time.time()
self.dispatcher._queue.push(priority, event)
latency = time.time() - start_time
self._update_stats(latency)
def subscribe(self, event_type: str, handler: Callable) -> None:
"""Subscribe to events of specific type."""
if self.encryption_enabled:
def wrapped_handler(event):
if event.get('encrypted'):
try:
event = decrypt_data(event['data'])
except Exception as e:
print(f"Decryption error: {str(e)}")
return
handler(event)
self.dispatcher.register_handler(event_type, wrapped_handler)
else:
self.dispatcher.register_handler(event_type, handler)
def _update_stats(self, latency):
"""Update performance statistics."""
with threading.RLock():
stats = self._performance_stats
stats['total_events'] += 1
stats['min_latency'] = min(stats['min_latency'], latency)
stats['max_latency'] = max(stats['max_latency'], latency)
stats['avg_latency'] = (
(stats['avg_latency'] * (stats['total_events'] - 1) + latency)
/ stats['total_events']
)
def get_performance_stats(self):
"""Get current performance statistics."""
with threading.RLock():
return self._performance_stats.copy()