# ai-agent/web_interface.py
# (file-viewer metadata: 152 lines, no trailing EOL, 4.4 KiB, Python)

from flask import Flask, request, jsonify
from security.rbac_engine import RBACEngine
from security.audit import SecureAudit
from functools import wraps
import time
import ssl
from werkzeug.middleware.proxy_fix import ProxyFix
from flask_talisman import Talisman
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_caching import Cache
# --- Application object ------------------------------------------------------
app = Flask(__name__)
# Wrap the WSGI app in ProxyFix, with a count of 1 for each forwarded-header
# kind (for / proto / host / prefix).
app.wsgi_app = ProxyFix(
    app.wsgi_app,
    x_for=1,
    x_proto=1,
    x_host=1,
    x_prefix=1,
)

# --- Response caching --------------------------------------------------------
_CACHE_SETTINGS = {
    'CACHE_TYPE': 'SimpleCache',
    'CACHE_DEFAULT_TIMEOUT': 60,
}
cache = Cache(config=_CACHE_SETTINGS)
cache.init_app(app)

# --- Security headers --------------------------------------------------------
# Content-Security-Policy directives handed to Talisman below.
csp = {
    'default-src': "'self'",
    'script-src': "'self' 'unsafe-inline'",
    'style-src': "'self' 'unsafe-inline'",
    'img-src': "'self' data:",
    'font-src': "'self'",
}
talisman = Talisman(
    app,
    force_https=True,
    strict_transport_security=True,
    session_cookie_secure=True,
    content_security_policy=csp,
    referrer_policy='strict-origin-when-cross-origin',
)

# --- Rate limiting -----------------------------------------------------------
# Limits applied to every route that does not declare its own; requests are
# keyed by the remote address.
_DEFAULT_RATE_LIMITS = ["200 per day", "50 per hour"]
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=_DEFAULT_RATE_LIMITS,
)

# --- Authorization and audit trail -------------------------------------------
rbac = RBACEngine()
audit = SecureAudit(rbac, db_path="audit.db", key_path="audit.key")
from functools import lru_cache

# Upper bound, in seconds, on how long a cached RBAC decision may be reused.
# Without a bound, a permission revoked in the RBAC engine would keep being
# honoured by this process until restart (or until LRU eviction).
_PERMISSION_CACHE_TTL_SECONDS = 60


@lru_cache(maxsize=4096)
def _validate_permission_in_window(user, permission, window):
    """Validate *permission* for *user*; *window* only partitions the cache.

    ``window`` is not used in the check itself. It is part of the cache key so
    that entries created in an earlier time window are never hit again.
    """
    return rbac.validate_permission(user, permission)


def cached_validate_permission(user, permission):
    """Cached version of RBAC validation with bounded staleness.

    Args:
        user: Identifier of the caller, passed through to
            ``rbac.validate_permission``.
        permission: Name of the permission to check.

    Returns:
        Whatever ``rbac.validate_permission(user, permission)`` returned for
        this pair within the current time window.
    """
    # A monotonic clock keeps window boundaries immune to wall-clock jumps.
    window = int(time.monotonic() // _PERMISSION_CACHE_TTL_SECONDS)
    return _validate_permission_in_window(user, permission, window)


# Keep the lru_cache management hooks available on the public name, as they
# were when the decorator was applied to it directly.
cached_validate_permission.cache_clear = _validate_permission_in_window.cache_clear
cached_validate_permission.cache_info = _validate_permission_in_window.cache_info
def rbac_required(permission):
    """Build a decorator that gates a view behind an RBAC permission.

    The wrapped view reads the caller identity from the
    ``X-Client-Cert-User`` request header, answers 401 when it is absent and
    403 when ``cached_validate_permission`` rejects it; otherwise the view
    runs and its response is returned unchanged.

    Args:
        permission: Permission name the caller must hold.

    Returns:
        A decorator that wraps a view function and keeps its metadata
        (via ``functools.wraps``).
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by wall-clock adjustments the way time.time() can.
            started = time.perf_counter()
            # NOTE(review): the identity is taken from a plain request header;
            # presumably a TLS-terminating proxy sets it and strips any
            # client-supplied value -- confirm against the deployment.
            user = request.headers.get('X-Client-Cert-User')
            if not user:
                return jsonify({'error': 'Missing user certificate'}), 401
            if not cached_validate_permission(user, permission):
                return jsonify({'error': 'Permission denied'}), 403
            response = f(*args, **kwargs)
            elapsed = time.perf_counter() - started
            # Flag anything slower than the 500 ms budget; the response is
            # still returned as-is.
            if elapsed > 0.5:
                app.logger.warning("Slow response: %.3fs", elapsed)
            return response
        return wrapped
    return decorator
@app.route('/tasks', methods=['POST'])
@rbac_required('task_add')
@limiter.limit("10 per minute")
def add_task():
    """Add a new task, writing an audit record at each stage.

    Audit entries are written with reason 'started' before the work,
    'completed' after it, and 'failed' (then the exception is re-raised)
    if anything in between raises.
    """
    caller = request.headers.get('X-Client-Cert-User')
    payload_repr = str(request.json)

    def record(key, success, reason):
        # Every audit entry of this view shares the operation name and user.
        audit.log_operation(
            operation='task_add',
            key=key,
            success=success,
            user=caller,
            reason=reason,
        )

    record(payload_repr, True, 'started')
    try:
        # Implementation will go here
        outcome = {'status': 'Task added'}
        record(payload_repr, True, 'completed')
        return jsonify(outcome)
    except Exception as exc:
        record(str({'error': str(exc)}), False, 'failed')
        raise
@app.route('/tasks/next', methods=['GET'])
@rbac_required('task_read')
@cache.cached(timeout=60)
def get_next_task():
    """Return the next available task.

    The RBAC decorator is applied outside the cache decorator, so the
    permission check wraps the cached view; the cache timeout is 60 seconds.
    """
    # Implementation will go here
    placeholder = {'task': 'Next task data'}
    return jsonify(placeholder)
@app.route('/tasks/<task_id>/process', methods=['POST'])
@rbac_required('task_process')
def process_task(task_id):
    """Process the task identified by the ``task_id`` URL segment."""
    # Implementation will go here
    status_message = f'Processed task {task_id}'
    return jsonify({'status': status_message})
@app.route('/permissions/validate', methods=['GET'])
def validate_permissions():
    """Report whether a user holds a permission.

    Reads ``user`` and ``permission`` from the query string and answers 400
    when either is missing or empty; otherwise returns the RBAC engine's
    verdict under the ``permission_granted`` key.
    """
    # NOTE(review): unlike the task routes, this one carries no
    # rbac_required decorator, so any caller can query any user's
    # permissions -- confirm that this is intended.
    query = request.args
    user, permission = query.get('user'), query.get('permission')
    if not (user and permission):
        return jsonify({'error': 'Missing parameters'}), 400
    granted = rbac.validate_permission(user, permission)
    return jsonify({'permission_granted': granted})
if __name__ == '__main__':
    # The ssl module has no PROTOCOL_TLSv1_3 constant, so referencing it
    # raises AttributeError at startup. Build a server-side context and pin
    # the minimum protocol version to TLS 1.3 instead.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.minimum_version = ssl.TLSVersion.TLSv1_3
    # NOTE(review): per the Python ssl documentation, set_ciphers() does not
    # control TLS 1.3 cipher suites, so with the minimum above this list is
    # probably inert -- confirm whether it is still wanted.
    context.set_ciphers('AES256-GCM:CHACHA20')
    context.load_cert_chain('cert.pem', 'key.pem')
    # Require a client certificate on every connection.
    # NOTE(review): no CA bundle is loaded in this block
    # (load_verify_locations / load_default_certs), so there are no trust
    # anchors to verify client certificates against -- TODO configure one.
    context.verify_mode = ssl.CERT_REQUIRED
    app.run(
        host='0.0.0.0',
        port=5000,
        ssl_context=context,
        threaded=True,
    )