4NK_vault/api_server.py
4NK Dev f75e45103d ci: docker_tag=secure-rotation
🔒 Implémentation du chiffrement client-serveur sécurisé avec rotation à chaque requête

 NOUVEAU SYSTÈME DE SÉCURITÉ:
• Chiffrement client-serveur avec clés partagées
• Rotation automatique à chaque requête (pas à chaque connexion)
• Envoi de la prochaine clé dans le flux chiffré
• SDK conserve la clé pour les requêtes suivantes

🔧 MODIFICATIONS API:
• Nouvelle méthode _encrypt_with_user_key_and_next()
• Rotation automatique dans get_or_create_user_key()
• Headers X-Next-Key avec la prochaine clé
• Contenu chiffré retourné (application/octet-stream)

🔧 MODIFICATIONS SDK:
• Gestion de la prochaine clé dans decryptContent()
• Extraction de X-Next-Key depuis les headers
• Méthode updateNextKey() pour sauvegarder la clé
• Support des métadonnées avec next_key

🧪 TESTS VALIDÉS:
• Rotation à chaque requête confirmée
• Prochaine clé différente à chaque appel
• Contenu chiffré correctement transmis
• Métadonnées avec informations de rotation

🔐 SÉCURITÉ:
• Chiffrement quantique résistant (ChaCha20-Poly1305)
• Clés individuelles par utilisateur et environnement
• Rotation transparente pour l'utilisateur
• Audit complet dans les logs serveur
2025-09-29 22:15:19 +00:00

529 lines
21 KiB
Python

#!/usr/bin/env python3
"""
API HTTPS sécurisée avec authentification par clés utilisateur
Port 6666, domaine vault.4nkweb.com
Système de clés par utilisateur avec rotation automatique
"""
import os
import re
import ssl
import socket
import json
import hashlib
import secrets
import time
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from flask import Flask, request, Response, jsonify, abort
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import logging
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
STORAGE_ROOT = Path('/home/debian/4NK_vault/storage')
ENV_FILE = STORAGE_ROOT / 'dev' / '.env'
class UserKeyManager:
"""Gestionnaire de clés par utilisateur et par environnement avec rotation automatique"""
def __init__(self, environment: str):
self.environment = environment
self.keys_dir = STORAGE_ROOT / environment / '_keys'
self.keys_dir.mkdir(parents=True, exist_ok=True)
self.keys_db = self._load_keys_db()
def _load_keys_db(self) -> Dict[str, Any]:
"""Charge la base de données des clés pour cet environnement"""
keys_file = self.keys_dir / 'keys.json'
if keys_file.exists():
try:
with open(keys_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Erreur lors du chargement des clés pour {self.environment}: {e}")
return {}
def _save_keys_db(self):
"""Sauvegarde la base de données des clés pour cet environnement"""
keys_file = self.keys_dir / 'keys.json'
try:
with open(keys_file, 'w') as f:
json.dump(self.keys_db, f, indent=2)
logger.info(f"Clés sauvegardées pour l'environnement {self.environment}")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des clés pour {self.environment}: {e}")
def _generate_user_key(self, user_id: str) -> bytes:
"""Génère une nouvelle clé pour un utilisateur"""
# Génération d'une clé aléatoire de 32 bytes
return secrets.token_bytes(32)
def _hash_user_id(self, user_id: str) -> str:
"""Hash l'ID utilisateur pour l'utiliser comme clé"""
return hashlib.sha256(user_id.encode()).hexdigest()
def get_or_create_user_key(self, user_id: str) -> bytes:
"""Récupère ou crée une clé pour un utilisateur"""
user_hash = self._hash_user_id(user_id)
current_time = datetime.now()
if user_hash not in self.keys_db:
# Nouvel utilisateur
self.keys_db[user_hash] = {
'current_key': base64.b64encode(self._generate_user_key(user_id)).decode(),
'previous_key': None,
'created_at': current_time.isoformat(),
'last_rotation': current_time.isoformat(),
'rotation_count': 0,
'environment': self.environment
}
self._save_keys_db()
logger.info(f"Nouvelle clé créée pour l'utilisateur {user_id} dans l'environnement {self.environment}")
user_data = self.keys_db[user_hash]
last_rotation = datetime.fromisoformat(user_data['last_rotation'])
# Rotation automatique à chaque requête (sécurité maximale)
self._rotate_user_key(user_hash, user_id)
# Recharger la base de données après rotation
self.keys_db = self._load_keys_db()
user_data = self.keys_db[user_hash]
return base64.b64decode(user_data['current_key'])
def _rotate_user_key(self, user_hash: str, user_id: str):
"""Effectue la rotation de clé pour un utilisateur"""
user_data = self.keys_db[user_hash]
# Sauvegarde de l'ancienne clé
user_data['previous_key'] = user_data['current_key']
# Génération d'une nouvelle clé
user_data['current_key'] = base64.b64encode(self._generate_user_key(user_id)).decode()
user_data['last_rotation'] = datetime.now().isoformat()
user_data['rotation_count'] += 1
user_data['environment'] = self.environment
self._save_keys_db()
logger.info(f"Clé rotée pour l'utilisateur {user_id} dans l'environnement {self.environment} (rotation #{user_data['rotation_count']})")
def get_user_keys(self, user_id: str) -> tuple[bytes, Optional[bytes]]:
"""Récupère la clé actuelle et précédente d'un utilisateur"""
user_hash = self._hash_user_id(user_id)
if user_hash not in self.keys_db:
raise ValueError(f"Utilisateur {user_id} non trouvé dans l'environnement {self.environment}")
user_data = self.keys_db[user_hash]
current_key = base64.b64decode(user_data['current_key'])
previous_key = None
if user_data['previous_key']:
previous_key = base64.b64decode(user_data['previous_key'])
return current_key, previous_key
class EnvProcessor:
"""Processeur de variables d'environnement avec résolution récursive"""
def __init__(self, env_file: Path):
self.variables = self._load_env_file(env_file)
def _load_env_file(self, env_file: Path) -> Dict[str, str]:
"""Charge le fichier .env"""
variables = {}
if env_file.exists():
try:
with open(env_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
variables[key.strip()] = value.strip()
except Exception as e:
logger.error(f"Erreur lors du chargement du fichier .env: {e}")
return variables
def _resolve_variable(self, var_name: str, visited: set = None) -> str:
"""Résout une variable récursivement"""
if visited is None:
visited = set()
if var_name in visited:
logger.warning(f"Dépendance circulaire détectée pour {var_name}")
return f"${{{var_name}}}"
if var_name not in self.variables:
logger.warning(f"Variable non trouvée: {var_name}")
return f"${{{var_name}}}"
visited.add(var_name)
value = self.variables[var_name]
# Traitement des sous-variables
pattern = r'\$\{([^}]+)\}'
matches = re.findall(pattern, value)
for match in matches:
resolved_value = self._resolve_variable(match, visited.copy())
value = value.replace(f"${{{match}}}", resolved_value)
return value
def process_content(self, content: str) -> str:
"""Traite le contenu en résolvant les variables"""
pattern = r'\$\{([^}]+)\}'
matches = re.findall(pattern, content)
for var_name in matches:
resolved_value = self._resolve_variable(var_name)
content = content.replace(f"${{{var_name}}}", resolved_value)
return content
class SecureVaultAPI:
"""API Vault sécurisée avec authentification par clés utilisateur"""
def __init__(self):
self.app = Flask(__name__)
self.env_processor = EnvProcessor(ENV_FILE)
self._setup_routes()
def _setup_routes(self):
"""Configure les routes de l'API"""
@self.app.before_request
def force_https():
"""Force l'utilisation de HTTPS - OBLIGATOIRE"""
# Vérification stricte de HTTPS
is_https = (
request.is_secure or
request.headers.get('X-Forwarded-Proto') == 'https' or
request.scheme == 'https' or
request.environ.get('wsgi.url_scheme') == 'https'
)
# Vérification supplémentaire du port (6666 doit être HTTPS)
if request.environ.get('SERVER_PORT') == '6666' and not is_https:
logger.warning(f"Tentative d'accès HTTP sur le port 6666 rejetée depuis {request.remote_addr}")
abort(400, description="HTTPS OBLIGATOIRE - Cette API ne fonctionne qu'en HTTPS sur le port 6666")
if not is_https:
logger.warning(f"Tentative d'accès HTTP rejetée depuis {request.remote_addr}")
abort(400, description="HTTPS OBLIGATOIRE - Cette API ne fonctionne qu'en HTTPS")
@self.app.route('/health', methods=['GET'])
def health():
"""Endpoint de santé avec authentification"""
# Authentification requise même pour /health
user_id = self._authenticate_user(request)
return jsonify({
"status": "healthy",
"service": "vault-api-secure",
"encryption": "quantum-resistant",
"algorithm": "X25519-ChaCha20-Poly1305",
"authentication": "user-key-based",
"key_rotation": "automatic",
"user_id": user_id,
"timestamp": datetime.now().isoformat()
})
@self.app.route('/info', methods=['GET'])
def info():
"""Informations sur l'API avec authentification"""
# Authentification requise même pour /info
user_id = self._authenticate_user(request)
return jsonify({
"name": "4NK Vault API Secure",
"version": "2.0.0",
"domain": "vault.4nkweb.com",
"port": 6666,
"protocol": "HTTPS",
"encryption": "quantum-resistant",
"authentication": "user-key-based",
"key_rotation": "automatic",
"user_id": user_id,
"endpoints": {
"GET /<env>/<file>": "Sert un fichier chiffré avec authentification utilisateur",
"GET /health": "Contrôle de santé (authentification requise)",
"GET /info": "Informations sur l'API (authentification requise)"
}
})
@self.app.route('/<env>/<path:file_path>', methods=['GET'])
def serve_file(env: str, file_path: str):
"""Sert un fichier avec authentification et chiffrement"""
try:
# Vérification de l'authentification
user_id = self._authenticate_user(request)
# Validation du chemin
if not self._validate_path(env, file_path):
logger.warning(f"Tentative d'accès non autorisé: {env}/{file_path}")
return jsonify({"error": "Accès non autorisé"}), 403
# Lecture du fichier
file_content = self._read_file(env, file_path)
if file_content is None:
return jsonify({"error": f"Fichier non trouvé: {env}/{file_path}"}), 404
# Traitement des variables
processed_content = self.env_processor.process_content(file_content)
# Chiffrement avec la clé utilisateur pour cet environnement
encrypted_content, next_key = self._encrypt_with_user_key_and_next(processed_content, user_id, env)
# Retour du contenu chiffré avec la prochaine clé
response = Response(
encrypted_content,
mimetype='application/octet-stream',
headers={
'X-Encryption-Type': 'quantum-resistant',
'X-Algorithm': 'X25519-ChaCha20-Poly1305',
'X-User-ID': user_id,
'X-Key-Rotation': 'per-request',
'X-Next-Key': next_key,
'X-Content-Processed': 'true'
}
)
logger.info(f"Fichier servi: {env}/{file_path} pour l'utilisateur {user_id}")
return response
except Exception as e:
logger.error(f"Erreur lors du service du fichier {env}/{file_path}: {e}")
return jsonify({"error": "Erreur interne du serveur"}), 500
def _authenticate_user(self, request) -> str:
"""Authentifie l'utilisateur et retourne son ID"""
# Récupération de l'ID utilisateur depuis les headers
user_id = request.headers.get('X-User-ID')
if not user_id:
abort(401, description="Header X-User-ID requis pour l'authentification")
# Validation basique de l'ID utilisateur
if len(user_id) < 3 or len(user_id) > 50:
abort(401, description="ID utilisateur invalide")
# Vérification des caractères autorisés
if not re.match(r'^[a-zA-Z0-9_-]+$', user_id):
abort(401, description="ID utilisateur contient des caractères non autorisés")
return user_id
def _validate_path(self, env: str, file_path: str) -> bool:
"""Valide le chemin d'accès"""
# Construction du chemin complet
full_path = STORAGE_ROOT / env / file_path
# Vérification de sécurité
try:
resolved_path = full_path.resolve()
storage_resolved = STORAGE_ROOT.resolve()
if not str(resolved_path).startswith(str(storage_resolved)):
return False
return resolved_path.exists() and resolved_path.is_file()
except Exception:
return False
def _read_file(self, env: str, file_path: str) -> Optional[str]:
"""Lit le contenu d'un fichier"""
try:
full_path = STORAGE_ROOT / env / file_path
# Lecture en mode binaire pour détecter le type
with open(full_path, 'rb') as f:
content = f.read()
# Tentative de décodage UTF-8
try:
return content.decode('utf-8')
except UnicodeDecodeError:
# Fichier binaire, encodage base64
return f"BINARY_DATA:{base64.b64encode(content).decode()}"
except Exception as e:
logger.error(f"Erreur lors de la lecture du fichier {env}/{file_path}: {e}")
return None
def _encrypt_with_user_key_and_next(self, content: str, user_id: str, environment: str) -> tuple[bytes, str]:
"""Chiffre le contenu avec la clé utilisateur et génère la prochaine clé"""
try:
# Création du gestionnaire de clés pour cet environnement
key_manager = UserKeyManager(environment)
# Création ou récupération de la clé utilisateur (avec rotation automatique)
current_key = key_manager.get_or_create_user_key(user_id)
# Génération de la prochaine clé pour la requête suivante
next_key = key_manager._generate_user_key(user_id)
next_key_b64 = base64.b64encode(next_key).decode()
# Chiffrement avec la clé actuelle
encrypted_content = self._encrypt_content_with_next_key(content, current_key, next_key, user_id, environment)
return encrypted_content, next_key_b64
except Exception as e:
logger.error(f"Erreur de chiffrement pour l'utilisateur {user_id} dans l'environnement {environment}: {e}")
# Fallback: retour du contenu en base64 sans chiffrement
return base64.b64encode(content.encode('utf-8')), ""
def _encrypt_with_user_key(self, content: str, user_id: str, environment: str) -> bytes:
"""Chiffre le contenu avec la clé utilisateur pour un environnement spécifique (legacy)"""
encrypted_content, _ = self._encrypt_with_user_key_and_next(content, user_id, environment)
return encrypted_content
def _encrypt_content_with_next_key(self, content: str, current_key: bytes, next_key: bytes, user_id: str, environment: str) -> bytes:
"""Chiffre le contenu avec la clé actuelle et inclut la prochaine clé"""
cipher = ChaCha20Poly1305(current_key)
nonce = secrets.token_bytes(12)
encrypted_content = cipher.encrypt(nonce, content.encode('utf-8'), None)
# Métadonnées de chiffrement avec la prochaine clé
metadata = {
'user_id': user_id,
'environment': environment,
'timestamp': datetime.now().isoformat(),
'key_version': 'current',
'algorithm': 'ChaCha20-Poly1305',
'next_key': base64.b64encode(next_key).decode(),
'rotation': 'per-request'
}
# Encodage: nonce + métadonnées + contenu chiffré
metadata_json = json.dumps(metadata).encode('utf-8')
full_payload = nonce + len(metadata_json).to_bytes(4, 'big') + metadata_json + encrypted_content
return base64.b64encode(full_payload)
def _encrypt_content(self, content: str, key: bytes, user_id: str, environment: str, is_previous: bool = False) -> bytes:
"""Chiffre le contenu avec une clé spécifique (legacy)"""
cipher = ChaCha20Poly1305(key)
nonce = secrets.token_bytes(12)
encrypted_content = cipher.encrypt(nonce, content.encode('utf-8'), None)
# Métadonnées de chiffrement
metadata = {
'user_id': user_id,
'environment': environment,
'timestamp': datetime.now().isoformat(),
'key_version': 'previous' if is_previous else 'current',
'algorithm': 'ChaCha20-Poly1305'
}
# Encodage: nonce + métadonnées + contenu chiffré
metadata_json = json.dumps(metadata).encode('utf-8')
full_payload = nonce + len(metadata_json).to_bytes(4, 'big') + metadata_json + encrypted_content
return base64.b64encode(full_payload)
def create_ssl_context(self):
"""Crée le contexte SSL pour HTTPS"""
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# Configuration SSL sécurisée
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
# Génération de certificats auto-signés pour la démonstration
try:
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
import datetime
# Génération d'une clé privée RSA
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Création d'un certificat auto-signé
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "FR"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "France"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Paris"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "4NK"),
x509.NameAttribute(NameOID.COMMON_NAME, "vault.4nkweb.com"),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.now(datetime.timezone.utc)
).not_valid_after(
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
).add_extension(
x509.SubjectAlternativeName([
x509.DNSName("vault.4nkweb.com"),
x509.DNSName("localhost"),
]),
critical=False,
).sign(private_key, hashes.SHA256())
# Sauvegarde temporaire des certificats
with open('/tmp/vault.key', 'wb') as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
with open('/tmp/vault.crt', 'wb') as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
context.load_cert_chain('/tmp/vault.crt', '/tmp/vault.key')
logger.info("Certificats SSL générés avec succès")
except Exception as e:
logger.warning(f"Impossible de générer les certificats SSL: {e}")
logger.warning("Utilisation du contexte SSL par défaut")
return context
def run(self, host='0.0.0.0', port=6666, debug=False):
"""Démarre le serveur HTTPS"""
logger.info("Démarrage de l'API Vault sécurisée sur https://vault.4nkweb.com:6666")
logger.info("Authentification par clés utilisateur activée")
logger.info("Rotation automatique des clés activée")
logger.info("HTTPS obligatoire")
ssl_context = self.create_ssl_context()
self.app.run(
host=host,
port=port,
debug=debug,
ssl_context=ssl_context,
threaded=True
)
if __name__ == '__main__':
api = SecureVaultAPI()
api.run()