""" Pipeline de classification des documents notariaux """ import os import json import requests import logging from typing import Dict, Any logger = logging.getLogger(__name__) # Configuration Ollama OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434") OLLAMA_MODEL = "llama3:8b" # Modèle par défaut def run(doc_id: str, ctx: dict): """ Classification d'un document notarial """ logger.info(f"Classification du document {doc_id}") try: # Récupération du texte extrait extracted_text = ctx.get("extracted_text", "") if not extracted_text: raise ValueError("Aucun texte extrait disponible pour la classification") # Limitation de la taille du texte pour le contexte text_sample = extracted_text[:16000] # Limite de contexte # Classification avec Ollama classification_result = _classify_with_ollama(text_sample) # Stockage du résultat ctx["classification"] = classification_result # Métadonnées de classification classify_meta = { "classification_completed": True, "document_type": classification_result.get("label"), "confidence": classification_result.get("confidence", 0.0), "model_used": OLLAMA_MODEL } ctx["classify_meta"] = classify_meta logger.info(f"Classification terminée pour le document {doc_id}: {classification_result.get('label')} (confiance: {classification_result.get('confidence', 0.0):.2f})") except Exception as e: logger.error(f"Erreur lors de la classification du document {doc_id}: {e}") raise def _classify_with_ollama(text: str) -> Dict[str, Any]: """ Classification du document avec Ollama """ try: # Chargement du prompt de classification prompt = _load_classification_prompt() # Remplacement du placeholder par le texte full_prompt = prompt.replace("{{TEXT}}", text) # Appel à l'API Ollama payload = { "model": OLLAMA_MODEL, "prompt": full_prompt, "stream": False, "options": { "temperature": 0.1, # Faible température pour plus de cohérence "top_p": 0.9, "max_tokens": 500 } } response = requests.post( f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=120 ) if response.status_code != 200: raise RuntimeError(f"Erreur API Ollama: {response.status_code} - {response.text}") result = response.json() # Parsing de la réponse JSON try: classification_data = json.loads(result["response"]) except json.JSONDecodeError: # Fallback si la réponse n'est pas du JSON valide classification_data = _parse_fallback_response(result["response"]) return classification_data except Exception as e: logger.error(f"Erreur lors de la classification avec Ollama: {e}") # Classification par défaut en cas d'erreur return { "label": "document_inconnu", "confidence": 0.0, "error": str(e) } def _load_classification_prompt() -> str: """ Chargement du prompt de classification """ prompt_path = "/app/models/prompts/classify_prompt.txt" try: if os.path.exists(prompt_path): with open(prompt_path, 'r', encoding='utf-8') as f: return f.read() except Exception as e: logger.warning(f"Impossible de charger le prompt de classification: {e}") # Prompt par défaut return """ Tu es un expert en droit notarial. Analyse le texte suivant et classe le document selon les catégories suivantes : CATÉGORIES POSSIBLES : - acte_vente : Acte de vente immobilière - acte_achat : Acte d'achat immobilière - donation : Acte de donation - testament : Testament - succession : Acte de succession - contrat_mariage : Contrat de mariage - procuration : Procuration - attestation : Attestation - facture : Facture notariale - document_inconnu : Document non classifiable TEXTE À ANALYSER : {{TEXT}} Réponds UNIQUEMENT avec un JSON valide contenant : { "label": "catégorie_choisie", "confidence": 0.95, "reasoning": "explication_courte" } La confiance doit être entre 0.0 et 1.0. """ def _parse_fallback_response(response_text: str) -> Dict[str, Any]: """ Parsing de fallback si la réponse n'est pas du JSON valide """ # Recherche de mots-clés dans la réponse response_lower = response_text.lower() if "vente" in response_lower or "vendu" in response_lower: return {"label": "acte_vente", "confidence": 0.7, "reasoning": "Mots-clés de vente détectés"} elif "achat" in response_lower or "acheté" in response_lower: return {"label": "acte_achat", "confidence": 0.7, "reasoning": "Mots-clés d'achat détectés"} elif "donation" in response_lower or "donné" in response_lower: return {"label": "donation", "confidence": 0.7, "reasoning": "Mots-clés de donation détectés"} elif "testament" in response_lower: return {"label": "testament", "confidence": 0.7, "reasoning": "Mots-clés de testament détectés"} elif "succession" in response_lower or "héritage" in response_lower: return {"label": "succession", "confidence": 0.7, "reasoning": "Mots-clés de succession détectés"} else: return {"label": "document_inconnu", "confidence": 0.3, "reasoning": "Classification par défaut"} def get_document_type_features(text: str) -> Dict[str, Any]: """ Extraction de caractéristiques pour la classification """ features = { "has_dates": len(_extract_dates(text)) > 0, "has_amounts": len(_extract_amounts(text)) > 0, "has_addresses": _has_addresses(text), "has_personal_names": _has_personal_names(text), "text_length": len(text), "word_count": len(text.split()) } return features def _extract_dates(text: str) -> list: """Extraction des dates du texte""" import re date_patterns = [ r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', r'\b\d{1,2}\s+(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+\d{2,4}\b' ] dates = [] for pattern in date_patterns: dates.extend(re.findall(pattern, text, re.IGNORECASE)) return dates def _extract_amounts(text: str) -> list: """Extraction des montants du texte""" import re amount_patterns = [ r'\b\d{1,3}(?:\s\d{3})*(?:[.,]\d{2})?\s*€\b', r'\b\d{1,3}(?:\s\d{3})*(?:[.,]\d{2})?\s*euros?\b' ] amounts = [] for pattern in amount_patterns: amounts.extend(re.findall(pattern, text, re.IGNORECASE)) return amounts def _has_addresses(text: str) -> bool: """Détection de la présence d'adresses""" import re address_indicators = [ r'\b(?:rue|avenue|boulevard|place|chemin|impasse)\b', r'\b\d{5}\b', # Code postal r'\b(?:Paris|Lyon|Marseille|Toulouse|Nice|Nantes|Strasbourg|Montpellier|Bordeaux|Lille)\b' ] for pattern in address_indicators: if re.search(pattern, text, re.IGNORECASE): return True return False def _has_personal_names(text: str) -> bool: """Détection de la présence de noms de personnes""" import re name_indicators = [ r'\b(?:Monsieur|Madame|Mademoiselle|M\.|Mme\.|Mlle\.)\s+[A-Z][a-z]+', r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b' # Prénom Nom ] for pattern in name_indicators: if re.search(pattern, text): return True return False