algo/v0/collatz_k_scripts/collatz_k_pipeline.py
Nicolas Cantu 2b99e8ff02 Skills document-improvement et scripts Collatz
**Motivations:**
- Ajout skill pour amélioration de documents en background
- Scripts et documentation Collatz

**Evolutions:**
- .cursor/skills/document-improvement/ (SKILL, reference, examples)
- v0/collatz_k_scripts/ (core, fusion, pipeline, utils, reproduce)
- v0/journal.md, v0/log.md, v0/README collatz

**Pages affectées:**
- .cursor/skills/document-improvement/
- v0/collatz_k_scripts/
- v0/journal.md, v0/log.md
2026-02-27 16:23:25 +01:00

319 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
collatz_k_pipeline.py
Pipeline principale (reproduction des audits "après fusion" et des paquets D16/D17).
Entrées attendues:
- audit_60_etats_B12_mod4096_horizon7.json
- complétion_minorée_m15_vers_m16.md
- candidats_D10_palier2p17.md
Sorties:
- audits Markdown
- CSV exhaustifs
- Markdown listes exhaustives (bloc ```csv```)
"""
from __future__ import annotations
from pathlib import Path
import csv
import re
from collections import Counter
from typing import List, Set, Dict, Tuple, Iterable
from collatz_k_core import A_k, prefix_data, N0_D
from collatz_k_utils import parse_markdown_table_to_rows, write_text
from collatz_k_fusion import build_fusion_clauses
def load_state_map_60(audit60_json_path: str) -> Tuple[Dict[int, int], Dict[int, str]]:
    """Load the 60-state audit JSON and return its two lookup tables.

    Parameters
    ----------
    audit60_json_path:
        Path to ``audit_60_etats_B12_mod4096_horizon7.json``.

    Returns
    -------
    A pair ``(residue_to_state, state_to_word)``: the first dict maps a
    residue mod 4096 to a state id, the second maps a state id to its
    word ``a0..a6`` (taken from the "État" / "Mot (a0..a6)" columns of
    the JSON state table).
    """
    import json

    payload = json.loads(Path(audit60_json_path).read_text(encoding="utf-8"))
    residue_to_state = {int(res): int(state) for res, state in payload["residue_to_state"].items()}
    state_to_word = {int(entry["État"]): entry["Mot (a0..a6)"] for entry in payload["state_table"]}
    return residue_to_state, state_to_word
def rebuild_R17_after_full_D10(completion_m15_to_m16_md: str, candidats_D10_md: str) -> List[int]:
    """Rebuild the residual set R17 remaining after the full D10 package.

    Parameters
    ----------
    completion_m15_to_m16_md:
        Markdown file whose final section « Parents « both » » lists the
        B15 parent residues (every integer in that section is taken).
    candidats_D10_md:
        Markdown table of D10 candidates; columns 0 and 1 of each data row
        are the (low, high) covered classes.

    Returns
    -------
    The sorted residues of R17 that survive both the D10 table cover
    (palier 2^17.5) and the A_k(.,10)==16 pair cover.

    Raises
    ------
    ValueError: when the « Parents « both » » section cannot be found.
    """
    md_text = Path(completion_m15_to_m16_md).read_text(encoding="utf-8")
    match = re.search(r"### Parents « both ».*?\n(.*)\Z", md_text, flags=re.S)
    if match is None:
        raise ValueError("Section 'Parents both' introuvable")
    # Every integer token after the section header is a B15 parent.
    parents_b15 = sorted({int(tok) for tok in re.findall(r"\b\d+\b", match.group(1))})
    half15 = 1 << 15
    half16 = 1 << 16
    # Lift B15 -> R16 -> R17 by doubling with the next power-of-two shift.
    lifted16 = set(parents_b15) | {p + half15 for p in parents_b15}
    lifted17 = lifted16 | {x + half16 for x in lifted16}
    # Remove the classes covered by the D10 candidate table (skip its two header rows).
    covered_175: Set[int] = set()
    for row in parse_markdown_table_to_rows(candidats_D10_md)[2:]:
        covered_175.update((int(row[0]), int(row[1])))
    survivors = lifted17 - covered_175
    # Pair cover: any survivor with A_k(x,10)==16 removes itself and its low sister.
    covered_171: Set[int] = set()
    for x in survivors:
        if A_k(x, 10) == 16:
            covered_171.update((x, x - half16))
    return sorted(survivors - covered_171)
def lift_set(residues: Iterable[int], shift: int, count: int) -> List[int]:
    """Lift each residue to *count* copies spaced *shift* apart.

    For every residue ``r`` the lifts ``r, r + shift, ..., r + (count-1)*shift``
    are emitted; input order is preserved and all lifts of a residue are
    contiguous.

    Parameters
    ----------
    residues: residues modulo some 2^m.
    shift:    the modulus 2^m being lifted from.
    count:    lifted copies per residue (e.g. 4 to go from 2^m to 2^(m+2)).

    Returns
    -------
    The lifted residues, ``count * len(residues)`` entries in total
    (empty when *residues* is empty or *count* is 0).
    """
    # Comprehension replaces the original append loop; same emission order.
    return [r + j * shift for r in residues for j in range(count)]
def csv_to_md_list(csv_path: str, md_path: str, title: str, intro: str) -> None:
    """Wrap a CSV file into a Markdown document with a ```csv``` code fence.

    Parameters
    ----------
    csv_path: source CSV, copied verbatim into the fence.
    md_path:  destination Markdown file (overwritten).
    title:    H1 title of the generated document.
    intro:    introduction paragraph (stripped of surrounding whitespace).
    """
    source = Path(csv_path)
    target = Path(md_path)
    with source.open("r", encoding="utf-8") as fin, target.open("w", encoding="utf-8") as fout:
        preamble = [
            f"# {title}\n\n",
            "## Introduction\n\n",
            intro.strip() + "\n\n",
            "## Liste exhaustive\n\n",
            "```csv\n",
        ]
        fout.write("".join(preamble))
        tail = ""
        for row in fin:
            fout.write(row)
            tail = row
        # Guarantee the closing fence starts on its own line even when the
        # CSV lacks a trailing newline.
        if tail and not tail.endswith("\n"):
            fout.write("\n")
        fout.write("```\n")
def _lift_and_eliminate(
    residues: Iterable[int],
    lift_shift: int,
    lift_count: int,
    k: int,
    a_target: int,
    cover_shift: int,
) -> List[int]:
    """One D_t elimination pass: lift, select candidates, remove their cover.

    The residues are lifted ``lift_count`` times with step ``lift_shift``;
    every lift ``n`` with ``A_k(n, k) == a_target`` becomes a candidate, and
    both the candidate and its sister ``n ^ cover_shift`` are removed.

    Returns the surviving lifted residues (order follows the lifted list).
    """
    lifted = lift_set(residues, lift_shift, lift_count)
    candidates = {n for n in lifted if A_k(n, k) == a_target}
    cover = candidates | {n ^ cover_shift for n in candidates}
    return [n for n in lifted if n not in cover]


def run_after_fusion_D16_D17(
    audit60_json: str,
    completion_m15_to_m16_md: str,
    candidats_D10_md: str,
    out_dir: str,
) -> None:
    """Reproduce the post-fusion D16/D17 audits and exhaustive CSV packages.

    Pipeline: rebuild R17 after the full D10 package, apply the D11..D15
    elimination passes up to level 2^25, remove the residues hit by the
    fusion clauses F(11), F(12), F(14), then emit the D16 (2^27) and
    D17 (2^28) candidate CSVs, audit Markdown files, and exhaustive lists.

    Parameters
    ----------
    audit60_json:             path to audit_60_etats_B12_mod4096_horizon7.json.
    completion_m15_to_m16_md: path to complétion_minorée_m15_vers_m16.md.
    candidats_D10_md:         path to candidats_D10_palier2p17.md.
    out_dir:                  output directory (created if missing).
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    res_to_state, state_mot7 = load_state_map_60(audit60_json)
    # R17 after the full D10 package.
    R17_after_full = rebuild_R17_after_full_D10(completion_m15_to_m16_md, candidats_D10_md)
    # D11..D15 elimination passes.  The D13/D15 passes lift by doubling
    # (count=2) and cover with the same shift they lift by; D11/D12/D14 lift
    # by 4 and cover with the next power of two.  The final set is sorted, so
    # the intermediate ordering differences are immaterial.
    R19_after = _lift_and_eliminate(R17_after_full, 1 << 17, 4, 11, 18, 1 << 18)  # D11 (2^19)
    R21_after = _lift_and_eliminate(R19_after, 1 << 19, 4, 12, 20, 1 << 20)       # D12 (2^21)
    R22_after = _lift_and_eliminate(R21_after, 1 << 21, 2, 13, 21, 1 << 21)       # D13 (2^22)
    R24_after = _lift_and_eliminate(R22_after, 1 << 22, 4, 14, 23, 1 << 23)       # D14 (2^24)
    R25_after = sorted(_lift_and_eliminate(R24_after, 1 << 24, 2, 15, 24, 1 << 24))  # D15 (2^25)
    # Fusion (t=11,12,14) at level 2^25.
    md_f11 = str(out / "fusion_t11_palier2p25.md")
    csv_f11 = str(out / "fusion_t11_palier2p25.csv")
    md_f12 = str(out / "fusion_t12_palier2p25.md")
    csv_f12 = str(out / "fusion_t12_palier2p25.csv")
    md_f14 = str(out / "fusion_t14_palier2p25.md")
    csv_f14 = str(out / "fusion_t14_palier2p25.csv")
    build_fusion_clauses(R25_after, 11, res_to_state, state_mot7, md_f11, csv_f11, 25)
    build_fusion_clauses(R25_after, 12, res_to_state, state_mot7, md_f12, csv_f12, 25)
    build_fusion_clauses(R25_after, 14, res_to_state, state_mot7, md_f14, csv_f14, 25)

    def load_hitset(csv_path: str) -> Set[int]:
        """Read back the residues hit by one fusion CSV (empty file -> empty set)."""
        hs: Set[int] = set()
        p = Path(csv_path)
        # build_fusion_clauses may emit an empty file; DictReader would choke
        # on a missing header, so short-circuit on zero size.
        if p.stat().st_size == 0:
            return hs
        with p.open("r", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                hs.add(int(row["classe_mod_2^m"]))
        return hs

    unionF = load_hitset(csv_f11) | load_hitset(csv_f12) | load_hitset(csv_f14)
    R25_after_F = [n for n in R25_after if n not in unionF]
    # D16 after fusion (2^27): candidates among the 4 lifts of each residue.
    shift25 = 1 << 25
    shift26 = 1 << 26
    k16 = 16
    A16_target = 26
    cand_D16: Set[int] = set()
    for r in R25_after_F:
        for j in range(4):
            n = r + j * shift25
            if A_k(n, k16) == A16_target:
                cand_D16.add(n)
    cover_D16 = cand_D16 | {n ^ shift26 for n in cand_D16}
    delta16 = (1 << 26) - (3**16)
    csv_d16 = str(out / "candidats_D16_apres_fusion_palier2p27.csv")
    with Path(csv_d16).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["classe_mod_2^27", "sœur", "mot_a0..a15", "A16", "C16", "delta", "N0", "U^16(n)", "etat_id", "base_mod_4096"])
        for n in sorted(cand_D16):
            pref = prefix_data(n, 16)
            N0 = N0_D(pref.C, pref.A, 16)
            w.writerow([n, n ^ shift26, " ".join(map(str, pref.word)), pref.A, pref.C, delta16, N0, pref.y, res_to_state[n % 4096], n % 4096])
    # Minimal D16 audit: invariant = max A16 over the lifts NOT covered by D16.
    maxA16_after = 0
    for r in R25_after_F:
        for j in range(4):
            n = r + j * shift25
            if n in cover_D16:
                continue
            A = A_k(n, k16)
            if A > maxA16_after:
                maxA16_after = A
    md_d16 = str(out / "candidats_D16_apres_fusion_palier2p27_et_impact.md")
    write_text(
        md_d16,
        "\n".join(
            [
                "# Paquet D16 minimal après fusion (palier 2^27)",
                "",
                "## Introduction",
                "",
                "Audit D16 sur le noyau au palier 2^25 après fusion F(11)F(12)F(14).",
                "",
                "## Tailles",
                "",
                f"- noyau après D15 : {len(R25_after)}",
                f"- noyau après fusion : {len(R25_after_F)}",
                f"- relèvements 2^27 : {4 * len(R25_after_F)}",
                f"- candidats D16 : {len(cand_D16)}",
                f"- couverture (avec sœurs) : {len(cover_D16)}",
                f"- invariant max A16 après : {maxA16_after}",
                "",
                "## CSV exhaustif",
                "",
                f"- {Path(csv_d16).name}",
                "",
            ]
        )
        + "\n",
    )
    csv_to_md_list(
        csv_d16,
        str(out / "candidats_D16_apres_fusion_palier2p27_liste_exhaustive.md"),
        "Liste exhaustive des clauses D16 après fusion (palier 2^27)",
        "Liste exhaustive (format CSV copiable).",
    )
    # D17 after fusion and D16 (2^28): work on pairs (low, low + 2^27); a pair
    # is a candidate when either side reaches the A17 target.
    shift27 = 1 << 27
    k17 = 17
    A17_target = 27
    pair_low_set: Set[int] = set()
    for r in R25_after_F:
        for j in range(4):
            low = r + j * shift25
            if low in cover_D16:
                continue
            if A_k(low, k17) == A17_target or A_k(low + shift27, k17) == A17_target:
                pair_low_set.add(low)
    delta17 = (1 << 27) - (3**17)
    csv_d17 = str(out / "candidats_D17_apres_fusion_palier2p28.csv")
    with Path(csv_d17).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["classe_mod_2^28", "sœur", "côté", "mot_a0..a16", "A17", "C17", "delta", "N0", "U^17(n)", "etat_id", "base_mod_4096"])
        for low in sorted(pair_low_set):
            high = low + shift27
            # The representative is whichever side of the pair hits the target
            # (low wins ties by construction of the conditional).
            rep = low if A_k(low, k17) == A17_target else high
            side = "basse" if rep == low else "haute"
            pref = prefix_data(rep, 17)
            N0 = N0_D(pref.C, pref.A, 17)
            w.writerow([rep, rep ^ shift27, side, " ".join(map(str, pref.word)), pref.A, pref.C, delta17, N0, pref.y, res_to_state[rep % 4096], rep % 4096])
    md_d17 = str(out / "candidats_D17_apres_fusion_palier2p28_et_impact.md")
    write_text(
        md_d17,
        "\n".join(
            [
                "# Paquet D17 minimal après fusion (palier 2^28)",
                "",
                "## Introduction",
                "",
                "Audit D17 sur le domaine résiduel après fusion et après D16.",
                "",
                "## Tailles",
                "",
                f"- paires candidates D17 : {len(pair_low_set)}",
                "",
                "## CSV exhaustif",
                "",
                f"- {Path(csv_d17).name}",
                "",
            ]
        )
        + "\n",
    )
    csv_to_md_list(
        csv_d17,
        str(out / "candidats_D17_apres_fusion_palier2p28_liste_exhaustive.md"),
        "Liste exhaustive des clauses D17 après fusion (palier 2^28)",
        "Liste exhaustive (format CSV copiable).",
    )
def main() -> None:
    """CLI entry point: parse the required arguments and run the pipeline."""
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--audit60", required=True)
    parser.add_argument("--m15m16", required=True)
    parser.add_argument("--d10", required=True)
    parser.add_argument("--out", required=True)
    ns = parser.parse_args()
    run_after_fusion_D16_D17(ns.audit60, ns.m15m16, ns.d10, ns.out)


if __name__ == "__main__":
    main()