# -*- coding: utf-8 -*-
"""
collatz_update_noyau.py

Update the noyau (kernel) by subtracting the classes covered by a fusion
certificate.

Loads the previous noyau, loads the certificate (covered classes), subtracts
them and writes the new noyau.

CLI: --fusion CERT_JSON --previous NOYAU_JSON --output OUTPUT_JSON
"""

from __future__ import annotations

from pathlib import Path
import argparse
import csv
import json
import re

# Keys probed, in priority order, when the noyau JSON is a dict.
_NOYAU_KEYS = ("R25_after", "R24_after", "noyau", "residues", "uncovered")
# Keys probed, in priority order, when the fusion certificate JSON is a dict.
_COVERED_KEYS = ("covered", "covered_classes", "classe_mod_2^m")
# Column holding the covered classes in a CSV fusion certificate.
_CSV_CLASS_COLUMN = "classe_mod_2^m"
# Previous-noyau files strictly larger than this (in MiB) are stream-processed.
_STREAM_THRESHOLD_MB = 500
# Number of trailing bytes inspected by _get_palier_from_tail.
_TAIL_BYTES = 128
_PALIER_RE = re.compile(r'"palier"\s*:\s*(\d+)')


def _read_json(path: str):
    """Parse the UTF-8 JSON file at *path* and return the decoded object."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def _extract_noyau(data) -> list[int]:
    """Return the residue list held by an already-parsed noyau document.

    Raises:
        ValueError: if *data* is neither a list nor a dict holding a list
            under one of the known keys.
    """
    if isinstance(data, list):
        return [int(x) for x in data]
    if isinstance(data, dict):
        for key in _NOYAU_KEYS:
            if key in data and isinstance(data[key], list):
                return [int(x) for x in data[key]]
        raise ValueError(f"Noyau JSON: no known key in {list(data.keys())}")
    raise ValueError("Noyau JSON must be a list or dict with residue list")


def _extract_palier(data) -> int | None:
    """Return the ``palier`` entry of a parsed noyau document, or None."""
    if isinstance(data, dict) and "palier" in data:
        return int(data["palier"])
    return None


def load_noyau(path: str) -> list[int]:
    """Load noyau from JSON: list of residues or dict with R*_after / noyau / residues.

    Args:
        path: Path to the noyau JSON file.

    Returns:
        The residues as a list of ints, in file order.

    Raises:
        ValueError: if the document has no recognizable residue list.
    """
    return _extract_noyau(_read_json(path))


def load_covered_classes(path: str) -> set[int]:
    """Load covered classes from a fusion certificate.

    Supported inputs:

    * ``.json`` containing a plain list, a dict with a list under
      ``covered`` / ``covered_classes`` / ``classe_mod_2^m``, or a dict
      whose values are lists or ``{"covered": [...]}`` dicts;
    * ``.csv`` with a ``classe_mod_2^m`` column.

    Args:
        path: Path to the certificate; the format is chosen from the suffix.

    Returns:
        The set of covered classes.

    Raises:
        ValueError: if the suffix is unsupported, the JSON holds no covered
            classes, or the CSV lacks the ``classe_mod_2^m`` column.
    """
    p = Path(path)
    suffix = p.suffix.lower()

    if suffix == ".json":
        data = json.loads(p.read_text(encoding="utf-8"))
        if isinstance(data, list):
            return {int(x) for x in data}
        if isinstance(data, dict):
            for key in _COVERED_KEYS:
                if key in data:
                    val = data[key]
                    if isinstance(val, list):
                        return {int(x) for x in val}
            # Try top-level keys like "11", "12" for per-horizon data.
            covered: set[int] = set()
            for v in data.values():
                if isinstance(v, dict) and isinstance(v.get("covered"), list):
                    covered.update(int(x) for x in v["covered"])
                elif isinstance(v, list):
                    covered.update(
                        int(x) for x in v if isinstance(x, (int, float))
                    )
            if covered:
                return covered
        raise ValueError(
            "Fusion cert JSON: no covered classes found in "
            f"{list(data.keys()) if isinstance(data, dict) else 'list'}"
        )

    if suffix == ".csv":
        # utf-8-sig strips an optional BOM that would otherwise be glued to
        # the first header name; newline="" is what the csv module expects.
        with p.open("r", encoding="utf-8-sig", newline="") as f:
            reader = csv.DictReader(f)
            fieldnames = reader.fieldnames or []
            if _CSV_CLASS_COLUMN not in fieldnames:
                # Without this check a certificate with the wrong header
                # yields an empty set and the update silently does nothing.
                raise ValueError(
                    f"Fusion cert CSV: no {_CSV_CLASS_COLUMN!r} column "
                    f"in {list(fieldnames)}"
                )
            return {int(row[_CSV_CLASS_COLUMN]) for row in reader}

    raise ValueError(f"Fusion cert must be .json or .csv, got {suffix}")


def _get_palier(path: str) -> int | None:
    """Extract palier from noyau JSON if present.

    Reads and parses the whole file; use _get_palier_from_tail for large
    files.
    """
    return _extract_palier(_read_json(path))


def _get_palier_from_tail(path: str) -> int | None:
    """Extract palier from the end of a noyau JSON file without loading it.

    Only the last 128 bytes are inspected, so the file is expected to end
    with ``..."palier": N}``. Returns None if the file does not exist or no
    palier is found in that tail.
    """
    p = Path(path)
    if not p.exists():
        return None
    with p.open("rb") as f:
        f.seek(max(0, p.stat().st_size - _TAIL_BYTES))
        tail = f.read().decode("utf-8", errors="ignore")
    m = _PALIER_RE.search(tail)
    return int(m.group(1)) if m else None


def _stream_update_noyau(
    previous_noyau: str,
    covered: set[int],
    output_path: Path,
    palier: int | None,
) -> int:
    """Stream-parse previous noyau and write residues not in *covered*.

    Use when the noyau file is very large. The result is written to a
    sibling temporary file and renamed over *output_path* only on success,
    so a failure never leaves a truncated output and the output may safely
    be the same file as the input.

    NOTE: only items under the top-level ``"noyau"`` key are streamed; the
    other layouts accepted by load_noyau produce no residues here.

    Args:
        previous_noyau: Path to the previous noyau JSON.
        covered: Classes to remove.
        output_path: Destination file; parent directories are created.
        palier: Written as ``"palier"`` in the output when not None.

    Returns:
        The number of residues written.

    Raises:
        FileNotFoundError: if *previous_noyau* does not exist.
    """
    # Third-party dependency, imported lazily so the in-memory path works
    # without it.
    import ijson

    src = Path(previous_noyau)
    if not src.exists():
        raise FileNotFoundError(previous_noyau)
    out_path = Path(output_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    closing = f'], "palier": {palier}}}' if palier is not None else "]}"

    count = 0
    try:
        with src.open("rb") as f_in, tmp_path.open("w", encoding="utf-8") as f_out:
            f_out.write('{"noyau": [')
            for x in ijson.items(f_in, "noyau.item"):
                r = int(x)
                if r in covered:
                    continue
                if count:
                    f_out.write(",")
                f_out.write(str(r))
                count += 1
            f_out.write(closing)
        tmp_path.replace(out_path)
    except BaseException:
        # Do not leave a partial temp file behind.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
    return count


def run_update_noyau(
    fusion_cert: str,
    previous_noyau: str,
    output: str,
    *,
    stream_threshold_mb: float = _STREAM_THRESHOLD_MB,
) -> None:
    """Subtract the certificate's covered classes from the previous noyau.

    Files strictly larger than *stream_threshold_mb* are processed by
    _stream_update_noyau; smaller ones are loaded in memory, de-duplicated
    and written sorted. In the in-memory path the output is a
    ``{"noyau": [...], "palier": N}`` object when the previous noyau has a
    palier, and a bare JSON list otherwise.

    Args:
        fusion_cert: Path to the fusion certificate (.json or .csv).
        previous_noyau: Path to the previous noyau JSON.
        output: Path of the noyau file to write.
        stream_threshold_mb: Size in MiB above which streaming is used.
    """
    p_prev = Path(previous_noyau)
    size_mb = p_prev.stat().st_size / (1024 * 1024) if p_prev.exists() else 0
    covered = load_covered_classes(fusion_cert)

    if size_mb > stream_threshold_mb:
        palier = _get_palier_from_tail(previous_noyau)
        count = _stream_update_noyau(previous_noyau, covered, Path(output), palier)
        print(
            f"Stream update: covered {len(covered)}, new noyau {count} residues (previous file {size_mb:.0f} MB)",
            flush=True,
        )
        print(f"Wrote: {output}", flush=True)
        return

    # Parse the document once and pull both the residues and the palier
    # from it, instead of reading and decoding the file twice.
    data = _read_json(previous_noyau)
    noyau = set(_extract_noyau(data))
    palier = _extract_palier(data)
    del data  # release the parsed document before building the output

    new_noyau = sorted(noyau - covered)
    out_path = Path(output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    if palier is not None:
        body = ",".join(map(str, new_noyau))
        out_path.write_text(
            f'{{"noyau": [{body}], "palier": {palier}}}', encoding="utf-8"
        )
    else:
        out_path.write_text(json.dumps(new_noyau), encoding="utf-8")
    print(f"Previous noyau: {len(noyau)}, covered: {len(covered)}, new noyau: {len(new_noyau)}")
    print(f"Wrote: {out_path}")


def main(argv: list[str] | None = None) -> None:
    """Parse command-line arguments and run the noyau update.

    Args:
        argv: Argument list to parse; None means ``sys.argv[1:]``.
    """
    ap = argparse.ArgumentParser(
        description="Update noyau by subtracting fusion-covered classes"
    )
    ap.add_argument(
        "--fusion",
        required=True,
        help="Path to fusion certificate (JSON or CSV with covered classes)",
    )
    ap.add_argument("--previous", required=True, help="Path to previous noyau JSON")
    ap.add_argument("--output", required=True, help="Path to output new noyau JSON")
    args = ap.parse_args(argv)
    run_update_noyau(
        fusion_cert=args.fusion,
        previous_noyau=args.previous,
        output=args.output,
    )


if __name__ == "__main__":
    main()