algo/applications/collatz/collatz_k_scripts/collatz_update_noyau.py
ncantu f05f2380ff Collatz: pipelines, scripts paliers, docs et fixKnowledge
**Motivations:**
- Conserver l'état des scripts Collatz k, pipelines et démonstration
- Documenter diagnostic D18/D21, errata, plan de preuve et correctif OOM paliers

**Root causes:**
- Consommation mémoire excessive (OOM) sur script paliers finale f16

**Correctifs:**
- Documentation du crash OOM paliers finale f16 et pistes de correction

**Evolutions:**
- Évolutions des pipelines fusion/k, recover/update noyau, script 08-paliers-finale
- Ajout de docs (diagnostic, errata, plan lemmes, fixKnowledge OOM)

**Pages affectées:**
- applications/collatz/collatz_k_scripts/*.py, note.md, requirements.txt
- applications/collatz/collatz_k_scripts/*.md (diagnostic, errata, plan)
- applications/collatz/scripts/08-paliers-finale.sh, README.md
- docs/fixKnowledge/crash_paliers_finale_f16_oom.md
2026-03-04 17:19:50 +01:00

171 lines
6.4 KiB
Python

# -*- coding: utf-8 -*-
"""
collatz_update_noyau.py
Met à jour le noyau en soustrayant les classes couvertes par un certificat de fusion.
Charge le noyau précédent, charge le certificat (classes couvertes),
soustrait et écrit le nouveau noyau.
CLI: --fusion CERT_JSON --previous NOYAU_JSON --output OUTPUT_JSON
"""
from __future__ import annotations
from pathlib import Path
import argparse
import csv
import json
import re
def load_noyau(path: str) -> list[int]:
    """Load the residue list of a noyau (kernel) JSON file.

    Accepted layouts:

    * a top-level JSON list of residues;
    * a JSON object holding the list under one of the keys ``R25_after``,
      ``R24_after``, ``noyau``, ``residues`` or ``uncovered`` (tried in that
      order);
    * a JSON object holding the list under any other ``R<digits>_after`` key.
      This fallback is only consulted when none of the keys above matched;
      if several such keys are present, the one with the highest number is
      used.

    Args:
        path: Path to the noyau JSON file (read as UTF-8).

    Returns:
        The residues converted to ``int``, in file order.

    Raises:
        ValueError: If the document is neither a list nor an object, or if
            the object holds no residue list under a recognised key.
    """
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if isinstance(data, list):
        return [int(x) for x in data]
    if not isinstance(data, dict):
        raise ValueError("Noyau JSON must be a list or dict with residue list")

    for key in ("R25_after", "R24_after", "noyau", "residues", "uncovered"):
        if isinstance(data.get(key), list):
            return [int(x) for x in data[key]]

    # Generic "R<palier>_after" keys, so that a new palier does not require
    # editing the hard-coded key list above.
    numbered = []
    for key, value in data.items():
        match = re.fullmatch(r"R(\d+)_after", key)
        if match and isinstance(value, list):
            numbered.append((int(match.group(1)), key))
    if numbered:
        _, best_key = max(numbered)
        return [int(x) for x in data[best_key]]

    raise ValueError(f"Noyau JSON: no known key in {list(data.keys())}")
def load_covered_classes(path: str) -> set[int]:
    """Load the set of covered classes from a fusion certificate.

    Supported inputs, selected by file extension (case-insensitive):

    * ``.json`` containing a top-level list of classes;
    * ``.json`` containing an object with a list under ``covered``,
      ``covered_classes`` or ``classe_mod_2^m`` (tried in that order);
    * ``.json`` containing an object whose values are either objects with a
      ``covered`` list or plain lists (e.g. top-level keys like "11", "12"
      for per-horizon data); all of them are merged. In plain lists only
      integral numbers are kept: booleans, non-integral floats and
      non-numeric items are skipped;
    * ``.csv`` with a ``classe_mod_2^m`` column (a leading UTF-8 BOM is
      tolerated).

    Args:
        path: Path to the certificate file.

    Returns:
        The covered classes as a set of ``int``.

    Raises:
        ValueError: If the extension is neither ``.json`` nor ``.csv``, if a
            JSON certificate contains no covered classes, if a CSV
            certificate has no ``classe_mod_2^m`` column, or if a value
            cannot be converted to ``int``.
    """
    p = Path(path)
    suffix = p.suffix.lower()

    if suffix == ".json":
        data = json.loads(p.read_text(encoding="utf-8"))
        if isinstance(data, list):
            return {int(x) for x in data}
        if isinstance(data, dict):
            for key in ("covered", "covered_classes", "classe_mod_2^m"):
                val = data.get(key)
                if isinstance(val, list):
                    return {int(x) for x in val}
            # Try top-level keys like "11", "12" for per-horizon data.
            covered: set[int] = set()
            for v in data.values():
                if isinstance(v, dict) and isinstance(v.get("covered"), list):
                    covered.update(int(x) for x in v["covered"])
                elif isinstance(v, list):
                    for x in v:
                        # bool is a subclass of int, and int() would silently
                        # truncate 2.5 to class 2: neither is a class value.
                        if isinstance(x, bool):
                            continue
                        if isinstance(x, int) or (isinstance(x, float) and x.is_integer()):
                            covered.add(int(x))
            if covered:
                return covered
            found = list(data.keys())
        else:
            found = type(data).__name__
        raise ValueError(f"Fusion cert JSON: no covered classes found in {found}")

    if suffix == ".csv":
        col = "classe_mod_2^m"
        classes: set[int] = set()
        # "utf-8-sig" drops a leading BOM, which would otherwise be glued to
        # the first header name; newline="" is what the csv module expects.
        with p.open("r", encoding="utf-8-sig", newline="") as f:
            reader = csv.DictReader(f)
            if col not in (reader.fieldnames or ()):
                # A missing column used to yield an empty set, i.e. a silent
                # no-op update; fail loudly like the JSON branch does.
                raise ValueError(f"Fusion cert CSV: column {col!r} not found in {reader.fieldnames}")
            for row in reader:
                classes.add(int(row[col]))
        return classes

    raise ValueError(f"Fusion cert must be .json or .csv, got {suffix}")
def _get_palier(path: str) -> int | None:
    """Return the ``palier`` value of a noyau JSON file, or ``None``.

    The whole file is read and parsed, so this is only suitable for files
    that fit in memory; ``_get_palier_from_tail`` is the cheap alternative
    for large files.

    Args:
        path: Path to the noyau JSON file (read as UTF-8).

    Returns:
        ``int(palier)`` when the document is a JSON object with a ``palier``
        key, otherwise ``None``.
    """
    document = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(document, dict):
        return None
    try:
        raw_palier = document["palier"]
    except KeyError:
        return None
    return int(raw_palier)
def _get_palier_from_tail(path: str, tail_bytes: int = 128) -> int | None:
    """Extract ``palier`` from the end of a noyau JSON file without parsing it.

    Only the last ``tail_bytes`` bytes are read and searched for a
    ``"palier": N`` pair, so the file is expected to end with
    ``..."palier": N}``. A ``palier`` key located earlier in the file is not
    seen.

    Args:
        path: Path to the noyau JSON file.
        tail_bytes: Size of the window read from the end of the file. The
            default (128) matches the previously hard-coded value.

    Returns:
        The first ``palier`` value found in the window, or ``None`` when the
        file does not exist or the window holds no such pair.
    """
    try:
        with Path(path).open("rb") as f:
            # whence=2 seeks relative to the end; the returned offset is the
            # file size, which avoids a separate exists()/stat() round-trip.
            size = f.seek(0, 2)
            f.seek(max(0, size - tail_bytes))
            # errors="ignore": the window may start inside a multi-byte char.
            tail = f.read().decode("utf-8", errors="ignore")
    except FileNotFoundError:
        return None
    m = re.search(r'"palier"\s*:\s*(\d+)', tail)
    return int(m.group(1)) if m else None
def _stream_update_noyau(previous_noyau: str, covered: set[int], output_path: Path, palier: int | None) -> int:
    """Stream-filter a large noyau file, dropping the covered residues.

    The previous noyau is parsed incrementally with ``ijson`` and every
    residue that is not in ``covered`` is written to the output as
    ``{"noyau": [...]}`` (plus ``"palier": N`` when ``palier`` is given), so
    the residue list is never held in memory. Use when the noyau file is very
    large.

    The result is first written to ``<output>.tmp`` next to the destination
    and moved over it only once complete. This keeps the input intact while
    it is being read when ``output_path`` and ``previous_noyau`` are the same
    file, and never leaves a truncated JSON document at ``output_path`` when
    parsing fails midway.

    NOTE(review): only residues under a top-level ``"noyau"`` array are
    streamed (ijson prefix ``noyau.item``); inputs that are a bare list or
    that use another key produce an empty noyau.

    Args:
        previous_noyau: Path to the previous noyau JSON file.
        covered: Classes to remove.
        output_path: Destination file; parent directories are created.
        palier: Value written under ``"palier"``, or ``None`` to omit it.

    Returns:
        The number of residues written.

    Raises:
        FileNotFoundError: If ``previous_noyau`` does not exist.
    """
    # Third-party dependency, imported at call time so that importing this
    # module does not require ijson.
    import ijson

    src = Path(previous_noyau)
    if not src.exists():
        raise FileNotFoundError(previous_noyau)
    out_path = Path(output_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = out_path.with_name(out_path.name + ".tmp")

    count = 0
    try:
        with src.open("rb") as f_in, tmp_path.open("w", encoding="utf-8") as f_out:
            f_out.write('{"noyau": [')
            for x in ijson.items(f_in, "noyau.item"):
                r = int(x)
                if r in covered:
                    continue
                if count:
                    f_out.write(",")
                f_out.write(str(r))
                count += 1
            suffix = f'], "palier": {palier}}}' if palier is not None else "]}"
            f_out.write(suffix)
        # Both files are closed here; the replace is atomic on one filesystem.
        tmp_path.replace(out_path)
    finally:
        # After a successful replace the temp file is gone and this is a
        # no-op; after a failure it removes the partial output.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass
    return count
def run_update_noyau(
    fusion_cert: str,
    previous_noyau: str,
    output: str,
    *,
    stream_threshold_mb: float = 500,
) -> None:
    """Subtract the fusion-covered classes from a noyau and write the result.

    Two strategies are used depending on the size of ``previous_noyau``:

    * larger than ``stream_threshold_mb`` MiB: the file is filtered by
      streaming (``_stream_update_noyau``), with the palier taken from the
      end of the file; the output is always ``{"noyau": [...]}`` with an
      optional ``"palier"``, in input order;
    * otherwise: the noyau is loaded in memory, de-duplicated, and written
      sorted, as ``{"noyau": [...], "palier": N}`` when the input has a
      palier, or as a bare JSON list when it has none.

    A summary is printed to stdout in both cases. A missing
    ``previous_noyau`` counts as size 0 and therefore takes the in-memory
    path.

    Args:
        fusion_cert: Path to the fusion certificate (JSON or CSV).
        previous_noyau: Path to the previous noyau JSON file.
        output: Path of the noyau file to write; parent directories are
            created.
        stream_threshold_mb: File size in MiB above which the streaming path
            is used. The default (500) matches the previously hard-coded
            value.
    """
    p_prev = Path(previous_noyau)
    size_mb = p_prev.stat().st_size / (1024 * 1024) if p_prev.exists() else 0
    covered = load_covered_classes(fusion_cert)

    if size_mb > stream_threshold_mb:
        palier = _get_palier_from_tail(previous_noyau)
        count = _stream_update_noyau(previous_noyau, covered, Path(output), palier)
        print(f"Stream update: covered {len(covered)}, new noyau {count} residues (previous file {size_mb:.0f} MB)", flush=True)
        print(f"Wrote: {output}", flush=True)
        return

    noyau = set(load_noyau(previous_noyau))
    palier = _get_palier(previous_noyau)
    new_noyau = sorted(noyau - covered)

    out_path = Path(output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    if palier is not None:
        # Written piecewise instead of json.dumps so that no single large
        # string is built for the whole document.
        with out_path.open("w", encoding="utf-8") as f:
            f.write('{"noyau": [')
            for i, r in enumerate(new_noyau):
                if i > 0:
                    f.write(",")
                f.write(str(r))
            f.write(f'], "palier": {palier}}}')
    else:
        out_path.write_text(json.dumps(new_noyau), encoding="utf-8")

    print(f"Previous noyau: {len(noyau)}, covered: {len(covered)}, new noyau: {len(new_noyau)}")
    print(f"Wrote: {out_path}")
def main() -> None:
    """Command-line entry point: parse the three required paths and run the update."""
    parser = argparse.ArgumentParser(description="Update noyau by subtracting fusion-covered classes")
    required_options = (
        ("--fusion", "Path to fusion certificate (JSON or CSV with covered classes)"),
        ("--previous", "Path to previous noyau JSON"),
        ("--output", "Path to output new noyau JSON"),
    )
    for flag, help_text in required_options:
        parser.add_argument(flag, required=True, help=help_text)
    options = parser.parse_args()
    run_update_noyau(
        fusion_cert=options.fusion,
        previous_noyau=options.previous,
        output=options.output,
    )


if __name__ == "__main__":
    main()