# -*- coding: utf-8 -*- """ collatz_k_pipeline.py Pipeline principale (reproduction des audits "après fusion" et des paquets D16/D17). Entrées attendues: - audit_60_etats_B12_mod4096_horizon7.json - complétion_minorée_m15_vers_m16.md - candidats_D10_palier2p17.md Sorties: - audits Markdown - CSV exhaustifs - Markdown listes exhaustives (bloc ```csv```) """ from __future__ import annotations from pathlib import Path import csv import json import re import sys import tempfile import time from collections import Counter from typing import List, Set, Dict, Tuple, Iterable, Optional from collatz_k_core import A_k, prefix_data, N0_D from collatz_k_utils import parse_markdown_table_to_rows, write_text from collatz_k_fusion import build_fusion_clauses # When set by run_extended_D18_to_D21, steps log to this file (flush after each line). _pipeline_log_path: Optional[Path] = None _original_excepthook: Optional[object] = None def _get_memory_str() -> str: """Return max RSS in MB (Unix). Empty string if unavailable.""" try: import resource ru = resource.getrusage(resource.RUSAGE_SELF) rss = getattr(ru, "ru_maxrss", 0) if not rss: return "" if sys.platform == "darwin": rss_mb = rss / (1024 * 1024) else: rss_mb = rss / 1024 return f"rss_max_mb={rss_mb:.0f}" except (ImportError, OSError, AttributeError): return "" def _log_step(msg: str, out_dir: Optional[Path] = None, memory: bool = False) -> None: """Log to stderr and optionally to pipeline log file. Flush so crash leaves trace.""" ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) if memory: mem = _get_memory_str() if mem: msg = f"{msg} {mem}" line = f"[{ts}] {msg}" print(line, flush=True) path = out_dir if isinstance(out_dir, Path) else _pipeline_log_path if path is not None: try: with path.open("a", encoding="utf-8") as f: f.write(line + "\n") f.flush() except OSError: pass def load_state_map_60(audit60_json_path: str) -> Tuple[Dict[int, int], Dict[int, str]]: import json data = json.loads(Path(audit60_json_path).read_text(encoding="utf-8")) res_to_state = {int(k): int(v) for k, v in data["residue_to_state"].items()} state_mot7 = {} for row in data["state_table"]: state_mot7[int(row["État"])] = row["Mot (a0..a6)"] return res_to_state, state_mot7 def build_R17_from_completion(completion_m15_to_m16_md: str) -> List[int]: """Build R17 from completion (Parents both) without D10 subtraction.""" text = Path(completion_m15_to_m16_md).read_text(encoding="utf-8") m = re.search(r"### Parents « both ».*?\n(.*)\Z", text, flags=re.S) if not m: raise ValueError("Section 'Parents both' introuvable") B15 = sorted(set(map(int, re.findall(r"\b\d+\b", m.group(1))))) shift15 = 1 << 15 shift16 = 1 << 16 R16 = set(B15) | {p + shift15 for p in B15} R17 = set(R16) | {x + shift16 for x in R16} return sorted(R17) def rebuild_R17_after_full_D10(completion_m15_to_m16_md: str, candidats_D10_md: str) -> List[int]: text = Path(completion_m15_to_m16_md).read_text(encoding="utf-8") m = re.search(r"### Parents « both ».*?\n(.*)\Z", text, flags=re.S) if not m: raise ValueError("Section 'Parents both' introuvable") B15 = sorted(set(map(int, re.findall(r"\b\d+\b", m.group(1))))) shift15 = 1 << 15 shift16 = 1 << 16 R16 = set(B15) | {p + shift15 for p in B15} R17 = set(R16) | {x + shift16 for x in R16} rows = parse_markdown_table_to_rows(candidats_D10_md) cover175: Set[int] = set() for parts in rows[2:]: low = int(parts[0]) high = int(parts[1]) cover175.add(low) cover175.add(high) R17_after_175 = R17 - cover175 A10_16_high = [x for x in R17_after_175 if A_k(x, 10) == 16] cover171: Set[int] = set() for x in A10_16_high: cover171.add(x) cover171.add(x - shift16) return sorted(R17_after_175 - cover171) def lift_set(residues: Iterable[int], shift: int, count: int) -> List[int]: out: List[int] = [] for r in residues: for j in range(count): out.append(r + j * shift) return out def csv_to_md_list(csv_path: str, md_path: str, title: str, intro: str) -> None: p_csv = Path(csv_path) p_md = Path(md_path) with p_csv.open("r", encoding="utf-8") as fin, p_md.open("w", encoding="utf-8") as fout: fout.write(f"# {title}\n\n") fout.write("## Introduction\n\n") fout.write(intro.strip() + "\n\n") fout.write("## Liste exhaustive\n\n") fout.write("```csv\n") last = "" for line in fin: fout.write(line) last = line if last and not last.endswith("\n"): fout.write("\n") fout.write("```\n") def run_after_fusion_D16_D17( audit60_json: str, completion_m15_to_m16_md: str, candidats_D10_md: str, out_dir: str, ) -> None: Path(out_dir).mkdir(parents=True, exist_ok=True) res_to_state, state_mot7 = load_state_map_60(audit60_json) # R17 après D10 complet R17_after_full = rebuild_R17_after_full_D10(completion_m15_to_m16_md, candidats_D10_md) # D11 (2^19) shift17 = 1 << 17 shift18 = 1 << 18 R19 = lift_set(R17_after_full, shift17, 4) cand11 = set([n for n in R19 if A_k(n, 11) == 18]) cover11 = cand11 | {n ^ shift18 for n in cand11} R19_after = [n for n in R19 if n not in cover11] # D12 (2^21) shift19 = 1 << 19 shift20 = 1 << 20 R21 = lift_set(R19_after, shift19, 4) cand12 = set([n for n in R21 if A_k(n, 12) == 20]) cover12 = cand12 | {n ^ shift20 for n in cand12} R21_after = [n for n in R21 if n not in cover12] # D13 (2^22) shift21 = 1 << 21 R22 = list(R21_after) + [n + shift21 for n in R21_after] cand13 = set([n for n in R22 if A_k(n, 13) == 21]) cover13 = cand13 | {n ^ shift21 for n in cand13} R22_after = [n for n in R22 if n not in cover13] # D14 (2^24) shift22 = 1 << 22 shift23 = 1 << 23 R24 = lift_set(R22_after, shift22, 4) cand14 = set([n for n in R24 if A_k(n, 14) == 23]) cover14 = cand14 | {n ^ shift23 for n in cand14} R24_after = [n for n in R24 if n not in cover14] # D15 (2^25) shift24 = 1 << 24 R25 = list(R24_after) + [n + shift24 for n in R24_after] cand15 = set([n for n in R25 if A_k(n, 15) == 24]) cover15 = cand15 | {n ^ shift24 for n in cand15} R25_after = sorted([n for n in R25 if n not in cover15]) # Fusion (t=11,12,14) au palier 2^25 md_f11 = str(Path(out_dir) / "fusion_t11_palier2p25.md") csv_f11 = str(Path(out_dir) / "fusion_t11_palier2p25.csv") md_f12 = str(Path(out_dir) / "fusion_t12_palier2p25.md") csv_f12 = str(Path(out_dir) / "fusion_t12_palier2p25.csv") md_f14 = str(Path(out_dir) / "fusion_t14_palier2p25.md") csv_f14 = str(Path(out_dir) / "fusion_t14_palier2p25.csv") build_fusion_clauses(R25_after, 11, res_to_state, state_mot7, md_f11, csv_f11, 25) build_fusion_clauses(R25_after, 12, res_to_state, state_mot7, md_f12, csv_f12, 25) build_fusion_clauses(R25_after, 14, res_to_state, state_mot7, md_f14, csv_f14, 25) def load_hitset(csv_path: str) -> Set[int]: hs: Set[int] = set() p = Path(csv_path) if p.stat().st_size == 0: return hs with p.open("r", encoding="utf-8") as f: r = csv.DictReader(f) for row in r: hs.add(int(row["classe_mod_2^m"])) return hs unionF = load_hitset(csv_f11) | load_hitset(csv_f12) | load_hitset(csv_f14) R25_after_F = [n for n in R25_after if n not in unionF] # D16 après fusion (2^27) shift25 = 1 << 25 shift26 = 1 << 26 k16 = 16 A16_target = 26 cand_D16: Set[int] = set() for r in R25_after_F: for j in range(4): n = r + j * shift25 if A_k(n, k16) == A16_target: cand_D16.add(n) cover_D16 = cand_D16 | {n ^ shift26 for n in cand_D16} delta16 = (1 << 26) - (3**16) N0_dist = Counter() csv_d16 = str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27.csv") with Path(csv_d16).open("w", newline="", encoding="utf-8") as f: w = csv.writer(f) w.writerow(["classe_mod_2^27", "sœur", "mot_a0..a15", "A16", "C16", "delta", "N0", "U^16(n)", "etat_id", "base_mod_4096"]) for n in sorted(cand_D16): pref = prefix_data(n, 16) N0 = N0_D(pref.C, pref.A, 16) N0_dist[N0] += 1 w.writerow([n, n ^ shift26, " ".join(map(str, pref.word)), pref.A, pref.C, delta16, N0, pref.y, res_to_state[n % 4096], n % 4096]) # audit D16 (minimal) maxA16_after = 0 for r in R25_after_F: for j in range(4): n = r + j * shift25 if n in cover_D16: continue A = A_k(n, k16) if A > maxA16_after: maxA16_after = A md_d16 = str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27_et_impact.md") write_text( md_d16, "\n".join( [ "# Paquet D16 minimal après fusion (palier 2^27)", "", "## Introduction", "", "Audit D16 sur le noyau au palier 2^25 après fusion F(11)∪F(12)∪F(14).", "", "## Tailles", "", f"- noyau après D15 : {len(R25_after)}", f"- noyau après fusion : {len(R25_after_F)}", f"- relèvements 2^27 : {4 * len(R25_after_F)}", f"- candidats D16 : {len(cand_D16)}", f"- couverture (avec sœurs) : {len(cover_D16)}", f"- invariant max A16 après : {maxA16_after}", "", "## CSV exhaustif", "", f"- {Path(csv_d16).name}", "", ] ) + "\n", ) csv_to_md_list( csv_d16, str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27_liste_exhaustive.md"), "Liste exhaustive des clauses D16 après fusion (palier 2^27)", "Liste exhaustive (format CSV copiable).", ) # D17 après fusion et D16 (2^28) shift27 = 1 << 27 k17 = 17 A17_target = 27 pair_low_set: Set[int] = set() for r in R25_after_F: for j in range(4): low = r + j * shift25 if low in cover_D16: continue if A_k(low, k17) == A17_target or A_k(low + shift27, k17) == A17_target: pair_low_set.add(low) delta17 = (1 << 27) - (3**17) csv_d17 = str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28.csv") with Path(csv_d17).open("w", newline="", encoding="utf-8") as f: w = csv.writer(f) w.writerow(["classe_mod_2^28", "sœur", "côté", "mot_a0..a16", "A17", "C17", "delta", "N0", "U^17(n)", "etat_id", "base_mod_4096"]) for low in sorted(pair_low_set): high = low + shift27 rep = low if A_k(low, k17) == A17_target else high side = "basse" if rep == low else "haute" pref = prefix_data(rep, 17) N0 = N0_D(pref.C, pref.A, 17) w.writerow([rep, rep ^ shift27, side, " ".join(map(str, pref.word)), pref.A, pref.C, delta17, N0, pref.y, res_to_state[rep % 4096], rep % 4096]) md_d17 = str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28_et_impact.md") write_text( md_d17, "\n".join( [ "# Paquet D17 minimal après fusion (palier 2^28)", "", "## Introduction", "", "Audit D17 sur le domaine résiduel après fusion et après D16.", "", "## Tailles", "", f"- paires candidates D17 : {len(pair_low_set)}", "", "## CSV exhaustif", "", f"- {Path(csv_d17).name}", "", ] ) + "\n", ) csv_to_md_list( csv_d17, str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28_liste_exhaustive.md"), "Liste exhaustive des clauses D17 après fusion (palier 2^28)", "Liste exhaustive (format CSV copiable).", ) cover_D17 = {low for low in pair_low_set} | {low + shift27 for low in pair_low_set} all_lifted_28: Set[int] = set() for r in R25_after_F: for j in range(4): low = r + j * shift25 if low in cover_D16: continue all_lifted_28.add(low) all_lifted_28.add(low + shift27) R28_after_D17 = sorted(all_lifted_28 - cover_D17) noyau_path = Path(out_dir) / "noyaux" / "noyau_post_D17.json" noyau_path.parent.mkdir(parents=True, exist_ok=True) noyau_path.write_text( json.dumps({"noyau": R28_after_D17, "palier": 28}), encoding="utf-8", ) def run_extended_D18_to_D21( audit60_json: str, out_dir: str, noyau_post_D17_path: str | None = None, resume_from: str | None = None, ) -> None: """Continue from D17 to D18, D19, F15, D20, F16, D21. resume_from='D20' skips to D20.""" global _pipeline_log_path from collatz_fusion_pipeline import run_fusion_pipeline from collatz_scission import run_scission from collatz_update_noyau import run_update_noyau out = Path(out_dir) out.mkdir(parents=True, exist_ok=True) (out / "noyaux").mkdir(exist_ok=True) (out / "candidats").mkdir(exist_ok=True) (out / "certificats").mkdir(exist_ok=True) log_file = out / "pipeline_extend.log" _pipeline_log_path = log_file _log_step(f"START extend pipeline out_dir={out_dir} resume_from={resume_from!r} log={log_file}", memory=True) def _extend_excepthook(etype: type, value: BaseException, tb: object) -> None: mem = _get_memory_str() try: if _pipeline_log_path and _pipeline_log_path.exists(): with _pipeline_log_path.open("a", encoding="utf-8") as f: f.write(f"[CRASH] {etype.__name__}: {value} {mem}\n") f.flush() except OSError: pass sys.__excepthook__(etype, value, tb) global _original_excepthook _original_excepthook = sys.excepthook sys.excepthook = _extend_excepthook # type: ignore[assignment] if resume_from == "D20": prev_noyau = str(out / "noyaux" / "noyau_post_F15.json") if not Path(prev_noyau).exists(): _log_step(f"ERROR: Resume D20 requires {prev_noyau}") raise FileNotFoundError(f"Resume D20 requires {prev_noyau}") _log_step("Resume from D20: using noyau_post_F15.json", memory=True) else: noyau_d17 = noyau_post_D17_path or str(out / "noyaux" / "noyau_post_D17.json") if not Path(noyau_d17).exists(): _log_step(f"ERROR: Run full pipeline first to produce {noyau_d17}") raise FileNotFoundError(f"Run full pipeline first to produce {noyau_d17}") prev_noyau = noyau_d17 if resume_from != "D20": for horizon, palier, valeur, label in [(18, 30, 29, "D18"), (19, 32, 31, "D19")]: _log_step(f"STEP start {label} horizon={horizon} palier=2^{palier} valeur={valeur} input={prev_noyau}", memory=True) try: run_single_palier( horizon=horizon, palier=palier, valeur=valeur, input_noyau=prev_noyau, output_csv=str(out / "candidats" / f"candidats_{label}_palier2p{palier}.csv"), audit60_json=audit60_json, output_noyau_path=str(out / "noyaux" / f"noyau_post_{label}.json"), ) except Exception as e: _log_step(f"STEP FAILED {label}: {type(e).__name__}: {e}") raise prev_noyau = str(out / "noyaux" / f"noyau_post_{label}.json") _log_step(f"STEP done {label} next_noyau={prev_noyau}", memory=True) _log_step("STEP start F15 fusion palier=2^32", memory=True) csv_f15 = str(out / "candidats" / "candidats_F15_palier2p32.csv") cert_f15 = str(out / "certificats" / "certificat_F15_palier2p32.json") try: run_fusion_pipeline( horizons=[15], palier=32, input_noyau=prev_noyau, output_csv=csv_f15, audit60_json=audit60_json, cible="critique", ) run_scission(csv_f15, cert_f15) noyau_f15 = str(out / "noyaux" / "noyau_post_F15.json") run_update_noyau(cert_f15, prev_noyau, noyau_f15) except Exception as e: _log_step(f"STEP FAILED F15: {type(e).__name__}: {e}") raise prev_noyau = str(out / "noyaux" / "noyau_post_F15.json") _log_step("STEP done F15", memory=True) csv_d20 = str(out / "candidats" / "candidats_D20_palier2p34.csv") noyau_d20 = str(out / "noyaux" / "noyau_post_D20.json") if Path(noyau_d20).exists(): _log_step(f"Using existing {noyau_d20}", memory=True) elif Path(csv_d20).exists(): from collatz_recover_noyau import run_recover _log_step("Recovering noyau_post_D20 from existing candidats CSV...", memory=True) run_recover( previous_noyau=prev_noyau, candidats_csv=csv_d20, palier=34, output=noyau_d20, input_palier=32, ) else: _log_step(f"STEP start D20 palier=2^34 input={prev_noyau}", memory=True) try: run_single_palier( horizon=20, palier=34, valeur=32, input_noyau=prev_noyau, output_csv=csv_d20, audit60_json=audit60_json, output_noyau_path=noyau_d20, ) except Exception as e: _log_step(f"STEP FAILED D20: {type(e).__name__}: {e}") raise _log_step("STEP done D20", memory=True) prev_noyau = noyau_d20 _log_step("STEP start F16 fusion palier=2^35", memory=True) csv_f16 = str(out / "candidats" / "candidats_F16_palier2p35.csv") cert_f16 = str(out / "certificats" / "certificat_F16_palier2p35.json") try: run_fusion_pipeline( horizons=[16], palier=35, input_noyau=prev_noyau, output_csv=csv_f16, audit60_json=audit60_json, modulo=9, ) run_scission(csv_f16, cert_f16) noyau_f16 = str(out / "noyaux" / "noyau_post_F16.json") run_update_noyau(cert_f16, prev_noyau, noyau_f16) except Exception as e: _log_step(f"STEP FAILED F16: {type(e).__name__}: {e}") raise prev_noyau = noyau_f16 _log_step("STEP done F16", memory=True) _log_step("STEP start D21 palier=2^36 (final)", memory=True) try: run_single_palier( horizon=21, palier=36, valeur=34, input_noyau=prev_noyau, output_csv=str(out / "candidats" / "candidats_D21_palier2p36.csv"), audit60_json=audit60_json, output_noyau_path=str(out / "noyaux" / "noyau_post_D21.json"), ) except Exception as e: _log_step(f"STEP FAILED D21: {type(e).__name__}: {e}") raise _log_step("STEP done D21 - extend pipeline complete", memory=True) sys.excepthook = _original_excepthook # type: ignore[assignment] _pipeline_log_path = None def load_noyau(path: str) -> List[int]: """Load noyau from JSON: list of residues or dict with noyau/residues/covered.""" data = json.loads(Path(path).read_text(encoding="utf-8")) if isinstance(data, list): return [int(x) for x in data] if isinstance(data, dict): for key in ("noyau", "residues", "uncovered", "R25_after", "R24_after"): if key in data and isinstance(data[key], list): return [int(x) for x in data[key]] raise ValueError(f"Noyau JSON: no residue list in {path}") def _stream_noyau_items(path: str) -> Iterable[int]: """Stream-parse noyau JSON and yield residues. Use for large files to avoid loading all in memory.""" import ijson p = Path(path) if not p.exists(): raise FileNotFoundError(path) with p.open("rb") as f: for x in ijson.items(f, "noyau.item"): yield int(x) def _run_single_palier_stream( horizon: int, palier: int, valeur: int, input_noyau: str, output_csv: str, output_noyau_path: Optional[str], audit60_json: str, ) -> None: """Stream-based single palier for large noyau files (>500 MB). Three passes: max_r, cand/cover, residual write.""" _log_step(" stream path: pass 1 (max_r)", memory=True) max_r = 0 n_res = 0 for r in _stream_noyau_items(input_noyau): max_r = max(max_r, r) n_res += 1 _log_step(f" stream max_r done n_res={n_res} max_r={max_r}", memory=True) input_palier = max_r.bit_length() if max_r else 0 curr_shift = 1 << (palier - 1) if palier == 17: prev_shift = 1 << 16 lift_count = 1 elif palier - input_palier >= 2: prev_shift = 1 << input_palier lift_count = 1 << (palier - input_palier) else: prev_shift = 1 << (palier - 1) lift_count = 2 _log_step(" stream path: pass 2 (cand/cover)", memory=True) cand: Set[int] = set() for r in _stream_noyau_items(input_noyau): for j in range(lift_count): n = r + j * prev_shift if A_k(n, horizon) == valeur: cand.add(n) cover = cand | {n ^ curr_shift for n in cand} _log_step(f" cand/cover done len(cand)={len(cand)} len(cover)={len(cover)}", memory=True) res_to_state, _ = load_state_map_60(audit60_json) delta = (1 << valeur) - (3**horizon) if (1 << valeur) > (3**horizon) else 0 Path(output_csv).parent.mkdir(parents=True, exist_ok=True) _log_step(f" writing CSV {output_csv}") with Path(output_csv).open("w", newline="", encoding="utf-8") as f: w = csv.writer(f) col_palier = f"classe_mod_2^{palier}" w.writerow([col_palier, "sœur", f"mot_a0..a{horizon-1}", f"A{horizon}", f"C{horizon}", "delta", "N0", f"U^{horizon}(n)", "etat_id", "base_mod_4096"]) for n in sorted(cand): pref = prefix_data(n, horizon) N0 = N0_D(pref.C, pref.A, horizon) if delta > 0 else 0 base = n % 4096 etat = res_to_state.get(base, 0) w.writerow([n, n ^ curr_shift, " ".join(map(str, pref.word)), pref.A, pref.C, delta, N0, pref.y, etat, base]) _log_step(" CSV written") del cand if output_noyau_path: _log_step(" stream path: pass 3 (residual write)", memory=True) Path(output_noyau_path).parent.mkdir(parents=True, exist_ok=True) n_residual = 0 with Path(output_noyau_path).open("w", encoding="utf-8") as f: f.write('{"noyau": [') first = True for r in _stream_noyau_items(input_noyau): for j in range(lift_count): n = r + j * prev_shift if n not in cover: if not first: f.write(",") f.write(str(n)) first = False n_residual += 1 f.write(f'], "palier": {palier}}}') _log_step(f" noyau written ({n_residual} residues)", memory=True) print(f"Wrote noyau: {output_noyau_path} ({n_residual} residues)", flush=True) print(f"Wrote {output_csv}: {len(cover) // 2} candidates, palier 2^{palier}", flush=True) def run_single_palier( horizon: int, palier: int, valeur: int, input_noyau: str, output_csv: str, audit60_json: str, output_noyau_path: str | None = None, ) -> None: """ Run a single palier: load noyau, lift to 2^palier, extract D_k candidates with A_k=valeur. Memory-optimized: no full lifted list; two passes over residues (cand then residual); stream-write noyau JSON. """ p_in = Path(input_noyau) file_size_mb = p_in.stat().st_size / (1024 * 1024) if p_in.exists() else 0 _log_step(f"run_single_palier k={horizon} palier=2^{palier} valeur={valeur} input={input_noyau} size_mb={file_size_mb:.1f}", memory=True) if file_size_mb > 500: _run_single_palier_stream( horizon=horizon, palier=palier, valeur=valeur, input_noyau=input_noyau, output_csv=output_csv, output_noyau_path=output_noyau_path, audit60_json=audit60_json, ) return residues = load_noyau(input_noyau) n_res = len(residues) _log_step(f" load_noyau done len(residues)={n_res}", memory=True) res_to_state, _ = load_state_map_60(audit60_json) max_r = max(residues) if residues else 0 input_palier = max_r.bit_length() if max_r else 0 curr_shift = 1 << (palier - 1) if palier == 17: prev_shift = 1 << 16 lift_count = 1 elif palier - input_palier >= 2: prev_shift = 1 << input_palier lift_count = 1 << (palier - input_palier) else: prev_shift = 1 << (palier - 1) lift_count = 2 # Pass 1: build cand (and cover) without storing full lifted list cand: Set[int] = set() for r in residues: for j in range(lift_count): n = r + j * prev_shift if A_k(n, horizon) == valeur: cand.add(n) cover = cand | {n ^ curr_shift for n in cand} _log_step(f" cand/cover done len(cand)={len(cand)} len(cover)={len(cover)}", memory=True) delta = (1 << valeur) - (3**horizon) if (1 << valeur) > (3**horizon) else 0 Path(output_csv).parent.mkdir(parents=True, exist_ok=True) _log_step(f" writing CSV {output_csv}") with Path(output_csv).open("w", newline="", encoding="utf-8") as f: w = csv.writer(f) col_palier = f"classe_mod_2^{palier}" w.writerow([col_palier, "sœur", f"mot_a0..a{horizon-1}", f"A{horizon}", f"C{horizon}", "delta", "N0", f"U^{horizon}(n)", "etat_id", "base_mod_4096"]) for n in sorted(cand): pref = prefix_data(n, horizon) N0 = N0_D(pref.C, pref.A, horizon) if delta > 0 else 0 base = n % 4096 etat = res_to_state.get(base, 0) w.writerow([n, n ^ curr_shift, " ".join(map(str, pref.word)), pref.A, pref.C, delta, N0, pref.y, etat, base]) _log_step(f" CSV written") del cand # free before building residual if output_noyau_path: _log_step(f" computing residual (second pass over residues)") residual: List[int] = [] for r in residues: for j in range(lift_count): n = r + j * prev_shift if n not in cover: residual.append(n) residual.sort() n_residual = len(residual) _log_step(f" residual len={n_residual} writing noyau {output_noyau_path}", memory=True) Path(output_noyau_path).parent.mkdir(parents=True, exist_ok=True) with Path(output_noyau_path).open("w", encoding="utf-8") as f: f.write('{"noyau": [') for i, r in enumerate(residual): if i > 0: f.write(",") f.write(str(r)) f.write(f'], "palier": {palier}}}') _log_step(f" noyau written ({n_residual} residues)", memory=True) print(f"Wrote noyau: {output_noyau_path} ({n_residual} residues)", flush=True) print(f"Wrote {output_csv}: {len(cover) // 2} candidates, palier 2^{palier}", flush=True) def main() -> None: import argparse ap = argparse.ArgumentParser(description="Collatz pipeline: full run or single palier") ap.add_argument("--audit60", help="Audit 60 états JSON (for full run)") ap.add_argument("--m15m16", help="Complétion m15→m16 MD (for full run)") ap.add_argument("--d10", help="Candidats D10 MD (for full run)") ap.add_argument("--out", help="Output directory (for full run)") ap.add_argument("--horizon", type=int, help="Horizon k for single palier") ap.add_argument("--palier", type=int, help="Palier m (2^m) for single palier") ap.add_argument("--seuil", default="A_min", help="Seuil type (A_min)") ap.add_argument("--valeur", type=int, help="A_k target value for single palier") ap.add_argument("--input-noyau", help="Input noyau JSON for single palier") ap.add_argument("--output", help="Output CSV for single palier") ap.add_argument("--output-noyau", help="Output residual noyau JSON for next palier") ap.add_argument("--parallel", action="store_true", help="Use parallel mode (placeholder)") ap.add_argument("--threads", type=int, default=1, help="Thread count (placeholder)") ap.add_argument("--extend", action="store_true", help="Run extended D18-D21 pipeline (requires noyau_post_D17)") ap.add_argument("--resume-from", help="Resume from step (e.g. D20) - skip earlier steps") args = ap.parse_args() if args.extend: audit60 = args.audit60 or str(Path(__file__).parent / "audit_60_etats_B12_mod4096_horizon7.json") out_dir = args.out or str(Path(__file__).parent / "out") if not Path(audit60).exists(): raise SystemExit(f"Audit60 not found: {audit60}") run_extended_D18_to_D21( audit60_json=audit60, out_dir=out_dir, resume_from=args.resume_from, ) return if args.horizon is not None and args.palier is not None and args.valeur is not None and args.output: audit60 = args.audit60 or str(Path(__file__).parent / "audit_60_etats_B12_mod4096_horizon7.json") if args.horizon == 10 and args.palier == 17 and args.m15m16: residues = build_R17_from_completion(args.m15m16) elif args.input_noyau: residues = load_noyau(args.input_noyau) else: raise SystemExit("--input-noyau or --m15m16 (for D10) required for single palier mode") if not Path(audit60).exists(): raise SystemExit(f"Audit60 not found: {audit60}") with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tf: json.dump(residues, tf) tmp_noyau = tf.name try: run_single_palier( horizon=args.horizon, palier=args.palier, valeur=args.valeur, input_noyau=tmp_noyau if args.horizon == 10 and args.palier == 17 else args.input_noyau, output_csv=args.output, audit60_json=audit60, output_noyau_path=args.output_noyau, ) finally: if args.horizon == 10 and args.palier == 17: Path(tmp_noyau).unlink(missing_ok=True) elif args.audit60 and args.m15m16 and args.d10 and args.out: run_after_fusion_D16_D17(args.audit60, args.m15m16, args.d10, args.out) else: ap.error("Use either (--audit60 --m15m16 --d10 --out) or (--horizon --palier --valeur --input-noyau --output)") if __name__ == "__main__": main()