# -*- coding: utf-8 -*-
"""
collatz_k_pipeline.py

Main pipeline (reproduces the "after fusion" audits and the D16/D17 packets).

Expected inputs:
- audit_60_etats_B12_mod4096_horizon7.json
- complétion_minorée_m15_vers_m16.md
- candidats_D10_palier2p17.md

Outputs:
- Markdown audits
- exhaustive CSV files
- Markdown exhaustive lists (```csv``` block)
"""

from __future__ import annotations

import argparse
import csv
import json
import re
from collections import Counter  # noqa: F401  (kept: pre-existing file-level import)
from pathlib import Path
from typing import List, Set, Dict, Tuple, Iterable, Iterator

from collatz_k_core import A_k, prefix_data, N0_D
from collatz_k_utils import parse_markdown_table_to_rows, write_text
from collatz_k_fusion import build_fusion_clauses


def load_state_map_60(audit60_json_path: str) -> Tuple[Dict[int, int], Dict[int, str]]:
    """Load the two lookup tables stored in the 60-state audit JSON.

    Args:
        audit60_json_path: Path to a UTF-8 JSON file holding a
            ``"residue_to_state"`` mapping and a ``"state_table"`` list of
            rows with the keys ``"État"`` and ``"Mot (a0..a6)"``.

    Returns:
        A pair ``(res_to_state, state_mot7)``: residue -> state id, and
        state id -> the row's ``"Mot (a0..a6)"`` value.

    Raises:
        KeyError: If one of the expected JSON keys is missing.
    """
    data = json.loads(Path(audit60_json_path).read_text(encoding="utf-8"))
    res_to_state = {int(k): int(v) for k, v in data["residue_to_state"].items()}
    state_mot7 = {int(row["État"]): row["Mot (a0..a6)"] for row in data["state_table"]}
    return res_to_state, state_mot7


def rebuild_R17_after_full_D10(completion_m15_to_m16_md: str, candidats_D10_md: str) -> List[int]:
    """Rebuild the residual set modulo 2^17 once the D10 clauses are removed.

    Args:
        completion_m15_to_m16_md: Markdown file whose ``### Parents « both »``
            section lists the integers forming B15.
        candidats_D10_md: Markdown table file; the first two cells of each
            data row are read as integers and removed from the lifted set.

    Returns:
        The sorted list of residues left after both removals.

    Raises:
        ValueError: If the ``### Parents « both »`` section cannot be found.
    """
    text = Path(completion_m15_to_m16_md).read_text(encoding="utf-8")
    m = re.search(r"### Parents « both ».*?\n(.*)\Z", text, flags=re.S)
    if not m:
        raise ValueError("Section 'Parents both' introuvable")

    # Every integer appearing after the section heading, de-duplicated.
    B15 = sorted(set(map(int, re.findall(r"\b\d+\b", m.group(1)))))
    shift15 = 1 << 15
    shift16 = 1 << 16
    R16 = set(B15) | {p + shift15 for p in B15}
    R17 = set(R16) | {x + shift16 for x in R16}

    rows = parse_markdown_table_to_rows(candidats_D10_md)
    cover175: Set[int] = set()
    # The first two rows are skipped -- presumably the table header and its
    # separator line; verify against parse_markdown_table_to_rows.
    for parts in rows[2:]:
        low = int(parts[0])
        high = int(parts[1])
        cover175.add(low)
        cover175.add(high)
    R17_after_175 = R17 - cover175

    A10_16_high = [x for x in R17_after_175 if A_k(x, 10) == 16]
    cover171: Set[int] = set()
    for x in A10_16_high:
        cover171.add(x)
        # NOTE(review): the later stages take the sister as ``n ^ shift``;
        # here it is ``x - shift16``, which is negative (hence a no-op in the
        # set difference below) whenever x < 2^16. The list is not filtered to
        # x >= 2^16 despite its name -- confirm that this is intended.
        cover171.add(x - shift16)
    return sorted(R17_after_175 - cover171)


def _iter_lifts(residues: Iterable[int], shift: int, count: int) -> Iterator[int]:
    """Yield ``r + j * shift`` for each residue r and each j in range(count)."""
    for r in residues:
        for j in range(count):
            yield r + j * shift


def lift_set(residues: Iterable[int], shift: int, count: int) -> List[int]:
    """Return ``r + j * shift`` for every residue and every ``0 <= j < count``.

    The result keeps the input order, the ``count`` lifts of one residue
    being adjacent.
    """
    return list(_iter_lifts(residues, shift, count))


def csv_to_md_list(csv_path: str, md_path: str, title: str, intro: str) -> None:
    """Embed a CSV file verbatim in a Markdown document inside a ```csv fence.

    Args:
        csv_path: CSV file to copy, read as UTF-8.
        md_path: Markdown file to create or overwrite.
        title: Text of the level-1 heading.
        intro: Text of the introduction section (surrounding blanks stripped).
    """
    p_csv = Path(csv_path)
    p_md = Path(md_path)
    with p_csv.open("r", encoding="utf-8") as fin, p_md.open("w", encoding="utf-8") as fout:
        fout.write(f"# {title}\n\n")
        fout.write("## Introduction\n\n")
        fout.write(intro.strip() + "\n\n")
        fout.write("## Liste exhaustive\n\n")
        fout.write("```csv\n")
        last = ""
        for line in fin:
            fout.write(line)
            last = line
        # The closing fence must start on its own line.
        if last and not last.endswith("\n"):
            fout.write("\n")
        fout.write("```\n")


def _drop_clause_cover(residues: List[int], k: int, target_A: int) -> List[int]:
    """Remove the classes with ``A_k(n, k) == target_A`` and their sisters.

    The sister of n is ``n ^ (1 << target_A)``. Order of the kept residues
    is preserved.
    """
    sister_bit = 1 << target_A
    candidates = {n for n in residues if A_k(n, k) == target_A}
    covered = candidates | {n ^ sister_bit for n in candidates}
    return [n for n in residues if n not in covered]


def _kernel_2p25_after_D15(R17_after_full: List[int]) -> List[int]:
    """Lift the 2^17 kernel up to 2^25, applying D11..D15 on the way; sorted."""
    # D11 (2^19)
    R19_after = _drop_clause_cover(lift_set(R17_after_full, 1 << 17, 4), 11, 18)
    # D12 (2^21)
    R21_after = _drop_clause_cover(lift_set(R19_after, 1 << 19, 4), 12, 20)
    # D13 (2^22)
    R22_after = _drop_clause_cover(lift_set(R21_after, 1 << 21, 2), 13, 21)
    # D14 (2^24)
    R24_after = _drop_clause_cover(lift_set(R22_after, 1 << 22, 4), 14, 23)
    # D15 (2^25) -- intermediate ordering is irrelevant: each step is a
    # per-element lift or membership filter, and the result is sorted here.
    return sorted(_drop_clause_cover(lift_set(R24_after, 1 << 24, 2), 15, 24))


def _load_hitset(csv_path: str) -> Set[int]:
    """Read the ``classe_mod_2^m`` column of a fusion CSV; empty file -> empty set."""
    p = Path(csv_path)
    if p.stat().st_size == 0:
        return set()
    with p.open("r", encoding="utf-8") as f:
        return {int(row["classe_mod_2^m"]) for row in csv.DictReader(f)}


def _fuse_at_2p25(
    R25_after: List[int],
    res_to_state: Dict[int, int],
    state_mot7: Dict[int, str],
    out_dir: Path,
) -> List[int]:
    """Build the fusion clauses for t = 11, 12, 14 and drop the classes they hit."""
    fusion_csvs: List[str] = []
    for t in (11, 12, 14):
        md_path = str(out_dir / f"fusion_t{t}_palier2p25.md")
        csv_path = str(out_dir / f"fusion_t{t}_palier2p25.csv")
        build_fusion_clauses(R25_after, t, res_to_state, state_mot7, md_path, csv_path, 25)
        fusion_csvs.append(csv_path)

    # The CSV files are read back only after all three have been produced.
    unionF: Set[int] = set()
    for csv_path in fusion_csvs:
        unionF |= _load_hitset(csv_path)
    return [n for n in R25_after if n not in unionF]


def _emit_D16_packet(
    R25_after: List[int],
    R25_after_F: List[int],
    res_to_state: Dict[int, int],
    out_dir: Path,
) -> Set[int]:
    """Write the D16 CSV, audit and exhaustive list; return the covered classes."""
    shift25 = 1 << 25
    shift26 = 1 << 26
    k16 = 16
    A16_target = 26

    cand_D16 = {n for n in _iter_lifts(R25_after_F, shift25, 4) if A_k(n, k16) == A16_target}
    cover_D16 = cand_D16 | {n ^ shift26 for n in cand_D16}

    delta16 = (1 << 26) - (3**16)
    csv_d16 = str(out_dir / "candidats_D16_apres_fusion_palier2p27.csv")
    with Path(csv_d16).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["classe_mod_2^27", "sœur", "mot_a0..a15", "A16", "C16", "delta", "N0", "U^16(n)", "etat_id", "base_mod_4096"])
        for n in sorted(cand_D16):
            pref = prefix_data(n, 16)
            N0 = N0_D(pref.C, pref.A, 16)
            w.writerow([n, n ^ shift26, " ".join(map(str, pref.word)), pref.A, pref.C, delta16, N0, pref.y, res_to_state[n % 4096], n % 4096])

    # D16 audit (minimal): largest A16 among the lifts left uncovered.
    maxA16_after = 0
    for n in _iter_lifts(R25_after_F, shift25, 4):
        if n in cover_D16:
            continue
        A = A_k(n, k16)
        if A > maxA16_after:
            maxA16_after = A

    md_d16 = str(out_dir / "candidats_D16_apres_fusion_palier2p27_et_impact.md")
    write_text(
        md_d16,
        "\n".join(
            [
                "# Paquet D16 minimal après fusion (palier 2^27)",
                "",
                "## Introduction",
                "",
                "Audit D16 sur le noyau au palier 2^25 après fusion F(11)∪F(12)∪F(14).",
                "",
                "## Tailles",
                "",
                f"- noyau après D15 : {len(R25_after)}",
                f"- noyau après fusion : {len(R25_after_F)}",
                f"- relèvements 2^27 : {4 * len(R25_after_F)}",
                f"- candidats D16 : {len(cand_D16)}",
                f"- couverture (avec sœurs) : {len(cover_D16)}",
                f"- invariant max A16 après : {maxA16_after}",
                "",
                "## CSV exhaustif",
                "",
                f"- {Path(csv_d16).name}",
                "",
            ]
        )
        + "\n",
    )
    csv_to_md_list(
        csv_d16,
        str(out_dir / "candidats_D16_apres_fusion_palier2p27_liste_exhaustive.md"),
        "Liste exhaustive des clauses D16 après fusion (palier 2^27)",
        "Liste exhaustive (format CSV copiable).",
    )
    return cover_D16


def _emit_D17_packet(
    R25_after_F: List[int],
    cover_D16: Set[int],
    res_to_state: Dict[int, int],
    out_dir: Path,
) -> None:
    """Write the D17 CSV, audit and exhaustive list for the classes D16 left over."""
    shift25 = 1 << 25
    shift27 = 1 << 27
    k17 = 17
    A17_target = 27

    # low class -> representative of the pair (low, low + 2^27): the low side
    # when it reaches the target, otherwise the high side. Remembering the
    # choice avoids evaluating A_k a second time while writing the CSV.
    rep_by_low: Dict[int, int] = {}
    for low in _iter_lifts(R25_after_F, shift25, 4):
        if low in cover_D16:
            continue
        if A_k(low, k17) == A17_target:
            rep_by_low[low] = low
        elif A_k(low + shift27, k17) == A17_target:
            rep_by_low[low] = low + shift27

    delta17 = (1 << 27) - (3**17)
    csv_d17 = str(out_dir / "candidats_D17_apres_fusion_palier2p28.csv")
    with Path(csv_d17).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["classe_mod_2^28", "sœur", "côté", "mot_a0..a16", "A17", "C17", "delta", "N0", "U^17(n)", "etat_id", "base_mod_4096"])
        for low in sorted(rep_by_low):
            rep = rep_by_low[low]
            side = "basse" if rep == low else "haute"
            pref = prefix_data(rep, 17)
            N0 = N0_D(pref.C, pref.A, 17)
            w.writerow([rep, rep ^ shift27, side, " ".join(map(str, pref.word)), pref.A, pref.C, delta17, N0, pref.y, res_to_state[rep % 4096], rep % 4096])

    md_d17 = str(out_dir / "candidats_D17_apres_fusion_palier2p28_et_impact.md")
    write_text(
        md_d17,
        "\n".join(
            [
                "# Paquet D17 minimal après fusion (palier 2^28)",
                "",
                "## Introduction",
                "",
                "Audit D17 sur le domaine résiduel après fusion et après D16.",
                "",
                "## Tailles",
                "",
                f"- paires candidates D17 : {len(rep_by_low)}",
                "",
                "## CSV exhaustif",
                "",
                f"- {Path(csv_d17).name}",
                "",
            ]
        )
        + "\n",
    )
    csv_to_md_list(
        csv_d17,
        str(out_dir / "candidats_D17_apres_fusion_palier2p28_liste_exhaustive.md"),
        "Liste exhaustive des clauses D17 après fusion (palier 2^28)",
        "Liste exhaustive (format CSV copiable).",
    )


def run_after_fusion_D16_D17(
    audit60_json: str,
    completion_m15_to_m16_md: str,
    candidats_D10_md: str,
    out_dir: str,
) -> None:
    """Run the whole pipeline and write every audit/CSV file into ``out_dir``.

    Steps, in order: rebuild the kernel modulo 2^17 after D10, lift it to
    2^25 through D11..D15, build the fusion clauses (t = 11, 12, 14) and
    drop the classes they hit, then emit the D16 (2^27) and D17 (2^28)
    packets.

    Args:
        audit60_json: Path of the 60-state audit JSON.
        completion_m15_to_m16_md: Path of the m15 -> m16 completion Markdown.
        candidats_D10_md: Path of the D10 candidates Markdown table.
        out_dir: Output directory, created if needed.

    Raises:
        ValueError: If the ``Parents « both »`` section is missing.
        KeyError: If a written class has no entry for ``n % 4096`` in the
            residue -> state map.
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    res_to_state, state_mot7 = load_state_map_60(audit60_json)

    # R17 after the complete D10
    R17_after_full = rebuild_R17_after_full_D10(completion_m15_to_m16_md, candidats_D10_md)

    R25_after = _kernel_2p25_after_D15(R17_after_full)

    # Fusion (t=11,12,14) at level 2^25
    R25_after_F = _fuse_at_2p25(R25_after, res_to_state, state_mot7, out)

    # D16 after fusion (2^27)
    cover_D16 = _emit_D16_packet(R25_after, R25_after_F, res_to_state, out)

    # D17 after fusion and D16 (2^28)
    _emit_D17_packet(R25_after_F, cover_D16, res_to_state, out)


def main() -> None:
    """Parse the command line and run the pipeline."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--audit60", required=True)
    ap.add_argument("--m15m16", required=True)
    ap.add_argument("--d10", required=True)
    ap.add_argument("--out", required=True)
    args = ap.parse_args()
    run_after_fusion_D16_D17(args.audit60, args.m15m16, args.d10, args.out)


if __name__ == "__main__":
    main()