
- Infrastructure complète de traitement de documents notariaux - API FastAPI d'ingestion et d'orchestration - Pipelines Celery pour le traitement asynchrone - Support des formats PDF, JPEG, PNG, TIFF, HEIC - OCR avec Tesseract et correction lexicale - Classification automatique des documents avec Ollama - Extraction de données structurées - Indexation dans AnythingLLM et OpenSearch - Système de vérifications et contrôles métier - Base de données PostgreSQL pour le métier - Stockage objet avec MinIO - Base de données graphe Neo4j - Recherche plein-texte avec OpenSearch - Supervision avec Prometheus et Grafana - Scripts d'installation pour Debian - Documentation complète - Tests unitaires et de performance - Service systemd pour le déploiement - Scripts de déploiement automatisés
70 lines
1.8 KiB
Python
70 lines
1.8 KiB
Python
"""
|
|
Système de mise en queue des tâches
|
|
"""
|
|
from redis import Redis
|
|
import json
|
|
import os
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Configuration Redis
|
|
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
|
|
r = Redis.from_url(REDIS_URL)
|
|
|
|
def enqueue_import(doc_id: str, meta: dict):
|
|
"""
|
|
Mise en queue d'un document pour traitement
|
|
"""
|
|
try:
|
|
payload = {
|
|
"doc_id": doc_id,
|
|
"meta": meta,
|
|
"timestamp": int(time.time())
|
|
}
|
|
|
|
# Ajout à la queue d'import
|
|
r.lpush("queue:import", json.dumps(payload))
|
|
|
|
# Incrémentation du compteur de tâches en attente
|
|
r.incr("stats:pending_tasks")
|
|
|
|
logger.info(f"Document {doc_id} ajouté à la queue d'import")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la mise en queue du document {doc_id}: {e}")
|
|
raise
|
|
|
|
def get_queue_status():
|
|
"""
|
|
Récupération du statut des queues
|
|
"""
|
|
try:
|
|
pending_imports = r.llen("queue:import")
|
|
pending_tasks = r.get("stats:pending_tasks") or 0
|
|
|
|
return {
|
|
"pending_imports": pending_imports,
|
|
"pending_tasks": int(pending_tasks),
|
|
"redis_connected": True
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la récupération du statut des queues: {e}")
|
|
return {
|
|
"pending_imports": 0,
|
|
"pending_tasks": 0,
|
|
"redis_connected": False,
|
|
"error": str(e)
|
|
}
|
|
|
|
def clear_queue(queue_name: str):
|
|
"""
|
|
Vidage d'une queue
|
|
"""
|
|
try:
|
|
r.delete(queue_name)
|
|
logger.info(f"Queue {queue_name} vidée")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du vidage de la queue {queue_name}: {e}")
|
|
raise
|