# Load project config (projects//conf.json) for tickets spooler and authorized_emails. # Requires PROJECT_ROOT (repo root with ai_project_id) and GITEA_ISSUES_DIR (ia_dev/gitea-issues). from __future__ import annotations import json import os from pathlib import Path def project_root() -> Path: """Project repo root (parent of ia_dev). Where data/issues/ and ai_project_id live.""" env_root = os.environ.get("PROJECT_ROOT") if env_root: return Path(env_root).resolve() env_repo = os.environ.get("REPO_ROOT") if env_repo: root = Path(env_repo).resolve() # If REPO_ROOT is ia_dev, project root is parent if (root / "gitea-issues").is_dir(): return root.parent return root issues_dir = os.environ.get("GITEA_ISSUES_DIR") if issues_dir: return Path(issues_dir).resolve().parent.parent return Path(__file__).resolve().parent.parent.parent def ia_dev_root() -> Path: """Directory containing gitea-issues (ia_dev).""" issues_dir = os.environ.get("GITEA_ISSUES_DIR") if issues_dir: return Path(issues_dir).resolve().parent return Path(__file__).resolve().parent.parent def load_project_config() -> dict | None: """Load projects//conf.json. Returns None if not found or slug missing.""" root = project_root() ia_dev = ia_dev_root() slug_path = root / "ai_project_id" if not slug_path.is_file(): slug_path = root / ".ia_project" slug = os.environ.get("IA_PROJECT", "").strip() if os.environ.get("IA_PROJECT") else None if not slug and slug_path.is_file(): slug = slug_path.read_text(encoding="utf-8").strip() if not slug: return None conf_path = ia_dev / "projects" / slug / "conf.json" if not conf_path.is_file(): return None with open(conf_path, encoding="utf-8") as f: return json.load(f) def project_dir() -> Path | None: """Path to projects// (under ia_dev). None if project config not found.""" root = project_root() ia_dev = ia_dev_root() slug = os.environ.get("IA_PROJECT", "").strip() if not slug and (root / "ai_project_id").is_file(): slug = (root / "ai_project_id").read_text(encoding="utf-8").strip() if not slug and (root / ".ia_project").is_file(): slug = (root / ".ia_project").read_text(encoding="utf-8").strip() if not slug: return None return ia_dev / "projects" / slug def data_issues_dir() -> Path: """Path to data/issues/ spooler under projects// (ia_dev/projects//data/issues).""" pd = project_dir() if pd is not None: return pd / "data" / "issues" return project_root() / "data" / "issues" def data_issues_dir_for_project(project_id: str) -> Path: """Path to data/issues/ for a given project id (ia_dev/projects//data/issues).""" ia_dev = ia_dev_root() return ia_dev / "projects" / project_id / "data" / "issues" def project_logs_dir() -> Path: """Path to logs/ under projects// (ia_dev/projects//logs).""" pd = project_dir() if pd is not None: return pd / "logs" return project_root() / "logs" def authorized_emails() -> dict[str, str | list[str]]: """Return tickets.authorized_emails (to, from list). Empty dict if missing.""" conf = load_project_config() if not conf: return {} tickets = conf.get("tickets") or {} return tickets.get("authorized_emails") or {} def list_project_ids() -> list[str]: """List all project ids (directory names under projects/).""" ia_dev = ia_dev_root() projects_dir = ia_dev / "projects" if not projects_dir.is_dir(): return [] return [d.name for d in projects_dir.iterdir() if d.is_dir() and (d / "conf.json").is_file()] def resolve_project_id_by_email_to(to_address: str) -> str | None: """Find project id whose tickets.authorized_emails.to matches the given address (case-insensitive).""" if not to_address or not to_address.strip(): return None to_normalized = to_address.strip().lower() for pid in list_project_ids(): conf_path = ia_dev_root() / "projects" / pid / "conf.json" try: with open(conf_path, encoding="utf-8") as f: conf = json.load(f) except (OSError, json.JSONDecodeError): continue tickets = conf.get("tickets") or {} auth = tickets.get("authorized_emails") or {} conf_to = (auth.get("to") or "").strip().lower() if conf_to == to_normalized: return pid return None def _list_project_envs(project_id: str) -> list[str]: """List env names (subdirs of .secrets) for a project that contain ia_token.""" ia_dev = ia_dev_root() secrets_dir = ia_dev / "projects" / project_id / ".secrets" if not secrets_dir.is_dir(): return [] return [ d.name for d in secrets_dir.iterdir() if d.is_dir() and (d / "ia_token").is_file() ] def resolve_project_and_env_by_token(token: str) -> tuple[str, str] | None: """Find (project_id, env) by scanning projects//.secrets//ia_token. Returns first match.""" if not token or not token.strip(): return None token_stripped = token.strip() for pid in list_project_ids(): for env in _list_project_envs(pid): token_path = ia_dev_root() / "projects" / pid / ".secrets" / env / "ia_token" try: content = token_path.read_text(encoding="utf-8").strip() # Token is either full value in file or base + env (e.g. nicolecoffreio) if content == token_stripped or (content + env) == token_stripped: return (pid, env) except (OSError, UnicodeDecodeError): continue return None def resolve_project_id_by_token(token: str) -> str | None: """Find project id whose .secrets//ia_token matches the given token.""" resolved = resolve_project_and_env_by_token(token) return resolved[0] if resolved else None def load_project_config_by_id(project_id: str) -> dict | None: """Load conf.json for a given project id. Returns None if not found.""" ia_dev = ia_dev_root() conf_path = ia_dev / "projects" / project_id / "conf.json" if not conf_path.is_file(): return None with open(conf_path, encoding="utf-8") as f: return json.load(f)