4NK_IA_back/services/host_api/utils/document_classifier.py
ncantu 447357d41a feat: Implémentation complète du système notarial 4NK avec IA
- API FastAPI complète pour le traitement de documents notariaux
- Pipeline OCR avec correction lexicale notariale
- Classification automatique des documents (règles + LLM)
- Extraction d'entités (identités, adresses, biens, montants)
- Intégration de 6 APIs externes (Cadastre, Géorisques, BODACC, etc.)
- Système de vérification et score de vraisemblance
- Analyse contextuelle via LLM (Ollama)
- Interface web moderne avec drag & drop
- Tests complets et documentation exhaustive
- Scripts de déploiement automatisés

Types de documents supportés:
- Acte de vente, donation, succession
- CNI avec détection du pays
- Contrats divers

Fonctionnalités:
- Upload et traitement asynchrone
- Vérifications externes automatiques
- Score de vraisemblance (0-1)
- Recommandations personnalisées
- Tableaux de bord et statistiques

Prêt pour la production avec démarrage en une commande.
2025-09-09 03:48:56 +02:00

369 lines
12 KiB
Python

"""
Classificateur de documents notariaux
"""
import asyncio
import logging
import json
import re
from typing import Dict, Any, Optional, List
from enum import Enum
import requests
from utils.llm_client import LLMClient
logger = logging.getLogger(__name__)
class DocumentType(str, Enum):
"""Types de documents notariaux"""
ACTE_VENTE = "acte_vente"
ACTE_DONATION = "acte_donation"
ACTE_SUCCESSION = "acte_succession"
CNI = "cni"
CONTRAT = "contrat"
AUTRE = "autre"
class DocumentClassifier:
"""Classificateur de documents notariaux avec LLM et règles"""
def __init__(self):
self.llm_client = LLMClient()
self.classification_rules = self._load_classification_rules()
self.keywords = self._load_keywords()
def _load_classification_rules(self) -> Dict[str, List[str]]:
"""
Règles de classification basées sur des mots-clés
"""
return {
DocumentType.ACTE_VENTE: [
r"acte\s+de\s+vente",
r"vente\s+immobilière",
r"vendeur.*acheteur",
r"prix\s+de\s+vente",
r"acquisition\s+immobilière"
],
DocumentType.ACTE_DONATION: [
r"acte\s+de\s+donation",
r"donation\s+entre\s+vifs",
r"donateur.*donataire",
r"donation\s+partage"
],
DocumentType.ACTE_SUCCESSION: [
r"acte\s+de\s+notoriété",
r"succession",
r"héritier",
r"héritiers",
r"défunt",
r"legs",
r"testament"
],
DocumentType.CNI: [
r"carte\s+d'identité",
r"carte\s+nationale\s+d'identité",
r"république\s+française",
r"ministère\s+de\s+l'intérieur",
r"nom.*prénom.*né.*le"
],
DocumentType.CONTRAT: [
r"contrat\s+de\s+",
r"convention",
r"accord",
r"engagement",
r"obligation"
]
}
def _load_keywords(self) -> Dict[str, List[str]]:
"""
Mots-clés spécifiques par type de document
"""
return {
DocumentType.ACTE_VENTE: [
"vendeur", "acheteur", "prix", "vente", "acquisition",
"immobilier", "appartement", "maison", "terrain"
],
DocumentType.ACTE_DONATION: [
"donateur", "donataire", "donation", "don", "gratuit"
],
DocumentType.ACTE_SUCCESSION: [
"héritier", "défunt", "succession", "legs", "testament",
"notoriété", "décès"
],
DocumentType.CNI: [
"carte", "identité", "république", "française", "ministère",
"intérieur", "", "nationalité"
],
DocumentType.CONTRAT: [
"contrat", "convention", "accord", "engagement", "obligation",
"parties", "clause"
]
}
async def classify_document(
self,
text: str,
expected_type: Optional[DocumentType] = None,
force_reclassification: bool = False
) -> Dict[str, Any]:
"""
Classification d'un document notarial
"""
logger.info("Début de la classification du document")
try:
# 1. Classification par règles (rapide)
rule_based_result = self._classify_by_rules(text)
# 2. Classification par LLM (plus précise)
llm_result = await self._classify_by_llm(text, expected_type)
# 3. Fusion des résultats
final_result = self._merge_classification_results(
rule_based_result, llm_result, expected_type
)
# 4. Validation du résultat
validated_result = self._validate_classification(final_result, text)
logger.info(f"Classification terminée: {validated_result['type']} (confiance: {validated_result['confidence']:.2f})")
return validated_result
except Exception as e:
logger.error(f"Erreur lors de la classification: {e}")
# Retour d'un résultat par défaut
return {
"type": DocumentType.AUTRE,
"confidence": 0.0,
"method": "error",
"error": str(e)
}
def _classify_by_rules(self, text: str) -> Dict[str, Any]:
"""
Classification basée sur des règles et mots-clés
"""
text_lower = text.lower()
scores = {}
# Calcul des scores par type
for doc_type, patterns in self.classification_rules.items():
score = 0
matches = []
# Score basé sur les expressions régulières
for pattern in patterns:
if re.search(pattern, text_lower):
score += 2
matches.append(pattern)
# Score basé sur les mots-clés
keywords = self.keywords.get(doc_type, [])
for keyword in keywords:
if keyword in text_lower:
score += 1
matches.append(keyword)
scores[doc_type] = {
"score": score,
"matches": matches
}
# Détermination du type avec le meilleur score
if scores:
best_type = max(scores.keys(), key=lambda k: scores[k]["score"])
best_score = scores[best_type]["score"]
# Normalisation du score (0-1)
max_possible_score = max(
len(self.classification_rules.get(doc_type, [])) * 2 +
len(self.keywords.get(doc_type, []))
for doc_type in DocumentType
)
confidence = min(best_score / max_possible_score, 1.0) if max_possible_score > 0 else 0.0
return {
"type": best_type,
"confidence": confidence,
"method": "rules",
"scores": scores,
"matches": scores[best_type]["matches"]
}
else:
return {
"type": DocumentType.AUTRE,
"confidence": 0.0,
"method": "rules",
"scores": scores
}
async def _classify_by_llm(self, text: str, expected_type: Optional[DocumentType] = None) -> Dict[str, Any]:
"""
Classification par LLM (Ollama)
"""
try:
# Préparation du prompt
prompt = self._build_classification_prompt(text, expected_type)
# Appel au LLM
response = await self.llm_client.generate_response(prompt)
# Parsing de la réponse
result = self._parse_llm_classification_response(response)
return result
except Exception as e:
logger.error(f"Erreur lors de la classification LLM: {e}")
return {
"type": DocumentType.AUTRE,
"confidence": 0.0,
"method": "llm_error",
"error": str(e)
}
def _build_classification_prompt(self, text: str, expected_type: Optional[DocumentType] = None) -> str:
"""
Construction du prompt pour la classification LLM
"""
# Limitation du texte pour éviter les tokens excessifs
text_sample = text[:2000] + "..." if len(text) > 2000 else text
prompt = f"""
Tu es un expert en documents notariaux. Analyse le texte suivant et détermine son type.
Types possibles:
- acte_vente: Acte de vente immobilière
- acte_donation: Acte de donation
- acte_succession: Acte de succession ou de notoriété
- cni: Carte nationale d'identité
- contrat: Contrat ou convention
- autre: Autre type de document
Texte à analyser:
{text_sample}
Réponds UNIQUEMENT avec un JSON dans ce format:
{{
"type": "type_detecte",
"confidence": 0.95,
"reasoning": "explication courte de la décision",
"key_indicators": ["indicateur1", "indicateur2"]
}}
"""
if expected_type:
prompt += f"\n\nType attendu: {expected_type.value}"
return prompt
def _parse_llm_classification_response(self, response: str) -> Dict[str, Any]:
"""
Parse la réponse du LLM pour la classification
"""
try:
# Extraction du JSON de la réponse
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
result = json.loads(json_str)
# Validation du type
if result.get("type") in [t.value for t in DocumentType]:
return {
"type": DocumentType(result["type"]),
"confidence": float(result.get("confidence", 0.0)),
"method": "llm",
"reasoning": result.get("reasoning", ""),
"key_indicators": result.get("key_indicators", [])
}
# Fallback si le parsing échoue
return {
"type": DocumentType.AUTRE,
"confidence": 0.0,
"method": "llm_parse_error",
"raw_response": response
}
except Exception as e:
logger.error(f"Erreur lors du parsing de la réponse LLM: {e}")
return {
"type": DocumentType.AUTRE,
"confidence": 0.0,
"method": "llm_parse_error",
"error": str(e),
"raw_response": response
}
def _merge_classification_results(
self,
rule_result: Dict[str, Any],
llm_result: Dict[str, Any],
expected_type: Optional[DocumentType]
) -> Dict[str, Any]:
"""
Fusion des résultats de classification
"""
# Poids des différentes méthodes
rule_weight = 0.3
llm_weight = 0.7
# Si un type est attendu et correspond, bonus de confiance
expected_bonus = 0.0
if expected_type:
if rule_result["type"] == expected_type:
expected_bonus += 0.1
if llm_result["type"] == expected_type:
expected_bonus += 0.1
# Calcul de la confiance fusionnée
if rule_result["type"] == llm_result["type"]:
# Accord entre les méthodes
confidence = (rule_result["confidence"] * rule_weight +
llm_result["confidence"] * llm_weight) + expected_bonus
final_type = rule_result["type"]
else:
# Désaccord, on privilégie le LLM
confidence = llm_result["confidence"] * llm_weight + expected_bonus
final_type = llm_result["type"]
return {
"type": final_type,
"confidence": min(confidence, 1.0),
"method": "merged",
"rule_result": rule_result,
"llm_result": llm_result,
"expected_type": expected_type,
"expected_bonus": expected_bonus
}
def _validate_classification(self, result: Dict[str, Any], text: str) -> Dict[str, Any]:
"""
Validation finale de la classification
"""
# Vérifications de cohérence
type_ = result["type"]
confidence = result["confidence"]
# Validation spécifique par type
if type_ == DocumentType.CNI:
# Vérification des éléments obligatoires d'une CNI
cni_indicators = ["république", "française", "carte", "identité"]
if not any(indicator in text.lower() for indicator in cni_indicators):
confidence *= 0.5 # Réduction de confiance
elif type_ in [DocumentType.ACTE_VENTE, DocumentType.ACTE_DONATION, DocumentType.ACTE_SUCCESSION]:
# Vérification de la présence d'éléments notariaux
notarial_indicators = ["notaire", "étude", "acte", "authentique"]
if not any(indicator in text.lower() for indicator in notarial_indicators):
confidence *= 0.7 # Réduction modérée
# Seuil minimum de confiance
if confidence < 0.3:
result["type"] = DocumentType.AUTRE
result["confidence"] = 0.3
result["validation_note"] = "Confiance trop faible, classé comme 'autre'"
return result