""" Classificateur de documents notariaux """ import asyncio import logging import json import re from typing import Dict, Any, Optional, List from enum import Enum import requests from utils.llm_client import LLMClient logger = logging.getLogger(__name__) class DocumentType(str, Enum): """Types de documents notariaux""" ACTE_VENTE = "acte_vente" ACTE_DONATION = "acte_donation" ACTE_SUCCESSION = "acte_succession" CNI = "cni" CONTRAT = "contrat" AUTRE = "autre" class DocumentClassifier: """Classificateur de documents notariaux avec LLM et règles""" def __init__(self): self.llm_client = LLMClient() self.classification_rules = self._load_classification_rules() self.keywords = self._load_keywords() def _load_classification_rules(self) -> Dict[str, List[str]]: """ Règles de classification basées sur des mots-clés """ return { DocumentType.ACTE_VENTE: [ r"acte\s+de\s+vente", r"vente\s+immobilière", r"vendeur.*acheteur", r"prix\s+de\s+vente", r"acquisition\s+immobilière" ], DocumentType.ACTE_DONATION: [ r"acte\s+de\s+donation", r"donation\s+entre\s+vifs", r"donateur.*donataire", r"donation\s+partage" ], DocumentType.ACTE_SUCCESSION: [ r"acte\s+de\s+notoriété", r"succession", r"héritier", r"héritiers", r"défunt", r"legs", r"testament" ], DocumentType.CNI: [ r"carte\s+d'identité", r"carte\s+nationale\s+d'identité", r"république\s+française", r"ministère\s+de\s+l'intérieur", r"nom.*prénom.*né.*le" ], DocumentType.CONTRAT: [ r"contrat\s+de\s+", r"convention", r"accord", r"engagement", r"obligation" ] } def _load_keywords(self) -> Dict[str, List[str]]: """ Mots-clés spécifiques par type de document """ return { DocumentType.ACTE_VENTE: [ "vendeur", "acheteur", "prix", "vente", "acquisition", "immobilier", "appartement", "maison", "terrain" ], DocumentType.ACTE_DONATION: [ "donateur", "donataire", "donation", "don", "gratuit" ], DocumentType.ACTE_SUCCESSION: [ "héritier", "défunt", "succession", "legs", "testament", "notoriété", "décès" ], DocumentType.CNI: [ "carte", "identité", "république", "française", "ministère", "intérieur", "né", "nationalité" ], DocumentType.CONTRAT: [ "contrat", "convention", "accord", "engagement", "obligation", "parties", "clause" ] } async def classify_document( self, text: str, expected_type: Optional[DocumentType] = None, force_reclassification: bool = False ) -> Dict[str, Any]: """ Classification d'un document notarial """ logger.info("Début de la classification du document") try: # 1. Classification par règles (rapide) rule_based_result = self._classify_by_rules(text) # 2. Classification par LLM (plus précise) llm_result = await self._classify_by_llm(text, expected_type) # 3. Fusion des résultats final_result = self._merge_classification_results( rule_based_result, llm_result, expected_type ) # 4. Validation du résultat validated_result = self._validate_classification(final_result, text) logger.info(f"Classification terminée: {validated_result['type']} (confiance: {validated_result['confidence']:.2f})") return validated_result except Exception as e: logger.error(f"Erreur lors de la classification: {e}") # Retour d'un résultat par défaut return { "type": DocumentType.AUTRE, "confidence": 0.0, "method": "error", "error": str(e) } def _classify_by_rules(self, text: str) -> Dict[str, Any]: """ Classification basée sur des règles et mots-clés """ text_lower = text.lower() scores = {} # Calcul des scores par type for doc_type, patterns in self.classification_rules.items(): score = 0 matches = [] # Score basé sur les expressions régulières for pattern in patterns: if re.search(pattern, text_lower): score += 2 matches.append(pattern) # Score basé sur les mots-clés keywords = self.keywords.get(doc_type, []) for keyword in keywords: if keyword in text_lower: score += 1 matches.append(keyword) scores[doc_type] = { "score": score, "matches": matches } # Détermination du type avec le meilleur score if scores: best_type = max(scores.keys(), key=lambda k: scores[k]["score"]) best_score = scores[best_type]["score"] # Normalisation du score (0-1) max_possible_score = max( len(self.classification_rules.get(doc_type, [])) * 2 + len(self.keywords.get(doc_type, [])) for doc_type in DocumentType ) confidence = min(best_score / max_possible_score, 1.0) if max_possible_score > 0 else 0.0 return { "type": best_type, "confidence": confidence, "method": "rules", "scores": scores, "matches": scores[best_type]["matches"] } else: return { "type": DocumentType.AUTRE, "confidence": 0.0, "method": "rules", "scores": scores } async def _classify_by_llm(self, text: str, expected_type: Optional[DocumentType] = None) -> Dict[str, Any]: """ Classification par LLM (Ollama) """ try: # Préparation du prompt prompt = self._build_classification_prompt(text, expected_type) # Appel au LLM response = await self.llm_client.generate_response(prompt) # Parsing de la réponse result = self._parse_llm_classification_response(response) return result except Exception as e: logger.error(f"Erreur lors de la classification LLM: {e}") return { "type": DocumentType.AUTRE, "confidence": 0.0, "method": "llm_error", "error": str(e) } def _build_classification_prompt(self, text: str, expected_type: Optional[DocumentType] = None) -> str: """ Construction du prompt pour la classification LLM """ # Limitation du texte pour éviter les tokens excessifs text_sample = text[:2000] + "..." if len(text) > 2000 else text prompt = f""" Tu es un expert en documents notariaux. Analyse le texte suivant et détermine son type. Types possibles: - acte_vente: Acte de vente immobilière - acte_donation: Acte de donation - acte_succession: Acte de succession ou de notoriété - cni: Carte nationale d'identité - contrat: Contrat ou convention - autre: Autre type de document Texte à analyser: {text_sample} Réponds UNIQUEMENT avec un JSON dans ce format: {{ "type": "type_detecte", "confidence": 0.95, "reasoning": "explication courte de la décision", "key_indicators": ["indicateur1", "indicateur2"] }} """ if expected_type: prompt += f"\n\nType attendu: {expected_type.value}" return prompt def _parse_llm_classification_response(self, response: str) -> Dict[str, Any]: """ Parse la réponse du LLM pour la classification """ try: # Extraction du JSON de la réponse json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: json_str = json_match.group(0) result = json.loads(json_str) # Validation du type if result.get("type") in [t.value for t in DocumentType]: return { "type": DocumentType(result["type"]), "confidence": float(result.get("confidence", 0.0)), "method": "llm", "reasoning": result.get("reasoning", ""), "key_indicators": result.get("key_indicators", []) } # Fallback si le parsing échoue return { "type": DocumentType.AUTRE, "confidence": 0.0, "method": "llm_parse_error", "raw_response": response } except Exception as e: logger.error(f"Erreur lors du parsing de la réponse LLM: {e}") return { "type": DocumentType.AUTRE, "confidence": 0.0, "method": "llm_parse_error", "error": str(e), "raw_response": response } def _merge_classification_results( self, rule_result: Dict[str, Any], llm_result: Dict[str, Any], expected_type: Optional[DocumentType] ) -> Dict[str, Any]: """ Fusion des résultats de classification """ # Poids des différentes méthodes rule_weight = 0.3 llm_weight = 0.7 # Si un type est attendu et correspond, bonus de confiance expected_bonus = 0.0 if expected_type: if rule_result["type"] == expected_type: expected_bonus += 0.1 if llm_result["type"] == expected_type: expected_bonus += 0.1 # Calcul de la confiance fusionnée if rule_result["type"] == llm_result["type"]: # Accord entre les méthodes confidence = (rule_result["confidence"] * rule_weight + llm_result["confidence"] * llm_weight) + expected_bonus final_type = rule_result["type"] else: # Désaccord, on privilégie le LLM confidence = llm_result["confidence"] * llm_weight + expected_bonus final_type = llm_result["type"] return { "type": final_type, "confidence": min(confidence, 1.0), "method": "merged", "rule_result": rule_result, "llm_result": llm_result, "expected_type": expected_type, "expected_bonus": expected_bonus } def _validate_classification(self, result: Dict[str, Any], text: str) -> Dict[str, Any]: """ Validation finale de la classification """ # Vérifications de cohérence type_ = result["type"] confidence = result["confidence"] # Validation spécifique par type if type_ == DocumentType.CNI: # Vérification des éléments obligatoires d'une CNI cni_indicators = ["république", "française", "carte", "identité"] if not any(indicator in text.lower() for indicator in cni_indicators): confidence *= 0.5 # Réduction de confiance elif type_ in [DocumentType.ACTE_VENTE, DocumentType.ACTE_DONATION, DocumentType.ACTE_SUCCESSION]: # Vérification de la présence d'éléments notariaux notarial_indicators = ["notaire", "étude", "acte", "authentique"] if not any(indicator in text.lower() for indicator in notarial_indicators): confidence *= 0.7 # Réduction modérée # Seuil minimum de confiance if confidence < 0.3: result["type"] = DocumentType.AUTRE result["confidence"] = 0.3 result["validation_note"] = "Confiance trop faible, classé comme 'autre'" return result