ia_dev/gitea-issues/project_config.py
2026-03-16 16:52:55 +01:00

187 lines
6.9 KiB
Python

# Load project config (projects/<id>/conf.json) for tickets spooler and authorized_emails.
# Project id comes from PROJECT_ID (set by shell project_config.sh from MAIL_TO or AI_AGENT_TOKEN) or from per-message resolution (resolve_project_id_by_email_to).
from __future__ import annotations
import json
import os
from pathlib import Path
def project_root() -> Path:
"""Project repo root when applicable. Derived from PROJECT_ROOT, REPO_ROOT or GITEA_ISSUES_DIR."""
env_root = os.environ.get("PROJECT_ROOT")
if env_root:
return Path(env_root).resolve()
env_repo = os.environ.get("REPO_ROOT")
if env_repo:
root = Path(env_repo).resolve()
# If REPO_ROOT is ia_dev (has gitea-issues), use it as repo root
if (root / "gitea-issues").is_dir():
return root
return root
issues_dir = os.environ.get("GITEA_ISSUES_DIR")
if issues_dir:
return Path(issues_dir).resolve().parent.parent
return Path(__file__).resolve().parent.parent.parent
def ia_dev_root() -> Path:
"""Directory containing gitea-issues (ia_dev)."""
issues_dir = os.environ.get("GITEA_ISSUES_DIR")
if issues_dir:
return Path(issues_dir).resolve().parent
return Path(__file__).resolve().parent.parent
def load_project_config() -> dict | None:
"""Load projects/<id>/conf.json. Uses PROJECT_ID from env (set by shell from MAIL_TO or AI_AGENT_TOKEN). Returns None if not found or PROJECT_ID missing."""
project_id = os.environ.get("PROJECT_ID", "").strip()
if not project_id:
return None
return load_project_config_by_id(project_id)
def project_dir() -> Path | None:
"""Path to projects/<id>/ (under ia_dev). Uses PROJECT_ID from env. None if PROJECT_ID not set."""
project_id = os.environ.get("PROJECT_ID", "").strip()
if not project_id:
return None
return ia_dev_root() / "projects" / project_id
def data_issues_dir() -> Path:
"""Path to data/issues/ spooler under projects/<id>/ (ia_dev/projects/<id>/data/issues)."""
pd = project_dir()
if pd is not None:
return pd / "data" / "issues"
return project_root() / "data" / "issues"
def data_issues_dir_for_project(project_id: str) -> Path:
"""Path to data/issues/ for a given project id (ia_dev/projects/<id>/data/issues)."""
ia_dev = ia_dev_root()
return ia_dev / "projects" / project_id / "data" / "issues"
def project_logs_dir() -> Path:
"""Path to logs/ under projects/<id>/ (ia_dev/projects/<id>/logs)."""
pd = project_dir()
if pd is not None:
return pd / "logs"
return project_root() / "logs"
def authorized_emails() -> dict[str, str | list[str]]:
"""Return tickets.authorized_emails (to, from list). Empty dict if missing."""
conf = load_project_config()
if not conf:
return {}
tickets = conf.get("tickets") or {}
return tickets.get("authorized_emails") or {}
def list_project_ids() -> list[str]:
"""List all project ids (directory names under projects/)."""
ia_dev = ia_dev_root()
projects_dir = ia_dev / "projects"
if not projects_dir.is_dir():
return []
return [d.name for d in projects_dir.iterdir() if d.is_dir() and (d / "conf.json").is_file()]
def _normalize_conf_to_addresses(auth_to: object) -> set[str]:
"""Return set of normalized (lowercase) email addresses from authorized_emails.to.
Supports: str (single address), list of str, or list of dict with env keys (e.g. test, pprod, prod).
Address pattern AI.<project_id>.<env>@4nkweb.com; project_id and env may be uppercase."""
out: set[str] = set()
if not auth_to:
return out
if isinstance(auth_to, str):
a = auth_to.strip().lower()
if a:
out.add(a)
return out
if isinstance(auth_to, list):
for item in auth_to:
if isinstance(item, str):
a = item.strip().lower()
if a:
out.add(a)
elif isinstance(item, dict):
for v in item.values():
if isinstance(v, str):
a = v.strip().lower()
if a:
out.add(a)
return out
def resolve_project_id_by_email_to(to_address: str) -> str | None:
"""Find project id whose tickets.authorized_emails.to matches the given address (case-insensitive).
authorized_emails.to may be a single string or a list of objects { test, pprod, prod } with addresses
AI.<project_id>.<env>@4nkweb.com (project_id and env may be uppercase)."""
if not to_address or not to_address.strip():
return None
to_normalized = to_address.strip().lower()
for pid in list_project_ids():
conf_path = ia_dev_root() / "projects" / pid / "conf.json"
try:
with open(conf_path, encoding="utf-8") as f:
conf = json.load(f)
except (OSError, json.JSONDecodeError):
continue
tickets = conf.get("tickets") or {}
auth = tickets.get("authorized_emails") or {}
conf_to_set = _normalize_conf_to_addresses(auth.get("to"))
if to_normalized in conf_to_set:
return pid
return None
def _list_project_envs(project_id: str) -> list[str]:
"""List env names (subdirs of .secrets) for a project that contain ia_token."""
ia_dev = ia_dev_root()
secrets_dir = ia_dev / "projects" / project_id / ".secrets"
if not secrets_dir.is_dir():
return []
return [
d.name
for d in secrets_dir.iterdir()
if d.is_dir() and (d / "ia_token").is_file()
]
def resolve_project_and_env_by_token(token: str) -> tuple[str, str] | None:
"""Find (project_id, env) by scanning projects/<id>/.secrets/<env>/ia_token. Returns first match."""
if not token or not token.strip():
return None
token_stripped = token.strip()
for pid in list_project_ids():
for env in _list_project_envs(pid):
token_path = ia_dev_root() / "projects" / pid / ".secrets" / env / "ia_token"
try:
content = token_path.read_text(encoding="utf-8").strip()
# Token is either full value in file or base + env (e.g. nicolecoffreio<env>)
if content == token_stripped or (content + env) == token_stripped:
return (pid, env)
except (OSError, UnicodeDecodeError):
continue
return None
def resolve_project_id_by_token(token: str) -> str | None:
"""Find project id whose .secrets/<env>/ia_token matches the given token."""
resolved = resolve_project_and_env_by_token(token)
return resolved[0] if resolved else None
def load_project_config_by_id(project_id: str) -> dict | None:
"""Load conf.json for a given project id. Returns None if not found."""
ia_dev = ia_dev_root()
conf_path = ia_dev / "projects" / project_id / "conf.json"
if not conf_path.is_file():
return None
with open(conf_path, encoding="utf-8") as f:
return json.load(f)