#!/usr/bin/env python3
"""HTTP status API for a Docker-based service stack.

Serves a single JSON report on ``GET /api`` (port 3006) built from:

* ``docker inspect`` / ``docker exec`` / ``docker logs`` output for a fixed
  list of containers,
* ``curl`` probes of a few internal and external HTTP endpoints,
* optional third-party checks (Gitea, Mailchimp/Mandrill, Stripe, OVH) whose
  credentials are read from the environment of the ``lecoffre-back``
  container,
* ``bitcoin-cli`` wallet balances from the ``bitcoin-signet`` container.

All checks are best-effort: a failing check degrades its own section of the
report instead of failing the whole request.
"""
import json
import subprocess
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
import re
import time as _time
import hashlib
import hmac
import urllib.parse
import urllib.request

# ANSI colour / erase sequences found in container logs.
_ANSI_RE = re.compile(r"\x1B\[[0-9;]*[mK]")
# A standalone 64-hex-digit token (e.g. a block hash).
_HASH64_RE = re.compile(r"\b[0-9a-fA-F]{64}\b")
# Fractional seconds longer than microsecond precision.
_LONG_FRACTION_RE = re.compile(r"(\.\d{6})\d+")

# (family, nickname keyword) pairs, checked in this order.
_STRIPE_FAMILY_KEYWORDS = (
    ("CREATORS", "createur"),
    ("STARTER", "starter"),
    ("STANDARD", "standard"),
    ("UNLIMITED", "unlimit"),
)


def run_cmd(command: list[str], timeout_seconds: int = 5) -> tuple[int, str, str]:
    """Run *command* and return ``(returncode, stdout, stderr)``, stripped.

    Never raises: a timeout yields ``(124, "", "timeout")`` and any other
    failure (e.g. missing executable) yields ``(1, "", str(exc))``.
    """
    try:
        proc = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout_seconds,
            text=True,
        )
        return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
    except subprocess.TimeoutExpired:
        return 124, "", "timeout"
    except Exception as exc:
        return 1, "", str(exc)


def get_container_inspect(container_name: str) -> dict:
    """Return the parsed ``docker inspect`` JSON for a container, or ``{}``."""
    code, out, _ = run_cmd([
        "docker", "inspect", container_name,
        "--format", "{{json .}}",
    ], timeout_seconds=4)
    if code != 0 or not out:
        return {}
    try:
        data = json.loads(out)
    except Exception:
        return {}
    return data if isinstance(data, dict) else {}


def compute_uptime(started_at: str) -> str:
    """Format the time elapsed since *started_at* (ISO 8601) as a short string.

    Returns ``"<d>j <h>h <m>m"``, ``"<h>h <m>m"`` or ``"<m>m"`` depending on
    the magnitude, and ``"N/A"`` when the timestamp cannot be parsed.
    """
    try:
        # datetime.fromisoformat rejects more than 6 fractional digits before
        # Python 3.11, so truncate longer fractions to microseconds first.
        normalized = _LONG_FRACTION_RE.sub(r"\1", started_at).replace("Z", "+00:00")
        start = datetime.fromisoformat(normalized)
        delta = datetime.now(start.tzinfo) - start
        total_seconds = int(delta.total_seconds())
        days, rem = divmod(total_seconds, 86400)
        hours, rem = divmod(rem, 3600)
        minutes, _ = divmod(rem, 60)
        if days > 0:
            return f"{days}j {hours}h {minutes}m"
        if hours > 0:
            return f"{hours}h {minutes}m"
        return f"{minutes}m"
    except Exception:
        return "N/A"


def http_probe(url: str) -> tuple[str, str]:
    """Probe *url* with ``curl``; return ``(status, detail)``.

    ``("running", "ok")`` when curl exits 0, else ``("error", "unreachable")``.
    """
    code, _, _ = run_cmd(["curl", "-fsS", "--max-time", "5", url], timeout_seconds=6)
    if code == 0:
        return "running", "ok"
    return "error", "unreachable"


def get_container_env(container_name: str) -> dict:
    """Return the container's configured environment as a ``{name: value}`` dict."""
    inspect = get_container_inspect(container_name)
    env_list = (inspect.get("Config") or {}).get("Env") or []
    env_map = {}
    for entry in env_list:
        if "=" in entry:
            key, value = entry.split("=", 1)
            env_map[key] = value
    return env_map


def get_file_in_container(container: str, path: str) -> str:
    """Return the content of *path* inside *container*, or ``""``.

    NOTE: *path* is interpolated into a shell command line; callers must only
    pass trusted paths.
    """
    code, out, _ = run_cmd(
        ["docker", "exec", container, "sh", "-c", f"[ -f {path} ] && cat {path} || true"],
        timeout_seconds=6,
    )
    return out if code == 0 else ""


def parse_wallet_name_from_conf(conf_text: str) -> str:
    """Extract the wallet name from configuration text.

    Accepts lines such as ``wallet_name="default"`` or ``wallet_name=default``
    and returns the first non-empty value, or ``""`` if none is found.
    """
    try:
        for line in conf_text.splitlines():
            if "wallet_name" not in line:
                continue
            parts = line.split("=", 1)
            if len(parts) == 2:
                val = parts[1].strip().strip('"\'')
                if val:
                    return val
        return ""
    except Exception:
        return ""


def btc_list_wallets() -> list:
    """Return the result of ``bitcoin-cli -signet listwallets``, or ``[]``."""
    code, out, _ = run_cmd(
        ["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "listwallets"],
        timeout_seconds=6,
    )
    if code == 0 and out:
        try:
            return json.loads(out) or []
        except Exception:
            return []
    return []


def btc_list_walletdir() -> list:
    """Return the wallet names reported by ``listwalletdir``, or ``[]``."""
    code, out, _ = run_cmd(
        ["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "listwalletdir"],
        timeout_seconds=6,
    )
    if code == 0 and out:
        try:
            data = json.loads(out) or {}
            return [w.get("name") for w in (data.get("wallets") or []) if w.get("name")]
        except Exception:
            return []
    return []


def btc_ensure_loaded(wallet: str) -> None:
    """Ask bitcoind to load *wallet*; the result is deliberately ignored.

    ``loadwallet`` reports an error when the wallet is already loaded, which
    is fine here, and ``run_cmd`` never raises.
    """
    run_cmd(
        ["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "loadwallet", wallet],
        timeout_seconds=6,
    )


def btc_to_sats(value) -> int:
    """Convert a BTC amount to satoshis.

    Rounds instead of truncating: ``float`` amounts such as ``0.29`` are
    slightly below their decimal value, so truncation could lose one satoshi.
    """
    return int(round(float(value) * 100_000_000))


def btc_wallet_balance(wallet: str) -> dict:
    """Return confirmed / unconfirmed / immature balances of *wallet* in sats.

    Reads the ``mine`` section of ``bitcoin-cli getbalances``. Any failure
    yields all-zero balances.
    """
    try:
        if wallet:
            btc_ensure_loaded(wallet)
        code, out, _ = run_cmd(
            ["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet",
             f"-rpcwallet={wallet}", "getbalances"],
            timeout_seconds=6,
        )
        if code == 0 and out:
            mine = json.loads(out).get("mine")
            if not isinstance(mine, dict):
                mine = {}
            return {
                "confirmed_sat": btc_to_sats(mine.get("trusted") or 0),
                "unconfirmed_sat": btc_to_sats(mine.get("untrusted_pending") or 0),
                "immature_sat": btc_to_sats(mine.get("immature") or 0),
            }
    except Exception:
        pass
    return {"confirmed_sat": 0, "unconfirmed_sat": 0, "immature_sat": 0}


def ws_placeholder(url: str) -> tuple[str, str]:
    """Placeholder for WebSocket checks: always reports the endpoint as running."""
    return "running", "N/A (WebSocket)"


def exec_health(container: str, script: str) -> str:
    """Run a health script inside *container* with ``sh`` and return its stdout."""
    code, out, _ = run_cmd(["docker", "exec", container, "sh", script], timeout_seconds=6)
    return out if code == 0 or out else ""


def _summarize_recent_log(container: str, keywords, max_len: int) -> str:
    """Summarize the most recent log line of *container* matching a keyword.

    Looks at the last 200 log lines, newest first, with ANSI codes removed.
    If the matching line holds a 64-hex token, its first 15 characters
    followed by ``...`` are returned; otherwise the line itself, truncated to
    *max_len* characters. Returns ``""`` when nothing matches.
    """
    code, out, _ = run_cmd(["docker", "logs", "--tail", "200", container], timeout_seconds=6)
    if code != 0 or not out:
        return ""
    for line in reversed(out.splitlines()):
        clean = _ANSI_RE.sub("", line).strip()
        lowered = clean.lower()
        if not any(k in lowered for k in keywords):
            continue
        match = _HASH64_RE.search(clean)
        if match:
            return f"{match.group(0).lower()[:15]}..."
        return clean[:max_len] + ("…" if len(clean) > max_len else "")
    return ""


def blindbit_scan_progress(container: str) -> str:
    """Return a short scan-progress summary from the container's recent logs."""
    keywords = ("scan", "scanning", "index", "indexed", "sync", "block", "height")
    return _summarize_recent_log(container, keywords, 220)


def miner_detailed_state(container: str) -> str:
    """Return a short mining-state summary from the container's recent logs."""
    keywords = ("mining", "processed block", "new block", "candidate", "hash", "submit")
    return _summarize_recent_log(container, keywords, 200)


def image_info(image_ref: str) -> dict:
    """Return id, creation date, tags and first digest of a Docker image.

    Returns ``{}`` when the image cannot be inspected or the output cannot
    be parsed.
    """
    code, out, _ = run_cmd([
        "docker", "image", "inspect", image_ref, "--format", "{{json .}}"
    ], timeout_seconds=4)
    if code != 0 or not out:
        return {}
    try:
        data = json.loads(out)
        return {
            "id": data.get("Id"),
            "created": data.get("Created"),
            "tags": data.get("RepoTags"),
            "digest": (data.get("RepoDigests") or [None])[0],
        }
    except Exception:
        return {}


def get_storage_size_bytes(container: str) -> int:
    """Return the size in bytes of the first known storage directory found.

    Tries a few common storage paths inside *container* and returns 0 when
    none of them yields a size.
    """
    for path in ("/app/data", "/app/storage", "/home/bitcoin/.4nk/storage"):
        # `cut` is used instead of awk to avoid braces inside the f-string.
        code, out, _ = run_cmd(
            ["docker", "exec", container, "sh", "-c",
             f"[ -d {path} ] && du -sb {path} 2>/dev/null | cut -f1"],
            timeout_seconds=6,
        )
        size = out.strip()
        if code == 0 and size.isdigit():
            return int(size)
    return 0


def docker_ps_names() -> set:
    """Return the set of container names listed by ``docker ps``."""
    code, out, _ = run_cmd(["docker", "ps", "--format", "{{.Names}}"], timeout_seconds=6)
    if code != 0 or not out:
        return set()
    return {n.strip() for n in out.splitlines() if n.strip()}


def ovh_safe_check(app_key: str, app_secret: str, consumer_key: str, service_name: str,
                   base_url: str = "https://eu.api.ovh.com/1.0") -> dict:
    """Check OVH credentials with a signed, read-only GET on the SMS senders.

    Returns ``{"provider": "OVH", "status": "ok"}`` on HTTP 200, and a dict
    with ``"status": "error"`` otherwise (including on any exception).
    """
    try:
        # The signature must use the OVH server time. Bound this call too so
        # an unresponsive API cannot hang the request.
        with urllib.request.urlopen(f"{base_url}/auth/time", timeout=6) as resp:
            server_time = int(resp.read().decode().strip())
        method = "GET"
        url = f"{base_url}/sms/{service_name}/senders"
        body = ""
        # Signature: "$1$" + sha1(secret+consumer+method+url+body+timestamp),
        # fields joined with "+".
        to_sign = "+".join([app_secret, consumer_key, method, url, body, str(server_time)])
        sha = hashlib.sha1(to_sign.encode()).hexdigest()
        signature = f"$1${sha}"
        req = urllib.request.Request(url)
        req.add_header("X-Ovh-Application", app_key)
        req.add_header("X-Ovh-Consumer", consumer_key)
        req.add_header("X-Ovh-Signature", signature)
        req.add_header("X-Ovh-Timestamp", str(server_time))
        with urllib.request.urlopen(req, timeout=6) as r2:
            status_code = r2.getcode()
        if status_code == 200:
            return {"provider": "OVH", "status": "ok"}
        return {"provider": "OVH", "status": "error", "code": status_code}
    except Exception:
        return {"provider": "OVH", "status": "error"}


def append_detail(text: str, label: str, detail: str) -> str:
    """Append ``"<label>: <detail>"`` to *text*, separated by `` | `` if needed."""
    return text + (" | " if text else "") + f"{label}: {detail}"


def deployment_progress(services: list, service_defs: list, running_names) -> dict:
    """Compute deployment progress from presence (docker ps) and health.

    A service counts as ready when its container is in *running_names* and it
    is either healthy or running. Returns ``total``, ``healthy`` (number of
    ready services) and ``percent``.
    """
    container_by_name = {}
    for sdef in service_defs:
        container_by_name.setdefault(sdef["name"], sdef["container"])
    ready = 0
    for svc in services:
        container = container_by_name.get(svc.get("name"))
        present = container in running_names if container else False
        if present and (svc.get("health") == "healthy" or svc.get("status") == "running"):
            ready += 1
    total = len(services)
    percent = int(ready * 100 / total) if total else 0
    return {"total": total, "healthy": ready, "percent": percent}


def stripe_family_for(pid: str, nickname: str, family_ids: dict) -> str:
    """Map a Stripe price to an offer family, or ``""`` when unknown.

    A price belongs to a family when its id is in ``family_ids[family]`` or
    its nickname contains the family keyword; families are tried in the order
    CREATORS, STARTER, STANDARD, UNLIMITED.
    """
    lowered = (nickname or "").lower()
    for family, keyword in _STRIPE_FAMILY_KEYWORDS:
        if pid in family_ids.get(family, ()) or keyword in lowered:
            return family
    return ""


def _service_definitions() -> list:
    """Return the monitored services with their container and optional checks."""
    return [
        {"name": "Tor Proxy", "container": "tor-proxy", "protocol": "SOCKS", "port": 9050,
         "health": lambda: exec_health("tor-proxy", "/scripts/healthchecks/tor-progress.sh")},
        {"name": "Bitcoin Signet", "container": "bitcoin-signet", "protocol": "RPC", "port": 8332,
         "health": lambda: exec_health("bitcoin-signet", "/scripts/healthchecks/bitcoin-progress.sh")},
        {"name": "BlindBit Oracle", "container": "blindbit-oracle", "protocol": "HTTP", "port": 8000,
         "health": lambda: exec_health("blindbit-oracle", "/scripts/healthchecks/blindbit-progress.sh")},
        {"name": "SDK Relay", "container": "sdk_relay", "protocol": "WebSocket", "port": 8090,
         "health": lambda: exec_health("sdk_relay", "/scripts/healthchecks/sdk-relay-progress.sh")},
        {"name": "SDK Storage", "container": "sdk_storage", "protocol": "HTTP", "port": 8080,
         "probe": lambda: http_probe("http://sdk_storage:8080/health")},
        {"name": "LeCoffre Frontend", "container": "lecoffre-front", "protocol": "HTTP", "port": 3000},
        {"name": "IHM Client", "container": "ihm_client", "protocol": "HTTP", "port": 3003},
        {"name": "Grafana", "container": "grafana", "protocol": "HTTP", "port": 3000,
         "probe": lambda: http_probe("http://grafana:3000/api/health")},
        {"name": "Loki", "container": "loki", "protocol": "HTTP", "port": 3100,
         "probe": lambda: http_probe("http://loki:3100/ready")},
        {"name": "Promtail", "container": "promtail", "protocol": "HTTP", "port": 9080},
        {"name": "Miner Signet", "container": "signet_miner", "protocol": "Bitcoin", "port": None},
    ]


def _service_health(sdef: dict) -> tuple[str, str]:
    """Return ``(health, health_text)`` for one service definition.

    Uses the definition's health script or HTTP probe when present, then adds
    log-derived details for the BlindBit oracle and the miner. On any error
    the health becomes ``"unknown"`` and the text gathered so far is kept.
    """
    health = "unknown"
    health_text = ""
    try:
        if "health" in sdef:
            health_text = sdef["health"]() or ""
            ready = "ready" in health_text or "Synced" in health_text
            health = "healthy" if ready else "starting"
        elif "probe" in sdef:
            probe_status, _ = sdef["probe"]()
            health = "healthy" if probe_status == "running" else "error"
        if sdef.get("name") == "BlindBit Oracle":
            progress = blindbit_scan_progress("blindbit-oracle")
            if progress and progress not in health_text:
                health_text = append_detail(health_text, "Scan", progress)
        if sdef.get("name") == "Miner Signet":
            mstate = miner_detailed_state("signet_miner")
            if mstate:
                health_text = append_detail(health_text, "Miner", mstate)
    except Exception:
        health = "unknown"
    return health, health_text


def _describe_service(sdef: dict) -> dict:
    """Build the report entry for one service definition."""
    inspect = get_container_inspect(sdef["container"]) or {}
    state = inspect.get("State") or {}
    status = state.get("Status", "stopped")
    uptime = compute_uptime(state.get("StartedAt", "")) if status == "running" else "N/A"
    # "Config" may be present but null, hence `or {}` rather than a default.
    image_ref = (inspect.get("Config") or {}).get("Image") or ""
    img = image_info(image_ref) if image_ref else {}
    health, health_text = _service_health(sdef)

    # SDK Storage extra: size of its data directory.
    data_size_bytes = 0
    if sdef["name"] == "SDK Storage" and status == "running":
        try:
            data_size_bytes = get_storage_size_bytes(sdef["container"]) or 0
        except Exception:
            data_size_bytes = 0

    return {
        "name": sdef["name"],
        "status": status,
        "image": image_ref,
        "ip": (inspect.get("NetworkSettings") or {}).get("IPAddress"),
        "port": sdef.get("port"),
        "protocol": sdef.get("protocol"),
        "uptime": uptime,
        "health": health,
        "health_text": health_text,
        "image_info": img,
        "data_size_bytes": data_size_bytes,
    }


def _check_external_endpoints() -> list:
    """Check the external endpoints and return their report entries."""
    ext_defs = [
        {"name": "Mempool Signet", "url": "https://mempool2.4nkweb.com", "protocol": "HTTPS",
         "check": lambda: http_probe("https://mempool2.4nkweb.com/fr/docs/api/rest")},
        {"name": "Relay Bootstrap", "url": "wss://dev3.4nkweb.com/ws/", "protocol": "WebSocket",
         "check": lambda: ws_placeholder("wss://dev3.4nkweb.com/ws/")},
        {"name": "Signer Bootstrap", "url": "https://dev3.4nkweb.com", "protocol": "HTTPS",
         "check": lambda: http_probe("https://dev3.4nkweb.com")},
        {"name": "Git Repository", "url": "git.4nkweb.com", "protocol": "SSH",
         "check": lambda: ("running", "N/A (SSH)")},
    ]
    external = []
    for ext in ext_defs:
        status, detail = ext["check"]()
        external.append({
            "name": ext["name"],
            "url": ext["url"],
            "protocol": ext["protocol"],
            "status": status,
            "response_time": detail,
        })
    return external


def _latest_gitea_run(env_map: dict) -> dict:
    """Return the most recent Gitea Actions run across the configured owners.

    Needs ``GIT_TOKEN`` or ``GITEA_TOKEN`` in *env_map*. Each owner is looked
    up as a user first, then as an organisation. Returns ``{}`` when no token
    is configured, nothing is found, or any error occurs.
    """
    gitea_token = env_map.get("GIT_TOKEN") or env_map.get("GITEA_TOKEN")
    gitea_base = env_map.get("GITEA_BASE_URL", "https://git.4nkweb.com").rstrip('/')
    owners_raw = env_map.get("GITEA_OWNER", "") or "nicolas.cantu,Omar"
    owners = [o.strip() for o in owners_raw.split(",") if o.strip()]
    if not (gitea_token and owners):
        return {}

    auth_header = f"Authorization: token {gitea_token}"

    def fetch(url: str) -> str:
        code, out, _ = run_cmd(
            ["curl", "-fsS", url, "-H", auth_header, "-H", "accept: application/json"],
            timeout_seconds=6,
        )
        return out if code == 0 and out else ""

    def run_timestamp(run: dict) -> str:
        return run.get("created_at") or run.get("started_at") or ""

    latest = None
    latest_repo = None
    try:
        for owner in owners:
            listing = (fetch(f"{gitea_base}/api/v1/users/{owner}/repos?limit=100")
                       or fetch(f"{gitea_base}/api/v1/orgs/{owner}/repos?limit=100"))
            if not listing:
                continue
            for repo in json.loads(listing):
                name = repo.get("name")
                if not name:
                    continue
                out_u = fetch(f"{gitea_base}/api/v1/repos/{owner}/{name}/actions/runs?limit=1")
                if not out_u:
                    continue
                data = json.loads(out_u)
                runs = data.get("workflow_runs") or data.get("data") or []
                if not runs:
                    continue
                candidate = runs[0]
                ts = run_timestamp(candidate)
                # ISO timestamps are compared as strings.
                if ts and (latest is None or ts > run_timestamp(latest)):
                    latest = candidate
                    latest_repo = f"{owner}/{name}"
    except Exception:
        return {}

    if not (latest and latest_repo):
        return {}
    return {
        "name": latest_repo,
        "status": latest.get("status") or latest.get("conclusion"),
        "started_at": latest.get("created_at") or latest.get("started_at"),
        "uptime": "",
        "url": latest.get("html_url") or latest.get("url"),
    }


def _mailchimp_check(api_key) -> dict:
    """Ping the Mandrill API with *api_key*; status is ok, error or missing."""
    if not api_key:
        return {"provider": "Mailchimp", "status": "missing"}
    code, out, _ = run_cmd([
        "curl", "-fsS", "-X", "POST",
        "https://mandrillapp.com/api/1.0/users/ping.json",
        "-H", "Content-Type: application/json",
        "-d", json.dumps({"key": api_key}),
    ], timeout_seconds=6)
    ok = code == 0 and "PONG" in out
    return {"provider": "Mailchimp", "status": "ok" if ok else "error"}


def _stripe_subscription_stats(env_map: dict) -> tuple[dict, dict, dict]:
    """Aggregate Stripe subscriptions by offer family and by price.

    Returns ``(by_offer, prices_map, price_counts)``: subscription-item counts
    per family (plus ``TOTAL``), price id -> nickname, and price id -> count.
    Counts stay at their initial values when no secret key is configured;
    on error, whatever was gathered before the failure is returned.
    """
    by_offer = {"CREATORS": 0, "STARTER": 0, "STANDARD": 0, "UNLIMITED": 0, "TOTAL": 0}
    prices_map = {}
    price_counts = {}
    secret_key = env_map.get("STRIPE_SECRET_KEY")
    if not secret_key:
        return by_offer, prices_map, price_counts

    def known_ids(*env_names: str) -> set:
        return {env_map.get(n) for n in env_names if env_map.get(n)}

    # Families are identified by known price ids (when present in the env),
    # otherwise by nickname.
    family_ids = {
        "CREATORS": known_ids("STRIPE_CREATORS_PRICE_ID"),
        "STANDARD": known_ids(
            "STRIPE_STANDARD_SUBSCRIPTION_PRICE_ID",
            "STRIPE_STANDARD_ANNUAL_SUBSCRIPTION_PRICE_ID",
            "STRIPE_STANDARD_MONTHLY_YEAR_PRICE_ID",
            "STRIPE_STANDARD_MONTHLY_MONTH_PRICE_ID",
        ),
        "STARTER": known_ids(
            "STRIPE_STARTER_ANNUAL_PRICE_ID",
            "STRIPE_STARTER_MONTHLY_YEAR_PRICE_ID",
            "STRIPE_STARTER_MONTHLY_MONTH_PRICE_ID",
        ),
        "UNLIMITED": known_ids(
            "STRIPE_UNLIMITED_SUBSCRIPTION_PRICE_ID",
            "STRIPE_UNLIMITED_ANNUAL_SUBSCRIPTION_PRICE_ID",
        ),
    }

    try:
        auth_h = f"Authorization: Bearer {secret_key}"
        # 1) List active prices (<= 100) to map price.id -> nickname.
        code_p, out_p, _ = run_cmd([
            "curl", "-fsS", "https://api.stripe.com/v1/prices?limit=100&active=true",
            "-H", auth_h,
        ], timeout_seconds=6)
        if code_p == 0 and out_p:
            for price in (json.loads(out_p) or {}).get("data") or []:
                pid = price.get('id')
                prices_map[pid] = price.get('nickname') or ''
                price_counts[pid] = 0

        # 2) List subscriptions and aggregate by the family of each item's
        #    price. Pagination is capped at 3 pages to bound request time.
        starting_after = None
        for _page in range(3):
            url = "https://api.stripe.com/v1/subscriptions?limit=100&status=active&status=trialing"
            if starting_after:
                url += f"&starting_after={starting_after}"
            code_s, out_s, _ = run_cmd(["curl", "-fsS", url, "-H", auth_h], timeout_seconds=8)
            if code_s != 0 or not out_s:
                break
            page = json.loads(out_s) or {}
            subs = page.get("data") or []
            for sub in subs:
                for item in (sub.get("items") or {}).get("data") or []:
                    pid = (item.get("price") or {}).get("id")
                    family = stripe_family_for(pid or '', prices_map.get(pid, ''), family_ids)
                    if not family:
                        continue
                    by_offer[family] = by_offer.get(family, 0) + 1
                    by_offer["TOTAL"] += 1
                    if pid:
                        price_counts[pid] = price_counts.get(pid, 0) + 1
            if not (page.get("has_more") and subs):
                break
            starting_after = subs[-1].get('id')
            if not starting_after:
                # Without a cursor the next request would re-read page one.
                break
    except Exception:
        pass
    return by_offer, prices_map, price_counts


def _collect_wallets(env_map: dict) -> dict:
    """Return balances of the known wallets and of every bitcoind wallet."""
    wallets = {}

    # SDK Relay wallet: container env first, then its configuration file.
    relay_env = get_container_env("sdk_relay")
    relay_wallet = relay_env.get("WALLET_NAME") or relay_env.get("SDK_RELAY_WALLET_NAME")
    if not relay_wallet:
        relay_wallet = parse_wallet_name_from_conf(get_file_in_container("sdk_relay", "/app/.conf"))
    if relay_wallet:
        wallets["SDK Relay"] = btc_wallet_balance(relay_wallet)

    # Miner wallet: always reported under the fixed name 'miner'.
    wallets["Miner Signet"] = btc_wallet_balance("miner")

    relay_bootstrap_wallet = env_map.get("RELAY_BOOTSTRAP_WALLET_NAME")
    if relay_bootstrap_wallet:
        wallets["Relay Bootstrap"] = btc_wallet_balance(relay_bootstrap_wallet)

    # Enumerate all bitcoin wallets (loading them if necessary).
    try:
        loaded = set(btc_list_wallets())
        for wname in btc_list_walletdir() or []:
            if wname not in loaded:
                btc_ensure_loaded(wname)
                loaded.add(wname)
        # Sorted so the report order is stable between requests.
        wallets["Bitcoin Signet Wallets"] = {
            wname: btc_wallet_balance(wname) for wname in sorted(loaded)
        }
    except Exception:
        pass
    return wallets


def _build_status_report() -> dict:
    """Run every check and assemble the full ``/api`` response payload."""
    service_defs = _service_definitions()
    services = [_describe_service(sdef) for sdef in service_defs]
    external = _check_external_endpoints()

    # Third-party credentials are read from the back-end container's env.
    env_map = get_container_env("lecoffre-back")
    externals_cfg = {
        "OVH": bool(env_map.get("OVH_APPLICATION_KEY")),
        "Stripe": bool(env_map.get("STRIPE_SECRET_KEY")),
        "Mailchimp": bool(env_map.get("MAILCHIMP_API_KEY")),
    }

    runner = _latest_gitea_run(env_map)
    deployment = deployment_progress(services, service_defs, docker_ps_names())
    mailchimp_test = _mailchimp_check(env_map.get("MAILCHIMP_API_KEY"))
    stripe_by_offer, stripe_prices_map, stripe_price_counts = _stripe_subscription_stats(env_map)

    ovh_test = {"provider": "OVH", "status": "missing"}
    if externals_cfg.get("OVH"):
        ovh_test = ovh_safe_check(
            env_map.get("OVH_APPLICATION_KEY", ""),
            env_map.get("OVH_APPLICATION_SECRET", ""),
            env_map.get("OVH_CONSUMER_KEY", ""),
            env_map.get("OVH_SERVICE_NAME", ""),
        )

    wallets = _collect_wallets(env_map)

    return {
        "timestamp": datetime.now().isoformat(),
        "services": services,
        "external": external,
        "runner": runner,
        "integrations_configured": externals_cfg,
        "deployment": deployment,
        "integrations_test": {
            "ovh": ovh_test,
            "mailchimp": mailchimp_test,
            "stripe_subscriptions_by_offer": stripe_by_offer,
            "stripe_prices": {
                pid: {"nickname": stripe_prices_map.get(pid, ""), "count": cnt}
                for pid, cnt in stripe_price_counts.items()
            },
        },
        "wallets": wallets,
    }


class StatusAPIHandler(BaseHTTPRequestHandler):
    """Request handler exposing the status report on ``GET /api``."""

    def do_GET(self):
        """Serve the JSON status report on ``/api``; any other path is a 404."""
        if self.path != '/api':
            self.send_response(404)
            self.end_headers()
            return

        # Build the payload before sending any header so that a failure
        # produces a proper 500 instead of a 200 with an empty body.
        try:
            body = json.dumps(_build_status_report(), indent=2).encode()
            status_code = 200
        except Exception as exc:
            body = json.dumps({"error": type(exc).__name__}).encode()
            status_code = 500

        self.send_response(status_code)
        self.send_header('Content-type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization')
        self.end_headers()


if __name__ == '__main__':
    server = HTTPServer(('0.0.0.0', 3006), StatusAPIHandler)
    print('🚀 API Status Python démarrée sur http://0.0.0.0:3006')
    server.serve_forever()