Nicolas Cantu 14ed1de36b Pipeline Collatz aligné sur commandes.md et reprise après interruption
**Motivations:**
- Implémenter le workflow complet de démonstration Collatz (commandes.md)
- Permettre la reprise après interruption au palier D20

**Evolutions:**
- Scripts 01-12 et run-full-workflow alignés sur commandes.md sections 1-10
- collatz_recover_noyau.py : recréation de noyau_post_D20 à partir du CSV candidats
- Option --resume-from D20 dans collatz_k_pipeline pour reprendre sans recalculer D18-D19-F15
- Détection automatique : si candidats_D20 existe sans noyau_post_D20, récupération puis poursuite
- Filtres --cible=critique et --modulo dans collatz_fusion_pipeline
- ROOT par défaut = collatz_k_scripts (plus data/source vide)

**Pages affectées:**
- .gitignore (__pycache__, out/)
- applications/collatz/collatz_k_scripts/*.py
- applications/collatz/scripts/*.sh
- applications/collatz/scripts/README.md
2026-03-02 02:49:23 +01:00

137 lines
4.4 KiB
Python

# -*- coding: utf-8 -*-
"""
collatz_audit.py
Audit des classes couvertes à partir d'un CSV de candidats.
Produit un rapport Markdown avec tailles, distributions et impact par état.
Usage: --input CSV_PATH --output MD_PATH [--audit60 JSON_PATH]
"""
from __future__ import annotations
import argparse
import csv
import json
from collections import Counter
from pathlib import Path
def _find_column(row: dict, *candidates: str) -> str | None:
"""Return first matching column name from row keys."""
keys = set(row.keys())
for c in candidates:
for k in keys:
if c in k or k.replace(" ", "").lower() == c.replace(" ", "").lower():
return k
return None
def load_state_table(audit60_path: str | None) -> dict[int, str]:
    """Load the ``state id -> word`` mapping from an audit60 JSON file.

    The file is expected to contain an object with a ``"state_table"`` list
    whose entries carry an ``"État"`` id and a ``"Mot (a0..a6)"`` value.
    Loading is best-effort: any problem with the file yields an empty or
    partial mapping instead of an exception.

    Args:
        audit60_path: Path of the JSON file; ``None`` or ``""`` means that
            no table is available.

    Returns:
        A dict mapping each integer state id to its word (``""`` when the
        entry has no word).  The dict is empty when the path is missing,
        unreadable, not valid JSON, or not shaped as described above.
        Entries that are not objects, or whose id cannot be converted to an
        integer, are skipped.
    """
    if not audit60_path:
        return {}
    path = Path(audit60_path)
    if not path.exists():
        return {}

    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}

    if not isinstance(data, dict):
        return {}
    state_table = data.get("state_table", [])
    if not isinstance(state_table, list):
        return {}

    mot_key = "Mot (a0..a6)"
    etat_key = "État"
    mapping: dict[int, str] = {}
    for entry in state_table:
        if not isinstance(entry, dict) or etat_key not in entry:
            continue
        try:
            state_id = int(entry[etat_key])
        except (TypeError, ValueError):
            # One unusable id must not discard the rest of the table.
            continue
        mapping[state_id] = entry.get(mot_key, "")
    return mapping
def _cell_as_int(row: dict, column: str | None) -> int | None:
    """Return ``row[column]`` parsed as an integer, or ``None`` if unusable.

    ``None`` is returned when the column was not detected, when the cell is
    missing or blank, and when its content is not a valid integer.
    """
    if column is None:
        return None
    raw = row.get(column)
    if raw is None or raw == "":
        # A blank cell is "no value"; counting it as class/state 0 would
        # add a phantom entry to the audit.
        return None
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def _render_audit_report(
    csv_path: str,
    n_clauses: int,
    n_covered: int,
    etat_counts: Counter[int],
    state_mot: dict[int, str],
) -> str:
    """Build the Markdown text of the audit report."""
    lines = [
        f"# Audit {Path(csv_path).stem}",
        "",
        "## Introduction",
        "",
        f"Audit des clauses extraites de {Path(csv_path).name}.",
        "",
        "## Résultats globaux",
        "",
        f"- Nombre de clauses : {n_clauses}",
        f"- Classes couvertes (clauses + sœurs) : {n_covered}",
        f"- États distincts représentés : {len(etat_counts)}",
        "",
    ]
    if etat_counts:
        lines.extend([
            "## Distribution par état (60 états de base)",
            "",
            "| état_id | mot_7 | effectif |",
            "|--------:|:------|--------:|",
        ])
        # Most frequent states first; ties are broken by ascending state id.
        ranked = sorted(etat_counts.items(), key=lambda item: (-item[1], item[0]))
        for etat_id, effectif in ranked:
            mot = state_mot.get(etat_id, "")
            if not isinstance(mot, (str, int, float)):
                # None, lists, ... reject the width format spec used below.
                mot = "" if mot is None else str(mot)
            lines.append(f"| {etat_id:8} | {mot:20} | {effectif:8} |")
        lines.append("")
    return "\n".join(lines)


def run_audit(csv_path: str, out_md_path: str, audit60_path: str | None = None) -> None:
    """Audit a candidates CSV and write the result as a Markdown report.

    The class, sister and state columns are located from the CSV header
    with ``_find_column``.  Each data row contributes its class to the set
    of clauses, its class and sister to the set of covered classes, and its
    state id to a per-state tally.  Cells that are blank or not integers
    are ignored.  A one-line summary is printed once the report is written.

    Args:
        csv_path: Path of the input CSV (UTF-8, with a header line).
        out_md_path: Path of the Markdown file to write (UTF-8).
        audit60_path: Optional path of the JSON state table used to label
            states; when omitted, ``audit_60_etats_B12_mod4096_horizon7.json``
            next to this module is tried.

    Raises:
        OSError: If the CSV cannot be opened or the report cannot be written.
    """
    clauses: set[int] = set()
    covered: set[int] = set()
    etat_counts: Counter[int] = Counter()
    n_rows = 0

    # newline="" leaves line-ending handling to the csv module, as its
    # documentation requires for file objects.
    with Path(csv_path).open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        # Only the header is needed to locate the columns, so the rows can
        # be streamed instead of being held in memory.
        header = dict.fromkeys(reader.fieldnames or [])
        classe_col = _find_column(
            header, "classe_mod_2^m", "classe_mod_2^27", "classe_mod_2^28", "classe_mod_2"
        )
        soeur_col = _find_column(header, "sœur", "soeur")
        etat_col = _find_column(header, "etat_id", "état_id")

        for row in reader:
            n_rows += 1
            classe = _cell_as_int(row, classe_col)
            if classe is not None:
                clauses.add(classe)
                covered.add(classe)
            soeur = _cell_as_int(row, soeur_col)
            if soeur is not None:
                covered.add(soeur)
            etat = _cell_as_int(row, etat_col)
            if etat is not None:
                etat_counts[etat] += 1

    if n_rows == 0:
        Path(out_md_path).write_text("# Audit (vide)\n\nAucune clause.\n", encoding="utf-8")
        print(f"Wrote {out_md_path} (empty)")
        return

    n_clauses = len(clauses)
    n_covered = len(covered)
    state_mot = load_state_table(
        audit60_path or str(Path(__file__).parent / "audit_60_etats_B12_mod4096_horizon7.json")
    )
    report = _render_audit_report(csv_path, n_clauses, n_covered, etat_counts, state_mot)
    Path(out_md_path).write_text(report, encoding="utf-8")
    print(f"Wrote {out_md_path}: {n_clauses} clauses, {n_covered} covered")
def main() -> None:
    """Parse the command-line options and run the audit.

    ``--input``/``-i`` (CSV path) and ``--output``/``-o`` (Markdown path)
    are required; ``--audit60`` (JSON state table path) is optional.
    """
    parser = argparse.ArgumentParser(description="Audit Collatz CSV → Markdown")
    parser.add_argument("--input", "-i", required=True, help="Input CSV path")
    parser.add_argument("--output", "-o", required=True, help="Output Markdown path")
    parser.add_argument("--audit60", help="Path to audit_60_etats JSON (optional)")
    options = parser.parse_args()
    run_audit(
        csv_path=options.input,
        out_md_path=options.output,
        audit60_path=options.audit60,
    )


# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()