Fix scission palier inference and create output dirs

**Motivations:**
- Make certificates reproducible when CSV columns do not encode the palier
- Avoid FileNotFoundError when writing certificates into new folders
- Reuse scission in the local H6 generator to avoid duplicated certificate logic

**Root causes:**
- palier inference relied on max residue value when the class column was generic
- scission assumed output directories already exist
- empty CSV fields were coerced to 0

**Correctifs:**
- Infer palier from explicit columns (palier/m) or filename, keep heuristic fallback
- Create parent directory for output JSON
- Skip empty class/sister values instead of adding residue 0

**Evolutions:**
- Use collatz_scission for certificate generation in local H6 artefacts generator

**Pages affectées:**
- applications/collatz/collatz_k_scripts/collatz_scission.py
- applications/collatz/collatz_k_scripts/collatz_generate_local_h6_artefacts.py
- docs/fixKnowledge/collatz_scission_palier_inference_and_output_dirs.md
This commit is contained in:
ncantu 2026-03-09 01:18:36 +01:00
parent cb7197fa43
commit 6d64ca1a50
3 changed files with 138 additions and 38 deletions

View File

@ -38,6 +38,7 @@ from pathlib import Path
from collatz_k_core import A_k from collatz_k_core import A_k
from collatz_k_fusion import build_fusion_clauses from collatz_k_fusion import build_fusion_clauses
from collatz_k_pipeline import load_state_map_60 from collatz_k_pipeline import load_state_map_60
from collatz_scission import run_scission
@dataclass(frozen=True) @dataclass(frozen=True)
@ -58,14 +59,22 @@ class LocalH6StateResult:
artefacts_dir: str artefacts_dir: str
def _write_certificate_json(path: Path, clauses: set[int], covered: set[int], palier: int) -> None: def _read_certificate_json(cert_path: Path) -> tuple[set[int], set[int], int]:
path.parent.mkdir(parents=True, exist_ok=True) obj = json.loads(cert_path.read_text(encoding="utf-8"))
obj = { if not isinstance(obj, dict):
"clauses": sorted(clauses), raise ValueError(f"Invalid certificate JSON (expected object): {cert_path}")
"covered": sorted(covered), clauses_raw = obj.get("clauses", [])
"palier": palier, covered_raw = obj.get("covered", [])
} palier_raw = obj.get("palier", 0)
path.write_text(json.dumps(obj, indent=2), encoding="utf-8")
if not isinstance(clauses_raw, list) or not all(isinstance(x, int) for x in clauses_raw):
raise ValueError(f"Invalid certificate JSON clauses list: {cert_path}")
if not isinstance(covered_raw, list) or not all(isinstance(x, int) for x in covered_raw):
raise ValueError(f"Invalid certificate JSON covered list: {cert_path}")
if not isinstance(palier_raw, int):
raise ValueError(f"Invalid certificate JSON palier: {cert_path}")
return set(clauses_raw), set(covered_raw), palier_raw
def _invert_residue_to_state(res_to_state: dict[int, int]) -> dict[int, list[int]]: def _invert_residue_to_state(res_to_state: dict[int, int]) -> dict[int, list[int]]:
@ -110,13 +119,8 @@ def _write_candidates_d8_csv(path: Path, residues: list[int], palier: int) -> li
def _covered_from_cert(cert_path: Path) -> set[int]: def _covered_from_cert(cert_path: Path) -> set[int]:
obj = json.loads(cert_path.read_text(encoding="utf-8")) _, covered, _ = _read_certificate_json(cert_path)
if not isinstance(obj, dict) or "covered" not in obj: return covered
raise ValueError(f"Invalid certificate JSON: {cert_path}")
covered = obj["covered"]
if not isinstance(covered, list) or not all(isinstance(x, int) for x in covered):
raise ValueError(f"Invalid certificate JSON covered list: {cert_path}")
return set(covered)
def _compute_fusion_rows_until_covered( def _compute_fusion_rows_until_covered(
@ -293,9 +297,12 @@ def generate_for_state(
d8_covered_set: set[int] = set() d8_covered_set: set[int] = set()
cert_paths: list[Path] = [] cert_paths: list[Path] = []
if cand_d8: if cand_d8:
shift = 1 << (palier - 1) run_scission(str(candidats_d8_path), str(certificat_d8_path))
d8_covered_set = set(cand_d8) | {n ^ shift for n in cand_d8} _, d8_covered_set, cert_palier = _read_certificate_json(certificat_d8_path)
_write_certificate_json(certificat_d8_path, set(cand_d8), d8_covered_set, palier) if cert_palier != palier:
raise ValueError(
f"Unexpected D8 certificate palier for E{state_id}: got 2^{cert_palier}, expected 2^{palier}"
)
cert_paths.append(certificat_d8_path) cert_paths.append(certificat_d8_path)
uncovered_after_d8 = sorted(set(L) - d8_covered_set) uncovered_after_d8 = sorted(set(L) - d8_covered_set)
@ -322,16 +329,15 @@ def generate_for_state(
fusion_covered_set: set[int] = set() fusion_covered_set: set[int] = set()
fusion_clauses = 0 fusion_clauses = 0
fusion_covered = 0 fusion_covered = 0
fusion_hit_classes = { if fusion_rows_list and fusion_cert_path is not None and fusion_csv_path is not None:
int(row["classe_mod_2^m"]) run_scission(str(fusion_csv_path), str(fusion_cert_path))
for row in fusion_rows_list fusion_clauses_set, fusion_covered_set, cert_palier = _read_certificate_json(fusion_cert_path)
if "classe_mod_2^m" in row and str(row["classe_mod_2^m"]).strip() if cert_palier != palier:
} raise ValueError(
if fusion_hit_classes and fusion_cert_path is not None: f"Unexpected fusion certificate palier for E{state_id}: got 2^{cert_palier}, expected 2^{palier}"
fusion_covered_set = set(fusion_hit_classes) )
_write_certificate_json(fusion_cert_path, set(fusion_hit_classes), fusion_covered_set, palier) fusion_clauses = len(fusion_clauses_set)
fusion_clauses = len(fusion_hit_classes) fusion_covered = len(fusion_covered_set)
fusion_covered = len(fusion_hit_classes)
cert_paths.append(fusion_cert_path) cert_paths.append(fusion_cert_path)
union_covered = d8_covered_set | fusion_covered_set union_covered = d8_covered_set | fusion_covered_set

View File

@ -26,12 +26,54 @@ def _find_column(row: dict, *candidates: str) -> str | None:
return None return None
def infer_palier(rows: list[dict], classe_col: str | None) -> int: def _try_parse_int(value: object) -> int | None:
"""Infer modulus power m from column name or max value.""" if value is None:
return None
if isinstance(value, int):
return value
if isinstance(value, str):
s = value.strip()
if not s:
return None
try:
return int(s)
except ValueError:
return None
return None
def infer_palier(rows: list[dict], classe_col: str | None, csv_path: Path | None = None) -> int:
"""
Infer modulus power m.
Priority order:
- explicit numeric column 'palier' (or 'm' used as exponent in some CSVs)
- class column name containing '2^<m>' (e.g. 'classe_mod_2^27')
- filename containing 'palier2p<m>'
- fallback heuristic from max class value (legacy; not reliable when values are sparse)
"""
if rows:
pal_col = _find_column(rows[0], "palier")
if pal_col:
v = _try_parse_int(rows[0].get(pal_col))
if v is not None and v > 0:
return v
m_col = _find_column(rows[0], "m", "modulus_power")
if m_col:
v = _try_parse_int(rows[0].get(m_col))
if v is not None and v > 0:
return v
if classe_col and ("2^" in classe_col or "2^" in str(classe_col)): if classe_col and ("2^" in classe_col or "2^" in str(classe_col)):
m = re.search(r"2\^(\d+)", classe_col) m = re.search(r"2\^(\d+)", classe_col)
if m: if m:
return int(m.group(1)) return int(m.group(1))
if csv_path is not None:
m2 = re.search(r"palier2p(\d+)", str(csv_path))
if m2:
return int(m2.group(1))
if rows and classe_col: if rows and classe_col:
try: try:
vals = [int(r.get(classe_col, 0) or 0) for r in rows if r.get(classe_col)] vals = [int(r.get(classe_col, 0) or 0) for r in rows if r.get(classe_col)]
@ -48,6 +90,9 @@ def infer_palier(rows: list[dict], classe_col: str | None) -> int:
def run_scission(csv_path: str, out_json_path: str) -> None: def run_scission(csv_path: str, out_json_path: str) -> None:
"""Read CSV, extract clauses and covered set, write JSON certificate.""" """Read CSV, extract clauses and covered set, write JSON certificate."""
out_path = Path(out_json_path)
out_path.parent.mkdir(parents=True, exist_ok=True)
rows: list[dict] = [] rows: list[dict] = []
with Path(csv_path).open("r", encoding="utf-8") as f: with Path(csv_path).open("r", encoding="utf-8") as f:
reader = csv.DictReader(f) reader = csv.DictReader(f)
@ -56,7 +101,7 @@ def run_scission(csv_path: str, out_json_path: str) -> None:
if not rows: if not rows:
cert = {"clauses": [], "covered": [], "palier": 0} cert = {"clauses": [], "covered": [], "palier": 0}
Path(out_json_path).write_text(json.dumps(cert, indent=2), encoding="utf-8") out_path.write_text(json.dumps(cert, indent=2), encoding="utf-8")
print(f"Wrote {out_json_path} (empty)") print(f"Wrote {out_json_path} (empty)")
return return
@ -69,28 +114,32 @@ def run_scission(csv_path: str, out_json_path: str) -> None:
for r in rows: for r in rows:
if classe_col: if classe_col:
try: try:
c = int(r.get(classe_col, 0) or 0) raw = r.get(classe_col)
if raw is not None and str(raw).strip():
c = int(raw)
clauses.append(c) clauses.append(c)
covered.add(c) covered.add(c)
except (ValueError, TypeError): except (ValueError, TypeError):
pass pass
if soeur_col: if soeur_col:
try: try:
s = int(r.get(soeur_col, 0) or 0) raw = r.get(soeur_col)
if raw is not None and str(raw).strip():
s = int(raw)
covered.add(s) covered.add(s)
except (ValueError, TypeError): except (ValueError, TypeError):
pass pass
clauses = sorted(set(clauses)) clauses = sorted(set(clauses))
covered_list = sorted(covered) covered_list = sorted(covered)
palier = infer_palier(rows, classe_col) palier = infer_palier(rows, classe_col, csv_path=Path(csv_path))
cert = { cert = {
"clauses": clauses, "clauses": clauses,
"covered": covered_list, "covered": covered_list,
"palier": palier, "palier": palier,
} }
Path(out_json_path).write_text(json.dumps(cert, indent=2), encoding="utf-8") out_path.write_text(json.dumps(cert, indent=2), encoding="utf-8")
print(f"Wrote {out_json_path}: {len(clauses)} clauses, {len(covered_list)} covered, palier 2^{palier}") print(f"Wrote {out_json_path}: {len(clauses)} clauses, {len(covered_list)} covered, palier 2^{palier}")

View File

@ -0,0 +1,45 @@
# collatz_scission palier inference and output directory creation
## Problem
The helper script `applications/collatz/collatz_k_scripts/collatz_scission.py` can produce incorrect certificates in two cases:
- **Incorrect `palier` inference**: when the CSV class column is named generically (e.g. `classe_mod_2^m`) and the covered classes are sparse / small (e.g. values \(<2^8\) while the target modulus is \(2^{13}\)), `palier` was inferred from the maximum class value. This yields a wrong modulus power.
- **Missing output directories**: `run_scission()` writes `out_json_path` without creating parent directories, which can raise `FileNotFoundError` when callers pass a new path under a non-existing folder.
## Root cause
- `infer_palier()` only supported:
- parsing `2^m` from the class column name, or
- a fallback heuristic based on the maximum covered residue value.
This heuristic is not reliable when the class column name does not encode the modulus power.
- `run_scission()` assumed the output directory exists.
## Corrective actions
- **Prefer explicit palier columns**:
- If the CSV contains a numeric `palier` column, use it.
- If the CSV contains a numeric `m` / `modulus_power` column (used as exponent in some pipelines), use it.
- **Fallback from filename**: parse `palier2p<m>` from the CSV path when available.
- **Keep legacy fallback**: keep the max-value heuristic as a last resort.
- **Create output directories**: ensure `out_json_path.parent` exists before writing.
- **Do not add spurious residue 0**: skip empty strings instead of coercing to 0 when parsing the class / sister columns.
## Impact
- Certificates generated via `collatz_scission.py` now carry a `palier` that matches the CSVs intended modulus power when the CSV provides it (or when the filename encodes it).
- Callers can write certificates to new directories without pre-creating them.
## Analysis modalities
- For any certificate JSON, verify:
- `palier` matches the intended modulus power \(2^m\),
- `clauses` and `covered` sets do not contain a spurious `0`,
- directory creation does not fail when writing under a fresh path.
## Deployment
- No environment changes are required.
- The fix is local to:
- `applications/collatz/collatz_k_scripts/collatz_scission.py`