""" Extracteur d'entités pour les documents notariaux """ import asyncio import logging import re import json from typing import Dict, Any, List, Optional from datetime import datetime from dataclasses import dataclass from utils.llm_client import LLMClient logger = logging.getLogger(__name__) @dataclass class Person: """Représentation d'une personne""" nom: str prenom: str type: str # vendeur, acheteur, héritier, etc. adresse: Optional[str] = None date_naissance: Optional[str] = None lieu_naissance: Optional[str] = None profession: Optional[str] = None confidence: float = 0.0 @dataclass class Address: """Représentation d'une adresse""" adresse_complete: str numero: Optional[str] = None rue: Optional[str] = None code_postal: Optional[str] = None ville: Optional[str] = None type: str = "adresse" # bien_vendu, domicile, etc. confidence: float = 0.0 @dataclass class Property: """Représentation d'un bien""" description: str type_bien: str # appartement, maison, terrain, etc. surface: Optional[str] = None prix: Optional[str] = None adresse: Optional[str] = None confidence: float = 0.0 @dataclass class Company: """Représentation d'une entreprise""" nom: str siret: Optional[str] = None adresse: Optional[str] = None representant: Optional[str] = None confidence: float = 0.0 class EntityExtractor: """Extracteur d'entités spécialisé pour les documents notariaux""" def __init__(self): self.llm_client = LLMClient() self.patterns = self._load_extraction_patterns() def _load_extraction_patterns(self) -> Dict[str, List[str]]: """ Patterns d'extraction par expressions régulières """ return { "personnes": [ r"(?:M\.|Mme|Mademoiselle)\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)", r"([A-Z][A-Z\s]+)\s+([A-Z][a-z]+)", r"nom[:\s]+([A-Z][a-z]+)\s+prénom[:\s]+([A-Z][a-z]+)" ], "adresses": [ r"(\d+[,\s]*[a-zA-Z\s]+(?:rue|avenue|boulevard|place|chemin|impasse)[,\s]*[^,]+)", r"adresse[:\s]+([^,\n]+)", r"domicilié[:\s]+([^,\n]+)" ], "montants": [ r"(\d+(?:\s?\d{3})*(?:[.,]\d{2})?)\s*(?:euros?|€|EUR)", r"prix[:\s]+(\d+(?:\s?\d{3})*(?:[.,]\d{2})?)\s*(?:euros?|€|EUR)", r"(\d+(?:\s?\d{3})*(?:[.,]\d{2})?)\s*(?:francs?|F)" ], "dates": [ r"(\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{4})", r"(\d{1,2}\s+(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+\d{4})", r"né\s+(?:le\s+)?(\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{4})" ], "surfaces": [ r"(\d+(?:[.,]\d+)?)\s*(?:m²|m2|mètres?\s+carrés?)", r"surface[:\s]+(\d+(?:[.,]\d+)?)\s*(?:m²|m2|mètres?\s+carrés?)" ], "siret": [ r"(\d{3}\s?\d{3}\s?\d{3}\s?\d{5})", r"SIRET[:\s]+(\d{3}\s?\d{3}\s?\d{3}\s?\d{5})" ] } async def extract_entities(self, text: str, document_type: str) -> Dict[str, Any]: """ Extraction complète des entités d'un document """ logger.info(f"Extraction des entités pour un document de type: {document_type}") try: # 1. Extraction par patterns (rapide) pattern_entities = self._extract_by_patterns(text) # 2. Extraction par LLM (plus précise) llm_entities = await self._extract_by_llm(text, document_type) # 3. Fusion et validation final_entities = self._merge_entities(pattern_entities, llm_entities) # 4. Post-traitement spécifique au type de document processed_entities = self._post_process_entities(final_entities, document_type) logger.info(f"Extraction terminée: {len(processed_entities.get('identites', []))} identités, " f"{len(processed_entities.get('adresses', []))} adresses") return processed_entities except Exception as e: logger.error(f"Erreur lors de l'extraction des entités: {e}") return { "identites": [], "adresses": [], "biens": [], "entreprises": [], "montants": [], "dates": [], "error": str(e) } def _extract_by_patterns(self, text: str) -> Dict[str, List[Any]]: """ Extraction basée sur des patterns regex """ entities = { "identites": [], "adresses": [], "montants": [], "dates": [], "surfaces": [], "siret": [] } # Extraction des personnes for pattern in self.patterns["personnes"]: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: if len(match.groups()) >= 2: person = Person( nom=match.group(1).strip(), prenom=match.group(2).strip(), type="personne", confidence=0.7 ) entities["identites"].append(person.__dict__) # Extraction des adresses for pattern in self.patterns["adresses"]: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: address = Address( adresse_complete=match.group(1).strip(), type="adresse", confidence=0.7 ) entities["adresses"].append(address.__dict__) # Extraction des montants for pattern in self.patterns["montants"]: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: entities["montants"].append({ "montant": match.group(1).strip(), "confidence": 0.8 }) # Extraction des dates for pattern in self.patterns["dates"]: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: entities["dates"].append({ "date": match.group(1).strip(), "confidence": 0.8 }) # Extraction des surfaces for pattern in self.patterns["surfaces"]: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: entities["surfaces"].append({ "surface": match.group(1).strip(), "confidence": 0.8 }) # Extraction des SIRET for pattern in self.patterns["siret"]: matches = re.finditer(pattern, text, re.IGNORECASE) for match in matches: entities["siret"].append({ "siret": match.group(1).strip(), "confidence": 0.9 }) return entities async def _extract_by_llm(self, text: str, document_type: str) -> Dict[str, Any]: """ Extraction par LLM (plus précise et contextuelle) """ try: # Limitation du texte text_sample = text[:3000] + "..." if len(text) > 3000 else text prompt = self._build_extraction_prompt(text_sample, document_type) response = await self.llm_client.generate_response(prompt) return self._parse_llm_extraction_response(response) except Exception as e: logger.error(f"Erreur lors de l'extraction LLM: {e}") return {} def _build_extraction_prompt(self, text: str, document_type: str) -> str: """ Construction du prompt pour l'extraction LLM """ prompt = f""" Tu es un expert en extraction d'entités pour documents notariaux. Extrais toutes les entités pertinentes du texte suivant. Type de document: {document_type} Entités à extraire: - identites: personnes (nom, prénom, type: vendeur/acheteur/héritier/etc.) - adresses: adresses complètes avec type (bien_vendu/domicile/etc.) - biens: descriptions de biens avec surface, prix si disponible - entreprises: noms d'entreprises avec SIRET si disponible - montants: tous les montants en euros ou francs - dates: dates importantes (naissance, signature, etc.) Texte à analyser: {text} Réponds UNIQUEMENT avec un JSON dans ce format: {{ "identites": [ {{"nom": "DUPONT", "prenom": "Jean", "type": "vendeur", "confidence": 0.95}} ], "adresses": [ {{"adresse_complete": "123 rue de la Paix, 75001 Paris", "type": "bien_vendu", "confidence": 0.9}} ], "biens": [ {{"description": "Appartement 3 pièces", "surface": "75m²", "prix": "250000€", "confidence": 0.9}} ], "entreprises": [ {{"nom": "SARL EXAMPLE", "siret": "12345678901234", "confidence": 0.8}} ], "montants": [ {{"montant": "250000", "devise": "euros", "confidence": 0.9}} ], "dates": [ {{"date": "15/03/1980", "type": "naissance", "confidence": 0.8}} ] }} """ return prompt def _parse_llm_extraction_response(self, response: str) -> Dict[str, Any]: """ Parse la réponse du LLM pour l'extraction """ try: # Extraction du JSON json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: json_str = json_match.group(0) return json.loads(json_str) return {} except Exception as e: logger.error(f"Erreur lors du parsing de la réponse LLM: {e}") return {} def _merge_entities(self, pattern_entities: Dict[str, List[Any]], llm_entities: Dict[str, Any]) -> Dict[str, List[Any]]: """ Fusion des entités extraites par patterns et LLM """ merged = { "identites": [], "adresses": [], "biens": [], "entreprises": [], "montants": [], "dates": [] } # Fusion des identités merged["identites"] = self._merge_identities( pattern_entities.get("identites", []), llm_entities.get("identites", []) ) # Fusion des adresses merged["adresses"] = self._merge_addresses( pattern_entities.get("adresses", []), llm_entities.get("adresses", []) ) # Fusion des montants merged["montants"] = self._merge_simple_entities( pattern_entities.get("montants", []), llm_entities.get("montants", []) ) # Fusion des dates merged["dates"] = self._merge_simple_entities( pattern_entities.get("dates", []), llm_entities.get("dates", []) ) # Entités spécifiques au LLM merged["biens"] = llm_entities.get("biens", []) merged["entreprises"] = llm_entities.get("entreprises", []) return merged def _merge_identities(self, pattern_identities: List[Dict], llm_identities: List[Dict]) -> List[Dict]: """ Fusion des identités avec déduplication """ merged = [] # Ajout des identités LLM (priorité) for identity in llm_identities: merged.append(identity) # Ajout des identités pattern si pas de doublon for identity in pattern_identities: if not self._is_duplicate_identity(identity, merged): merged.append(identity) return merged def _merge_addresses(self, pattern_addresses: List[Dict], llm_addresses: List[Dict]) -> List[Dict]: """ Fusion des adresses avec déduplication """ merged = [] # Ajout des adresses LLM (priorité) for address in llm_addresses: merged.append(address) # Ajout des adresses pattern si pas de doublon for address in pattern_addresses: if not self._is_duplicate_address(address, merged): merged.append(address) return merged def _merge_simple_entities(self, pattern_entities: List[Dict], llm_entities: List[Dict]) -> List[Dict]: """ Fusion d'entités simples (montants, dates) """ merged = [] # Ajout des entités LLM merged.extend(llm_entities) # Ajout des entités pattern si pas de doublon for entity in pattern_entities: if not self._is_duplicate_simple_entity(entity, merged): merged.append(entity) return merged def _is_duplicate_identity(self, identity: Dict, existing: List[Dict]) -> bool: """ Vérifie si une identité est un doublon """ for existing_identity in existing: if (existing_identity.get("nom", "").lower() == identity.get("nom", "").lower() and existing_identity.get("prenom", "").lower() == identity.get("prenom", "").lower()): return True return False def _is_duplicate_address(self, address: Dict, existing: List[Dict]) -> bool: """ Vérifie si une adresse est un doublon """ for existing_address in existing: if existing_address.get("adresse_complete", "").lower() == address.get("adresse_complete", "").lower(): return True return False def _is_duplicate_simple_entity(self, entity: Dict, existing: List[Dict]) -> bool: """ Vérifie si une entité simple est un doublon """ entity_value = None for key in entity: if key != "confidence": entity_value = entity[key] break if entity_value: for existing_entity in existing: for key in existing_entity: if key != "confidence" and existing_entity[key] == entity_value: return True return False def _post_process_entities(self, entities: Dict[str, List[Any]], document_type: str) -> Dict[str, List[Any]]: """ Post-traitement spécifique au type de document """ # Classification des identités selon le type de document if document_type == "acte_vente": entities["identites"] = self._classify_identities_vente(entities["identites"]) elif document_type == "acte_donation": entities["identites"] = self._classify_identities_donation(entities["identites"]) elif document_type == "acte_succession": entities["identites"] = self._classify_identities_succession(entities["identites"]) # Nettoyage et validation entities = self._clean_entities(entities) return entities def _classify_identities_vente(self, identities: List[Dict]) -> List[Dict]: """ Classification des identités pour un acte de vente """ for identity in identities: if identity.get("type") == "personne": # Logique simple basée sur le contexte # TODO: Améliorer avec plus de contexte identity["type"] = "partie" return identities def _classify_identities_donation(self, identities: List[Dict]) -> List[Dict]: """ Classification des identités pour un acte de donation """ for identity in identities: if identity.get("type") == "personne": identity["type"] = "partie" return identities def _classify_identities_succession(self, identities: List[Dict]) -> List[Dict]: """ Classification des identités pour un acte de succession """ for identity in identities: if identity.get("type") == "personne": identity["type"] = "héritier" return identities def _clean_entities(self, entities: Dict[str, List[Any]]) -> Dict[str, List[Any]]: """ Nettoyage et validation des entités """ cleaned = {} for entity_type, entity_list in entities.items(): cleaned[entity_type] = [] for entity in entity_list: # Validation basique if self._is_valid_entity(entity, entity_type): # Nettoyage des valeurs cleaned_entity = self._clean_entity_values(entity) cleaned[entity_type].append(cleaned_entity) return cleaned def _is_valid_entity(self, entity: Dict, entity_type: str) -> bool: """ Validation d'une entité """ if entity_type == "identites": return bool(entity.get("nom") and entity.get("prenom")) elif entity_type == "adresses": return bool(entity.get("adresse_complete")) elif entity_type == "montants": return bool(entity.get("montant")) elif entity_type == "dates": return bool(entity.get("date")) return True def _clean_entity_values(self, entity: Dict) -> Dict: """ Nettoyage des valeurs d'une entité """ cleaned = {} for key, value in entity.items(): if isinstance(value, str): # Nettoyage des chaînes cleaned_value = value.strip() cleaned_value = re.sub(r'\s+', ' ', cleaned_value) # Espaces multiples cleaned[key] = cleaned_value else: cleaned[key] = value return cleaned