#!/usr/bin/env python3 """ API HTTPS sécurisée avec authentification par clés utilisateur Port 6666, domaine vault.4nkweb.com Système de clés par utilisateur avec rotation automatique """ import os import re import ssl import socket import json import hashlib import secrets import time from pathlib import Path from datetime import datetime, timedelta from typing import Dict, Any, Optional from flask import Flask, request, Response, jsonify, abort from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import x25519 from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64 import logging # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Configuration STORAGE_ROOT = Path('/home/debian/4NK_vault/storage') class UserKeyManager: """Gestionnaire de clés par utilisateur et par environnement avec rotation automatique""" def __init__(self, environment: str): self.environment = environment self.keys_dir = STORAGE_ROOT / environment / '_keys' self.keys_dir.mkdir(parents=True, exist_ok=True) self.keys_db = self._load_keys_db() def _load_keys_db(self) -> Dict[str, Any]: """Charge la base de données des clés pour cet environnement""" keys_file = self.keys_dir / 'keys.json' if keys_file.exists(): try: with open(keys_file, 'r') as f: return json.load(f) except Exception as e: logger.error(f"Erreur lors du chargement des clés pour {self.environment}: {e}") return {} def _save_keys_db(self): """Sauvegarde la base de données des clés pour cet environnement""" keys_file = self.keys_dir / 'keys.json' try: with open(keys_file, 'w') as f: json.dump(self.keys_db, f, indent=2) logger.info(f"Clés sauvegardées pour l'environnement {self.environment}") except Exception as e: logger.error(f"Erreur lors de la sauvegarde des clés pour {self.environment}: {e}") def _generate_user_key(self, user_id: str) -> bytes: """Génère une nouvelle clé pour un utilisateur""" # Génération d'une clé aléatoire de 32 bytes return secrets.token_bytes(32) def _hash_user_id(self, user_id: str) -> str: """Hash l'ID utilisateur pour l'utiliser comme clé""" return hashlib.sha256(user_id.encode()).hexdigest() def get_or_create_user_key(self, user_id: str) -> bytes: """Récupère ou crée une clé pour un utilisateur""" user_hash = self._hash_user_id(user_id) current_time = datetime.now() if user_hash not in self.keys_db: # Nouvel utilisateur self.keys_db[user_hash] = { 'current_key': base64.b64encode(self._generate_user_key(user_id)).decode(), 'previous_key': None, 'created_at': current_time.isoformat(), 'last_rotation': current_time.isoformat(), 'rotation_count': 0, 'environment': self.environment } self._save_keys_db() logger.info(f"Nouvelle clé créée pour l'utilisateur {user_id} dans l'environnement {self.environment}") user_data = self.keys_db[user_hash] last_rotation = datetime.fromisoformat(user_data['last_rotation']) # Pas de rotation automatique pour permettre le déchiffrement # La rotation se fera manuellement ou sur demande user_data = self.keys_db[user_hash] return base64.b64decode(user_data['current_key']) def _rotate_user_key(self, user_hash: str, user_id: str): """Effectue la rotation de clé pour un utilisateur""" user_data = self.keys_db[user_hash] # Sauvegarde de l'ancienne clé user_data['previous_key'] = user_data['current_key'] # Génération d'une nouvelle clé user_data['current_key'] = base64.b64encode(self._generate_user_key(user_id)).decode() user_data['last_rotation'] = datetime.now().isoformat() user_data['rotation_count'] += 1 user_data['environment'] = self.environment self._save_keys_db() logger.info(f"Clé rotée pour l'utilisateur {user_id} dans l'environnement {self.environment} (rotation #{user_data['rotation_count']})") def get_user_keys(self, user_id: str) -> tuple[bytes, Optional[bytes]]: """Récupère la clé actuelle et précédente d'un utilisateur""" user_hash = self._hash_user_id(user_id) if user_hash not in self.keys_db: raise ValueError(f"Utilisateur {user_id} non trouvé dans l'environnement {self.environment}") user_data = self.keys_db[user_hash] current_key = base64.b64decode(user_data['current_key']) previous_key = None if user_data['previous_key']: previous_key = base64.b64decode(user_data['previous_key']) return current_key, previous_key class EnvProcessor: """Processeur de variables d'environnement avec résolution récursive""" def __init__(self, env_file: Path): self.variables = self._load_env_file(env_file) def _load_env_file(self, env_file: Path) -> Dict[str, str]: """Charge les fichiers d'environnement dans l'ordre correct pour le templating""" variables = {} # Ordre de chargement des fichiers d'environnement env_files = [ '.env.secrets', # 1. Secrets de base '.env', # 2. Variables de configuration (peut utiliser les secrets) '.env.auto', # 3. Variables générées automatiquement (peut utiliser les précédentes) '.env.post' # 4. Variables finales (peut utiliser toutes les précédentes) ] for env_filename in env_files: env_file_path = env_file.parent / env_filename if env_file_path.exists(): try: file_variables = 0 with open(env_file_path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) variables[key.strip()] = value.strip() file_variables += 1 logger.info(f"Variables chargées depuis {env_file_path}: {file_variables} variables") except Exception as e: logger.error(f"Erreur lors du chargement du fichier {env_file_path}: {e}") else: logger.debug(f"Fichier {env_file_path} non trouvé, ignoré") logger.info(f"Total des variables d'environnement chargées: {len(variables)} variables") return variables def _resolve_variable(self, var_name: str, visited: set = None) -> str: """Résout une variable récursivement""" if visited is None: visited = set() if var_name in visited: logger.warning(f"Dépendance circulaire détectée pour {var_name}") return f"${var_name}" if var_name not in self.variables: logger.warning(f"Variable non trouvée: {var_name}") return f"${var_name}" visited.add(var_name) value = self.variables[var_name] # Traitement des sous-variables avec ${VAR} pattern1 = r'\$\{([^}]+)\}' matches1 = re.findall(pattern1, value) for match in matches1: resolved_value = self._resolve_variable(match, visited.copy()) value = value.replace(f"${{{match}}}", resolved_value) # Traitement des sous-variables avec $VAR pattern2 = r'\$([A-Za-z_][A-Za-z0-9_]*)' matches2 = re.findall(pattern2, value) for match in matches2: if match != var_name: # Éviter l'auto-référence resolved_value = self._resolve_variable(match, visited.copy()) value = value.replace(f"${match}", resolved_value) return value def process_content(self, content: str) -> str: """Traite le contenu en résolvant les variables""" # Pattern pour ${VARIABLE} pattern1 = r'\$\{([^}]+)\}' matches1 = re.findall(pattern1, content) for var_name in matches1: resolved_value = self._resolve_variable(var_name) content = content.replace(f"${{{var_name}}}", resolved_value) # Pattern pour $VARIABLE (syntaxe simple) pattern2 = r'\$([A-Za-z_][A-Za-z0-9_]*)' matches2 = re.findall(pattern2, content) for var_name in matches2: # Éviter les variables déjà traitées avec ${} if f"${{{var_name}}}" not in content: resolved_value = self._resolve_variable(var_name) content = content.replace(f"${var_name}", resolved_value) return content class SecureVaultAPI: """API Vault sécurisée avec authentification par clés utilisateur""" def __init__(self): self.app = Flask(__name__) # Initialisation des processeurs d'environnement pour chaque env self.env_processors = {} self._setup_routes() def _get_env_processor(self, env: str) -> EnvProcessor: """Obtient le processeur d'environnement pour un environnement donné""" if env not in self.env_processors: env_file = STORAGE_ROOT / env / '.env' self.env_processors[env] = EnvProcessor(env_file) logger.info(f"Processeur d'environnement initialisé pour {env}") return self.env_processors[env] def _setup_routes(self): """Configure les routes de l'API""" @self.app.before_request def force_https(): """Force l'utilisation de HTTPS - OBLIGATOIRE""" # Vérification stricte de HTTPS is_https = ( request.is_secure or request.headers.get('X-Forwarded-Proto') == 'https' or request.scheme == 'https' or request.environ.get('wsgi.url_scheme') == 'https' ) # Vérification supplémentaire du port (6666 doit être HTTPS) if request.environ.get('SERVER_PORT') == '6666' and not is_https: logger.warning(f"Tentative d'accès HTTP sur le port 6666 rejetée depuis {request.remote_addr}") abort(400, description="HTTPS OBLIGATOIRE - Cette API ne fonctionne qu'en HTTPS sur le port 6666") if not is_https: logger.warning(f"Tentative d'accès HTTP rejetée depuis {request.remote_addr}") abort(400, description="HTTPS OBLIGATOIRE - Cette API ne fonctionne qu'en HTTPS") @self.app.route('/health', methods=['GET']) def health(): """Endpoint de santé avec authentification""" # Authentification requise même pour /health user_id = self._authenticate_user(request) return jsonify({ "status": "healthy", "service": "vault-api-secure", "encryption": "quantum-resistant", "algorithm": "X25519-ChaCha20-Poly1305", "authentication": "user-key-based", "key_rotation": "automatic", "user_id": user_id, "timestamp": datetime.now().isoformat() }) @self.app.route('/info', methods=['GET']) def info(): """Informations sur l'API avec authentification""" # Authentification requise même pour /info user_id = self._authenticate_user(request) return jsonify({ "name": "4NK Vault API Secure", "version": "2.0.0", "domain": "vault.4nkweb.com", "port": 6666, "protocol": "HTTPS", "encryption": "quantum-resistant", "authentication": "user-key-based", "key_rotation": "automatic", "user_id": user_id, "endpoints": { "GET //": "Sert un fichier chiffré avec authentification utilisateur", "GET /health": "Contrôle de santé (authentification requise)", "GET /info": "Informations sur l'API (authentification requise)", "GET /routes": "Liste de toutes les routes disponibles (authentification requise)" } }) @self.app.route('/routes', methods=['GET']) def routes(): """Liste toutes les routes disponibles avec authentification""" # Authentification requise pour /routes user_id = self._authenticate_user(request) # Scanner dynamiquement tous les fichiers disponibles dans tous les environnements file_examples = self._scan_available_files() return jsonify({ "routes": [ { "method": "GET", "path": "/health", "description": "Contrôle de santé de l'API", "authentication": "required", "headers_required": ["X-User-ID"], "response_type": "application/json" }, { "method": "GET", "path": "/info", "description": "Informations détaillées sur l'API", "authentication": "required", "headers_required": ["X-User-ID"], "response_type": "application/json" }, { "method": "GET", "path": "/routes", "description": "Liste de toutes les routes disponibles", "authentication": "required", "headers_required": ["X-User-ID"], "response_type": "application/json" }, { "method": "GET", "path": "//", "description": "Sert un fichier chiffré depuis le stockage", "authentication": "required", "headers_required": ["X-User-ID"], "response_type": "application/octet-stream", "parameters": { "env": "Environnement (ex: dev, prod)", "file_path": "Chemin relatif du fichier dans storage//" }, "examples": file_examples } ], "total_routes": 4, "authentication": { "type": "user-key-based", "header": "X-User-ID", "description": "ID utilisateur obligatoire pour tous les endpoints" }, "user_id": user_id, "timestamp": datetime.now().isoformat() }) @self.app.route('//', methods=['GET']) def serve_file(env: str, file_path: str): """Sert un fichier avec authentification et chiffrement""" try: # Vérification de l'authentification user_id = self._authenticate_user(request) # Validation du chemin if not self._validate_path(env, file_path): logger.warning(f"Tentative d'accès non autorisé: {env}/{file_path}") return jsonify({"error": "Accès non autorisé"}), 403 # Lecture du fichier file_content = self._read_file(env, file_path) if file_content is None: return jsonify({"error": f"Fichier non trouvé: {env}/{file_path}"}), 404 # Traitement des variables d'environnement (en mémoire, sans modifier le fichier) env_processor = self._get_env_processor(env) processed_content = env_processor.process_content(file_content) # Chiffrement avec la clé utilisateur pour cet environnement encrypted_content, next_key = self._encrypt_with_user_key_and_next(processed_content, user_id, env) # Retour du contenu chiffré avec la prochaine clé response = Response( encrypted_content, mimetype='application/octet-stream', headers={ 'X-Encryption-Type': 'quantum-resistant', 'X-Algorithm': 'X25519-ChaCha20-Poly1305', 'X-User-ID': user_id, 'X-Key-Rotation': 'per-request', 'X-Next-Key': next_key, 'X-Content-Processed': 'true' } ) logger.info(f"Fichier servi: {env}/{file_path} pour l'utilisateur {user_id}") return response except Exception as e: logger.error(f"Erreur lors du service du fichier {env}/{file_path}: {e}") return jsonify({"error": "Erreur interne du serveur"}), 500 def _authenticate_user(self, request) -> str: """Authentifie l'utilisateur et retourne son ID""" # Récupération de l'ID utilisateur depuis les headers user_id = request.headers.get('X-User-ID') if not user_id: abort(401, description="Header X-User-ID requis pour l'authentification") # Validation basique de l'ID utilisateur if len(user_id) < 3 or len(user_id) > 128: abort(401, description="ID utilisateur invalide") # Vérification des caractères autorisés if not re.match(r'^[a-zA-Z0-9_-]+$', user_id): abort(401, description="ID utilisateur contient des caractères non autorisés") return user_id def _validate_path(self, env: str, file_path: str) -> bool: """Valide le chemin d'accès avec protection contre les attaques de chemin relatif""" # 1. Validation de l'environnement if not env or not re.match(r'^[a-zA-Z0-9_-]+$', env): return False # 2. Validation du chemin de fichier if not file_path or '..' in file_path or file_path.startswith('/'): return False # 3. Construction du chemin complet full_path = STORAGE_ROOT / env / file_path # 4. Vérification de sécurité renforcée try: resolved_path = full_path.resolve() storage_resolved = STORAGE_ROOT.resolve() # Vérification que le chemin résolu est bien dans le storage if not str(resolved_path).startswith(str(storage_resolved)): logger.warning(f"Tentative d'accès en dehors du storage: {resolved_path}") return False # Vérification que l'environnement est bien dans le chemin if f"/{env}/" not in str(resolved_path): logger.warning(f"Tentative d'accès à un environnement non autorisé: {env}") return False # Vérification que c'est bien un fichier (pas un dossier) return resolved_path.exists() and resolved_path.is_file() except Exception as e: logger.warning(f"Erreur de validation de chemin: {e}") return False def _scan_available_files(self, env: str = None) -> list: """Scanne tous les fichiers disponibles dans un ou tous les environnements""" examples = [] if env: # Scanner un environnement spécifique env_path = STORAGE_ROOT / env if env_path.exists(): examples.extend(self._scan_environment_files(env_path, env)) else: # Scanner tous les environnements disponibles for env_dir in STORAGE_ROOT.iterdir(): if env_dir.is_dir() and not env_dir.name.startswith('.'): examples.extend(self._scan_environment_files(env_dir, env_dir.name)) return sorted(examples) def _scan_environment_files(self, env_path: Path, env_name: str) -> list: """Scanne les fichiers d'un environnement spécifique""" examples = [] try: # Parcourir récursivement tous les fichiers for file_path in env_path.rglob('*'): if file_path.is_file(): # Exclure les fichiers de clés et autres fichiers système relative_path = file_path.relative_to(env_path) if not str(relative_path).startswith('_keys') and not str(relative_path).startswith('.'): # Ajouter l'exemple au format attendu examples.append(f"/{env_name}/{relative_path}") except Exception as e: logger.error(f"Erreur lors du scan des fichiers pour {env_name}: {e}") return examples def _read_file(self, env: str, file_path: str) -> Optional[str]: """Lit le contenu d'un fichier""" try: full_path = STORAGE_ROOT / env / file_path # Lecture en mode binaire pour détecter le type with open(full_path, 'rb') as f: content = f.read() # Tentative de décodage UTF-8 try: return content.decode('utf-8') except UnicodeDecodeError: # Fichier binaire, encodage base64 return f"BINARY_DATA:{base64.b64encode(content).decode()}" except Exception as e: logger.error(f"Erreur lors de la lecture du fichier {env}/{file_path}: {e}") return None def _encrypt_with_user_key_and_next(self, content: str, user_id: str, environment: str) -> tuple[bytes, str]: """Chiffre le contenu avec la clé utilisateur et génère la prochaine clé""" try: # Création du gestionnaire de clés pour cet environnement key_manager = UserKeyManager(environment) # Création ou récupération de la clé utilisateur (avec rotation automatique) current_key = key_manager.get_or_create_user_key(user_id) # Pas de génération de nouvelle clé automatiquement # Utiliser la même clé pour chiffrer et déchiffrer next_key = current_key next_key_b64 = base64.b64encode(next_key).decode() # Chiffrement avec la clé actuelle encrypted_content = self._encrypt_content_with_next_key(content, current_key, next_key, user_id, environment) return encrypted_content, next_key_b64 except Exception as e: logger.error(f"Erreur de chiffrement pour l'utilisateur {user_id} dans l'environnement {environment}: {e}") # Fallback: retour du contenu en base64 sans chiffrement return base64.b64encode(content.encode('utf-8')), "" def _encrypt_with_user_key(self, content: str, user_id: str, environment: str) -> bytes: """Chiffre le contenu avec la clé utilisateur pour un environnement spécifique (legacy)""" encrypted_content, _ = self._encrypt_with_user_key_and_next(content, user_id, environment) return encrypted_content def _encrypt_content_with_next_key(self, content: str, current_key: bytes, next_key: bytes, user_id: str, environment: str) -> bytes: """Chiffre le contenu avec la clé actuelle et inclut la prochaine clé""" cipher = ChaCha20Poly1305(current_key) nonce = secrets.token_bytes(12) encrypted_content = cipher.encrypt(nonce, content.encode('utf-8'), None) # Métadonnées de chiffrement avec la prochaine clé metadata = { 'user_id': user_id, 'environment': environment, 'timestamp': datetime.now().isoformat(), 'key_version': 'current', 'algorithm': 'ChaCha20-Poly1305', 'next_key': base64.b64encode(next_key).decode(), 'rotation': 'per-request' } # Encodage: nonce + métadonnées + contenu chiffré metadata_json = json.dumps(metadata).encode('utf-8') full_payload = nonce + len(metadata_json).to_bytes(4, 'big') + metadata_json + encrypted_content return base64.b64encode(full_payload) def _encrypt_content(self, content: str, key: bytes, user_id: str, environment: str, is_previous: bool = False) -> bytes: """Chiffre le contenu avec une clé spécifique (legacy)""" cipher = ChaCha20Poly1305(key) nonce = secrets.token_bytes(12) encrypted_content = cipher.encrypt(nonce, content.encode('utf-8'), None) # Métadonnées de chiffrement metadata = { 'user_id': user_id, 'environment': environment, 'timestamp': datetime.now().isoformat(), 'key_version': 'previous' if is_previous else 'current', 'algorithm': 'ChaCha20-Poly1305' } # Encodage: nonce + métadonnées + contenu chiffré metadata_json = json.dumps(metadata).encode('utf-8') full_payload = nonce + len(metadata_json).to_bytes(4, 'big') + metadata_json + encrypted_content return base64.b64encode(full_payload) def create_ssl_context(self): """Crée le contexte SSL pour HTTPS""" context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) # Configuration SSL sécurisée context.minimum_version = ssl.TLSVersion.TLSv1_2 context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS') # Génération de certificats auto-signés pour la démonstration try: from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives.asymmetric import rsa import datetime # Génération d'une clé privée RSA private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, ) # Création d'un certificat auto-signé subject = issuer = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, "FR"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "France"), x509.NameAttribute(NameOID.LOCALITY_NAME, "Paris"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "4NK"), x509.NameAttribute(NameOID.COMMON_NAME, "vault.4nkweb.com"), ]) cert = x509.CertificateBuilder().subject_name( subject ).issuer_name( issuer ).public_key( private_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.now(datetime.timezone.utc) ).not_valid_after( datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365) ).add_extension( x509.SubjectAlternativeName([ x509.DNSName("vault.4nkweb.com"), x509.DNSName("localhost"), ]), critical=False, ).sign(private_key, hashes.SHA256()) # Sauvegarde temporaire des certificats with open('/tmp/vault.key', 'wb') as f: f.write(private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() )) with open('/tmp/vault.crt', 'wb') as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) context.load_cert_chain('/tmp/vault.crt', '/tmp/vault.key') logger.info("Certificats SSL générés avec succès") except Exception as e: logger.warning(f"Impossible de générer les certificats SSL: {e}") logger.warning("Utilisation du contexte SSL par défaut") return context def run(self, host='0.0.0.0', port=6666, debug=False): """Démarre le serveur HTTPS""" logger.info("Démarrage de l'API Vault sécurisée sur https://vault.4nkweb.com:6666") logger.info("Authentification par clés utilisateur activée") logger.info("Rotation automatique des clés activée") logger.info("HTTPS obligatoire") ssl_context = self.create_ssl_context() self.app.run( host=host, port=port, debug=debug, ssl_context=ssl_context, threaded=True ) if __name__ == '__main__': api = SecureVaultAPI() api.run()