algo/applications/collatz/collatz_k_scripts/collatz_fusion_pipeline.py
ncantu f05f2380ff Collatz: pipelines, scripts paliers, docs et fixKnowledge
**Motivations:**
- Conserver l'état des scripts Collatz k, pipelines et démonstration
- Documenter diagnostic D18/D21, errata, plan de preuve et correctif OOM paliers

**Root causes:**
- Consommation mémoire excessive (OOM) sur script paliers finale f16

**Correctifs:**
- Documentation du crash OOM paliers finale f16 et pistes de correction

**Evolutions:**
- Évolutions des pipelines fusion/k, recover/update noyau, script 08-paliers-finale
- Ajout de docs (diagnostic, errata, plan lemmes, fixKnowledge OOM)

**Pages affectées:**
- applications/collatz/collatz_k_scripts/*.py, note.md, requirements.txt
- applications/collatz/collatz_k_scripts/*.md (diagnostic, errata, plan)
- applications/collatz/scripts/08-paliers-finale.sh, README.md
- docs/fixKnowledge/crash_paliers_finale_f16_oom.md
2026-03-04 17:19:50 +01:00

248 lines
9.7 KiB
Python

# -*- coding: utf-8 -*-
"""
collatz_fusion_pipeline.py
Pipeline de fusion F(t) sur un noyau donné.
Charge le noyau JSON, appelle build_fusion_clauses pour chaque horizon,
et fusionne les sorties en un seul CSV.
CLI: --horizons 11,12,14 --palier 25 --input-noyau PATH --output CSV_PATH [--audit60 PATH]
"""
from __future__ import annotations
from collections import Counter
from pathlib import Path
from typing import Iterator
import argparse
import csv
import json
import tempfile
from collatz_k_fusion import build_fusion_clauses
from collatz_k_pipeline import load_state_map_60
def load_noyau(path: str) -> list[int]:
    """Load a noyau (residue set) from a JSON file.

    Accepts either a bare JSON list of residues, or a JSON object that
    stores the list under one of the known keys (R25_after, R24_after,
    noyau, residues, uncovered).

    Raises:
        ValueError: if the JSON is neither a list nor a dict with a
            recognized residue-list key.
    """
    raw = Path(path).read_text(encoding="utf-8")
    data = json.loads(raw)
    if isinstance(data, list):
        return list(map(int, data))
    if isinstance(data, dict):
        known_keys = ("R25_after", "R24_after", "noyau", "residues", "uncovered")
        for key in known_keys:
            value = data.get(key)
            if isinstance(value, list):
                return list(map(int, value))
        raise ValueError(f"Noyau JSON: no known key (R25_after, noyau, residues, uncovered) in {list(data.keys())}")
    raise ValueError("Noyau JSON must be a list or dict with residue list")
def _stream_load_noyau_modulo(path: str, modulo: int) -> list[int]:
    """Stream-parse a noyau JSON file, keeping only residues divisible by *modulo*.

    Uses ijson so the whole file is never held in memory at once; meant
    for large noyau files where a plain json.loads would OOM.

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    import ijson  # local import: only needed on this large-file path

    src = Path(path)
    if not src.exists():
        raise FileNotFoundError(path)
    with src.open("rb") as handle:
        return [
            value
            for raw in ijson.items(handle, "noyau.item")
            if (value := int(raw)) % modulo == 0
        ]
def _stream_load_noyau_modulo_chunked(
    path: str, modulo: int, chunk_size: int = 800_000
) -> Iterator[list[int]]:
    """Stream-parse a noyau JSON file, yielding lists of residues divisible by *modulo*.

    Each yielded list holds at most *chunk_size* residues, so very large
    noyau files can be processed without materializing them fully (OOM fix).

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    import ijson  # local import: only needed on this large-file path

    src = Path(path)
    if not src.exists():
        raise FileNotFoundError(path)
    pending: list[int] = []
    with src.open("rb") as handle:
        for raw in ijson.items(handle, "noyau.item"):
            value = int(raw)
            if value % modulo != 0:
                continue
            pending.append(value)
            if len(pending) >= chunk_size:
                yield pending
                pending = []
    # Flush the final, possibly short, chunk.
    if pending:
        yield pending
def _filter_residues_critique(residues: list[int], res_to_state: dict[int, int]) -> list[int]:
"""Filter residues to those in states with highest count (critical coverage)."""
state_counts: Counter[int] = Counter()
for r in residues:
base = r % 4096
sid = res_to_state.get(base, 0)
state_counts[sid] += 1
if not state_counts:
return residues
threshold = max(state_counts.values()) * 0.5
critical_states = {s for s, c in state_counts.items() if c >= threshold}
return [r for r in residues if res_to_state.get(r % 4096, 0) in critical_states]
def _run_fusion_chunked(
    input_noyau: str,
    modulo: int,
    horizons: list[int],
    palier: int,
    res_to_state: dict[int, int],
    state_mot7: dict[int, str],
    out_csv_path: Path,
) -> int:
    """Run the fusion pipeline over streamed noyau chunks.

    For each chunk and each horizon, build_fusion_clauses writes to a
    temporary CSV whose rows are appended straight to *out_csv_path*, so
    memory stays bounded for very large noyau files. Returns the total
    number of data rows written.
    """
    fieldnames = ["horizon_t", "classe_mod_2^m", "m", "t", "a", "A_t", "mot_a0..", "C_t", "y", "y_mod_3", "DeltaF", "Nf", "preimage_m", "etat_id", "base_mod_4096"]
    written = 0
    with out_csv_path.open("w", newline="", encoding="utf-8") as out_f:
        writer = csv.DictWriter(out_f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        for residues in _stream_load_noyau_modulo_chunked(input_noyau, modulo):
            for horizon in horizons:
                # Temp files are only needed to give build_fusion_clauses
                # somewhere to write; they are removed in the finally block.
                with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as tmp_c:
                    csv_name = tmp_c.name
                with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as tmp_m:
                    md_name = tmp_m.name
                try:
                    build_fusion_clauses(
                        residues,
                        horizon,
                        res_to_state,
                        state_mot7,
                        md_name,
                        csv_name,
                        palier,
                    )
                    if Path(csv_name).stat().st_size > 0:
                        with Path(csv_name).open("r", encoding="utf-8") as src:
                            for row in csv.DictReader(src):
                                row["horizon_t"] = horizon
                                writer.writerow(row)
                                written += 1
                finally:
                    Path(csv_name).unlink(missing_ok=True)
                    Path(md_name).unlink(missing_ok=True)
    return written
def run_fusion_pipeline(
    horizons: list[int],
    palier: int,
    input_noyau: str,
    output_csv: str,
    audit60_json: str,
    cible: str | None = None,
    modulo: int | None = None,
) -> None:
    """Build fusion clauses for each horizon and merge them into one CSV.

    Args:
        horizons: Horizons t passed to build_fusion_clauses.
        palier: Modulus power (e.g. 25 for 2^25).
        input_noyau: Path to the noyau JSON (list of residues or dict).
        output_csv: Path of the merged output CSV.
        audit60_json: Path to the audit-60-états JSON (state map).
        cible: Optional target filter; "critique" keeps only residues in
            the most populated states.
        modulo: Optional residue filter (keep r with r % modulo == 0).
            When set and the noyau file exceeds 500 MB, a streamed,
            chunked path is used to avoid OOM (F16 fix).

    Raises:
        ValueError: if cible == "critique" is combined with the chunked
            large-file path (the critique filter needs the full residue set).
    """
    input_path = Path(input_noyau)
    size_mb = input_path.stat().st_size / (1024 * 1024) if input_path.exists() else 0
    # Large-file path (F16 OOM fix): stream + chunk, write rows incrementally.
    if modulo is not None and size_mb > 500:
        if cible == "critique":
            raise ValueError("Chunked stream path does not support cible=critique (needs full residue set)")
        print(f"F16 chunked path: file {size_mb:.0f} MB, modulo {modulo}", flush=True)
        res_to_state, state_mot7 = load_state_map_60(audit60_json)
        print("F16 chunked path: state map loaded, starting stream chunks", flush=True)
        out_path = Path(output_csv)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        total_rows = _run_fusion_chunked(
            input_noyau=input_noyau,
            modulo=modulo,
            horizons=horizons,
            palier=palier,
            res_to_state=res_to_state,
            state_mot7=state_mot7,
            out_csv_path=out_path,
        )
        print(f"Stream-loaded noyau (modulo {modulo}), chunked: {total_rows} rows (file size {size_mb:.0f} MB)", flush=True)
        print(f"Wrote merged fusion CSV: {out_path} ({total_rows} rows)", flush=True)
        return
    if modulo is not None:
        # The stream loader already applies the modulo filter, so no second
        # pass is needed here (the previous version re-filtered the list — a
        # redundant O(n) copy with a duplicate, misleading log line).
        residues = _stream_load_noyau_modulo(input_noyau, modulo)
        print(f"Stream-loaded noyau (modulo {modulo}): {len(residues)} residues (file size {size_mb:.0f} MB)", flush=True)
    else:
        residues = load_noyau(input_noyau)
    res_to_state, state_mot7 = load_state_map_60(audit60_json)
    if cible == "critique":
        residues = _filter_residues_critique(residues, res_to_state)
        print(f"Cible critique filter: {len(residues)} residues", flush=True)
    out_path = Path(output_csv)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    all_rows: list[dict] = []
    for t in horizons:
        # Temp files exist only to receive build_fusion_clauses output;
        # both are removed in the finally block.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f_csv:
            tmp_csv = f_csv.name
        with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f_md:
            tmp_md = f_md.name
        try:
            build_fusion_clauses(
                residues,
                t,
                res_to_state,
                state_mot7,
                tmp_md,
                tmp_csv,
                palier,
            )
            with Path(tmp_csv).open("r", encoding="utf-8") as f:
                if Path(tmp_csv).stat().st_size > 0:
                    reader = csv.DictReader(f)
                    for row in reader:
                        row["horizon_t"] = t
                        all_rows.append(row)
        finally:
            Path(tmp_csv).unlink(missing_ok=True)
            Path(tmp_md).unlink(missing_ok=True)
    with out_path.open("w", newline="", encoding="utf-8") as f:
        if all_rows:
            # Put horizon_t first, keep the remaining column order from the data.
            fieldnames = ["horizon_t"] + [k for k in all_rows[0].keys() if k != "horizon_t"]
            w = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
            w.writeheader()
            w.writerows(all_rows)
        else:
            # No rows at all: still emit the canonical header so downstream
            # CSV readers find the expected columns.
            f.write("horizon_t,classe_mod_2^m,m,t,a,A_t,mot_a0..,C_t,y,y_mod_3,DeltaF,Nf,preimage_m,etat_id,base_mod_4096\n")
    print(f"Wrote merged fusion CSV: {out_path} ({len(all_rows)} rows)", flush=True)
def main() -> None:
    """CLI entry point: parse arguments and launch the fusion pipeline."""
    parser = argparse.ArgumentParser(description="Fusion pipeline: build fusion clauses and merge to CSV")
    parser.add_argument("--horizons", required=True, help="Comma-separated horizons, e.g. 11,12,14")
    parser.add_argument("--palier", type=int, required=True, help="Modulus power (e.g. 25 for 2^25)")
    parser.add_argument("--input-noyau", required=True, help="Path to noyau JSON (list of residues or R*_after)")
    parser.add_argument("--output", required=True, help="Path to output merged CSV")
    parser.add_argument(
        "--audit60",
        default="audit_60_etats_B12_mod4096_horizon7.json",
        help="Path to audit 60 états JSON (residue_to_state, state_table)",
    )
    parser.add_argument("--cible", help="Target filter, e.g. critique")
    parser.add_argument("--modulo", type=int, help="Filter residues by modulo (e.g. 9)")
    ns = parser.parse_args()
    run_fusion_pipeline(
        horizons=[int(part.strip()) for part in ns.horizons.split(",")],
        palier=ns.palier,
        input_noyau=ns.input_noyau,
        output_csv=ns.output,
        audit60_json=ns.audit60,
        cible=ns.cible,
        modulo=ns.modulo,
    )


if __name__ == "__main__":
    main()