**Motivations:**
- Preserve the state of the Collatz k scripts, pipelines, and demonstration
- Document the D18/D21 diagnostic, errata, proof plan, and the paliers OOM fix

**Root causes:**
- Excessive memory consumption (OOM) in the paliers finale f16 script

**Fixes:**
- Documented the paliers finale f16 OOM crash and candidate corrections

**Changes:**
- Updated the fusion/k pipelines, kernel recover/update, and the 08-paliers-finale script
- Added docs (diagnostic, errata, lemma plan, fixKnowledge OOM)

**Affected pages:**
- applications/collatz/collatz_k_scripts/*.py, note.md, requirements.txt
- applications/collatz/collatz_k_scripts/*.md (diagnostic, errata, plan)
- applications/collatz/scripts/08-paliers-finale.sh, README.md
- docs/fixKnowledge/crash_paliers_finale_f16_oom.md
248 lines
9.7 KiB
Python
# -*- coding: utf-8 -*-
"""
collatz_fusion_pipeline.py

Fusion pipeline F(t) over a given noyau (kernel).
Loads the noyau JSON, calls build_fusion_clauses for each horizon,
and merges the outputs into a single CSV.

CLI: --horizons 11,12,14 --palier 25 --input-noyau PATH --output CSV_PATH [--audit60 PATH]
"""

from __future__ import annotations

import argparse
import csv
import json
import tempfile
from collections import Counter
from pathlib import Path
from typing import Iterator

from collatz_k_fusion import build_fusion_clauses
from collatz_k_pipeline import load_state_map_60

def load_noyau(path: str) -> list[int]:
    """Load the noyau from JSON: either a bare list of residues, or a dict
    holding the list under R{palier}_after / noyau / residues / uncovered."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if isinstance(data, list):
        return [int(x) for x in data]
    if isinstance(data, dict):
        for key in ("R25_after", "R24_after", "noyau", "residues", "uncovered"):
            if key in data and isinstance(data[key], list):
                return [int(x) for x in data[key]]
        raise ValueError(
            f"Noyau JSON: no known key (R25_after, R24_after, noyau, residues, uncovered) in {list(data.keys())}"
        )
    raise ValueError("Noyau JSON must be a list or a dict containing a residue list")


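The two accepted input shapes can be sketched standalone. Here `parse_noyau` is a hypothetical mirror of `load_noyau`'s dispatch, working on a JSON string instead of a file path:

```python
import json

def parse_noyau(text: str) -> list[int]:
    # Mirror of load_noyau's dispatch: a bare JSON list,
    # or a dict holding the residues under one of the known keys.
    data = json.loads(text)
    if isinstance(data, list):
        return [int(x) for x in data]
    for key in ("R25_after", "R24_after", "noyau", "residues", "uncovered"):
        if key in data and isinstance(data[key], list):
            return [int(x) for x in data[key]]
    raise ValueError("no known residue key")

print(parse_noyau("[1, 5, 9]"))          # bare list form -> [1, 5, 9]
print(parse_noyau('{"noyau": [3, 7]}'))  # dict form -> [3, 7]
```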
def _stream_load_noyau_modulo(path: str, modulo: int) -> list[int]:
    """Stream-parse the noyau JSON and keep only residues with r % modulo == 0.
    Use for large files to avoid OOM."""
    import ijson  # local import: only needed on the streaming path

    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(path)
    residues: list[int] = []
    with p.open("rb") as f:
        # Assumes the top-level JSON object stores the residue list under "noyau".
        for x in ijson.items(f, "noyau.item"):
            r = int(x)
            if r % modulo == 0:
                residues.append(r)
    return residues


def _stream_load_noyau_modulo_chunked(
    path: str, modulo: int, chunk_size: int = 800_000
) -> Iterator[list[int]]:
    """Stream-parse the noyau JSON and yield chunks of residues with
    r % modulo == 0. Use for very large files to avoid OOM."""
    import ijson

    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(path)
    chunk: list[int] = []
    with p.open("rb") as f:
        for x in ijson.items(f, "noyau.item"):
            r = int(x)
            if r % modulo == 0:
                chunk.append(r)
                if len(chunk) >= chunk_size:
                    yield chunk
                    chunk = []
    if chunk:
        yield chunk


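The accumulate-and-yield pattern above is independent of ijson; a minimal sketch over an in-memory iterable (with a hypothetical `chunked_filter` standing in for the streaming version) shows the chunking behavior, including the final partial chunk:

```python
from collections.abc import Iterator

def chunked_filter(values, modulo: int, chunk_size: int) -> Iterator[list[int]]:
    # Same accumulate-and-yield pattern as _stream_load_noyau_modulo_chunked,
    # applied to an in-memory iterable instead of an ijson stream.
    chunk: list[int] = []
    for r in values:
        if r % modulo == 0:
            chunk.append(r)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
    if chunk:
        yield chunk  # trailing partial chunk

print(list(chunked_filter(range(30), modulo=3, chunk_size=4)))
# -> [[0, 3, 6, 9], [12, 15, 18, 21], [24, 27]]
```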
def _filter_residues_critique(residues: list[int], res_to_state: dict[int, int]) -> list[int]:
    """Keep only residues in the states with the highest counts (critical coverage)."""
    state_counts: Counter[int] = Counter()
    for r in residues:
        base = r % 4096
        sid = res_to_state.get(base, 0)
        state_counts[sid] += 1
    if not state_counts:
        return residues
    # A state is "critical" when it holds at least half as many residues as the top state.
    threshold = max(state_counts.values()) * 0.5
    critical_states = {s for s, c in state_counts.items() if c >= threshold}
    return [r for r in residues if res_to_state.get(r % 4096, 0) in critical_states]


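The selection rule can be checked on a toy mapping. This sketch (`keep_critical` is a hypothetical standalone rewrite of `_filter_residues_critique`, with the 0.5 ratio exposed as a parameter) uses a state map where state 10 holds three residues and state 20 only one, so only state 10 survives:

```python
from collections import Counter

def keep_critical(residues, res_to_state, ratio=0.5):
    # Same selection rule as _filter_residues_critique: a state survives
    # when it holds at least `ratio` times as many residues as the top state.
    counts = Counter(res_to_state.get(r % 4096, 0) for r in residues)
    if not counts:
        return list(residues)
    threshold = max(counts.values()) * ratio
    keep = {s for s, c in counts.items() if c >= threshold}
    return [r for r in residues if res_to_state.get(r % 4096, 0) in keep]

mapping = {1: 10, 2: 10, 3: 20}        # base residue -> state id
print(keep_critical([1, 2, 3, 4097], mapping))  # 4097 % 4096 == 1 -> state 10
# -> [1, 2, 4097]
```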
def _run_fusion_chunked(
    input_noyau: str,
    modulo: int,
    horizons: list[int],
    palier: int,
    res_to_state: dict[int, int],
    state_mot7: dict[int, str],
    out_csv_path: Path,
) -> int:
    """Run the fusion pipeline over streamed chunks, writing rows directly to
    out_csv_path. Returns the total row count. Used when the noyau file is very large."""
    fieldnames = [
        "horizon_t", "classe_mod_2^m", "m", "t", "a", "A_t", "mot_a0..", "C_t",
        "y", "y_mod_3", "DeltaF", "Nf", "preimage_m", "etat_id", "base_mod_4096",
    ]
    total_rows = 0
    with out_csv_path.open("w", newline="", encoding="utf-8") as out_f:
        writer = csv.DictWriter(out_f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        for chunk in _stream_load_noyau_modulo_chunked(input_noyau, modulo):
            for t in horizons:
                # delete=False keeps the paths alive for build_fusion_clauses;
                # they are unlinked in the finally block below.
                with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f_csv:
                    tmp_csv = f_csv.name
                with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f_md:
                    tmp_md = f_md.name
                try:
                    build_fusion_clauses(
                        chunk,
                        t,
                        res_to_state,
                        state_mot7,
                        tmp_md,
                        tmp_csv,
                        palier,
                    )
                    if Path(tmp_csv).stat().st_size > 0:
                        with Path(tmp_csv).open("r", encoding="utf-8") as f:
                            reader = csv.DictReader(f)
                            for row in reader:
                                row["horizon_t"] = t
                                writer.writerow(row)
                                total_rows += 1
                finally:
                    Path(tmp_csv).unlink(missing_ok=True)
                    Path(tmp_md).unlink(missing_ok=True)
    return total_rows


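The temp-file lifecycle used above (and again in the non-chunked path) is worth isolating: create with `delete=False` so the path outlives the context manager, let a producer write it, read the rows back, then unlink in a `finally` block. A minimal self-contained sketch, with an inline `csv.writer` standing in for `build_fusion_clauses`:

```python
import csv
import tempfile
from pathlib import Path

# Create the temp file with delete=False so the path survives the `with`
# block and can be handed to a separate producer.
with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False, newline="") as f:
    tmp_csv = f.name
    csv.writer(f).writerows([["a", "b"], ["1", "2"]])  # stand-in producer
try:
    with Path(tmp_csv).open(newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))
finally:
    Path(tmp_csv).unlink(missing_ok=True)  # always clean up, even on error

print(rows)  # -> [{'a': '1', 'b': '2'}]
```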
def run_fusion_pipeline(
    horizons: list[int],
    palier: int,
    input_noyau: str,
    output_csv: str,
    audit60_json: str,
    cible: str | None = None,
    modulo: int | None = None,
) -> None:
    input_path = Path(input_noyau)
    size_mb = input_path.stat().st_size / (1024 * 1024) if input_path.exists() else 0
    if modulo is not None and size_mb > 500:
        if cible == "critique":
            raise ValueError("Chunked stream path does not support cible=critique (needs the full residue set)")
        print(f"F16 chunked path: file {size_mb:.0f} MB, modulo {modulo}", flush=True)
        res_to_state, state_mot7 = load_state_map_60(audit60_json)
        print("F16 chunked path: state map loaded, starting stream chunks", flush=True)
        out_path = Path(output_csv)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        total_rows = _run_fusion_chunked(
            input_noyau=input_noyau,
            modulo=modulo,
            horizons=horizons,
            palier=palier,
            res_to_state=res_to_state,
            state_mot7=state_mot7,
            out_csv_path=out_path,
        )
        print(f"Stream-loaded noyau (modulo {modulo}), chunked: {total_rows} rows (file size {size_mb:.0f} MB)", flush=True)
        print(f"Wrote merged fusion CSV: {out_path} ({total_rows} rows)", flush=True)
        return

    if modulo is not None:
        # The streaming loader already applies the modulo filter.
        residues = _stream_load_noyau_modulo(input_noyau, modulo)
        print(f"Stream-loaded noyau (modulo {modulo}): {len(residues)} residues (file size {size_mb:.0f} MB)", flush=True)
    else:
        residues = load_noyau(input_noyau)

    res_to_state, state_mot7 = load_state_map_60(audit60_json)
    if cible == "critique":
        residues = _filter_residues_critique(residues, res_to_state)
        print(f"Cible critique filter: {len(residues)} residues", flush=True)

    out_path = Path(output_csv)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    all_rows: list[dict] = []
    for t in horizons:
        with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f_csv:
            tmp_csv = f_csv.name
        with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f_md:
            tmp_md = f_md.name

        try:
            build_fusion_clauses(
                residues,
                t,
                res_to_state,
                state_mot7,
                tmp_md,
                tmp_csv,
                palier,
            )
            if Path(tmp_csv).stat().st_size > 0:
                with Path(tmp_csv).open("r", encoding="utf-8") as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        row["horizon_t"] = t
                        all_rows.append(row)
        finally:
            Path(tmp_csv).unlink(missing_ok=True)
            Path(tmp_md).unlink(missing_ok=True)

    with out_path.open("w", newline="", encoding="utf-8") as f:
        if all_rows:
            fieldnames = ["horizon_t"] + [k for k in all_rows[0].keys() if k != "horizon_t"]
            w = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
            w.writeheader()
            for row in all_rows:
                w.writerow(row)
        else:
            # No rows at all: still emit the canonical header.
            f.write("horizon_t,classe_mod_2^m,m,t,a,A_t,mot_a0..,C_t,y,y_mod_3,DeltaF,Nf,preimage_m,etat_id,base_mod_4096\n")

    print(f"Wrote merged fusion CSV: {out_path} ({len(all_rows)} rows)", flush=True)


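The merge convention above is that `horizon_t` is forced to the first column and `extrasaction="ignore"` silently drops any row keys missing from the fieldnames. A minimal sketch with toy rows (note the `"debug"` key, absent from row 0, being dropped):

```python
import csv
import io

# Toy rows: fieldnames are derived from the first row, horizon_t forced first;
# extrasaction="ignore" drops keys not present in fieldnames (here "debug").
all_rows = [
    {"horizon_t": 11, "a": "x"},
    {"horizon_t": 12, "a": "y", "debug": "dropped"},
]
fieldnames = ["horizon_t"] + [k for k in all_rows[0] if k != "horizon_t"]
buf = io.StringIO()
w = csv.DictWriter(buf, fieldnames=fieldnames, extrasaction="ignore")
w.writeheader()
for row in all_rows:
    w.writerow(row)

print(buf.getvalue())
```

Without `extrasaction="ignore"`, the second `writerow` would raise `ValueError` on the unexpected `"debug"` key.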
def main() -> None:
    ap = argparse.ArgumentParser(description="Fusion pipeline: build fusion clauses and merge to CSV")
    ap.add_argument("--horizons", required=True, help="Comma-separated horizons, e.g. 11,12,14")
    ap.add_argument("--palier", type=int, required=True, help="Modulus power (e.g. 25 for 2^25)")
    ap.add_argument("--input-noyau", required=True, help="Path to noyau JSON (list of residues or R*_after)")
    ap.add_argument("--output", required=True, help="Path to output merged CSV")
    ap.add_argument(
        "--audit60",
        default="audit_60_etats_B12_mod4096_horizon7.json",
        help="Path to the audit 60 états JSON (residue_to_state, state_table)",
    )
    ap.add_argument("--cible", help="Target filter, e.g. critique")
    ap.add_argument("--modulo", type=int, help="Keep only residues divisible by this modulo (e.g. 9)")
    args = ap.parse_args()

    horizons = [int(h.strip()) for h in args.horizons.split(",")]
    run_fusion_pipeline(
        horizons=horizons,
        palier=args.palier,
        input_noyau=args.input_noyau,
        output_csv=args.output,
        audit60_json=args.audit60,
        cible=args.cible,
        modulo=args.modulo,
    )


if __name__ == "__main__":
    main()
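A typical invocation might look like the following sketch. The input and output file names are illustrative, not taken from the repository; only the flags match the parser above, and `--audit60` falls back to its default when omitted:

```shell
# Hypothetical paths; adjust to your layout.
python collatz_fusion_pipeline.py \
  --horizons 11,12,14 \
  --palier 25 \
  --input-noyau noyau_R25.json \
  --output fusion_merged.csv \
  --modulo 9
```

With `--modulo` set and an input file over 500 MB, this takes the F16 chunked streaming path automatically.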