4NK_IA_back/services/worker/utils/text_normalize.py
root 5d8ad901d1 Initial commit: Pipeline notarial complet
- Infrastructure complète de traitement de documents notariaux
- API FastAPI d'ingestion et d'orchestration
- Pipelines Celery pour le traitement asynchrone
- Support des formats PDF, JPEG, PNG, TIFF, HEIC
- OCR avec Tesseract et correction lexicale
- Classification automatique des documents avec Ollama
- Extraction de données structurées
- Indexation dans AnythingLLM et OpenSearch
- Système de vérifications et contrôles métier
- Base de données PostgreSQL pour le métier
- Stockage objet avec MinIO
- Base de données graphe Neo4j
- Recherche plein-texte avec OpenSearch
- Supervision avec Prometheus et Grafana
- Scripts d'installation pour Debian
- Documentation complète
- Tests unitaires et de performance
- Service systemd pour le déploiement
- Scripts de déploiement automatisés
2025-09-08 22:05:22 +02:00

169 lines
5.1 KiB
Python

"""
Utilitaires de normalisation et correction de texte pour le domaine notarial
"""
import re
import os
from typing import Dict, List
def correct_notarial_text(text: str, dict_path: str = "/seed/dictionaries/ocr_fr_notarial.txt") -> str:
"""
Correction lexicale du texte OCR pour le domaine notarial
"""
if not text:
return text
# Chargement du dictionnaire de corrections
corrections = _load_corrections_dict(dict_path)
# Normalisation de base
text = _normalize_whitespace(text)
text = _fix_common_ocr_errors(text)
# Application des corrections spécifiques au notariat
text = _apply_notarial_corrections(text, corrections)
# Correction des abréviations courantes
text = _expand_notarial_abbreviations(text)
return text
def _load_corrections_dict(dict_path: str) -> Dict[str, str]:
"""
Chargement du dictionnaire de corrections
"""
corrections = {}
try:
if os.path.exists(dict_path):
with open(dict_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
parts = line.split('|')
if len(parts) == 2:
corrections[parts[0].strip()] = parts[1].strip()
except Exception as e:
print(f"Erreur lors du chargement du dictionnaire de corrections: {e}")
return corrections
def _normalize_whitespace(text: str) -> str:
"""
Normalisation des espaces blancs
"""
# Remplacement des espaces multiples par un seul
text = re.sub(r'\s+', ' ', text)
# Suppression des espaces en début et fin
text = text.strip()
# Correction des retours à la ligne
text = re.sub(r'\n\s*\n', '\n\n', text)
return text
def _fix_common_ocr_errors(text: str) -> str:
"""
Correction des erreurs OCR courantes
"""
# Corrections courantes
common_fixes = {
# Caractères mal reconnus
'0': 'O', # O majuscule confondu avec 0
'1': 'l', # l minuscule confondu avec 1
'5': 'S', # S majuscule confondu avec 5
'8': 'B', # B majuscule confondu avec 8
# Mots courants mal reconnus
'acte': 'acte',
'notaire': 'notaire',
'étude': 'étude',
'client': 'client',
'vendeur': 'vendeur',
'acheteur': 'acheteur',
'propriété': 'propriété',
'vente': 'vente',
'achat': 'achat',
'donation': 'donation',
'testament': 'testament',
'succession': 'succession',
}
for wrong, correct in common_fixes.items():
text = text.replace(wrong, correct)
return text
def _apply_notarial_corrections(text: str, corrections: Dict[str, str]) -> str:
"""
Application des corrections spécifiques au notariat
"""
for wrong, correct in corrections.items():
# Remplacement insensible à la casse
text = re.sub(re.escape(wrong), correct, text, flags=re.IGNORECASE)
return text
def _expand_notarial_abbreviations(text: str) -> str:
"""
Expansion des abréviations courantes du notariat
"""
abbreviations = {
r'\bM\.\s*': 'Monsieur ',
r'\bMme\.\s*': 'Madame ',
r'\bMlle\.\s*': 'Mademoiselle ',
r'\bDr\.\s*': 'Docteur ',
r'\bPr\.\s*': 'Professeur ',
r'\bSt\.\s*': 'Saint ',
r'\bSte\.\s*': 'Sainte ',
r'\bBd\.\s*': 'Boulevard ',
r'\bAv\.\s*': 'Avenue ',
r'\bR\.\s*': 'Rue ',
r'\bPl\.\s*': 'Place ',
r'\bCh\.\s*': 'Chemin ',
r'\bImp\.\s*': 'Impasse ',
r'\bN°\s*': 'Numéro ',
r'\b°C\b': 'degrés Celsius',
r'\b€\s*': 'euros ',
r'\b€\b': 'euros',
}
for pattern, replacement in abbreviations.items():
text = re.sub(pattern, replacement, text)
return text
def extract_dates(text: str) -> List[str]:
"""
Extraction des dates du texte
"""
date_patterns = [
r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', # DD/MM/YYYY ou DD-MM-YYYY
r'\b\d{1,2}\s+(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+\d{2,4}\b', # DD mois YYYY
r'\b(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+\d{2,4}\b', # mois YYYY
]
dates = []
for pattern in date_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
dates.extend(matches)
return list(set(dates)) # Suppression des doublons
def extract_amounts(text: str) -> List[str]:
"""
Extraction des montants du texte
"""
amount_patterns = [
r'\b\d{1,3}(?:\s\d{3})*(?:[.,]\d{2})?\s*€\b', # Montants en euros
r'\b\d{1,3}(?:\s\d{3})*(?:[.,]\d{2})?\s*euros?\b', # Montants en euros (texte)
r'\b\d{1,3}(?:\s\d{3})*(?:[.,]\d{2})?\s*F\b', # Montants en francs
]
amounts = []
for pattern in amount_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
amounts.extend(matches)
return list(set(amounts)) # Suppression des doublons