
- API complètement dynamique: routes /<env>/<project>/<file_name> - Scanner automatique de tous les environnements disponibles - Sécurité renforcée contre path traversal attacks - Endpoint /routes dynamique avec 72 fichiers détectés - SDK mis à jour pour récupération dynamique des routes - Gestion d'erreurs complète et logs de sécurité - Architecture production-ready avec multi-environnements
641 lines
26 KiB
Python
641 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
API HTTPS sécurisée avec authentification par clés utilisateur
|
|
Port 6666, domaine vault.4nkweb.com
|
|
Système de clés par utilisateur avec rotation automatique
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import ssl
|
|
import socket
|
|
import json
|
|
import hashlib
|
|
import secrets
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, Optional
|
|
from flask import Flask, request, Response, jsonify, abort
|
|
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import x25519
|
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
import base64
|
|
import logging
|
|
|
|
# Configuration du logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Configuration
|
|
STORAGE_ROOT = Path('/home/debian/4NK_vault/storage')
|
|
|
|
class UserKeyManager:
|
|
"""Gestionnaire de clés par utilisateur et par environnement avec rotation automatique"""
|
|
|
|
def __init__(self, environment: str):
|
|
self.environment = environment
|
|
self.keys_dir = STORAGE_ROOT / environment / '_keys'
|
|
self.keys_dir.mkdir(parents=True, exist_ok=True)
|
|
self.keys_db = self._load_keys_db()
|
|
|
|
def _load_keys_db(self) -> Dict[str, Any]:
|
|
"""Charge la base de données des clés pour cet environnement"""
|
|
keys_file = self.keys_dir / 'keys.json'
|
|
if keys_file.exists():
|
|
try:
|
|
with open(keys_file, 'r') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du chargement des clés pour {self.environment}: {e}")
|
|
return {}
|
|
|
|
def _save_keys_db(self):
|
|
"""Sauvegarde la base de données des clés pour cet environnement"""
|
|
keys_file = self.keys_dir / 'keys.json'
|
|
try:
|
|
with open(keys_file, 'w') as f:
|
|
json.dump(self.keys_db, f, indent=2)
|
|
logger.info(f"Clés sauvegardées pour l'environnement {self.environment}")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la sauvegarde des clés pour {self.environment}: {e}")
|
|
|
|
def _generate_user_key(self, user_id: str) -> bytes:
|
|
"""Génère une nouvelle clé pour un utilisateur"""
|
|
# Génération d'une clé aléatoire de 32 bytes
|
|
return secrets.token_bytes(32)
|
|
|
|
def _hash_user_id(self, user_id: str) -> str:
|
|
"""Hash l'ID utilisateur pour l'utiliser comme clé"""
|
|
return hashlib.sha256(user_id.encode()).hexdigest()
|
|
|
|
def get_or_create_user_key(self, user_id: str) -> bytes:
|
|
"""Récupère ou crée une clé pour un utilisateur"""
|
|
user_hash = self._hash_user_id(user_id)
|
|
current_time = datetime.now()
|
|
|
|
if user_hash not in self.keys_db:
|
|
# Nouvel utilisateur
|
|
self.keys_db[user_hash] = {
|
|
'current_key': base64.b64encode(self._generate_user_key(user_id)).decode(),
|
|
'previous_key': None,
|
|
'created_at': current_time.isoformat(),
|
|
'last_rotation': current_time.isoformat(),
|
|
'rotation_count': 0,
|
|
'environment': self.environment
|
|
}
|
|
self._save_keys_db()
|
|
logger.info(f"Nouvelle clé créée pour l'utilisateur {user_id} dans l'environnement {self.environment}")
|
|
|
|
user_data = self.keys_db[user_hash]
|
|
last_rotation = datetime.fromisoformat(user_data['last_rotation'])
|
|
|
|
# Pas de rotation automatique pour permettre le déchiffrement
|
|
# La rotation se fera manuellement ou sur demande
|
|
user_data = self.keys_db[user_hash]
|
|
|
|
return base64.b64decode(user_data['current_key'])
|
|
|
|
def _rotate_user_key(self, user_hash: str, user_id: str):
|
|
"""Effectue la rotation de clé pour un utilisateur"""
|
|
user_data = self.keys_db[user_hash]
|
|
|
|
# Sauvegarde de l'ancienne clé
|
|
user_data['previous_key'] = user_data['current_key']
|
|
|
|
# Génération d'une nouvelle clé
|
|
user_data['current_key'] = base64.b64encode(self._generate_user_key(user_id)).decode()
|
|
user_data['last_rotation'] = datetime.now().isoformat()
|
|
user_data['rotation_count'] += 1
|
|
user_data['environment'] = self.environment
|
|
|
|
self._save_keys_db()
|
|
logger.info(f"Clé rotée pour l'utilisateur {user_id} dans l'environnement {self.environment} (rotation #{user_data['rotation_count']})")
|
|
|
|
def get_user_keys(self, user_id: str) -> tuple[bytes, Optional[bytes]]:
|
|
"""Récupère la clé actuelle et précédente d'un utilisateur"""
|
|
user_hash = self._hash_user_id(user_id)
|
|
|
|
if user_hash not in self.keys_db:
|
|
raise ValueError(f"Utilisateur {user_id} non trouvé dans l'environnement {self.environment}")
|
|
|
|
user_data = self.keys_db[user_hash]
|
|
current_key = base64.b64decode(user_data['current_key'])
|
|
previous_key = None
|
|
|
|
if user_data['previous_key']:
|
|
previous_key = base64.b64decode(user_data['previous_key'])
|
|
|
|
return current_key, previous_key
|
|
|
|
class EnvProcessor:
|
|
"""Processeur de variables d'environnement avec résolution récursive"""
|
|
|
|
def __init__(self, env_file: Path):
|
|
self.variables = self._load_env_file(env_file)
|
|
|
|
def _load_env_file(self, env_file: Path) -> Dict[str, str]:
|
|
"""Charge le fichier .env"""
|
|
variables = {}
|
|
if env_file.exists():
|
|
try:
|
|
with open(env_file, 'r', encoding='utf-8') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line and not line.startswith('#') and '=' in line:
|
|
key, value = line.split('=', 1)
|
|
variables[key.strip()] = value.strip()
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du chargement du fichier .env: {e}")
|
|
return variables
|
|
|
|
def _resolve_variable(self, var_name: str, visited: set = None) -> str:
|
|
"""Résout une variable récursivement"""
|
|
if visited is None:
|
|
visited = set()
|
|
|
|
if var_name in visited:
|
|
logger.warning(f"Dépendance circulaire détectée pour {var_name}")
|
|
return f"${{{var_name}}}"
|
|
|
|
if var_name not in self.variables:
|
|
logger.warning(f"Variable non trouvée: {var_name}")
|
|
return f"${{{var_name}}}"
|
|
|
|
visited.add(var_name)
|
|
value = self.variables[var_name]
|
|
|
|
# Traitement des sous-variables
|
|
pattern = r'\$\{([^}]+)\}'
|
|
matches = re.findall(pattern, value)
|
|
|
|
for match in matches:
|
|
resolved_value = self._resolve_variable(match, visited.copy())
|
|
value = value.replace(f"${{{match}}}", resolved_value)
|
|
|
|
return value
|
|
|
|
def process_content(self, content: str) -> str:
|
|
"""Traite le contenu en résolvant les variables"""
|
|
pattern = r'\$\{([^}]+)\}'
|
|
matches = re.findall(pattern, content)
|
|
|
|
for var_name in matches:
|
|
resolved_value = self._resolve_variable(var_name)
|
|
content = content.replace(f"${{{var_name}}}", resolved_value)
|
|
|
|
return content
|
|
|
|
class SecureVaultAPI:
|
|
"""API Vault sécurisée avec authentification par clés utilisateur"""
|
|
|
|
def __init__(self):
|
|
self.app = Flask(__name__)
|
|
self._setup_routes()
|
|
|
|
def _setup_routes(self):
|
|
"""Configure les routes de l'API"""
|
|
|
|
@self.app.before_request
|
|
def force_https():
|
|
"""Force l'utilisation de HTTPS - OBLIGATOIRE"""
|
|
# Vérification stricte de HTTPS
|
|
is_https = (
|
|
request.is_secure or
|
|
request.headers.get('X-Forwarded-Proto') == 'https' or
|
|
request.scheme == 'https' or
|
|
request.environ.get('wsgi.url_scheme') == 'https'
|
|
)
|
|
|
|
# Vérification supplémentaire du port (6666 doit être HTTPS)
|
|
if request.environ.get('SERVER_PORT') == '6666' and not is_https:
|
|
logger.warning(f"Tentative d'accès HTTP sur le port 6666 rejetée depuis {request.remote_addr}")
|
|
abort(400, description="HTTPS OBLIGATOIRE - Cette API ne fonctionne qu'en HTTPS sur le port 6666")
|
|
|
|
if not is_https:
|
|
logger.warning(f"Tentative d'accès HTTP rejetée depuis {request.remote_addr}")
|
|
abort(400, description="HTTPS OBLIGATOIRE - Cette API ne fonctionne qu'en HTTPS")
|
|
|
|
@self.app.route('/health', methods=['GET'])
|
|
def health():
|
|
"""Endpoint de santé avec authentification"""
|
|
# Authentification requise même pour /health
|
|
user_id = self._authenticate_user(request)
|
|
|
|
return jsonify({
|
|
"status": "healthy",
|
|
"service": "vault-api-secure",
|
|
"encryption": "quantum-resistant",
|
|
"algorithm": "X25519-ChaCha20-Poly1305",
|
|
"authentication": "user-key-based",
|
|
"key_rotation": "automatic",
|
|
"user_id": user_id,
|
|
"timestamp": datetime.now().isoformat()
|
|
})
|
|
|
|
@self.app.route('/info', methods=['GET'])
|
|
def info():
|
|
"""Informations sur l'API avec authentification"""
|
|
# Authentification requise même pour /info
|
|
user_id = self._authenticate_user(request)
|
|
|
|
return jsonify({
|
|
"name": "4NK Vault API Secure",
|
|
"version": "2.0.0",
|
|
"domain": "vault.4nkweb.com",
|
|
"port": 6666,
|
|
"protocol": "HTTPS",
|
|
"encryption": "quantum-resistant",
|
|
"authentication": "user-key-based",
|
|
"key_rotation": "automatic",
|
|
"user_id": user_id,
|
|
"endpoints": {
|
|
"GET /<env>/<file>": "Sert un fichier chiffré avec authentification utilisateur",
|
|
"GET /health": "Contrôle de santé (authentification requise)",
|
|
"GET /info": "Informations sur l'API (authentification requise)",
|
|
"GET /routes": "Liste de toutes les routes disponibles (authentification requise)"
|
|
}
|
|
})
|
|
|
|
@self.app.route('/routes', methods=['GET'])
|
|
def routes():
|
|
"""Liste toutes les routes disponibles avec authentification"""
|
|
# Authentification requise pour /routes
|
|
user_id = self._authenticate_user(request)
|
|
|
|
# Scanner dynamiquement tous les fichiers disponibles dans tous les environnements
|
|
file_examples = self._scan_available_files()
|
|
|
|
return jsonify({
|
|
"routes": [
|
|
{
|
|
"method": "GET",
|
|
"path": "/health",
|
|
"description": "Contrôle de santé de l'API",
|
|
"authentication": "required",
|
|
"headers_required": ["X-User-ID"],
|
|
"response_type": "application/json"
|
|
},
|
|
{
|
|
"method": "GET",
|
|
"path": "/info",
|
|
"description": "Informations détaillées sur l'API",
|
|
"authentication": "required",
|
|
"headers_required": ["X-User-ID"],
|
|
"response_type": "application/json"
|
|
},
|
|
{
|
|
"method": "GET",
|
|
"path": "/routes",
|
|
"description": "Liste de toutes les routes disponibles",
|
|
"authentication": "required",
|
|
"headers_required": ["X-User-ID"],
|
|
"response_type": "application/json"
|
|
},
|
|
{
|
|
"method": "GET",
|
|
"path": "/<env>/<file_path>",
|
|
"description": "Sert un fichier chiffré depuis le stockage",
|
|
"authentication": "required",
|
|
"headers_required": ["X-User-ID"],
|
|
"response_type": "application/octet-stream",
|
|
"parameters": {
|
|
"env": "Environnement (ex: dev, prod)",
|
|
"file_path": "Chemin relatif du fichier dans storage/<env>/"
|
|
},
|
|
"examples": file_examples
|
|
}
|
|
],
|
|
"total_routes": 4,
|
|
"authentication": {
|
|
"type": "user-key-based",
|
|
"header": "X-User-ID",
|
|
"description": "ID utilisateur obligatoire pour tous les endpoints"
|
|
},
|
|
"user_id": user_id,
|
|
"timestamp": datetime.now().isoformat()
|
|
})
|
|
|
|
@self.app.route('/<env>/<path:file_path>', methods=['GET'])
|
|
def serve_file(env: str, file_path: str):
|
|
"""Sert un fichier avec authentification et chiffrement"""
|
|
try:
|
|
# Vérification de l'authentification
|
|
user_id = self._authenticate_user(request)
|
|
|
|
# Validation du chemin
|
|
if not self._validate_path(env, file_path):
|
|
logger.warning(f"Tentative d'accès non autorisé: {env}/{file_path}")
|
|
return jsonify({"error": "Accès non autorisé"}), 403
|
|
|
|
# Lecture du fichier
|
|
file_content = self._read_file(env, file_path)
|
|
if file_content is None:
|
|
return jsonify({"error": f"Fichier non trouvé: {env}/{file_path}"}), 404
|
|
|
|
# Contenu du fichier sans traitement de variables d'environnement
|
|
processed_content = file_content
|
|
|
|
# Chiffrement avec la clé utilisateur pour cet environnement
|
|
encrypted_content, next_key = self._encrypt_with_user_key_and_next(processed_content, user_id, env)
|
|
|
|
# Retour du contenu chiffré avec la prochaine clé
|
|
response = Response(
|
|
encrypted_content,
|
|
mimetype='application/octet-stream',
|
|
headers={
|
|
'X-Encryption-Type': 'quantum-resistant',
|
|
'X-Algorithm': 'X25519-ChaCha20-Poly1305',
|
|
'X-User-ID': user_id,
|
|
'X-Key-Rotation': 'per-request',
|
|
'X-Next-Key': next_key,
|
|
'X-Content-Processed': 'true'
|
|
}
|
|
)
|
|
|
|
logger.info(f"Fichier servi: {env}/{file_path} pour l'utilisateur {user_id}")
|
|
return response
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du service du fichier {env}/{file_path}: {e}")
|
|
return jsonify({"error": "Erreur interne du serveur"}), 500
|
|
|
|
def _authenticate_user(self, request) -> str:
|
|
"""Authentifie l'utilisateur et retourne son ID"""
|
|
# Récupération de l'ID utilisateur depuis les headers
|
|
user_id = request.headers.get('X-User-ID')
|
|
|
|
if not user_id:
|
|
abort(401, description="Header X-User-ID requis pour l'authentification")
|
|
|
|
# Validation basique de l'ID utilisateur
|
|
if len(user_id) < 3 or len(user_id) > 128:
|
|
abort(401, description="ID utilisateur invalide")
|
|
|
|
# Vérification des caractères autorisés
|
|
if not re.match(r'^[a-zA-Z0-9_-]+$', user_id):
|
|
abort(401, description="ID utilisateur contient des caractères non autorisés")
|
|
|
|
return user_id
|
|
|
|
def _validate_path(self, env: str, file_path: str) -> bool:
|
|
"""Valide le chemin d'accès avec protection contre les attaques de chemin relatif"""
|
|
|
|
# 1. Validation de l'environnement
|
|
if not env or not re.match(r'^[a-zA-Z0-9_-]+$', env):
|
|
return False
|
|
|
|
# 2. Validation du chemin de fichier
|
|
if not file_path or '..' in file_path or file_path.startswith('/'):
|
|
return False
|
|
|
|
# 3. Construction du chemin complet
|
|
full_path = STORAGE_ROOT / env / file_path
|
|
|
|
# 4. Vérification de sécurité renforcée
|
|
try:
|
|
resolved_path = full_path.resolve()
|
|
storage_resolved = STORAGE_ROOT.resolve()
|
|
|
|
# Vérification que le chemin résolu est bien dans le storage
|
|
if not str(resolved_path).startswith(str(storage_resolved)):
|
|
logger.warning(f"Tentative d'accès en dehors du storage: {resolved_path}")
|
|
return False
|
|
|
|
# Vérification que l'environnement est bien dans le chemin
|
|
if f"/{env}/" not in str(resolved_path):
|
|
logger.warning(f"Tentative d'accès à un environnement non autorisé: {env}")
|
|
return False
|
|
|
|
# Vérification que c'est bien un fichier (pas un dossier)
|
|
return resolved_path.exists() and resolved_path.is_file()
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Erreur de validation de chemin: {e}")
|
|
return False
|
|
|
|
def _scan_available_files(self, env: str = None) -> list:
|
|
"""Scanne tous les fichiers disponibles dans un ou tous les environnements"""
|
|
examples = []
|
|
|
|
if env:
|
|
# Scanner un environnement spécifique
|
|
env_path = STORAGE_ROOT / env
|
|
if env_path.exists():
|
|
examples.extend(self._scan_environment_files(env_path, env))
|
|
else:
|
|
# Scanner tous les environnements disponibles
|
|
for env_dir in STORAGE_ROOT.iterdir():
|
|
if env_dir.is_dir() and not env_dir.name.startswith('.'):
|
|
examples.extend(self._scan_environment_files(env_dir, env_dir.name))
|
|
|
|
return sorted(examples)
|
|
|
|
def _scan_environment_files(self, env_path: Path, env_name: str) -> list:
|
|
"""Scanne les fichiers d'un environnement spécifique"""
|
|
examples = []
|
|
|
|
try:
|
|
# Parcourir récursivement tous les fichiers
|
|
for file_path in env_path.rglob('*'):
|
|
if file_path.is_file():
|
|
# Exclure les fichiers de clés et autres fichiers système
|
|
relative_path = file_path.relative_to(env_path)
|
|
if not str(relative_path).startswith('_keys') and not str(relative_path).startswith('.'):
|
|
# Ajouter l'exemple au format attendu
|
|
examples.append(f"/{env_name}/{relative_path}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du scan des fichiers pour {env_name}: {e}")
|
|
|
|
return examples
|
|
|
|
def _read_file(self, env: str, file_path: str) -> Optional[str]:
|
|
"""Lit le contenu d'un fichier"""
|
|
try:
|
|
full_path = STORAGE_ROOT / env / file_path
|
|
|
|
# Lecture en mode binaire pour détecter le type
|
|
with open(full_path, 'rb') as f:
|
|
content = f.read()
|
|
|
|
# Tentative de décodage UTF-8
|
|
try:
|
|
return content.decode('utf-8')
|
|
except UnicodeDecodeError:
|
|
# Fichier binaire, encodage base64
|
|
return f"BINARY_DATA:{base64.b64encode(content).decode()}"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la lecture du fichier {env}/{file_path}: {e}")
|
|
return None
|
|
|
|
def _encrypt_with_user_key_and_next(self, content: str, user_id: str, environment: str) -> tuple[bytes, str]:
|
|
"""Chiffre le contenu avec la clé utilisateur et génère la prochaine clé"""
|
|
try:
|
|
# Création du gestionnaire de clés pour cet environnement
|
|
key_manager = UserKeyManager(environment)
|
|
|
|
# Création ou récupération de la clé utilisateur (avec rotation automatique)
|
|
current_key = key_manager.get_or_create_user_key(user_id)
|
|
|
|
# Pas de génération de nouvelle clé automatiquement
|
|
# Utiliser la même clé pour chiffrer et déchiffrer
|
|
next_key = current_key
|
|
next_key_b64 = base64.b64encode(next_key).decode()
|
|
|
|
# Chiffrement avec la clé actuelle
|
|
encrypted_content = self._encrypt_content_with_next_key(content, current_key, next_key, user_id, environment)
|
|
|
|
return encrypted_content, next_key_b64
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur de chiffrement pour l'utilisateur {user_id} dans l'environnement {environment}: {e}")
|
|
# Fallback: retour du contenu en base64 sans chiffrement
|
|
return base64.b64encode(content.encode('utf-8')), ""
|
|
|
|
def _encrypt_with_user_key(self, content: str, user_id: str, environment: str) -> bytes:
|
|
"""Chiffre le contenu avec la clé utilisateur pour un environnement spécifique (legacy)"""
|
|
encrypted_content, _ = self._encrypt_with_user_key_and_next(content, user_id, environment)
|
|
return encrypted_content
|
|
|
|
def _encrypt_content_with_next_key(self, content: str, current_key: bytes, next_key: bytes, user_id: str, environment: str) -> bytes:
|
|
"""Chiffre le contenu avec la clé actuelle et inclut la prochaine clé"""
|
|
cipher = ChaCha20Poly1305(current_key)
|
|
nonce = secrets.token_bytes(12)
|
|
encrypted_content = cipher.encrypt(nonce, content.encode('utf-8'), None)
|
|
|
|
# Métadonnées de chiffrement avec la prochaine clé
|
|
metadata = {
|
|
'user_id': user_id,
|
|
'environment': environment,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'key_version': 'current',
|
|
'algorithm': 'ChaCha20-Poly1305',
|
|
'next_key': base64.b64encode(next_key).decode(),
|
|
'rotation': 'per-request'
|
|
}
|
|
|
|
# Encodage: nonce + métadonnées + contenu chiffré
|
|
metadata_json = json.dumps(metadata).encode('utf-8')
|
|
full_payload = nonce + len(metadata_json).to_bytes(4, 'big') + metadata_json + encrypted_content
|
|
|
|
return base64.b64encode(full_payload)
|
|
|
|
def _encrypt_content(self, content: str, key: bytes, user_id: str, environment: str, is_previous: bool = False) -> bytes:
|
|
"""Chiffre le contenu avec une clé spécifique (legacy)"""
|
|
cipher = ChaCha20Poly1305(key)
|
|
nonce = secrets.token_bytes(12)
|
|
encrypted_content = cipher.encrypt(nonce, content.encode('utf-8'), None)
|
|
|
|
# Métadonnées de chiffrement
|
|
metadata = {
|
|
'user_id': user_id,
|
|
'environment': environment,
|
|
'timestamp': datetime.now().isoformat(),
|
|
'key_version': 'previous' if is_previous else 'current',
|
|
'algorithm': 'ChaCha20-Poly1305'
|
|
}
|
|
|
|
# Encodage: nonce + métadonnées + contenu chiffré
|
|
metadata_json = json.dumps(metadata).encode('utf-8')
|
|
full_payload = nonce + len(metadata_json).to_bytes(4, 'big') + metadata_json + encrypted_content
|
|
|
|
return base64.b64encode(full_payload)
|
|
|
|
def create_ssl_context(self):
|
|
"""Crée le contexte SSL pour HTTPS"""
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
|
|
# Configuration SSL sécurisée
|
|
context.minimum_version = ssl.TLSVersion.TLSv1_2
|
|
context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
|
|
|
|
# Génération de certificats auto-signés pour la démonstration
|
|
try:
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
import datetime
|
|
|
|
# Génération d'une clé privée RSA
|
|
private_key = rsa.generate_private_key(
|
|
public_exponent=65537,
|
|
key_size=2048,
|
|
)
|
|
|
|
# Création d'un certificat auto-signé
|
|
subject = issuer = x509.Name([
|
|
x509.NameAttribute(NameOID.COUNTRY_NAME, "FR"),
|
|
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "France"),
|
|
x509.NameAttribute(NameOID.LOCALITY_NAME, "Paris"),
|
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "4NK"),
|
|
x509.NameAttribute(NameOID.COMMON_NAME, "vault.4nkweb.com"),
|
|
])
|
|
|
|
cert = x509.CertificateBuilder().subject_name(
|
|
subject
|
|
).issuer_name(
|
|
issuer
|
|
).public_key(
|
|
private_key.public_key()
|
|
).serial_number(
|
|
x509.random_serial_number()
|
|
).not_valid_before(
|
|
datetime.datetime.now(datetime.timezone.utc)
|
|
).not_valid_after(
|
|
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
|
|
).add_extension(
|
|
x509.SubjectAlternativeName([
|
|
x509.DNSName("vault.4nkweb.com"),
|
|
x509.DNSName("localhost"),
|
|
]),
|
|
critical=False,
|
|
).sign(private_key, hashes.SHA256())
|
|
|
|
# Sauvegarde temporaire des certificats
|
|
with open('/tmp/vault.key', 'wb') as f:
|
|
f.write(private_key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
))
|
|
|
|
with open('/tmp/vault.crt', 'wb') as f:
|
|
f.write(cert.public_bytes(serialization.Encoding.PEM))
|
|
|
|
context.load_cert_chain('/tmp/vault.crt', '/tmp/vault.key')
|
|
logger.info("Certificats SSL générés avec succès")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Impossible de générer les certificats SSL: {e}")
|
|
logger.warning("Utilisation du contexte SSL par défaut")
|
|
|
|
return context
|
|
|
|
def run(self, host='0.0.0.0', port=6666, debug=False):
|
|
"""Démarre le serveur HTTPS"""
|
|
logger.info("Démarrage de l'API Vault sécurisée sur https://vault.4nkweb.com:6666")
|
|
logger.info("Authentification par clés utilisateur activée")
|
|
logger.info("Rotation automatique des clés activée")
|
|
logger.info("HTTPS obligatoire")
|
|
|
|
ssl_context = self.create_ssl_context()
|
|
|
|
self.app.run(
|
|
host=host,
|
|
port=port,
|
|
debug=debug,
|
|
ssl_context=ssl_context,
|
|
threaded=True
|
|
)
|
|
|
|
if __name__ == '__main__':
|
|
api = SecureVaultAPI()
|
|
api.run()
|