4NK_IA_back/services/worker/tasks/pipeline_tasks.py
"""
Tâches principales du pipeline de traitement des documents
"""
import os
import time
import logging
from typing import Dict, Any, Optional
from celery import current_task
from celery_app import app
from pipelines import (
preprocess, ocr, classify, extract, index, checks, finalize
)
logger = logging.getLogger(__name__)
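
# Note on the assumed interface of the pipelines package: each module
# (preprocess, ocr, classify, extract, index, checks, finalize) is expected to
# expose run(doc_id, context) and to enrich the shared context dict in place
# (e.g. finalize is assumed to set context['final_result'] read further below).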


@app.task(bind=True, name='pipeline.process_document')
def process_document_pipeline(self, doc_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
    """
    Main processing pipeline for a single document.

    Args:
        doc_id: Unique identifier of the document
        metadata: Document metadata (dossier, étude, user, etc.)

    Returns:
        Dict containing the processing result
    """
    start_time = time.time()
    context = {
        'doc_id': doc_id,
        'metadata': metadata,
        'start_time': start_time,
        'steps_completed': [],
        'errors': [],
        'warnings': []
    }
    try:
        logger.info(f"Starting processing of document {doc_id}")
        # Update the task status
        self.update_state(
            state='PROGRESS',
            meta={'current_step': 'preprocess', 'progress': 0}
        )

        # 1. Preprocessing
        try:
            preprocess.run(doc_id, context)
            context['steps_completed'].append('preprocess')
            self.update_state(
                state='PROGRESS',
                meta={'current_step': 'ocr', 'progress': 14}
            )
        except Exception as e:
            logger.error(f"Error during preprocessing of document {doc_id}: {e}")
            context['errors'].append(f"Preprocessing: {str(e)}")
            raise

        # 2. OCR
        try:
            ocr.run(doc_id, context)
            context['steps_completed'].append('ocr')
            self.update_state(
                state='PROGRESS',
                meta={'current_step': 'classify', 'progress': 28}
            )
        except Exception as e:
            logger.error(f"Error during OCR of document {doc_id}: {e}")
            context['errors'].append(f"OCR: {str(e)}")
            raise

        # 3. Classification
        try:
            classify.run(doc_id, context)
            context['steps_completed'].append('classify')
            self.update_state(
                state='PROGRESS',
                meta={'current_step': 'extract', 'progress': 42}
            )
        except Exception as e:
            logger.error(f"Error during classification of document {doc_id}: {e}")
            context['errors'].append(f"Classification: {str(e)}")
            raise

        # 4. Entity extraction
        try:
            extract.run(doc_id, context)
            context['steps_completed'].append('extract')
            self.update_state(
                state='PROGRESS',
                meta={'current_step': 'index', 'progress': 56}
            )
        except Exception as e:
            logger.error(f"Error during entity extraction of document {doc_id}: {e}")
            context['errors'].append(f"Extraction: {str(e)}")
            raise

        # 5. Indexing
        try:
            index.run(doc_id, context)
            context['steps_completed'].append('index')
            self.update_state(
                state='PROGRESS',
                meta={'current_step': 'checks', 'progress': 70}
            )
        except Exception as e:
            logger.error(f"Error during indexing of document {doc_id}: {e}")
            context['errors'].append(f"Indexing: {str(e)}")
            raise

        # 6. Checks
        try:
            checks.run(doc_id, context)
            context['steps_completed'].append('checks')
            self.update_state(
                state='PROGRESS',
                meta={'current_step': 'finalize', 'progress': 84}
            )
        except Exception as e:
            logger.error(f"Error during checks of document {doc_id}: {e}")
            context['errors'].append(f"Checks: {str(e)}")
            raise

        # 7. Finalization
        try:
            finalize.run(doc_id, context)
            context['steps_completed'].append('finalize')
            self.update_state(
                state='PROGRESS',
                meta={'current_step': 'completed', 'progress': 100}
            )
        except Exception as e:
            logger.error(f"Error during finalization of document {doc_id}: {e}")
            context['errors'].append(f"Finalization: {str(e)}")
            raise

        # Compute total processing time
        processing_time = time.time() - start_time
        context['processing_time'] = processing_time
        context['status'] = 'completed'
        logger.info(f"Processing of document {doc_id} completed in {processing_time:.2f}s")
        return {
            'doc_id': doc_id,
            'status': 'completed',
            'processing_time': processing_time,
            'steps_completed': context['steps_completed'],
            'errors': context['errors'],
            'warnings': context['warnings'],
            'result': context.get('final_result', {})
        }
    except Exception as e:
        processing_time = time.time() - start_time
        logger.error(f"Fatal error while processing document {doc_id}: {e}")
        return {
            'doc_id': doc_id,
            'status': 'failed',
            'processing_time': processing_time,
            'steps_completed': context['steps_completed'],
            'errors': context['errors'] + [f"Fatal error: {str(e)}"],
            'warnings': context['warnings'],
            'result': {}
        }
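
# Usage sketch (illustrative only): a caller enqueues the pipeline and follows
# progress through the standard Celery result API. Broker and result-backend
# configuration are assumed to live in celery_app; the ids below are hypothetical.
#
#   result = process_document_pipeline.delay(
#       "doc-123",
#       {"id_dossier": "d-1", "etude_id": "e-1",
#        "utilisateur_id": "u-1", "source": "upload"},
#   )
#   if result.state == 'PROGRESS':
#       print(result.info.get('current_step'), result.info.get('progress'))
#   payload = result.get(timeout=600)  # blocks until the pipeline finishes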


@app.task(name='pipeline.health_check')
def health_check():
    """Worker health check."""
    return {
        'status': 'healthy',
        'timestamp': time.time(),
        'worker_id': os.getenv('HOSTNAME', 'unknown')
    }


@app.task(name='pipeline.cleanup_old_results')
def cleanup_old_results():
    """Clean up old processing results."""
    # TODO: Implement cleanup of old results
    logger.info("Cleaning up old results")
    return {'cleaned': 0}
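
# Note (assumption, not wired in here): if the goal is only to expire stored
# Celery task results, the built-in `celery.backend_cleanup` beat task together
# with the `result_expires` setting may already cover this TODO, e.g.:
#
#   app.conf.result_expires = 3600  # assumed one-hour retention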


@app.task(bind=True, name='pipeline.reprocess_document')
def reprocess_document(self, doc_id: str, force: bool = False) -> Dict[str, Any]:
    """
    Reprocess a document.

    Args:
        doc_id: Identifier of the document to reprocess
        force: Force reprocessing even if the document was already processed

    Returns:
        Result of the reprocessing
    """
    logger.info(f"Reprocessing document {doc_id} (force={force})")
    # TODO: Fetch the metadata from the database
    metadata = {
        'id_dossier': 'unknown',
        'etude_id': 'unknown',
        'utilisateur_id': 'unknown',
        'source': 'reprocess'
    }
    # Waiting synchronously on a subtask from inside a task can deadlock the
    # worker pool, so Celery refuses it unless explicitly opted in.
    return process_document_pipeline.delay(doc_id, metadata).get(
        disable_sync_subtasks=False
    )


@app.task(name='pipeline.batch_process')
def batch_process_documents(doc_ids: list, metadata_list: list) -> Dict[str, Any]:
    """
    Batch processing of several documents.

    Args:
        doc_ids: List of document identifiers
        metadata_list: List of the corresponding metadata dicts

    Returns:
        Results of the batch processing
    """
    if len(doc_ids) != len(metadata_list):
        raise ValueError("The number of documents must match the number of metadata entries")
    logger.info(f"Batch processing of {len(doc_ids)} documents")
    # Dispatch the tasks in parallel
    tasks = []
    for doc_id, metadata in zip(doc_ids, metadata_list):
        task = process_document_pipeline.delay(doc_id, metadata)
        tasks.append(task)
    # Wait for the results (synchronous waits inside a task must be opted into,
    # see the note in reprocess_document)
    results = []
    for task in tasks:
        try:
            result = task.get(timeout=600, disable_sync_subtasks=False)  # 10-minute timeout
            results.append(result)
        except Exception as e:
            logger.error(f"Error during batch processing: {e}")
            results.append({
                'status': 'failed',
                'error': str(e)
            })
    return {
        'batch_status': 'completed',
        'total_documents': len(doc_ids),
        'results': results
    }
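
# Alternative batch dispatch (illustrative sketch, not used above): celery.group
# fans out the same task signatures and collects the results in one call. The
# timeout and the choice not to propagate individual failures are assumptions.
#
#   from celery import group
#
#   job = group(
#       process_document_pipeline.s(doc_id, metadata)
#       for doc_id, metadata in zip(doc_ids, metadata_list)
#   )
#   results = job.apply_async().get(
#       timeout=600, propagate=False, disable_sync_subtasks=False
#   )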