""" Client LLM pour la contextualisation et l'analyse des documents notariaux """ import asyncio import logging import json import aiohttp from typing import Dict, Any, Optional, List import os logger = logging.getLogger(__name__) class LLMClient: """Client pour l'interaction avec les modèles LLM (Ollama)""" def __init__(self): self.ollama_base_url = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434") self.default_model = os.getenv("OLLAMA_DEFAULT_MODEL", "llama3:8b") self.session = None self.timeout = aiohttp.ClientTimeout(total=120) # 2 minutes pour les LLM async def __aenter__(self): """Context manager entry""" self.session = aiohttp.ClientSession(timeout=self.timeout) return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Context manager exit""" if self.session: await self.session.close() async def generate_response(self, prompt: str, model: Optional[str] = None) -> str: """ Génération de réponse avec le LLM """ try: if not self.session: self.session = aiohttp.ClientSession(timeout=self.timeout) model = model or self.default_model # Vérification que le modèle est disponible await self._ensure_model_available(model) # Génération de la réponse url = f"{self.ollama_base_url}/api/generate" payload = { "model": model, "prompt": prompt, "stream": False, "options": { "temperature": 0.1, # Faible température pour plus de cohérence "top_p": 0.9, "max_tokens": 2000 } } async with self.session.post(url, json=payload) as response: if response.status == 200: result = await response.json() return result.get("response", "") else: error_text = await response.text() logger.error(f"Erreur LLM: {response.status} - {error_text}") raise Exception(f"Erreur LLM: {response.status}") except Exception as e: logger.error(f"Erreur lors de la génération LLM: {e}") raise async def generate_synthesis( self, document_type: str, extracted_text: str, entities: Dict[str, Any], verifications: Dict[str, Any], credibility_score: float ) -> Dict[str, Any]: """ Génération d'un avis de synthèse complet """ try: prompt = self._build_synthesis_prompt( document_type, extracted_text, entities, verifications, credibility_score ) response = await self.generate_response(prompt) # Parsing de la réponse synthesis = self._parse_synthesis_response(response) return synthesis except Exception as e: logger.error(f"Erreur lors de la génération de synthèse: {e}") return { "avis_global": "Erreur lors de l'analyse", "points_cles": [], "recommandations": ["Vérification manuelle recommandée"], "score_qualite": 0.0, "error": str(e) } async def analyze_document_coherence( self, document_type: str, entities: Dict[str, Any], verifications: Dict[str, Any] ) -> Dict[str, Any]: """ Analyse de la cohérence du document """ try: prompt = self._build_coherence_prompt(document_type, entities, verifications) response = await self.generate_response(prompt) return self._parse_coherence_response(response) except Exception as e: logger.error(f"Erreur lors de l'analyse de cohérence: {e}") return { "coherence_score": 0.0, "incoherences": ["Erreur d'analyse"], "recommandations": ["Vérification manuelle"] } async def generate_recommendations( self, document_type: str, entities: Dict[str, Any], verifications: Dict[str, Any], credibility_score: float ) -> List[str]: """ Génération de recommandations spécifiques """ try: prompt = self._build_recommendations_prompt( document_type, entities, verifications, credibility_score ) response = await self.generate_response(prompt) # Parsing des recommandations recommendations = self._parse_recommendations_response(response) return recommendations except Exception as e: logger.error(f"Erreur lors de la génération de recommandations: {e}") return ["Vérification manuelle recommandée"] def _build_synthesis_prompt( self, document_type: str, extracted_text: str, entities: Dict[str, Any], verifications: Dict[str, Any], credibility_score: float ) -> str: """ Construction du prompt pour la synthèse """ # Limitation du texte pour éviter les tokens excessifs text_sample = extracted_text[:1500] + "..." if len(extracted_text) > 1500 else extracted_text prompt = f""" Tu es un expert notarial. Analyse ce document et fournis un avis de synthèse complet. TYPE DE DOCUMENT: {document_type} SCORE DE VRAISEMBLANCE: {credibility_score:.2f} TEXTE EXTRAIT: {text_sample} ENTITÉS IDENTIFIÉES: {json.dumps(entities, indent=2, ensure_ascii=False)} VÉRIFICATIONS EXTERNES: {json.dumps(verifications, indent=2, ensure_ascii=False)} Fournis une analyse structurée en JSON: {{ "avis_global": "avis général sur la qualité et vraisemblance du document", "points_cles": [ "point clé 1", "point clé 2" ], "recommandations": [ "recommandation 1", "recommandation 2" ], "score_qualite": 0.95, "alertes": [ "alerte si problème détecté" ], "conformite_legale": "évaluation de la conformité légale", "risques_identifies": [ "risque 1", "risque 2" ] }} """ return prompt def _build_coherence_prompt( self, document_type: str, entities: Dict[str, Any], verifications: Dict[str, Any] ) -> str: """ Construction du prompt pour l'analyse de cohérence """ prompt = f""" Analyse la cohérence de ce document notarial de type {document_type}. ENTITÉS: {json.dumps(entities, indent=2, ensure_ascii=False)} VÉRIFICATIONS: {json.dumps(verifications, indent=2, ensure_ascii=False)} Évalue la cohérence et réponds en JSON: {{ "coherence_score": 0.9, "incoherences": [ "incohérence détectée" ], "recommandations": [ "recommandation pour corriger" ], "elements_manquants": [ "élément qui devrait être présent" ] }} """ return prompt def _build_recommendations_prompt( self, document_type: str, entities: Dict[str, Any], verifications: Dict[str, Any], credibility_score: float ) -> str: """ Construction du prompt pour les recommandations """ prompt = f""" En tant qu'expert notarial, fournis des recommandations spécifiques pour ce document. TYPE: {document_type} SCORE: {credibility_score:.2f} ENTITÉS: {json.dumps(entities, indent=2, ensure_ascii=False)} VÉRIFICATIONS: {json.dumps(verifications, indent=2, ensure_ascii=False)} Liste les recommandations prioritaires (format JSON): {{ "recommandations": [ "recommandation 1", "recommandation 2" ], "priorite": [ "haute", "moyenne" ] }} """ return prompt def _parse_synthesis_response(self, response: str) -> Dict[str, Any]: """ Parse la réponse de synthèse """ try: # Extraction du JSON import re json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: json_str = json_match.group(0) return json.loads(json_str) # Fallback si pas de JSON return { "avis_global": response[:200] + "..." if len(response) > 200 else response, "points_cles": [], "recommandations": ["Vérification manuelle recommandée"], "score_qualite": 0.5 } except Exception as e: logger.error(f"Erreur parsing synthèse: {e}") return { "avis_global": "Erreur d'analyse", "points_cles": [], "recommandations": ["Vérification manuelle"], "score_qualite": 0.0, "error": str(e) } def _parse_coherence_response(self, response: str) -> Dict[str, Any]: """ Parse la réponse d'analyse de cohérence """ try: import re json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: json_str = json_match.group(0) return json.loads(json_str) return { "coherence_score": 0.5, "incoherences": ["Analyse non disponible"], "recommandations": ["Vérification manuelle"] } except Exception as e: logger.error(f"Erreur parsing cohérence: {e}") return { "coherence_score": 0.0, "incoherences": ["Erreur d'analyse"], "recommandations": ["Vérification manuelle"] } def _parse_recommendations_response(self, response: str) -> List[str]: """ Parse la réponse de recommandations """ try: import re json_match = re.search(r'\{.*\}', response, re.DOTALL) if json_match: json_str = json_match.group(0) data = json.loads(json_str) return data.get("recommandations", []) # Fallback: extraction simple lines = response.split('\n') recommendations = [] for line in lines: line = line.strip() if line and (line.startswith('-') or line.startswith('•') or line.startswith('*')): recommendations.append(line[1:].strip()) return recommendations if recommendations else ["Vérification manuelle recommandée"] except Exception as e: logger.error(f"Erreur parsing recommandations: {e}") return ["Vérification manuelle recommandée"] async def _ensure_model_available(self, model: str): """ Vérifie que le modèle est disponible, le télécharge si nécessaire """ try: if not self.session: self.session = aiohttp.ClientSession(timeout=self.timeout) # Vérification des modèles disponibles list_url = f"{self.ollama_base_url}/api/tags" async with self.session.get(list_url) as response: if response.status == 200: data = await response.json() available_models = [m["name"] for m in data.get("models", [])] if model not in available_models: logger.info(f"Téléchargement du modèle {model}") await self._pull_model(model) else: logger.info(f"Modèle {model} disponible") else: logger.warning("Impossible de vérifier les modèles disponibles") except Exception as e: logger.error(f"Erreur lors de la vérification du modèle: {e}") # Continue quand même, le modèle pourrait être disponible async def _pull_model(self, model: str): """ Télécharge un modèle Ollama """ try: pull_url = f"{self.ollama_base_url}/api/pull" payload = {"name": model} async with self.session.post(pull_url, json=payload) as response: if response.status == 200: # Lecture du stream de téléchargement async for line in response.content: if line: try: data = json.loads(line.decode()) if data.get("status") == "success": logger.info(f"Modèle {model} téléchargé avec succès") break except json.JSONDecodeError: continue else: logger.error(f"Erreur lors du téléchargement du modèle {model}: {response.status}") except Exception as e: logger.error(f"Erreur lors du téléchargement du modèle {model}: {e}") async def get_available_models(self) -> List[str]: """ Récupère la liste des modèles disponibles """ try: if not self.session: self.session = aiohttp.ClientSession(timeout=self.timeout) list_url = f"{self.ollama_base_url}/api/tags" async with self.session.get(list_url) as response: if response.status == 200: data = await response.json() return [m["name"] for m in data.get("models", [])] else: return [] except Exception as e: logger.error(f"Erreur lors de la récupération des modèles: {e}") return [] async def test_connection(self) -> Dict[str, Any]: """ Test de connexion au service LLM """ try: if not self.session: self.session = aiohttp.ClientSession(timeout=self.timeout) # Test simple test_prompt = "Réponds simplement 'OK' si tu reçois ce message." response = await self.generate_response(test_prompt) return { "connected": True, "model": self.default_model, "response": response[:100], "base_url": self.ollama_base_url } except Exception as e: logger.error(f"Erreur de connexion LLM: {e}") return { "connected": False, "error": str(e), "base_url": self.ollama_base_url }