187 lines
6.9 KiB
Python
187 lines
6.9 KiB
Python
# Load project config (projects/<id>/conf.json) for tickets spooler and authorized_emails.
|
|
# Project id comes from PROJECT_ID (set by shell project_config.sh from MAIL_TO or AI_AGENT_TOKEN) or from per-message resolution (resolve_project_id_by_email_to).
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
|
|
|
|
def project_root() -> Path:
    """Return the project repository root directory.

    Resolution order (the first non-empty environment variable wins):

    1. ``PROJECT_ROOT`` -- the resolved path itself.
    2. ``REPO_ROOT`` -- the resolved path itself.
    3. ``GITEA_ISSUES_DIR`` -- the grandparent of the resolved path.
    4. Fallback: the third parent of this file's resolved path.

    Returns:
        The resolved, absolute root path.
    """
    # Both variables name the root directly. REPO_ROOT is returned whether or
    # not it contains a "gitea-issues" directory, so no filesystem probe is
    # needed to decide.
    for var_name in ("PROJECT_ROOT", "REPO_ROOT"):
        value = os.environ.get(var_name)
        if value:
            return Path(value).resolve()

    issues_dir = os.environ.get("GITEA_ISSUES_DIR")
    if issues_dir:
        return Path(issues_dir).resolve().parent.parent

    return Path(__file__).resolve().parent.parent.parent
|
|
|
|
|
|
def ia_dev_root() -> Path:
    """Return the ia_dev directory, i.e. the one holding gitea-issues.

    When ``GITEA_ISSUES_DIR`` is set and non-empty, the parent of that
    (resolved) path is returned; otherwise the second parent of this file's
    resolved path is used.
    """
    configured = os.environ.get("GITEA_ISSUES_DIR")
    if not configured:
        # No override: derive the location from where this module lives.
        return Path(__file__).resolve().parent.parent
    return Path(configured).resolve().parent
|
|
|
|
|
|
def load_project_config() -> dict | None:
    """Load projects/<id>/conf.json for the project named by ``PROJECT_ID``.

    The id is read from the ``PROJECT_ID`` environment variable and stripped
    of surrounding whitespace. Returns None when the variable is unset or
    blank; otherwise returns whatever ``load_project_config_by_id`` yields.
    """
    current_id = os.environ.get("PROJECT_ID", "").strip()
    return load_project_config_by_id(current_id) if current_id else None
|
|
|
|
|
|
def project_dir() -> Path | None:
    """Return the projects/<id>/ directory under ia_dev for ``PROJECT_ID``.

    The id comes from the ``PROJECT_ID`` environment variable (whitespace
    stripped). Returns None when it is unset or blank.
    """
    current_id = os.environ.get("PROJECT_ID", "").strip()
    if current_id:
        return ia_dev_root() / "projects" / current_id
    return None
|
|
|
|
|
|
def data_issues_dir() -> Path:
    """Return the data/issues/ spooler directory.

    Located under projects/<id>/ when ``project_dir()`` yields a directory,
    otherwise under ``project_root()``.
    """
    base = project_dir()
    if base is None:
        # No current project: fall back to the repository root.
        base = project_root()
    return base / "data" / "issues"
|
|
|
|
|
|
def data_issues_dir_for_project(project_id: str) -> Path:
    """Return <ia_dev>/projects/<project_id>/data/issues for the given id."""
    return ia_dev_root().joinpath("projects", project_id, "data", "issues")
|
|
|
|
|
|
def project_logs_dir() -> Path:
    """Return the logs/ directory.

    Located under projects/<id>/ when ``project_dir()`` yields a directory,
    otherwise under ``project_root()``.
    """
    base = project_dir()
    if base is None:
        # No current project: fall back to the repository root.
        base = project_root()
    return base / "logs"
|
|
|
|
|
|
def authorized_emails() -> dict[str, str | list[str]]:
    """Return the ``tickets.authorized_emails`` mapping of the current project.

    Returns:
        The ``authorized_emails`` object from the loaded project config, or an
        empty dict when the config is missing or when the config, its
        ``tickets`` entry or its ``authorized_emails`` entry is not a JSON
        object.
    """
    conf = load_project_config()
    # Every level must be a mapping before calling .get() on it. A malformed
    # config yields "no authorized emails" (fail closed) instead of raising
    # AttributeError.
    if not isinstance(conf, dict):
        return {}
    tickets = conf.get("tickets")
    if not isinstance(tickets, dict):
        return {}
    auth = tickets.get("authorized_emails")
    return auth if isinstance(auth, dict) else {}
|
|
|
|
|
|
def list_project_ids() -> list[str]:
    """List all project ids, sorted by name.

    A project id is the name of a directory directly under
    <ia_dev>/projects/ that contains a ``conf.json`` file.

    Returns:
        The sorted ids, or an empty list when the projects directory does not
        exist.
    """
    projects_dir = ia_dev_root() / "projects"
    if not projects_dir.is_dir():
        return []
    # iterdir() yields entries in arbitrary order. Sorting keeps the result
    # stable between runs for callers that stop at the first match.
    return sorted(
        entry.name
        for entry in projects_dir.iterdir()
        if entry.is_dir() and (entry / "conf.json").is_file()
    )
|
|
|
|
|
|
def _normalize_conf_to_addresses(auth_to: object) -> set[str]:
    """Return the normalized email addresses found in ``authorized_emails.to``.

    Normalization strips surrounding whitespace and lowercases the address.

    Supported shapes for ``auth_to``:

    * a single string;
    * a dict whose values are addresses (e.g. keys test, pprod, prod);
    * a list whose items are strings and/or such dicts.

    Non-string values and blank strings are ignored; any other shape yields
    an empty set.
    """

    def _clean(value: object) -> str:
        # Anything that is not a string normalizes to "", which is dropped.
        return value.strip().lower() if isinstance(value, str) else ""

    if isinstance(auth_to, str):
        candidates = [auth_to]
    elif isinstance(auth_to, dict):
        candidates = list(auth_to.values())
    elif isinstance(auth_to, list):
        candidates = []
        for item in auth_to:
            if isinstance(item, dict):
                candidates.extend(item.values())
            else:
                candidates.append(item)
    else:
        return set()

    return {address for address in map(_clean, candidates) if address}
|
|
|
|
|
|
def resolve_project_id_by_email_to(to_address: str) -> str | None:
    """Find the project whose ``tickets.authorized_emails.to`` lists an address.

    The comparison is case-insensitive and ignores surrounding whitespace.
    The accepted shapes of ``authorized_emails.to`` are those handled by
    ``_normalize_conf_to_addresses``.

    Args:
        to_address: Recipient address to look up.

    Returns:
        The id of the first project (in ``list_project_ids()`` order) whose
        config lists the address, or None when the address is blank or no
        project matches. Projects whose conf.json cannot be read, decoded or
        parsed, or whose structure is not the expected nesting of JSON
        objects, are skipped.
    """
    if not to_address or not to_address.strip():
        return None
    wanted = to_address.strip().lower()

    projects_dir = ia_dev_root() / "projects"
    for pid in list_project_ids():
        conf_path = projects_dir / pid / "conf.json"
        try:
            with open(conf_path, encoding="utf-8") as f:
                conf = json.load(f)
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            # One broken project config must not block the others.
            continue

        # Walk the nesting defensively: a wrong shape means "no match here".
        tickets = conf.get("tickets") if isinstance(conf, dict) else None
        auth = tickets.get("authorized_emails") if isinstance(tickets, dict) else None
        if not isinstance(auth, dict):
            continue

        if wanted in _normalize_conf_to_addresses(auth.get("to")):
            return pid
    return None
|
|
|
|
|
|
def _list_project_envs(project_id: str) -> list[str]:
    """List a project's env names that have an ``ia_token`` file, sorted.

    An env is a directory directly under
    <ia_dev>/projects/<project_id>/.secrets/ containing a file named
    ``ia_token``.

    Returns:
        The sorted env names, or an empty list when the .secrets directory
        does not exist.
    """
    secrets_dir = ia_dev_root() / "projects" / project_id / ".secrets"
    if not secrets_dir.is_dir():
        return []
    # iterdir() order is arbitrary. Sorting keeps the scan order stable for
    # callers that return the first match.
    return sorted(
        entry.name
        for entry in secrets_dir.iterdir()
        if entry.is_dir() and (entry / "ia_token").is_file()
    )
|
|
|
|
|
|
def resolve_project_and_env_by_token(token: str) -> tuple[str, str] | None:
    """Find (project_id, env) by scanning projects/<id>/.secrets/<env>/ia_token.

    A stored token matches when the supplied token (whitespace stripped)
    equals either the stripped file content, or that content followed by the
    env name.

    Args:
        token: Token presented by the caller.

    Returns:
        The first matching ``(project_id, env)`` pair, or None when the token
        is blank or nothing matches. Token files that cannot be read or
        decoded, and empty token files, are skipped.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if not token or not token.strip():
        return None
    # "surrogatepass" lets values that came from the OS (environment, file
    # names) be encoded even if they contain lone surrogates.
    supplied = token.strip().encode("utf-8", "surrogatepass")

    projects_dir = ia_dev_root() / "projects"
    for pid in list_project_ids():
        for env in _list_project_envs(pid):
            token_path = projects_dir / pid / ".secrets" / env / "ia_token"
            try:
                stored = token_path.read_text(encoding="utf-8").strip()
            except (OSError, UnicodeDecodeError):
                continue
            if not stored:
                # An empty token file must never authenticate: otherwise the
                # bare env name ("" + env) would be accepted as a valid token.
                continue

            # Constant-time comparisons; both are always evaluated.
            matches_full = hmac.compare_digest(
                stored.encode("utf-8", "surrogatepass"), supplied
            )
            # Token may also be the stored base value followed by the env name.
            matches_with_env = hmac.compare_digest(
                (stored + env).encode("utf-8", "surrogatepass"), supplied
            )
            if matches_full or matches_with_env:
                return (pid, env)
    return None
|
|
|
|
|
|
def resolve_project_id_by_token(token: str) -> str | None:
    """Return the id of the project whose .secrets/<env>/ia_token matches.

    Delegates to ``resolve_project_and_env_by_token`` and keeps only the
    project id; returns None when that lookup finds nothing.
    """
    match = resolve_project_and_env_by_token(token)
    if not match:
        return None
    return match[0]
|
|
|
|
|
|
def load_project_config_by_id(project_id: str) -> dict | None:
    """Load <ia_dev>/projects/<project_id>/conf.json.

    Args:
        project_id: Name of the project directory under projects/.

    Returns:
        The parsed config object, or None when the file does not exist or
        when its top-level JSON value is not an object.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        OSError: If the file exists but cannot be read.
    """
    conf_path = ia_dev_root() / "projects" / project_id / "conf.json"
    if not conf_path.is_file():
        return None
    try:
        with open(conf_path, encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # The file vanished between the is_file() check and open().
        return None
    # Callers rely on the "dict | None" contract and call .get() on the result.
    return loaded if isinstance(loaded, dict) else None
|