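import click
import time
from functools import wraps
from security.rbac_engine import RBACEngine
from security.audit import SecureAudit
from typing import Optional

# Single RBAC engine shared by every command in this module; it is built at
# import time with no arguments.
rbac = RBACEngine()


def timed_command(func):
    """Wrap *func* so its run time is reported to an audit logger.

    After *func* returns, the elapsed time is passed to
    ``log_performance(command=func.__name__, execution_time=...)``.

    The logger is taken from the ``audit_logger`` keyword argument when the
    caller supplies one, otherwise from the ``audit_logger`` attribute of the
    first positional argument (the ``self`` of a ``CLICommand`` method). The
    commands in this file never receive an ``audit_logger`` keyword, so a
    keyword-only lookup raised ``KeyError`` after the command had already run.

    Nothing is recorded when *func* raises; the exception propagates unchanged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so a wall-clock adjustment during the
        # command cannot produce a negative or inflated duration.
        started = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - started

        audit_logger = kwargs.get('audit_logger')
        if audit_logger is None and args:
            audit_logger = getattr(args[0], 'audit_logger', None)

        # Timing is a metric, not the security audit trail: with no logger
        # reachable, the command's result is still returned to the caller.
        if audit_logger is not None:
            audit_logger.log_performance(
                command=func.__name__,
                execution_time=execution_time
            )
        return result
    return wrapper


class CLICommand:
    """Task-queue CLI commands gated by RBAC and recorded in an audit log.

    NOTE(review): ``user`` comes from the ``--user`` option on every command.
    Nothing in this module ties that value to the invoking account or to a
    credential, so unless RBACEngine or a caller authenticates it, the
    permission check authorises a claimed identity and the audit trail records
    that claim. Confirm where authentication happens.

    NOTE(review): ``click.command()`` is applied to instance methods. click
    calls the decorated function with the parsed options only, so ``self`` is
    not supplied when a command is invoked from the command line; confirm how
    these commands are bound and registered.
    """

    def __init__(self, audit_logger: SecureAudit):
        """Store the audit logger used by every command."""
        self.audit_logger = audit_logger

    def _authorize(self, command: str, user: str, permission: str, params=None) -> bool:
        """Log the attempt, check *permission* for *user*, and log a denial.

        Returns True when the RBAC engine grants the permission. On refusal a
        denial record is written, "Permission denied" is echoed and False is
        returned. An exception from the RBAC engine propagates before the
        caller performs any task action.
        """
        attempt = {'command': command, 'user': user}
        # Commands without parameters call log_attempt without `params` at all.
        if params is not None:
            attempt['params'] = params
        # The attempt is recorded before the decision, so refused requests
        # appear in the trail as well.
        self.audit_logger.log_attempt(**attempt)

        if rbac.validate_permission(user, permission):
            return True

        self.audit_logger.log_denial(
            command=command,
            user=user,
            reason='RBAC validation failed'
        )
        click.echo("Permission denied")
        return False

    # The docstrings of the command methods are left as written: click uses
    # them as the command help text.
    #
    # NOTE(review): a refused command returns normally, so the process exit
    # status does not tell a calling script that the request was denied.
    # NOTE(review): task_id and user are passed to the audit logger unchanged;
    # whether SecureAudit neutralises control characters is not visible here.

    @click.command()
    @click.option('--task-id', required=True, help='Task ID to add')
    @click.option('--user', required=True, help='User adding task')
    @timed_command
    def add_task(self, task_id: str, user: str):
        """Add a new task with RBAC validation"""
        if not self._authorize('add_task', user, 'task_add',
                               params={'task_id': task_id}):
            return

        # Implementation would go here
        click.echo(f"Added task {task_id}")

        self.audit_logger.log_success(
            command='add_task',
            user=user,
            result={'task_id': task_id}
        )

    @click.command()
    @click.option('--user', required=True, help='User requesting task')
    @timed_command
    def get_next_task(self, user: str):
        """Get next available task with RBAC validation"""
        if not self._authorize('get_next_task', user, 'task_read'):
            return

        # Implementation would go here
        click.echo("Retrieved next task")

        self.audit_logger.log_success(
            command='get_next_task',
            user=user
        )

    @click.command()
    @click.option('--task-id', required=True, help='Task ID to process')
    @click.option('--user', required=True, help='User processing task')
    @timed_command
    def process_task(self, task_id: str, user: str):
        """Process a task with RBAC validation"""
        if not self._authorize('process_task', user, 'task_process',
                               params={'task_id': task_id}):
            return

        # Implementation would go here
        click.echo(f"Processed task {task_id}")

        self.audit_logger.log_success(
            command='process_task',
            user=user,
            result={'task_id': task_id}
        )

    @click.command()
    @click.option('--user', required=True, help='User to validate')
    @click.option('--permission', required=True, help='Permission to validate')
    @timed_command
    def validate_permissions(self, user: str, permission: str):
        """Validate user permissions"""
        # NOTE(review): this answers for any user/permission pair supplied on
        # the command line and applies no check to the caller. Every query is
        # recorded through log_attempt and log_validation, which is what makes
        # repeated probing visible in the audit trail.
        self.audit_logger.log_attempt(
            command='validate_permissions',
            user=user,
            params={'permission': permission}
        )

        result = rbac.validate_permission(user, permission)
        self.audit_logger.log_validation(
            user=user,
            permission=permission,
            result=result
        )
        click.echo(f"Permission {'granted' if result else 'denied'}")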