- Add ia_dev submodule (projects/smart_ide on forge 4nk) - Document APIs, orchestrator, gateway, local-office, rollout - Add systemd/scripts layout; relocate setup scripts - Remove obsolete nginx/enso-only docs from this repo scope
122 lines
3.2 KiB
Python
122 lines
3.2 KiB
Python
"""SQLite metadata: document_id, api_key_id, name, mime_type, size, created_at."""
|
|
import datetime
|
|
import logging
|
|
import sqlite3
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from app.config import get_database_path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
TABLE = """
|
|
CREATE TABLE IF NOT EXISTS documents (
|
|
document_id TEXT PRIMARY KEY,
|
|
api_key_id TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
mime_type TEXT NOT NULL,
|
|
size INTEGER NOT NULL,
|
|
created_at TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_documents_api_key ON documents(api_key_id);
|
|
"""
|
|
|
|
|
|
def _connect() -> sqlite3.Connection:
|
|
db_path = get_database_path()
|
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def init_db() -> None:
|
|
"""Create table and index if not exist."""
|
|
conn = _connect()
|
|
try:
|
|
conn.executescript(TABLE)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
|
|
return dict(row) if row else {}
|
|
|
|
|
|
def insert_document(
|
|
document_id: str,
|
|
api_key_id: str,
|
|
name: str,
|
|
mime_type: str,
|
|
size: int,
|
|
) -> None:
|
|
"""Insert one document metadata row."""
|
|
now = datetime.datetime.utcnow().isoformat() + "Z"
|
|
conn = _connect()
|
|
try:
|
|
conn.execute(
|
|
"INSERT INTO documents (document_id, api_key_id, name, mime_type, size, created_at) VALUES (?,?,?,?,?,?)",
|
|
(document_id, api_key_id, name, mime_type, size, now),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_document(document_id: str) -> dict[str, Any] | None:
|
|
"""Get metadata by document_id."""
|
|
conn = _connect()
|
|
try:
|
|
cur = conn.execute(
|
|
"SELECT document_id, api_key_id, name, mime_type, size, created_at FROM documents WHERE document_id = ?",
|
|
(document_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return _row_to_dict(row) if row else None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def list_documents(api_key_id: str) -> list[dict[str, Any]]:
|
|
"""List documents for an API key."""
|
|
conn = _connect()
|
|
try:
|
|
cur = conn.execute(
|
|
"SELECT document_id, api_key_id, name, mime_type, size, created_at FROM documents WHERE api_key_id = ? ORDER BY created_at DESC",
|
|
(api_key_id,),
|
|
)
|
|
return [_row_to_dict(row) for row in cur.fetchall()]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def update_document_size(document_id: str, size: int) -> bool:
|
|
"""Update size for document. Returns True if row updated."""
|
|
conn = _connect()
|
|
try:
|
|
cur = conn.execute(
|
|
"UPDATE documents SET size = ? WHERE document_id = ?",
|
|
(size, document_id),
|
|
)
|
|
conn.commit()
|
|
return cur.rowcount > 0
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def delete_document_metadata(document_id: str) -> bool:
|
|
"""Delete metadata row. Returns True if a row was deleted."""
|
|
conn = _connect()
|
|
try:
|
|
cur = conn.execute("DELETE FROM documents WHERE document_id = ?", (document_id,))
|
|
conn.commit()
|
|
return cur.rowcount > 0
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def generate_document_id() -> str:
|
|
"""New unique document id (UUID4)."""
|
|
return str(uuid.uuid4())
|