**Motivations:**
- Preserve the state of the Collatz k scripts, pipelines and proof
- Document the D18/D21 diagnostic, the errata, the proof plan and the OOM fix for the paliers script

**Root causes:**
- Excessive memory consumption (OOM) in the paliers finale f16 script

**Fixes:**
- Documentation of the paliers finale f16 OOM crash and possible fixes

**Changes:**
- Changes to the fusion/k pipelines, noyau recover/update, and the 08-paliers-finale script
- Added docs (diagnostic, errata, lemma plan, fixKnowledge OOM)

**Affected pages:**
- applications/collatz/collatz_k_scripts/*.py, note.md, requirements.txt
- applications/collatz/collatz_k_scripts/*.md (diagnostic, errata, plan)
- applications/collatz/scripts/08-paliers-finale.sh, README.md
- docs/fixKnowledge/crash_paliers_finale_f16_oom.md
171 lines
6.4 KiB
Python
# -*- coding: utf-8 -*-
"""
collatz_update_noyau.py

Updates the noyau (kernel) by subtracting the classes covered by a fusion certificate.
Loads the previous noyau, loads the certificate (covered classes),
subtracts them and writes the new noyau.

CLI: --fusion CERT_JSON --previous NOYAU_JSON --output OUTPUT_JSON
"""

from __future__ import annotations

import argparse
import csv
import json
import re
from pathlib import Path


def load_noyau(path: str) -> list[int]:
    """Load noyau from JSON: list of residues or dict with R*_after / noyau / residues."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if isinstance(data, list):
        return [int(x) for x in data]
    if isinstance(data, dict):
        # First matching key wins; order reflects the most recent horizon first.
        for key in ("R25_after", "R24_after", "noyau", "residues", "uncovered"):
            if key in data and isinstance(data[key], list):
                return [int(x) for x in data[key]]
        raise ValueError(f"Noyau JSON: no known key in {list(data.keys())}")
    raise ValueError("Noyau JSON must be a list or dict with residue list")


def load_covered_classes(path: str) -> set[int]:
    """
    Load covered classes from fusion certificate.
    Supports: JSON with 'covered', 'covered_classes', or list; CSV with classe_mod_2^m column.
    """
    p = Path(path)
    suffix = p.suffix.lower()

    if suffix == ".json":
        data = json.loads(p.read_text(encoding="utf-8"))
        if isinstance(data, list):
            return {int(x) for x in data}
        if isinstance(data, dict):
            for key in ("covered", "covered_classes", "classe_mod_2^m"):
                if key in data:
                    val = data[key]
                    if isinstance(val, list):
                        return {int(x) for x in val}
            # Try top-level keys like "11", "12" for per-horizon data
            covered: set[int] = set()
            for v in data.values():
                if isinstance(v, dict) and "covered" in v and isinstance(v["covered"], list):
                    covered.update(int(x) for x in v["covered"])
                elif isinstance(v, list):
                    covered.update(int(x) for x in v if isinstance(x, (int, float)))
            if covered:
                return covered
        # Lists always return above, so report the dict keys or the actual type.
        found = list(data.keys()) if isinstance(data, dict) else type(data).__name__
        raise ValueError(f"Fusion cert JSON: no covered classes found in {found}")

    if suffix == ".csv":
        csv_covered: set[int] = set()
        # newline="" is what the csv module expects for files it reads.
        with p.open("r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            col = "classe_mod_2^m"
            for row in reader:
                value = row.get(col)
                # Skip rows where the column is missing or empty instead of crashing.
                if value:
                    csv_covered.add(int(value))
        return csv_covered

    raise ValueError(f"Fusion cert must be .json or .csv, got {suffix}")


def _get_palier(path: str) -> int | None:
    """Extract palier from noyau JSON if present (full read; use _get_palier_from_tail for large files)."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if isinstance(data, dict) and "palier" in data:
        return int(data["palier"])
    return None


def _get_palier_from_tail(path: str) -> int | None:
    """Extract palier from end of noyau JSON file without loading full content. Expects ..."palier": N}."""
    p = Path(path)
    if not p.exists():
        return None
    with p.open("rb") as f:
        # Only the last 128 bytes are read, so "palier" must be the final key.
        f.seek(max(0, p.stat().st_size - 128))
        tail = f.read().decode("utf-8", errors="ignore")
    m = re.search(r'"palier"\s*:\s*(\d+)', tail)
    return int(m.group(1)) if m else None


def _stream_update_noyau(
    previous_noyau: str, covered: set[int], output_path: Path, palier: int | None
) -> int:
    """Stream-parse previous noyau, write residues not in covered to output.

    Returns count written. Use when noyau file is very large.
    Only the top-level "noyau" array is streamed: a file that stores its residues
    under another key, or as a bare list, yields no items. Input order is kept
    (the in-memory path sorts instead).
    """
    import ijson  # third-party; imported lazily so small runs do not need it

    p = Path(previous_noyau)
    if not p.exists():
        raise FileNotFoundError(previous_noyau)
    out_path = Path(output_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with p.open("rb") as f_in, out_path.open("w", encoding="utf-8") as f_out:
        f_out.write('{"noyau": [')
        first = True
        for x in ijson.items(f_in, "noyau.item"):
            r = int(x)
            if r in covered:
                continue
            if not first:
                f_out.write(",")
            f_out.write(str(r))
            first = False
            count += 1
        suffix = f'], "palier": {palier}}}' if palier is not None else "]}"
        f_out.write(suffix)
    return count


def run_update_noyau(fusion_cert: str, previous_noyau: str, output: str) -> None:
    """Subtract covered classes from the previous noyau; stream when the file exceeds 500 MB."""
    p_prev = Path(previous_noyau)
    size_mb = p_prev.stat().st_size / (1024 * 1024) if p_prev.exists() else 0
    covered = load_covered_classes(fusion_cert)

    if size_mb > 500:
        palier = _get_palier_from_tail(previous_noyau)
        count = _stream_update_noyau(previous_noyau, covered, Path(output), palier)
        print(
            f"Stream update: covered {len(covered)}, new noyau {count} residues "
            f"(previous file {size_mb:.0f} MB)",
            flush=True,
        )
        print(f"Wrote: {output}", flush=True)
        return

    noyau = set(load_noyau(previous_noyau))
    palier = _get_palier(previous_noyau)
    new_noyau = sorted(noyau - covered)

    out_path = Path(output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    if palier is not None:
        # Written by hand so "palier" stays the last key, where _get_palier_from_tail looks.
        with out_path.open("w", encoding="utf-8") as f:
            f.write('{"noyau": [')
            for i, r in enumerate(new_noyau):
                if i > 0:
                    f.write(",")
                f.write(str(r))
            f.write(f'], "palier": {palier}}}')
    else:
        out_path.write_text(json.dumps(new_noyau), encoding="utf-8")

    print(f"Previous noyau: {len(noyau)}, covered: {len(covered)}, new noyau: {len(new_noyau)}")
    print(f"Wrote: {out_path}")


def main() -> None:
    ap = argparse.ArgumentParser(description="Update noyau by subtracting fusion-covered classes")
    ap.add_argument("--fusion", required=True, help="Path to fusion certificate (JSON or CSV with covered classes)")
    ap.add_argument("--previous", required=True, help="Path to previous noyau JSON")
    ap.add_argument("--output", required=True, help="Path to output new noyau JSON")
    args = ap.parse_args()

    run_update_noyau(
        fusion_cert=args.fusion,
        previous_noyau=args.previous,
        output=args.output,
    )


if __name__ == "__main__":
    main()
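
The update step in this file comes down to two ideas: subtract the covered classes from the previous noyau, and keep `"palier"` as the last key so it can be recovered from the end of the file without parsing everything. The sketch below illustrates both on a tiny temporary file, using only the standard library. The helper names `read_palier_from_tail` and `subtract_covered`, the file names, and the sample residues are all assumptions made for illustration; they are not part of the script.

```python
import json
import re
import tempfile
from pathlib import Path
from typing import Optional, Set


def read_palier_from_tail(path: Path, tail_bytes: int = 128) -> Optional[int]:
    """Hypothetical helper: scan only the last `tail_bytes` bytes for "palier": N."""
    with path.open("rb") as f:
        f.seek(max(0, path.stat().st_size - tail_bytes))
        tail = f.read().decode("utf-8", errors="ignore")
    m = re.search(r'"palier"\s*:\s*(\d+)', tail)
    return int(m.group(1)) if m else None


def subtract_covered(previous: Path, covered: Set[int], output: Path) -> int:
    """Hypothetical helper: in-memory update for a small file; returns residues kept."""
    data = json.loads(previous.read_text(encoding="utf-8"))
    kept = sorted(set(data["noyau"]) - covered)
    # "palier" is written after "noyau" so it ends up in the file's tail.
    payload = {"noyau": kept, "palier": data.get("palier")}
    output.write_text(json.dumps(payload), encoding="utf-8")
    return len(kept)


with tempfile.TemporaryDirectory() as tmp:
    prev = Path(tmp) / "noyau_prev.json"   # illustrative file names
    out = Path(tmp) / "noyau_new.json"
    prev.write_text('{"noyau": [27, 31, 47, 63, 71], "palier": 7}', encoding="utf-8")

    kept_count = subtract_covered(prev, {31, 63}, out)
    new_noyau = json.loads(out.read_text(encoding="utf-8"))
    palier = read_palier_from_tail(out)
```

The tail read only works if the writer keeps `"palier"` within the final bytes of the file. That is presumably why the script writes the output by hand instead of relying on key order elsewhere.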