""" Pipeline de classification des documents notariaux """ import os import json import requests from typing import Dict, Any, List import logging logger = logging.getLogger(__name__) # Types de documents supportés DOCUMENT_TYPES = { "acte_vente": { "name": "Acte de Vente", "keywords": ["vente", "achat", "vendeur", "acquéreur", "prix", "bien immobilier"], "patterns": [r"acte.*vente", r"vente.*immobilier", r"achat.*appartement"] }, "acte_donation": { "name": "Acte de Donation", "keywords": ["donation", "don", "donateur", "donataire", "gratuit", "libéralité"], "patterns": [r"acte.*donation", r"donation.*partage", r"don.*manuel"] }, "acte_succession": { "name": "Acte de Succession", "keywords": ["succession", "héritage", "héritier", "défunt", "legs", "testament"], "patterns": [r"acte.*succession", r"partage.*succession", r"inventaire.*succession"] }, "cni": { "name": "Carte d'Identité", "keywords": ["carte", "identité", "nationalité", "naissance", "domicile"], "patterns": [r"carte.*identité", r"passeport", r"titre.*séjour"] }, "contrat": { "name": "Contrat", "keywords": ["contrat", "bail", "location", "engagement", "convention"], "patterns": [r"contrat.*bail", r"contrat.*travail", r"convention.*collective"] }, "autre": { "name": "Autre Document", "keywords": [], "patterns": [] } } def run(doc_id: str, ctx: Dict[str, Any]) -> None: """ Pipeline de classification des documents Args: doc_id: Identifiant du document ctx: Contexte de traitement partagé entre les pipelines """ logger.info(f"🏷️ Début de la classification pour le document {doc_id}") try: # 1. Vérification des prérequis if "ocr_error" in ctx: raise Exception(f"Erreur OCR: {ctx['ocr_error']}") ocr_text = ctx.get("ocr_text", "") if not ocr_text: raise ValueError("Texte OCR manquant") # 2. Classification par règles (rapide) rule_based_classification = _classify_by_rules(ocr_text) # 3. Classification par LLM (plus précise) llm_classification = _classify_by_llm(ocr_text, doc_id) # 4. Fusion des résultats final_classification = _merge_classifications(rule_based_classification, llm_classification) # 5. Mise à jour du contexte ctx.update({ "document_type": final_classification["type"], "classification_confidence": final_classification["confidence"], "classification_method": final_classification["method"], "classification_details": final_classification["details"] }) logger.info(f"✅ Classification terminée pour {doc_id}") logger.info(f" - Type: {final_classification['type']}") logger.info(f" - Confiance: {final_classification['confidence']:.2f}") logger.info(f" - Méthode: {final_classification['method']}") except Exception as e: logger.error(f"❌ Erreur lors de la classification de {doc_id}: {e}") ctx["classification_error"] = str(e) # Classification par défaut ctx.update({ "document_type": "autre", "classification_confidence": 0.0, "classification_method": "error_fallback" }) def _classify_by_rules(text: str) -> Dict[str, Any]: """Classification basée sur des règles et mots-clés""" logger.info("📋 Classification par règles") text_lower = text.lower() scores = {} for doc_type, config in DOCUMENT_TYPES.items(): if doc_type == "autre": continue score = 0 matched_keywords = [] # Score basé sur les mots-clés for keyword in config["keywords"]: if keyword in text_lower: score += 1 matched_keywords.append(keyword) # Score basé sur les patterns regex import re for pattern in config["patterns"]: if re.search(pattern, text_lower): score += 2 # Normalisation du score max_possible_score = len(config["keywords"]) + len(config["patterns"]) * 2 normalized_score = score / max_possible_score if max_possible_score > 0 else 0 scores[doc_type] = { "score": normalized_score, "matched_keywords": matched_keywords, "method": "rules" } # Sélection du meilleur score if scores: best_type = max(scores.keys(), key=lambda k: scores[k]["score"]) best_score = scores[best_type]["score"] return { "type": best_type if best_score > 0.1 else "autre", "confidence": best_score, "method": "rules", "details": scores[best_type] if best_score > 0.1 else {"score": 0, "method": "rules"} } else: return { "type": "autre", "confidence": 0.0, "method": "rules", "details": {"score": 0, "method": "rules"} } def _classify_by_llm(text: str, doc_id: str) -> Dict[str, Any]: """Classification par LLM (Ollama)""" logger.info("🤖 Classification par LLM") try: # Configuration Ollama ollama_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") model = os.getenv("OLLAMA_MODEL", "llama3:8b") # Limitation du texte pour le contexte text_sample = text[:4000] if len(text) > 4000 else text # Prompt de classification prompt = _build_classification_prompt(text_sample) # Appel à Ollama response = requests.post( f"{ollama_url}/api/generate", json={ "model": model, "prompt": prompt, "stream": False, "options": { "temperature": 0.1, "top_p": 0.9 } }, timeout=60 ) if response.status_code == 200: result = response.json() llm_response = result.get("response", "").strip() # Parsing de la réponse JSON try: classification_result = json.loads(llm_response) return { "type": classification_result.get("type", "autre"), "confidence": classification_result.get("confidence", 0.0), "method": "llm", "details": { "model": model, "reasoning": classification_result.get("reasoning", ""), "raw_response": llm_response } } except json.JSONDecodeError: logger.warning("Réponse LLM non-JSON, utilisation de la classification par règles") return _classify_by_rules(text) else: logger.warning(f"Erreur LLM: {response.status_code}") return _classify_by_rules(text) except requests.exceptions.RequestException as e: logger.warning(f"Erreur de connexion LLM: {e}") return _classify_by_rules(text) except Exception as e: logger.warning(f"Erreur LLM: {e}") return _classify_by_rules(text) def _build_classification_prompt(text: str) -> str: """Construit le prompt pour la classification LLM""" return f"""Tu es un expert en documents notariaux. Analyse le texte suivant et classe-le dans une des catégories suivantes : Types de documents possibles : - acte_vente : Acte de vente immobilière - acte_donation : Acte de donation ou don - acte_succession : Acte de succession ou partage - cni : Carte d'identité ou document d'identité - contrat : Contrat (bail, travail, etc.) - autre : Autre type de document Texte à analyser : {text} Réponds UNIQUEMENT avec un JSON valide dans ce format : {{ "type": "acte_vente", "confidence": 0.85, "reasoning": "Le document contient les termes 'vente', 'vendeur', 'acquéreur' et mentionne un bien immobilier" }} Assure-toi que le JSON est valide et que le type correspond exactement à une des catégories listées.""" def _merge_classifications(rule_result: Dict[str, Any], llm_result: Dict[str, Any]) -> Dict[str, Any]: """Fusionne les résultats de classification par règles et LLM""" logger.info("🔄 Fusion des classifications") # Poids des méthodes rule_weight = 0.3 llm_weight = 0.7 # Si LLM a une confiance élevée, on lui fait confiance if llm_result["confidence"] > 0.8: return llm_result # Si les deux méthodes sont d'accord if rule_result["type"] == llm_result["type"]: # Moyenne pondérée des confiances combined_confidence = (rule_result["confidence"] * rule_weight + llm_result["confidence"] * llm_weight) return { "type": rule_result["type"], "confidence": combined_confidence, "method": "merged", "details": { "rule_result": rule_result, "llm_result": llm_result, "weights": {"rules": rule_weight, "llm": llm_weight} } } # Si les méthodes ne sont pas d'accord, on privilégie LLM si sa confiance est > 0.5 if llm_result["confidence"] > 0.5: return llm_result else: return rule_result def get_document_type_info(doc_type: str) -> Dict[str, Any]: """Retourne les informations sur un type de document""" return DOCUMENT_TYPES.get(doc_type, DOCUMENT_TYPES["autre"]) def get_supported_types() -> List[str]: """Retourne la liste des types de documents supportés""" return list(DOCUMENT_TYPES.keys())