""" Client pour l'intégration avec AnythingLLM """ import os import logging import requests from typing import Dict, Any, List, Optional import json from datetime import datetime logger = logging.getLogger(__name__) class AnythingLLMClient: """Client pour l'intégration avec AnythingLLM""" def __init__(self): self.base_url = os.getenv('ANYLLM_BASE_URL', 'http://anythingllm:3001') self.api_key = os.getenv('ANYLLM_API_KEY', 'change_me') # Configuration des workspaces self.workspaces = { 'normes': os.getenv('ANYLLM_WORKSPACE_NORMES', 'workspace_normes'), 'trames': os.getenv('ANYLLM_WORKSPACE_TRAMES', 'workspace_trames'), 'actes': os.getenv('ANYLLM_WORKSPACE_ACTES', 'workspace_actes') } self.session = requests.Session() self.session.headers.update({ 'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json' }) async def create_workspace(self, name: str, description: str = None) -> Dict[str, Any]: """ Création d'un workspace AnythingLLM Args: name: Nom du workspace description: Description du workspace Returns: Résultat de la création """ logger.info(f"🏢 Création du workspace AnythingLLM: {name}") try: payload = { 'name': name, 'description': description or f"Workspace {name} pour le pipeline notarial", 'openAiTemp': 0.7, 'openAiHistory': 20, 'openAiMaxTokens': 4000, 'openAiModel': 'gpt-3.5-turbo', 'embeddingsEngine': 'openai', 'embeddingsModel': 'text-embedding-ada-002', 'vectorTag': name.lower().replace(' ', '_') } response = self.session.post( f"{self.base_url}/api/workspace/new", json=payload, timeout=30 ) if response.status_code == 200: data = response.json() logger.info(f"✅ Workspace {name} créé avec succès") return { 'status': 'created', 'workspace_id': data.get('id'), 'workspace_name': name, 'created_at': datetime.now().isoformat() } else: logger.error(f"Erreur lors de la création du workspace: {response.status_code}") return { 'status': 'error', 'error': f"Erreur API: {response.status_code}", 'response': response.text } except Exception as e: logger.error(f"Erreur lors de la création du workspace {name}: {e}") return { 'status': 'error', 'error': str(e) } async def upload_document(self, workspace_id: str, document_data: bytes, filename: str, metadata: Dict[str, Any] = None) -> Dict[str, Any]: """ Upload d'un document dans un workspace Args: workspace_id: ID du workspace document_data: Données du document filename: Nom du fichier metadata: Métadonnées du document Returns: Résultat de l'upload """ logger.info(f"📄 Upload du document {filename} dans le workspace {workspace_id}") try: # Préparation des fichiers files = { 'file': (filename, document_data, 'application/octet-stream') } # Préparation des données data = { 'workspaceId': workspace_id, 'chunkSize': 1000, 'chunkOverlap': 200 } if metadata: data['metadata'] = json.dumps(metadata) # Suppression de l'header Content-Type pour les multipart headers = {'Authorization': f'Bearer {self.api_key}'} response = requests.post( f"{self.base_url}/api/workspace/{workspace_id}/upload", files=files, data=data, headers=headers, timeout=60 ) if response.status_code == 200: data = response.json() logger.info(f"✅ Document {filename} uploadé avec succès") return { 'status': 'uploaded', 'document_id': data.get('id'), 'filename': filename, 'workspace_id': workspace_id, 'chunks_created': data.get('chunks', 0), 'uploaded_at': datetime.now().isoformat() } else: logger.error(f"Erreur lors de l'upload: {response.status_code}") return { 'status': 'error', 'error': f"Erreur API: {response.status_code}", 'response': response.text } except Exception as e: logger.error(f"Erreur lors de l'upload du document {filename}: {e}") return { 'status': 'error', 'error': str(e) } async def search_documents(self, workspace_id: str, query: str, limit: int = 10) -> Dict[str, Any]: """ Recherche dans les documents d'un workspace Args: workspace_id: ID du workspace query: Requête de recherche limit: Nombre maximum de résultats Returns: Résultats de la recherche """ logger.info(f"🔍 Recherche dans le workspace {workspace_id}: {query}") try: payload = { 'workspaceId': workspace_id, 'query': query, 'mode': 'chat', 'maxTokens': 4000, 'temperature': 0.7, 'topK': limit } response = self.session.post( f"{self.base_url}/api/workspace/{workspace_id}/chat", json=payload, timeout=30 ) if response.status_code == 200: data = response.json() logger.info(f"✅ Recherche terminée, {len(data.get('sources', []))} résultats") return { 'status': 'completed', 'query': query, 'workspace_id': workspace_id, 'results': data.get('sources', []), 'response': data.get('text', ''), 'searched_at': datetime.now().isoformat() } else: logger.error(f"Erreur lors de la recherche: {response.status_code}") return { 'status': 'error', 'error': f"Erreur API: {response.status_code}", 'response': response.text } except Exception as e: logger.error(f"Erreur lors de la recherche: {e}") return { 'status': 'error', 'error': str(e) } async def get_workspace_info(self, workspace_id: str) -> Dict[str, Any]: """ Récupération des informations d'un workspace Args: workspace_id: ID du workspace Returns: Informations du workspace """ try: response = self.session.get( f"{self.base_url}/api/workspace/{workspace_id}", timeout=10 ) if response.status_code == 200: data = response.json() return { 'status': 'found', 'workspace': data, 'retrieved_at': datetime.now().isoformat() } else: return { 'status': 'error', 'error': f"Erreur API: {response.status_code}" } except Exception as e: logger.error(f"Erreur lors de la récupération du workspace: {e}") return { 'status': 'error', 'error': str(e) } async def list_workspaces(self) -> Dict[str, Any]: """ Liste tous les workspaces disponibles Returns: Liste des workspaces """ try: response = self.session.get( f"{self.base_url}/api/workspaces", timeout=10 ) if response.status_code == 200: data = response.json() return { 'status': 'success', 'workspaces': data.get('workspaces', []), 'count': len(data.get('workspaces', [])), 'retrieved_at': datetime.now().isoformat() } else: return { 'status': 'error', 'error': f"Erreur API: {response.status_code}" } except Exception as e: logger.error(f"Erreur lors de la liste des workspaces: {e}") return { 'status': 'error', 'error': str(e) } async def index_document_for_actes(self, doc_id: str, text: str, entities: Dict[str, Any], doc_type: str) -> Dict[str, Any]: """ Indexation d'un document dans le workspace des actes Args: doc_id: ID du document text: Texte du document entities: Entités extraites doc_type: Type de document Returns: Résultat de l'indexation """ logger.info(f"📚 Indexation du document {doc_id} dans le workspace actes") try: # Préparation du contenu structuré structured_content = self._prepare_structured_content(doc_id, text, entities, doc_type) # Upload du contenu structuré workspace_id = await self._get_workspace_id('actes') if not workspace_id: return { 'status': 'error', 'error': 'Workspace actes non trouvé' } filename = f"{doc_id}_structured.txt" document_data = structured_content.encode('utf-8') result = await self.upload_document(workspace_id, document_data, filename, { 'doc_id': doc_id, 'doc_type': doc_type, 'entities': entities, 'indexed_at': datetime.now().isoformat() }) return result except Exception as e: logger.error(f"Erreur lors de l'indexation du document {doc_id}: {e}") return { 'status': 'error', 'error': str(e) } async def search_similar_actes(self, doc_type: str, entities: Dict[str, Any]) -> Dict[str, Any]: """ Recherche d'actes similaires Args: doc_type: Type de document entities: Entités extraites Returns: Actes similaires trouvés """ logger.info(f"🔍 Recherche d'actes similaires pour le type: {doc_type}") try: workspace_id = await self._get_workspace_id('actes') if not workspace_id: return { 'status': 'error', 'error': 'Workspace actes non trouvé' } # Construction de la requête de recherche query = self._build_similarity_query(doc_type, entities) result = await self.search_documents(workspace_id, query, limit=5) return result except Exception as e: logger.error(f"Erreur lors de la recherche d'actes similaires: {e}") return { 'status': 'error', 'error': str(e) } def _prepare_structured_content(self, doc_id: str, text: str, entities: Dict[str, Any], doc_type: str) -> str: """Prépare le contenu structuré pour l'indexation""" content = f"""DOCUMENT ID: {doc_id} TYPE: {doc_type} DATE D'INDEXATION: {datetime.now().isoformat()} ENTITÉS EXTRAITES: {json.dumps(entities, indent=2, ensure_ascii=False)} TEXTE DU DOCUMENT: {text} --- Ce document a été traité par le pipeline notarial v1.2.0 """ return content def _build_similarity_query(self, doc_type: str, entities: Dict[str, Any]) -> str: """Construit une requête de recherche pour trouver des actes similaires""" query_parts = [f"type:{doc_type}"] # Ajout des entités importantes if 'vendeur' in entities: query_parts.append(f"vendeur:{entities['vendeur'].get('nom', '')}") if 'acheteur' in entities: query_parts.append(f"acheteur:{entities['acheteur'].get('nom', '')}") if 'bien' in entities: query_parts.append(f"adresse:{entities['bien'].get('adresse', '')}") return " ".join(query_parts) async def _get_workspace_id(self, workspace_name: str) -> Optional[str]: """Récupère l'ID d'un workspace par son nom""" try: workspaces_result = await self.list_workspaces() if workspaces_result['status'] == 'success': for workspace in workspaces_result['workspaces']: if workspace.get('name') == workspace_name: return workspace.get('id') return None except Exception as e: logger.error(f"Erreur lors de la récupération de l'ID du workspace {workspace_name}: {e}") return None