129 lines
No EOL
4.1 KiB
Python
129 lines
No EOL
4.1 KiB
Python
import click
|
|
import time
|
|
from functools import wraps
|
|
from security.rbac_engine import RBACEngine
|
|
from security.audit import SecureAudit
|
|
from typing import Optional
|
|
|
|
rbac = RBACEngine()
|
|
|
|
def timed_command(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
start_time = time.time()
|
|
result = func(*args, **kwargs)
|
|
execution_time = time.time() - start_time
|
|
kwargs['audit_logger'].log_performance(
|
|
command=func.__name__,
|
|
execution_time=execution_time
|
|
)
|
|
return result
|
|
return wrapper
|
|
|
|
class CLICommand:
|
|
def __init__(self, audit_logger: SecureAudit):
|
|
self.audit_logger = audit_logger
|
|
|
|
@click.command()
|
|
@click.option('--task-id', required=True, help='Task ID to add')
|
|
@click.option('--user', required=True, help='User adding task')
|
|
@timed_command
|
|
def add_task(self, task_id: str, user: str):
|
|
"""Add a new task with RBAC validation"""
|
|
self.audit_logger.log_attempt(
|
|
command='add_task',
|
|
user=user,
|
|
params={'task_id': task_id}
|
|
)
|
|
|
|
if not rbac.validate_permission(user, 'task_add'):
|
|
self.audit_logger.log_denial(
|
|
command='add_task',
|
|
user=user,
|
|
reason='RBAC validation failed'
|
|
)
|
|
click.echo("Permission denied")
|
|
return
|
|
|
|
# Implementation would go here
|
|
click.echo(f"Added task {task_id}")
|
|
self.audit_logger.log_success(
|
|
command='add_task',
|
|
user=user,
|
|
result={'task_id': task_id}
|
|
)
|
|
|
|
@click.command()
|
|
@click.option('--user', required=True, help='User requesting task')
|
|
@timed_command
|
|
def get_next_task(self, user: str):
|
|
"""Get next available task with RBAC validation"""
|
|
self.audit_logger.log_attempt(
|
|
command='get_next_task',
|
|
user=user
|
|
)
|
|
|
|
if not rbac.validate_permission(user, 'task_read'):
|
|
self.audit_logger.log_denial(
|
|
command='get_next_task',
|
|
user=user,
|
|
reason='RBAC validation failed'
|
|
)
|
|
click.echo("Permission denied")
|
|
return
|
|
|
|
# Implementation would go here
|
|
click.echo("Retrieved next task")
|
|
self.audit_logger.log_success(
|
|
command='get_next_task',
|
|
user=user
|
|
)
|
|
|
|
@click.command()
|
|
@click.option('--task-id', required=True, help='Task ID to process')
|
|
@click.option('--user', required=True, help='User processing task')
|
|
@timed_command
|
|
def process_task(self, task_id: str, user: str):
|
|
"""Process a task with RBAC validation"""
|
|
self.audit_logger.log_attempt(
|
|
command='process_task',
|
|
user=user,
|
|
params={'task_id': task_id}
|
|
)
|
|
|
|
if not rbac.validate_permission(user, 'task_process'):
|
|
self.audit_logger.log_denial(
|
|
command='process_task',
|
|
user=user,
|
|
reason='RBAC validation failed'
|
|
)
|
|
click.echo("Permission denied")
|
|
return
|
|
|
|
# Implementation would go here
|
|
click.echo(f"Processed task {task_id}")
|
|
self.audit_logger.log_success(
|
|
command='process_task',
|
|
user=user,
|
|
result={'task_id': task_id}
|
|
)
|
|
|
|
@click.command()
|
|
@click.option('--user', required=True, help='User to validate')
|
|
@click.option('--permission', required=True, help='Permission to validate')
|
|
@timed_command
|
|
def validate_permissions(self, user: str, permission: str):
|
|
"""Validate user permissions"""
|
|
self.audit_logger.log_attempt(
|
|
command='validate_permissions',
|
|
user=user,
|
|
params={'permission': permission}
|
|
)
|
|
|
|
result = rbac.validate_permission(user, permission)
|
|
self.audit_logger.log_validation(
|
|
user=user,
|
|
permission=permission,
|
|
result=result
|
|
)
|
|
click.echo(f"Permission {'granted' if result else 'denied'}") |