# ai-agent/cli_commands.py
#
# 129 lines
# No EOL
# 4.1 KiB
# Python

import click
import time
from functools import wraps
from security.rbac_engine import RBACEngine
from security.audit import SecureAudit
from typing import Optional
# Module-level RBAC engine instance; the CLICommand methods in this file call
# rbac.validate_permission(user, permission) on it for every permission check.
rbac = RBACEngine()
def timed_command(func):
    """Decorate ``func`` so its execution time is reported to an audit logger.

    After ``func`` returns, the wrapper calls
    ``log_performance(command=func.__name__, execution_time=<seconds>)`` on
    an audit logger resolved in this order:

    1. the ``audit_logger`` keyword argument, if the caller passed one;
    2. the ``audit_logger`` attribute of the first positional argument
       (i.e. ``self.audit_logger`` when ``func`` is a method).

    If neither is available the timing report is skipped rather than raising,
    so a missing logger cannot turn a successful command into a failure.
    Nothing is reported when ``func`` itself raises; the exception propagates
    unchanged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same signature and metadata as ``func`` that
        returns whatever ``func`` returns.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time

        # Methods carry the logger on the instance, not in kwargs; indexing
        # kwargs['audit_logger'] directly raised KeyError for them.
        audit_logger = kwargs.get('audit_logger')
        if audit_logger is None and args:
            audit_logger = getattr(args[0], 'audit_logger', None)

        if audit_logger is not None:
            audit_logger.log_performance(
                command=func.__name__,
                execution_time=execution_time
            )
        return result
    return wrapper
class CLICommand:
    """Click commands for task handling, each audited and gated by RBAC.

    Click invokes a command callback with the parsed options only; it never
    supplies ``self``. Every command here is therefore decorated with
    ``click.pass_obj`` so that its first argument is taken from the click
    context's ``obj``. Run the commands with ``obj`` set to a ``CLICommand``
    instance, e.g. ``CLICommand.add_task.main(obj=CLICommand(audit_logger))``
    or through a parent group that assigns ``ctx.obj``.
    """

    def __init__(self, audit_logger: SecureAudit):
        # Audit sink every command reports its attempts and outcomes to.
        self.audit_logger = audit_logger

    def _authorize(self, command: str, user: str, permission: str) -> bool:
        """Return True if ``user`` holds ``permission``; else record and report the denial."""
        if rbac.validate_permission(user, permission):
            return True
        self.audit_logger.log_denial(
            command=command,
            user=user,
            reason='RBAC validation failed'
        )
        click.echo("Permission denied")
        return False

    # NOTE: the command docstrings below are shown by click as --help text,
    # so they are kept short; implementation notes live in comments instead.

    @click.command()
    @click.option('--task-id', required=True, help='Task ID to add')
    @click.option('--user', required=True, help='User adding task')
    @click.pass_obj
    @timed_command
    def add_task(self, task_id: str, user: str) -> None:
        """Add a new task with RBAC validation"""
        # The attempt is logged before the permission check so denied
        # requests are also recorded.
        self.audit_logger.log_attempt(
            command='add_task',
            user=user,
            params={'task_id': task_id}
        )
        if not self._authorize('add_task', user, 'task_add'):
            return
        # Implementation would go here
        click.echo(f"Added task {task_id}")
        self.audit_logger.log_success(
            command='add_task',
            user=user,
            result={'task_id': task_id}
        )

    @click.command()
    @click.option('--user', required=True, help='User requesting task')
    @click.pass_obj
    @timed_command
    def get_next_task(self, user: str) -> None:
        """Get next available task with RBAC validation"""
        self.audit_logger.log_attempt(
            command='get_next_task',
            user=user
        )
        if not self._authorize('get_next_task', user, 'task_read'):
            return
        # Implementation would go here
        click.echo("Retrieved next task")
        self.audit_logger.log_success(
            command='get_next_task',
            user=user
        )

    @click.command()
    @click.option('--task-id', required=True, help='Task ID to process')
    @click.option('--user', required=True, help='User processing task')
    @click.pass_obj
    @timed_command
    def process_task(self, task_id: str, user: str) -> None:
        """Process a task with RBAC validation"""
        self.audit_logger.log_attempt(
            command='process_task',
            user=user,
            params={'task_id': task_id}
        )
        if not self._authorize('process_task', user, 'task_process'):
            return
        # Implementation would go here
        click.echo(f"Processed task {task_id}")
        self.audit_logger.log_success(
            command='process_task',
            user=user,
            result={'task_id': task_id}
        )

    @click.command()
    @click.option('--user', required=True, help='User to validate')
    @click.option('--permission', required=True, help='Permission to validate')
    @click.pass_obj
    @timed_command
    def validate_permissions(self, user: str, permission: str) -> None:
        """Validate user permissions"""
        self.audit_logger.log_attempt(
            command='validate_permissions',
            user=user,
            params={'permission': permission}
        )
        # Unlike the task commands, the check result is the output here, so
        # it is logged via log_validation and echoed either way.
        result = rbac.validate_permission(user, permission)
        self.audit_logger.log_validation(
            user=user,
            permission=permission,
            result=result
        )
        click.echo(f"Permission {'granted' if result else 'denied'}")