
- API FastAPI complète pour le traitement de documents notariaux - Pipeline OCR avec correction lexicale notariale - Classification automatique des documents (règles + LLM) - Extraction d'entités (identités, adresses, biens, montants) - Intégration de 6 APIs externes (Cadastre, Géorisques, BODACC, etc.) - Système de vérification et score de vraisemblance - Analyse contextuelle via LLM (Ollama) - Interface web moderne avec drag & drop - Tests complets et documentation exhaustive - Scripts de déploiement automatisés Types de documents supportés: - Acte de vente, donation, succession - CNI avec détection du pays - Contrats divers Fonctionnalités: - Upload et traitement asynchrone - Vérifications externes automatiques - Score de vraisemblance (0-1) - Recommandations personnalisées - Tableaux de bord et statistiques Prêt pour la production avec démarrage en une commande.
331 lines
11 KiB
Python
331 lines
11 KiB
Python
"""
|
|
Processeur OCR spécialisé pour les documents notariaux
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import tempfile
|
|
import subprocess
|
|
import json
|
|
from typing import Dict, Any, Optional
|
|
from pathlib import Path
|
|
import re
|
|
|
|
from PIL import Image
|
|
import pytesseract
|
|
import cv2
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class OCRProcessor:
|
|
"""Processeur OCR avec correction lexicale notariale"""
|
|
|
|
def __init__(self):
|
|
self.notarial_dictionary = self._load_notarial_dictionary()
|
|
self.ocr_config = self._get_ocr_config()
|
|
|
|
def _load_notarial_dictionary(self) -> Dict[str, str]:
|
|
"""
|
|
Charge le dictionnaire de correction lexicale notariale
|
|
"""
|
|
# TODO: Charger depuis ops/seed/dictionaries/ocr_fr_notarial.txt
|
|
return {
|
|
# Corrections courantes en notariat
|
|
"notaire": "notaire",
|
|
"étude": "étude",
|
|
"acte": "acte",
|
|
"vente": "vente",
|
|
"donation": "donation",
|
|
"succession": "succession",
|
|
"héritier": "héritier",
|
|
"héritiers": "héritiers",
|
|
"parcelle": "parcelle",
|
|
"commune": "commune",
|
|
"département": "département",
|
|
"euro": "euro",
|
|
"euros": "euros",
|
|
"francs": "francs",
|
|
"franc": "franc",
|
|
# Corrections OCR courantes
|
|
"0": "O", # O majuscule confondu avec 0
|
|
"1": "I", # I majuscule confondu avec 1
|
|
"5": "S", # S confondu avec 5
|
|
"8": "B", # B confondu avec 8
|
|
}
|
|
|
|
def _get_ocr_config(self) -> str:
|
|
"""
|
|
Configuration Tesseract optimisée pour les documents notariaux
|
|
"""
|
|
return "--oem 3 --psm 6 -l fra"
|
|
|
|
async def process_document(self, file_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Traitement OCR complet d'un document
|
|
"""
|
|
logger.info(f"Traitement OCR du fichier: {file_path}")
|
|
|
|
try:
|
|
# 1. Préparation du document
|
|
processed_images = await self._prepare_document(file_path)
|
|
|
|
# 2. OCR sur chaque page
|
|
ocr_results = []
|
|
for i, image in enumerate(processed_images):
|
|
logger.info(f"OCR de la page {i+1}")
|
|
page_result = await self._ocr_page(image, i+1)
|
|
ocr_results.append(page_result)
|
|
|
|
# 3. Fusion du texte
|
|
full_text = self._merge_text(ocr_results)
|
|
|
|
# 4. Correction lexicale
|
|
corrected_text = self._apply_lexical_corrections(full_text)
|
|
|
|
# 5. Post-traitement
|
|
processed_text = self._post_process_text(corrected_text)
|
|
|
|
result = {
|
|
"original_text": full_text,
|
|
"corrected_text": processed_text,
|
|
"text": processed_text, # Texte final
|
|
"pages": ocr_results,
|
|
"confidence": self._calculate_confidence(ocr_results),
|
|
"word_count": len(processed_text.split()),
|
|
"character_count": len(processed_text),
|
|
"processing_metadata": {
|
|
"pages_processed": len(processed_images),
|
|
"corrections_applied": len(full_text) - len(processed_text),
|
|
"language": "fra"
|
|
}
|
|
}
|
|
|
|
logger.info(f"OCR terminé: {result['word_count']} mots, confiance: {result['confidence']:.2f}")
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du traitement OCR: {e}")
|
|
raise
|
|
|
|
async def _prepare_document(self, file_path: str) -> list:
|
|
"""
|
|
Prépare le document pour l'OCR (conversion PDF en images, amélioration)
|
|
"""
|
|
file_path = Path(file_path)
|
|
images = []
|
|
|
|
if file_path.suffix.lower() == '.pdf':
|
|
# Conversion PDF en images avec ocrmypdf
|
|
images = await self._pdf_to_images(file_path)
|
|
else:
|
|
# Image directe
|
|
image = cv2.imread(str(file_path))
|
|
if image is not None:
|
|
images = [image]
|
|
|
|
# Amélioration des images
|
|
processed_images = []
|
|
for image in images:
|
|
enhanced = self._enhance_image(image)
|
|
processed_images.append(enhanced)
|
|
|
|
return processed_images
|
|
|
|
async def _pdf_to_images(self, pdf_path: Path) -> list:
|
|
"""
|
|
Convertit un PDF en images avec ocrmypdf
|
|
"""
|
|
images = []
|
|
|
|
try:
|
|
# Utilisation d'ocrmypdf pour une meilleure qualité
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
output_pdf = Path(temp_dir) / "output.pdf"
|
|
|
|
# Commande ocrmypdf
|
|
cmd = [
|
|
"ocrmypdf",
|
|
"--force-ocr",
|
|
"--output-type", "pdf",
|
|
"--language", "fra",
|
|
str(pdf_path),
|
|
str(output_pdf)
|
|
]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode == 0:
|
|
# Conversion en images avec pdf2image
|
|
from pdf2image import convert_from_path
|
|
pdf_images = convert_from_path(str(output_pdf), dpi=300)
|
|
|
|
for img in pdf_images:
|
|
# Conversion PIL vers OpenCV
|
|
img_cv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
|
|
images.append(img_cv)
|
|
else:
|
|
logger.warning(f"ocrmypdf échoué, utilisation de pdf2image: {result.stderr}")
|
|
# Fallback avec pdf2image
|
|
from pdf2image import convert_from_path
|
|
pdf_images = convert_from_path(str(pdf_path), dpi=300)
|
|
|
|
for img in pdf_images:
|
|
img_cv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
|
|
images.append(img_cv)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la conversion PDF: {e}")
|
|
raise
|
|
|
|
return images
|
|
|
|
def _enhance_image(self, image: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Améliore la qualité de l'image pour l'OCR
|
|
"""
|
|
# Conversion en niveaux de gris
|
|
if len(image.shape) == 3:
|
|
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
|
else:
|
|
gray = image
|
|
|
|
# Dénuage
|
|
denoised = cv2.fastNlMeansDenoising(gray)
|
|
|
|
# Amélioration du contraste
|
|
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
|
|
enhanced = clahe.apply(denoised)
|
|
|
|
# Binarisation adaptative
|
|
binary = cv2.adaptiveThreshold(
|
|
enhanced, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
|
|
)
|
|
|
|
# Morphologie pour nettoyer
|
|
kernel = np.ones((1,1), np.uint8)
|
|
cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
|
|
|
|
return cleaned
|
|
|
|
async def _ocr_page(self, image: np.ndarray, page_num: int) -> Dict[str, Any]:
|
|
"""
|
|
OCR d'une page avec Tesseract
|
|
"""
|
|
try:
|
|
# OCR avec Tesseract
|
|
text = pytesseract.image_to_string(image, config=self.ocr_config)
|
|
|
|
# Détails de confiance
|
|
data = pytesseract.image_to_data(image, config=self.ocr_config, output_type=pytesseract.Output.DICT)
|
|
|
|
# Calcul de la confiance moyenne
|
|
confidences = [int(conf) for conf in data['conf'] if int(conf) > 0]
|
|
avg_confidence = sum(confidences) / len(confidences) if confidences else 0
|
|
|
|
# Extraction des mots avec positions
|
|
words = []
|
|
for i in range(len(data['text'])):
|
|
if int(data['conf'][i]) > 0:
|
|
words.append({
|
|
'text': data['text'][i],
|
|
'confidence': int(data['conf'][i]),
|
|
'bbox': {
|
|
'x': data['left'][i],
|
|
'y': data['top'][i],
|
|
'width': data['width'][i],
|
|
'height': data['height'][i]
|
|
}
|
|
})
|
|
|
|
return {
|
|
'page': page_num,
|
|
'text': text.strip(),
|
|
'confidence': avg_confidence,
|
|
'word_count': len(words),
|
|
'words': words
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur OCR page {page_num}: {e}")
|
|
return {
|
|
'page': page_num,
|
|
'text': '',
|
|
'confidence': 0,
|
|
'word_count': 0,
|
|
'words': [],
|
|
'error': str(e)
|
|
}
|
|
|
|
def _merge_text(self, ocr_results: list) -> str:
|
|
"""
|
|
Fusionne le texte de toutes les pages
|
|
"""
|
|
texts = []
|
|
for result in ocr_results:
|
|
if result['text']:
|
|
texts.append(result['text'])
|
|
|
|
return '\n\n'.join(texts)
|
|
|
|
def _apply_lexical_corrections(self, text: str) -> str:
|
|
"""
|
|
Applique les corrections lexicales notariales
|
|
"""
|
|
corrected_text = text
|
|
|
|
# Corrections du dictionnaire
|
|
for wrong, correct in self.notarial_dictionary.items():
|
|
# Remplacement insensible à la casse
|
|
pattern = re.compile(re.escape(wrong), re.IGNORECASE)
|
|
corrected_text = pattern.sub(correct, corrected_text)
|
|
|
|
# Corrections contextuelles spécifiques
|
|
corrected_text = self._apply_contextual_corrections(corrected_text)
|
|
|
|
return corrected_text
|
|
|
|
def _apply_contextual_corrections(self, text: str) -> str:
|
|
"""
|
|
Corrections contextuelles spécifiques au notariat
|
|
"""
|
|
# Correction des montants
|
|
text = re.sub(r'(\d+)\s*euros?', r'\1 euros', text, flags=re.IGNORECASE)
|
|
text = re.sub(r'(\d+)\s*francs?', r'\1 francs', text, flags=re.IGNORECASE)
|
|
|
|
# Correction des dates
|
|
text = re.sub(r'(\d{1,2})/(\d{1,2})/(\d{4})', r'\1/\2/\3', text)
|
|
|
|
# Correction des adresses
|
|
text = re.sub(r'(\d+)\s*rue\s+de\s+la\s+paix', r'\1 rue de la Paix', text, flags=re.IGNORECASE)
|
|
|
|
# Correction des noms propres (première lettre en majuscule)
|
|
text = re.sub(r'\b([a-z])([a-z]+)\b', lambda m: m.group(1).upper() + m.group(2).lower(), text)
|
|
|
|
return text
|
|
|
|
def _post_process_text(self, text: str) -> str:
|
|
"""
|
|
Post-traitement du texte extrait
|
|
"""
|
|
# Suppression des espaces multiples
|
|
text = re.sub(r'\s+', ' ', text)
|
|
|
|
# Suppression des lignes vides multiples
|
|
text = re.sub(r'\n\s*\n', '\n\n', text)
|
|
|
|
# Nettoyage des caractères de contrôle
|
|
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)
|
|
|
|
return text.strip()
|
|
|
|
def _calculate_confidence(self, ocr_results: list) -> float:
|
|
"""
|
|
Calcule la confiance globale de l'OCR
|
|
"""
|
|
if not ocr_results:
|
|
return 0.0
|
|
|
|
total_confidence = sum(result['confidence'] for result in ocr_results)
|
|
return total_confidence / len(ocr_results)
|