ia_dev/gitea-issues/mail-get-thread.py
2026-03-15 12:51:16 +01:00

209 lines
6.5 KiB
Python

#!/usr/bin/env python3
"""
Fetch the full email thread (conversation) for a given message UID.
Uses Message-ID, References and In-Reply-To to find all messages in the thread.
Output format: same as mail-list-unread (--- MAIL UID=... --- ... --- END MAIL ---), chronological order.
Usage: mail-get-thread.py <uid>
or: ./gitea-issues/mail-get-thread.sh <uid>
"""
from __future__ import annotations
import email
import imaplib
import re
import sys
from email.header import decode_header
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from mail_common import imap_since_date, load_imap_config, repo_root, imap_ssl_context
def decode_header_value(header: str | None) -> str:
    """Decode an RFC 2047 encoded header value into a plain string.

    Args:
        header: Raw header value as returned by ``Message.get``; may be
            ``None`` or empty.

    Returns:
        The concatenation of all decoded fragments, or ``""`` when the header
        is missing or empty. Undecodable bytes are replaced rather than
        raising.
    """
    if not header:
        return ""
    fragments: list[str] = []
    for part, charset in decode_header(header):
        if not isinstance(part, bytes):
            # Unencoded fragments are already text.
            fragments.append(part)
            continue
        try:
            fragments.append(part.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # The header declares a charset Python does not know (for example
            # "unknown-8bit"); errors="replace" does not cover that case, so
            # fall back to UTF-8 instead of crashing on a single odd header.
            fragments.append(part.decode("utf-8", errors="replace"))
    return "".join(fragments)
def _decode_payload(part: email.message.Message) -> str:
    """Return the transfer-decoded payload of *part* as text ("" if empty)."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Unknown charset label in Content-Type: errors="replace" does not
        # help here, so decode as UTF-8 rather than aborting the whole run.
        return payload.decode("utf-8", errors="replace")


def get_text_body(msg: email.message.Message) -> str:
    """Extract the plain-text body of a message.

    For a multipart message the first non-empty ``text/plain`` part that is
    not marked ``Content-Disposition: attachment`` is returned, so an attached
    text file is not mistaken for the body. For a single-part message the
    payload is returned whatever its content type.

    Args:
        msg: Parsed email message.

    Returns:
        The decoded body text, or ``""`` when no suitable part exists or the
        payload is empty.
    """
    if not msg.is_multipart():
        return _decode_payload(msg)
    for part in msg.walk():
        if part.get_content_type() != "text/plain":
            continue
        if part.get_content_disposition() == "attachment":
            continue
        text = _decode_payload(part)
        if text:
            return text
    return ""
def parse_message_ids(refs: str | None, in_reply_to: str | None) -> set[str]:
    """Extract Message-ID values from References and In-Reply-To headers.

    Angle-bracketed ids are picked out individually, so ids that are glued
    together (``<a@x><b@y>``) or followed by punctuation (``<a@x>,``) are
    still recognised. Remaining bare tokens containing ``@`` are treated as
    ids written without brackets and are wrapped in ``<...>``.

    Args:
        refs: Raw ``References`` header value, or ``None``.
        in_reply_to: Raw ``In-Reply-To`` header value, or ``None``.

    Returns:
        The set of ids found, each in ``<...>`` form. Empty when both headers
        are missing or contain no ids.
    """
    bracketed = r"<[^<>\s]+>"
    ids: set[str] = set()
    for raw in (refs, in_reply_to):
        if not raw:
            continue
        ids.update(re.findall(bracketed, raw))
        # Whatever is left after removing the bracketed ids may still hold
        # bare "local@domain" tokens.
        leftover = re.sub(bracketed, " ", raw)
        for token in leftover.split():
            token = token.strip("<>,;")
            if "@" in token:
                ids.add(f"<{token}>")
    return ids
def find_message_ids_from_msg(msg: email.message.Message) -> set[str]:
    """Collect every Message-ID a message mentions.

    The result holds the message's own ``Message-ID`` (when present) plus all
    ids parsed from its ``References`` and ``In-Reply-To`` headers.
    """

    def header(name: str) -> str:
        # Missing headers are treated as empty strings.
        return (msg.get(name) or "").strip()

    own_id = header("Message-ID")
    references = header("References")
    reply_target = header("In-Reply-To")

    found: set[str] = set()
    if own_id:
        found.add(own_id)
    found.update(parse_message_ids(references, reply_target))
    return found
def search_by_message_id(mail: imaplib.IMAP4, msg_id: str) -> list[str]:
    """Search the selected mailbox for messages with a given Message-ID.

    The search is restricted with ``SINCE`` to the date returned by
    ``imap_since_date()``.

    Args:
        mail: Logged-in IMAP connection with a mailbox selected.
        msg_id: Message-ID to look for; missing angle brackets are added.

    Returns:
        The identifiers reported by ``IMAP4.search`` as strings, or ``[]``
        when the id is empty or unsafe, the server rejects the search, or
        nothing matches. The search is best-effort: connection and protocol
        errors yield ``[]`` rather than raising.

    NOTE(review): ``IMAP4.search`` reports message sequence numbers, not
    UIDs; that matches the plain ``IMAP4.fetch`` used elsewhere in this file,
    but confirm it is what callers passing "UIDs" expect.
    """
    if not msg_id:
        return []
    # The id comes from mail headers (untrusted). Control characters can never
    # be part of a valid id and could break out of the command line.
    if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in msg_id):
        return []
    if not msg_id.startswith("<"):
        msg_id = f"<{msg_id}>"
    if not msg_id.endswith(">"):
        msg_id = msg_id + ">"
    # Inside an IMAP quoted string, backslash and double quote must be escaped.
    quoted = msg_id.replace("\\", "\\\\").replace('"', '\\"')
    since = imap_since_date()
    criterion = f'(HEADER Message-ID "{quoted}" SINCE {since})'
    try:
        status, data = mail.search(None, criterion)
    except (imaplib.IMAP4.error, OSError, ValueError):
        # ValueError also covers UnicodeEncodeError for non-ASCII ids.
        return []
    # On a "NO" reply data holds the server's error text, not identifiers.
    if status != "OK" or not data or not data[0]:
        return []
    return [u.decode("ascii") for u in data[0].split() if u]
def fetch_message_by_uid(
    mail: imaplib.IMAP4, uid: str
) -> email.message.Message | None:
    """Fetch and parse a single message from the selected mailbox.

    Args:
        mail: Logged-in IMAP connection with a mailbox selected.
        uid: Message identifier, passed as-is to ``IMAP4.fetch``.

    Returns:
        The parsed message, or ``None`` when the fetch fails, the server does
        not answer ``OK``, or the response carries no message literal.

    NOTE(review): per RFC 3501 a plain ``RFC822`` fetch sets ``\\Seen`` on a
    mailbox selected read-write; ``BODY.PEEK[]`` would avoid that. Confirm
    whether marking thread messages as read is intended.
    """
    try:
        status, data = mail.fetch(uid.encode("ascii"), "(RFC822)")
    except (imaplib.IMAP4.error, OSError, ValueError):
        # ValueError also covers a non-ASCII identifier failing to encode.
        return None
    if status != "OK" or not data:
        return None
    # The message arrives as a (envelope, literal) tuple; other untagged
    # responses (e.g. flag updates) may precede it, so scan instead of
    # assuming it is the first element.
    for item in data:
        if isinstance(item, tuple) and len(item) >= 2 and isinstance(item[1], bytes):
            return email.message_from_bytes(item[1])
    return None
def format_message(uid: str, msg: email.message.Message) -> str:
    """Render one message as a text record.

    The record starts with the ``--- MAIL`` / ``UID=...`` / ``---`` marker
    lines, lists the Message-ID, From, To, Subject and Date headers, then the
    plain-text body, and ends with ``--- END MAIL ---``.
    """
    message_id = (msg.get("Message-ID") or "").strip()
    sender = decode_header_value(msg.get("From"))
    recipient = decode_header_value(msg.get("To"))
    subject = decode_header_value(msg.get("Subject"))
    sent_on = decode_header_value(msg.get("Date"))
    text = get_text_body(msg)

    record = (
        "--- MAIL",
        f"UID={uid}",
        "---",
        f"Message-ID: {message_id or '(none)'}",
        f"From: {sender}",
        f"To: {recipient or ''}",
        f"Subject: {subject}",
        f"Date: {sent_on or ''}",
        "Body:",
        text or "(empty)",
        "--- END MAIL ---",
    )
    return "\n".join(record)
def _thread_sort_key(date_header: str) -> float:
    """Return a chronological sort key (POSIX timestamp) for a Date header.

    Missing or unparseable dates map to ``-inf`` so they sort first instead of
    aborting the run.
    """
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    try:
        parsed = parsedate_to_datetime(date_header)
    except (TypeError, ValueError, IndexError, OverflowError):
        return float("-inf")
    if parsed.tzinfo is None:
        # A naive result cannot be compared with aware ones; treat it as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


def main() -> int:
    """Print every message of the thread containing the message given in argv.

    Starting from the requested message, the Message-ID, References and
    In-Reply-To headers are followed transitively; each id is searched in
    INBOX and the first match is added to the thread. Messages are printed
    with ``format_message`` in chronological order of their Date header.

    Returns:
        0 on success; 1 on usage error, missing credentials, or when the
        requested message cannot be fetched.
    """
    if len(sys.argv) < 2:
        print("Usage: mail-get-thread.py <uid>", file=sys.stderr)
        return 1
    uid0 = sys.argv[1].strip()
    if not uid0:
        print("[gitea-issues] ERROR: UID required.", file=sys.stderr)
        return 1
    cfg = load_imap_config()
    if not cfg["user"] or not cfg["password"]:
        root = repo_root()
        env_path = root / ".secrets" / "gitea-issues" / "imap-bridge.env"
        print(
            "[gitea-issues] ERROR: IMAP_USER and IMAP_PASSWORD required.",
            file=sys.stderr,
        )
        print(f"[gitea-issues] Set env or create {env_path}", file=sys.stderr)
        return 1
    mail = imaplib.IMAP4(cfg["host"], int(cfg["port"]))
    try:
        if cfg["use_starttls"]:
            mail.starttls(imap_ssl_context(cfg.get("ssl_verify", True)))
        mail.login(cfg["user"], cfg["password"])
        mail.select("INBOX")
        msg0 = fetch_message_by_uid(mail, uid0)
        # Compare with None explicitly: Message truthiness depends on its
        # header count, not on whether the fetch succeeded.
        if msg0 is None:
            print(f"[gitea-issues] No message found for UID={uid0}.", file=sys.stderr)
            return 1

        # Walk the reference graph. Parsed messages are kept keyed by their
        # identifier so each one is fetched and printed only once.
        thread: dict[str, email.message.Message] = {}
        pending: set[str] = find_message_ids_from_msg(msg0)
        seen_ids: set[str] = set()
        while pending:
            mid = pending.pop()
            if not mid or mid in seen_ids:
                continue
            seen_ids.add(mid)
            uids = search_by_message_id(mail, mid)
            if not uids or uids[0] in thread:
                continue
            msg = fetch_message_by_uid(mail, uids[0])
            if msg is None:
                continue
            thread[uids[0]] = msg
            pending |= find_message_ids_from_msg(msg)

        # The requested message always belongs to the output, even when the
        # search did not return it (e.g. no Message-ID or outside SINCE).
        thread.setdefault(uid0, msg0)

        # Sort on the parsed date: comparing raw Date strings would order by
        # weekday name rather than by time.
        ordered = sorted(
            thread.items(),
            key=lambda item: _thread_sort_key(str(item[1].get("Date") or "").strip()),
        )
        for uid, msg in ordered:
            print(format_message(uid, msg))
        return 0
    finally:
        # Always close the session, including on errors; a failing logout
        # must not mask the original exception or the exit status.
        try:
            mail.logout()
        except (imaplib.IMAP4.error, OSError):
            pass
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())