# -*- coding: utf-8 -*- """ collatz_fusion_pipeline.py Pipeline de fusion F(t) sur un noyau donné. Charge le noyau JSON, appelle build_fusion_clauses pour chaque horizon, et fusionne les sorties en un seul CSV. CLI: --horizons 11,12,14 --palier 25 --input-noyau PATH --output CSV_PATH [--audit60 PATH] """ from __future__ import annotations from collections import Counter from pathlib import Path from typing import Iterator import argparse import csv import json import tempfile from collatz_k_fusion import build_fusion_clauses from collatz_k_pipeline import load_state_map_60 def load_noyau(path: str) -> list[int]: """Load noyau from JSON: list of residues or dict with R{palier}_after / noyau / residues.""" data = json.loads(Path(path).read_text(encoding="utf-8")) if isinstance(data, list): return [int(x) for x in data] if isinstance(data, dict): for key in ("R25_after", "R24_after", "noyau", "residues", "uncovered"): if key in data and isinstance(data[key], list): return [int(x) for x in data[key]] raise ValueError(f"Noyau JSON: no known key (R25_after, noyau, residues, uncovered) in {list(data.keys())}") raise ValueError("Noyau JSON must be a list or dict with residue list") def _stream_load_noyau_modulo(path: str, modulo: int) -> list[int]: """Stream-parse noyau JSON and return only residues with r % modulo == 0. Use for large files to avoid OOM.""" import ijson p = Path(path) if not p.exists(): raise FileNotFoundError(path) residues: list[int] = [] with p.open("rb") as f: for x in ijson.items(f, "noyau.item"): r = int(x) if r % modulo == 0: residues.append(r) return residues def _stream_load_noyau_modulo_chunked( path: str, modulo: int, chunk_size: int = 800_000 ) -> Iterator[list[int]]: """Stream-parse noyau JSON, yield chunks of residues with r % modulo == 0. Use for very large files to avoid OOM.""" import ijson p = Path(path) if not p.exists(): raise FileNotFoundError(path) chunk: list[int] = [] with p.open("rb") as f: for x in ijson.items(f, "noyau.item"): r = int(x) if r % modulo == 0: chunk.append(r) if len(chunk) >= chunk_size: yield chunk chunk = [] if chunk: yield chunk def _filter_residues_critique(residues: list[int], res_to_state: dict[int, int]) -> list[int]: """Filter residues to those in states with highest count (critical coverage).""" state_counts: Counter[int] = Counter() for r in residues: base = r % 4096 sid = res_to_state.get(base, 0) state_counts[sid] += 1 if not state_counts: return residues threshold = max(state_counts.values()) * 0.5 critical_states = {s for s, c in state_counts.items() if c >= threshold} return [r for r in residues if res_to_state.get(r % 4096, 0) in critical_states] def _run_fusion_chunked( input_noyau: str, modulo: int, horizons: list[int], palier: int, res_to_state: dict[int, int], state_mot7: dict[int, str], out_csv_path: Path, ) -> int: """Run fusion pipeline over streamed chunks; write rows directly to out_csv_path. Returns total row count. Used when noyau file is very large.""" fieldnames = ["horizon_t", "classe_mod_2^m", "m", "t", "a", "A_t", "mot_a0..", "C_t", "y", "y_mod_3", "DeltaF", "Nf", "preimage_m", "etat_id", "base_mod_4096"] total_rows = 0 with out_csv_path.open("w", newline="", encoding="utf-8") as out_f: writer = csv.DictWriter(out_f, fieldnames=fieldnames, extrasaction="ignore") writer.writeheader() for chunk in _stream_load_noyau_modulo_chunked(input_noyau, modulo): for t in horizons: with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f_csv: tmp_csv = f_csv.name with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f_md: tmp_md = f_md.name try: build_fusion_clauses( chunk, t, res_to_state, state_mot7, tmp_md, tmp_csv, palier, ) with Path(tmp_csv).open("r", encoding="utf-8") as f: if Path(tmp_csv).stat().st_size > 0: reader = csv.DictReader(f) for row in reader: row["horizon_t"] = t writer.writerow(row) total_rows += 1 finally: Path(tmp_csv).unlink(missing_ok=True) Path(tmp_md).unlink(missing_ok=True) return total_rows def run_fusion_pipeline( horizons: list[int], palier: int, input_noyau: str, output_csv: str, audit60_json: str, cible: str | None = None, modulo: int | None = None, ) -> None: input_path = Path(input_noyau) size_mb = input_path.stat().st_size / (1024 * 1024) if input_path.exists() else 0 if modulo is not None and size_mb > 500: if cible == "critique": raise ValueError("Chunked stream path does not support cible=critique (needs full residue set)") print(f"F16 chunked path: file {size_mb:.0f} MB, modulo {modulo}", flush=True) res_to_state, state_mot7 = load_state_map_60(audit60_json) print("F16 chunked path: state map loaded, starting stream chunks", flush=True) out_path = Path(output_csv) out_path.parent.mkdir(parents=True, exist_ok=True) total_rows = _run_fusion_chunked( input_noyau=input_noyau, modulo=modulo, horizons=horizons, palier=palier, res_to_state=res_to_state, state_mot7=state_mot7, out_csv_path=out_path, ) print(f"Stream-loaded noyau (modulo {modulo}), chunked: {total_rows} rows (file size {size_mb:.0f} MB)", flush=True) print(f"Wrote merged fusion CSV: {out_path} ({total_rows} rows)", flush=True) return if modulo is not None: residues = _stream_load_noyau_modulo(input_noyau, modulo) print(f"Stream-loaded noyau (modulo {modulo}): {len(residues)} residues (file size {size_mb:.0f} MB)", flush=True) else: residues = load_noyau(input_noyau) if modulo is not None: residues = [r for r in residues if r % modulo == 0] print(f"Modulo {modulo} filter: {len(residues)} residues", flush=True) res_to_state, state_mot7 = load_state_map_60(audit60_json) if cible == "critique": residues = _filter_residues_critique(residues, res_to_state) print(f"Cible critique filter: {len(residues)} residues", flush=True) out_path = Path(output_csv) out_path.parent.mkdir(parents=True, exist_ok=True) all_rows: list[dict] = [] for t in horizons: with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f_csv: tmp_csv = f_csv.name with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f_md: tmp_md = f_md.name try: build_fusion_clauses( residues, t, res_to_state, state_mot7, tmp_md, tmp_csv, palier, ) with Path(tmp_csv).open("r", encoding="utf-8") as f: if Path(tmp_csv).stat().st_size > 0: reader = csv.DictReader(f) for row in reader: row["horizon_t"] = t all_rows.append(row) finally: Path(tmp_csv).unlink(missing_ok=True) Path(tmp_md).unlink(missing_ok=True) with out_path.open("w", newline="", encoding="utf-8") as f: if all_rows: fieldnames = ["horizon_t"] + [k for k in all_rows[0].keys() if k != "horizon_t"] w = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore") w.writeheader() for row in all_rows: w.writerow(row) else: f.write("horizon_t,classe_mod_2^m,m,t,a,A_t,mot_a0..,C_t,y,y_mod_3,DeltaF,Nf,preimage_m,etat_id,base_mod_4096\n") print(f"Wrote merged fusion CSV: {out_path} ({len(all_rows)} rows)", flush=True) def main() -> None: ap = argparse.ArgumentParser(description="Fusion pipeline: build fusion clauses and merge to CSV") ap.add_argument("--horizons", required=True, help="Comma-separated horizons, e.g. 11,12,14") ap.add_argument("--palier", type=int, required=True, help="Modulus power (e.g. 25 for 2^25)") ap.add_argument("--input-noyau", required=True, help="Path to noyau JSON (list of residues or R*_after)") ap.add_argument("--output", required=True, help="Path to output merged CSV") ap.add_argument( "--audit60", default="audit_60_etats_B12_mod4096_horizon7.json", help="Path to audit 60 états JSON (residue_to_state, state_table)", ) ap.add_argument("--cible", help="Target filter, e.g. critique") ap.add_argument("--modulo", type=int, help="Filter residues by modulo (e.g. 9)") args = ap.parse_args() horizons = [int(h.strip()) for h in args.horizons.split(",")] run_fusion_pipeline( horizons=horizons, palier=args.palier, input_noyau=args.input_noyau, output_csv=args.output, audit60_json=args.audit60, cible=args.cible, modulo=args.modulo, ) if __name__ == "__main__": main()