4NK_IA_back/services/worker/tasks/pipeline_tasks.py
Nicolas Cantu 5ad559a263 feat: Intégration Celery et amélioration infrastructure
- Ajout de l'intégration Celery pour les tâches asynchrones
- Création des tâches spécialisées (OCR, classification, extraction, indexation, vérification)
- Configuration des queues Celery avec Redis
- Création du fichier d'environnement complet (.env.example et .env)
- Script bootstrap automatisé pour l'initialisation
- Amélioration du worker avec orchestration des pipelines
- Ajout des dépendances Celery et Kombu

Fonctionnalités ajoutées:
- Pipeline de traitement asynchrone avec Celery
- Tâches de traitement en lot
- Monitoring et health checks
- Configuration d'environnement centralisée
- Script d'initialisation automatisé
2025-09-10 18:26:53 +02:00

250 lines
8.2 KiB
Python

"""
Tâches principales du pipeline de traitement des documents
"""
import os
import time
import logging
from typing import Dict, Any, Optional
from celery import current_task
from services.worker.celery_app import app
from services.worker.pipelines import (
preprocess, ocr, classify, extract, index, checks, finalize
)
logger = logging.getLogger(__name__)
@app.task(bind=True, name='pipeline.process_document')
def process_document_pipeline(self, doc_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""
Pipeline principal de traitement d'un document
Args:
doc_id: Identifiant unique du document
metadata: Métadonnées du document (dossier, étude, utilisateur, etc.)
Returns:
Dict contenant le résultat du traitement
"""
start_time = time.time()
context = {
'doc_id': doc_id,
'metadata': metadata,
'start_time': start_time,
'steps_completed': [],
'errors': [],
'warnings': []
}
try:
logger.info(f"Début du traitement du document {doc_id}")
# Mise à jour du statut
self.update_state(
state='PROGRESS',
meta={'current_step': 'preprocess', 'progress': 0}
)
# 1. Préprocessing
try:
preprocess.run(doc_id, context)
context['steps_completed'].append('preprocess')
self.update_state(
state='PROGRESS',
meta={'current_step': 'ocr', 'progress': 14}
)
except Exception as e:
logger.error(f"Erreur lors du préprocessing du document {doc_id}: {e}")
context['errors'].append(f"Préprocessing: {str(e)}")
raise
# 2. OCR
try:
ocr.run(doc_id, context)
context['steps_completed'].append('ocr')
self.update_state(
state='PROGRESS',
meta={'current_step': 'classify', 'progress': 28}
)
except Exception as e:
logger.error(f"Erreur lors de l'OCR du document {doc_id}: {e}")
context['errors'].append(f"OCR: {str(e)}")
raise
# 3. Classification
try:
classify.run(doc_id, context)
context['steps_completed'].append('classify')
self.update_state(
state='PROGRESS',
meta={'current_step': 'extract', 'progress': 42}
)
except Exception as e:
logger.error(f"Erreur lors de la classification du document {doc_id}: {e}")
context['errors'].append(f"Classification: {str(e)}")
raise
# 4. Extraction d'entités
try:
extract.run(doc_id, context)
context['steps_completed'].append('extract')
self.update_state(
state='PROGRESS',
meta={'current_step': 'index', 'progress': 56}
)
except Exception as e:
logger.error(f"Erreur lors de l'extraction du document {doc_id}: {e}")
context['errors'].append(f"Extraction: {str(e)}")
raise
# 5. Indexation
try:
index.run(doc_id, context)
context['steps_completed'].append('index')
self.update_state(
state='PROGRESS',
meta={'current_step': 'checks', 'progress': 70}
)
except Exception as e:
logger.error(f"Erreur lors de l'indexation du document {doc_id}: {e}")
context['errors'].append(f"Indexation: {str(e)}")
raise
# 6. Vérifications
try:
checks.run(doc_id, context)
context['steps_completed'].append('checks')
self.update_state(
state='PROGRESS',
meta={'current_step': 'finalize', 'progress': 84}
)
except Exception as e:
logger.error(f"Erreur lors des vérifications du document {doc_id}: {e}")
context['errors'].append(f"Vérifications: {str(e)}")
raise
# 7. Finalisation
try:
finalize.run(doc_id, context)
context['steps_completed'].append('finalize')
self.update_state(
state='PROGRESS',
meta={'current_step': 'completed', 'progress': 100}
)
except Exception as e:
logger.error(f"Erreur lors de la finalisation du document {doc_id}: {e}")
context['errors'].append(f"Finalisation: {str(e)}")
raise
# Calcul du temps de traitement
processing_time = time.time() - start_time
context['processing_time'] = processing_time
context['status'] = 'completed'
logger.info(f"Traitement terminé pour le document {doc_id} en {processing_time:.2f}s")
return {
'doc_id': doc_id,
'status': 'completed',
'processing_time': processing_time,
'steps_completed': context['steps_completed'],
'errors': context['errors'],
'warnings': context['warnings'],
'result': context.get('final_result', {})
}
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"Erreur fatale lors du traitement du document {doc_id}: {e}")
return {
'doc_id': doc_id,
'status': 'failed',
'processing_time': processing_time,
'steps_completed': context['steps_completed'],
'errors': context['errors'] + [f"Erreur fatale: {str(e)}"],
'warnings': context['warnings'],
'result': {}
}
@app.task(name='pipeline.health_check')
def health_check():
"""Vérification de santé du worker"""
return {
'status': 'healthy',
'timestamp': time.time(),
'worker_id': os.getenv('HOSTNAME', 'unknown')
}
@app.task(name='pipeline.cleanup_old_results')
def cleanup_old_results():
"""Nettoyage des anciens résultats"""
# TODO: Implémenter le nettoyage des anciens résultats
logger.info("Nettoyage des anciens résultats")
return {'cleaned': 0}
@app.task(bind=True, name='pipeline.reprocess_document')
def reprocess_document(self, doc_id: str, force: bool = False) -> Dict[str, Any]:
"""
Retraitement d'un document
Args:
doc_id: Identifiant du document à retraiter
force: Force le retraitement même si déjà traité
Returns:
Résultat du retraitement
"""
logger.info(f"Retraitement du document {doc_id} (force={force})")
# TODO: Récupérer les métadonnées depuis la base de données
metadata = {
'id_dossier': 'unknown',
'etude_id': 'unknown',
'utilisateur_id': 'unknown',
'source': 'reprocess'
}
return process_document_pipeline.delay(doc_id, metadata).get()
@app.task(name='pipeline.batch_process')
def batch_process_documents(doc_ids: list, metadata_list: list) -> Dict[str, Any]:
"""
Traitement en lot de plusieurs documents
Args:
doc_ids: Liste des identifiants de documents
metadata_list: Liste des métadonnées correspondantes
Returns:
Résultats du traitement en lot
"""
if len(doc_ids) != len(metadata_list):
raise ValueError("Le nombre de documents doit correspondre au nombre de métadonnées")
logger.info(f"Traitement en lot de {len(doc_ids)} documents")
# Lancement des tâches en parallèle
tasks = []
for doc_id, metadata in zip(doc_ids, metadata_list):
task = process_document_pipeline.delay(doc_id, metadata)
tasks.append(task)
# Attente des résultats
results = []
for task in tasks:
try:
result = task.get(timeout=600) # 10 minutes de timeout
results.append(result)
except Exception as e:
logger.error(f"Erreur lors du traitement en lot: {e}")
results.append({
'status': 'failed',
'error': str(e)
})
return {
'batch_status': 'completed',
'total_documents': len(doc_ids),
'results': results
}