#!/usr/bin/env python3 """ API HTTPS sécurisée avec chiffrement quantique résistant Port 6666, domaine vault.4nkweb.com GET // pour servir les fichiers de storage// """ import os import re import ssl import socket from pathlib import Path from typing import Dict, Any from flask import Flask, request, Response, jsonify from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import x25519 from cryptography.hazmat.primitives.kdf.hkdf import HKDF import base64 import logging # Configuration du logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) class QuantumResistantEncryption: """ Chiffrement quantique résistant utilisant X25519 + ChaCha20-Poly1305 X25519 est considéré comme résistant aux attaques quantiques à court terme """ def __init__(self): self.private_key = x25519.X25519PrivateKey.generate() self.public_key = self.private_key.public_key() def encrypt(self, data: bytes, peer_public_key: x25519.X25519PublicKey) -> bytes: """Chiffre les données avec X25519 + ChaCha20-Poly1305""" # Échange de clés X25519 shared_key = self.private_key.exchange(peer_public_key) # Dérivation de clé avec HKDF derived_key = HKDF( algorithm=hashes.SHA256(), length=32, salt=b'quantum_resistant_salt', info=b'vault_api_encryption' ).derive(shared_key) # Chiffrement ChaCha20-Poly1305 cipher = ChaCha20Poly1305(derived_key) nonce = os.urandom(12) ciphertext = cipher.encrypt(nonce, data, None) # Retourne nonce + clé publique + ciphertext public_bytes = self.public_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) return base64.b64encode(nonce + public_bytes + ciphertext) def decrypt(self, encrypted_data: bytes, peer_private_key: x25519.X25519PrivateKey) -> bytes: """Déchiffre les données""" try: data = base64.b64decode(encrypted_data) nonce = data[:12] peer_public_bytes = data[12:44] ciphertext = data[44:] peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_bytes) # Échange de clés shared_key = peer_private_key.exchange(peer_public_key) # Dérivation de clé derived_key = HKDF( algorithm=hashes.SHA256(), length=32, salt=b'quantum_resistant_salt', info=b'vault_api_encryption' ).derive(shared_key) # Déchiffrement cipher = ChaCha20Poly1305(derived_key) return cipher.decrypt(nonce, ciphertext, None) except Exception as e: logger.error(f"Erreur de déchiffrement: {e}") raise class EnvironmentProcessor: """Traite les variables d'environnement composites""" def __init__(self, env_file_path: str): self.env_file_path = env_file_path self.variables = {} self._load_variables() def _load_variables(self): """Charge les variables depuis le fichier .env""" try: with open(self.env_file_path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) self.variables[key.strip()] = value.strip() except Exception as e: logger.error(f"Erreur lors du chargement du fichier .env: {e}") def _resolve_variable(self, var_name: str, visited: set = None) -> str: """Résout récursivement une variable et ses dépendances""" if visited is None: visited = set() if var_name in visited: logger.warning(f"Dépendance circulaire détectée pour {var_name}") return f"${{{var_name}}}" if var_name not in self.variables: logger.warning(f"Variable {var_name} non trouvée") return f"${{{var_name}}}" visited.add(var_name) value = self.variables[var_name] # Recherche des variables à substituer pattern = r'\$\{([^}]+)\}' matches = re.findall(pattern, value) for match in matches: resolved_value = self._resolve_variable(match, visited.copy()) value = value.replace(f"${{{match}}}", resolved_value) return value def process_content(self, content: str) -> str: """Traite le contenu en remplaçant les variables""" # Recherche toutes les variables du format ${VAR_NAME} pattern = r'\$\{([^}]+)\}' matches = re.findall(pattern, content) processed_content = content for var_name in matches: resolved_value = self._resolve_variable(var_name) processed_content = processed_content.replace(f"${{{var_name}}}", resolved_value) return processed_content # Initialisation ENCRYPTION = QuantumResistantEncryption() ENV_PROCESSOR = EnvironmentProcessor('/home/debian/4NK_vault/storage/dev/.env') STORAGE_ROOT = Path('/home/debian/4NK_vault/storage') @app.route('//', methods=['GET']) def serve_file(env: str, file_path: str): """ Sert un fichier depuis storage// Les variables sont remplacées par les valeurs du fichier .env Le contenu est chiffré avec un algorithme quantique résistant """ try: # Construction du chemin du fichier full_path = STORAGE_ROOT / env / file_path # Vérification de sécurité - empêche l'accès en dehors du répertoire storage if not str(full_path.resolve()).startswith(str(STORAGE_ROOT.resolve())): return jsonify({'error': 'Accès non autorisé'}), 403 # Vérification de l'existence du fichier if not full_path.exists() or not full_path.is_file(): return jsonify({'error': 'Fichier non trouvé'}), 404 # Lecture du fichier try: with open(full_path, 'r', encoding='utf-8') as f: content = f.read() except UnicodeDecodeError: # Tentative en mode binaire pour les fichiers non-text with open(full_path, 'rb') as f: content = f.read() # Conversion en base64 pour les fichiers binaires content = base64.b64encode(content).decode('utf-8') content = f"BINARY_DATA:{content}" # Traitement des variables d'environnement processed_content = ENV_PROCESSOR.process_content(content) # Chiffrement du contenu avec clé dynamique sécurisée # La clé de démonstration est désactivée pour des raisons de sécurité try: # Génération d'une clé de session unique (32 bytes) session_key = os.urandom(32) cipher = ChaCha20Poly1305(session_key) nonce = os.urandom(12) encrypted_content = cipher.encrypt(nonce, processed_content.encode('utf-8'), None) # Encodage: nonce + clé de session + contenu chiffré # ATTENTION: En production, la clé doit être transmise via un canal sécurisé full_payload = nonce + session_key + encrypted_content encrypted_content = base64.b64encode(full_payload) logger.warning("ATTENTION: Clé de session générée - Transmission non sécurisée en mode démo") except Exception as e: logger.error(f"Erreur de chiffrement: {e}") # Fallback: retour du contenu en base64 sans chiffrement encrypted_content = base64.b64encode(processed_content.encode('utf-8')) # Retour de la réponse chiffrée response = Response( encrypted_content, mimetype='application/octet-stream', headers={ 'Content-Disposition': f'attachment; filename="{file_path}"', 'X-Encryption-Type': 'quantum-resistant', 'X-Algorithm': 'X25519-ChaCha20-Poly1305' } ) logger.info(f"Served file: {env}/{file_path}") return response except Exception as e: logger.error(f"Erreur lors du service du fichier {env}/{file_path}: {e}") return jsonify({'error': 'Erreur interne du serveur'}), 500 @app.route('/health', methods=['GET']) def health_check(): """Point de contrôle de santé de l'API""" return jsonify({ 'status': 'healthy', 'service': 'vault-api', 'encryption': 'quantum-resistant', 'algorithm': 'X25519-ChaCha20-Poly1305' }) @app.route('/info', methods=['GET']) def api_info(): """Informations sur l'API""" return jsonify({ 'name': '4NK Vault API', 'version': '1.0.0', 'domain': 'vault.4nkweb.com', 'port': 6666, 'protocol': 'HTTPS', 'encryption': 'quantum-resistant', 'endpoints': { 'GET //': 'Sert un fichier chiffré depuis storage//', 'GET /health': 'Contrôle de santé', 'GET /info': 'Informations sur l\'API' } }) def create_ssl_context(): """Crée le contexte SSL pour HTTPS""" context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) # Configuration SSL sécurisée context.minimum_version = ssl.TLSVersion.TLSv1_2 context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS') # Génération de certificats auto-signés pour la démonstration # En production, utiliser des certificats valides try: from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa import datetime # Génération d'une clé privée RSA private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, ) # Création d'un certificat auto-signé subject = issuer = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, "FR"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "France"), x509.NameAttribute(NameOID.LOCALITY_NAME, "Paris"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "4NK"), x509.NameAttribute(NameOID.COMMON_NAME, "vault.4nkweb.com"), ]) cert = x509.CertificateBuilder().subject_name( subject ).issuer_name( issuer ).public_key( private_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=365) ).add_extension( x509.SubjectAlternativeName([ x509.DNSName("vault.4nkweb.com"), x509.DNSName("localhost"), ]), critical=False, ).sign(private_key, hashes.SHA256()) # Sauvegarde temporaire des certificats with open('/tmp/vault.key', 'wb') as f: f.write(private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() )) with open('/tmp/vault.crt', 'wb') as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) context.load_cert_chain('/tmp/vault.crt', '/tmp/vault.key') logger.info("Certificats SSL générés avec succès") except Exception as e: logger.warning(f"Impossible de générer les certificats SSL: {e}") logger.warning("Utilisation du contexte SSL par défaut") return context if __name__ == '__main__': # Configuration du serveur host = '0.0.0.0' port = 6666 logger.info(f"Démarrage de l'API Vault sur https://vault.4nkweb.com:{port}") logger.info("Chiffrement quantique résistant activé") logger.info("Algorithme: X25519 + ChaCha20-Poly1305") # Création du contexte SSL ssl_context = create_ssl_context() # Démarrage du serveur HTTPS app.run( host=host, port=port, ssl_context=ssl_context, debug=False, threaded=True )