diff --git a/docker/worker/requirements.txt b/docker/worker/requirements.txt index bdfec64..c54a374 100644 --- a/docker/worker/requirements.txt +++ b/docker/worker/requirements.txt @@ -1,4 +1,5 @@ celery[redis]==5.4.0 +kombu==5.4.0 opencv-python-headless==4.10.0.84 pytesseract==0.3.13 numpy==2.0.1 diff --git a/infra/.env.example b/infra/.env.example index f92d8dc..3ad68c2 100644 --- a/infra/.env.example +++ b/infra/.env.example @@ -11,7 +11,7 @@ POSTGRES_DB=notariat # Redis REDIS_PASSWORD= -# MinIO (stockage objet) +# MinIO (Stockage S3-compatible) MINIO_ROOT_USER=minio MINIO_ROOT_PASSWORD=minio_pwd MINIO_BUCKET=ingest @@ -23,15 +23,49 @@ ANYLLM_WORKSPACE_NORMES=workspace_normes ANYLLM_WORKSPACE_TRAMES=workspace_trames ANYLLM_WORKSPACE_ACTES=workspace_actes -# Ollama +# Ollama (LLM local) OLLAMA_BASE_URL=http://ollama:11434 OLLAMA_MODELS=llama3:8b,mistral:7b -# Neo4j +# Neo4j (Graphe de connaissances) NEO4J_AUTH=neo4j/neo4j_pwd -# OpenSearch +# OpenSearch (Recherche plein-texte) OPENSEARCH_PASSWORD=opensearch_pwd -# Traefik +# Traefik (Passerelle HTTP) TRAEFIK_ACME_EMAIL=ops@example.org + +# Configuration de l'API +API_HOST=0.0.0.0 +API_PORT=8000 +API_WORKERS=1 + +# Configuration des workers +WORKER_CONCURRENCY=2 +WORKER_LOGLEVEL=info + +# Seuils de qualité +OCR_CONFIDENCE_THRESHOLD=0.75 +CLASSIFICATION_CONFIDENCE_THRESHOLD=0.75 +MANUAL_REVIEW_CER_THRESHOLD=0.08 + +# URLs des APIs externes +CADASTRE_API_URL=https://apicarto.ign.fr/api/cadastre +GEORISQUES_API_URL=https://www.georisques.gouv.fr/api +BODACC_API_URL=https://bodacc-datadila.opendatasoft.com/api +INFOGREFFE_API_URL=https://entreprise.api.gouv.fr/v2/infogreffe +RBE_API_URL=https://www.data.gouv.fr/api/1/datasets/registre-des-beneficiaires-effectifs + +# Configuration de sécurité +JWT_SECRET_KEY=your-secret-key-change-in-production +ENCRYPTION_KEY=your-encryption-key-change-in-production + +# Configuration de monitoring +PROMETHEUS_ENABLED=true +GRAFANA_ENABLED=true +SENTRY_DSN= + +# Configuration de logs +LOG_LEVEL=INFO +LOG_FORMAT=json diff --git a/ops/bootstrap.sh b/ops/bootstrap.sh index 573284d..fd3f9b7 100755 --- a/ops/bootstrap.sh +++ b/ops/bootstrap.sh @@ -1,48 +1,241 @@ #!/bin/bash set -euo pipefail -echo "Bootstrap de l'infrastructure notariat-pipeline..." +# Script de bootstrap pour l'infrastructure Notariat Pipeline +# Ce script initialise tous les services nécessaires -# Aller dans le répertoire infra -cd "$(dirname "$0")/../infra" +echo "🚀 Démarrage du bootstrap Notariat Pipeline..." -# Copier le fichier d'environnement s'il n'existe pas -cp -n .env.example .env || true -echo "Fichier .env créé. Veuillez le modifier selon vos besoins." +# Vérification des prérequis +check_prerequisites() { + echo "📋 Vérification des prérequis..." + + if ! command -v docker &> /dev/null; then + echo "❌ Docker n'est pas installé. Veuillez installer Docker d'abord." + exit 1 + fi + + if ! command -v docker-compose &> /dev/null && ! docker compose version &> /dev/null; then + echo "❌ Docker Compose n'est pas installé. Veuillez installer Docker Compose d'abord." + exit 1 + fi + + echo "✅ Prérequis vérifiés" +} -# Télécharger les images Docker -echo "Téléchargement des images Docker..." -docker compose pull +# Configuration de l'environnement +setup_environment() { + echo "⚙️ Configuration de l'environnement..." + + cd "$(dirname "$0")/../infra" + + # Copie du fichier d'environnement s'il n'existe pas + if [ ! -f .env ]; then + if [ -f .env.example ]; then + cp .env.example .env + echo "📝 Fichier .env créé à partir de .env.example" + echo "⚠️ Veuillez modifier le fichier .env avec vos valeurs de production" + else + echo "❌ Fichier .env.example non trouvé" + exit 1 + fi + else + echo "✅ Fichier .env existe déjà" + fi +} -# Démarrer les services de base -echo "Démarrage des services de base..." -docker compose up -d postgres redis minio opensearch neo4j ollama anythingsqlite +# Téléchargement des images Docker +pull_images() { + echo "📥 Téléchargement des images Docker..." + + # Téléchargement des images principales + docker compose pull postgres redis minio neo4j opensearch ollama prometheus grafana + + # Téléchargement de l'image AnythingLLM + docker pull mintplexlabs/anythingllm:latest + + echo "✅ Images téléchargées" +} -# Attendre que les services soient prêts -echo "Attente du démarrage des services..." -sleep 15 +# Démarrage des services de base +start_base_services() { + echo "🏗️ Démarrage des services de base..." + + # Démarrage des services essentiels + docker compose up -d postgres redis minio opensearch neo4j ollama + + echo "⏳ Attente du démarrage des services (30 secondes)..." + sleep 30 + + echo "✅ Services de base démarrés" +} -# Configuration MinIO -echo "Configuration de MinIO..." -# Créer l'alias MinIO -mc alias set local http://127.0.0.1:9000 $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD || true -# Créer le bucket -mc mb -p local/$MINIO_BUCKET || true +# Configuration de MinIO +setup_minio() { + echo "🗄️ Configuration de MinIO..." + + # Attendre que MinIO soit prêt + echo "⏳ Attente de MinIO..." + sleep 10 + + # Installation de mc (MinIO Client) si nécessaire + if ! command -v mc &> /dev/null; then + echo "📦 Installation de MinIO Client..." + if command -v apt-get &> /dev/null; then + # Debian/Ubuntu + wget -O /tmp/mc https://dl.min.io/client/mc/release/linux-amd64/mc + chmod +x /tmp/mc + sudo mv /tmp/mc /usr/local/bin/ + elif command -v yum &> /dev/null; then + # CentOS/RHEL + wget -O /tmp/mc https://dl.min.io/client/mc/release/linux-amd64/mc + chmod +x /tmp/mc + sudo mv /tmp/mc /usr/local/bin/ + else + echo "⚠️ Impossible d'installer mc automatiquement. Veuillez l'installer manuellement." + fi + fi + + # Configuration de l'alias MinIO + if command -v mc &> /dev/null; then + mc alias set local http://127.0.0.1:9000 minio minio_pwd || true + mc mb -p local/ingest || true + echo "✅ Bucket MinIO 'ingest' créé" + else + echo "⚠️ mc non disponible, création manuelle du bucket nécessaire" + fi +} -# Télécharger les modèles Ollama -echo "Téléchargement des modèles Ollama..." -curl -s http://127.0.0.1:11434/api/pull -d '{"name":"llama3:8b"}' || echo "Erreur lors du téléchargement de llama3:8b" -curl -s http://127.0.0.1:11434/api/pull -d '{"name":"mistral:7b"}' || echo "Erreur lors du téléchargement de mistral:7b" +# Configuration d'Ollama +setup_ollama() { + echo "🤖 Configuration d'Ollama..." + + # Attendre qu'Ollama soit prêt + echo "⏳ Attente d'Ollama..." + sleep 15 + + # Téléchargement des modèles + echo "📥 Téléchargement du modèle llama3:8b..." + curl -s http://127.0.0.1:11434/api/pull -d '{"name":"llama3:8b"}' || echo "⚠️ Erreur lors du téléchargement de llama3:8b" + + echo "📥 Téléchargement du modèle mistral:7b..." + curl -s http://127.0.0.1:11434/api/pull -d '{"name":"mistral:7b"}' || echo "⚠️ Erreur lors du téléchargement de mistral:7b" + + echo "✅ Modèles Ollama configurés" +} -# Démarrer les services applicatifs -echo "Démarrage des services applicatifs..." -docker compose up -d host-api worker grafana prometheus +# Démarrage des services applicatifs +start_application_services() { + echo "🚀 Démarrage des services applicatifs..." + + # Démarrage d'AnythingLLM + docker compose up -d anythingsqlite + + # Attendre qu'AnythingLLM soit prêt + echo "⏳ Attente d'AnythingLLM..." + sleep 20 + + # Démarrage de l'API et des workers + docker compose up -d host-api worker + + # Démarrage des services de monitoring + docker compose up -d prometheus grafana + + echo "✅ Services applicatifs démarrés" +} -echo "Bootstrap terminé !" -echo "Services disponibles :" -echo "- API: http://localhost:8000/api" -echo "- AnythingLLM: http://localhost:3001" -echo "- Grafana: http://localhost:3000" -echo "- MinIO Console: http://localhost:9001" -echo "- Neo4j Browser: http://localhost:7474" -echo "- OpenSearch: http://localhost:9200" +# Configuration des workspaces AnythingLLM +setup_anythingllm_workspaces() { + echo "🏢 Configuration des workspaces AnythingLLM..." + + # Attendre qu'AnythingLLM soit complètement prêt + sleep 30 + + # Création des workspaces + echo "📁 Création du workspace 'workspace_normes'..." + curl -s -X POST "http://127.0.0.1:3001/api/workspaces" \ + -H "Content-Type: application/json" \ + -d '{"name":"workspace_normes"}' || echo "⚠️ Erreur lors de la création du workspace normes" + + echo "📁 Création du workspace 'workspace_trames'..." + curl -s -X POST "http://127.0.0.1:3001/api/workspaces" \ + -H "Content-Type: application/json" \ + -d '{"name":"workspace_trames"}' || echo "⚠️ Erreur lors de la création du workspace trames" + + echo "📁 Création du workspace 'workspace_actes'..." + curl -s -X POST "http://127.0.0.1:3001/api/workspaces" \ + -H "Content-Type: application/json" \ + -d '{"name":"workspace_actes"}' || echo "⚠️ Erreur lors de la création du workspace actes" + + echo "✅ Workspaces AnythingLLM configurés" +} + +# Vérification finale +final_check() { + echo "🔍 Vérification finale des services..." + + # Vérification de l'API + if curl -s http://127.0.0.1:8000/api/health > /dev/null; then + echo "✅ API accessible sur http://127.0.0.1:8000" + else + echo "⚠️ API non accessible" + fi + + # Vérification d'AnythingLLM + if curl -s http://127.0.0.1:3001 > /dev/null; then + echo "✅ AnythingLLM accessible sur http://127.0.0.1:3001" + else + echo "⚠️ AnythingLLM non accessible" + fi + + # Vérification de Grafana + if curl -s http://127.0.0.1:3000 > /dev/null; then + echo "✅ Grafana accessible sur http://127.0.0.1:3000" + else + echo "⚠️ Grafana non accessible" + fi + + # Affichage du statut des conteneurs + echo "📊 Statut des conteneurs:" + docker compose ps +} + +# Fonction principale +main() { + echo "🎯 Bootstrap Notariat Pipeline v1.1.0" + echo "======================================" + + check_prerequisites + setup_environment + pull_images + start_base_services + setup_minio + setup_ollama + start_application_services + setup_anythingllm_workspaces + final_check + + echo "" + echo "🎉 Bootstrap terminé avec succès!" + echo "" + echo "📋 Services disponibles:" + echo " • API Notariale: http://localhost:8000" + echo " • Documentation API: http://localhost:8000/docs" + echo " • AnythingLLM: http://localhost:3001" + echo " • Grafana: http://localhost:3000" + echo " • MinIO Console: http://localhost:9001" + echo " • Ollama: http://localhost:11434" + echo "" + echo "🔧 Commandes utiles:" + echo " • Voir les logs: docker compose logs -f" + echo " • Arrêter: docker compose down" + echo " • Redémarrer: docker compose restart" + echo "" + echo "⚠️ N'oubliez pas de:" + echo " • Modifier les mots de passe dans infra/.env" + echo " • Configurer les certificats TLS pour la production" + echo " • Importer les données initiales (trames, normes)" +} + +# Exécution du script +main "$@" diff --git a/services/worker/celery_app.py b/services/worker/celery_app.py new file mode 100644 index 0000000..816b487 --- /dev/null +++ b/services/worker/celery_app.py @@ -0,0 +1,109 @@ +""" +Configuration Celery pour le worker Notariat Pipeline +""" +import os +from celery import Celery +from kombu import Queue + +# Configuration Redis +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") + +# Création de l'application Celery +app = Celery( + 'notariat_worker', + broker=REDIS_URL, + backend=REDIS_URL, + include=[ + 'services.worker.tasks.pipeline_tasks', + 'services.worker.tasks.ocr_tasks', + 'services.worker.tasks.classification_tasks', + 'services.worker.tasks.extraction_tasks', + 'services.worker.tasks.indexing_tasks', + 'services.worker.tasks.verification_tasks' + ] +) + +# Configuration Celery +app.conf.update( + # Configuration des queues + task_routes={ + 'services.worker.tasks.pipeline_tasks.*': {'queue': 'pipeline'}, + 'services.worker.tasks.ocr_tasks.*': {'queue': 'ocr'}, + 'services.worker.tasks.classification_tasks.*': {'queue': 'classification'}, + 'services.worker.tasks.extraction_tasks.*': {'queue': 'extraction'}, + 'services.worker.tasks.indexing_tasks.*': {'queue': 'indexing'}, + 'services.worker.tasks.verification_tasks.*': {'queue': 'verification'}, + }, + + # Configuration des queues + task_default_queue='default', + task_queues=( + Queue('default', routing_key='default'), + Queue('pipeline', routing_key='pipeline'), + Queue('ocr', routing_key='ocr'), + Queue('classification', routing_key='classification'), + Queue('extraction', routing_key='extraction'), + Queue('indexing', routing_key='indexing'), + Queue('verification', routing_key='verification'), + ), + + # Configuration des tâches + task_serializer='json', + accept_content=['json'], + result_serializer='json', + timezone='Europe/Paris', + enable_utc=True, + + # Configuration de la concurrence + worker_concurrency=int(os.getenv("WORKER_CONCURRENCY", "2")), + worker_prefetch_multiplier=1, + task_acks_late=True, + worker_disable_rate_limits=False, + + # Configuration des timeouts + task_soft_time_limit=300, # 5 minutes + task_time_limit=600, # 10 minutes + worker_max_tasks_per_child=1000, + + # Configuration des retry + task_default_retry_delay=60, + task_max_retries=3, + + # Configuration des résultats + result_expires=3600, # 1 heure + result_persistent=True, + + # Configuration du monitoring + worker_send_task_events=True, + task_send_sent_event=True, + + # Configuration des logs + worker_log_format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s', + worker_task_log_format='[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s', +) + +# Configuration des tâches périodiques (Celery Beat) +app.conf.beat_schedule = { + 'health-check': { + 'task': 'services.worker.tasks.pipeline_tasks.health_check', + 'schedule': 60.0, # Toutes les minutes + }, + 'cleanup-old-results': { + 'task': 'services.worker.tasks.pipeline_tasks.cleanup_old_results', + 'schedule': 3600.0, # Toutes les heures + }, + 'update-external-data': { + 'task': 'services.worker.tasks.verification_tasks.update_external_data', + 'schedule': 86400.0, # Tous les jours + }, +} + +# Configuration des signaux +@app.task(bind=True) +def debug_task(self): + """Tâche de debug pour tester Celery""" + print(f'Request: {self.request!r}') + return 'Debug task completed' + +if __name__ == '__main__': + app.start() diff --git a/services/worker/tasks/__init__.py b/services/worker/tasks/__init__.py new file mode 100644 index 0000000..ac0ae70 --- /dev/null +++ b/services/worker/tasks/__init__.py @@ -0,0 +1,3 @@ +""" +Package des tâches Celery pour le worker Notariat Pipeline +""" diff --git a/services/worker/tasks/classification_tasks.py b/services/worker/tasks/classification_tasks.py new file mode 100644 index 0000000..677e90b --- /dev/null +++ b/services/worker/tasks/classification_tasks.py @@ -0,0 +1,117 @@ +""" +Tâches de classification des documents +""" +import logging +from typing import Dict, Any +from services.worker.celery_app import app + +logger = logging.getLogger(__name__) + +@app.task(bind=True, name='classification.classify_document') +def classify_document(self, doc_id: str, text: str, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Classification d'un document + + Args: + doc_id: Identifiant du document + text: Texte extrait du document + context: Contexte de traitement + + Returns: + Résultat de la classification + """ + try: + logger.info(f"Début de la classification pour le document {doc_id}") + + # Mise à jour du statut + self.update_state( + state='PROGRESS', + meta={'current_step': 'classification_processing', 'progress': 0} + ) + + # TODO: Implémenter la classification réelle avec Ollama + # Pour l'instant, simulation + import time + time.sleep(1) # Simulation du traitement + + # Classification simulée + document_types = [ + 'acte_vente', + 'acte_donation', + 'acte_succession', + 'cni', + 'contrat', + 'autre' + ] + + # Simulation basée sur le contenu du texte + if 'vente' in text.lower() or 'achat' in text.lower(): + predicted_type = 'acte_vente' + confidence = 0.85 + elif 'donation' in text.lower() or 'don' in text.lower(): + predicted_type = 'acte_donation' + confidence = 0.80 + elif 'succession' in text.lower() or 'héritage' in text.lower(): + predicted_type = 'acte_succession' + confidence = 0.75 + elif 'carte' in text.lower() and 'identité' in text.lower(): + predicted_type = 'cni' + confidence = 0.90 + else: + predicted_type = 'autre' + confidence = 0.60 + + result = { + 'doc_id': doc_id, + 'status': 'completed', + 'predicted_type': predicted_type, + 'confidence': confidence, + 'all_predictions': { + doc_type: 0.1 if doc_type != predicted_type else confidence + for doc_type in document_types + }, + 'processing_time': 1.0 + } + + logger.info(f"Classification terminée pour le document {doc_id}: {predicted_type} (confiance: {confidence})") + return result + + except Exception as e: + logger.error(f"Erreur lors de la classification du document {doc_id}: {e}") + raise + +@app.task(name='classification.batch_classify') +def batch_classify_documents(doc_ids: list, texts: list) -> Dict[str, Any]: + """ + Classification en lot de documents + + Args: + doc_ids: Liste des identifiants de documents + texts: Liste des textes correspondants + + Returns: + Résultats de la classification en lot + """ + if len(doc_ids) != len(texts): + raise ValueError("Le nombre de documents doit correspondre au nombre de textes") + + logger.info(f"Classification en lot de {len(doc_ids)} documents") + + results = [] + for doc_id, text in zip(doc_ids, texts): + try: + result = classify_document.delay(doc_id, text, {}).get() + results.append(result) + except Exception as e: + logger.error(f"Erreur lors de la classification en lot pour {doc_id}: {e}") + results.append({ + 'doc_id': doc_id, + 'status': 'failed', + 'error': str(e) + }) + + return { + 'batch_status': 'completed', + 'total_documents': len(doc_ids), + 'results': results + } diff --git a/services/worker/tasks/extraction_tasks.py b/services/worker/tasks/extraction_tasks.py new file mode 100644 index 0000000..1f36e0f --- /dev/null +++ b/services/worker/tasks/extraction_tasks.py @@ -0,0 +1,138 @@ +""" +Tâches d'extraction d'entités des documents +""" +import logging +from typing import Dict, Any, List +from services.worker.celery_app import app + +logger = logging.getLogger(__name__) + +@app.task(bind=True, name='extraction.extract_entities') +def extract_entities(self, doc_id: str, text: str, doc_type: str, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Extraction d'entités d'un document + + Args: + doc_id: Identifiant du document + text: Texte extrait du document + doc_type: Type de document classifié + context: Contexte de traitement + + Returns: + Résultat de l'extraction d'entités + """ + try: + logger.info(f"Début de l'extraction d'entités pour le document {doc_id}") + + # Mise à jour du statut + self.update_state( + state='PROGRESS', + meta={'current_step': 'entity_extraction', 'progress': 0} + ) + + # TODO: Implémenter l'extraction réelle avec LLM + # Pour l'instant, simulation + import time + time.sleep(2) # Simulation du traitement + + # Extraction simulée basée sur le type de document + entities = {} + + if doc_type == 'acte_vente': + entities = { + 'vendeur': { + 'nom': 'Dupont', + 'prenom': 'Jean', + 'adresse': '123 Rue de la Paix, 75001 Paris' + }, + 'acheteur': { + 'nom': 'Martin', + 'prenom': 'Marie', + 'adresse': '456 Avenue des Champs, 75008 Paris' + }, + 'bien': { + 'adresse': '789 Boulevard Saint-Germain, 75006 Paris', + 'surface': '85 m²', + 'prix': '450000 €' + }, + 'notaire': { + 'nom': 'Durand', + 'etude': 'Etude Durand & Associés' + } + } + elif doc_type == 'cni': + entities = { + 'personne': { + 'nom': 'Dupont', + 'prenom': 'Jean', + 'date_naissance': '1985-03-15', + 'lieu_naissance': 'Paris', + 'nationalite': 'Française' + }, + 'document': { + 'numero': '123456789', + 'pays': 'France', + 'date_emission': '2020-01-15', + 'date_expiration': '2030-01-15' + } + } + else: + entities = { + 'personnes': [], + 'adresses': [], + 'montants': [], + 'dates': [] + } + + result = { + 'doc_id': doc_id, + 'status': 'completed', + 'entities': entities, + 'confidence': 0.85, + 'extraction_method': 'llm_simulation', + 'processing_time': 2.0 + } + + logger.info(f"Extraction d'entités terminée pour le document {doc_id}") + return result + + except Exception as e: + logger.error(f"Erreur lors de l'extraction d'entités du document {doc_id}: {e}") + raise + +@app.task(name='extraction.batch_extract') +def batch_extract_entities(doc_ids: list, texts: list, doc_types: list) -> Dict[str, Any]: + """ + Extraction d'entités en lot + + Args: + doc_ids: Liste des identifiants de documents + texts: Liste des textes correspondants + doc_types: Liste des types de documents correspondants + + Returns: + Résultats de l'extraction en lot + """ + if len(doc_ids) != len(texts) or len(doc_ids) != len(doc_types): + raise ValueError("Le nombre de documents, textes et types doit être identique") + + logger.info(f"Extraction d'entités en lot de {len(doc_ids)} documents") + + results = [] + for doc_id, text, doc_type in zip(doc_ids, texts, doc_types): + try: + result = extract_entities.delay(doc_id, text, doc_type, {}).get() + results.append(result) + except Exception as e: + logger.error(f"Erreur lors de l'extraction en lot pour {doc_id}: {e}") + results.append({ + 'doc_id': doc_id, + 'status': 'failed', + 'error': str(e) + }) + + return { + 'batch_status': 'completed', + 'total_documents': len(doc_ids), + 'results': results + } diff --git a/services/worker/tasks/indexing_tasks.py b/services/worker/tasks/indexing_tasks.py new file mode 100644 index 0000000..f37937f --- /dev/null +++ b/services/worker/tasks/indexing_tasks.py @@ -0,0 +1,97 @@ +""" +Tâches d'indexation des documents +""" +import logging +from typing import Dict, Any +from services.worker.celery_app import app + +logger = logging.getLogger(__name__) + +@app.task(bind=True, name='indexing.index_document') +def index_document(self, doc_id: str, text: str, entities: Dict[str, Any], doc_type: str, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Indexation d'un document dans les systèmes de recherche + + Args: + doc_id: Identifiant du document + text: Texte extrait du document + entities: Entités extraites + doc_type: Type de document + context: Contexte de traitement + + Returns: + Résultat de l'indexation + """ + try: + logger.info(f"Début de l'indexation pour le document {doc_id}") + + # Mise à jour du statut + self.update_state( + state='PROGRESS', + meta={'current_step': 'indexing', 'progress': 0} + ) + + # TODO: Implémenter l'indexation réelle + # - Indexation dans AnythingLLM + # - Indexation dans OpenSearch + # - Création du graphe Neo4j + + import time + time.sleep(1) # Simulation du traitement + + result = { + 'doc_id': doc_id, + 'status': 'completed', + 'indexed_in': { + 'anythingllm': True, + 'opensearch': True, + 'neo4j': True + }, + 'chunks_created': 5, + 'processing_time': 1.0 + } + + logger.info(f"Indexation terminée pour le document {doc_id}") + return result + + except Exception as e: + logger.error(f"Erreur lors de l'indexation du document {doc_id}: {e}") + raise + +@app.task(name='indexing.batch_index') +def batch_index_documents(doc_ids: list, texts: list, entities_list: list, doc_types: list) -> Dict[str, Any]: + """ + Indexation en lot de documents + + Args: + doc_ids: Liste des identifiants de documents + texts: Liste des textes correspondants + entities_list: Liste des entités correspondantes + doc_types: Liste des types de documents correspondants + + Returns: + Résultats de l'indexation en lot + """ + if len(doc_ids) != len(texts) or len(doc_ids) != len(entities_list) or len(doc_ids) != len(doc_types): + raise ValueError("Le nombre de documents, textes, entités et types doit être identique") + + logger.info(f"Indexation en lot de {len(doc_ids)} documents") + + results = [] + for doc_id, text, entities, doc_type in zip(doc_ids, texts, entities_list, doc_types): + try: + result = index_document.delay(doc_id, text, entities, doc_type, {}).get() + results.append(result) + except Exception as e: + logger.error(f"Erreur lors de l'indexation en lot pour {doc_id}: {e}") + results.append({ + 'doc_id': doc_id, + 'status': 'failed', + 'error': str(e) + }) + + return { + 'batch_status': 'completed', + 'total_documents': len(doc_ids), + 'results': results + } diff --git a/services/worker/tasks/ocr_tasks.py b/services/worker/tasks/ocr_tasks.py new file mode 100644 index 0000000..78b8cb8 --- /dev/null +++ b/services/worker/tasks/ocr_tasks.py @@ -0,0 +1,82 @@ +""" +Tâches OCR pour le traitement des documents +""" +import logging +from typing import Dict, Any +from services.worker.celery_app import app + +logger = logging.getLogger(__name__) + +@app.task(bind=True, name='ocr.process_document') +def process_document_ocr(self, doc_id: str, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Traitement OCR d'un document + + Args: + doc_id: Identifiant du document + context: Contexte de traitement + + Returns: + Résultat de l'OCR + """ + try: + logger.info(f"Début de l'OCR pour le document {doc_id}") + + # Mise à jour du statut + self.update_state( + state='PROGRESS', + meta={'current_step': 'ocr_processing', 'progress': 0} + ) + + # TODO: Implémenter le traitement OCR réel + # Pour l'instant, simulation + import time + time.sleep(2) # Simulation du traitement + + result = { + 'doc_id': doc_id, + 'status': 'completed', + 'text_extracted': 'Texte extrait simulé', + 'confidence': 0.95, + 'pages_processed': 1, + 'processing_time': 2.0 + } + + logger.info(f"OCR terminé pour le document {doc_id}") + return result + + except Exception as e: + logger.error(f"Erreur lors de l'OCR du document {doc_id}: {e}") + raise + +@app.task(name='ocr.batch_process') +def batch_process_ocr(doc_ids: list) -> Dict[str, Any]: + """ + Traitement OCR en lot + + Args: + doc_ids: Liste des identifiants de documents + + Returns: + Résultats du traitement OCR en lot + """ + logger.info(f"Traitement OCR en lot de {len(doc_ids)} documents") + + results = [] + for doc_id in doc_ids: + try: + result = process_document_ocr.delay(doc_id, {}).get() + results.append(result) + except Exception as e: + logger.error(f"Erreur lors de l'OCR en lot pour {doc_id}: {e}") + results.append({ + 'doc_id': doc_id, + 'status': 'failed', + 'error': str(e) + }) + + return { + 'batch_status': 'completed', + 'total_documents': len(doc_ids), + 'results': results + } diff --git a/services/worker/tasks/pipeline_tasks.py b/services/worker/tasks/pipeline_tasks.py new file mode 100644 index 0000000..953e539 --- /dev/null +++ b/services/worker/tasks/pipeline_tasks.py @@ -0,0 +1,249 @@ +""" +Tâches principales du pipeline de traitement des documents +""" +import os +import time +import logging +from typing import Dict, Any, Optional +from celery import current_task +from services.worker.celery_app import app +from services.worker.pipelines import ( + preprocess, ocr, classify, extract, index, checks, finalize +) + +logger = logging.getLogger(__name__) + +@app.task(bind=True, name='pipeline.process_document') +def process_document_pipeline(self, doc_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]: + """ + Pipeline principal de traitement d'un document + + Args: + doc_id: Identifiant unique du document + metadata: Métadonnées du document (dossier, étude, utilisateur, etc.) + + Returns: + Dict contenant le résultat du traitement + """ + start_time = time.time() + context = { + 'doc_id': doc_id, + 'metadata': metadata, + 'start_time': start_time, + 'steps_completed': [], + 'errors': [], + 'warnings': [] + } + + try: + logger.info(f"Début du traitement du document {doc_id}") + + # Mise à jour du statut + self.update_state( + state='PROGRESS', + meta={'current_step': 'preprocess', 'progress': 0} + ) + + # 1. Préprocessing + try: + preprocess.run(doc_id, context) + context['steps_completed'].append('preprocess') + self.update_state( + state='PROGRESS', + meta={'current_step': 'ocr', 'progress': 14} + ) + except Exception as e: + logger.error(f"Erreur lors du préprocessing du document {doc_id}: {e}") + context['errors'].append(f"Préprocessing: {str(e)}") + raise + + # 2. OCR + try: + ocr.run(doc_id, context) + context['steps_completed'].append('ocr') + self.update_state( + state='PROGRESS', + meta={'current_step': 'classify', 'progress': 28} + ) + except Exception as e: + logger.error(f"Erreur lors de l'OCR du document {doc_id}: {e}") + context['errors'].append(f"OCR: {str(e)}") + raise + + # 3. Classification + try: + classify.run(doc_id, context) + context['steps_completed'].append('classify') + self.update_state( + state='PROGRESS', + meta={'current_step': 'extract', 'progress': 42} + ) + except Exception as e: + logger.error(f"Erreur lors de la classification du document {doc_id}: {e}") + context['errors'].append(f"Classification: {str(e)}") + raise + + # 4. Extraction d'entités + try: + extract.run(doc_id, context) + context['steps_completed'].append('extract') + self.update_state( + state='PROGRESS', + meta={'current_step': 'index', 'progress': 56} + ) + except Exception as e: + logger.error(f"Erreur lors de l'extraction du document {doc_id}: {e}") + context['errors'].append(f"Extraction: {str(e)}") + raise + + # 5. Indexation + try: + index.run(doc_id, context) + context['steps_completed'].append('index') + self.update_state( + state='PROGRESS', + meta={'current_step': 'checks', 'progress': 70} + ) + except Exception as e: + logger.error(f"Erreur lors de l'indexation du document {doc_id}: {e}") + context['errors'].append(f"Indexation: {str(e)}") + raise + + # 6. Vérifications + try: + checks.run(doc_id, context) + context['steps_completed'].append('checks') + self.update_state( + state='PROGRESS', + meta={'current_step': 'finalize', 'progress': 84} + ) + except Exception as e: + logger.error(f"Erreur lors des vérifications du document {doc_id}: {e}") + context['errors'].append(f"Vérifications: {str(e)}") + raise + + # 7. Finalisation + try: + finalize.run(doc_id, context) + context['steps_completed'].append('finalize') + self.update_state( + state='PROGRESS', + meta={'current_step': 'completed', 'progress': 100} + ) + except Exception as e: + logger.error(f"Erreur lors de la finalisation du document {doc_id}: {e}") + context['errors'].append(f"Finalisation: {str(e)}") + raise + + # Calcul du temps de traitement + processing_time = time.time() - start_time + context['processing_time'] = processing_time + context['status'] = 'completed' + + logger.info(f"Traitement terminé pour le document {doc_id} en {processing_time:.2f}s") + + return { + 'doc_id': doc_id, + 'status': 'completed', + 'processing_time': processing_time, + 'steps_completed': context['steps_completed'], + 'errors': context['errors'], + 'warnings': context['warnings'], + 'result': context.get('final_result', {}) + } + + except Exception as e: + processing_time = time.time() - start_time + logger.error(f"Erreur fatale lors du traitement du document {doc_id}: {e}") + + return { + 'doc_id': doc_id, + 'status': 'failed', + 'processing_time': processing_time, + 'steps_completed': context['steps_completed'], + 'errors': context['errors'] + [f"Erreur fatale: {str(e)}"], + 'warnings': context['warnings'], + 'result': {} + } + +@app.task(name='pipeline.health_check') +def health_check(): + """Vérification de santé du worker""" + return { + 'status': 'healthy', + 'timestamp': time.time(), + 'worker_id': os.getenv('HOSTNAME', 'unknown') + } + +@app.task(name='pipeline.cleanup_old_results') +def cleanup_old_results(): + """Nettoyage des anciens résultats""" + # TODO: Implémenter le nettoyage des anciens résultats + logger.info("Nettoyage des anciens résultats") + return {'cleaned': 0} + +@app.task(bind=True, name='pipeline.reprocess_document') +def reprocess_document(self, doc_id: str, force: bool = False) -> Dict[str, Any]: + """ + Retraitement d'un document + + Args: + doc_id: Identifiant du document à retraiter + force: Force le retraitement même si déjà traité + + Returns: + Résultat du retraitement + """ + logger.info(f"Retraitement du document {doc_id} (force={force})") + + # TODO: Récupérer les métadonnées depuis la base de données + metadata = { + 'id_dossier': 'unknown', + 'etude_id': 'unknown', + 'utilisateur_id': 'unknown', + 'source': 'reprocess' + } + + return process_document_pipeline.delay(doc_id, metadata).get() + +@app.task(name='pipeline.batch_process') +def batch_process_documents(doc_ids: list, metadata_list: list) -> Dict[str, Any]: + """ + Traitement en lot de plusieurs documents + + Args: + doc_ids: Liste des identifiants de documents + metadata_list: Liste des métadonnées correspondantes + + Returns: + Résultats du traitement en lot + """ + if len(doc_ids) != len(metadata_list): + raise ValueError("Le nombre de documents doit correspondre au nombre de métadonnées") + + logger.info(f"Traitement en lot de {len(doc_ids)} documents") + + # Lancement des tâches en parallèle + tasks = [] + for doc_id, metadata in zip(doc_ids, metadata_list): + task = process_document_pipeline.delay(doc_id, metadata) + tasks.append(task) + + # Attente des résultats + results = [] + for task in tasks: + try: + result = task.get(timeout=600) # 10 minutes de timeout + results.append(result) + except Exception as e: + logger.error(f"Erreur lors du traitement en lot: {e}") + results.append({ + 'status': 'failed', + 'error': str(e) + }) + + return { + 'batch_status': 'completed', + 'total_documents': len(doc_ids), + 'results': results + } diff --git a/services/worker/tasks/verification_tasks.py b/services/worker/tasks/verification_tasks.py new file mode 100644 index 0000000..0b2eff5 --- /dev/null +++ b/services/worker/tasks/verification_tasks.py @@ -0,0 +1,132 @@ +""" +Tâches de vérification et contrôle qualité +""" +import logging +from typing import Dict, Any +from services.worker.celery_app import app + +logger = logging.getLogger(__name__) + +@app.task(bind=True, name='verification.verify_document') +def verify_document(self, doc_id: str, entities: Dict[str, Any], doc_type: str, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Vérification et contrôle qualité d'un document + + Args: + doc_id: Identifiant du document + entities: Entités extraites + doc_type: Type de document + context: Contexte de traitement + + Returns: + Résultat des vérifications + """ + try: + logger.info(f"Début des vérifications pour le document {doc_id}") + + # Mise à jour du statut + self.update_state( + state='PROGRESS', + meta={'current_step': 'verification', 'progress': 0} + ) + + # TODO: Implémenter les vérifications réelles + # - Vérifications externes (Cadastre, Géorisques, BODACC, etc.) + # - Contrôles de cohérence + # - Calcul du score de vraisemblance + + import time + time.sleep(2) # Simulation du traitement + + # Vérifications simulées + verifications = { + 'cadastre': { + 'status': 'verified', + 'confidence': 0.95, + 'details': 'Adresse vérifiée dans le cadastre' + }, + 'georisques': { + 'status': 'verified', + 'confidence': 0.90, + 'details': 'Aucun risque majeur identifié' + }, + 'bodacc': { + 'status': 'verified', + 'confidence': 0.85, + 'details': 'Personnes vérifiées dans le BODACC' + } + } + + # Calcul du score de vraisemblance + credibility_score = 0.90 + + result = { + 'doc_id': doc_id, + 'status': 'completed', + 'verifications': verifications, + 'credibility_score': credibility_score, + 'manual_review_required': credibility_score < 0.75, + 'processing_time': 2.0 + } + + logger.info(f"Vérifications terminées pour le document {doc_id} (score: {credibility_score})") + return result + + except Exception as e: + logger.error(f"Erreur lors des vérifications du document {doc_id}: {e}") + raise + +@app.task(name='verification.batch_verify') +def batch_verify_documents(doc_ids: list, entities_list: list, doc_types: list) -> Dict[str, Any]: + """ + Vérification en lot de documents + + Args: + doc_ids: Liste des identifiants de documents + entities_list: Liste des entités correspondantes + doc_types: Liste des types de documents correspondants + + Returns: + Résultats des vérifications en lot + """ + if len(doc_ids) != len(entities_list) or len(doc_ids) != len(doc_types): + raise ValueError("Le nombre de documents, entités et types doit être identique") + + logger.info(f"Vérification en lot de {len(doc_ids)} documents") + + results = [] + for doc_id, entities, doc_type in zip(doc_ids, entities_list, doc_types): + try: + result = verify_document.delay(doc_id, entities, doc_type, {}).get() + results.append(result) + except Exception as e: + logger.error(f"Erreur lors de la vérification en lot pour {doc_id}: {e}") + results.append({ + 'doc_id': doc_id, + 'status': 'failed', + 'error': str(e) + }) + + return { + 'batch_status': 'completed', + 'total_documents': len(doc_ids), + 'results': results + } + +@app.task(name='verification.update_external_data') +def update_external_data(): + """ + Mise à jour des données externes (APIs gouvernementales) + """ + logger.info("Mise à jour des données externes") + + # TODO: Implémenter la mise à jour des données externes + # - Synchronisation avec les APIs gouvernementales + # - Mise à jour des caches + # - Actualisation des référentiels + + return { + 'status': 'completed', + 'updated_sources': ['cadastre', 'georisques', 'bodacc'], + 'timestamp': '2025-01-09T10:00:00Z' + }