152 lines
No EOL
4.4 KiB
Python
152 lines
No EOL
4.4 KiB
Python
from flask import Flask, request, jsonify
|
|
from security.rbac_engine import RBACEngine
|
|
from security.audit import SecureAudit
|
|
from functools import wraps
|
|
import time
|
|
import ssl
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from flask_talisman import Talisman
|
|
from flask_limiter import Limiter
|
|
from flask_limiter.util import get_remote_address
|
|
|
|
from flask_caching import Cache
|
|
|
|
# Application object; every extension configured in this module attaches to it.
app = Flask(__name__)

# Trust exactly one value of each X-Forwarded-For/-Proto/-Host/-Prefix header
# when rebuilding the request environment behind a proxy.
app.wsgi_app = ProxyFix(
    app.wsgi_app,
    x_for=1,
    x_proto=1,
    x_host=1,
    x_prefix=1,
)

# Response cache: SimpleCache backend with a 60-second default timeout.
cache = Cache(
    config={
        'CACHE_TYPE': 'SimpleCache',
        'CACHE_DEFAULT_TIMEOUT': 60,
    },
)
cache.init_app(app)
|
|
|
|
# Security configuration: Content-Security-Policy directives handed to Talisman.
# NOTE(review): script-src and style-src allow 'unsafe-inline' -- confirm this
# is really needed, since it permits inline scripts/styles under the policy.
csp = {
    "default-src": "'self'",
    "script-src": "'self' 'unsafe-inline'",
    "style-src": "'self' 'unsafe-inline'",
    "img-src": "'self' data:",
    "font-src": "'self'",
}
|
|
|
|
# HTTP security headers: HTTPS redirect, HSTS, secure session cookie, the CSP
# defined in ``csp``, and a strict-origin-when-cross-origin referrer policy.
_talisman_options = {
    'force_https': True,
    'strict_transport_security': True,
    'session_cookie_secure': True,
    'content_security_policy': csp,
    'referrer_policy': 'strict-origin-when-cross-origin',
}
talisman = Talisman(app, **_talisman_options)
|
|
|
|
# Limits applied to every route that does not declare its own.
_DEFAULT_RATE_LIMITS = ["200 per day", "50 per hour"]

# Rate limiter keyed by the remote address of the request.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=_DEFAULT_RATE_LIMITS,
)
|
|
|
|
# Permission engine shared by the RBAC decorator and the validation endpoint.
rbac = RBACEngine()

# Audit logger bound to the same RBAC engine.
# NOTE(review): both paths are relative -- presumably resolved against the
# process working directory; confirm against SecureAudit.
audit = SecureAudit(
    rbac,
    db_path="audit.db",
    key_path="audit.key",
)
|
|
|
|
from functools import lru_cache
|
|
|
|
# How long a cached authorization decision may be reused, in seconds.
_PERMISSION_CACHE_TTL_SECONDS = 60


@lru_cache(maxsize=4096)
def _validate_permission_in_window(user, permission, window):
    """Memoised RBAC check; ``window`` only serves to partition the cache by time."""
    return rbac.validate_permission(user, permission)


def cached_validate_permission(user, permission):
    """Return the RBAC decision for ``user``/``permission``, cached briefly.

    A plain ``lru_cache`` would keep a decision until it is evicted or the
    process restarts, so a revoked permission would keep being honoured.
    Adding the current time window to the cache key makes every entry stop
    being reused once the window rolls over.

    Args:
        user: Identity passed through to ``rbac.validate_permission``.
        permission: Permission name passed through to
            ``rbac.validate_permission``.

    Returns:
        Whatever ``rbac.validate_permission(user, permission)`` returned for
        the current time window.
    """
    # Monotonic clock: wall-clock adjustments must not extend or shorten the window.
    window = int(time.monotonic() // _PERMISSION_CACHE_TTL_SECONDS)
    return _validate_permission_in_window(user, permission, window)


# Keep the lru_cache management helpers reachable through the public name.
cached_validate_permission.cache_clear = _validate_permission_in_window.cache_clear
cached_validate_permission.cache_info = _validate_permission_in_window.cache_info
|
|
|
|
def rbac_required(permission):
    """Build a decorator that gates a view behind an RBAC permission.

    Args:
        permission: Permission name passed to ``cached_validate_permission``
            together with the caller's identity.

    Returns:
        A decorator. The wrapped view returns a 401 JSON error when the
        ``X-Client-Cert-User`` header is missing or empty, a 403 JSON error
        when the permission check is falsy, and otherwise the view's own
        response.
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            # perf_counter is monotonic, so the measured duration cannot be
            # distorted by wall-clock adjustments the way time.time() can.
            started = time.perf_counter()

            # NOTE(review): the identity comes from a plain request header;
            # presumably a TLS-terminating proxy sets it and strips any
            # client-supplied copy -- confirm, otherwise it is spoofable.
            user = request.headers.get('X-Client-Cert-User')
            if not user:
                return jsonify({'error': 'Missing user certificate'}), 401

            if not cached_validate_permission(user, permission):
                return jsonify({'error': 'Permission denied'}), 403

            response = f(*args, **kwargs)

            # Responses slower than 500 ms are only reported, never rejected.
            elapsed = time.perf_counter() - started
            if elapsed > 0.5:
                app.logger.warning("Slow response: %.3fs", elapsed)

            return response
        return wrapped
    return decorator
|
|
|
|
@app.route('/tasks', methods=['POST'])
@rbac_required('task_add')
@limiter.limit("10 per minute")
def add_task():
    """Handle POST /tasks: audit the attempt and return a placeholder result.

    Writes a 'started' audit entry, then a 'completed' entry before
    returning the JSON result. Any exception raised in between is recorded
    as a 'failed' entry and re-raised.
    """
    caller = request.headers.get('X-Client-Cert-User')

    def record(key, success, reason):
        # Every audit entry for this view shares the operation and the user.
        audit.log_operation(
            operation='task_add',
            key=key,
            success=success,
            user=caller,
            reason=reason,
        )

    record(str(request.json), True, 'started')
    try:
        # Implementation will go here
        result = {'status': 'Task added'}
        record(str(request.json), True, 'completed')
        return jsonify(result)
    except Exception as e:
        record(str({'error': str(e)}), False, 'failed')
        raise
|
|
|
|
@app.route('/tasks/next', methods=['GET'])
@rbac_required('task_read')
@cache.cached(timeout=60)
def get_next_task():
    """Handle GET /tasks/next; currently answers with a fixed placeholder payload."""
    # Implementation will go here
    payload = {'task': 'Next task data'}
    return jsonify(payload)
|
|
|
|
@app.route('/tasks/<task_id>/process', methods=['POST'])
@rbac_required('task_process')
def process_task(task_id):
    """Handle POST /tasks/<task_id>/process; echoes the task id in a status message."""
    # Implementation will go here
    status_message = f'Processed task {task_id}'
    return jsonify({'status': status_message})
|
|
|
|
@app.route('/permissions/validate', methods=['GET'])
def validate_permissions():
    """Handle GET /permissions/validate?user=...&permission=...

    Returns ``{'permission_granted': <result>}`` from
    ``rbac.validate_permission``, or a 400 JSON error when either query
    parameter is missing or empty.
    """
    # NOTE(review): no RBAC/auth decorator is applied to this route, so any
    # caller can query any user's permissions -- confirm this is intended.
    query = request.args
    user, permission = query.get('user'), query.get('permission')

    if user and permission:
        granted = rbac.validate_permission(user, permission)
        return jsonify({'permission_granted': granted})

    return jsonify({'error': 'Missing parameters'}), 400
|
|
|
|
if __name__ == '__main__':
    # The ssl module has no PROTOCOL_TLSv1_3 constant (referencing it raises
    # AttributeError). Use the generic server protocol and pin the floor to
    # TLS 1.3 instead.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.minimum_version = ssl.TLSVersion.TLSv1_3

    # set_ciphers() governs TLS 1.2-and-lower suites only; TLS 1.3 suites
    # cannot be restricted through it. Kept so the configured string stays
    # in place should the minimum version ever be lowered.
    context.set_ciphers('AES256-GCM:CHACHA20')
    context.load_cert_chain('cert.pem', 'key.pem')

    # Require a client certificate on every connection.
    # NOTE(review): no load_verify_locations()/load_default_certs() call is
    # visible here, so the context has no trust anchors to verify client
    # certificates against -- confirm where the client CA is configured.
    context.verify_mode = ssl.CERT_REQUIRED

    app.run(
        host='0.0.0.0',
        port=5000,
        ssl_context=context,
        threaded=True,
    )