ncantu 447357d41a feat: Implémentation complète du système notarial 4NK avec IA
- API FastAPI complète pour le traitement de documents notariaux
- Pipeline OCR avec correction lexicale notariale
- Classification automatique des documents (règles + LLM)
- Extraction d'entités (identités, adresses, biens, montants)
- Intégration de 6 APIs externes (Cadastre, Géorisques, BODACC, etc.)
- Système de vérification et score de vraisemblance
- Analyse contextuelle via LLM (Ollama)
- Interface web moderne avec drag & drop
- Tests complets et documentation exhaustive
- Scripts de déploiement automatisés

Types de documents supportés:
- Acte de vente, donation, succession
- CNI avec détection du pays
- Contrats divers

Fonctionnalités:
- Upload et traitement asynchrone
- Vérifications externes automatiques
- Score de vraisemblance (0-1)
- Recommandations personnalisées
- Tableaux de bord et statistiques

Prêt pour la production avec démarrage en une commande.
2025-09-09 03:48:56 +02:00

453 lines
15 KiB
Python

"""
Client LLM pour la contextualisation et l'analyse des documents notariaux
"""
import asyncio
import logging
import json
import aiohttp
from typing import Dict, Any, Optional, List
import os
logger = logging.getLogger(__name__)
class LLMClient:
"""Client pour l'interaction avec les modèles LLM (Ollama)"""
def __init__(self):
self.ollama_base_url = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434")
self.default_model = os.getenv("OLLAMA_DEFAULT_MODEL", "llama3:8b")
self.session = None
self.timeout = aiohttp.ClientTimeout(total=120) # 2 minutes pour les LLM
async def __aenter__(self):
"""Context manager entry"""
self.session = aiohttp.ClientSession(timeout=self.timeout)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
if self.session:
await self.session.close()
async def generate_response(self, prompt: str, model: Optional[str] = None) -> str:
"""
Génération de réponse avec le LLM
"""
try:
if not self.session:
self.session = aiohttp.ClientSession(timeout=self.timeout)
model = model or self.default_model
# Vérification que le modèle est disponible
await self._ensure_model_available(model)
# Génération de la réponse
url = f"{self.ollama_base_url}/api/generate"
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.1, # Faible température pour plus de cohérence
"top_p": 0.9,
"max_tokens": 2000
}
}
async with self.session.post(url, json=payload) as response:
if response.status == 200:
result = await response.json()
return result.get("response", "")
else:
error_text = await response.text()
logger.error(f"Erreur LLM: {response.status} - {error_text}")
raise Exception(f"Erreur LLM: {response.status}")
except Exception as e:
logger.error(f"Erreur lors de la génération LLM: {e}")
raise
async def generate_synthesis(
self,
document_type: str,
extracted_text: str,
entities: Dict[str, Any],
verifications: Dict[str, Any],
credibility_score: float
) -> Dict[str, Any]:
"""
Génération d'un avis de synthèse complet
"""
try:
prompt = self._build_synthesis_prompt(
document_type, extracted_text, entities, verifications, credibility_score
)
response = await self.generate_response(prompt)
# Parsing de la réponse
synthesis = self._parse_synthesis_response(response)
return synthesis
except Exception as e:
logger.error(f"Erreur lors de la génération de synthèse: {e}")
return {
"avis_global": "Erreur lors de l'analyse",
"points_cles": [],
"recommandations": ["Vérification manuelle recommandée"],
"score_qualite": 0.0,
"error": str(e)
}
async def analyze_document_coherence(
self,
document_type: str,
entities: Dict[str, Any],
verifications: Dict[str, Any]
) -> Dict[str, Any]:
"""
Analyse de la cohérence du document
"""
try:
prompt = self._build_coherence_prompt(document_type, entities, verifications)
response = await self.generate_response(prompt)
return self._parse_coherence_response(response)
except Exception as e:
logger.error(f"Erreur lors de l'analyse de cohérence: {e}")
return {
"coherence_score": 0.0,
"incoherences": ["Erreur d'analyse"],
"recommandations": ["Vérification manuelle"]
}
async def generate_recommendations(
self,
document_type: str,
entities: Dict[str, Any],
verifications: Dict[str, Any],
credibility_score: float
) -> List[str]:
"""
Génération de recommandations spécifiques
"""
try:
prompt = self._build_recommendations_prompt(
document_type, entities, verifications, credibility_score
)
response = await self.generate_response(prompt)
# Parsing des recommandations
recommendations = self._parse_recommendations_response(response)
return recommendations
except Exception as e:
logger.error(f"Erreur lors de la génération de recommandations: {e}")
return ["Vérification manuelle recommandée"]
def _build_synthesis_prompt(
self,
document_type: str,
extracted_text: str,
entities: Dict[str, Any],
verifications: Dict[str, Any],
credibility_score: float
) -> str:
"""
Construction du prompt pour la synthèse
"""
# Limitation du texte pour éviter les tokens excessifs
text_sample = extracted_text[:1500] + "..." if len(extracted_text) > 1500 else extracted_text
prompt = f"""
Tu es un expert notarial. Analyse ce document et fournis un avis de synthèse complet.
TYPE DE DOCUMENT: {document_type}
SCORE DE VRAISEMBLANCE: {credibility_score:.2f}
TEXTE EXTRAIT:
{text_sample}
ENTITÉS IDENTIFIÉES:
{json.dumps(entities, indent=2, ensure_ascii=False)}
VÉRIFICATIONS EXTERNES:
{json.dumps(verifications, indent=2, ensure_ascii=False)}
Fournis une analyse structurée en JSON:
{{
"avis_global": "avis général sur la qualité et vraisemblance du document",
"points_cles": [
"point clé 1",
"point clé 2"
],
"recommandations": [
"recommandation 1",
"recommandation 2"
],
"score_qualite": 0.95,
"alertes": [
"alerte si problème détecté"
],
"conformite_legale": "évaluation de la conformité légale",
"risques_identifies": [
"risque 1",
"risque 2"
]
}}
"""
return prompt
def _build_coherence_prompt(
self,
document_type: str,
entities: Dict[str, Any],
verifications: Dict[str, Any]
) -> str:
"""
Construction du prompt pour l'analyse de cohérence
"""
prompt = f"""
Analyse la cohérence de ce document notarial de type {document_type}.
ENTITÉS:
{json.dumps(entities, indent=2, ensure_ascii=False)}
VÉRIFICATIONS:
{json.dumps(verifications, indent=2, ensure_ascii=False)}
Évalue la cohérence et réponds en JSON:
{{
"coherence_score": 0.9,
"incoherences": [
"incohérence détectée"
],
"recommandations": [
"recommandation pour corriger"
],
"elements_manquants": [
"élément qui devrait être présent"
]
}}
"""
return prompt
def _build_recommendations_prompt(
self,
document_type: str,
entities: Dict[str, Any],
verifications: Dict[str, Any],
credibility_score: float
) -> str:
"""
Construction du prompt pour les recommandations
"""
prompt = f"""
En tant qu'expert notarial, fournis des recommandations spécifiques pour ce document.
TYPE: {document_type}
SCORE: {credibility_score:.2f}
ENTITÉS: {json.dumps(entities, indent=2, ensure_ascii=False)}
VÉRIFICATIONS: {json.dumps(verifications, indent=2, ensure_ascii=False)}
Liste les recommandations prioritaires (format JSON):
{{
"recommandations": [
"recommandation 1",
"recommandation 2"
],
"priorite": [
"haute",
"moyenne"
]
}}
"""
return prompt
def _parse_synthesis_response(self, response: str) -> Dict[str, Any]:
"""
Parse la réponse de synthèse
"""
try:
# Extraction du JSON
import re
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
return json.loads(json_str)
# Fallback si pas de JSON
return {
"avis_global": response[:200] + "..." if len(response) > 200 else response,
"points_cles": [],
"recommandations": ["Vérification manuelle recommandée"],
"score_qualite": 0.5
}
except Exception as e:
logger.error(f"Erreur parsing synthèse: {e}")
return {
"avis_global": "Erreur d'analyse",
"points_cles": [],
"recommandations": ["Vérification manuelle"],
"score_qualite": 0.0,
"error": str(e)
}
def _parse_coherence_response(self, response: str) -> Dict[str, Any]:
"""
Parse la réponse d'analyse de cohérence
"""
try:
import re
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
return json.loads(json_str)
return {
"coherence_score": 0.5,
"incoherences": ["Analyse non disponible"],
"recommandations": ["Vérification manuelle"]
}
except Exception as e:
logger.error(f"Erreur parsing cohérence: {e}")
return {
"coherence_score": 0.0,
"incoherences": ["Erreur d'analyse"],
"recommandations": ["Vérification manuelle"]
}
def _parse_recommendations_response(self, response: str) -> List[str]:
"""
Parse la réponse de recommandations
"""
try:
import re
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
data = json.loads(json_str)
return data.get("recommandations", [])
# Fallback: extraction simple
lines = response.split('\n')
recommendations = []
for line in lines:
line = line.strip()
if line and (line.startswith('-') or line.startswith('') or line.startswith('*')):
recommendations.append(line[1:].strip())
return recommendations if recommendations else ["Vérification manuelle recommandée"]
except Exception as e:
logger.error(f"Erreur parsing recommandations: {e}")
return ["Vérification manuelle recommandée"]
async def _ensure_model_available(self, model: str):
"""
Vérifie que le modèle est disponible, le télécharge si nécessaire
"""
try:
if not self.session:
self.session = aiohttp.ClientSession(timeout=self.timeout)
# Vérification des modèles disponibles
list_url = f"{self.ollama_base_url}/api/tags"
async with self.session.get(list_url) as response:
if response.status == 200:
data = await response.json()
available_models = [m["name"] for m in data.get("models", [])]
if model not in available_models:
logger.info(f"Téléchargement du modèle {model}")
await self._pull_model(model)
else:
logger.info(f"Modèle {model} disponible")
else:
logger.warning("Impossible de vérifier les modèles disponibles")
except Exception as e:
logger.error(f"Erreur lors de la vérification du modèle: {e}")
# Continue quand même, le modèle pourrait être disponible
async def _pull_model(self, model: str):
"""
Télécharge un modèle Ollama
"""
try:
pull_url = f"{self.ollama_base_url}/api/pull"
payload = {"name": model}
async with self.session.post(pull_url, json=payload) as response:
if response.status == 200:
# Lecture du stream de téléchargement
async for line in response.content:
if line:
try:
data = json.loads(line.decode())
if data.get("status") == "success":
logger.info(f"Modèle {model} téléchargé avec succès")
break
except json.JSONDecodeError:
continue
else:
logger.error(f"Erreur lors du téléchargement du modèle {model}: {response.status}")
except Exception as e:
logger.error(f"Erreur lors du téléchargement du modèle {model}: {e}")
async def get_available_models(self) -> List[str]:
"""
Récupère la liste des modèles disponibles
"""
try:
if not self.session:
self.session = aiohttp.ClientSession(timeout=self.timeout)
list_url = f"{self.ollama_base_url}/api/tags"
async with self.session.get(list_url) as response:
if response.status == 200:
data = await response.json()
return [m["name"] for m in data.get("models", [])]
else:
return []
except Exception as e:
logger.error(f"Erreur lors de la récupération des modèles: {e}")
return []
async def test_connection(self) -> Dict[str, Any]:
"""
Test de connexion au service LLM
"""
try:
if not self.session:
self.session = aiohttp.ClientSession(timeout=self.timeout)
# Test simple
test_prompt = "Réponds simplement 'OK' si tu reçois ce message."
response = await self.generate_response(test_prompt)
return {
"connected": True,
"model": self.default_model,
"response": response[:100],
"base_url": self.ollama_base_url
}
except Exception as e:
logger.error(f"Erreur de connexion LLM: {e}")
return {
"connected": False,
"error": str(e),
"base_url": self.ollama_base_url
}