""" Client pour l'intégration avec OpenSearch """ import os import logging from typing import Dict, Any, List, Optional from opensearchpy import OpenSearch, RequestsHttpConnection import json from datetime import datetime logger = logging.getLogger(__name__) class OpenSearchClient: """Client pour l'intégration avec OpenSearch""" def __init__(self): self.host = os.getenv('OPENSEARCH_HOST', 'opensearch') self.port = int(os.getenv('OPENSEARCH_PORT', '9200')) self.username = os.getenv('OPENSEARCH_USER', 'admin') self.password = os.getenv('OPENSEARCH_PASSWORD', 'opensearch_pwd') self.use_ssl = os.getenv('OPENSEARCH_USE_SSL', 'false').lower() == 'true' # Configuration du client OpenSearch self.client = OpenSearch( hosts=[{'host': self.host, 'port': self.port}], http_auth=(self.username, self.password), use_ssl=self.use_ssl, verify_certs=False, connection_class=RequestsHttpConnection, timeout=30 ) # Index par défaut self.default_index = os.getenv('OPENSEARCH_INDEX', 'notariat_documents') self._ensure_index_exists() def _ensure_index_exists(self): """Vérifie et crée l'index s'il n'existe pas""" try: if not self.client.indices.exists(index=self.default_index): self._create_index() logger.info(f"✅ Index {self.default_index} créé") else: logger.info(f"✅ Index {self.default_index} existe déjà") except Exception as e: logger.error(f"❌ Erreur lors de la vérification de l'index: {e}") def _create_index(self): """Crée l'index avec le mapping approprié""" mapping = { "mappings": { "properties": { "doc_id": {"type": "keyword"}, "dossier_id": {"type": "keyword"}, "etude_id": {"type": "keyword"}, "utilisateur_id": {"type": "keyword"}, "filename": {"type": "text", "analyzer": "french"}, "doc_type": {"type": "keyword"}, "status": {"type": "keyword"}, "text_content": { "type": "text", "analyzer": "french", "fields": { "keyword": {"type": "keyword"}, "suggest": {"type": "completion"} } }, "entities": { "type": "nested", "properties": { "type": {"type": "keyword"}, "value": {"type": "text", "analyzer": "french"}, "confidence": {"type": "float"} } }, "metadata": {"type": "object"}, "processing_info": { "type": "object", "properties": { "ocr_confidence": {"type": "float"}, "classification_confidence": {"type": "float"}, "processing_time": {"type": "float"}, "steps_completed": {"type": "keyword"} } }, "created_at": {"type": "date"}, "updated_at": {"type": "date"} } }, "settings": { "number_of_shards": 1, "number_of_replicas": 0, "analysis": { "analyzer": { "french": { "type": "custom", "tokenizer": "standard", "filter": ["lowercase", "french_stemmer", "french_stop"] } }, "filter": { "french_stemmer": { "type": "stemmer", "language": "french" }, "french_stop": { "type": "stop", "stopwords": "_french_" } } } } } self.client.indices.create(index=self.default_index, body=mapping) async def index_document(self, doc_id: str, document_data: Dict[str, Any]) -> Dict[str, Any]: """ Indexation d'un document dans OpenSearch Args: doc_id: ID du document document_data: Données du document à indexer Returns: Résultat de l'indexation """ logger.info(f"📚 Indexation du document {doc_id} dans OpenSearch") try: # Préparation du document pour l'indexation indexed_doc = { "doc_id": doc_id, "dossier_id": document_data.get('dossier_id'), "etude_id": document_data.get('etude_id'), "utilisateur_id": document_data.get('utilisateur_id'), "filename": document_data.get('filename'), "doc_type": document_data.get('doc_type'), "status": document_data.get('status', 'processed'), "text_content": document_data.get('text_content', ''), "entities": document_data.get('entities', []), "metadata": document_data.get('metadata', {}), "processing_info": document_data.get('processing_info', {}), "created_at": datetime.now().isoformat(), "updated_at": datetime.now().isoformat() } # Indexation du document response = self.client.index( index=self.default_index, id=doc_id, body=indexed_doc ) logger.info(f"✅ Document {doc_id} indexé avec succès") return { 'status': 'indexed', 'doc_id': doc_id, 'index': self.default_index, 'version': response.get('_version'), 'indexed_at': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Erreur lors de l'indexation du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } async def search_documents(self, query: str, filters: Dict[str, Any] = None, limit: int = 10, offset: int = 0) -> Dict[str, Any]: """ Recherche de documents dans OpenSearch Args: query: Requête de recherche filters: Filtres à appliquer limit: Nombre maximum de résultats offset: Décalage pour la pagination Returns: Résultats de la recherche """ logger.info(f"🔍 Recherche dans OpenSearch: {query}") try: # Construction de la requête search_body = { "query": { "bool": { "must": [ { "multi_match": { "query": query, "fields": ["text_content^2", "filename", "entities.value"], "type": "best_fields", "fuzziness": "AUTO" } } ] } }, "highlight": { "fields": { "text_content": { "fragment_size": 150, "number_of_fragments": 3 } } }, "sort": [ {"_score": {"order": "desc"}}, {"created_at": {"order": "desc"}} ], "from": offset, "size": limit } # Ajout des filtres if filters: bool_query = search_body["query"]["bool"] bool_query["filter"] = [] for field, value in filters.items(): if isinstance(value, list): bool_query["filter"].append({ "terms": {field: value} }) else: bool_query["filter"].append({ "term": {field: value} }) # Exécution de la recherche response = self.client.search( index=self.default_index, body=search_body ) # Traitement des résultats hits = response.get('hits', {}) total = hits.get('total', {}).get('value', 0) results = [] for hit in hits.get('hits', []): result = { 'doc_id': hit['_source']['doc_id'], 'filename': hit['_source'].get('filename'), 'doc_type': hit['_source'].get('doc_type'), 'score': hit['_score'], 'highlights': hit.get('highlight', {}), 'created_at': hit['_source'].get('created_at') } results.append(result) logger.info(f"✅ Recherche terminée: {len(results)} résultats sur {total}") return { 'status': 'completed', 'query': query, 'total': total, 'results': results, 'took': response.get('took'), 'searched_at': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Erreur lors de la recherche: {e}") return { 'status': 'error', 'error': str(e) } async def search_by_entities(self, entities: Dict[str, Any], limit: int = 10) -> Dict[str, Any]: """ Recherche de documents par entités Args: entities: Entités à rechercher limit: Nombre maximum de résultats Returns: Résultats de la recherche """ logger.info(f"🏷️ Recherche par entités dans OpenSearch") try: # Construction de la requête pour les entités must_queries = [] for entity_type, entity_values in entities.items(): if isinstance(entity_values, list): for value in entity_values: must_queries.append({ "nested": { "path": "entities", "query": { "bool": { "must": [ {"term": {"entities.type": entity_type}}, {"match": {"entities.value": value}} ] } } } }) else: must_queries.append({ "nested": { "path": "entities", "query": { "bool": { "must": [ {"term": {"entities.type": entity_type}}, {"match": {"entities.value": entity_values}} ] } } } }) search_body = { "query": { "bool": { "must": must_queries } }, "sort": [ {"_score": {"order": "desc"}}, {"created_at": {"order": "desc"}} ], "size": limit } # Exécution de la recherche response = self.client.search( index=self.default_index, body=search_body ) # Traitement des résultats hits = response.get('hits', {}) total = hits.get('total', {}).get('value', 0) results = [] for hit in hits.get('hits', []): result = { 'doc_id': hit['_source']['doc_id'], 'filename': hit['_source'].get('filename'), 'doc_type': hit['_source'].get('doc_type'), 'score': hit['_score'], 'entities': hit['_source'].get('entities', []), 'created_at': hit['_source'].get('created_at') } results.append(result) logger.info(f"✅ Recherche par entités terminée: {len(results)} résultats") return { 'status': 'completed', 'entities': entities, 'total': total, 'results': results, 'searched_at': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Erreur lors de la recherche par entités: {e}") return { 'status': 'error', 'error': str(e) } async def get_document(self, doc_id: str) -> Dict[str, Any]: """ Récupération d'un document par son ID Args: doc_id: ID du document Returns: Document récupéré """ try: response = self.client.get( index=self.default_index, id=doc_id ) if response.get('found'): return { 'status': 'found', 'doc_id': doc_id, 'document': response['_source'], 'retrieved_at': datetime.now().isoformat() } else: return { 'status': 'not_found', 'doc_id': doc_id } except Exception as e: logger.error(f"❌ Erreur lors de la récupération du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } async def update_document(self, doc_id: str, updates: Dict[str, Any]) -> Dict[str, Any]: """ Mise à jour d'un document Args: doc_id: ID du document updates: Mises à jour à appliquer Returns: Résultat de la mise à jour """ logger.info(f"🔄 Mise à jour du document {doc_id}") try: # Ajout de la date de mise à jour updates['updated_at'] = datetime.now().isoformat() response = self.client.update( index=self.default_index, id=doc_id, body={ "doc": updates } ) logger.info(f"✅ Document {doc_id} mis à jour") return { 'status': 'updated', 'doc_id': doc_id, 'version': response.get('_version'), 'updated_at': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Erreur lors de la mise à jour du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } async def delete_document(self, doc_id: str) -> Dict[str, Any]: """ Suppression d'un document Args: doc_id: ID du document Returns: Résultat de la suppression """ logger.info(f"🗑️ Suppression du document {doc_id}") try: response = self.client.delete( index=self.default_index, id=doc_id ) logger.info(f"✅ Document {doc_id} supprimé") return { 'status': 'deleted', 'doc_id': doc_id, 'deleted_at': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Erreur lors de la suppression du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } async def get_index_stats(self) -> Dict[str, Any]: """ Récupération des statistiques de l'index Returns: Statistiques de l'index """ try: stats = self.client.indices.stats(index=self.default_index) index_stats = stats['indices'][self.default_index] return { 'status': 'success', 'index': self.default_index, 'stats': { 'documents_count': index_stats['total']['docs']['count'], 'size_in_bytes': index_stats['total']['store']['size_in_bytes'], 'indexing_total': index_stats['total']['indexing']['index_total'], 'search_total': index_stats['total']['search']['query_total'] }, 'retrieved_at': datetime.now().isoformat() } except Exception as e: logger.error(f"❌ Erreur lors de la récupération des statistiques: {e}") return { 'status': 'error', 'error': str(e) }