#!/usr/bin/env python3 """ Thread log: one file per email thread under projects//logs/gitea-issues/threads/. Content: exchanges (received + sent), tickets (issues), commits. Usage: mail-thread-log.py get-id --uid # print THREAD_ID=... mail-thread-log.py init --uid # create/update log from thread mail-thread-log.py append-sent --thread-id --to --subject "..." [--body "..."] [--date "..."] mail-thread-log.py append-issue --thread-id --issue [--title "..."] mail-thread-log.py append-commit --thread-id --hash --message "..." [--branch "..."] """ from __future__ import annotations import argparse import re import subprocess import sys from datetime import datetime, timezone from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) from mail_common import load_gitea_config, load_imap_config, repo_root from project_config import project_logs_dir def threads_dir() -> Path: """Thread log directory: projects//logs/gitea-issues/threads/ or repo logs fallback.""" d = project_logs_dir() / "gitea-issues" / "threads" d.mkdir(parents=True, exist_ok=True) return d def sanitize_thread_id(raw: str, max_len: int = 80) -> str: s = re.sub(r"[^a-zA-Z0-9._-]", "_", raw).strip("_") return s[:max_len] if s else "thread_unknown" def get_thread_output(uid: str) -> str: gitea_dir = Path(__file__).resolve().parent root = gitea_dir.parent env = {"GITEA_ISSUES_DIR": str(gitea_dir)} result = subprocess.run( [sys.executable, str(gitea_dir / "mail-get-thread.py"), uid], cwd=str(root), capture_output=True, text=True, env={**__import__("os").environ, **env}, timeout=60, ) if result.returncode != 0: raise RuntimeError( f"mail-get-thread failed: {result.stderr or result.stdout or 'unknown'}" ) return result.stdout def parse_thread_blocks(text: str) -> list[dict[str, str]]: """Parse --- MAIL UID=... --- ... --- END MAIL --- blocks.""" blocks: list[dict[str, str]] = [] pattern = re.compile( r"--- MAIL\s+UID=(\S+)\s+---\s*\n" r"(?:Message-ID:\s*(.*?)\n)?" r"From:\s*(.*?)\n" r"To:\s*(.*?)\n" r"Subject:\s*(.*?)\n" r"Date:\s*(.*?)\n" r"Body:\s*\n(.*?)--- END MAIL ---", re.DOTALL, ) for m in pattern.finditer(text): blocks.append({ "uid": m.group(1).strip(), "message_id": (m.group(2) or "").strip(), "from": (m.group(3) or "").strip(), "to": (m.group(4) or "").strip(), "subject": (m.group(5) or "").strip(), "date": (m.group(6) or "").strip(), "body": (m.group(7) or "").strip(), }) return blocks def get_thread_id_from_uid(uid: str) -> str: out = get_thread_output(uid) blocks = parse_thread_blocks(out) if not blocks: return sanitize_thread_id(f"thread_uid_{uid}") first_msg_id = (blocks[0].get("message_id") or "").strip() or blocks[0].get("uid", "") return sanitize_thread_id(first_msg_id) def format_exchange_received(block: dict[str, str]) -> str: return ( f"### {block.get('date', '')} — Reçu\n" f"- **De:** {block.get('from', '')}\n" f"- **À:** {block.get('to', '')}\n" f"- **Sujet:** {block.get('subject', '')}\n\n" f"{block.get('body', '')}\n\n" ) def format_exchange_sent(block: dict[str, str]) -> str: return ( f"### {block.get('date', '')} — Envoyé\n" f"- **À:** {block.get('to', '')}\n" f"- **Sujet:** {block.get('subject', '')}\n\n" f"{block.get('body', '')}\n\n" ) def init_log(uid: str) -> str: cfg = load_imap_config() our_address = (cfg.get("filter_to") or "").strip().lower() if not our_address: our_address = (cfg.get("user") or "").strip().lower() out = get_thread_output(uid) blocks = parse_thread_blocks(out) thread_id = get_thread_id_from_uid(uid) log_path = threads_dir() / f"{thread_id}.md" received_blocks: list[dict[str, str]] = [] sent_blocks: list[dict[str, str]] = [] for b in blocks: from_ = (b.get("from") or "").lower() if our_address and our_address in from_: sent_blocks.append(b) else: received_blocks.append(b) existing_tickets = "" existing_commits = "" if log_path.exists(): content = log_path.read_text(encoding="utf-8") if "## Tickets (issues)" in content: idx = content.index("## Tickets (issues)") end = content.find("\n## ", idx + 1) if end == -1: end = len(content) existing_tickets = content[idx:end].strip() if "## Commits" in content: idx = content.index("## Commits") end = content.find("\n## ", idx + 1) if end == -1: end = len(content) existing_commits = content[idx:end].strip() lines = [ f"# Fil — {thread_id}", "", "## Échanges reçus", "", ] for b in received_blocks: lines.append(format_exchange_received(b)) lines.append("## Échanges envoyés") lines.append("") for b in sent_blocks: lines.append(format_exchange_sent(b)) if existing_tickets: lines.append(existing_tickets) lines.append("") else: lines.append("## Tickets (issues)") lines.append("") lines.append("(aucun)") lines.append("") if existing_commits: lines.append(existing_commits) lines.append("") else: lines.append("## Commits") lines.append("") lines.append("(aucun)") lines.append("") log_path.write_text("\n".join(lines), encoding="utf-8") return thread_id def append_sent( thread_id: str, to_addr: str, subject: str, body: str = "", date_str: str | None = None, ) -> None: if not date_str: date_str = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S +0000") log_path = threads_dir() / f"{sanitize_thread_id(thread_id)}.md" block = { "date": date_str, "to": to_addr, "subject": subject, "body": body, } section = format_exchange_sent(block) if not log_path.exists(): log_path.write_text( f"# Fil — {thread_id}\n\n## Échanges reçus\n\n(aucun)\n\n" "## Échanges envoyés\n\n" + section + "\n## Tickets (issues)\n\n(aucun)\n\n## Commits\n\n(aucun)\n", encoding="utf-8", ) return content = log_path.read_text(encoding="utf-8") insert_marker = "## Échanges envoyés" idx = content.find(insert_marker) if idx == -1: content += "\n\n## Échanges envoyés\n\n" + section else: next_section = content.find("\n## ", idx + 1) if next_section == -1: content = content.rstrip() + "\n\n" + section else: content = ( content[:next_section].rstrip() + "\n\n" + section + content[next_section:] ) log_path.write_text(content, encoding="utf-8") def append_issue(thread_id: str, issue_num: str, title: str = "") -> None: gitea = load_gitea_config() base = f"{gitea['api_url'].replace('/api/v1', '')}/{gitea['owner']}/{gitea['repo']}/issues/{issue_num}" line = f"- #{issue_num}" + (f" — {title}" if title else "") + f" — <{base}>\n" log_path = threads_dir() / f"{sanitize_thread_id(thread_id)}.md" if not log_path.exists(): log_path.write_text( f"# Fil — {thread_id}\n\n## Échanges reçus\n\n(aucun)\n\n" "## Échanges envoyés\n\n(aucun)\n\n## Tickets (issues)\n\n" + line + "\n## Commits\n\n(aucun)\n", encoding="utf-8", ) return content = log_path.read_text(encoding="utf-8") marker = "## Tickets (issues)" idx = content.find(marker) if idx == -1: content += "\n\n" + marker + "\n\n" + line else: end = idx + len(marker) rest = content[end:] if "(aucun)" in rest.split("\n## ")[0]: content = content[:end] + "\n\n" + line + rest.replace("(aucun)\n", "", 1) else: content = content[:end] + "\n\n" + line + content[end:] log_path.write_text(content, encoding="utf-8") def append_commit( thread_id: str, commit_hash: str, message: str, branch: str = "", ) -> None: line = f"- `{commit_hash[:12]}`" if branch: line += f" ({branch})" line += f" — {message.strip()}\n" log_path = threads_dir() / f"{sanitize_thread_id(thread_id)}.md" if not log_path.exists(): log_path.write_text( f"# Fil — {thread_id}\n\n## Échanges reçus\n\n(aucun)\n\n" "## Échanges envoyés\n\n(aucun)\n\n## Tickets (issues)\n\n(aucun)\n\n## Commits\n\n" + line, encoding="utf-8", ) return content = log_path.read_text(encoding="utf-8") marker = "## Commits" idx = content.find(marker) if idx == -1: content += "\n\n" + marker + "\n\n" + line else: end = idx + len(marker) rest = content[end:] if "(aucun)" in rest.split("\n## ")[0]: content = content[:end] + "\n\n" + line + rest.replace("(aucun)\n", "", 1) else: content = content[:end] + "\n\n" + line + content[end:] log_path.write_text(content, encoding="utf-8") def main() -> int: ap = argparse.ArgumentParser(prog="mail-thread-log.py") sub = ap.add_subparsers(dest="cmd", required=True) p_get = sub.add_parser("get-id") p_get.add_argument("--uid", required=True, help="Mail UID") p_init = sub.add_parser("init") p_init.add_argument("--uid", required=True, help="Mail UID") p_sent = sub.add_parser("append-sent") p_sent.add_argument("--thread-id", required=True) p_sent.add_argument("--to", required=True, dest="to_addr") p_sent.add_argument("--subject", required=True) p_sent.add_argument("--body", default="") p_sent.add_argument("--date", default=None) p_issue = sub.add_parser("append-issue") p_issue.add_argument("--thread-id", required=True) p_issue.add_argument("--issue", required=True) p_issue.add_argument("--title", default="") p_commit = sub.add_parser("append-commit") p_commit.add_argument("--thread-id", required=True) p_commit.add_argument("--hash", required=True) p_commit.add_argument("--message", required=True) p_commit.add_argument("--branch", default="") args = ap.parse_args() if args.cmd == "get-id": tid = get_thread_id_from_uid(args.uid) print(f"THREAD_ID={tid}") return 0 if args.cmd == "init": tid = init_log(args.uid) print(f"THREAD_ID={tid}") return 0 if args.cmd == "append-sent": append_sent( args.thread_id, args.to_addr, args.subject, args.body, args.date, ) return 0 if args.cmd == "append-issue": append_issue(args.thread_id, args.issue, args.title) return 0 if args.cmd == "append-commit": append_commit(args.thread_id, args.hash, args.message, args.branch) return 0 return 1 if __name__ == "__main__": sys.exit(main())