**Motivations:**
- Implémenter le workflow complet de démonstration Collatz (commandes.md)
- Permettre la reprise après interruption au palier D20

**Evolutions:**
- Scripts 01-12 et run-full-workflow alignés sur commandes.md sections 1-10
- collatz_recover_noyau.py : recréation de noyau_post_D20 à partir du CSV candidats
- Option --resume-from D20 dans collatz_k_pipeline pour reprendre sans recalculer D18-D19-F15
- Détection automatique : si candidats_D20 existe sans noyau_post_D20, récupération puis poursuite
- Filtres --cible=critique et --modulo dans collatz_fusion_pipeline
- ROOT par défaut = collatz_k_scripts (plus data/source vide)

**Pages affectées:**
- .gitignore (__pycache__, out/)
- applications/collatz/collatz_k_scripts/*.py
- applications/collatz/scripts/*.sh
- applications/collatz/scripts/README.md
107 lines
3.3 KiB
Python
107 lines
3.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
collatz_scission.py
|
|
|
|
Read CSV from collatz pipeline, extract covered classes (classe_mod_2^m and sœur),
|
|
output JSON certificate with clauses, covered set, and residual kernel info.
|
|
|
|
Usage: python collatz_scission.py --input CSV_PATH --output JSON_PATH
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
|
|
def _find_column(row: dict, *candidates: str) -> str | None:
|
|
"""Return first matching column name from row keys."""
|
|
keys = set(row.keys())
|
|
for c in candidates:
|
|
for k in keys:
|
|
if c in k or k.replace(" ", "").lower() == c.replace(" ", "").lower():
|
|
return k
|
|
return None
|
|
|
|
|
|
def infer_palier(rows: list[dict], classe_col: str | None) -> int:
    """Infer the modulus exponent m (classes are residues modulo 2^m).

    The exponent is taken from the column name when it contains ``2^<digits>``
    (e.g. ``classe_mod_2^27`` gives 27).  Otherwise it is derived from the
    data: the smallest m such that ``2**m`` is strictly greater than the
    largest integer found in column *classe_col*.

    Blank cells and cells that are not valid integers are skipped one by
    one, so a single malformed row does not discard the whole inference.

    Returns 0 when *classe_col* is falsy or no usable value is found.
    """
    if not classe_col:
        return 0

    found = re.search(r"2\^(\d+)", classe_col)
    if found:
        return int(found.group(1))

    largest = 0
    for row in rows:
        raw = row.get(classe_col)
        if not raw:
            # Missing key or empty cell: nothing to learn from this row.
            continue
        try:
            value = int(raw)
        except (ValueError, TypeError):
            continue
        largest = max(largest, value)

    # For n >= 0, n.bit_length() is the smallest m with 2**m > n.
    return largest.bit_length()
|
|
|
|
|
|
def _parse_class_cell(value: object) -> int | None:
    """Return the integer held in a CSV cell, or None if blank or malformed."""
    if value is None:
        return None
    text = str(value).strip()
    if not text:
        return None
    try:
        return int(text)
    except ValueError:
        return None


def run_scission(csv_path: str, out_json_path: str) -> None:
    """Read the CSV, extract clauses and the covered set, write a JSON certificate.

    The certificate is a JSON object with three keys:

    * ``clauses``: sorted distinct integers found in the class column;
    * ``covered``: sorted distinct integers found in the class column or in
      the sister ("sœur"/"soeur") column;
    * ``palier``: the exponent returned by ``infer_palier``.

    Blank, missing or non-integer cells are skipped; they are not recorded
    as class 0, so the certificate lists only classes actually present in
    the input.  A CSV without data rows yields an empty certificate with
    ``palier`` 0.

    Args:
        csv_path: path of the input CSV (UTF-8, with a header row).
        out_json_path: path of the JSON file to write (UTF-8).

    A one-line summary is printed to stdout after writing.
    """
    # newline="" lets the csv module handle line endings inside quoted fields.
    with Path(csv_path).open("r", encoding="utf-8", newline="") as f:
        rows: list[dict] = [dict(row) for row in csv.DictReader(f)]

    out_path = Path(out_json_path)

    if not rows:
        cert = {"clauses": [], "covered": [], "palier": 0}
        out_path.write_text(json.dumps(cert, indent=2), encoding="utf-8")
        print(f"Wrote {out_json_path} (empty)")
        return

    classe_col = _find_column(rows[0], "classe_mod_2^m", "classe_mod_2^27", "classe_mod_2^28", "classe_mod_2")
    soeur_col = _find_column(rows[0], "sœur", "soeur")

    clause_set: set[int] = set()
    covered: set[int] = set()

    for r in rows:
        if classe_col:
            c = _parse_class_cell(r.get(classe_col))
            if c is not None:
                clause_set.add(c)
                covered.add(c)
        if soeur_col:
            s = _parse_class_cell(r.get(soeur_col))
            if s is not None:
                covered.add(s)

    clauses = sorted(clause_set)
    covered_list = sorted(covered)
    palier = infer_palier(rows, classe_col)

    cert = {
        "clauses": clauses,
        "covered": covered_list,
        "palier": palier,
    }
    out_path.write_text(json.dumps(cert, indent=2), encoding="utf-8")
    print(f"Wrote {out_json_path}: {len(clauses)} clauses, {len(covered_list)} covered, palier 2^{palier}")
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: parse the two required paths and run the scission."""
    parser = argparse.ArgumentParser(
        description="Extract scission certificate from Collatz CSV",
    )
    # Both options are mandatory and share the same shape.
    option_specs = (
        ("--input", "-i", "Input CSV path"),
        ("--output", "-o", "Output JSON path"),
    )
    for long_flag, short_flag, help_text in option_specs:
        parser.add_argument(long_flag, short_flag, required=True, help=help_text)

    options = parser.parse_args()
    run_scission(options.input, options.output)
|
|
|
|
|
|
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
|