"""Flask API exposing task endpoints guarded by RBAC checks and audit logging.

Module import creates the Flask app and wires up proxy handling, response
caching, security headers (Talisman), rate limiting, the RBAC engine and the
audit log. Running the module as a script serves the app over TLS.
"""
from functools import lru_cache, wraps
import ssl
import time

from flask import Flask, request, jsonify
from flask_caching import Cache
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_talisman import Talisman
from werkzeug.middleware.proxy_fix import ProxyFix

from security.audit import SecureAudit
from security.rbac_engine import RBACEngine

# Header the caller's identity is read from.
# NOTE(review): presumably injected by a TLS-terminating proxy from the client
# certificate; confirm the proxy strips any client-supplied value, otherwise
# the identity can be spoofed.
USER_HEADER = 'X-Client-Cert-User'

# Handlers slower than this (in seconds) are reported with a warning log.
SLOW_RESPONSE_THRESHOLD_SECONDS = 0.5

app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)

# Configure caching
cache = Cache(config={'CACHE_TYPE': 'SimpleCache', 'CACHE_DEFAULT_TIMEOUT': 60})
cache.init_app(app)

# Security Configuration
csp = {
    'default-src': "'self'",
    'script-src': "'self' 'unsafe-inline'",
    'style-src': "'self' 'unsafe-inline'",
    'img-src': "'self' data:",
    'font-src': "'self'",
}

talisman = Talisman(
    app,
    force_https=True,
    strict_transport_security=True,
    session_cookie_secure=True,
    content_security_policy=csp,
    referrer_policy='strict-origin-when-cross-origin',
)

limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
)

rbac = RBACEngine()
audit = SecureAudit(rbac, db_path="audit.db", key_path="audit.key")


@lru_cache(maxsize=4096)
def cached_validate_permission(user, permission):
    """Return ``rbac.validate_permission(user, permission)``, memoized.

    Up to 4096 distinct ``(user, permission)`` results are kept in an LRU
    cache for the lifetime of the process.

    NOTE(review): nothing in this module invalidates the cache, so a
    permission change in the RBAC engine is not seen for an already-cached
    pair until it is evicted (or ``cached_validate_permission.cache_clear()``
    is called elsewhere).
    """
    return rbac.validate_permission(user, permission)


def rbac_required(permission):
    """Build a view decorator that enforces *permission* before the view runs.

    The wrapped view:
      * returns a 401 JSON error when the ``X-Client-Cert-User`` header is
        missing or empty,
      * returns a 403 JSON error when the cached RBAC check is falsy,
      * otherwise calls the view and returns its response unchanged.

    The elapsed time (permission check plus view) is measured, and a warning
    is logged when it exceeds ``SLOW_RESPONSE_THRESHOLD_SECONDS``.
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by system clock adjustments.
            started = time.perf_counter()

            user = request.headers.get(USER_HEADER)
            if not user:
                return jsonify({'error': 'Missing user certificate'}), 401

            if not cached_validate_permission(user, permission):
                return jsonify({'error': 'Permission denied'}), 403

            response = f(*args, **kwargs)

            response_time = time.perf_counter() - started
            # Ensure response time < 500ms
            if response_time > SLOW_RESPONSE_THRESHOLD_SECONDS:
                app.logger.warning(f"Slow response: {response_time:.3f}s")

            return response
        return wrapped
    return decorator


@app.route('/tasks', methods=['POST'])
@rbac_required('task_add')
@limiter.limit("10 per minute")
def add_task():
    """Add a new task, writing 'started' and 'completed'/'failed' audit records.

    Any exception raised while handling the task is audited with
    ``success=False`` and then re-raised to Flask's error handling.
    """
    user = request.headers.get(USER_HEADER)
    # Stringified JSON body, used as the audit key for this request.
    payload_key = str(request.json)

    audit.log_operation(
        operation='task_add',
        key=payload_key,
        success=True,
        user=user,
        reason='started',
    )

    try:
        # Implementation will go here
        result = {'status': 'Task added'}
        audit.log_operation(
            operation='task_add',
            key=payload_key,
            success=True,
            user=user,
            reason='completed',
        )
        return jsonify(result)
    except Exception as e:
        # Boundary handler: record the failure, then let it propagate.
        audit.log_operation(
            operation='task_add',
            key=str({'error': str(e)}),
            success=False,
            user=user,
            reason='failed',
        )
        raise


@app.route('/tasks/next', methods=['GET'])
@rbac_required('task_read')
@cache.cached(timeout=60)
def get_next_task():
    """Get next available task (response cached for 60 seconds).

    The RBAC decorator is outermost, so the permission check runs before the
    cached response is consulted.
    """
    # Implementation will go here
    return jsonify({'task': 'Next task data'})


# The original rule was '/tasks//process', which supplies no ``task_id`` to
# the view and would raise TypeError on every matched request.
# NOTE(review): the original converter (e.g. ``int:``) is not recoverable;
# a plain string variable is used here.
@app.route('/tasks/<task_id>/process', methods=['POST'])
@rbac_required('task_process')
def process_task(task_id):
    """Process the task identified by *task_id* (taken from the URL path)."""
    # Implementation will go here
    return jsonify({'status': f'Processed task {task_id}'})


@app.route('/permissions/validate', methods=['GET'])
def validate_permissions():
    """Report whether ``user`` holds ``permission`` (both query parameters).

    Returns a 400 JSON error when either parameter is missing or empty.
    This endpoint calls the RBAC engine directly, bypassing the LRU cache.

    NOTE(review): this route has no ``rbac_required`` guard, so any caller
    can probe permissions for arbitrary users; confirm this is intended.
    """
    user = request.args.get('user')
    permission = request.args.get('permission')

    if not user or not permission:
        return jsonify({'error': 'Missing parameters'}), 400

    result = rbac.validate_permission(user, permission)
    return jsonify({'permission_granted': result})


def _build_ssl_context():
    """Create the server-side TLS context used when run as a script.

    The ``ssl`` module has no ``PROTOCOL_TLSv1_3`` constant; TLS 1.3 is
    enforced by using the generic server protocol and raising the minimum
    negotiable version instead.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.minimum_version = ssl.TLSVersion.TLSv1_3
    # Per the ssl documentation, set_ciphers() does not control TLS 1.3
    # cipher suites; the call is kept as in the original configuration.
    context.set_ciphers('AES256-GCM:CHACHA20')
    context.load_cert_chain('cert.pem', 'key.pem')
    # Require a client certificate (mutual TLS).
    # NOTE(review): no CA bundle is loaded (load_verify_locations), so client
    # certificates cannot be validated as written; supply the client CA.
    context.verify_mode = ssl.CERT_REQUIRED
    return context


if __name__ == '__main__':
    app.run(
        host='0.0.0.0',
        port=5000,
        ssl_context=_build_ssl_context(),
        threaded=True,
    )