# Load project config (projects/<id>/conf.json) for tickets spooler and authorized_emails.
# Project id comes from PROJECT_ID (set by shell project_config.sh from MAIL_TO or
# AI_AGENT_TOKEN) or from per-message resolution (resolve_project_id_by_email_to).
from __future__ import annotations

import hmac
import json
import os
from pathlib import Path


def project_root() -> Path:
    """Return the project repo root.

    Resolution order: PROJECT_ROOT, then REPO_ROOT, then two levels above
    GITEA_ISSUES_DIR, and finally three levels above this file.
    """
    env_root = os.environ.get("PROJECT_ROOT")
    if env_root:
        return Path(env_root).resolve()
    env_repo = os.environ.get("REPO_ROOT")
    if env_repo:
        # REPO_ROOT is used as-is whether or not it contains gitea-issues.
        return Path(env_repo).resolve()
    issues_dir = os.environ.get("GITEA_ISSUES_DIR")
    if issues_dir:
        return Path(issues_dir).resolve().parent.parent
    return Path(__file__).resolve().parent.parent.parent


def ia_dev_root() -> Path:
    """Return the directory containing gitea-issues (ia_dev).

    Uses the parent of GITEA_ISSUES_DIR when set, otherwise two levels above
    this file.
    """
    issues_dir = os.environ.get("GITEA_ISSUES_DIR")
    if issues_dir:
        return Path(issues_dir).resolve().parent
    return Path(__file__).resolve().parent.parent


def load_project_config() -> dict | None:
    """Load projects/<id>/conf.json for the project named by PROJECT_ID.

    PROJECT_ID is read from the environment (set by shell from MAIL_TO or
    AI_AGENT_TOKEN). Returns None if PROJECT_ID is missing/blank or the
    config file is not found.
    """
    project_id = os.environ.get("PROJECT_ID", "").strip()
    if not project_id:
        return None
    return load_project_config_by_id(project_id)


def project_dir() -> Path | None:
    """Return projects/<id>/ under ia_dev for PROJECT_ID, or None if it is not set."""
    project_id = os.environ.get("PROJECT_ID", "").strip()
    if not project_id:
        return None
    return ia_dev_root() / "projects" / project_id


def data_issues_dir() -> Path:
    """Return the data/issues/ spooler directory.

    Under projects/<id>/ when PROJECT_ID is set, otherwise under project_root().
    """
    pd = project_dir()
    base = pd if pd is not None else project_root()
    return base / "data" / "issues"


def data_issues_dir_for_project(project_id: str) -> Path:
    """Return ia_dev/projects/<project_id>/data/issues for the given project id."""
    return ia_dev_root() / "projects" / project_id / "data" / "issues"


def project_logs_dir() -> Path:
    """Return the logs/ directory.

    Under projects/<id>/ when PROJECT_ID is set, otherwise under project_root().
    """
    pd = project_dir()
    base = pd if pd is not None else project_root()
    return base / "logs"


def _authorized_emails_from_conf(conf: object) -> dict:
    """Extract tickets.authorized_emails from a parsed conf; {} on any unexpected shape."""
    if not isinstance(conf, dict):
        return {}
    tickets = conf.get("tickets")
    if not isinstance(tickets, dict):
        return {}
    auth = tickets.get("authorized_emails")
    return auth if isinstance(auth, dict) else {}


def authorized_emails() -> dict[str, str | list[str]]:
    """Return tickets.authorized_emails (to, from list) of the current project.

    Returns an empty dict if the config, the ``tickets`` section or the
    ``authorized_emails`` entry is missing or is not a mapping.
    """
    return _authorized_emails_from_conf(load_project_config())


def list_project_ids() -> list[str]:
    """List all project ids: directories under projects/ that contain conf.json.

    The result is sorted so that callers scanning for a "first match" behave
    deterministically (directory iteration order is otherwise arbitrary).
    """
    projects_dir = ia_dev_root() / "projects"
    if not projects_dir.is_dir():
        return []
    return sorted(
        d.name
        for d in projects_dir.iterdir()
        if d.is_dir() and (d / "conf.json").is_file()
    )


def _normalize_conf_to_addresses(auth_to: object) -> set[str]:
    """Return the set of normalized (stripped, lowercase) addresses from authorized_emails.to.

    Supports: str (single address), list of str, or list of dict with env keys
    (e.g. test, pprod, prod) whose string values are addresses. Address pattern
    is AI.<project_id>.<env>@4nkweb.com; project_id and env may be uppercase.
    Any other shape yields an empty set.
    """
    candidates: list[str] = []
    if isinstance(auth_to, str):
        candidates.append(auth_to)
    elif isinstance(auth_to, list):
        for item in auth_to:
            if isinstance(item, str):
                candidates.append(item)
            elif isinstance(item, dict):
                candidates.extend(v for v in item.values() if isinstance(v, str))
    normalized = (c.strip().lower() for c in candidates)
    return {address for address in normalized if address}


def resolve_project_id_by_email_to(to_address: str) -> str | None:
    """Find the project id whose tickets.authorized_emails.to matches the address.

    Matching is case-insensitive and ignores surrounding whitespace.
    authorized_emails.to may be a single string or a list of objects
    { test, pprod, prod } with addresses AI.<project_id>.<env>@4nkweb.com.
    Projects whose conf.json is unreadable, not valid UTF-8/JSON, or has an
    unexpected shape are skipped. Returns None when nothing matches.
    """
    if not to_address or not to_address.strip():
        return None
    to_normalized = to_address.strip().lower()
    projects_dir = ia_dev_root() / "projects"
    for pid in list_project_ids():
        conf_path = projects_dir / pid / "conf.json"
        try:
            with open(conf_path, encoding="utf-8") as f:
                conf = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError:
            # one broken project must not block resolution of the others.
            continue
        auth = _authorized_emails_from_conf(conf)
        if to_normalized in _normalize_conf_to_addresses(auth.get("to")):
            return pid
    return None


def _list_project_envs(project_id: str) -> list[str]:
    """List env names (subdirs of .secrets containing ia_token) for a project, sorted."""
    secrets_dir = ia_dev_root() / "projects" / project_id / ".secrets"
    if not secrets_dir.is_dir():
        return []
    return sorted(
        d.name
        for d in secrets_dir.iterdir()
        if d.is_dir() and (d / "ia_token").is_file()
    )


def _constant_time_equal(left: str, right: str) -> bool:
    """Compare two strings without short-circuiting on the first differing character."""
    # compare_digest only accepts ASCII str, so compare the encoded bytes instead.
    return hmac.compare_digest(
        left.encode("utf-8", "surrogatepass"),
        right.encode("utf-8", "surrogatepass"),
    )


def resolve_project_and_env_by_token(token: str) -> tuple[str, str] | None:
    """Find (project_id, env) by scanning projects/<id>/.secrets/<env>/ia_token.

    A token matches when it equals the stripped file content, or the file
    content followed by the env name (base + env). Empty token files never
    match. Unreadable or non-UTF-8 token files are skipped. Returns the first
    match in sorted (project id, env) order, or None.
    """
    if not token or not token.strip():
        return None
    token_stripped = token.strip()
    projects_dir = ia_dev_root() / "projects"
    for pid in list_project_ids():
        for env in _list_project_envs(pid):
            token_path = projects_dir / pid / ".secrets" / env / "ia_token"
            try:
                content = token_path.read_text(encoding="utf-8").strip()
            except (OSError, UnicodeDecodeError):
                continue
            if not content:
                # An empty file would otherwise make "" + env match a token
                # that is just the env name (e.g. "prod").
                continue
            # Token is either full value in file or base + env (e.g. nicolecoffreio)
            matches_full = _constant_time_equal(content, token_stripped)
            matches_with_env = _constant_time_equal(content + env, token_stripped)
            if matches_full or matches_with_env:
                return (pid, env)
    return None


def resolve_project_id_by_token(token: str) -> str | None:
    """Find the project id whose .secrets/<env>/ia_token matches the given token."""
    resolved = resolve_project_and_env_by_token(token)
    return resolved[0] if resolved else None


def load_project_config_by_id(project_id: str) -> dict | None:
    """Load conf.json for a given project id.

    Returns None if projects/<project_id>/conf.json is not a regular file.
    Raises json.JSONDecodeError (or OSError / UnicodeDecodeError) if the file
    exists but cannot be read or parsed, so misconfiguration is not hidden.
    """
    conf_path = ia_dev_root() / "projects" / project_id / "conf.json"
    if not conf_path.is_file():
        return None
    with open(conf_path, encoding="utf-8") as f:
        return json.load(f)