
- API FastAPI complète pour le traitement de documents notariaux - Pipeline OCR avec correction lexicale notariale - Classification automatique des documents (règles + LLM) - Extraction d'entités (identités, adresses, biens, montants) - Intégration de 6 APIs externes (Cadastre, Géorisques, BODACC, etc.) - Système de vérification et score de vraisemblance - Analyse contextuelle via LLM (Ollama) - Interface web moderne avec drag & drop - Tests complets et documentation exhaustive - Scripts de déploiement automatisés Types de documents supportés: - Acte de vente, donation, succession - CNI avec détection du pays - Contrats divers Fonctionnalités: - Upload et traitement asynchrone - Vérifications externes automatiques - Score de vraisemblance (0-1) - Recommandations personnalisées - Tableaux de bord et statistiques Prêt pour la production avec démarrage en une commande.
369 lines
12 KiB
Python
369 lines
12 KiB
Python
"""
|
|
Classificateur de documents notariaux
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import json
|
|
import re
|
|
from typing import Dict, Any, Optional, List
|
|
from enum import Enum
|
|
|
|
import requests
|
|
from utils.llm_client import LLMClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DocumentType(str, Enum):
|
|
"""Types de documents notariaux"""
|
|
ACTE_VENTE = "acte_vente"
|
|
ACTE_DONATION = "acte_donation"
|
|
ACTE_SUCCESSION = "acte_succession"
|
|
CNI = "cni"
|
|
CONTRAT = "contrat"
|
|
AUTRE = "autre"
|
|
|
|
class DocumentClassifier:
|
|
"""Classificateur de documents notariaux avec LLM et règles"""
|
|
|
|
def __init__(self):
|
|
self.llm_client = LLMClient()
|
|
self.classification_rules = self._load_classification_rules()
|
|
self.keywords = self._load_keywords()
|
|
|
|
def _load_classification_rules(self) -> Dict[str, List[str]]:
|
|
"""
|
|
Règles de classification basées sur des mots-clés
|
|
"""
|
|
return {
|
|
DocumentType.ACTE_VENTE: [
|
|
r"acte\s+de\s+vente",
|
|
r"vente\s+immobilière",
|
|
r"vendeur.*acheteur",
|
|
r"prix\s+de\s+vente",
|
|
r"acquisition\s+immobilière"
|
|
],
|
|
DocumentType.ACTE_DONATION: [
|
|
r"acte\s+de\s+donation",
|
|
r"donation\s+entre\s+vifs",
|
|
r"donateur.*donataire",
|
|
r"donation\s+partage"
|
|
],
|
|
DocumentType.ACTE_SUCCESSION: [
|
|
r"acte\s+de\s+notoriété",
|
|
r"succession",
|
|
r"héritier",
|
|
r"héritiers",
|
|
r"défunt",
|
|
r"legs",
|
|
r"testament"
|
|
],
|
|
DocumentType.CNI: [
|
|
r"carte\s+d'identité",
|
|
r"carte\s+nationale\s+d'identité",
|
|
r"république\s+française",
|
|
r"ministère\s+de\s+l'intérieur",
|
|
r"nom.*prénom.*né.*le"
|
|
],
|
|
DocumentType.CONTRAT: [
|
|
r"contrat\s+de\s+",
|
|
r"convention",
|
|
r"accord",
|
|
r"engagement",
|
|
r"obligation"
|
|
]
|
|
}
|
|
|
|
def _load_keywords(self) -> Dict[str, List[str]]:
|
|
"""
|
|
Mots-clés spécifiques par type de document
|
|
"""
|
|
return {
|
|
DocumentType.ACTE_VENTE: [
|
|
"vendeur", "acheteur", "prix", "vente", "acquisition",
|
|
"immobilier", "appartement", "maison", "terrain"
|
|
],
|
|
DocumentType.ACTE_DONATION: [
|
|
"donateur", "donataire", "donation", "don", "gratuit"
|
|
],
|
|
DocumentType.ACTE_SUCCESSION: [
|
|
"héritier", "défunt", "succession", "legs", "testament",
|
|
"notoriété", "décès"
|
|
],
|
|
DocumentType.CNI: [
|
|
"carte", "identité", "république", "française", "ministère",
|
|
"intérieur", "né", "nationalité"
|
|
],
|
|
DocumentType.CONTRAT: [
|
|
"contrat", "convention", "accord", "engagement", "obligation",
|
|
"parties", "clause"
|
|
]
|
|
}
|
|
|
|
async def classify_document(
|
|
self,
|
|
text: str,
|
|
expected_type: Optional[DocumentType] = None,
|
|
force_reclassification: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Classification d'un document notarial
|
|
"""
|
|
logger.info("Début de la classification du document")
|
|
|
|
try:
|
|
# 1. Classification par règles (rapide)
|
|
rule_based_result = self._classify_by_rules(text)
|
|
|
|
# 2. Classification par LLM (plus précise)
|
|
llm_result = await self._classify_by_llm(text, expected_type)
|
|
|
|
# 3. Fusion des résultats
|
|
final_result = self._merge_classification_results(
|
|
rule_based_result, llm_result, expected_type
|
|
)
|
|
|
|
# 4. Validation du résultat
|
|
validated_result = self._validate_classification(final_result, text)
|
|
|
|
logger.info(f"Classification terminée: {validated_result['type']} (confiance: {validated_result['confidence']:.2f})")
|
|
|
|
return validated_result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la classification: {e}")
|
|
# Retour d'un résultat par défaut
|
|
return {
|
|
"type": DocumentType.AUTRE,
|
|
"confidence": 0.0,
|
|
"method": "error",
|
|
"error": str(e)
|
|
}
|
|
|
|
def _classify_by_rules(self, text: str) -> Dict[str, Any]:
|
|
"""
|
|
Classification basée sur des règles et mots-clés
|
|
"""
|
|
text_lower = text.lower()
|
|
scores = {}
|
|
|
|
# Calcul des scores par type
|
|
for doc_type, patterns in self.classification_rules.items():
|
|
score = 0
|
|
matches = []
|
|
|
|
# Score basé sur les expressions régulières
|
|
for pattern in patterns:
|
|
if re.search(pattern, text_lower):
|
|
score += 2
|
|
matches.append(pattern)
|
|
|
|
# Score basé sur les mots-clés
|
|
keywords = self.keywords.get(doc_type, [])
|
|
for keyword in keywords:
|
|
if keyword in text_lower:
|
|
score += 1
|
|
matches.append(keyword)
|
|
|
|
scores[doc_type] = {
|
|
"score": score,
|
|
"matches": matches
|
|
}
|
|
|
|
# Détermination du type avec le meilleur score
|
|
if scores:
|
|
best_type = max(scores.keys(), key=lambda k: scores[k]["score"])
|
|
best_score = scores[best_type]["score"]
|
|
|
|
# Normalisation du score (0-1)
|
|
max_possible_score = max(
|
|
len(self.classification_rules.get(doc_type, [])) * 2 +
|
|
len(self.keywords.get(doc_type, []))
|
|
for doc_type in DocumentType
|
|
)
|
|
confidence = min(best_score / max_possible_score, 1.0) if max_possible_score > 0 else 0.0
|
|
|
|
return {
|
|
"type": best_type,
|
|
"confidence": confidence,
|
|
"method": "rules",
|
|
"scores": scores,
|
|
"matches": scores[best_type]["matches"]
|
|
}
|
|
else:
|
|
return {
|
|
"type": DocumentType.AUTRE,
|
|
"confidence": 0.0,
|
|
"method": "rules",
|
|
"scores": scores
|
|
}
|
|
|
|
async def _classify_by_llm(self, text: str, expected_type: Optional[DocumentType] = None) -> Dict[str, Any]:
|
|
"""
|
|
Classification par LLM (Ollama)
|
|
"""
|
|
try:
|
|
# Préparation du prompt
|
|
prompt = self._build_classification_prompt(text, expected_type)
|
|
|
|
# Appel au LLM
|
|
response = await self.llm_client.generate_response(prompt)
|
|
|
|
# Parsing de la réponse
|
|
result = self._parse_llm_classification_response(response)
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la classification LLM: {e}")
|
|
return {
|
|
"type": DocumentType.AUTRE,
|
|
"confidence": 0.0,
|
|
"method": "llm_error",
|
|
"error": str(e)
|
|
}
|
|
|
|
def _build_classification_prompt(self, text: str, expected_type: Optional[DocumentType] = None) -> str:
|
|
"""
|
|
Construction du prompt pour la classification LLM
|
|
"""
|
|
# Limitation du texte pour éviter les tokens excessifs
|
|
text_sample = text[:2000] + "..." if len(text) > 2000 else text
|
|
|
|
prompt = f"""
|
|
Tu es un expert en documents notariaux. Analyse le texte suivant et détermine son type.
|
|
|
|
Types possibles:
|
|
- acte_vente: Acte de vente immobilière
|
|
- acte_donation: Acte de donation
|
|
- acte_succession: Acte de succession ou de notoriété
|
|
- cni: Carte nationale d'identité
|
|
- contrat: Contrat ou convention
|
|
- autre: Autre type de document
|
|
|
|
Texte à analyser:
|
|
{text_sample}
|
|
|
|
Réponds UNIQUEMENT avec un JSON dans ce format:
|
|
{{
|
|
"type": "type_detecte",
|
|
"confidence": 0.95,
|
|
"reasoning": "explication courte de la décision",
|
|
"key_indicators": ["indicateur1", "indicateur2"]
|
|
}}
|
|
"""
|
|
|
|
if expected_type:
|
|
prompt += f"\n\nType attendu: {expected_type.value}"
|
|
|
|
return prompt
|
|
|
|
def _parse_llm_classification_response(self, response: str) -> Dict[str, Any]:
|
|
"""
|
|
Parse la réponse du LLM pour la classification
|
|
"""
|
|
try:
|
|
# Extraction du JSON de la réponse
|
|
json_match = re.search(r'\{.*\}', response, re.DOTALL)
|
|
if json_match:
|
|
json_str = json_match.group(0)
|
|
result = json.loads(json_str)
|
|
|
|
# Validation du type
|
|
if result.get("type") in [t.value for t in DocumentType]:
|
|
return {
|
|
"type": DocumentType(result["type"]),
|
|
"confidence": float(result.get("confidence", 0.0)),
|
|
"method": "llm",
|
|
"reasoning": result.get("reasoning", ""),
|
|
"key_indicators": result.get("key_indicators", [])
|
|
}
|
|
|
|
# Fallback si le parsing échoue
|
|
return {
|
|
"type": DocumentType.AUTRE,
|
|
"confidence": 0.0,
|
|
"method": "llm_parse_error",
|
|
"raw_response": response
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du parsing de la réponse LLM: {e}")
|
|
return {
|
|
"type": DocumentType.AUTRE,
|
|
"confidence": 0.0,
|
|
"method": "llm_parse_error",
|
|
"error": str(e),
|
|
"raw_response": response
|
|
}
|
|
|
|
def _merge_classification_results(
|
|
self,
|
|
rule_result: Dict[str, Any],
|
|
llm_result: Dict[str, Any],
|
|
expected_type: Optional[DocumentType]
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Fusion des résultats de classification
|
|
"""
|
|
# Poids des différentes méthodes
|
|
rule_weight = 0.3
|
|
llm_weight = 0.7
|
|
|
|
# Si un type est attendu et correspond, bonus de confiance
|
|
expected_bonus = 0.0
|
|
if expected_type:
|
|
if rule_result["type"] == expected_type:
|
|
expected_bonus += 0.1
|
|
if llm_result["type"] == expected_type:
|
|
expected_bonus += 0.1
|
|
|
|
# Calcul de la confiance fusionnée
|
|
if rule_result["type"] == llm_result["type"]:
|
|
# Accord entre les méthodes
|
|
confidence = (rule_result["confidence"] * rule_weight +
|
|
llm_result["confidence"] * llm_weight) + expected_bonus
|
|
final_type = rule_result["type"]
|
|
else:
|
|
# Désaccord, on privilégie le LLM
|
|
confidence = llm_result["confidence"] * llm_weight + expected_bonus
|
|
final_type = llm_result["type"]
|
|
|
|
return {
|
|
"type": final_type,
|
|
"confidence": min(confidence, 1.0),
|
|
"method": "merged",
|
|
"rule_result": rule_result,
|
|
"llm_result": llm_result,
|
|
"expected_type": expected_type,
|
|
"expected_bonus": expected_bonus
|
|
}
|
|
|
|
def _validate_classification(self, result: Dict[str, Any], text: str) -> Dict[str, Any]:
|
|
"""
|
|
Validation finale de la classification
|
|
"""
|
|
# Vérifications de cohérence
|
|
type_ = result["type"]
|
|
confidence = result["confidence"]
|
|
|
|
# Validation spécifique par type
|
|
if type_ == DocumentType.CNI:
|
|
# Vérification des éléments obligatoires d'une CNI
|
|
cni_indicators = ["république", "française", "carte", "identité"]
|
|
if not any(indicator in text.lower() for indicator in cni_indicators):
|
|
confidence *= 0.5 # Réduction de confiance
|
|
|
|
elif type_ in [DocumentType.ACTE_VENTE, DocumentType.ACTE_DONATION, DocumentType.ACTE_SUCCESSION]:
|
|
# Vérification de la présence d'éléments notariaux
|
|
notarial_indicators = ["notaire", "étude", "acte", "authentique"]
|
|
if not any(indicator in text.lower() for indicator in notarial_indicators):
|
|
confidence *= 0.7 # Réduction modérée
|
|
|
|
# Seuil minimum de confiance
|
|
if confidence < 0.3:
|
|
result["type"] = DocumentType.AUTRE
|
|
result["confidence"] = 0.3
|
|
result["validation_note"] = "Confiance trop faible, classé comme 'autre'"
|
|
|
|
return result
|