
- Infrastructure complète de traitement de documents notariaux - API FastAPI d'ingestion et d'orchestration - Pipelines Celery pour le traitement asynchrone - Support des formats PDF, JPEG, PNG, TIFF, HEIC - OCR avec Tesseract et correction lexicale - Classification automatique des documents avec Ollama - Extraction de données structurées - Indexation dans AnythingLLM et OpenSearch - Système de vérifications et contrôles métier - Base de données PostgreSQL pour le métier - Stockage objet avec MinIO - Base de données graphe Neo4j - Recherche plein-texte avec OpenSearch - Supervision avec Prometheus et Grafana - Scripts d'installation pour Debian - Documentation complète - Tests unitaires et de performance - Service systemd pour le déploiement - Scripts de déploiement automatisés
238 lines
7.6 KiB
Python
238 lines
7.6 KiB
Python
"""
|
|
Pipeline de classification des documents notariaux
|
|
"""
|
|
import os
|
|
import json
|
|
import requests
|
|
import logging
|
|
from typing import Dict, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Configuration Ollama
|
|
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://ollama:11434")
|
|
OLLAMA_MODEL = "llama3:8b" # Modèle par défaut
|
|
|
|
def run(doc_id: str, ctx: dict):
|
|
"""
|
|
Classification d'un document notarial
|
|
"""
|
|
logger.info(f"Classification du document {doc_id}")
|
|
|
|
try:
|
|
# Récupération du texte extrait
|
|
extracted_text = ctx.get("extracted_text", "")
|
|
if not extracted_text:
|
|
raise ValueError("Aucun texte extrait disponible pour la classification")
|
|
|
|
# Limitation de la taille du texte pour le contexte
|
|
text_sample = extracted_text[:16000] # Limite de contexte
|
|
|
|
# Classification avec Ollama
|
|
classification_result = _classify_with_ollama(text_sample)
|
|
|
|
# Stockage du résultat
|
|
ctx["classification"] = classification_result
|
|
|
|
# Métadonnées de classification
|
|
classify_meta = {
|
|
"classification_completed": True,
|
|
"document_type": classification_result.get("label"),
|
|
"confidence": classification_result.get("confidence", 0.0),
|
|
"model_used": OLLAMA_MODEL
|
|
}
|
|
|
|
ctx["classify_meta"] = classify_meta
|
|
|
|
logger.info(f"Classification terminée pour le document {doc_id}: {classification_result.get('label')} (confiance: {classification_result.get('confidence', 0.0):.2f})")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la classification du document {doc_id}: {e}")
|
|
raise
|
|
|
|
def _classify_with_ollama(text: str) -> Dict[str, Any]:
|
|
"""
|
|
Classification du document avec Ollama
|
|
"""
|
|
try:
|
|
# Chargement du prompt de classification
|
|
prompt = _load_classification_prompt()
|
|
|
|
# Remplacement du placeholder par le texte
|
|
full_prompt = prompt.replace("{{TEXT}}", text)
|
|
|
|
# Appel à l'API Ollama
|
|
payload = {
|
|
"model": OLLAMA_MODEL,
|
|
"prompt": full_prompt,
|
|
"stream": False,
|
|
"options": {
|
|
"temperature": 0.1, # Faible température pour plus de cohérence
|
|
"top_p": 0.9,
|
|
"max_tokens": 500
|
|
}
|
|
}
|
|
|
|
response = requests.post(
|
|
f"{OLLAMA_BASE_URL}/api/generate",
|
|
json=payload,
|
|
timeout=120
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
raise RuntimeError(f"Erreur API Ollama: {response.status_code} - {response.text}")
|
|
|
|
result = response.json()
|
|
|
|
# Parsing de la réponse JSON
|
|
try:
|
|
classification_data = json.loads(result["response"])
|
|
except json.JSONDecodeError:
|
|
# Fallback si la réponse n'est pas du JSON valide
|
|
classification_data = _parse_fallback_response(result["response"])
|
|
|
|
return classification_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la classification avec Ollama: {e}")
|
|
# Classification par défaut en cas d'erreur
|
|
return {
|
|
"label": "document_inconnu",
|
|
"confidence": 0.0,
|
|
"error": str(e)
|
|
}
|
|
|
|
def _load_classification_prompt() -> str:
|
|
"""
|
|
Chargement du prompt de classification
|
|
"""
|
|
prompt_path = "/app/models/prompts/classify_prompt.txt"
|
|
|
|
try:
|
|
if os.path.exists(prompt_path):
|
|
with open(prompt_path, 'r', encoding='utf-8') as f:
|
|
return f.read()
|
|
except Exception as e:
|
|
logger.warning(f"Impossible de charger le prompt de classification: {e}")
|
|
|
|
# Prompt par défaut
|
|
return """
|
|
Tu es un expert en droit notarial. Analyse le texte suivant et classe le document selon les catégories suivantes :
|
|
|
|
CATÉGORIES POSSIBLES :
|
|
- acte_vente : Acte de vente immobilière
|
|
- acte_achat : Acte d'achat immobilière
|
|
- donation : Acte de donation
|
|
- testament : Testament
|
|
- succession : Acte de succession
|
|
- contrat_mariage : Contrat de mariage
|
|
- procuration : Procuration
|
|
- attestation : Attestation
|
|
- facture : Facture notariale
|
|
- document_inconnu : Document non classifiable
|
|
|
|
TEXTE À ANALYSER :
|
|
{{TEXT}}
|
|
|
|
Réponds UNIQUEMENT avec un JSON valide contenant :
|
|
{
|
|
"label": "catégorie_choisie",
|
|
"confidence": 0.95,
|
|
"reasoning": "explication_courte"
|
|
}
|
|
|
|
La confiance doit être entre 0.0 et 1.0.
|
|
"""
|
|
|
|
def _parse_fallback_response(response_text: str) -> Dict[str, Any]:
|
|
"""
|
|
Parsing de fallback si la réponse n'est pas du JSON valide
|
|
"""
|
|
# Recherche de mots-clés dans la réponse
|
|
response_lower = response_text.lower()
|
|
|
|
if "vente" in response_lower or "vendu" in response_lower:
|
|
return {"label": "acte_vente", "confidence": 0.7, "reasoning": "Mots-clés de vente détectés"}
|
|
elif "achat" in response_lower or "acheté" in response_lower:
|
|
return {"label": "acte_achat", "confidence": 0.7, "reasoning": "Mots-clés d'achat détectés"}
|
|
elif "donation" in response_lower or "donné" in response_lower:
|
|
return {"label": "donation", "confidence": 0.7, "reasoning": "Mots-clés de donation détectés"}
|
|
elif "testament" in response_lower:
|
|
return {"label": "testament", "confidence": 0.7, "reasoning": "Mots-clés de testament détectés"}
|
|
elif "succession" in response_lower or "héritage" in response_lower:
|
|
return {"label": "succession", "confidence": 0.7, "reasoning": "Mots-clés de succession détectés"}
|
|
else:
|
|
return {"label": "document_inconnu", "confidence": 0.3, "reasoning": "Classification par défaut"}
|
|
|
|
def get_document_type_features(text: str) -> Dict[str, Any]:
|
|
"""
|
|
Extraction de caractéristiques pour la classification
|
|
"""
|
|
features = {
|
|
"has_dates": len(_extract_dates(text)) > 0,
|
|
"has_amounts": len(_extract_amounts(text)) > 0,
|
|
"has_addresses": _has_addresses(text),
|
|
"has_personal_names": _has_personal_names(text),
|
|
"text_length": len(text),
|
|
"word_count": len(text.split())
|
|
}
|
|
|
|
return features
|
|
|
|
def _extract_dates(text: str) -> list:
|
|
"""Extraction des dates du texte"""
|
|
import re
|
|
date_patterns = [
|
|
r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
|
|
r'\b\d{1,2}\s+(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+\d{2,4}\b'
|
|
]
|
|
|
|
dates = []
|
|
for pattern in date_patterns:
|
|
dates.extend(re.findall(pattern, text, re.IGNORECASE))
|
|
|
|
return dates
|
|
|
|
def _extract_amounts(text: str) -> list:
|
|
"""Extraction des montants du texte"""
|
|
import re
|
|
amount_patterns = [
|
|
r'\b\d{1,3}(?:\s\d{3})*(?:[.,]\d{2})?\s*€\b',
|
|
r'\b\d{1,3}(?:\s\d{3})*(?:[.,]\d{2})?\s*euros?\b'
|
|
]
|
|
|
|
amounts = []
|
|
for pattern in amount_patterns:
|
|
amounts.extend(re.findall(pattern, text, re.IGNORECASE))
|
|
|
|
return amounts
|
|
|
|
def _has_addresses(text: str) -> bool:
|
|
"""Détection de la présence d'adresses"""
|
|
import re
|
|
address_indicators = [
|
|
r'\b(?:rue|avenue|boulevard|place|chemin|impasse)\b',
|
|
r'\b\d{5}\b', # Code postal
|
|
r'\b(?:Paris|Lyon|Marseille|Toulouse|Nice|Nantes|Strasbourg|Montpellier|Bordeaux|Lille)\b'
|
|
]
|
|
|
|
for pattern in address_indicators:
|
|
if re.search(pattern, text, re.IGNORECASE):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _has_personal_names(text: str) -> bool:
|
|
"""Détection de la présence de noms de personnes"""
|
|
import re
|
|
name_indicators = [
|
|
r'\b(?:Monsieur|Madame|Mademoiselle|M\.|Mme\.|Mlle\.)\s+[A-Z][a-z]+',
|
|
r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b' # Prénom Nom
|
|
]
|
|
|
|
for pattern in name_indicators:
|
|
if re.search(pattern, text):
|
|
return True
|
|
|
|
return False
|