**Motivations:**
- Clarifier l'organisation du dépôt par domaine applicatif
- Séparer les contenus par public cible (adulte, enfant, thèse)

**Evolutions:**
- Nouvelle arborescence applications/ (collatz, IA)
- Dossier pour enfants/ pour les contenus jeunesse
- Dossier these/ pour le livre jeune adulte
- Scripts de pipeline Collatz (01-setup, 02-run-pipeline, 03-run-direct-pipeline)
- Candidats D18 palier2p30, registreK partagé en archives zip
- Plan de relecture scientifique mis à jour

**Pages affectées:**
- .cursor/plans/relecture-scientifique-collatz.md
- v0/ → applications/collatz/, applications/IA/, pour enfants/, these/
- IA_agents/ → pour enfants/
319 lines
11 KiB
Python
319 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
collatz_k_pipeline.py
|
||
|
||
Pipeline principale (reproduction des audits "après fusion" et des paquets D16/D17).
|
||
|
||
Entrées attendues:
|
||
- audit_60_etats_B12_mod4096_horizon7.json
|
||
- complétion_minorée_m15_vers_m16.md
|
||
- candidats_D10_palier2p17.md
|
||
|
||
Sorties:
|
||
- audits Markdown
|
||
- CSV exhaustifs
|
||
- Markdown listes exhaustives (bloc ```csv```)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
from pathlib import Path
|
||
import csv
|
||
import re
|
||
from collections import Counter
|
||
from typing import List, Set, Dict, Tuple, Iterable
|
||
|
||
from collatz_k_core import A_k, prefix_data, N0_D
|
||
from collatz_k_utils import parse_markdown_table_to_rows, write_text
|
||
from collatz_k_fusion import build_fusion_clauses
|
||
|
||
|
||
def load_state_map_60(audit60_json_path: str) -> Tuple[Dict[int, int], Dict[int, str]]:
    """Load the 60-state audit JSON produced at level 2^12 (horizon 7).

    Returns a pair of mappings:
    - residue (mod 4096) -> state id,
    - state id -> 7-letter word "a0..a6".
    """
    import json

    payload = json.loads(Path(audit60_json_path).read_text(encoding="utf-8"))
    residue_to_state = {
        int(residue): int(state)
        for residue, state in payload["residue_to_state"].items()
    }
    state_to_word = {
        int(entry["État"]): entry["Mot (a0..a6)"]
        for entry in payload["state_table"]
    }
    return residue_to_state, state_to_word
|
||
|
||
|
||
def rebuild_R17_after_full_D10(completion_m15_to_m16_md: str, candidats_D10_md: str) -> List[int]:
    """Rebuild the residual set R17 once the D10 packet is fully applied.

    Reads the "Parents « both »" section of the m15->m16 completion note to
    recover the base set B15, lifts it to levels 2^16 then 2^17, subtracts
    the classes covered by the D10 candidate table (both columns), and
    finally removes the A_10 == 16 hits together with their lower sisters.
    """
    note = Path(completion_m15_to_m16_md).read_text(encoding="utf-8")
    match = re.search(r"### Parents « both ».*?\n(.*)\Z", note, flags=re.S)
    if match is None:
        raise ValueError("Section 'Parents both' introuvable")
    parents = sorted({int(tok) for tok in re.findall(r"\b\d+\b", match.group(1))})

    half15 = 1 << 15
    half16 = 1 << 16
    lifted16 = set(parents) | {p + half15 for p in parents}
    lifted17 = lifted16 | {x + half16 for x in lifted16}

    # Both columns (low, high) of the D10 table count as covered classes.
    table = parse_markdown_table_to_rows(candidats_D10_md)
    covered_175 = {int(parts[col]) for parts in table[2:] for col in (0, 1)}
    survivors = lifted17 - covered_175

    # Remove the classes hitting A_10 == 16, paired with their low sisters.
    high_hits = [x for x in survivors if A_k(x, 10) == 16]
    covered_171 = set(high_hits) | {x - half16 for x in high_hits}

    return sorted(survivors - covered_171)
|
||
|
||
|
||
def lift_set(residues: Iterable[int], shift: int, count: int) -> List[int]:
    """Lift every residue to ``count`` copies spaced ``shift`` apart.

    For each residue ``r`` (in input order) the output contains
    ``r, r + shift, ..., r + (count - 1) * shift``.
    """
    return [base + step * shift for base in residues for step in range(count)]
|
||
|
||
|
||
def csv_to_md_list(csv_path: str, md_path: str, title: str, intro: str) -> None:
    """Wrap a CSV file into a Markdown document with a fenced ```csv block.

    The Markdown file gets a title, an introduction section, then the CSV
    content copied verbatim inside a fenced code block.
    """
    source = Path(csv_path)
    target = Path(md_path)
    with source.open("r", encoding="utf-8") as reader, target.open("w", encoding="utf-8") as writer:
        writer.write(f"# {title}\n\n")
        writer.write("## Introduction\n\n")
        writer.write(intro.strip() + "\n\n")
        writer.write("## Liste exhaustive\n\n")
        writer.write("```csv\n")
        tail = ""
        for row in reader:
            writer.write(row)
            tail = row
        # Make sure the closing fence starts on its own line even when the
        # CSV lacks a final newline.
        if tail and not tail.endswith("\n"):
            writer.write("\n")
        writer.write("```\n")
|
||
|
||
|
||
def run_after_fusion_D16_D17(
    audit60_json: str,
    completion_m15_to_m16_md: str,
    candidats_D10_md: str,
    out_dir: str,
) -> None:
    """Reproduce the post-fusion audits and emit the D16/D17 packets.

    Starting from the residual set R17 (after the full D10 packet), the
    pipeline successively applies packets D11..D15 — each packet removes
    the lifted classes hitting a target A_k value together with their
    "sister" classes (top-bit XOR) — then applies the fusion clauses
    F(11), F(12), F(14) at level 2^25, and finally builds the D16 (2^27)
    and D17 (2^28) candidate packets.

    Outputs written under ``out_dir``: the fusion Markdown/CSV pairs, the
    D16/D17 candidate CSVs, their audit Markdown files and the exhaustive
    Markdown lists (fenced CSV blocks).
    """
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    res_to_state, state_mot7 = load_state_map_60(audit60_json)

    # R17 after the full D10 packet
    R17_after_full = rebuild_R17_after_full_D10(completion_m15_to_m16_md, candidats_D10_md)

    # D11 (2^19): lift by 4, remove A_11 == 18 candidates and their sisters
    shift17 = 1 << 17
    shift18 = 1 << 18
    R19 = lift_set(R17_after_full, shift17, 4)
    cand11 = set([n for n in R19 if A_k(n, 11) == 18])
    # XOR with the top bit pairs each candidate with its "sister" class
    cover11 = cand11 | {n ^ shift18 for n in cand11}
    R19_after = [n for n in R19 if n not in cover11]

    # D12 (2^21): same scheme, lifted by 4 from 2^19
    shift19 = 1 << 19
    shift20 = 1 << 20
    R21 = lift_set(R19_after, shift19, 4)
    cand12 = set([n for n in R21 if A_k(n, 12) == 20])
    cover12 = cand12 | {n ^ shift20 for n in cand12}
    R21_after = [n for n in R21 if n not in cover12]

    # D13 (2^22): single doubling step (factor 2, not 4)
    shift21 = 1 << 21
    R22 = list(R21_after) + [n + shift21 for n in R21_after]
    cand13 = set([n for n in R22 if A_k(n, 13) == 21])
    cover13 = cand13 | {n ^ shift21 for n in cand13}
    R22_after = [n for n in R22 if n not in cover13]

    # D14 (2^24): lift by 4 again
    shift22 = 1 << 22
    shift23 = 1 << 23
    R24 = lift_set(R22_after, shift22, 4)
    cand14 = set([n for n in R24 if A_k(n, 14) == 23])
    cover14 = cand14 | {n ^ shift23 for n in cand14}
    R24_after = [n for n in R24 if n not in cover14]

    # D15 (2^25): single doubling step; result sorted for stable output
    shift24 = 1 << 24
    R25 = list(R24_after) + [n + shift24 for n in R24_after]
    cand15 = set([n for n in R25 if A_k(n, 15) == 24])
    cover15 = cand15 | {n ^ shift24 for n in cand15}
    R25_after = sorted([n for n in R25 if n not in cover15])

    # Fusion (t=11,12,14) at level 2^25
    md_f11 = str(Path(out_dir) / "fusion_t11_palier2p25.md")
    csv_f11 = str(Path(out_dir) / "fusion_t11_palier2p25.csv")
    md_f12 = str(Path(out_dir) / "fusion_t12_palier2p25.md")
    csv_f12 = str(Path(out_dir) / "fusion_t12_palier2p25.csv")
    md_f14 = str(Path(out_dir) / "fusion_t14_palier2p25.md")
    csv_f14 = str(Path(out_dir) / "fusion_t14_palier2p25.csv")

    build_fusion_clauses(R25_after, 11, res_to_state, state_mot7, md_f11, csv_f11, 25)
    build_fusion_clauses(R25_after, 12, res_to_state, state_mot7, md_f12, csv_f12, 25)
    build_fusion_clauses(R25_after, 14, res_to_state, state_mot7, md_f14, csv_f14, 25)

    def load_hitset(csv_path: str) -> Set[int]:
        """Read back the classes hit by a fusion CSV (empty file -> empty set)."""
        hs: Set[int] = set()
        p = Path(csv_path)
        if p.stat().st_size == 0:
            return hs
        with p.open("r", encoding="utf-8") as f:
            r = csv.DictReader(f)
            for row in r:
                hs.add(int(row["classe_mod_2^m"]))
        return hs

    # Classes removed by any of the three fusion clause families.
    unionF = load_hitset(csv_f11) | load_hitset(csv_f12) | load_hitset(csv_f14)
    R25_after_F = [n for n in R25_after if n not in unionF]

    # D16 after fusion (2^27)
    shift25 = 1 << 25
    shift26 = 1 << 26
    k16 = 16
    A16_target = 26

    cand_D16: Set[int] = set()
    for r in R25_after_F:
        for j in range(4):
            n = r + j * shift25
            if A_k(n, k16) == A16_target:
                cand_D16.add(n)
    cover_D16 = cand_D16 | {n ^ shift26 for n in cand_D16}

    # delta = 2^26 - 3^16, the gap appearing in the N0 bound
    delta16 = (1 << 26) - (3**16)
    N0_dist = Counter()
    csv_d16 = str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27.csv")
    with Path(csv_d16).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["classe_mod_2^27", "sœur", "mot_a0..a15", "A16", "C16", "delta", "N0", "U^16(n)", "etat_id", "base_mod_4096"])
        for n in sorted(cand_D16):
            pref = prefix_data(n, 16)
            N0 = N0_D(pref.C, pref.A, 16)
            N0_dist[N0] += 1
            w.writerow([n, n ^ shift26, " ".join(map(str, pref.word)), pref.A, pref.C, delta16, N0, pref.y, res_to_state[n % 4096], n % 4096])

    # Minimal D16 audit: max A16 over the lifted classes NOT covered by D16
    maxA16_after = 0
    for r in R25_after_F:
        for j in range(4):
            n = r + j * shift25
            if n in cover_D16:
                continue
            A = A_k(n, k16)
            if A > maxA16_after:
                maxA16_after = A

    md_d16 = str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27_et_impact.md")
    write_text(
        md_d16,
        "\n".join(
            [
                "# Paquet D16 minimal après fusion (palier 2^27)",
                "",
                "## Introduction",
                "",
                "Audit D16 sur le noyau au palier 2^25 après fusion F(11)∪F(12)∪F(14).",
                "",
                "## Tailles",
                "",
                f"- noyau après D15 : {len(R25_after)}",
                f"- noyau après fusion : {len(R25_after_F)}",
                f"- relèvements 2^27 : {4 * len(R25_after_F)}",
                f"- candidats D16 : {len(cand_D16)}",
                f"- couverture (avec sœurs) : {len(cover_D16)}",
                f"- invariant max A16 après : {maxA16_after}",
                "",
                "## CSV exhaustif",
                "",
                f"- {Path(csv_d16).name}",
                "",
            ]
        )
        + "\n",
    )

    csv_to_md_list(
        csv_d16,
        str(Path(out_dir) / "candidats_D16_apres_fusion_palier2p27_liste_exhaustive.md"),
        "Liste exhaustive des clauses D16 après fusion (palier 2^27)",
        "Liste exhaustive (format CSV copiable).",
    )

    # D17 after fusion and D16 (2^28)
    shift27 = 1 << 27
    k17 = 17
    A17_target = 27

    # Keep the low representative of each pair (low, low + 2^27) where
    # either side hits the A17 target.
    pair_low_set: Set[int] = set()
    for r in R25_after_F:
        for j in range(4):
            low = r + j * shift25
            if low in cover_D16:
                continue
            if A_k(low, k17) == A17_target or A_k(low + shift27, k17) == A17_target:
                pair_low_set.add(low)

    # delta = 2^27 - 3^17
    delta17 = (1 << 27) - (3**17)
    csv_d17 = str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28.csv")
    with Path(csv_d17).open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["classe_mod_2^28", "sœur", "côté", "mot_a0..a16", "A17", "C17", "delta", "N0", "U^17(n)", "etat_id", "base_mod_4096"])
        for low in sorted(pair_low_set):
            high = low + shift27
            # Report whichever side of the pair actually hits the target.
            rep = low if A_k(low, k17) == A17_target else high
            side = "basse" if rep == low else "haute"
            pref = prefix_data(rep, 17)
            N0 = N0_D(pref.C, pref.A, 17)
            w.writerow([rep, rep ^ shift27, side, " ".join(map(str, pref.word)), pref.A, pref.C, delta17, N0, pref.y, res_to_state[rep % 4096], rep % 4096])

    md_d17 = str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28_et_impact.md")
    write_text(
        md_d17,
        "\n".join(
            [
                "# Paquet D17 minimal après fusion (palier 2^28)",
                "",
                "## Introduction",
                "",
                "Audit D17 sur le domaine résiduel après fusion et après D16.",
                "",
                "## Tailles",
                "",
                f"- paires candidates D17 : {len(pair_low_set)}",
                "",
                "## CSV exhaustif",
                "",
                f"- {Path(csv_d17).name}",
                "",
            ]
        )
        + "\n",
    )

    csv_to_md_list(
        csv_d17,
        str(Path(out_dir) / "candidats_D17_apres_fusion_palier2p28_liste_exhaustive.md"),
        "Liste exhaustive des clauses D17 après fusion (palier 2^28)",
        "Liste exhaustive (format CSV copiable).",
    )
|
||
|
||
|
||
def main() -> None:
    """CLI entry point: parse the required paths and run the pipeline."""
    import argparse

    parser = argparse.ArgumentParser()
    for flag in ("--audit60", "--m15m16", "--d10", "--out"):
        parser.add_argument(flag, required=True)
    ns = parser.parse_args()
    run_after_fusion_D16_D17(ns.audit60, ns.m15m16, ns.d10, ns.out)


if __name__ == "__main__":
    main()
|