- Add ia_dev submodule (projects/smart_ide on forge 4nk) - Document APIs, orchestrator, gateway, local-office, rollout - Add systemd/scripts layout; relocate setup scripts - Remove obsolete nginx/enso-only docs from this repo scope
49 lines
1.5 KiB
Python
49 lines
1.5 KiB
Python
"""Local file storage for documents. Path derived from document_id."""
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from app.config import get_storage_path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _ensure_storage() -> Path:
    """Return the configured storage root, creating the directory if needed."""
    storage_root = get_storage_path()
    # parents=True builds missing ancestors; exist_ok=True tolerates an
    # already-existing directory.
    storage_root.mkdir(exist_ok=True, parents=True)
    return storage_root
|
|
|
|
|
|
def get_document_path(document_id: str) -> Path:
    """Path to the file for document_id inside the storage root.

    The storage root itself is created if missing (via _ensure_storage);
    no further directories implied by document_id are created here.

    Raises:
        ValueError: if document_id is anchored (absolute path or drive) or
            contains a ".." component, because joining it onto the root
            would produce a path outside the storage root.
    """
    relative = Path(document_id)
    # An anchored right-hand operand makes `root / x` discard root entirely,
    # and ".." components walk back out of it; refuse both.
    if relative.anchor or ".." in relative.parts:
        raise ValueError(f"Invalid document_id: {document_id!r}")
    root = _ensure_storage()
    return root / document_id
|
|
|
|
|
|
def write_document(document_id: str, content: bytes) -> None:
    """Store content as the file for document_id, replacing any existing file."""
    target = get_document_path(document_id)
    # Make sure the containing directory exists before writing.
    target.parent.mkdir(exist_ok=True, parents=True)
    target.write_bytes(content)
    size = len(content)
    logger.info("Wrote document %s (%d bytes)", document_id, size)
|
|
|
|
|
|
def read_document(document_id: str) -> bytes:
    """Return the stored bytes for document_id.

    Raises:
        FileNotFoundError: if no regular file exists at the document's path.
    """
    source = get_document_path(document_id)
    if source.is_file():
        return source.read_bytes()
    raise FileNotFoundError(f"Document not found: {document_id}")
|
|
|
|
|
|
def delete_document_file(document_id: str) -> None:
    """Delete the file for document_id; do nothing when no such file exists."""
    target = get_document_path(document_id)
    if not target.is_file():
        # Nothing stored under this id: leave silently.
        return
    target.unlink()
    logger.info("Deleted document file %s", document_id)
|
|
|
|
|
|
def document_file_exists(document_id: str) -> bool:
    """Report whether a regular file is stored for document_id."""
    candidate = get_document_path(document_id)
    return candidate.is_file()
|