4NK_IA_back/services/host_api/utils/entity_extractor.py
ncantu 447357d41a feat: Implémentation complète du système notarial 4NK avec IA
- API FastAPI complète pour le traitement de documents notariaux
- Pipeline OCR avec correction lexicale notariale
- Classification automatique des documents (règles + LLM)
- Extraction d'entités (identités, adresses, biens, montants)
- Intégration de 6 APIs externes (Cadastre, Géorisques, BODACC, etc.)
- Système de vérification et score de vraisemblance
- Analyse contextuelle via LLM (Ollama)
- Interface web moderne avec drag & drop
- Tests complets et documentation exhaustive
- Scripts de déploiement automatisés

Types de documents supportés:
- Acte de vente, donation, succession
- CNI avec détection du pays
- Contrats divers

Fonctionnalités:
- Upload et traitement asynchrone
- Vérifications externes automatiques
- Score de vraisemblance (0-1)
- Recommandations personnalisées
- Tableaux de bord et statistiques

Prêt pour la production avec démarrage en une commande.
2025-09-09 03:48:56 +02:00

517 lines
17 KiB
Python

"""
Extracteur d'entités pour les documents notariaux
"""
import asyncio
import logging
import re
import json
from typing import Dict, Any, List, Optional
from datetime import datetime
from dataclasses import dataclass
from utils.llm_client import LLMClient
logger = logging.getLogger(__name__)
@dataclass
class Person:
"""Représentation d'une personne"""
nom: str
prenom: str
type: str # vendeur, acheteur, héritier, etc.
adresse: Optional[str] = None
date_naissance: Optional[str] = None
lieu_naissance: Optional[str] = None
profession: Optional[str] = None
confidence: float = 0.0
@dataclass
class Address:
"""Représentation d'une adresse"""
adresse_complete: str
numero: Optional[str] = None
rue: Optional[str] = None
code_postal: Optional[str] = None
ville: Optional[str] = None
type: str = "adresse" # bien_vendu, domicile, etc.
confidence: float = 0.0
@dataclass
class Property:
"""Représentation d'un bien"""
description: str
type_bien: str # appartement, maison, terrain, etc.
surface: Optional[str] = None
prix: Optional[str] = None
adresse: Optional[str] = None
confidence: float = 0.0
@dataclass
class Company:
"""Représentation d'une entreprise"""
nom: str
siret: Optional[str] = None
adresse: Optional[str] = None
representant: Optional[str] = None
confidence: float = 0.0
class EntityExtractor:
"""Extracteur d'entités spécialisé pour les documents notariaux"""
def __init__(self):
self.llm_client = LLMClient()
self.patterns = self._load_extraction_patterns()
def _load_extraction_patterns(self) -> Dict[str, List[str]]:
"""
Patterns d'extraction par expressions régulières
"""
return {
"personnes": [
r"(?:M\.|Mme|Mademoiselle)\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)",
r"([A-Z][A-Z\s]+)\s+([A-Z][a-z]+)",
r"nom[:\s]+([A-Z][a-z]+)\s+prénom[:\s]+([A-Z][a-z]+)"
],
"adresses": [
r"(\d+[,\s]*[a-zA-Z\s]+(?:rue|avenue|boulevard|place|chemin|impasse)[,\s]*[^,]+)",
r"adresse[:\s]+([^,\n]+)",
r"domicilié[:\s]+([^,\n]+)"
],
"montants": [
r"(\d+(?:\s?\d{3})*(?:[.,]\d{2})?)\s*(?:euros?|€|EUR)",
r"prix[:\s]+(\d+(?:\s?\d{3})*(?:[.,]\d{2})?)\s*(?:euros?|€|EUR)",
r"(\d+(?:\s?\d{3})*(?:[.,]\d{2})?)\s*(?:francs?|F)"
],
"dates": [
r"(\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{4})",
r"(\d{1,2}\s+(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+\d{4})",
r"\s+(?:le\s+)?(\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{4})"
],
"surfaces": [
r"(\d+(?:[.,]\d+)?)\s*(?:m²|m2|mètres?\s+carrés?)",
r"surface[:\s]+(\d+(?:[.,]\d+)?)\s*(?:m²|m2|mètres?\s+carrés?)"
],
"siret": [
r"(\d{3}\s?\d{3}\s?\d{3}\s?\d{5})",
r"SIRET[:\s]+(\d{3}\s?\d{3}\s?\d{3}\s?\d{5})"
]
}
async def extract_entities(self, text: str, document_type: str) -> Dict[str, Any]:
"""
Extraction complète des entités d'un document
"""
logger.info(f"Extraction des entités pour un document de type: {document_type}")
try:
# 1. Extraction par patterns (rapide)
pattern_entities = self._extract_by_patterns(text)
# 2. Extraction par LLM (plus précise)
llm_entities = await self._extract_by_llm(text, document_type)
# 3. Fusion et validation
final_entities = self._merge_entities(pattern_entities, llm_entities)
# 4. Post-traitement spécifique au type de document
processed_entities = self._post_process_entities(final_entities, document_type)
logger.info(f"Extraction terminée: {len(processed_entities.get('identites', []))} identités, "
f"{len(processed_entities.get('adresses', []))} adresses")
return processed_entities
except Exception as e:
logger.error(f"Erreur lors de l'extraction des entités: {e}")
return {
"identites": [],
"adresses": [],
"biens": [],
"entreprises": [],
"montants": [],
"dates": [],
"error": str(e)
}
def _extract_by_patterns(self, text: str) -> Dict[str, List[Any]]:
"""
Extraction basée sur des patterns regex
"""
entities = {
"identites": [],
"adresses": [],
"montants": [],
"dates": [],
"surfaces": [],
"siret": []
}
# Extraction des personnes
for pattern in self.patterns["personnes"]:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
if len(match.groups()) >= 2:
person = Person(
nom=match.group(1).strip(),
prenom=match.group(2).strip(),
type="personne",
confidence=0.7
)
entities["identites"].append(person.__dict__)
# Extraction des adresses
for pattern in self.patterns["adresses"]:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
address = Address(
adresse_complete=match.group(1).strip(),
type="adresse",
confidence=0.7
)
entities["adresses"].append(address.__dict__)
# Extraction des montants
for pattern in self.patterns["montants"]:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
entities["montants"].append({
"montant": match.group(1).strip(),
"confidence": 0.8
})
# Extraction des dates
for pattern in self.patterns["dates"]:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
entities["dates"].append({
"date": match.group(1).strip(),
"confidence": 0.8
})
# Extraction des surfaces
for pattern in self.patterns["surfaces"]:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
entities["surfaces"].append({
"surface": match.group(1).strip(),
"confidence": 0.8
})
# Extraction des SIRET
for pattern in self.patterns["siret"]:
matches = re.finditer(pattern, text, re.IGNORECASE)
for match in matches:
entities["siret"].append({
"siret": match.group(1).strip(),
"confidence": 0.9
})
return entities
async def _extract_by_llm(self, text: str, document_type: str) -> Dict[str, Any]:
"""
Extraction par LLM (plus précise et contextuelle)
"""
try:
# Limitation du texte
text_sample = text[:3000] + "..." if len(text) > 3000 else text
prompt = self._build_extraction_prompt(text_sample, document_type)
response = await self.llm_client.generate_response(prompt)
return self._parse_llm_extraction_response(response)
except Exception as e:
logger.error(f"Erreur lors de l'extraction LLM: {e}")
return {}
def _build_extraction_prompt(self, text: str, document_type: str) -> str:
"""
Construction du prompt pour l'extraction LLM
"""
prompt = f"""
Tu es un expert en extraction d'entités pour documents notariaux.
Extrais toutes les entités pertinentes du texte suivant.
Type de document: {document_type}
Entités à extraire:
- identites: personnes (nom, prénom, type: vendeur/acheteur/héritier/etc.)
- adresses: adresses complètes avec type (bien_vendu/domicile/etc.)
- biens: descriptions de biens avec surface, prix si disponible
- entreprises: noms d'entreprises avec SIRET si disponible
- montants: tous les montants en euros ou francs
- dates: dates importantes (naissance, signature, etc.)
Texte à analyser:
{text}
Réponds UNIQUEMENT avec un JSON dans ce format:
{{
"identites": [
{{"nom": "DUPONT", "prenom": "Jean", "type": "vendeur", "confidence": 0.95}}
],
"adresses": [
{{"adresse_complete": "123 rue de la Paix, 75001 Paris", "type": "bien_vendu", "confidence": 0.9}}
],
"biens": [
{{"description": "Appartement 3 pièces", "surface": "75m²", "prix": "250000€", "confidence": 0.9}}
],
"entreprises": [
{{"nom": "SARL EXAMPLE", "siret": "12345678901234", "confidence": 0.8}}
],
"montants": [
{{"montant": "250000", "devise": "euros", "confidence": 0.9}}
],
"dates": [
{{"date": "15/03/1980", "type": "naissance", "confidence": 0.8}}
]
}}
"""
return prompt
def _parse_llm_extraction_response(self, response: str) -> Dict[str, Any]:
"""
Parse la réponse du LLM pour l'extraction
"""
try:
# Extraction du JSON
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
return json.loads(json_str)
return {}
except Exception as e:
logger.error(f"Erreur lors du parsing de la réponse LLM: {e}")
return {}
def _merge_entities(self, pattern_entities: Dict[str, List[Any]], llm_entities: Dict[str, Any]) -> Dict[str, List[Any]]:
"""
Fusion des entités extraites par patterns et LLM
"""
merged = {
"identites": [],
"adresses": [],
"biens": [],
"entreprises": [],
"montants": [],
"dates": []
}
# Fusion des identités
merged["identites"] = self._merge_identities(
pattern_entities.get("identites", []),
llm_entities.get("identites", [])
)
# Fusion des adresses
merged["adresses"] = self._merge_addresses(
pattern_entities.get("adresses", []),
llm_entities.get("adresses", [])
)
# Fusion des montants
merged["montants"] = self._merge_simple_entities(
pattern_entities.get("montants", []),
llm_entities.get("montants", [])
)
# Fusion des dates
merged["dates"] = self._merge_simple_entities(
pattern_entities.get("dates", []),
llm_entities.get("dates", [])
)
# Entités spécifiques au LLM
merged["biens"] = llm_entities.get("biens", [])
merged["entreprises"] = llm_entities.get("entreprises", [])
return merged
def _merge_identities(self, pattern_identities: List[Dict], llm_identities: List[Dict]) -> List[Dict]:
"""
Fusion des identités avec déduplication
"""
merged = []
# Ajout des identités LLM (priorité)
for identity in llm_identities:
merged.append(identity)
# Ajout des identités pattern si pas de doublon
for identity in pattern_identities:
if not self._is_duplicate_identity(identity, merged):
merged.append(identity)
return merged
def _merge_addresses(self, pattern_addresses: List[Dict], llm_addresses: List[Dict]) -> List[Dict]:
"""
Fusion des adresses avec déduplication
"""
merged = []
# Ajout des adresses LLM (priorité)
for address in llm_addresses:
merged.append(address)
# Ajout des adresses pattern si pas de doublon
for address in pattern_addresses:
if not self._is_duplicate_address(address, merged):
merged.append(address)
return merged
def _merge_simple_entities(self, pattern_entities: List[Dict], llm_entities: List[Dict]) -> List[Dict]:
"""
Fusion d'entités simples (montants, dates)
"""
merged = []
# Ajout des entités LLM
merged.extend(llm_entities)
# Ajout des entités pattern si pas de doublon
for entity in pattern_entities:
if not self._is_duplicate_simple_entity(entity, merged):
merged.append(entity)
return merged
def _is_duplicate_identity(self, identity: Dict, existing: List[Dict]) -> bool:
"""
Vérifie si une identité est un doublon
"""
for existing_identity in existing:
if (existing_identity.get("nom", "").lower() == identity.get("nom", "").lower() and
existing_identity.get("prenom", "").lower() == identity.get("prenom", "").lower()):
return True
return False
def _is_duplicate_address(self, address: Dict, existing: List[Dict]) -> bool:
"""
Vérifie si une adresse est un doublon
"""
for existing_address in existing:
if existing_address.get("adresse_complete", "").lower() == address.get("adresse_complete", "").lower():
return True
return False
def _is_duplicate_simple_entity(self, entity: Dict, existing: List[Dict]) -> bool:
"""
Vérifie si une entité simple est un doublon
"""
entity_value = None
for key in entity:
if key != "confidence":
entity_value = entity[key]
break
if entity_value:
for existing_entity in existing:
for key in existing_entity:
if key != "confidence" and existing_entity[key] == entity_value:
return True
return False
def _post_process_entities(self, entities: Dict[str, List[Any]], document_type: str) -> Dict[str, List[Any]]:
"""
Post-traitement spécifique au type de document
"""
# Classification des identités selon le type de document
if document_type == "acte_vente":
entities["identites"] = self._classify_identities_vente(entities["identites"])
elif document_type == "acte_donation":
entities["identites"] = self._classify_identities_donation(entities["identites"])
elif document_type == "acte_succession":
entities["identites"] = self._classify_identities_succession(entities["identites"])
# Nettoyage et validation
entities = self._clean_entities(entities)
return entities
def _classify_identities_vente(self, identities: List[Dict]) -> List[Dict]:
"""
Classification des identités pour un acte de vente
"""
for identity in identities:
if identity.get("type") == "personne":
# Logique simple basée sur le contexte
# TODO: Améliorer avec plus de contexte
identity["type"] = "partie"
return identities
def _classify_identities_donation(self, identities: List[Dict]) -> List[Dict]:
"""
Classification des identités pour un acte de donation
"""
for identity in identities:
if identity.get("type") == "personne":
identity["type"] = "partie"
return identities
def _classify_identities_succession(self, identities: List[Dict]) -> List[Dict]:
"""
Classification des identités pour un acte de succession
"""
for identity in identities:
if identity.get("type") == "personne":
identity["type"] = "héritier"
return identities
def _clean_entities(self, entities: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
"""
Nettoyage et validation des entités
"""
cleaned = {}
for entity_type, entity_list in entities.items():
cleaned[entity_type] = []
for entity in entity_list:
# Validation basique
if self._is_valid_entity(entity, entity_type):
# Nettoyage des valeurs
cleaned_entity = self._clean_entity_values(entity)
cleaned[entity_type].append(cleaned_entity)
return cleaned
def _is_valid_entity(self, entity: Dict, entity_type: str) -> bool:
"""
Validation d'une entité
"""
if entity_type == "identites":
return bool(entity.get("nom") and entity.get("prenom"))
elif entity_type == "adresses":
return bool(entity.get("adresse_complete"))
elif entity_type == "montants":
return bool(entity.get("montant"))
elif entity_type == "dates":
return bool(entity.get("date"))
return True
def _clean_entity_values(self, entity: Dict) -> Dict:
"""
Nettoyage des valeurs d'une entité
"""
cleaned = {}
for key, value in entity.items():
if isinstance(value, str):
# Nettoyage des chaînes
cleaned_value = value.strip()
cleaned_value = re.sub(r'\s+', ' ', cleaned_value) # Espaces multiples
cleaned[key] = cleaned_value
else:
cleaned[key] = value
return cleaned