# -*- coding: utf-8 -*-
"""
collatz_recover_noyau.py

Recover noyau from candidats CSV when run_single_palier was interrupted.
Loads previous noyau, lifts to palier, subtracts covered from CSV, writes residual.

Usage: --previous NOYAU_JSON --candidats CSV_PATH --palier M --output NOYAU_JSON
"""

from __future__ import annotations

import argparse
import csv
import json
from pathlib import Path

# Keys under which a noyau JSON object may store its residue list, tried in
# this order.
_RESIDUE_KEYS = ("noyau", "residues", "uncovered", "R25_after", "R24_after")


def _read_json(path: str) -> object:
    """Parse the UTF-8 JSON file at *path* and return the decoded value."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def _extract_residues(data: object, path: str) -> list[int]:
    """Return the residue list held by an already-decoded noyau document.

    *data* is either a bare JSON list, or a JSON object storing the list under
    one of ``_RESIDUE_KEYS``. *path* is only used for the error message.

    Raises:
        ValueError: if no residue list can be found in *data*.
    """
    if isinstance(data, list):
        return [int(x) for x in data]
    if isinstance(data, dict):
        for key in _RESIDUE_KEYS:
            if isinstance(data.get(key), list):
                return [int(x) for x in data[key]]
    raise ValueError(f"No residue list in {path}")


def load_noyau(path: str) -> list[int]:
    """Load the residue list (noyau) from a JSON file.

    The file may contain a bare list, or an object holding the list under one
    of the keys "noyau", "residues", "uncovered", "R25_after" or "R24_after"
    (first match wins).

    Raises:
        ValueError: if the document contains no residue list.
    """
    return _extract_residues(_read_json(path), path)


def _normalize(name: str) -> str:
    """Return *name* lower-cased with spaces removed, for lenient comparison."""
    return name.replace(" ", "").lower()


def _find_column(row: dict, *candidates: str) -> str | None:
    """Return the column name of *row* that best matches one of *candidates*.

    Candidates are tried in the order given. For each candidate, a column
    whose name equals it (ignoring case and spaces) is preferred; otherwise the
    first column whose name contains the candidate as a substring is used.
    Columns are always scanned in the mapping's own order, so the result is
    deterministic even when several columns match.

    Returns None when no column matches any candidate.
    """
    # csv.DictReader stores surplus fields under a None key; only real
    # (string) column names can be matched.
    keys = [k for k in row if isinstance(k, str)]
    for candidate in candidates:
        target = _normalize(candidate)
        for key in keys:
            if _normalize(key) == target:
                return key
        for key in keys:
            if candidate in key:
                return key
    return None


def load_covered_from_csv(csv_path: str, palier: int) -> set[int]:
    """Load covered set from candidats CSV (classe_mod_2^m and sœur columns).

    The class column is looked up as ``classe_mod_2^<palier>``, then
    ``classe_mod_2^m``, then ``classe_mod_2``; the sister column as ``sœur``
    or ``soeur``. Every integer found in either column is collected. Empty
    cells and cells that are not valid integers are skipped.

    Returns an empty set when the file has no header or no data rows.
    """
    covered: set[int] = set()
    # newline="" is what the csv module requires for correct handling of
    # newlines embedded in quoted fields.
    with Path(csv_path).open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        header = dict.fromkeys(reader.fieldnames or ())
        col_classe = _find_column(
            header, f"classe_mod_2^{palier}", "classe_mod_2^m", "classe_mod_2"
        )
        col_soeur = _find_column(header, "sœur", "soeur")
        columns = [col for col in (col_classe, col_soeur) if col]
        # Stream the rows: only the resulting set needs to be kept in memory.
        for row in reader:
            for col in columns:
                cell = row.get(col)
                if not cell:
                    continue
                try:
                    covered.add(int(cell))
                except ValueError:
                    # Best effort: a non-numeric cell is ignored.
                    pass
    return covered


def lift_residues(residues: list[int], from_palier: int, to_palier: int) -> list[int]:
    """Lift residues from 2^from_palier to 2^to_palier.

    Each residue r yields the 2^(to_palier - from_palier) values
    ``r + j * 2^from_palier``. The output keeps the input order, with the
    lifts of one residue listed consecutively for increasing j.

    Raises:
        ValueError: if from_palier is negative or to_palier < from_palier.
    """
    if from_palier < 0 or to_palier < from_palier:
        raise ValueError(
            f"Cannot lift residues: need 0 <= from_palier <= to_palier, "
            f"got from_palier={from_palier}, to_palier={to_palier}"
        )
    step = 1 << from_palier
    lift_count = 1 << (to_palier - from_palier)
    return [r + j * step for r in residues for j in range(lift_count)]


def infer_input_palier(noyau_path: str) -> int:
    """Infer palier from noyau JSON or max residue.

    When the JSON document is an object with a "palier" key, that value is
    returned. Otherwise the result is the bit length of the largest residue,
    i.e. the smallest m such that every residue is below 2^m (0 for an empty
    list). This is only a lower bound on the real palier, so pass the palier
    explicitly when it is known.

    Raises:
        ValueError: if there is no "palier" key and no residue list.
    """
    data = _read_json(noyau_path)
    if isinstance(data, dict) and "palier" in data:
        return int(data["palier"])
    residues = _extract_residues(data, noyau_path)
    return max(residues, default=0).bit_length()


def run_recover(
    previous_noyau: str,
    candidats_csv: str,
    palier: int,
    output: str,
    input_palier: int | None = None,
) -> None:
    """Recover noyau from interrupted run_single_palier.

    Loads the previous noyau, lifts it from its own palier (``input_palier``,
    or the inferred value when None) to ``palier``, removes every value listed
    as covered in the candidats CSV, and writes the sorted remainder to
    ``output`` as ``{"noyau": [...], "palier": palier}``. Parent directories of
    ``output`` are created if needed, and a short summary is printed.

    Raises:
        ValueError: if the previous noyau holds no residue list, or if the
            source palier is greater than ``palier``.
    """
    residues = load_noyau(previous_noyau)
    from_p = input_palier if input_palier is not None else infer_input_palier(previous_noyau)
    covered = load_covered_from_csv(candidats_csv, palier)
    lifted = lift_residues(residues, from_p, palier)
    residual = sorted(set(lifted) - covered)

    out_path = Path(output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(
        json.dumps({"noyau": residual, "palier": palier}, indent=2),
        encoding="utf-8",
    )
    print(f"Recovered noyau: {len(residual)} residues (from {len(lifted)} lifted, {len(covered)} covered)")
    print(f"Wrote: {out_path}")


def main() -> None:
    """Parse the command-line arguments and run the recovery."""
    ap = argparse.ArgumentParser(
        description="Recover noyau from candidats CSV after interrupted run_single_palier"
    )
    ap.add_argument("--previous", required=True, help="Previous noyau JSON (e.g. noyau_post_F15)")
    ap.add_argument("--candidats", required=True, help="Candidats CSV (e.g. candidats_D20_palier2p34.csv)")
    ap.add_argument("--palier", type=int, required=True, help="Palier m (2^m) of the candidats CSV")
    ap.add_argument("--output", required=True, help="Output noyau JSON path")
    ap.add_argument("--input-palier", type=int, help="Palier of previous noyau (infer from JSON if omitted)")
    args = ap.parse_args()

    run_recover(
        previous_noyau=args.previous,
        candidats_csv=args.candidats,
        palier=args.palier,
        output=args.output,
        input_palier=args.input_palier,
    )


if __name__ == "__main__":
    main()