algo/applications/collatz/collatz_k_scripts/collatz_k_pipeline.py
ncantu f05f2380ff Collatz: pipelines, scripts paliers, docs et fixKnowledge
**Motivations:**
- Conserver l'état des scripts Collatz k, pipelines et démonstration
- Documenter diagnostic D18/D21, errata, plan de preuve et correctif OOM paliers

**Root causes:**
- Consommation mémoire excessive (OOM) sur script paliers finale f16

**Correctifs:**
- Documentation du crash OOM paliers finale f16 et pistes de correction

**Evolutions:**
- Évolutions des pipelines fusion/k, recover/update noyau, script 08-paliers-finale
- Ajout de docs (diagnostic, errata, plan lemmes, fixKnowledge OOM)

**Pages affectées:**
- applications/collatz/collatz_k_scripts/*.py, note.md, requirements.txt
- applications/collatz/collatz_k_scripts/*.md (diagnostic, errata, plan)
- applications/collatz/scripts/08-paliers-finale.sh, README.md
- docs/fixKnowledge/crash_paliers_finale_f16_oom.md
2026-03-04 17:19:50 +01:00

814 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
collatz_k_pipeline.py
Pipeline principale (reproduction des audits "après fusion" et des paquets D16/D17).
Entrées attendues:
- audit_60_etats_B12_mod4096_horizon7.json
- complétion_minorée_m15_vers_m16.md
- candidats_D10_palier2p17.md
Sorties:
- audits Markdown
- CSV exhaustifs
- Markdown listes exhaustives (bloc ```csv```)
"""
from __future__ import annotations
from pathlib import Path
import csv
import json
import re
import sys
import tempfile
import time
from collections import Counter
from typing import List, Set, Dict, Tuple, Iterable, Optional
from collatz_k_core import A_k, prefix_data, N0_D
from collatz_k_utils import parse_markdown_table_to_rows, write_text
from collatz_k_fusion import build_fusion_clauses
# When set by run_extended_D18_to_D21, steps log to this file (flush after each line).
_pipeline_log_path: Optional[Path] = None
# Saved sys.excepthook, so run_extended_D18_to_D21 can restore it when it finishes.
_original_excepthook: Optional[object] = None
def _get_memory_str() -> str:
"""Return max RSS in MB (Unix). Empty string if unavailable."""
try:
import resource
ru = resource.getrusage(resource.RUSAGE_SELF)
rss = getattr(ru, "ru_maxrss", 0)
if not rss:
return ""
if sys.platform == "darwin":
rss_mb = rss / (1024 * 1024)
else:
rss_mb = rss / 1024
return f"rss_max_mb={rss_mb:.0f}"
except (ImportError, OSError, AttributeError):
return ""
def _log_step(msg: str, out_dir: Optional[Path] = None, memory: bool = False) -> None:
"""Log to stderr and optionally to pipeline log file. Flush so crash leaves trace."""
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
if memory:
mem = _get_memory_str()
if mem:
msg = f"{msg} {mem}"
line = f"[{ts}] {msg}"
print(line, flush=True)
path = out_dir if isinstance(out_dir, Path) else _pipeline_log_path
if path is not None:
try:
with path.open("a", encoding="utf-8") as f:
f.write(line + "\n")
f.flush()
except OSError:
pass
def load_state_map_60(audit60_json_path: str) -> Tuple[Dict[int, int], Dict[int, str]]:
    """Load the 60-state audit JSON.

    The JSON must contain ``residue_to_state`` (string keys mapping a residue
    mod 4096 to a state id) and ``state_table`` (rows keyed by the French
    column headers "État" and "Mot (a0..a6)").

    Returns:
        (residue mod 4096 -> state id, state id -> word "a0..a6").
    """
    # Uses the module-level `json` import; the previous function-local
    # `import json` was redundant.
    data = json.loads(Path(audit60_json_path).read_text(encoding="utf-8"))
    res_to_state = {int(k): int(v) for k, v in data["residue_to_state"].items()}
    state_mot7 = {int(row["État"]): row["Mot (a0..a6)"] for row in data["state_table"]}
    return res_to_state, state_mot7
def build_R17_from_completion(completion_m15_to_m16_md: str) -> List[int]:
    """Build R17 from the completion markdown ("Parents « both »" section),
    without subtracting the D10 cover."""
    content = Path(completion_m15_to_m16_md).read_text(encoding="utf-8")
    section = re.search(r"### Parents « both ».*?\n(.*)\Z", content, flags=re.S)
    if section is None:
        raise ValueError("Section 'Parents both' introuvable")
    parents = sorted({int(tok) for tok in re.findall(r"\b\d+\b", section.group(1))})
    # Lift mod 2^15 -> 2^16 -> 2^17 by duplicating each class with the top bit set.
    lifted16 = set(parents) | {p + (1 << 15) for p in parents}
    lifted17 = lifted16 | {x + (1 << 16) for x in lifted16}
    return sorted(lifted17)
def rebuild_R17_after_full_D10(completion_m15_to_m16_md: str, candidats_D10_md: str) -> List[int]:
    """Rebuild R17 and subtract the full D10 cover.

    The cover has two parts: the low/high classes listed in the candidats_D10
    markdown table, then the remaining classes with A_k(x, 10) == 16 together
    with their low sisters (x - 2^16).

    The R17 construction previously duplicated :func:`build_R17_from_completion`
    verbatim; it is now delegated so the section-parsing logic lives in one place.
    """
    R17 = set(build_R17_from_completion(completion_m15_to_m16_md))
    shift16 = 1 << 16
    rows = parse_markdown_table_to_rows(candidats_D10_md)
    cover175: Set[int] = set()
    # rows[0] and rows[1] are the markdown header and separator lines.
    for parts in rows[2:]:
        cover175.add(int(parts[0]))
        cover175.add(int(parts[1]))
    R17_after_175 = R17 - cover175
    # Classes still hitting A_10 == 16 cover themselves and their low sisters.
    cover171: Set[int] = set()
    for x in R17_after_175:
        if A_k(x, 10) == 16:
            cover171.add(x)
            cover171.add(x - shift16)
    return sorted(R17_after_175 - cover171)
def lift_set(residues: Iterable[int], shift: int, count: int) -> List[int]:
    """Lift each residue to ``count`` copies: r, r+shift, r+2*shift, ..."""
    return [r + j * shift for r in residues for j in range(count)]
def csv_to_md_list(csv_path: str, md_path: str, title: str, intro: str) -> None:
    """Wrap a CSV file into a Markdown document with a fenced ```csv``` block."""
    source = Path(csv_path)
    target = Path(md_path)
    with source.open("r", encoding="utf-8") as fin, target.open("w", encoding="utf-8") as fout:
        fout.writelines(
            [
                f"# {title}\n\n",
                "## Introduction\n\n",
                intro.strip() + "\n\n",
                "## Liste exhaustive\n\n",
                "```csv\n",
            ]
        )
        tail = ""
        for row in fin:
            fout.write(row)
            tail = row
        # Guarantee the closing fence starts on its own line.
        if tail and not tail.endswith("\n"):
            fout.write("\n")
        fout.write("```\n")
def run_after_fusion_D16_D17(
    audit60_json: str,
    completion_m15_to_m16_md: str,
    candidats_D10_md: str,
    out_dir: str,
) -> None:
    """Reproduce the "after fusion" audits and the minimal D16/D17 packets.

    Starting from R17 (after full D10 subtraction), climb paliers D11..D15,
    apply the F(11)/F(12)/F(14) fusion at 2^25, then emit the D16 (2^27) and
    D17 (2^28) candidate CSVs, audit Markdown files, exhaustive lists and the
    post-D17 noyau JSON — everything under ``out_dir``.
    """
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    res_to_state, state_mot7 = load_state_map_60(audit60_json)
    # R17 after the full D10 subtraction
    R17_after_full = rebuild_R17_after_full_D10(completion_m15_to_m16_md, candidats_D10_md)
    # D11 (2^19): two-bit lift, then remove A_11 == 18 candidates plus their
    # sisters (same class with bit 2^18 flipped).
    shift17 = 1 << 17
    shift18 = 1 << 18
    R19 = lift_set(R17_after_full, shift17, 4)
    cand11 = set([n for n in R19 if A_k(n, 11) == 18])
    cover11 = cand11 | {n ^ shift18 for n in cand11}
    R19_after = [n for n in R19 if n not in cover11]
    # D12 (2^21): same scheme one palier up
    shift19 = 1 << 19
    shift20 = 1 << 20
    R21 = lift_set(R19_after, shift19, 4)
    cand12 = set([n for n in R21 if A_k(n, 12) == 20])
    cover12 = cand12 | {n ^ shift20 for n in cand12}
    R21_after = [n for n in R21 if n not in cover12]
    # D13 (2^22): single-bit lift (factor 2 instead of 4)
    shift21 = 1 << 21
    R22 = list(R21_after) + [n + shift21 for n in R21_after]
    cand13 = set([n for n in R22 if A_k(n, 13) == 21])
    cover13 = cand13 | {n ^ shift21 for n in cand13}
    R22_after = [n for n in R22 if n not in cover13]
    # D14 (2^24): two-bit lift again
    shift22 = 1 << 22
    shift23 = 1 << 23
    R24 = lift_set(R22_after, shift22, 4)
    cand14 = set([n for n in R24 if A_k(n, 14) == 23])
    cover14 = cand14 | {n ^ shift23 for n in cand14}
    R24_after = [n for n in R24 if n not in cover14]
    # D15 (2^25): single-bit lift
    shift24 = 1 << 24
    R25 = list(R24_after) + [n + shift24 for n in R24_after]
    cand15 = set([n for n in R25 if A_k(n, 15) == 24])
    cover15 = cand15 | {n ^ shift24 for n in cand15}
    R25_after = sorted([n for n in R25 if n not in cover15])
    # Fusion (t = 11, 12, 14) at palier 2^25: one MD + CSV pair per horizon.
    md_f11 = str(Path(out_dir) / "fusion_t11_palier2p25.md")
    csv_f11 = str(Path(out_dir) / "fusion_t11_palier2p25.csv")
    md_f12 = str(Path(out_dir) / "fusion_t12_palier2p25.md")
    csv_f12 = str(Path(out_dir) / "fusion_t12_palier2p25.csv")
    md_f14 = str(Path(out_dir) / "fusion_t14_palier2p25.md")
    csv_f14 = str(Path(out_dir) / "fusion_t14_palier2p25.csv")
    build_fusion_clauses(R25_after, 11, res_to_state, state_mot7, md_f11, csv_f11, 25)
    build_fusion_clauses(R25_after, 12, res_to_state, state_mot7, md_f12, csv_f12, 25)
    build_fusion_clauses(R25_after, 14, res_to_state, state_mot7, md_f14, csv_f14, 25)

    def load_hitset(csv_path: str) -> Set[int]:
        # Read back the classes hit by one fusion CSV (empty file -> empty set).
        hs: Set[int] = set()
        p = Path(csv_path)
        if p.stat().st_size == 0:
            return hs
        with p.open("r", encoding="utf-8") as f:
            r = csv.DictReader(f)
            for row in r:
                hs.add(int(row["classe_mod_2^m"]))
        return hs

    # Remove everything hit by any of the three fusions.
    unionF = load_hitset(csv_f11) | load_hitset(csv_f12) | load_hitset(csv_f14)
    R25_after_F = [n for n in R25_after if n not in unionF]
    # D16 after fusion (2^27): lift by two bits, keep classes with A_16 == 26.
    shift25 = 1 << 25
    shift26 = 1 << 26
    k16 = 16
    A16_target = 26
    cand_D16: Set[int] = set()
    for r in R25_after_F:
        for j in range(4):
            n = r + j * shift25
            if A_k(n, k16) == A16_target:
                cand_D16.add(n)
    # Each candidate covers itself and its sister (bit 2^26 flipped).
    cover_D16 = cand_D16 | {n ^ shift26 for n in cand_D16}
    delta16 = (1 << 26) - (3**16)
    N0_dist = Counter()
    csv_d16 = str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27.csv")
    with Path(csv_d16).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["classe_mod_2^27", "sœur", "mot_a0..a15", "A16", "C16", "delta", "N0", "U^16(n)", "etat_id", "base_mod_4096"])
        for n in sorted(cand_D16):
            pref = prefix_data(n, 16)
            N0 = N0_D(pref.C, pref.A, 16)
            N0_dist[N0] += 1
            w.writerow([n, n ^ shift26, " ".join(map(str, pref.word)), pref.A, pref.C, delta16, N0, pref.y, res_to_state[n % 4096], n % 4096])
    # D16 audit (minimal): invariant = max A_16 over the uncovered lifts.
    maxA16_after = 0
    for r in R25_after_F:
        for j in range(4):
            n = r + j * shift25
            if n in cover_D16:
                continue
            A = A_k(n, k16)
            if A > maxA16_after:
                maxA16_after = A
    md_d16 = str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27_et_impact.md")
    write_text(
        md_d16,
        "\n".join(
            [
                "# Paquet D16 minimal après fusion (palier 2^27)",
                "",
                "## Introduction",
                "",
                "Audit D16 sur le noyau au palier 2^25 après fusion F(11)F(12)F(14).",
                "",
                "## Tailles",
                "",
                f"- noyau après D15 : {len(R25_after)}",
                f"- noyau après fusion : {len(R25_after_F)}",
                f"- relèvements 2^27 : {4 * len(R25_after_F)}",
                f"- candidats D16 : {len(cand_D16)}",
                f"- couverture (avec sœurs) : {len(cover_D16)}",
                f"- invariant max A16 après : {maxA16_after}",
                "",
                "## CSV exhaustif",
                "",
                f"- {Path(csv_d16).name}",
                "",
            ]
        )
        + "\n",
    )
    csv_to_md_list(
        csv_d16,
        str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27_liste_exhaustive.md"),
        "Liste exhaustive des clauses D16 après fusion (palier 2^27)",
        "Liste exhaustive (format CSV copiable).",
    )
    # D17 after fusion and D16 (2^28): a pair (low, low + 2^27) is a candidate
    # when either side reaches A_17 == 27.
    shift27 = 1 << 27
    k17 = 17
    A17_target = 27
    pair_low_set: Set[int] = set()
    for r in R25_after_F:
        for j in range(4):
            low = r + j * shift25
            if low in cover_D16:
                continue
            if A_k(low, k17) == A17_target or A_k(low + shift27, k17) == A17_target:
                pair_low_set.add(low)
    delta17 = (1 << 27) - (3**17)
    csv_d17 = str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28.csv")
    with Path(csv_d17).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["classe_mod_2^28", "sœur", "côté", "mot_a0..a16", "A17", "C17", "delta", "N0", "U^17(n)", "etat_id", "base_mod_4096"])
        for low in sorted(pair_low_set):
            high = low + shift27
            # The representative is whichever side hits the A_17 target.
            rep = low if A_k(low, k17) == A17_target else high
            side = "basse" if rep == low else "haute"
            pref = prefix_data(rep, 17)
            N0 = N0_D(pref.C, pref.A, 17)
            w.writerow([rep, rep ^ shift27, side, " ".join(map(str, pref.word)), pref.A, pref.C, delta17, N0, pref.y, res_to_state[rep % 4096], rep % 4096])
    md_d17 = str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28_et_impact.md")
    write_text(
        md_d17,
        "\n".join(
            [
                "# Paquet D17 minimal après fusion (palier 2^28)",
                "",
                "## Introduction",
                "",
                "Audit D17 sur le domaine résiduel après fusion et après D16.",
                "",
                "## Tailles",
                "",
                f"- paires candidates D17 : {len(pair_low_set)}",
                "",
                "## CSV exhaustif",
                "",
                f"- {Path(csv_d17).name}",
                "",
            ]
        )
        + "\n",
    )
    csv_to_md_list(
        csv_d17,
        str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28_liste_exhaustive.md"),
        "Liste exhaustive des clauses D17 après fusion (palier 2^28)",
        "Liste exhaustive (format CSV copiable).",
    )
    # Residual noyau at 2^28: every surviving lift minus the D17 cover (pairs).
    cover_D17 = {low for low in pair_low_set} | {low + shift27 for low in pair_low_set}
    all_lifted_28: Set[int] = set()
    for r in R25_after_F:
        for j in range(4):
            low = r + j * shift25
            if low in cover_D16:
                continue
            all_lifted_28.add(low)
            all_lifted_28.add(low + shift27)
    R28_after_D17 = sorted(all_lifted_28 - cover_D17)
    # Persist the post-D17 noyau for run_extended_D18_to_D21.
    noyau_path = Path(out_dir) / "noyaux" / "noyau_post_D17.json"
    noyau_path.parent.mkdir(parents=True, exist_ok=True)
    noyau_path.write_text(
        json.dumps({"noyau": R28_after_D17, "palier": 28}),
        encoding="utf-8",
    )
def run_extended_D18_to_D21(
    audit60_json: str,
    out_dir: str,
    noyau_post_D17_path: str | None = None,
    resume_from: str | None = None,
) -> None:
    """Continue from D17 to D18, D19, F15, D20, F16, D21. resume_from='D20' skips to D20.

    Every step logs (with flushes) to ``out_dir/pipeline_extend.log`` so an
    OOM kill still leaves a trace, and a temporary sys.excepthook appends a
    [CRASH] line with peak RSS before the default handler runs.
    """
    global _pipeline_log_path
    # Local imports: these pipeline stages are only needed in extended mode.
    from collatz_fusion_pipeline import run_fusion_pipeline
    from collatz_scission import run_scission
    from collatz_update_noyau import run_update_noyau

    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    (out / "noyaux").mkdir(exist_ok=True)
    (out / "candidats").mkdir(exist_ok=True)
    (out / "certificats").mkdir(exist_ok=True)
    log_file = out / "pipeline_extend.log"
    _pipeline_log_path = log_file
    _log_step(f"START extend pipeline out_dir={out_dir} resume_from={resume_from!r} log={log_file}", memory=True)

    def _extend_excepthook(etype: type, value: BaseException, tb: object) -> None:
        # Append a [CRASH] line (with peak RSS) to the log, then defer to the
        # default handler for the traceback.
        mem = _get_memory_str()
        try:
            if _pipeline_log_path and _pipeline_log_path.exists():
                with _pipeline_log_path.open("a", encoding="utf-8") as f:
                    f.write(f"[CRASH] {etype.__name__}: {value} {mem}\n")
                    f.flush()
        except OSError:
            pass
        sys.__excepthook__(etype, value, tb)

    global _original_excepthook
    _original_excepthook = sys.excepthook
    sys.excepthook = _extend_excepthook  # type: ignore[assignment]
    # Pick the starting noyau: resume from post-F15, or start from post-D17.
    if resume_from == "D20":
        prev_noyau = str(out / "noyaux" / "noyau_post_F15.json")
        if not Path(prev_noyau).exists():
            _log_step(f"ERROR: Resume D20 requires {prev_noyau}")
            raise FileNotFoundError(f"Resume D20 requires {prev_noyau}")
        _log_step("Resume from D20: using noyau_post_F15.json", memory=True)
    else:
        noyau_d17 = noyau_post_D17_path or str(out / "noyaux" / "noyau_post_D17.json")
        if not Path(noyau_d17).exists():
            _log_step(f"ERROR: Run full pipeline first to produce {noyau_d17}")
            raise FileNotFoundError(f"Run full pipeline first to produce {noyau_d17}")
        prev_noyau = noyau_d17
    if resume_from != "D20":
        # D18 (2^30, A_18 == 29) then D19 (2^32, A_19 == 31), chaining noyaux.
        for horizon, palier, valeur, label in [(18, 30, 29, "D18"), (19, 32, 31, "D19")]:
            _log_step(f"STEP start {label} horizon={horizon} palier=2^{palier} valeur={valeur} input={prev_noyau}", memory=True)
            try:
                run_single_palier(
                    horizon=horizon,
                    palier=palier,
                    valeur=valeur,
                    input_noyau=prev_noyau,
                    output_csv=str(out / "candidats" / f"candidats_{label}_palier2p{palier}.csv"),
                    audit60_json=audit60_json,
                    output_noyau_path=str(out / "noyaux" / f"noyau_post_{label}.json"),
                )
            except Exception as e:
                _log_step(f"STEP FAILED {label}: {type(e).__name__}: {e}")
                raise
            prev_noyau = str(out / "noyaux" / f"noyau_post_{label}.json")
            _log_step(f"STEP done {label} next_noyau={prev_noyau}", memory=True)
        # F15: fusion at 2^32, then scission certificate, then noyau update.
        _log_step("STEP start F15 fusion palier=2^32", memory=True)
        csv_f15 = str(out / "candidats" / "candidats_F15_palier2p32.csv")
        cert_f15 = str(out / "certificats" / "certificat_F15_palier2p32.json")
        try:
            run_fusion_pipeline(
                horizons=[15],
                palier=32,
                input_noyau=prev_noyau,
                output_csv=csv_f15,
                audit60_json=audit60_json,
                cible="critique",
            )
            run_scission(csv_f15, cert_f15)
            noyau_f15 = str(out / "noyaux" / "noyau_post_F15.json")
            run_update_noyau(cert_f15, prev_noyau, noyau_f15)
        except Exception as e:
            _log_step(f"STEP FAILED F15: {type(e).__name__}: {e}")
            raise
        prev_noyau = str(out / "noyaux" / "noyau_post_F15.json")
        _log_step("STEP done F15", memory=True)
    # D20 (2^34): reuse an existing noyau, recover it from an existing
    # candidates CSV, or compute it from scratch.
    csv_d20 = str(out / "candidats" / "candidats_D20_palier2p34.csv")
    noyau_d20 = str(out / "noyaux" / "noyau_post_D20.json")
    if Path(noyau_d20).exists():
        _log_step(f"Using existing {noyau_d20}", memory=True)
    elif Path(csv_d20).exists():
        from collatz_recover_noyau import run_recover
        _log_step("Recovering noyau_post_D20 from existing candidats CSV...", memory=True)
        run_recover(
            previous_noyau=prev_noyau,
            candidats_csv=csv_d20,
            palier=34,
            output=noyau_d20,
            input_palier=32,
        )
    else:
        _log_step(f"STEP start D20 palier=2^34 input={prev_noyau}", memory=True)
        try:
            run_single_palier(
                horizon=20,
                palier=34,
                valeur=32,
                input_noyau=prev_noyau,
                output_csv=csv_d20,
                audit60_json=audit60_json,
                output_noyau_path=noyau_d20,
            )
        except Exception as e:
            _log_step(f"STEP FAILED D20: {type(e).__name__}: {e}")
            raise
        _log_step("STEP done D20", memory=True)
    prev_noyau = noyau_d20
    # F16: fusion at 2^35 (documented as the OOM-prone step), then scission
    # and noyau update.
    _log_step("STEP start F16 fusion palier=2^35", memory=True)
    csv_f16 = str(out / "candidats" / "candidats_F16_palier2p35.csv")
    cert_f16 = str(out / "certificats" / "certificat_F16_palier2p35.json")
    try:
        run_fusion_pipeline(
            horizons=[16],
            palier=35,
            input_noyau=prev_noyau,
            output_csv=csv_f16,
            audit60_json=audit60_json,
            modulo=9,
        )
        run_scission(csv_f16, cert_f16)
        noyau_f16 = str(out / "noyaux" / "noyau_post_F16.json")
        run_update_noyau(cert_f16, prev_noyau, noyau_f16)
    except Exception as e:
        _log_step(f"STEP FAILED F16: {type(e).__name__}: {e}")
        raise
    prev_noyau = noyau_f16
    _log_step("STEP done F16", memory=True)
    # D21 (2^36): final palier of the extended pipeline.
    _log_step("STEP start D21 palier=2^36 (final)", memory=True)
    try:
        run_single_palier(
            horizon=21,
            palier=36,
            valeur=34,
            input_noyau=prev_noyau,
            output_csv=str(out / "candidats" / "candidats_D21_palier2p36.csv"),
            audit60_json=audit60_json,
            output_noyau_path=str(out / "noyaux" / "noyau_post_D21.json"),
        )
    except Exception as e:
        _log_step(f"STEP FAILED D21: {type(e).__name__}: {e}")
        raise
    _log_step("STEP done D21 - extend pipeline complete", memory=True)
    # NOTE(review): restoration below only runs on the success path; when a
    # step raises, sys.excepthook stays replaced and the log path stays set —
    # confirm this is intentional (the crash hook is arguably still wanted).
    sys.excepthook = _original_excepthook  # type: ignore[assignment]
    _pipeline_log_path = None
def load_noyau(path: str) -> List[int]:
    """Load a noyau from JSON: either a bare list of residues, or a dict
    holding the list under one of the known keys."""
    raw = json.loads(Path(path).read_text(encoding="utf-8"))
    if isinstance(raw, list):
        return [int(v) for v in raw]
    if isinstance(raw, dict):
        for candidate_key in ("noyau", "residues", "uncovered", "R25_after", "R24_after"):
            values = raw.get(candidate_key)
            if isinstance(values, list):
                return [int(v) for v in values]
    raise ValueError(f"Noyau JSON: no residue list in {path}")
def _stream_noyau_items(path: str) -> Iterable[int]:
    """Stream-parse a noyau JSON file and yield residues one at a time.

    Uses ``ijson`` so very large files never have to fit in memory. As a
    generator, nothing (including the ijson import and the existence check)
    runs until the first item is requested.
    """
    import ijson
    source = Path(path)
    if not source.exists():
        raise FileNotFoundError(path)
    with source.open("rb") as fh:
        for value in ijson.items(fh, "noyau.item"):
            yield int(value)
def _run_single_palier_stream(
    horizon: int,
    palier: int,
    valeur: int,
    input_noyau: str,
    output_csv: str,
    output_noyau_path: Optional[str],
    audit60_json: str,
) -> None:
    """Stream-based single palier for large noyau files (>500 MB).

    Three passes over the input so only the candidate/cover sets are ever held
    in memory: pass 1 finds the max residue (to infer the input palier),
    pass 2 collects candidates, pass 3 stream-writes the residual noyau JSON.
    """
    _log_step(" stream path: pass 1 (max_r)", memory=True)
    max_r = 0
    n_res = 0
    for r in _stream_noyau_items(input_noyau):
        max_r = max(max_r, r)
        n_res += 1
    _log_step(f" stream max_r done n_res={n_res} max_r={max_r}", memory=True)
    # Infer the input palier from the largest residue's bit length.
    input_palier = max_r.bit_length() if max_r else 0
    curr_shift = 1 << (palier - 1)
    if palier == 17:
        # D10 bootstrap: residues are already mod 2^17, single copy.
        prev_shift = 1 << 16
        lift_count = 1
    elif palier - input_palier >= 2:
        # Multi-bit jump: enumerate every combination of the new high bits.
        prev_shift = 1 << input_palier
        lift_count = 1 << (palier - input_palier)
    else:
        # Single-bit lift.
        prev_shift = 1 << (palier - 1)
        lift_count = 2
    _log_step(" stream path: pass 2 (cand/cover)", memory=True)
    cand: Set[int] = set()
    for r in _stream_noyau_items(input_noyau):
        for j in range(lift_count):
            n = r + j * prev_shift
            if A_k(n, horizon) == valeur:
                cand.add(n)
    # Each candidate covers itself and its sister (top bit flipped).
    cover = cand | {n ^ curr_shift for n in cand}
    _log_step(f" cand/cover done len(cand)={len(cand)} len(cover)={len(cover)}", memory=True)
    res_to_state, _ = load_state_map_60(audit60_json)
    # delta only meaningful when 2^valeur exceeds 3^horizon.
    delta = (1 << valeur) - (3**horizon) if (1 << valeur) > (3**horizon) else 0
    Path(output_csv).parent.mkdir(parents=True, exist_ok=True)
    _log_step(f" writing CSV {output_csv}")
    with Path(output_csv).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        col_palier = f"classe_mod_2^{palier}"
        w.writerow([col_palier, "sœur", f"mot_a0..a{horizon-1}", f"A{horizon}", f"C{horizon}", "delta", "N0", f"U^{horizon}(n)", "etat_id", "base_mod_4096"])
        for n in sorted(cand):
            pref = prefix_data(n, horizon)
            N0 = N0_D(pref.C, pref.A, horizon) if delta > 0 else 0
            base = n % 4096
            etat = res_to_state.get(base, 0)
            w.writerow([n, n ^ curr_shift, " ".join(map(str, pref.word)), pref.A, pref.C, delta, N0, pref.y, etat, base])
    _log_step(" CSV written")
    # Free the candidate set before the residual pass; only `cover` is needed.
    del cand
    if output_noyau_path:
        _log_step(" stream path: pass 3 (residual write)", memory=True)
        Path(output_noyau_path).parent.mkdir(parents=True, exist_ok=True)
        n_residual = 0
        with Path(output_noyau_path).open("w", encoding="utf-8") as f:
            # Hand-rolled JSON writer: residues are streamed out one at a
            # time instead of materializing the residual list.
            f.write('{"noyau": [')
            first = True
            for r in _stream_noyau_items(input_noyau):
                for j in range(lift_count):
                    n = r + j * prev_shift
                    if n not in cover:
                        if not first:
                            f.write(",")
                        f.write(str(n))
                        first = False
                        n_residual += 1
            f.write(f'], "palier": {palier}}}')
        _log_step(f" noyau written ({n_residual} residues)", memory=True)
        print(f"Wrote noyau: {output_noyau_path} ({n_residual} residues)", flush=True)
    print(f"Wrote {output_csv}: {len(cover) // 2} candidates, palier 2^{palier}", flush=True)
def run_single_palier(
    horizon: int,
    palier: int,
    valeur: int,
    input_noyau: str,
    output_csv: str,
    audit60_json: str,
    output_noyau_path: str | None = None,
) -> None:
    """
    Run a single palier: load noyau, lift to 2^palier, extract D_k candidates with A_k=valeur.
    Memory-optimized: no full lifted list; two passes over residues (cand then residual); stream-write noyau JSON.
    Inputs over 500 MB are delegated to the ijson streaming variant.
    """
    p_in = Path(input_noyau)
    file_size_mb = p_in.stat().st_size / (1024 * 1024) if p_in.exists() else 0
    _log_step(f"run_single_palier k={horizon} palier=2^{palier} valeur={valeur} input={input_noyau} size_mb={file_size_mb:.1f}", memory=True)
    if file_size_mb > 500:
        # Large input: three-pass streaming variant to avoid OOM.
        _run_single_palier_stream(
            horizon=horizon,
            palier=palier,
            valeur=valeur,
            input_noyau=input_noyau,
            output_csv=output_csv,
            output_noyau_path=output_noyau_path,
            audit60_json=audit60_json,
        )
        return
    residues = load_noyau(input_noyau)
    n_res = len(residues)
    _log_step(f" load_noyau done len(residues)={n_res}", memory=True)
    res_to_state, _ = load_state_map_60(audit60_json)
    max_r = max(residues) if residues else 0
    # Infer the input palier from the largest residue's bit length.
    input_palier = max_r.bit_length() if max_r else 0
    curr_shift = 1 << (palier - 1)
    if palier == 17:
        # D10 bootstrap: residues are already mod 2^17, single copy.
        prev_shift = 1 << 16
        lift_count = 1
    elif palier - input_palier >= 2:
        # Multi-bit jump: enumerate every combination of the new high bits.
        prev_shift = 1 << input_palier
        lift_count = 1 << (palier - input_palier)
    else:
        # Single-bit lift.
        prev_shift = 1 << (palier - 1)
        lift_count = 2
    # Pass 1: build cand (and cover) without storing full lifted list
    cand: Set[int] = set()
    for r in residues:
        for j in range(lift_count):
            n = r + j * prev_shift
            if A_k(n, horizon) == valeur:
                cand.add(n)
    # Each candidate covers itself and its sister (top bit flipped).
    cover = cand | {n ^ curr_shift for n in cand}
    _log_step(f" cand/cover done len(cand)={len(cand)} len(cover)={len(cover)}", memory=True)
    # delta only meaningful when 2^valeur exceeds 3^horizon.
    delta = (1 << valeur) - (3**horizon) if (1 << valeur) > (3**horizon) else 0
    Path(output_csv).parent.mkdir(parents=True, exist_ok=True)
    _log_step(f" writing CSV {output_csv}")
    with Path(output_csv).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        col_palier = f"classe_mod_2^{palier}"
        w.writerow([col_palier, "sœur", f"mot_a0..a{horizon-1}", f"A{horizon}", f"C{horizon}", "delta", "N0", f"U^{horizon}(n)", "etat_id", "base_mod_4096"])
        for n in sorted(cand):
            pref = prefix_data(n, horizon)
            N0 = N0_D(pref.C, pref.A, horizon) if delta > 0 else 0
            base = n % 4096
            etat = res_to_state.get(base, 0)
            w.writerow([n, n ^ curr_shift, " ".join(map(str, pref.word)), pref.A, pref.C, delta, N0, pref.y, etat, base])
    _log_step(f" CSV written")
    del cand  # free before building residual
    if output_noyau_path:
        _log_step(f" computing residual (second pass over residues)")
        # Pass 2: every lift not covered goes into the next noyau.
        residual: List[int] = []
        for r in residues:
            for j in range(lift_count):
                n = r + j * prev_shift
                if n not in cover:
                    residual.append(n)
        residual.sort()
        n_residual = len(residual)
        _log_step(f" residual len={n_residual} writing noyau {output_noyau_path}", memory=True)
        Path(output_noyau_path).parent.mkdir(parents=True, exist_ok=True)
        with Path(output_noyau_path).open("w", encoding="utf-8") as f:
            # Hand-rolled JSON writer: avoids json.dumps building one giant string.
            f.write('{"noyau": [')
            for i, r in enumerate(residual):
                if i > 0:
                    f.write(",")
                f.write(str(r))
            f.write(f'], "palier": {palier}}}')
        _log_step(f" noyau written ({n_residual} residues)", memory=True)
        print(f"Wrote noyau: {output_noyau_path} ({n_residual} residues)", flush=True)
    print(f"Wrote {output_csv}: {len(cover) // 2} candidates, palier 2^{palier}", flush=True)
def main() -> None:
    """CLI entry point: full after-fusion run, single palier, or extended D18-D21.

    Fix: the temporary noyau JSON is now created only for the D10 bootstrap
    (--horizon 10 --palier 17 --m15m16). Previously it was written on every
    single-palier run and leaked when --input-noyau was used (the `finally`
    only unlinked it in the D10 case), and large --input-noyau files were
    eagerly loaded in memory even though run_single_palier streams them.
    """
    import argparse
    ap = argparse.ArgumentParser(description="Collatz pipeline: full run or single palier")
    ap.add_argument("--audit60", help="Audit 60 états JSON (for full run)")
    ap.add_argument("--m15m16", help="Complétion m15→m16 MD (for full run)")
    ap.add_argument("--d10", help="Candidats D10 MD (for full run)")
    ap.add_argument("--out", help="Output directory (for full run)")
    ap.add_argument("--horizon", type=int, help="Horizon k for single palier")
    ap.add_argument("--palier", type=int, help="Palier m (2^m) for single palier")
    ap.add_argument("--seuil", default="A_min", help="Seuil type (A_min)")
    ap.add_argument("--valeur", type=int, help="A_k target value for single palier")
    ap.add_argument("--input-noyau", help="Input noyau JSON for single palier")
    ap.add_argument("--output", help="Output CSV for single palier")
    ap.add_argument("--output-noyau", help="Output residual noyau JSON for next palier")
    ap.add_argument("--parallel", action="store_true", help="Use parallel mode (placeholder)")
    ap.add_argument("--threads", type=int, default=1, help="Thread count (placeholder)")
    ap.add_argument("--extend", action="store_true", help="Run extended D18-D21 pipeline (requires noyau_post_D17)")
    ap.add_argument("--resume-from", help="Resume from step (e.g. D20) - skip earlier steps")
    args = ap.parse_args()
    if args.extend:
        audit60 = args.audit60 or str(Path(__file__).parent / "audit_60_etats_B12_mod4096_horizon7.json")
        out_dir = args.out or str(Path(__file__).parent / "out")
        if not Path(audit60).exists():
            raise SystemExit(f"Audit60 not found: {audit60}")
        run_extended_D18_to_D21(
            audit60_json=audit60,
            out_dir=out_dir,
            resume_from=args.resume_from,
        )
        return
    if args.horizon is not None and args.palier is not None and args.valeur is not None and args.output:
        audit60 = args.audit60 or str(Path(__file__).parent / "audit_60_etats_B12_mod4096_horizon7.json")
        if not Path(audit60).exists():
            raise SystemExit(f"Audit60 not found: {audit60}")
        bootstrap_d10 = args.horizon == 10 and args.palier == 17 and bool(args.m15m16)
        if bootstrap_d10:
            # D10 bootstrap: build R17 from the completion markdown and stage
            # it in a temporary noyau file (removed in the finally below).
            residues = build_R17_from_completion(args.m15m16)
            with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tf:
                json.dump(residues, tf)
                input_noyau = tf.name
        elif args.input_noyau:
            # Pass the path straight through so run_single_palier can pick
            # the streaming variant for large files.
            input_noyau = args.input_noyau
        else:
            raise SystemExit("--input-noyau or --m15m16 (for D10) required for single palier mode")
        try:
            run_single_palier(
                horizon=args.horizon,
                palier=args.palier,
                valeur=args.valeur,
                input_noyau=input_noyau,
                output_csv=args.output,
                audit60_json=audit60,
                output_noyau_path=args.output_noyau,
            )
        finally:
            if bootstrap_d10:
                Path(input_noyau).unlink(missing_ok=True)
        return
    if args.audit60 and args.m15m16 and args.d10 and args.out:
        run_after_fusion_D16_D17(args.audit60, args.m15m16, args.d10, args.out)
        return
    ap.error("Use either (--audit60 --m15m16 --d10 --out) or (--horizon --palier --valeur --input-noyau --output)")
# Script entry point.
if __name__ == "__main__":
    main()