
- API FastAPI complète pour le traitement de documents notariaux - Pipeline OCR avec correction lexicale notariale - Classification automatique des documents (règles + LLM) - Extraction d'entités (identités, adresses, biens, montants) - Intégration de 6 APIs externes (Cadastre, Géorisques, BODACC, etc.) - Système de vérification et score de vraisemblance - Analyse contextuelle via LLM (Ollama) - Interface web moderne avec drag & drop - Tests complets et documentation exhaustive - Scripts de déploiement automatisés Types de documents supportés: - Acte de vente, donation, succession - CNI avec détection du pays - Contrats divers Fonctionnalités: - Upload et traitement asynchrone - Vérifications externes automatiques - Score de vraisemblance (0-1) - Recommandations personnalisées - Tableaux de bord et statistiques Prêt pour la production avec démarrage en une commande.
453 lines
15 KiB
Python
453 lines
15 KiB
Python
"""
|
|
Client LLM pour la contextualisation et l'analyse des documents notariaux
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import json
|
|
import aiohttp
|
|
from typing import Dict, Any, Optional, List
|
|
import os
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class LLMClient:
|
|
"""Client pour l'interaction avec les modèles LLM (Ollama)"""
|
|
|
|
def __init__(self):
|
|
self.ollama_base_url = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434")
|
|
self.default_model = os.getenv("OLLAMA_DEFAULT_MODEL", "llama3:8b")
|
|
self.session = None
|
|
self.timeout = aiohttp.ClientTimeout(total=120) # 2 minutes pour les LLM
|
|
|
|
async def __aenter__(self):
|
|
"""Context manager entry"""
|
|
self.session = aiohttp.ClientSession(timeout=self.timeout)
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit"""
|
|
if self.session:
|
|
await self.session.close()
|
|
|
|
async def generate_response(self, prompt: str, model: Optional[str] = None) -> str:
|
|
"""
|
|
Génération de réponse avec le LLM
|
|
"""
|
|
try:
|
|
if not self.session:
|
|
self.session = aiohttp.ClientSession(timeout=self.timeout)
|
|
|
|
model = model or self.default_model
|
|
|
|
# Vérification que le modèle est disponible
|
|
await self._ensure_model_available(model)
|
|
|
|
# Génération de la réponse
|
|
url = f"{self.ollama_base_url}/api/generate"
|
|
payload = {
|
|
"model": model,
|
|
"prompt": prompt,
|
|
"stream": False,
|
|
"options": {
|
|
"temperature": 0.1, # Faible température pour plus de cohérence
|
|
"top_p": 0.9,
|
|
"max_tokens": 2000
|
|
}
|
|
}
|
|
|
|
async with self.session.post(url, json=payload) as response:
|
|
if response.status == 200:
|
|
result = await response.json()
|
|
return result.get("response", "")
|
|
else:
|
|
error_text = await response.text()
|
|
logger.error(f"Erreur LLM: {response.status} - {error_text}")
|
|
raise Exception(f"Erreur LLM: {response.status}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la génération LLM: {e}")
|
|
raise
|
|
|
|
async def generate_synthesis(
|
|
self,
|
|
document_type: str,
|
|
extracted_text: str,
|
|
entities: Dict[str, Any],
|
|
verifications: Dict[str, Any],
|
|
credibility_score: float
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Génération d'un avis de synthèse complet
|
|
"""
|
|
try:
|
|
prompt = self._build_synthesis_prompt(
|
|
document_type, extracted_text, entities, verifications, credibility_score
|
|
)
|
|
|
|
response = await self.generate_response(prompt)
|
|
|
|
# Parsing de la réponse
|
|
synthesis = self._parse_synthesis_response(response)
|
|
|
|
return synthesis
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la génération de synthèse: {e}")
|
|
return {
|
|
"avis_global": "Erreur lors de l'analyse",
|
|
"points_cles": [],
|
|
"recommandations": ["Vérification manuelle recommandée"],
|
|
"score_qualite": 0.0,
|
|
"error": str(e)
|
|
}
|
|
|
|
async def analyze_document_coherence(
|
|
self,
|
|
document_type: str,
|
|
entities: Dict[str, Any],
|
|
verifications: Dict[str, Any]
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Analyse de la cohérence du document
|
|
"""
|
|
try:
|
|
prompt = self._build_coherence_prompt(document_type, entities, verifications)
|
|
response = await self.generate_response(prompt)
|
|
|
|
return self._parse_coherence_response(response)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'analyse de cohérence: {e}")
|
|
return {
|
|
"coherence_score": 0.0,
|
|
"incoherences": ["Erreur d'analyse"],
|
|
"recommandations": ["Vérification manuelle"]
|
|
}
|
|
|
|
async def generate_recommendations(
|
|
self,
|
|
document_type: str,
|
|
entities: Dict[str, Any],
|
|
verifications: Dict[str, Any],
|
|
credibility_score: float
|
|
) -> List[str]:
|
|
"""
|
|
Génération de recommandations spécifiques
|
|
"""
|
|
try:
|
|
prompt = self._build_recommendations_prompt(
|
|
document_type, entities, verifications, credibility_score
|
|
)
|
|
|
|
response = await self.generate_response(prompt)
|
|
|
|
# Parsing des recommandations
|
|
recommendations = self._parse_recommendations_response(response)
|
|
|
|
return recommendations
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la génération de recommandations: {e}")
|
|
return ["Vérification manuelle recommandée"]
|
|
|
|
def _build_synthesis_prompt(
|
|
self,
|
|
document_type: str,
|
|
extracted_text: str,
|
|
entities: Dict[str, Any],
|
|
verifications: Dict[str, Any],
|
|
credibility_score: float
|
|
) -> str:
|
|
"""
|
|
Construction du prompt pour la synthèse
|
|
"""
|
|
# Limitation du texte pour éviter les tokens excessifs
|
|
text_sample = extracted_text[:1500] + "..." if len(extracted_text) > 1500 else extracted_text
|
|
|
|
prompt = f"""
|
|
Tu es un expert notarial. Analyse ce document et fournis un avis de synthèse complet.
|
|
|
|
TYPE DE DOCUMENT: {document_type}
|
|
SCORE DE VRAISEMBLANCE: {credibility_score:.2f}
|
|
|
|
TEXTE EXTRAIT:
|
|
{text_sample}
|
|
|
|
ENTITÉS IDENTIFIÉES:
|
|
{json.dumps(entities, indent=2, ensure_ascii=False)}
|
|
|
|
VÉRIFICATIONS EXTERNES:
|
|
{json.dumps(verifications, indent=2, ensure_ascii=False)}
|
|
|
|
Fournis une analyse structurée en JSON:
|
|
{{
|
|
"avis_global": "avis général sur la qualité et vraisemblance du document",
|
|
"points_cles": [
|
|
"point clé 1",
|
|
"point clé 2"
|
|
],
|
|
"recommandations": [
|
|
"recommandation 1",
|
|
"recommandation 2"
|
|
],
|
|
"score_qualite": 0.95,
|
|
"alertes": [
|
|
"alerte si problème détecté"
|
|
],
|
|
"conformite_legale": "évaluation de la conformité légale",
|
|
"risques_identifies": [
|
|
"risque 1",
|
|
"risque 2"
|
|
]
|
|
}}
|
|
"""
|
|
return prompt
|
|
|
|
def _build_coherence_prompt(
|
|
self,
|
|
document_type: str,
|
|
entities: Dict[str, Any],
|
|
verifications: Dict[str, Any]
|
|
) -> str:
|
|
"""
|
|
Construction du prompt pour l'analyse de cohérence
|
|
"""
|
|
prompt = f"""
|
|
Analyse la cohérence de ce document notarial de type {document_type}.
|
|
|
|
ENTITÉS:
|
|
{json.dumps(entities, indent=2, ensure_ascii=False)}
|
|
|
|
VÉRIFICATIONS:
|
|
{json.dumps(verifications, indent=2, ensure_ascii=False)}
|
|
|
|
Évalue la cohérence et réponds en JSON:
|
|
{{
|
|
"coherence_score": 0.9,
|
|
"incoherences": [
|
|
"incohérence détectée"
|
|
],
|
|
"recommandations": [
|
|
"recommandation pour corriger"
|
|
],
|
|
"elements_manquants": [
|
|
"élément qui devrait être présent"
|
|
]
|
|
}}
|
|
"""
|
|
return prompt
|
|
|
|
def _build_recommendations_prompt(
|
|
self,
|
|
document_type: str,
|
|
entities: Dict[str, Any],
|
|
verifications: Dict[str, Any],
|
|
credibility_score: float
|
|
) -> str:
|
|
"""
|
|
Construction du prompt pour les recommandations
|
|
"""
|
|
prompt = f"""
|
|
En tant qu'expert notarial, fournis des recommandations spécifiques pour ce document.
|
|
|
|
TYPE: {document_type}
|
|
SCORE: {credibility_score:.2f}
|
|
|
|
ENTITÉS: {json.dumps(entities, indent=2, ensure_ascii=False)}
|
|
VÉRIFICATIONS: {json.dumps(verifications, indent=2, ensure_ascii=False)}
|
|
|
|
Liste les recommandations prioritaires (format JSON):
|
|
{{
|
|
"recommandations": [
|
|
"recommandation 1",
|
|
"recommandation 2"
|
|
],
|
|
"priorite": [
|
|
"haute",
|
|
"moyenne"
|
|
]
|
|
}}
|
|
"""
|
|
return prompt
|
|
|
|
def _parse_synthesis_response(self, response: str) -> Dict[str, Any]:
|
|
"""
|
|
Parse la réponse de synthèse
|
|
"""
|
|
try:
|
|
# Extraction du JSON
|
|
import re
|
|
json_match = re.search(r'\{.*\}', response, re.DOTALL)
|
|
if json_match:
|
|
json_str = json_match.group(0)
|
|
return json.loads(json_str)
|
|
|
|
# Fallback si pas de JSON
|
|
return {
|
|
"avis_global": response[:200] + "..." if len(response) > 200 else response,
|
|
"points_cles": [],
|
|
"recommandations": ["Vérification manuelle recommandée"],
|
|
"score_qualite": 0.5
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur parsing synthèse: {e}")
|
|
return {
|
|
"avis_global": "Erreur d'analyse",
|
|
"points_cles": [],
|
|
"recommandations": ["Vérification manuelle"],
|
|
"score_qualite": 0.0,
|
|
"error": str(e)
|
|
}
|
|
|
|
def _parse_coherence_response(self, response: str) -> Dict[str, Any]:
|
|
"""
|
|
Parse la réponse d'analyse de cohérence
|
|
"""
|
|
try:
|
|
import re
|
|
json_match = re.search(r'\{.*\}', response, re.DOTALL)
|
|
if json_match:
|
|
json_str = json_match.group(0)
|
|
return json.loads(json_str)
|
|
|
|
return {
|
|
"coherence_score": 0.5,
|
|
"incoherences": ["Analyse non disponible"],
|
|
"recommandations": ["Vérification manuelle"]
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur parsing cohérence: {e}")
|
|
return {
|
|
"coherence_score": 0.0,
|
|
"incoherences": ["Erreur d'analyse"],
|
|
"recommandations": ["Vérification manuelle"]
|
|
}
|
|
|
|
def _parse_recommendations_response(self, response: str) -> List[str]:
|
|
"""
|
|
Parse la réponse de recommandations
|
|
"""
|
|
try:
|
|
import re
|
|
json_match = re.search(r'\{.*\}', response, re.DOTALL)
|
|
if json_match:
|
|
json_str = json_match.group(0)
|
|
data = json.loads(json_str)
|
|
return data.get("recommandations", [])
|
|
|
|
# Fallback: extraction simple
|
|
lines = response.split('\n')
|
|
recommendations = []
|
|
for line in lines:
|
|
line = line.strip()
|
|
if line and (line.startswith('-') or line.startswith('•') or line.startswith('*')):
|
|
recommendations.append(line[1:].strip())
|
|
|
|
return recommendations if recommendations else ["Vérification manuelle recommandée"]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur parsing recommandations: {e}")
|
|
return ["Vérification manuelle recommandée"]
|
|
|
|
async def _ensure_model_available(self, model: str):
|
|
"""
|
|
Vérifie que le modèle est disponible, le télécharge si nécessaire
|
|
"""
|
|
try:
|
|
if not self.session:
|
|
self.session = aiohttp.ClientSession(timeout=self.timeout)
|
|
|
|
# Vérification des modèles disponibles
|
|
list_url = f"{self.ollama_base_url}/api/tags"
|
|
async with self.session.get(list_url) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
available_models = [m["name"] for m in data.get("models", [])]
|
|
|
|
if model not in available_models:
|
|
logger.info(f"Téléchargement du modèle {model}")
|
|
await self._pull_model(model)
|
|
else:
|
|
logger.info(f"Modèle {model} disponible")
|
|
else:
|
|
logger.warning("Impossible de vérifier les modèles disponibles")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la vérification du modèle: {e}")
|
|
# Continue quand même, le modèle pourrait être disponible
|
|
|
|
async def _pull_model(self, model: str):
|
|
"""
|
|
Télécharge un modèle Ollama
|
|
"""
|
|
try:
|
|
pull_url = f"{self.ollama_base_url}/api/pull"
|
|
payload = {"name": model}
|
|
|
|
async with self.session.post(pull_url, json=payload) as response:
|
|
if response.status == 200:
|
|
# Lecture du stream de téléchargement
|
|
async for line in response.content:
|
|
if line:
|
|
try:
|
|
data = json.loads(line.decode())
|
|
if data.get("status") == "success":
|
|
logger.info(f"Modèle {model} téléchargé avec succès")
|
|
break
|
|
except json.JSONDecodeError:
|
|
continue
|
|
else:
|
|
logger.error(f"Erreur lors du téléchargement du modèle {model}: {response.status}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du téléchargement du modèle {model}: {e}")
|
|
|
|
async def get_available_models(self) -> List[str]:
|
|
"""
|
|
Récupère la liste des modèles disponibles
|
|
"""
|
|
try:
|
|
if not self.session:
|
|
self.session = aiohttp.ClientSession(timeout=self.timeout)
|
|
|
|
list_url = f"{self.ollama_base_url}/api/tags"
|
|
async with self.session.get(list_url) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
return [m["name"] for m in data.get("models", [])]
|
|
else:
|
|
return []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la récupération des modèles: {e}")
|
|
return []
|
|
|
|
async def test_connection(self) -> Dict[str, Any]:
|
|
"""
|
|
Test de connexion au service LLM
|
|
"""
|
|
try:
|
|
if not self.session:
|
|
self.session = aiohttp.ClientSession(timeout=self.timeout)
|
|
|
|
# Test simple
|
|
test_prompt = "Réponds simplement 'OK' si tu reçois ce message."
|
|
response = await self.generate_response(test_prompt)
|
|
|
|
return {
|
|
"connected": True,
|
|
"model": self.default_model,
|
|
"response": response[:100],
|
|
"base_url": self.ollama_base_url
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur de connexion LLM: {e}")
|
|
return {
|
|
"connected": False,
|
|
"error": str(e),
|
|
"base_url": self.ollama_base_url
|
|
}
|