""" Moteur de vérification et calcul du score de vraisemblance """ import logging import re from typing import Dict, Any, List, Optional from dataclasses import dataclass from datetime import datetime import math logger = logging.getLogger(__name__) @dataclass class VerificationRule: """Règle de vérification""" name: str weight: float description: str validator: callable @dataclass class VerificationResult: """Résultat d'une vérification""" rule_name: str passed: bool score: float message: str details: Dict[str, Any] class VerificationEngine: """Moteur de vérification et calcul du score de vraisemblance""" def __init__(self): self.rules = self._initialize_verification_rules() self.weights = self._initialize_weights() def _initialize_verification_rules(self) -> List[VerificationRule]: """ Initialisation des règles de vérification """ return [ # Règles de cohérence générale VerificationRule( name="coherence_generale", weight=0.2, description="Cohérence générale du document", validator=self._validate_general_coherence ), # Règles de format et structure VerificationRule( name="format_document", weight=0.15, description="Format et structure du document", validator=self._validate_document_format ), # Règles d'entités VerificationRule( name="entites_completes", weight=0.2, description="Complétude des entités extraites", validator=self._validate_entities_completeness ), # Règles de vérifications externes VerificationRule( name="verifications_externes", weight=0.25, description="Cohérence avec les vérifications externes", validator=self._validate_external_verifications ), # Règles spécifiques au type de document VerificationRule( name="specificite_type", weight=0.2, description="Spécificité au type de document", validator=self._validate_document_specificity ) ] def _initialize_weights(self) -> Dict[str, float]: """ Poids des différents éléments dans le calcul du score """ return { "ocr_confidence": 0.15, "classification_confidence": 0.2, "entities_quality": 0.25, "external_verifications": 0.25, "coherence_rules": 0.15 } async def calculate_credibility_score( self, ocr_result: Dict[str, Any], classification_result: Dict[str, Any], entities: Dict[str, Any], verifications: Dict[str, Any] ) -> float: """ Calcul du score de vraisemblance global """ logger.info("Calcul du score de vraisemblance") try: # 1. Score basé sur la confiance OCR ocr_score = self._calculate_ocr_score(ocr_result) # 2. Score basé sur la classification classification_score = self._calculate_classification_score(classification_result) # 3. Score basé sur la qualité des entités entities_score = self._calculate_entities_score(entities) # 4. Score basé sur les vérifications externes verifications_score = self._calculate_verifications_score(verifications) # 5. Score basé sur les règles de cohérence coherence_score = self._calculate_coherence_score( ocr_result, classification_result, entities, verifications ) # 6. Calcul du score final pondéré final_score = ( ocr_score * self.weights["ocr_confidence"] + classification_score * self.weights["classification_confidence"] + entities_score * self.weights["entities_quality"] + verifications_score * self.weights["external_verifications"] + coherence_score * self.weights["coherence_rules"] ) # 7. Application de pénalités final_score = self._apply_penalties(final_score, ocr_result, entities, verifications) # 8. 
            final_score = max(0.0, min(1.0, final_score))
            logger.info(f"Credibility score computed: {final_score:.3f}")
            return final_score
        except Exception as e:
            logger.error(f"Error while computing the score: {e}")
            return 0.0

    def _calculate_ocr_score(self, ocr_result: Dict[str, Any]) -> float:
        """Compute the score based on OCR quality."""
        confidence = ocr_result.get("confidence", 0.0)
        word_count = ocr_result.get("word_count", 0)

        # Base score from confidence; values above 1.0 are assumed to be on a
        # 0-100 percentage scale (e.g. Tesseract) and are normalized to 0-1.
        base_score = confidence / 100.0 if confidence > 1.0 else confidence

        # Bonus for a reasonable word count
        if 50 <= word_count <= 2000:
            word_bonus = 0.1
        elif word_count < 50:
            word_bonus = -0.2  # Penalty for text that is too short
        else:
            word_bonus = 0.0

        return max(0.0, min(1.0, base_score + word_bonus))

    def _calculate_classification_score(self, classification_result: Dict[str, Any]) -> float:
        """Compute the score based on classification."""
        confidence = classification_result.get("confidence", 0.0)
        method = classification_result.get("method", "")

        # Base score
        base_score = confidence

        # Bonus depending on the classification method
        if method == "merged":
            method_bonus = 0.1  # Agreement between methods
        elif method == "llm":
            method_bonus = 0.05  # LLM alone
        else:
            method_bonus = 0.0

        return max(0.0, min(1.0, base_score + method_bonus))

    def _calculate_entities_score(self, entities: Dict[str, Any]) -> float:
        """Compute the score based on entity quality."""
        total_entities = 0
        total_confidence = 0.0

        for entity_type, entity_list in entities.items():
            if isinstance(entity_list, list):
                for entity in entity_list:
                    if isinstance(entity, dict):
                        total_entities += 1
                        confidence = entity.get("confidence", 0.5)
                        total_confidence += confidence

        if total_entities == 0:
            return 0.0

        avg_confidence = total_confidence / total_entities

        # Bonus for entity diversity
        entity_types = len([k for k, v in entities.items() if isinstance(v, list) and len(v) > 0])
        diversity_bonus = min(0.1, entity_types * 0.02)

        return max(0.0, min(1.0, avg_confidence + diversity_bonus))

    def _calculate_verifications_score(self, verifications: Dict[str, Any]) -> float:
        """Compute the score based on external verifications."""
        if not verifications:
            return 0.5  # Neutral score when there are no verifications

        total_verifications = 0
        positive_verifications = 0
        total_confidence = 0.0

        for service, result in verifications.items():
            if isinstance(result, dict):
                total_verifications += 1
                status = result.get("status", "error")
                confidence = result.get("confidence", 0.0)
                if status == "verified":
                    positive_verifications += 1
                    total_confidence += confidence
                elif status == "not_found":
                    total_confidence += 0.3  # Neutral score
                else:
                    total_confidence += 0.1  # Low score

        if total_verifications == 0:
            return 0.5

        # Ratio of positive verifications
        verification_ratio = positive_verifications / total_verifications
        # Average confidence
        avg_confidence = total_confidence / total_verifications
        # Combine both scores
        final_score = verification_ratio * 0.6 + avg_confidence * 0.4

        return max(0.0, min(1.0, final_score))

    def _calculate_coherence_score(
        self,
        ocr_result: Dict[str, Any],
        classification_result: Dict[str, Any],
        entities: Dict[str, Any],
        verifications: Dict[str, Any]
    ) -> float:
        """Compute the rule-based coherence score."""
        total_score = 0.0
        total_weight = 0.0

        for rule in self.rules:
            try:
                result = rule.validator(ocr_result, classification_result, entities, verifications)
                total_score += result.score * rule.weight
                total_weight += rule.weight
            except Exception as e:
logger.error(f"Erreur dans la règle {rule.name}: {e}") # Score neutre en cas d'erreur total_score += 0.5 * rule.weight total_weight += rule.weight return total_score / total_weight if total_weight > 0 else 0.5 def _validate_general_coherence( self, ocr_result: Dict[str, Any], classification_result: Dict[str, Any], entities: Dict[str, Any], verifications: Dict[str, Any] ) -> VerificationResult: """ Validation de la cohérence générale """ score = 0.5 issues = [] # Vérification de la cohérence entre classification et entités doc_type = classification_result.get("type", "") entities_count = sum(len(v) for v in entities.values() if isinstance(v, list)) if doc_type == "acte_vente" and entities_count < 3: issues.append("Acte de vente avec peu d'entités") score -= 0.2 if doc_type == "cni" and "identites" not in entities: issues.append("CNI sans identité extraite") score -= 0.3 return VerificationResult( rule_name="coherence_generale", passed=score >= 0.5, score=max(0.0, score), message="Cohérence générale" + (" OK" if score >= 0.5 else " - Problèmes détectés"), details={"issues": issues} ) def _validate_document_format( self, ocr_result: Dict[str, Any], classification_result: Dict[str, Any], entities: Dict[str, Any], verifications: Dict[str, Any] ) -> VerificationResult: """ Validation du format du document """ score = 0.5 issues = [] text = ocr_result.get("text", "") # Vérification de la présence d'éléments structurants if not re.search(r'\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{4}', text): issues.append("Aucune date détectée") score -= 0.1 if not re.search(r'[A-Z]{2,}', text): issues.append("Aucun nom en majuscules détecté") score -= 0.1 if len(text.split()) < 20: issues.append("Texte trop court") score -= 0.2 return VerificationResult( rule_name="format_document", passed=score >= 0.5, score=max(0.0, score), message="Format document" + (" OK" if score >= 0.5 else " - Problèmes détectés"), details={"issues": issues} ) def _validate_entities_completeness( self, ocr_result: Dict[str, Any], classification_result: Dict[str, Any], entities: Dict[str, Any], verifications: Dict[str, Any] ) -> VerificationResult: """ Validation de la complétude des entités """ score = 0.5 issues = [] doc_type = classification_result.get("type", "") # Vérifications spécifiques par type if doc_type == "acte_vente": if not entities.get("identites"): issues.append("Aucune identité extraite") score -= 0.3 if not entities.get("adresses"): issues.append("Aucune adresse extraite") score -= 0.2 if not entities.get("montants"): issues.append("Aucun montant extrait") score -= 0.2 elif doc_type == "cni": if not entities.get("identites"): issues.append("Aucune identité extraite") score -= 0.4 if not entities.get("dates"): issues.append("Aucune date de naissance extraite") score -= 0.3 # Bonus pour la diversité entity_types = len([k for k, v in entities.items() if isinstance(v, list) and len(v) > 0]) if entity_types >= 3: score += 0.1 return VerificationResult( rule_name="entites_completes", passed=score >= 0.5, score=max(0.0, score), message="Entités" + (" OK" if score >= 0.5 else " - Incomplètes"), details={"issues": issues, "entity_types": entity_types} ) def _validate_external_verifications( self, ocr_result: Dict[str, Any], classification_result: Dict[str, Any], entities: Dict[str, Any], verifications: Dict[str, Any] ) -> VerificationResult: """ Validation des vérifications externes """ score = 0.5 issues = [] if not verifications: issues.append("Aucune vérification externe") score -= 0.2 return VerificationResult( 
rule_name="verifications_externes", passed=False, score=score, message="Vérifications externes - Aucune", details={"issues": issues} ) # Analyse des résultats de vérification verified_count = 0 error_count = 0 for service, result in verifications.items(): if isinstance(result, dict): status = result.get("status", "error") if status == "verified": verified_count += 1 elif status == "error": error_count += 1 total_verifications = len(verifications) if total_verifications > 0: verification_ratio = verified_count / total_verifications error_ratio = error_count / total_verifications score = verification_ratio - (error_ratio * 0.3) if error_ratio > 0.5: issues.append("Trop d'erreurs de vérification") return VerificationResult( rule_name="verifications_externes", passed=score >= 0.5, score=max(0.0, score), message=f"Vérifications externes - {verified_count}/{total_verifications} OK", details={"verified": verified_count, "errors": error_count, "issues": issues} ) def _validate_document_specificity( self, ocr_result: Dict[str, Any], classification_result: Dict[str, Any], entities: Dict[str, Any], verifications: Dict[str, Any] ) -> VerificationResult: """ Validation de la spécificité au type de document """ score = 0.5 issues = [] doc_type = classification_result.get("type", "") text = ocr_result.get("text", "").lower() # Vérifications spécifiques par type if doc_type == "acte_vente": if "vendeur" not in text and "acheteur" not in text: issues.append("Acte de vente sans vendeur/acheteur") score -= 0.3 if "prix" not in text and "euro" not in text: issues.append("Acte de vente sans prix") score -= 0.2 elif doc_type == "cni": if "république française" not in text: issues.append("CNI sans mention République Française") score -= 0.2 if "carte" not in text and "identité" not in text: issues.append("CNI sans mention carte d'identité") score -= 0.3 elif doc_type == "acte_succession": if "héritier" not in text and "succession" not in text: issues.append("Acte de succession sans mention héritier/succession") score -= 0.3 return VerificationResult( rule_name="specificite_type", passed=score >= 0.5, score=max(0.0, score), message="Spécificité type" + (" OK" if score >= 0.5 else " - Problèmes détectés"), details={"issues": issues} ) def _apply_penalties( self, score: float, ocr_result: Dict[str, Any], entities: Dict[str, Any], verifications: Dict[str, Any] ) -> float: """ Application de pénalités spécifiques """ penalties = 0.0 # Pénalité pour OCR de mauvaise qualité ocr_confidence = ocr_result.get("confidence", 0.0) if ocr_confidence < 50: penalties += 0.2 elif ocr_confidence < 70: penalties += 0.1 # Pénalité pour peu d'entités total_entities = sum(len(v) for v in entities.values() if isinstance(v, list)) if total_entities < 2: penalties += 0.15 # Pénalité pour erreurs de vérification if verifications: error_count = sum(1 for v in verifications.values() if isinstance(v, dict) and v.get("status") == "error") if error_count > 0: penalties += min(0.2, error_count * 0.05) return score - penalties async def get_detailed_verification_report( self, ocr_result: Dict[str, Any], classification_result: Dict[str, Any], entities: Dict[str, Any], verifications: Dict[str, Any] ) -> Dict[str, Any]: """ Génération d'un rapport détaillé de vérification """ report = { "score_global": 0.0, "scores_composants": {}, "verifications_detaillees": [], "recommandations": [] } try: # Calcul des scores composants report["scores_composants"] = { "ocr": self._calculate_ocr_score(ocr_result), "classification": 
                    self._calculate_classification_score(classification_result),
                "entites": self._calculate_entities_score(entities),
                "verifications_externes": self._calculate_verifications_score(verifications),
                "coherence": self._calculate_coherence_score(
                    ocr_result, classification_result, entities, verifications
                )
            }

            # Run the detailed verifications
            for rule in self.rules:
                try:
                    result = rule.validator(ocr_result, classification_result, entities, verifications)
                    report["verifications_detaillees"].append({
                        "nom": result.rule_name,
                        "passe": result.passed,
                        "score": result.score,
                        "message": result.message,
                        "details": result.details
                    })
                except Exception as e:
                    logger.error(f"Error in rule {rule.name}: {e}")

            # Overall score
            report["score_global"] = await self.calculate_credibility_score(
                ocr_result, classification_result, entities, verifications
            )

            # Recommendations
            report["recommandations"] = self._generate_recommendations(report)
        except Exception as e:
            logger.error(f"Error while generating the report: {e}")
            report["error"] = str(e)

        return report

    def _generate_recommendations(self, report: Dict[str, Any]) -> List[str]:
        """Generate recommendations based on the report."""
        recommendations = []
        scores = report.get("scores_composants", {})

        if scores.get("ocr", 1.0) < 0.7:
            recommendations.append("Improve the image quality for better OCR")
        if scores.get("entites", 1.0) < 0.6:
            recommendations.append("Review the entity extraction")
        if scores.get("verifications_externes", 1.0) < 0.5:
            recommendations.append("Run additional external verifications")

        verifications = report.get("verifications_detaillees", [])
        for verification in verifications:
            if not verification["passe"]:
                recommendations.append(f"Fix: {verification['message']}")

        if not recommendations:
            recommendations.append("Good quality document, standard processing recommended")

        return recommendations
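

# Minimal usage sketch. The input dicts below are hypothetical examples of the
# shapes this engine expects (OCR output, classification output, extracted
# entities, external verification results); every field value is illustrative
# only, not real data, and the "cadastre" service name is an assumption.
if __name__ == "__main__":
    import asyncio

    engine = VerificationEngine()
    score = asyncio.run(engine.calculate_credibility_score(
        ocr_result={
            "text": "VENDEUR: DUPONT Jean ... signé le 12/03/2021",
            "confidence": 88.0,  # 0-100 scale, as assumed by _apply_penalties
            "word_count": 120,
        },
        classification_result={"type": "acte_vente", "confidence": 0.9, "method": "merged"},
        entities={
            "identites": [{"value": "DUPONT Jean", "confidence": 0.9}],
            "adresses": [{"value": "1 rue de la Paix, Paris", "confidence": 0.8}],
            "montants": [{"value": "250000 EUR", "confidence": 0.85}],
        },
        verifications={"cadastre": {"status": "verified", "confidence": 0.9}},
    ))
    print(f"Credibility score: {score:.3f}")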