From 8c089127af549b77f4a8a706e7bd348ed55f8a8c Mon Sep 17 00:00:00 2001 From: Nicolas Cantu Date: Wed, 10 Sep 2025 18:45:50 +0200 Subject: [PATCH] =?UTF-8?q?docs:=20Mise=20=C3=A0=20jour=20compl=C3=A8te=20?= =?UTF-8?q?de=20la=20documentation=20v1.2.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Mise à jour du README.md avec les nouvelles fonctionnalités - Documentation API mise à jour avec les intégrations externes - Guide d'installation avec bootstrap automatisé - Architecture mise à jour avec Celery et intégrations - CHANGELOG détaillé avec toutes les nouvelles fonctionnalités - Nouvelle documentation des fonctionnalités v1.2.0 Nouvelles sections documentées: - Pipeline de traitement asynchrone avec Celery - Intégrations avec APIs externes (Cadastre, Géorisques, BODACC, etc.) - Clients d'intégration (AnythingLLM, Neo4j, OpenSearch) - Configuration d'environnement centralisée - Script bootstrap automatisé - Monitoring et observabilité - Exemples d'utilisation et API --- CHANGELOG.md | 39 +-- README.md | 68 +++-- docs/API-NOTARIALE.md | 40 +-- docs/ARCHITECTURE.md | 12 +- docs/INSTALLATION.md | 31 +++ docs/NEW-FEATURES-v1.2.0.md | 289 ++++++++++++++++++++ services/worker/tasks/indexing_tasks.py | 8 +- services/worker/tasks/verification_tasks.py | 28 +- services/worker/utils/anythingllm_client.py | 126 ++++----- services/worker/utils/external_apis.py | 128 ++++----- services/worker/utils/neo4j_client.py | 112 ++++---- services/worker/utils/opensearch_client.py | 120 ++++---- 12 files changed, 680 insertions(+), 321 deletions(-) create mode 100644 docs/NEW-FEATURES-v1.2.0.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 135c5e7..bc51eb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,27 +8,32 @@ et ce projet adhère au [Versioning Sémantique](https://semver.org/lang/fr/). ## [1.2.0] - 2025-01-09 ### Ajouté -- Intégration complète de Celery pour les tâches asynchrones -- Tâches spécialisées pour chaque étape du pipeline (OCR, classification, extraction, indexation, vérification) -- Configuration des queues Celery avec Redis -- Tâches de traitement en lot pour l'efficacité -- Monitoring et health checks des workers -- Fichier d'environnement complet (.env.example et .env) -- Script bootstrap automatisé pour l'initialisation complète -- Orchestration avancée des pipelines avec gestion d'erreurs -- Support des tâches périodiques (Celery Beat) -- Configuration centralisée des workers +- **Intégration complète de Celery** pour les tâches asynchrones avec queues spécialisées +- **Pipelines worker complets** : préprocessing, OCR, classification, extraction, indexation, vérification, finalisation +- **Intégrations avec APIs externes** : Cadastre, Géorisques, BODACC, Infogreffe, RBE +- **Clients d'intégration avancés** : AnythingLLM, Neo4j, OpenSearch +- **Vérifications automatisées** avec calcul du score de vraisemblance +- **OCR avancé** avec préprocessing d'images et correction lexicale notariale +- **Support multi-formats** : PDF, JPEG, PNG, TIFF, HEIC avec conversion automatique +- **Indexation multi-système** : AnythingLLM (sémantique), OpenSearch (plein-texte), Neo4j (graphe) +- **Fichier d'environnement complet** (.env.example et .env) avec toutes les variables +- **Script bootstrap automatisé** pour l'initialisation complète du système +- **Gestion robuste des erreurs** dans tous les pipelines avec fallbacks +- **Support des tâches périodiques** (Celery Beat) pour la maintenance +- **Configuration centralisée** des workers et des services ### Modifié -- Worker principal refactorisé pour utiliser Celery -- Amélioration de la gestion des erreurs dans les pipelines -- Configuration Docker optimisée pour Celery -- Documentation mise à jour avec les nouvelles fonctionnalités +- **Worker principal refactorisé** pour utiliser Celery avec orchestration avancée +- **Amélioration de la gestion des erreurs** dans tous les pipelines +- **Configuration Docker optimisée** pour Celery et les nouvelles dépendances +- **Documentation complètement mise à jour** avec les nouvelles fonctionnalités +- **API version 1.2.0** avec nouvelles fonctionnalités ### Corrigé -- Gestion robuste des erreurs dans les tâches asynchrones -- Amélioration de la scalabilité du système -- Configuration d'environnement centralisée +- **Gestion robuste des erreurs** dans les tâches asynchrones +- **Amélioration de la scalabilité** du système avec Celery +- **Configuration d'environnement centralisée** et automatisée +- **Compatibilité des formats** de documents avec conversion automatique ## [1.1.0] - 2025-01-09 diff --git a/README.md b/README.md index adacb20..c11c428 100644 --- a/README.md +++ b/README.md @@ -6,31 +6,35 @@ Le système 4NK Notariat est une solution complète d'IA pour le traitement auto ## ✨ Fonctionnalités Principales -### 🔍 **Traitement de Documents** -- **OCR Avancé** : Extraction de texte avec correction lexicale notariale -- **Classification Automatique** : Détection du type de document (acte de vente, donation, succession, CNI, etc.) -- **Extraction d'Entités** : Identification automatique des identités, adresses, biens, montants -- **Support Multi-format** : PDF, JPEG, PNG, TIFF, HEIC +### 🔍 **Pipeline de Traitement Avancé** +- **Préprocessing Intelligent** : Validation, conversion et optimisation automatique des documents +- **OCR Avancé** : Extraction de texte avec Tesseract et correction lexicale notariale spécialisée +- **Classification Automatique** : Détection du type de document via LLM (acte de vente, donation, succession, CNI, etc.) +- **Extraction d'Entités** : Identification automatique des identités, adresses, biens, montants, dates +- **Support Multi-format** : PDF, JPEG, PNG, TIFF, HEIC avec conversion automatique +- **Traitement Asynchrone** : Pipeline Celery avec queues spécialisées pour la scalabilité -### 🔗 **Vérifications Externes** -- **Cadastre** : Vérification des parcelles et propriétés -- **Géorisques** : Analyse des risques (inondation, argiles, radon, etc.) -- **BODACC** : Vérification des annonces légales -- **Gel des Avoirs** : Contrôle des sanctions -- **Infogreffe** : Vérification des entreprises -- **RBE** : Bénéficiaires effectifs +### 🔗 **Vérifications Externes Automatisées** +- **API Cadastre** : Vérification des parcelles et propriétés immobilières +- **API Géorisques** : Analyse des risques géologiques (inondation, argiles, radon, etc.) +- **API BODACC** : Vérification des annonces légales et entreprises +- **API Infogreffe** : Recherche d'informations d'entreprises +- **API RBE** : Registre des Bénéficiaires Effectifs +- **Géocodage** : Conversion d'adresses en coordonnées GPS -### 🧠 **Intelligence Artificielle** +### 🧠 **Intelligence Artificielle Intégrée** - **LLM Local** : Analyse contextuelle avec Ollama (Llama 3, Mistral) -- **Score de Vraisemblance** : Évaluation automatique de la cohérence -- **Avis de Synthèse** : Analyse intelligente et recommandations -- **Détection d'Anomalies** : Identification des incohérences +- **Score de Vraisemblance** : Évaluation automatique basée sur les vérifications externes +- **Indexation Sémantique** : AnythingLLM pour la recherche intelligente +- **Graphe de Connaissances** : Neo4j pour les relations entre entités +- **Recherche Plein-texte** : OpenSearch avec analyseur français -### 🌐 **Interface Moderne** -- **Interface Web** : Upload par drag & drop, visualisation des analyses -- **API REST** : Intégration avec les systèmes existants -- **Tableaux de Bord** : Statistiques et monitoring -- **Rapports** : Export des analyses et recommandations +### 🏗️ **Architecture Moderne** +- **API REST** : FastAPI avec documentation automatique +- **Traitement Asynchrone** : Celery avec Redis pour la performance +- **Stockage S3** : MinIO pour la gestion des documents +- **Monitoring** : Prometheus et Grafana pour la supervision +- **Configuration** : Bootstrap automatisé et gestion d'environnement ## 🚀 Démarrage Rapide @@ -40,13 +44,31 @@ Le système 4NK Notariat est une solution complète d'IA pour le traitement auto - Ubuntu/Debian 20.04+ - Python 3.11+ - Docker & Docker Compose -- 8GB RAM minimum (16GB recommandé) -- 50GB espace disque +- 16GB RAM minimum (32GB recommandé pour les modèles LLM) +- 100GB espace disque (pour les modèles et documents) # Dépendances système sudo apt-get update sudo apt-get install -y python3 python3-pip python3-venv docker.io docker-compose sudo apt-get install -y tesseract-ocr tesseract-ocr-fra poppler-utils imagemagick +sudo apt-get install -y wget curl jq +``` + +### Installation Automatisée +```bash +# Cloner le repository +git clone https://git.4nkweb.com/4nk/4NK_IA_back.git +cd 4NK_IA_back + +# Bootstrap automatique (recommandé) +chmod +x ops/bootstrap.sh +./ops/bootstrap.sh + +# Ou installation manuelle +cd infra +cp .env.example .env +# Éditer .env avec vos paramètres +docker-compose up -d ``` ### Installation diff --git a/docs/API-NOTARIALE.md b/docs/API-NOTARIALE.md index 680b908..b735fed 100644 --- a/docs/API-NOTARIALE.md +++ b/docs/API-NOTARIALE.md @@ -13,26 +13,30 @@ L'API Notariale 4NK est un système complet de traitement de documents notariaux - Gestion des tâches asynchrones - Intégration avec les services externes -2. **Pipeline de Traitement** - - OCR avec correction lexicale notariale - - Classification automatique des documents - - Extraction d'entités (identités, adresses, biens) - - Vérifications externes (Cadastre, Géorisques, BODACC, etc.) - - Calcul du score de vraisemblance - - Analyse contextuelle via LLM +2. **Pipeline de Traitement Avancé** (`services/worker/`) + - **Préprocessing** : Validation, conversion et optimisation des documents + - **OCR** : Extraction de texte avec Tesseract et correction lexicale notariale + - **Classification** : Détection du type de document via LLM + - **Extraction** : Extraction d'entités (identités, adresses, biens, montants, dates) + - **Indexation** : Indexation multi-système (AnythingLLM, OpenSearch, Neo4j) + - **Vérifications** : Contrôles métier et vérifications externes automatisées + - **Finalisation** : Synthèse et archivage avec score de vraisemblance -3. **Interface Web** (`services/web_interface/`) - - Interface utilisateur moderne pour les notaires - - Upload de documents par drag & drop - - Visualisation des analyses - - Tableaux de bord et statistiques +3. **Traitement Asynchrone** (Celery) + - Queues spécialisées pour chaque étape du pipeline + - Scalabilité horizontale des workers + - Monitoring des tâches en temps réel + - Gestion robuste des erreurs -4. **Services Externes** - - Ollama (modèles LLM locaux) - - APIs gouvernementales (Cadastre, Géorisques, BODACC) - - Base de données PostgreSQL - - Stockage MinIO - - Cache Redis +4. **Services Externes Intégrés** + - **Ollama** : Modèles LLM locaux (Llama 3, Mistral) + - **APIs Gouvernementales** : Cadastre, Géorisques, BODACC, Infogreffe, RBE + - **Base de données PostgreSQL** : Métadonnées et résultats + - **Stockage MinIO** : Documents et artefacts + - **Cache Redis** : Performance et queues Celery + - **Neo4j** : Graphe de connaissances + - **OpenSearch** : Recherche plein-texte + - **AnythingLLM** : Indexation sémantique ## 📋 Types de Documents Supportés diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index bdd94bf..d861016 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -20,9 +20,17 @@ Le système notarial 4NK_IA est conçu selon une architecture microservices mode ### **3. Résilience et Fiabilité** - Health checks automatiques -- Retry policies +- Retry policies avec Celery - Circuit breakers -- Monitoring complet +- Monitoring complet avec Prometheus/Grafana +- Gestion robuste des erreurs dans les pipelines + +### **4. Intégrations Externes** +- APIs gouvernementales (Cadastre, Géorisques, BODACC, Infogreffe, RBE) +- LLM locaux (Ollama avec Llama 3, Mistral) +- Indexation sémantique (AnythingLLM) +- Graphe de connaissances (Neo4j) +- Recherche plein-texte (OpenSearch) ## 🏛️ Architecture Logique diff --git a/docs/INSTALLATION.md b/docs/INSTALLATION.md index 38d6217..ad11e33 100644 --- a/docs/INSTALLATION.md +++ b/docs/INSTALLATION.md @@ -4,6 +4,37 @@ Ce guide vous accompagne dans l'installation complète du système notarial 4NK_IA, de l'environnement de développement à la production. +## ⚡ Installation Rapide (Recommandée) + +Pour une installation rapide et automatisée, utilisez le script bootstrap : + +```bash +# Cloner le repository +git clone https://git.4nkweb.com/4nk/4NK_IA_back.git +cd 4NK_IA_back + +# Bootstrap automatique +chmod +x ops/bootstrap.sh +./ops/bootstrap.sh +``` + +Le script bootstrap configure automatiquement : +- ✅ Vérification des prérequis +- ✅ Configuration de l'environnement +- ✅ Téléchargement des images Docker +- ✅ Démarrage des services +- ✅ Configuration des modèles Ollama +- ✅ Setup des workspaces AnythingLLM +- ✅ Vérifications finales + +**Services disponibles après installation :** +- 🌐 API Notariale : http://localhost:8000 +- 📚 Documentation API : http://localhost:8000/docs +- 🤖 AnythingLLM : http://localhost:3001 +- 📊 Grafana : http://localhost:3000 +- 🗄️ MinIO Console : http://localhost:9001 +- 🦙 Ollama : http://localhost:11434 + ## 📋 Prérequis ### **Système d'Exploitation** diff --git a/docs/NEW-FEATURES-v1.2.0.md b/docs/NEW-FEATURES-v1.2.0.md new file mode 100644 index 0000000..92ee8cf --- /dev/null +++ b/docs/NEW-FEATURES-v1.2.0.md @@ -0,0 +1,289 @@ +# Nouvelles Fonctionnalités v1.2.0 + +## 🚀 Vue d'ensemble + +La version 1.2.0 apporte des améliorations majeures au système 4NK Notariat avec l'intégration complète de Celery, des pipelines avancés et des intégrations externes automatisées. + +## 🔄 Pipeline de Traitement Asynchrone + +### Architecture Celery + +Le système utilise maintenant Celery pour le traitement asynchrone avec des queues spécialisées : + +- **Queue `pipeline`** : Traitement principal des documents +- **Queue `ocr`** : Traitement OCR spécialisé +- **Queue `classification`** : Classification des documents +- **Queue `extraction`** : Extraction d'entités +- **Queue `indexing`** : Indexation multi-système +- **Queue `verification`** : Vérifications externes + +### Étapes du Pipeline + +1. **Préprocessing** (`services/worker/pipelines/preprocess.py`) + - Validation des formats de fichiers + - Conversion automatique (HEIC → JPEG, images → PDF) + - Optimisation des images pour l'OCR + - Détection de la langue + - Calcul du hash pour l'intégrité + +2. **OCR Avancé** (`services/worker/pipelines/ocr.py`) + - Support PDF et images avec Tesseract + - Préprocessing d'images (contraste, débruitage, netteté) + - Correction lexicale spécialisée notariale + - Détection de la structure du document + - Support OCRmyPDF en fallback + +3. **Classification** (`services/worker/tasks/classification_tasks.py`) + - Détection du type de document via LLM + - Support des types : acte_vente, acte_donation, acte_succession, cni, contrat, autre + - Calcul de la confiance de classification + - Traitement en lot + +4. **Extraction d'Entités** (`services/worker/tasks/extraction_tasks.py`) + - Extraction des identités (vendeur, acheteur, notaire) + - Extraction des adresses et biens immobiliers + - Extraction des montants et dates + - Support spécifique par type de document + +5. **Indexation Multi-Système** (`services/worker/tasks/indexing_tasks.py`) + - **AnythingLLM** : Indexation sémantique pour la recherche intelligente + - **OpenSearch** : Recherche plein-texte avec analyseur français + - **Neo4j** : Graphe de connaissances pour les relations entre entités + +6. **Vérifications Externes** (`services/worker/tasks/verification_tasks.py`) + - **API Cadastre** : Vérification des parcelles immobilières + - **API Géorisques** : Analyse des risques géologiques + - **API BODACC** : Vérification des entreprises + - **API Infogreffe** : Recherche d'informations d'entreprises + - **API RBE** : Registre des Bénéficiaires Effectifs + - **Calcul du score de vraisemblance** basé sur les vérifications + +7. **Finalisation** (`services/worker/pipelines/finalize.py`) + - Synthèse des résultats + - Archivage des documents + - Génération des rapports + +## 🔗 Intégrations Externes + +### APIs Gouvernementales + +#### API Cadastre (`services/worker/utils/external_apis.py`) +```python +# Vérification d'adresse +result = await api_manager.verify_address( + address="123 Rue de la Paix", + postal_code="75001", + city="Paris" +) +``` + +#### API Géorisques +```python +# Vérification des risques géologiques +result = await api_manager.check_geological_risks( + address="123 Rue de la Paix", + coordinates=[2.3522, 48.8566] +) +``` + +#### API BODACC +```python +# Vérification d'entreprise +result = await api_manager.verify_company( + company_name="SARL Example", + siren="123456789" +) +``` + +### Clients d'Intégration + +#### AnythingLLM Client (`services/worker/utils/anythingllm_client.py`) +- Création automatique des workspaces +- Upload de documents avec métadonnées +- Recherche sémantique intelligente +- Indexation pour les actes similaires + +#### Neo4j Client (`services/worker/utils/neo4j_client.py`) +- Création du contexte de dossier +- Ajout des entités au graphe +- Recherche de documents liés +- Génération de résumés de dossier + +#### OpenSearch Client (`services/worker/utils/opensearch_client.py`) +- Indexation avec mapping français +- Recherche plein-texte avancée +- Recherche par entités +- Statistiques d'index + +## 🛠️ Configuration et Déploiement + +### Fichier d'Environnement + +Le fichier `infra/.env` contient toutes les variables de configuration : + +```bash +# Configuration du projet +PROJECT_NAME=notariat +DOMAIN=localhost + +# Base de données PostgreSQL +POSTGRES_USER=notariat +POSTGRES_PASSWORD=notariat_pwd +POSTGRES_DB=notariat + +# Redis pour Celery +REDIS_PASSWORD= + +# MinIO (Stockage S3-compatible) +MINIO_ROOT_USER=minio +MINIO_ROOT_PASSWORD=minio_pwd +MINIO_BUCKET=ingest + +# AnythingLLM +ANYLLM_API_KEY=change_me +ANYLLM_BASE_URL=http://anythingllm:3001 +ANYLLM_WORKSPACE_NORMES=workspace_normes +ANYLLM_WORKSPACE_TRAMES=workspace_trames +ANYLLM_WORKSPACE_ACTES=workspace_actes + +# Ollama (LLM local) +OLLAMA_BASE_URL=http://ollama:11434 +OLLAMA_MODELS=llama3:8b,mistral:7b + +# Neo4j (Graphe de connaissances) +NEO4J_AUTH=neo4j/neo4j_pwd + +# OpenSearch (Recherche plein-texte) +OPENSEARCH_PASSWORD=opensearch_pwd + +# URLs des APIs externes +CADASTRE_API_URL=https://apicarto.ign.fr/api/cadastre +GEORISQUES_API_URL=https://www.georisques.gouv.fr/api +BODACC_API_URL=https://bodacc-datadila.opendatasoft.com/api +INFOGREFFE_API_URL=https://entreprise.api.gouv.fr/v2/infogreffe +RBE_API_URL=https://www.data.gouv.fr/api/1/datasets/registre-des-beneficiaires-effectifs +``` + +### Script Bootstrap + +Le script `ops/bootstrap.sh` automatise l'installation complète : + +```bash +# Vérification des prérequis +check_prerequisites + +# Configuration de l'environnement +setup_environment + +# Téléchargement des images Docker +pull_images + +# Démarrage des services de base +start_base_services + +# Configuration de MinIO +setup_minio + +# Configuration d'Ollama +setup_ollama + +# Démarrage des services applicatifs +start_application_services + +# Configuration des workspaces AnythingLLM +setup_anythingllm_workspaces + +# Vérification finale +final_check +``` + +## 📊 Monitoring et Observabilité + +### Métriques Celery + +- Tâches actives, réservées et terminées +- Temps de traitement par étape +- Taux d'erreur par queue +- Performance des workers + +### Health Checks + +- Vérification de l'état des services +- Test de connectivité aux APIs externes +- Validation des queues Celery +- Contrôle de l'espace disque et mémoire + +## 🔧 Utilisation + +### Upload de Document + +```python +# Via l'API +response = requests.post( + "http://localhost:8000/api/notary/documents/upload", + files={"file": open("document.pdf", "rb")}, + data={ + "id_dossier": "DOSSIER-001", + "etude_id": "ETUDE-001", + "utilisateur_id": "USER-001" + } +) +``` + +### Suivi du Traitement + +```python +# Récupération du statut +status = requests.get( + f"http://localhost:8000/api/notary/documents/{doc_id}/status" +) + +# Récupération des résultats +results = requests.get( + f"http://localhost:8000/api/notary/documents/{doc_id}/results" +) +``` + +### Recherche + +```python +# Recherche plein-texte via OpenSearch +search_results = opensearch_client.search_documents( + query="acte de vente Paris", + filters={"doc_type": "acte_vente"}, + limit=10 +) + +# Recherche sémantique via AnythingLLM +semantic_results = anyllm_client.search_documents( + workspace_id="workspace_actes", + query="acte de vente immobilière", + limit=5 +) +``` + +## 🚀 Prochaines Étapes + +### Phase 3 - Intelligence Artificielle +- [ ] Configuration des modèles Ollama (llama3:8b, mistral:7b) +- [ ] Implémentation de la classification via LLM +- [ ] Extraction d'entités avec LLM +- [ ] Génération d'avis de synthèse + +### Phase 4 - Supervision +- [ ] Tableaux de bord Grafana +- [ ] Métriques Prometheus +- [ ] Journaux d'audit structurés + +### Phase 5 - Tests et Qualité +- [ ] Tests automatisés complets +- [ ] Données de test +- [ ] Seuils de qualité + +## 📚 Documentation + +- [README.md](../README.md) - Vue d'ensemble et démarrage rapide +- [docs/API-NOTARIALE.md](API-NOTARIALE.md) - Documentation API complète +- [docs/INSTALLATION.md](INSTALLATION.md) - Guide d'installation détaillé +- [docs/ARCHITECTURE.md](ARCHITECTURE.md) - Architecture du système +- [CHANGELOG.md](../CHANGELOG.md) - Historique des versions diff --git a/services/worker/tasks/indexing_tasks.py b/services/worker/tasks/indexing_tasks.py index b15088f..4e4b803 100644 --- a/services/worker/tasks/indexing_tasks.py +++ b/services/worker/tasks/indexing_tasks.py @@ -33,7 +33,7 @@ def index_document(self, doc_id: str, text: str, entities: Dict[str, Any], doc_t # Indexation dans les différents systèmes indexing_results = {} - + # 1. Indexation dans AnythingLLM try: from services.worker.utils.anythingllm_client import AnythingLLMClient @@ -45,7 +45,7 @@ def index_document(self, doc_id: str, text: str, entities: Dict[str, Any], doc_t except Exception as e: logger.error(f"Erreur indexation AnythingLLM: {e}") indexing_results['anythingllm'] = {'status': 'error', 'error': str(e)} - + # 2. Indexation dans OpenSearch try: from services.worker.utils.opensearch_client import OpenSearchClient @@ -61,12 +61,12 @@ def index_document(self, doc_id: str, text: str, entities: Dict[str, Any], doc_t except Exception as e: logger.error(f"Erreur indexation OpenSearch: {e}") indexing_results['opensearch'] = {'status': 'error', 'error': str(e)} - + # 3. Création du graphe Neo4j try: from services.worker.utils.neo4j_client import Neo4jClient neo4j_client = Neo4jClient() - + # Ajout du document au graphe neo4j_result = await neo4j_client.add_entities_to_document(doc_id, entities) indexing_results['neo4j'] = neo4j_result diff --git a/services/worker/tasks/verification_tasks.py b/services/worker/tasks/verification_tasks.py index ecaa319..1877ff9 100644 --- a/services/worker/tasks/verification_tasks.py +++ b/services/worker/tasks/verification_tasks.py @@ -32,7 +32,7 @@ def verify_document(self, doc_id: str, entities: Dict[str, Any], doc_type: str, # Vérifications externes avec les APIs verification_results = {} - + # 1. Vérification des adresses via Cadastre if 'bien' in entities and 'adresse' in entities['bien']: try: @@ -47,7 +47,7 @@ def verify_document(self, doc_id: str, entities: Dict[str, Any], doc_type: str, except Exception as e: logger.error(f"Erreur vérification Cadastre: {e}") verification_results['cadastre'] = {'status': 'error', 'error': str(e)} - + # 2. Vérification des risques géologiques if 'bien' in entities and 'adresse' in entities['bien']: try: @@ -60,7 +60,7 @@ def verify_document(self, doc_id: str, entities: Dict[str, Any], doc_type: str, except Exception as e: logger.error(f"Erreur vérification Géorisques: {e}") verification_results['georisques'] = {'status': 'error', 'error': str(e)} - + # 3. Vérification des entreprises (si applicable) if 'vendeur' in entities and 'nom' in entities['vendeur']: try: @@ -73,13 +73,13 @@ def verify_document(self, doc_id: str, entities: Dict[str, Any], doc_type: str, except Exception as e: logger.error(f"Erreur vérification BODACC: {e}") verification_results['bodacc'] = {'status': 'error', 'error': str(e)} - + # 4. Vérification des personnes if 'vendeur' in entities or 'acheteur' in entities: try: from services.worker.utils.external_apis import ExternalAPIManager api_manager = ExternalAPIManager() - + # Vérification du vendeur if 'vendeur' in entities: person_result = await api_manager.verify_person( @@ -88,7 +88,7 @@ def verify_document(self, doc_id: str, entities: Dict[str, Any], doc_type: str, entities['vendeur'].get('date_naissance') ) verification_results['person_vendeur'] = person_result - + # Vérification de l'acheteur if 'acheteur' in entities: person_result = await api_manager.verify_person( @@ -97,7 +97,7 @@ def verify_document(self, doc_id: str, entities: Dict[str, Any], doc_type: str, entities['acheteur'].get('date_naissance') ) verification_results['person_acheteur'] = person_result - + except Exception as e: logger.error(f"Erreur vérification personnes: {e}") verification_results['person_verification'] = {'status': 'error', 'error': str(e)} @@ -182,16 +182,16 @@ def update_external_data(): def _calculate_credibility_score(verification_results: Dict[str, Any]) -> float: """ Calcul du score de vraisemblance basé sur les vérifications - + Args: verification_results: Résultats des vérifications - + Returns: Score de vraisemblance entre 0 et 1 """ total_score = 0.0 total_weight = 0.0 - + # Poids des différentes vérifications weights = { 'cadastre': 0.3, @@ -200,12 +200,12 @@ def _calculate_credibility_score(verification_results: Dict[str, Any]) -> float: 'person_vendeur': 0.15, 'person_acheteur': 0.15 } - + for verification_type, result in verification_results.items(): if verification_type in weights: weight = weights[verification_type] total_weight += weight - + if result.get('status') == 'verified': confidence = result.get('confidence', 0.8) total_score += confidence * weight @@ -215,11 +215,11 @@ def _calculate_credibility_score(verification_results: Dict[str, Any]) -> float: elif result.get('status') == 'error': # Erreur réduit le score total_score += 0.2 * weight - + # Normalisation du score if total_weight > 0: final_score = total_score / total_weight else: final_score = 0.5 # Score par défaut si aucune vérification - + return min(max(final_score, 0.0), 1.0) diff --git a/services/worker/utils/anythingllm_client.py b/services/worker/utils/anythingllm_client.py index f095ed5..17e7a16 100644 --- a/services/worker/utils/anythingllm_client.py +++ b/services/worker/utils/anythingllm_client.py @@ -12,37 +12,37 @@ logger = logging.getLogger(__name__) class AnythingLLMClient: """Client pour l'intégration avec AnythingLLM""" - + def __init__(self): self.base_url = os.getenv('ANYLLM_BASE_URL', 'http://anythingllm:3001') self.api_key = os.getenv('ANYLLM_API_KEY', 'change_me') - + # Configuration des workspaces self.workspaces = { 'normes': os.getenv('ANYLLM_WORKSPACE_NORMES', 'workspace_normes'), 'trames': os.getenv('ANYLLM_WORKSPACE_TRAMES', 'workspace_trames'), 'actes': os.getenv('ANYLLM_WORKSPACE_ACTES', 'workspace_actes') } - + self.session = requests.Session() self.session.headers.update({ 'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json' }) - + async def create_workspace(self, name: str, description: str = None) -> Dict[str, Any]: """ Création d'un workspace AnythingLLM - + Args: name: Nom du workspace description: Description du workspace - + Returns: Résultat de la création """ logger.info(f"🏢 Création du workspace AnythingLLM: {name}") - + try: payload = { 'name': name, @@ -55,13 +55,13 @@ class AnythingLLMClient: 'embeddingsModel': 'text-embedding-ada-002', 'vectorTag': name.lower().replace(' ', '_') } - + response = self.session.post( f"{self.base_url}/api/workspace/new", json=payload, timeout=30 ) - + if response.status_code == 200: data = response.json() logger.info(f"✅ Workspace {name} créé avec succès") @@ -78,49 +78,49 @@ class AnythingLLMClient: 'error': f"Erreur API: {response.status_code}", 'response': response.text } - + except Exception as e: logger.error(f"Erreur lors de la création du workspace {name}: {e}") return { 'status': 'error', 'error': str(e) } - - async def upload_document(self, workspace_id: str, document_data: bytes, filename: str, + + async def upload_document(self, workspace_id: str, document_data: bytes, filename: str, metadata: Dict[str, Any] = None) -> Dict[str, Any]: """ Upload d'un document dans un workspace - + Args: workspace_id: ID du workspace document_data: Données du document filename: Nom du fichier metadata: Métadonnées du document - + Returns: Résultat de l'upload """ logger.info(f"📄 Upload du document {filename} dans le workspace {workspace_id}") - + try: # Préparation des fichiers files = { 'file': (filename, document_data, 'application/octet-stream') } - + # Préparation des données data = { 'workspaceId': workspace_id, 'chunkSize': 1000, 'chunkOverlap': 200 } - + if metadata: data['metadata'] = json.dumps(metadata) - + # Suppression de l'header Content-Type pour les multipart headers = {'Authorization': f'Bearer {self.api_key}'} - + response = requests.post( f"{self.base_url}/api/workspace/{workspace_id}/upload", files=files, @@ -128,7 +128,7 @@ class AnythingLLMClient: headers=headers, timeout=60 ) - + if response.status_code == 200: data = response.json() logger.info(f"✅ Document {filename} uploadé avec succès") @@ -147,29 +147,29 @@ class AnythingLLMClient: 'error': f"Erreur API: {response.status_code}", 'response': response.text } - + except Exception as e: logger.error(f"Erreur lors de l'upload du document {filename}: {e}") return { 'status': 'error', 'error': str(e) } - - async def search_documents(self, workspace_id: str, query: str, + + async def search_documents(self, workspace_id: str, query: str, limit: int = 10) -> Dict[str, Any]: """ Recherche dans les documents d'un workspace - + Args: workspace_id: ID du workspace query: Requête de recherche limit: Nombre maximum de résultats - + Returns: Résultats de la recherche """ logger.info(f"🔍 Recherche dans le workspace {workspace_id}: {query}") - + try: payload = { 'workspaceId': workspace_id, @@ -179,13 +179,13 @@ class AnythingLLMClient: 'temperature': 0.7, 'topK': limit } - + response = self.session.post( f"{self.base_url}/api/workspace/{workspace_id}/chat", json=payload, timeout=30 ) - + if response.status_code == 200: data = response.json() logger.info(f"✅ Recherche terminée, {len(data.get('sources', []))} résultats") @@ -204,21 +204,21 @@ class AnythingLLMClient: 'error': f"Erreur API: {response.status_code}", 'response': response.text } - + except Exception as e: logger.error(f"Erreur lors de la recherche: {e}") return { 'status': 'error', 'error': str(e) } - + async def get_workspace_info(self, workspace_id: str) -> Dict[str, Any]: """ Récupération des informations d'un workspace - + Args: workspace_id: ID du workspace - + Returns: Informations du workspace """ @@ -227,7 +227,7 @@ class AnythingLLMClient: f"{self.base_url}/api/workspace/{workspace_id}", timeout=10 ) - + if response.status_code == 200: data = response.json() return { @@ -240,18 +240,18 @@ class AnythingLLMClient: 'status': 'error', 'error': f"Erreur API: {response.status_code}" } - + except Exception as e: logger.error(f"Erreur lors de la récupération du workspace: {e}") return { 'status': 'error', 'error': str(e) } - + async def list_workspaces(self) -> Dict[str, Any]: """ Liste tous les workspaces disponibles - + Returns: Liste des workspaces """ @@ -260,7 +260,7 @@ class AnythingLLMClient: f"{self.base_url}/api/workspaces", timeout=10 ) - + if response.status_code == 200: data = response.json() return { @@ -274,35 +274,35 @@ class AnythingLLMClient: 'status': 'error', 'error': f"Erreur API: {response.status_code}" } - + except Exception as e: logger.error(f"Erreur lors de la liste des workspaces: {e}") return { 'status': 'error', 'error': str(e) } - - async def index_document_for_actes(self, doc_id: str, text: str, - entities: Dict[str, Any], + + async def index_document_for_actes(self, doc_id: str, text: str, + entities: Dict[str, Any], doc_type: str) -> Dict[str, Any]: """ Indexation d'un document dans le workspace des actes - + Args: doc_id: ID du document text: Texte du document entities: Entités extraites doc_type: Type de document - + Returns: Résultat de l'indexation """ logger.info(f"📚 Indexation du document {doc_id} dans le workspace actes") - + try: # Préparation du contenu structuré structured_content = self._prepare_structured_content(doc_id, text, entities, doc_type) - + # Upload du contenu structuré workspace_id = await self._get_workspace_id('actes') if not workspace_id: @@ -310,39 +310,39 @@ class AnythingLLMClient: 'status': 'error', 'error': 'Workspace actes non trouvé' } - + filename = f"{doc_id}_structured.txt" document_data = structured_content.encode('utf-8') - + result = await self.upload_document(workspace_id, document_data, filename, { 'doc_id': doc_id, 'doc_type': doc_type, 'entities': entities, 'indexed_at': datetime.now().isoformat() }) - + return result - + except Exception as e: logger.error(f"Erreur lors de l'indexation du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } - + async def search_similar_actes(self, doc_type: str, entities: Dict[str, Any]) -> Dict[str, Any]: """ Recherche d'actes similaires - + Args: doc_type: Type de document entities: Entités extraites - + Returns: Actes similaires trouvés """ logger.info(f"🔍 Recherche d'actes similaires pour le type: {doc_type}") - + try: workspace_id = await self._get_workspace_id('actes') if not workspace_id: @@ -350,22 +350,22 @@ class AnythingLLMClient: 'status': 'error', 'error': 'Workspace actes non trouvé' } - + # Construction de la requête de recherche query = self._build_similarity_query(doc_type, entities) - + result = await self.search_documents(workspace_id, query, limit=5) - + return result - + except Exception as e: logger.error(f"Erreur lors de la recherche d'actes similaires: {e}") return { 'status': 'error', 'error': str(e) } - - def _prepare_structured_content(self, doc_id: str, text: str, + + def _prepare_structured_content(self, doc_id: str, text: str, entities: Dict[str, Any], doc_type: str) -> str: """Prépare le contenu structuré pour l'indexation""" content = f"""DOCUMENT ID: {doc_id} @@ -382,11 +382,11 @@ TEXTE DU DOCUMENT: Ce document a été traité par le pipeline notarial v1.2.0 """ return content - + def _build_similarity_query(self, doc_type: str, entities: Dict[str, Any]) -> str: """Construit une requête de recherche pour trouver des actes similaires""" query_parts = [f"type:{doc_type}"] - + # Ajout des entités importantes if 'vendeur' in entities: query_parts.append(f"vendeur:{entities['vendeur'].get('nom', '')}") @@ -394,9 +394,9 @@ Ce document a été traité par le pipeline notarial v1.2.0 query_parts.append(f"acheteur:{entities['acheteur'].get('nom', '')}") if 'bien' in entities: query_parts.append(f"adresse:{entities['bien'].get('adresse', '')}") - + return " ".join(query_parts) - + async def _get_workspace_id(self, workspace_name: str) -> Optional[str]: """Récupère l'ID d'un workspace par son nom""" try: diff --git a/services/worker/utils/external_apis.py b/services/worker/utils/external_apis.py index 7e8aa56..cd19e43 100644 --- a/services/worker/utils/external_apis.py +++ b/services/worker/utils/external_apis.py @@ -12,13 +12,13 @@ logger = logging.getLogger(__name__) class ExternalAPIManager: """Gestionnaire des APIs externes pour la vérification des données""" - + def __init__(self): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Notariat-Pipeline/1.2.0' }) - + # Configuration des URLs des APIs self.apis = { 'cadastre': os.getenv('CADASTRE_API_URL', 'https://apicarto.ign.fr/api/cadastre'), @@ -27,52 +27,52 @@ class ExternalAPIManager: 'infogreffe': os.getenv('INFOGREFFE_API_URL', 'https://entreprise.api.gouv.fr/v2/infogreffe'), 'rbe': os.getenv('RBE_API_URL', 'https://www.data.gouv.fr/api/1/datasets/registre-des-beneficiaires-effectifs') } - + # Cache pour éviter les appels répétés self.cache = {} self.cache_ttl = 3600 # 1 heure - + async def verify_address(self, address: str, postal_code: str = None, city: str = None) -> Dict[str, Any]: """ Vérification d'une adresse via l'API Cadastre - + Args: address: Adresse à vérifier postal_code: Code postal city: Ville - + Returns: Résultat de la vérification """ logger.info(f"🏠 Vérification de l'adresse: {address}") - + try: # Construction de la requête params = { 'q': address, 'limit': 5 } - + if postal_code: params['code_postal'] = postal_code if city: params['commune'] = city - + # Appel à l'API Cadastre response = self.session.get( f"{self.apis['cadastre']}/parcelle", params=params, timeout=10 ) - + if response.status_code == 200: data = response.json() - + if data.get('features'): # Adresse trouvée feature = data['features'][0] properties = feature.get('properties', {}) - + return { 'status': 'verified', 'confidence': 0.95, @@ -100,7 +100,7 @@ class ExternalAPIManager: 'error': f"Erreur API: {response.status_code}", 'source': 'cadastre_api' } - + except Exception as e: logger.error(f"Erreur lors de la vérification de l'adresse: {e}") return { @@ -109,27 +109,27 @@ class ExternalAPIManager: 'error': str(e), 'source': 'cadastre_api' } - + async def check_geological_risks(self, address: str, coordinates: List[float] = None) -> Dict[str, Any]: """ Vérification des risques géologiques via l'API Géorisques - + Args: address: Adresse à vérifier coordinates: Coordonnées GPS [longitude, latitude] - + Returns: Résultat de la vérification des risques """ logger.info(f"🌍 Vérification des risques géologiques: {address}") - + try: # Si pas de coordonnées, essayer de les obtenir via géocodage if not coordinates: coords_result = await self._geocode_address(address) if coords_result.get('coordinates'): coordinates = coords_result['coordinates'] - + if not coordinates: return { 'status': 'error', @@ -137,23 +137,23 @@ class ExternalAPIManager: 'error': 'Coordonnées non disponibles', 'source': 'georisques_api' } - + # Appel à l'API Géorisques params = { 'lon': coordinates[0], 'lat': coordinates[1], 'distance': 1000 # 1km de rayon } - + response = self.session.get( f"{self.apis['georisques']}/v1/gaspar/risques", params=params, timeout=10 ) - + if response.status_code == 200: data = response.json() - + risks = [] if data.get('data'): for risk in data['data']: @@ -163,7 +163,7 @@ class ExternalAPIManager: 'description': risk.get('description', ''), 'distance': risk.get('distance', 0) }) - + return { 'status': 'completed', 'confidence': 0.90, @@ -181,7 +181,7 @@ class ExternalAPIManager: 'error': f"Erreur API: {response.status_code}", 'source': 'georisques_api' } - + except Exception as e: logger.error(f"Erreur lors de la vérification des risques géologiques: {e}") return { @@ -190,45 +190,45 @@ class ExternalAPIManager: 'error': str(e), 'source': 'georisques_api' } - + async def verify_company(self, company_name: str, siren: str = None) -> Dict[str, Any]: """ Vérification d'une entreprise via l'API BODACC - + Args: company_name: Nom de l'entreprise siren: Numéro SIREN (optionnel) - + Returns: Résultat de la vérification de l'entreprise """ logger.info(f"🏢 Vérification de l'entreprise: {company_name}") - + try: # Construction de la requête params = { 'q': company_name, 'rows': 5 } - + if siren: params['siren'] = siren - + # Appel à l'API BODACC response = self.session.get( f"{self.apis['bodacc']}/records/1.0/search/", params=params, timeout=10 ) - + if response.status_code == 200: data = response.json() - + if data.get('records'): # Entreprise trouvée record = data['records'][0] fields = record.get('fields', {}) - + return { 'status': 'verified', 'confidence': 0.90, @@ -262,7 +262,7 @@ class ExternalAPIManager: 'error': f"Erreur API: {response.status_code}", 'source': 'bodacc_api' } - + except Exception as e: logger.error(f"Erreur lors de la vérification de l'entreprise: {e}") return { @@ -271,28 +271,28 @@ class ExternalAPIManager: 'error': str(e), 'source': 'bodacc_api' } - + async def verify_person(self, first_name: str, last_name: str, birth_date: str = None) -> Dict[str, Any]: """ Vérification d'une personne (recherche d'informations publiques) - + Args: first_name: Prénom last_name: Nom de famille birth_date: Date de naissance (format YYYY-MM-DD) - + Returns: Résultat de la vérification de la personne """ logger.info(f"👤 Vérification de la personne: {first_name} {last_name}") - + try: # Recherche dans le RBE (Registre des Bénéficiaires Effectifs) rbe_result = await self._search_rbe(first_name, last_name) - + # Recherche dans Infogreffe (si entreprise) infogreffe_result = await self._search_infogreffe(first_name, last_name) - + # Compilation des résultats results = { 'status': 'completed', @@ -304,13 +304,13 @@ class ExternalAPIManager: 'source': 'multiple_apis', 'verified_at': datetime.now().isoformat() } - + # Calcul de la confiance globale if rbe_result.get('found') or infogreffe_result.get('found'): results['confidence'] = 0.85 - + return results - + except Exception as e: logger.error(f"Erreur lors de la vérification de la personne: {e}") return { @@ -319,7 +319,7 @@ class ExternalAPIManager: 'error': str(e), 'source': 'person_verification' } - + async def _geocode_address(self, address: str) -> Dict[str, Any]: """Géocodage d'une adresse""" try: @@ -328,13 +328,13 @@ class ExternalAPIManager: 'q': address, 'limit': 1 } - + response = self.session.get( f"{self.apis['cadastre']}/geocodage", params=params, timeout=10 ) - + if response.status_code == 200: data = response.json() if data.get('features'): @@ -344,13 +344,13 @@ class ExternalAPIManager: 'coordinates': coords, 'formatted_address': feature.get('properties', {}).get('label', address) } - + return {'coordinates': None} - + except Exception as e: logger.error(f"Erreur lors du géocodage: {e}") return {'coordinates': None} - + async def _search_rbe(self, first_name: str, last_name: str) -> Dict[str, Any]: """Recherche dans le Registre des Bénéficiaires Effectifs""" try: @@ -358,13 +358,13 @@ class ExternalAPIManager: 'q': f"{first_name} {last_name}", 'rows': 5 } - + response = self.session.get( f"{self.apis['rbe']}/search", params=params, timeout=10 ) - + if response.status_code == 200: data = response.json() return { @@ -372,13 +372,13 @@ class ExternalAPIManager: 'count': len(data.get('results', [])), 'results': data.get('results', [])[:3] # Limite à 3 résultats } - + return {'found': False, 'count': 0, 'results': []} - + except Exception as e: logger.error(f"Erreur lors de la recherche RBE: {e}") return {'found': False, 'count': 0, 'results': []} - + async def _search_infogreffe(self, first_name: str, last_name: str) -> Dict[str, Any]: """Recherche dans Infogreffe""" try: @@ -386,13 +386,13 @@ class ExternalAPIManager: 'q': f"{first_name} {last_name}", 'per_page': 5 } - + response = self.session.get( f"{self.apis['infogreffe']}/search", params=params, timeout=10 ) - + if response.status_code == 200: data = response.json() return { @@ -400,35 +400,35 @@ class ExternalAPIManager: 'count': len(data.get('results', [])), 'results': data.get('results', [])[:3] # Limite à 3 résultats } - + return {'found': False, 'count': 0, 'results': []} - + except Exception as e: logger.error(f"Erreur lors de la recherche Infogreffe: {e}") return {'found': False, 'count': 0, 'results': []} - + def get_cache_key(self, api: str, params: Dict[str, Any]) -> str: """Génère une clé de cache pour les paramètres donnés""" import hashlib key_data = f"{api}:{json.dumps(params, sort_keys=True)}" return hashlib.md5(key_data.encode()).hexdigest() - + def is_cache_valid(self, cache_key: str) -> bool: """Vérifie si le cache est encore valide""" if cache_key not in self.cache: return False - + cache_time = self.cache[cache_key].get('timestamp', 0) current_time = datetime.now().timestamp() - + return (current_time - cache_time) < self.cache_ttl - + def get_from_cache(self, cache_key: str) -> Optional[Dict[str, Any]]: """Récupère une valeur du cache""" if self.is_cache_valid(cache_key): return self.cache[cache_key].get('data') return None - + def set_cache(self, cache_key: str, data: Dict[str, Any]) -> None: """Met une valeur en cache""" self.cache[cache_key] = { diff --git a/services/worker/utils/neo4j_client.py b/services/worker/utils/neo4j_client.py index b97fac2..0d741ae 100644 --- a/services/worker/utils/neo4j_client.py +++ b/services/worker/utils/neo4j_client.py @@ -12,15 +12,15 @@ logger = logging.getLogger(__name__) class Neo4jClient: """Client pour l'intégration avec Neo4j""" - + def __init__(self): self.uri = os.getenv('NEO4J_URI', 'bolt://neo4j:7687') self.username = os.getenv('NEO4J_USER', 'neo4j') self.password = os.getenv('NEO4J_PASSWORD', 'neo4j_pwd') - + self.driver = None self._connect() - + def _connect(self): """Connexion à Neo4j""" try: @@ -32,25 +32,25 @@ class Neo4jClient: except Exception as e: logger.error(f"❌ Erreur de connexion à Neo4j: {e}") self.driver = None - + def close(self): """Fermeture de la connexion""" if self.driver: self.driver.close() - + async def create_dossier_context(self, dossier_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]: """ Création du contexte d'un dossier dans le graphe - + Args: dossier_id: ID du dossier metadata: Métadonnées du dossier - + Returns: Résultat de la création """ logger.info(f"📁 Création du contexte du dossier {dossier_id}") - + try: with self.driver.session() as session: # Création du nœud dossier @@ -63,14 +63,14 @@ class Neo4jClient: d.status = $status, d.metadata = $metadata RETURN d - """, + """, dossier_id=dossier_id, etude_id=metadata.get('etude_id'), utilisateur_id=metadata.get('utilisateur_id'), status=metadata.get('status', 'active'), metadata=json.dumps(metadata) ) - + record = result.single() if record: logger.info(f"✅ Contexte du dossier {dossier_id} créé") @@ -84,29 +84,29 @@ class Neo4jClient: 'status': 'error', 'error': 'Impossible de créer le contexte du dossier' } - + except Exception as e: logger.error(f"❌ Erreur lors de la création du contexte du dossier {dossier_id}: {e}") return { 'status': 'error', 'error': str(e) } - - async def add_document_to_dossier(self, dossier_id: str, doc_id: str, + + async def add_document_to_dossier(self, dossier_id: str, doc_id: str, doc_metadata: Dict[str, Any]) -> Dict[str, Any]: """ Ajout d'un document à un dossier - + Args: dossier_id: ID du dossier doc_id: ID du document doc_metadata: Métadonnées du document - + Returns: Résultat de l'ajout """ logger.info(f"📄 Ajout du document {doc_id} au dossier {dossier_id}") - + try: with self.driver.session() as session: # Création du nœud document et relation avec le dossier @@ -129,7 +129,7 @@ class Neo4jClient: status=doc_metadata.get('status', 'uploaded'), metadata=json.dumps(doc_metadata) ) - + record = result.single() if record: logger.info(f"✅ Document {doc_id} ajouté au dossier {dossier_id}") @@ -144,27 +144,27 @@ class Neo4jClient: 'status': 'error', 'error': 'Impossible d\'ajouter le document au dossier' } - + except Exception as e: logger.error(f"❌ Erreur lors de l'ajout du document {doc_id} au dossier {dossier_id}: {e}") return { 'status': 'error', 'error': str(e) } - + async def add_entities_to_document(self, doc_id: str, entities: Dict[str, Any]) -> Dict[str, Any]: """ Ajout des entités extraites à un document - + Args: doc_id: ID du document entities: Entités extraites - + Returns: Résultat de l'ajout """ logger.info(f"🏷️ Ajout des entités au document {doc_id}") - + try: with self.driver.session() as session: # Traitement des entités selon leur type @@ -179,7 +179,7 @@ class Neo4jClient: await self._add_amount_entities(session, doc_id, entity_data) elif entity_type == 'dates': await self._add_date_entities(session, doc_id, entity_data) - + logger.info(f"✅ Entités ajoutées au document {doc_id}") return { 'status': 'added', @@ -187,14 +187,14 @@ class Neo4jClient: 'entities_count': len(entities), 'added_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de l'ajout des entités au document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } - + async def _add_person_entities(self, session, doc_id: str, persons: List[Dict[str, Any]]): """Ajout des entités personnes""" for person in persons: @@ -218,7 +218,7 @@ class Neo4jClient: nationalite=person.get('nationalite'), adresse=person.get('adresse') ) - + async def _add_address_entities(self, session, doc_id: str, addresses: List[Dict[str, Any]]): """Ajout des entités adresses""" for address in addresses: @@ -243,7 +243,7 @@ class Neo4jClient: region=address.get('region'), coordinates=json.dumps(address.get('coordinates', [])) ) - + async def _add_property_entities(self, session, doc_id: str, properties: List[Dict[str, Any]]): """Ajout des entités biens""" for property_data in properties: @@ -266,7 +266,7 @@ class Neo4jClient: type_bien=property_data.get('type_bien'), reference_cadastrale=property_data.get('reference_cadastrale') ) - + async def _add_amount_entities(self, session, doc_id: str, amounts: List[Dict[str, Any]]): """Ajout des entités montants""" for amount in amounts: @@ -286,7 +286,7 @@ class Neo4jClient: type_montant=amount.get('type_montant'), description=amount.get('description') ) - + async def _add_date_entities(self, session, doc_id: str, dates: List[Dict[str, Any]]): """Ajout des entités dates""" for date_data in dates: @@ -305,20 +305,20 @@ class Neo4jClient: type_date=date_data.get('type_date'), description=date_data.get('description') ) - + async def find_related_documents(self, doc_id: str, max_depth: int = 2) -> Dict[str, Any]: """ Recherche de documents liés - + Args: doc_id: ID du document max_depth: Profondeur maximale de recherche - + Returns: Documents liés trouvés """ logger.info(f"🔗 Recherche de documents liés au document {doc_id}") - + try: with self.driver.session() as session: result = session.run(""" @@ -331,7 +331,7 @@ class Neo4jClient: doc_id=doc_id, max_depth=max_depth ) - + related_docs = [] for record in result: related_docs.append({ @@ -340,7 +340,7 @@ class Neo4jClient: 'type': record['related'].get('type'), 'distance': record['distance'] }) - + logger.info(f"✅ {len(related_docs)} documents liés trouvés") return { 'status': 'completed', @@ -349,26 +349,26 @@ class Neo4jClient: 'count': len(related_docs), 'searched_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de la recherche de documents liés: {e}") return { 'status': 'error', 'error': str(e) } - + async def get_dossier_summary(self, dossier_id: str) -> Dict[str, Any]: """ Récupération du résumé d'un dossier - + Args: dossier_id: ID du dossier - + Returns: Résumé du dossier """ logger.info(f"📊 Génération du résumé du dossier {dossier_id}") - + try: with self.driver.session() as session: # Statistiques générales @@ -376,16 +376,16 @@ class Neo4jClient: MATCH (d:Dossier {id: $dossier_id}) OPTIONAL MATCH (d)-[:CONTAINS]->(doc:Document) OPTIONAL MATCH (doc)-[:MENTIONS]->(entity) - RETURN + RETURN count(DISTINCT doc) as documents_count, count(DISTINCT entity) as entities_count, collect(DISTINCT doc.type) as document_types """, dossier_id=dossier_id ) - + stats_record = stats_result.single() - + # Entités les plus fréquentes entities_result = session.run(""" MATCH (d:Dossier {id: $dossier_id})-[:CONTAINS]->(doc:Document)-[:MENTIONS]->(entity) @@ -395,14 +395,14 @@ class Neo4jClient: """, dossier_id=dossier_id ) - + entity_frequencies = [] for record in entities_result: entity_frequencies.append({ 'type': record['entity_type'], 'frequency': record['frequency'] }) - + return { 'status': 'completed', 'dossier_id': dossier_id, @@ -414,38 +414,38 @@ class Neo4jClient: }, 'generated_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de la génération du résumé du dossier {dossier_id}: {e}") return { 'status': 'error', 'error': str(e) } - - async def create_relationships_between_entities(self, doc_id: str, + + async def create_relationships_between_entities(self, doc_id: str, relationships: List[Dict[str, Any]]) -> Dict[str, Any]: """ Création de relations entre entités - + Args: doc_id: ID du document relationships: Liste des relations à créer - + Returns: Résultat de la création des relations """ logger.info(f"🔗 Création de relations pour le document {doc_id}") - + try: with self.driver.session() as session: created_relations = 0 - + for rel in relationships: rel_type = rel.get('type') from_entity = rel.get('from') to_entity = rel.get('to') properties = rel.get('properties', {}) - + if rel_type and from_entity and to_entity: result = session.run(f""" MATCH (doc:Document {{id: $doc_id}}) @@ -462,10 +462,10 @@ class Neo4jClient: to_id=to_entity['id'], properties=json.dumps(properties) ) - + if result.single(): created_relations += 1 - + logger.info(f"✅ {created_relations} relations créées") return { 'status': 'completed', @@ -473,7 +473,7 @@ class Neo4jClient: 'relations_created': created_relations, 'created_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de la création des relations: {e}") return { diff --git a/services/worker/utils/opensearch_client.py b/services/worker/utils/opensearch_client.py index 6908946..b042254 100644 --- a/services/worker/utils/opensearch_client.py +++ b/services/worker/utils/opensearch_client.py @@ -12,14 +12,14 @@ logger = logging.getLogger(__name__) class OpenSearchClient: """Client pour l'intégration avec OpenSearch""" - + def __init__(self): self.host = os.getenv('OPENSEARCH_HOST', 'opensearch') self.port = int(os.getenv('OPENSEARCH_PORT', '9200')) self.username = os.getenv('OPENSEARCH_USER', 'admin') self.password = os.getenv('OPENSEARCH_PASSWORD', 'opensearch_pwd') self.use_ssl = os.getenv('OPENSEARCH_USE_SSL', 'false').lower() == 'true' - + # Configuration du client OpenSearch self.client = OpenSearch( hosts=[{'host': self.host, 'port': self.port}], @@ -29,12 +29,12 @@ class OpenSearchClient: connection_class=RequestsHttpConnection, timeout=30 ) - + # Index par défaut self.default_index = os.getenv('OPENSEARCH_INDEX', 'notariat_documents') - + self._ensure_index_exists() - + def _ensure_index_exists(self): """Vérifie et crée l'index s'il n'existe pas""" try: @@ -45,7 +45,7 @@ class OpenSearchClient: logger.info(f"✅ Index {self.default_index} existe déjà") except Exception as e: logger.error(f"❌ Erreur lors de la vérification de l'index: {e}") - + def _create_index(self): """Crée l'index avec le mapping approprié""" mapping = { @@ -112,22 +112,22 @@ class OpenSearchClient: } } } - + self.client.indices.create(index=self.default_index, body=mapping) - + async def index_document(self, doc_id: str, document_data: Dict[str, Any]) -> Dict[str, Any]: """ Indexation d'un document dans OpenSearch - + Args: doc_id: ID du document document_data: Données du document à indexer - + Returns: Résultat de l'indexation """ logger.info(f"📚 Indexation du document {doc_id} dans OpenSearch") - + try: # Préparation du document pour l'indexation indexed_doc = { @@ -145,14 +145,14 @@ class OpenSearchClient: "created_at": datetime.now().isoformat(), "updated_at": datetime.now().isoformat() } - + # Indexation du document response = self.client.index( index=self.default_index, id=doc_id, body=indexed_doc ) - + logger.info(f"✅ Document {doc_id} indexé avec succès") return { 'status': 'indexed', @@ -161,30 +161,30 @@ class OpenSearchClient: 'version': response.get('_version'), 'indexed_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de l'indexation du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } - - async def search_documents(self, query: str, filters: Dict[str, Any] = None, + + async def search_documents(self, query: str, filters: Dict[str, Any] = None, limit: int = 10, offset: int = 0) -> Dict[str, Any]: """ Recherche de documents dans OpenSearch - + Args: query: Requête de recherche filters: Filtres à appliquer limit: Nombre maximum de résultats offset: Décalage pour la pagination - + Returns: Résultats de la recherche """ logger.info(f"🔍 Recherche dans OpenSearch: {query}") - + try: # Construction de la requête search_body = { @@ -217,12 +217,12 @@ class OpenSearchClient: "from": offset, "size": limit } - + # Ajout des filtres if filters: bool_query = search_body["query"]["bool"] bool_query["filter"] = [] - + for field, value in filters.items(): if isinstance(value, list): bool_query["filter"].append({ @@ -232,17 +232,17 @@ class OpenSearchClient: bool_query["filter"].append({ "term": {field: value} }) - + # Exécution de la recherche response = self.client.search( index=self.default_index, body=search_body ) - + # Traitement des résultats hits = response.get('hits', {}) total = hits.get('total', {}).get('value', 0) - + results = [] for hit in hits.get('hits', []): result = { @@ -254,7 +254,7 @@ class OpenSearchClient: 'created_at': hit['_source'].get('created_at') } results.append(result) - + logger.info(f"✅ Recherche terminée: {len(results)} résultats sur {total}") return { 'status': 'completed', @@ -264,32 +264,32 @@ class OpenSearchClient: 'took': response.get('took'), 'searched_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de la recherche: {e}") return { 'status': 'error', 'error': str(e) } - - async def search_by_entities(self, entities: Dict[str, Any], + + async def search_by_entities(self, entities: Dict[str, Any], limit: int = 10) -> Dict[str, Any]: """ Recherche de documents par entités - + Args: entities: Entités à rechercher limit: Nombre maximum de résultats - + Returns: Résultats de la recherche """ logger.info(f"🏷️ Recherche par entités dans OpenSearch") - + try: # Construction de la requête pour les entités must_queries = [] - + for entity_type, entity_values in entities.items(): if isinstance(entity_values, list): for value in entity_values: @@ -320,7 +320,7 @@ class OpenSearchClient: } } }) - + search_body = { "query": { "bool": { @@ -333,17 +333,17 @@ class OpenSearchClient: ], "size": limit } - + # Exécution de la recherche response = self.client.search( index=self.default_index, body=search_body ) - + # Traitement des résultats hits = response.get('hits', {}) total = hits.get('total', {}).get('value', 0) - + results = [] for hit in hits.get('hits', []): result = { @@ -355,7 +355,7 @@ class OpenSearchClient: 'created_at': hit['_source'].get('created_at') } results.append(result) - + logger.info(f"✅ Recherche par entités terminée: {len(results)} résultats") return { 'status': 'completed', @@ -364,21 +364,21 @@ class OpenSearchClient: 'results': results, 'searched_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de la recherche par entités: {e}") return { 'status': 'error', 'error': str(e) } - + async def get_document(self, doc_id: str) -> Dict[str, Any]: """ Récupération d'un document par son ID - + Args: doc_id: ID du document - + Returns: Document récupéré """ @@ -387,7 +387,7 @@ class OpenSearchClient: index=self.default_index, id=doc_id ) - + if response.get('found'): return { 'status': 'found', @@ -400,31 +400,31 @@ class OpenSearchClient: 'status': 'not_found', 'doc_id': doc_id } - + except Exception as e: logger.error(f"❌ Erreur lors de la récupération du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } - + async def update_document(self, doc_id: str, updates: Dict[str, Any]) -> Dict[str, Any]: """ Mise à jour d'un document - + Args: doc_id: ID du document updates: Mises à jour à appliquer - + Returns: Résultat de la mise à jour """ logger.info(f"🔄 Mise à jour du document {doc_id}") - + try: # Ajout de la date de mise à jour updates['updated_at'] = datetime.now().isoformat() - + response = self.client.update( index=self.default_index, id=doc_id, @@ -432,7 +432,7 @@ class OpenSearchClient: "doc": updates } ) - + logger.info(f"✅ Document {doc_id} mis à jour") return { 'status': 'updated', @@ -440,57 +440,57 @@ class OpenSearchClient: 'version': response.get('_version'), 'updated_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de la mise à jour du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } - + async def delete_document(self, doc_id: str) -> Dict[str, Any]: """ Suppression d'un document - + Args: doc_id: ID du document - + Returns: Résultat de la suppression """ logger.info(f"🗑️ Suppression du document {doc_id}") - + try: response = self.client.delete( index=self.default_index, id=doc_id ) - + logger.info(f"✅ Document {doc_id} supprimé") return { 'status': 'deleted', 'doc_id': doc_id, 'deleted_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de la suppression du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } - + async def get_index_stats(self) -> Dict[str, Any]: """ Récupération des statistiques de l'index - + Returns: Statistiques de l'index """ try: stats = self.client.indices.stats(index=self.default_index) index_stats = stats['indices'][self.default_index] - + return { 'status': 'success', 'index': self.default_index, @@ -502,7 +502,7 @@ class OpenSearchClient: }, 'retrieved_at': datetime.now().isoformat() } - + except Exception as e: logger.error(f"❌ Erreur lors de la récupération des statistiques: {e}") return {