**Motivations:** - Implémenter le workflow complet de démonstration Collatz (commandes.md) - Permettre la reprise après interruption au palier D20 **Evolutions:** - Scripts 01-12 et run-full-workflow alignés sur commandes.md sections 1-10 - collatz_recover_noyau.py : recréation de noyau_post_D20 à partir du CSV candidats - Option --resume-from D20 dans collatz_k_pipeline pour reprendre sans recalculer D18-D19-F15 - Détection automatique : si candidats_D20 existe sans noyau_post_D20, récupération puis poursuite - Filtres --cible=critique et --modulo dans collatz_fusion_pipeline - ROOT par défaut = collatz_k_scripts (plus data/source vide) **Pages affectées:** - .gitignore (__pycache__, out/) - applications/collatz/collatz_k_scripts/*.py - applications/collatz/scripts/*.sh - applications/collatz/scripts/README.md
137 lines
4.4 KiB
Python
137 lines
4.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
collatz_audit.py
|
|
|
|
Audit des classes couvertes à partir d'un CSV de candidats.
|
|
Produit un rapport Markdown avec tailles, distributions et impact par état.
|
|
|
|
Usage: --input CSV_PATH --output MD_PATH [--audit60 JSON_PATH]
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import argparse
|
|
import csv
|
|
import json
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
|
|
|
|
def _find_column(row: dict, *candidates: str) -> str | None:
|
|
"""Return first matching column name from row keys."""
|
|
keys = set(row.keys())
|
|
for c in candidates:
|
|
for k in keys:
|
|
if c in k or k.replace(" ", "").lower() == c.replace(" ", "").lower():
|
|
return k
|
|
return None
|
|
|
|
|
|
def load_state_table(audit60_path: str | None) -> dict[int, str]:
|
|
"""Load state_id -> mot_7 from audit60 JSON. Returns {} if not found."""
|
|
if not audit60_path or not Path(audit60_path).exists():
|
|
return {}
|
|
try:
|
|
data = json.loads(Path(audit60_path).read_text(encoding="utf-8"))
|
|
state_table = data.get("state_table", [])
|
|
mot_key = "Mot (a0..a6)"
|
|
etat_key = "État"
|
|
return {
|
|
int(row.get(etat_key, 0)): row.get(mot_key, "")
|
|
for row in state_table
|
|
if etat_key in row
|
|
}
|
|
except (json.JSONDecodeError, KeyError):
|
|
return {}
|
|
|
|
|
|
def _int_or_none(value: object) -> int | None:
    """Best-effort int conversion: empty/None count as 0, garbage as None."""
    try:
        return int(value or 0)
    except (ValueError, TypeError):
        return None


def run_audit(csv_path: str, out_md_path: str, audit60_path: str | None = None) -> None:
    """Read a candidates CSV and write an audit report as Markdown.

    The report contains global counts (clauses, covered classes, distinct
    states) and, when state ids are present, a per-state distribution table.

    Args:
        csv_path: Input CSV of candidate clauses.
        out_md_path: Destination Markdown file (overwritten).
        audit60_path: Optional audit_60_etats JSON; defaults to the copy
            shipped next to this script.
    """
    with Path(csv_path).open("r", encoding="utf-8") as f:
        rows = [dict(row) for row in csv.DictReader(f)]

    if not rows:
        Path(out_md_path).write_text("# Audit (vide)\n\nAucune clause.\n", encoding="utf-8")
        print(f"Wrote {out_md_path} (empty)")
        return

    # Column names vary between pipeline stages; locate them heuristically.
    classe_col = _find_column(rows[0], "classe_mod_2^m", "classe_mod_2^27", "classe_mod_2^28", "classe_mod_2")
    soeur_col = _find_column(rows[0], "sœur", "soeur")
    etat_col = _find_column(rows[0], "etat_id", "état_id")

    clauses: set[int] = set()
    covered: set[int] = set()  # clauses plus their sister classes
    etat_counts: Counter[int] = Counter()

    for r in rows:
        if classe_col:
            c = _int_or_none(r.get(classe_col, 0))
            if c is not None:
                clauses.add(c)
                covered.add(c)
        if soeur_col:
            s = _int_or_none(r.get(soeur_col, 0))
            if s is not None:
                covered.add(s)
        if etat_col:
            e = _int_or_none(r.get(etat_col, 0))
            if e is not None:
                etat_counts[e] += 1

    n_clauses = len(clauses)
    n_covered = len(covered)
    name = Path(csv_path).stem

    state_mot = load_state_table(audit60_path or str(Path(__file__).parent / "audit_60_etats_B12_mod4096_horizon7.json"))

    lines = [
        f"# Audit {name}",
        "",
        "## Introduction",
        "",
        f"Audit des clauses extraites de {Path(csv_path).name}.",
        "",
        "## Résultats globaux",
        "",
        f"- Nombre de clauses : {n_clauses}",
        f"- Classes couvertes (clauses + sœurs) : {n_covered}",
        f"- États distincts représentés : {len(etat_counts)}",
        "",
    ]

    if etat_counts:
        lines.extend([
            "## Distribution par état (60 états de base)",
            "",
            "| état_id | mot_7 | effectif |",
            "|--------:|:------|--------:|",
        ])
        # Decreasing frequency, ties broken by state id, for stable output.
        for etat_id in sorted(etat_counts.keys(), key=lambda x: (-etat_counts[x], x)):
            mot = state_mot.get(etat_id, "")
            lines.append(f"| {etat_id:8} | {mot:20} | {etat_counts[etat_id]:8} |")

    lines.append("")
    Path(out_md_path).write_text("\n".join(lines), encoding="utf-8")
    print(f"Wrote {out_md_path}: {n_clauses} clauses, {n_covered} covered")
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse command-line arguments and run the audit."""
    parser = argparse.ArgumentParser(description="Audit Collatz CSV → Markdown")
    parser.add_argument("--input", "-i", required=True, help="Input CSV path")
    parser.add_argument("--output", "-o", required=True, help="Output Markdown path")
    parser.add_argument("--audit60", help="Path to audit_60_etats JSON (optional)")
    ns = parser.parse_args()
    run_audit(ns.input, ns.output, ns.audit60)


if __name__ == "__main__":
    main()
|