""" Pipeline de finalisation et mise à jour de la base de données """ import os import logging from typing import Dict, Any from utils.database import Document, ProcessingLog, SessionLocal from utils.storage import cleanup_temp_file logger = logging.getLogger(__name__) def run(doc_id: str, ctx: dict): """ Finalisation du traitement d'un document """ logger.info(f"Finalisation du document {doc_id}") try: db = ctx.get("db") if not db: db = SessionLocal() ctx["db"] = db # Récupération du document document = db.query(Document).filter(Document.id == doc_id).first() if not document: raise ValueError(f"Document {doc_id} non trouvé") # Récupération des résultats de traitement classification = ctx.get("classification", {}) extracted_data = ctx.get("extracted_data", {}) checks_results = ctx.get("checks_results", []) overall_status = ctx.get("overall_status", "completed") # Mise à jour du document _update_document_status(document, overall_status, classification, extracted_data, checks_results, db) # Nettoyage des fichiers temporaires _cleanup_temp_files(ctx) # Création du log de finalisation _create_finalization_log(doc_id, overall_status, db) # Métadonnées de finalisation finalize_meta = { "finalization_completed": True, "final_status": overall_status, "total_processing_time": ctx.get("total_processing_time", 0), "cleanup_completed": True } ctx["finalize_meta"] = finalize_meta logger.info(f"Finalisation terminée pour le document {doc_id} - Statut: {overall_status}") except Exception as e: logger.error(f"Erreur lors de la finalisation du document {doc_id}: {e}") raise def _update_document_status(document: Document, status: str, classification: Dict[str, Any], extracted_data: Dict[str, Any], checks_results: list, db): """ Mise à jour du statut et des données du document """ try: # Mise à jour du statut document.status = status # Mise à jour des données extraites document.extracted_data = extracted_data # Mise à jour des étapes de traitement processing_steps = { "preprocessing": ctx.get("preprocessing_meta", {}), "ocr": ctx.get("ocr_meta", {}), "classification": ctx.get("classify_meta", {}), "extraction": ctx.get("extract_meta", {}), "indexation": ctx.get("index_meta", {}), "checks": ctx.get("checks_meta", {}), "finalization": ctx.get("finalize_meta", {}) } document.processing_steps = processing_steps # Mise à jour des erreurs si nécessaire if status == "failed": errors = document.errors or [] errors.append("Traitement échoué") document.errors = errors elif status == "manual_review": errors = document.errors or [] errors.append("Révision manuelle requise") document.errors = errors # Sauvegarde db.commit() logger.info(f"Document {document.id} mis à jour avec le statut {status}") except Exception as e: logger.error(f"Erreur lors de la mise à jour du document: {e}") db.rollback() raise def _cleanup_temp_files(ctx: Dict[str, Any]): """ Nettoyage des fichiers temporaires """ try: # Nettoyage du fichier PDF temporaire temp_pdf = ctx.get("temp_pdf_path") if temp_pdf: cleanup_temp_file(temp_pdf) logger.info(f"Fichier PDF temporaire nettoyé: {temp_pdf}") # Nettoyage du fichier image temporaire temp_image = ctx.get("temp_image_path") if temp_image: cleanup_temp_file(temp_image) logger.info(f"Fichier image temporaire nettoyé: {temp_image}") except Exception as e: logger.warning(f"Erreur lors du nettoyage des fichiers temporaires: {e}") def _create_finalization_log(doc_id: str, status: str, db): """ Création du log de finalisation """ try: log_entry = ProcessingLog( document_id=doc_id, step_name="finalization", status="completed" if status in ["completed", "manual_review"] else "failed", metadata={ "final_status": status, "step": "finalization" } ) db.add(log_entry) db.commit() logger.info(f"Log de finalisation créé pour le document {doc_id}") except Exception as e: logger.error(f"Erreur lors de la création du log de finalisation: {e}") def _generate_processing_summary(ctx: Dict[str, Any]) -> Dict[str, Any]: """ Génération d'un résumé du traitement """ summary = { "document_id": ctx.get("doc_id"), "processing_steps": { "preprocessing": ctx.get("preprocessing_meta", {}), "ocr": ctx.get("ocr_meta", {}), "classification": ctx.get("classify_meta", {}), "extraction": ctx.get("extract_meta", {}), "indexation": ctx.get("index_meta", {}), "checks": ctx.get("checks_meta", {}), "finalization": ctx.get("finalize_meta", {}) }, "results": { "classification": ctx.get("classification", {}), "extracted_data": ctx.get("extracted_data", {}), "checks_results": ctx.get("checks_results", []), "overall_status": ctx.get("overall_status", "unknown") }, "statistics": { "text_length": len(ctx.get("extracted_text", "")), "processing_time": ctx.get("total_processing_time", 0), "artifacts_created": len(ctx.get("artifacts", [])) } } return summary