#!/usr/bin/env python3
import json
import subprocess
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
import re
import hashlib
import urllib.request
def run_cmd(command: list[str], timeout_seconds: int = 5) -> tuple[int, str, str]:
try:
proc = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout_seconds,
text=True,
)
return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
except subprocess.TimeoutExpired:
return 124, "", "timeout"
except Exception as exc:
return 1, "", str(exc)
def get_container_inspect(container_name: str) -> dict:
code, out, _ = run_cmd([
"docker", "inspect", container_name,
"--format",
"{{json .}}",
], timeout_seconds=4)
if code != 0 or not out:
return {}
try:
return json.loads(out)
except Exception:
return {}
def compute_uptime(started_at: str) -> str:
try:
start = datetime.fromisoformat(started_at.replace("Z", "+00:00"))
delta = datetime.now(start.tzinfo) - start
total_seconds = int(delta.total_seconds())
days, rem = divmod(total_seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
if days > 0:
return f"{days}j {hours}h {minutes}m"
if hours > 0:
return f"{hours}h {minutes}m"
return f"{minutes}m"
except Exception:
return "N/A"
def http_probe(url: str) -> tuple[str, str]:
    # Probe the URL with curl from wherever this API runs (not inside the target container)
code, out, _ = run_cmd(["curl", "-fsS", "--max-time", "5", url], timeout_seconds=6)
if code == 0:
return "running", "ok"
return "error", "unreachable"
def get_container_env(container_name: str) -> dict:
inspect = get_container_inspect(container_name)
env_list = (inspect.get("Config") or {}).get("Env") or []
env_map = {}
for e in env_list:
if "=" in e:
k, v = e.split("=", 1)
env_map[k] = v
return env_map
def get_file_in_container(container: str, path: str) -> str:
code, out, _ = run_cmd(["docker", "exec", container, "sh", "-c", f"[ -f {path} ] && cat {path} || true"], timeout_seconds=6)
return out if code == 0 else ""
def parse_wallet_name_from_conf(conf_text: str) -> str:
try:
# accept lines like wallet_name="default" or wallet_name=default
for line in conf_text.splitlines():
if "wallet_name" in line:
parts = line.split("=", 1)
if len(parts) == 2:
val = parts[1].strip().strip('"\'')
if val:
return val
return ""
except Exception:
return ""
def btc_list_wallets() -> list:
code, out, _ = run_cmd(["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "listwallets"], timeout_seconds=6)
if code == 0 and out:
try:
return json.loads(out) or []
except Exception:
return []
return []
def btc_list_walletdir() -> list:
code, out, _ = run_cmd(["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "listwalletdir"], timeout_seconds=6)
if code == 0 and out:
try:
data = json.loads(out) or {}
names = [w.get("name") for w in (data.get("wallets") or []) if w.get("name")]
return names
except Exception:
return []
return []
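# `listwalletdir` returns JSON of the form {"wallets": [{"name": "..."}, ...]}; this helper
# reduces it to a flat list of wallet names.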
def btc_ensure_loaded(wallet: str) -> None:
try:
# loadwallet returns error if already loaded; ignore
run_cmd(["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "loadwallet", wallet], timeout_seconds=6)
except Exception:
pass
def ws_placeholder(url: str) -> tuple[str, str]:
# Placeholder for WebSocket checks
return "running", "N/A (WebSocket)"
def exec_health(container: str, script: str) -> str:
code, out, _ = run_cmd(["docker", "exec", container, "sh", script], timeout_seconds=6)
return out if code == 0 or out else ""
def blindbit_scan_progress(container: str) -> str:
code, out, _ = run_cmd(["docker", "logs", "--tail", "200", container], timeout_seconds=6)
if code != 0 or not out:
return ""
lines = out.splitlines()
keywords = ("scan", "scanning", "index", "indexed", "sync", "block", "height")
for line in reversed(lines):
# Strip ANSI color codes
ansi_stripped = re.sub(r"\x1B\[[0-9;]*[mK]", "", line)
lower = ansi_stripped.lower()
if any(k in lower for k in keywords):
# Try to extract a 64-hex block hash from the line (after ANSI strip)
m = re.search(r"\b[0-9a-fA-F]{64}\b", ansi_stripped)
if m:
h = m.group(0).lower()
return f"{h[:15]}..."
            # Fall back to the trimmed message if no hash is present
            clean = ansi_stripped.strip()
            return clean[:220] + ("..." if len(clean) > 220 else "")
return ""
def miner_detailed_state(container: str) -> str:
code, out, _ = run_cmd(["docker", "logs", "--tail", "200", container], timeout_seconds=6)
if code != 0 or not out:
return ""
lines = out.splitlines()
for line in reversed(lines):
# Strip ANSI
clean = re.sub(r"\x1B\[[0-9;]*[mK]", "", line).strip()
low = clean.lower()
if any(k in low for k in ["mining", "processed block", "new block", "candidate", "hash", "submit"]):
# Extract hash-like token if present
m = re.search(r"\b[0-9a-fA-F]{64}\b", clean)
if m:
h = m.group(0).lower()
return f"{h[:15]}..."
            return clean[:200] + ("..." if len(clean) > 200 else "")
return ""
def image_info(image_ref: str) -> dict:
    code, out, _ = run_cmd([
        "docker", "image", "inspect", image_ref, "--format", "{{json .}}"
    ], timeout_seconds=4)
    if code != 0 or not out:
        return {}
    try:
        data = json.loads(out)
        return {
            "id": data.get("Id"),
            "created": data.get("Created"),
            "tags": data.get("RepoTags"),
            "digest": (data.get("RepoDigests") or [None])[0],
        }
    except Exception:
        return {}
def get_storage_size_bytes(container: str) -> int:
    # Try common storage paths
    for path in ("/app/data", "/app/storage", "/home/bitcoin/.4nk/storage"):
        # Use cut to avoid awk braces in the f-string
        code, out, _ = run_cmd(["docker", "exec", container, "sh", "-c", f"[ -d {path} ] && du -sb {path} 2>/dev/null | cut -f1"], timeout_seconds=6)
        if code == 0 and out.strip().isdigit():
            try:
                return int(out.strip())
            except Exception:
                continue
    return 0
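# Note: image_info() reads "Id", "Created", "RepoTags" and "RepoDigests" straight from the
# top level of `docker image inspect` JSON; get_storage_size_bytes() returns 0 when none of
# the candidate paths exist in the container.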
def docker_ps_names() -> set:
code, out, _ = run_cmd(["docker", "ps", "--format", "{{.Names}}"], timeout_seconds=6)
if code != 0 or not out:
return set()
return set(n.strip() for n in out.splitlines() if n.strip())
def ovh_safe_check(app_key: str, app_secret: str, consumer_key: str, service_name: str, base_url: str = "https://eu.api.ovh.com/1.0") -> dict:
try:
# Get OVH time
with urllib.request.urlopen(f"{base_url}/auth/time") as resp:
server_time = int(resp.read().decode().strip())
method = "GET"
path = f"/sms/{service_name}/senders"
url = f"{base_url}{path}"
body = ""
# Signature: $1$ + sha1(appSecret + '+' + consumerKey + '+' + method + '+' + url + '+' + body + '+' + timestamp)
to_sign = "+".join([app_secret, consumer_key, method, url, body, str(server_time)])
sha = hashlib.sha1(to_sign.encode()).hexdigest()
signature = f"$1${sha}"
req = urllib.request.Request(url)
req.add_header("X-Ovh-Application", app_key)
req.add_header("X-Ovh-Consumer", consumer_key)
req.add_header("X-Ovh-Signature", signature)
req.add_header("X-Ovh-Timestamp", str(server_time))
with urllib.request.urlopen(req, timeout=6) as r2:
status_code = r2.getcode()
if status_code == 200:
return {"provider": "OVH", "status": "ok"}
return {"provider": "OVH", "status": "error", "code": status_code}
except Exception:
return {"provider": "OVH", "status": "error"}
class StatusAPIHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/api':
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
# Map service definitions to docker containers and optional probes
service_defs = [
{"name": "Tor Proxy", "container": "tor-proxy", "protocol": "SOCKS", "port": 9050, "health": lambda: exec_health("tor-proxy", "/scripts/healthchecks/tor-progress.sh")},
{"name": "Bitcoin Signet", "container": "bitcoin-signet", "protocol": "RPC", "port": 8332, "health": lambda: exec_health("bitcoin-signet", "/scripts/healthchecks/bitcoin-progress.sh")},
{"name": "BlindBit Oracle", "container": "blindbit-oracle", "protocol": "HTTP", "port": 8000, "health": lambda: exec_health("blindbit-oracle", "/scripts/healthchecks/blindbit-progress.sh")},
{"name": "SDK Relay", "container": "sdk_relay", "protocol": "WebSocket", "port": 8090, "health": lambda: exec_health("sdk_relay", "/scripts/healthchecks/sdk-relay-progress.sh")},
{"name": "SDK Storage", "container": "sdk_storage", "protocol": "HTTP", "port": 8080, "probe": lambda: http_probe("http://sdk_storage:8080/health")},
{"name": "LeCoffre Frontend", "container": "lecoffre-front", "protocol": "HTTP", "port": 3000},
{"name": "IHM Client", "container": "ihm_client", "protocol": "HTTP", "port": 3003},
{"name": "Grafana", "container": "grafana", "protocol": "HTTP", "port": 3000, "probe": lambda: http_probe("http://grafana:3000/api/health")},
{"name": "Loki", "container": "loki", "protocol": "HTTP", "port": 3100, "probe": lambda: http_probe("http://loki:3100/ready")},
{"name": "Promtail", "container": "promtail", "protocol": "HTTP", "port": 9080},
{"name": "Miner Signet", "container": "signet_miner", "protocol": "Bitcoin", "port": None},
]
services = []
for sdef in service_defs:
inspect = get_container_inspect(sdef["container"]) or {}
state = (inspect.get("State") or {})
status = state.get("Status", "stopped")
started_at = state.get("StartedAt", "")
uptime = compute_uptime(started_at) if status == "running" else "N/A"
image_ref = inspect.get("Config", {}).get("Image") or ""
img = image_info(image_ref) if image_ref else {}
# health status text via scripts or simple probe
health_text = ""
health = "unknown"
try:
if "health" in sdef:
health_text = sdef["health"]() or ""
health = "healthy" if "ready" in health_text or "Synced" in health_text else "starting"
elif "probe" in sdef:
hstatus, _ = sdef["probe"]()
health = "healthy" if hstatus == "running" else "error"
if sdef.get("name") == "BlindBit Oracle":
progress = blindbit_scan_progress("blindbit-oracle")
if progress and progress not in (health_text or ""):
# If progress looks like a pure hash, show only the hash
if len(progress) == 64 and all(c in '0123456789abcdef' for c in progress):
health_text = (health_text + (" | " if health_text else "") + f"Scan: {progress}")
else:
health_text = (health_text + (" | " if health_text else "") + f"Scan: {progress}")
if sdef.get("name") == "Miner Signet":
mstate = miner_detailed_state("signet_miner")
if mstate:
health_text = (health_text + (" | " if health_text else "") + f"Miner: {mstate}")
except Exception:
health = "unknown"
# SDK Storage extra: compute data size
data_size_bytes = 0
if sdef["name"] == "SDK Storage" and status == "running":
try:
data_size_bytes = get_storage_size_bytes(sdef["container"]) or 0
except Exception:
data_size_bytes = 0
services.append({
"name": sdef["name"],
"status": status,
"image": image_ref,
"ip": (inspect.get("NetworkSettings") or {}).get("IPAddress"),
"port": sdef.get("port"),
"protocol": sdef.get("protocol"),
"uptime": uptime,
"health": health,
"health_text": health_text,
"image_info": img,
"data_size_bytes": data_size_bytes,
})
# External endpoints
ext_defs = [
{"name": "Mempool Signet", "url": "https://mempool2.4nkweb.com", "protocol": "HTTPS", "check": lambda: http_probe("https://mempool2.4nkweb.com/fr/docs/api/rest")},
{"name": "Relay Bootstrap", "url": "wss://dev3.4nkweb.com/ws/", "protocol": "WebSocket", "check": lambda: ws_placeholder("wss://dev3.4nkweb.com/ws/")},
{"name": "Signer Bootstrap", "url": "https://dev3.4nkweb.com", "protocol": "HTTPS", "check": lambda: http_probe("https://dev3.4nkweb.com")},
{"name": "Git Repository", "url": "git.4nkweb.com", "protocol": "SSH", "check": lambda: ("running", "N/A (SSH)")},
]
external = []
for ext in ext_defs:
status, response = ext["check"]()
external.append({
"name": ext["name"],
"url": ext["url"],
"protocol": ext["protocol"],
"status": status,
"response_time": response,
})
# Runner info from Gitea API if credentials present
runner = {}
            # Check which external integrations are configured in the back-end's env
back = get_container_inspect("lecoffre-back")
env_list = back.get("Config", {}).get("Env") if back else []
env_map = {e.split("=", 1)[0]: e.split("=", 1)[1] for e in env_list or [] if "=" in e}
externals_cfg = {
"OVH": bool(env_map.get("OVH_APPLICATION_KEY")),
"Stripe": bool(env_map.get("STRIPE_SECRET_KEY")),
"Mailchimp": bool(env_map.get("MAILCHIMP_API_KEY")),
}
# Try to fetch latest run from Gitea if configured
gitea_token = env_map.get("GIT_TOKEN") or env_map.get("GITEA_TOKEN")
gitea_base = env_map.get("GITEA_BASE_URL", "https://git.4nkweb.com").rstrip('/')
owners_raw = env_map.get("GITEA_OWNER", "") or "nicolas.cantu,Omar"
owners = [o.strip() for o in owners_raw.split(",") if o.strip()] if owners_raw else []
if gitea_token and owners:
try:
auth_header = f"Authorization: token {gitea_token}"
latest = None
latest_repo = None
for owner in owners:
# List repos for owner
u_repos = f"{gitea_base}/api/v1/users/{owner}/repos?limit=100"
code_r, out_r, _ = run_cmd(["curl", "-fsS", u_repos, "-H", auth_header, "-H", "accept: application/json"], timeout_seconds=6)
if code_r != 0 or not out_r:
# Try orgs endpoint as fallback
o_repos = f"{gitea_base}/api/v1/orgs/{owner}/repos?limit=100"
code_ro, out_ro, _ = run_cmd(["curl", "-fsS", o_repos, "-H", auth_header, "-H", "accept: application/json"], timeout_seconds=6)
if code_ro != 0 or not out_ro:
continue
out_r = out_ro
repos = json.loads(out_r)
for repo in repos:
name = repo.get("name")
if not name:
continue
runs_url = f"{gitea_base}/api/v1/repos/{owner}/{name}/actions/runs?limit=1"
code_u, out_u, _ = run_cmd(["curl", "-fsS", runs_url, "-H", auth_header, "-H", "accept: application/json"], timeout_seconds=6)
if code_u != 0 or not out_u:
continue
data = json.loads(out_u)
runs = data.get("workflow_runs") or data.get("data") or []
if runs:
r = runs[0]
ts = r.get("created_at") or r.get("started_at") or ""
if ts and (latest is None or ts > (latest.get("created_at") or latest.get("started_at") or "")):
latest = r
latest_repo = f"{owner}/{name}"
if latest and latest_repo:
runner = {
"name": latest_repo,
"status": latest.get("status") or latest.get("conclusion"),
"started_at": latest.get("created_at") or latest.get("started_at"),
"uptime": "",
"url": latest.get("html_url") or latest.get("url"),
}
except Exception:
pass
            # Deployment progress: based on presence (docker ps) AND health
running_names = docker_ps_names()
total = len(services)
ready = 0
for s in services:
name = s.get("name")
container = next((d["container"] for d in service_defs if d["name"] == name), None)
present = container in running_names if container else False
healthy_s = (s.get("health") == "healthy")
running_s = (s.get("status") == "running")
                # consider the service ready if present AND (healthy or running)
if present and (healthy_s or running_s):
ready += 1
percent = int(ready * 100 / total) if total else 0
# Integrations: Mailchimp (Mandrill ping) and Stripe (counts)
mailchimp_test = {"provider": "Mailchimp", "status": "missing"}
if env_map.get("MAILCHIMP_API_KEY"):
try:
code_mc, out_mc, _ = run_cmd([
"curl", "-fsS", "-X", "POST",
"https://mandrillapp.com/api/1.0/users/ping.json",
"-H", "Content-Type: application/json",
"-d", json.dumps({"key": env_map.get("MAILCHIMP_API_KEY")})
], timeout_seconds=6)
if code_mc == 0 and (out_mc.strip() == '"PONG"' or 'PONG' in out_mc):
mailchimp_test = {"provider": "Mailchimp", "status": "ok"}
else:
mailchimp_test = {"provider": "Mailchimp", "status": "error"}
except Exception:
mailchimp_test = {"provider": "Mailchimp", "status": "error"}
            # Stripe: list prices and aggregate by sweeping subscriptions (no price filter)
stripe_by_offer = {"CREATORS": 0, "STARTER": 0, "STANDARD": 0, "UNLIMITED": 0, "TOTAL": 0}
stripe_prices_map = {}
stripe_price_counts = {}
if env_map.get("STRIPE_SECRET_KEY"):
try:
auth_h = f"Authorization: Bearer {env_map.get('STRIPE_SECRET_KEY')}"
                    # 1) List active prices (<=100) to map price.id -> nickname
code_p, out_p, _ = run_cmd([
"curl", "-fsS", "https://api.stripe.com/v1/prices?limit=100&active=true",
"-H", auth_h
], timeout_seconds=6)
if code_p == 0 and out_p:
prices = (json.loads(out_p) or {}).get("data") or []
for pr in prices:
pid = pr.get('id')
stripe_prices_map[pid] = pr.get('nickname') or ''
stripe_price_counts[pid] = 0
                    # Determine the offer families from known price IDs (if present in the env), otherwise from the nickname
creators_ids = set(filter(None, [env_map.get("STRIPE_CREATORS_PRICE_ID")]))
standard_ids = set(filter(None, [
env_map.get("STRIPE_STANDARD_SUBSCRIPTION_PRICE_ID"),
env_map.get("STRIPE_STANDARD_ANNUAL_SUBSCRIPTION_PRICE_ID"),
env_map.get("STRIPE_STANDARD_MONTHLY_YEAR_PRICE_ID"),
env_map.get("STRIPE_STANDARD_MONTHLY_MONTH_PRICE_ID"),
]))
starter_ids = set(filter(None, [
env_map.get("STRIPE_STARTER_ANNUAL_PRICE_ID"),
env_map.get("STRIPE_STARTER_MONTHLY_YEAR_PRICE_ID"),
env_map.get("STRIPE_STARTER_MONTHLY_MONTH_PRICE_ID"),
]))
unlimited_ids = set(filter(None, [
env_map.get("STRIPE_UNLIMITED_SUBSCRIPTION_PRICE_ID"),
env_map.get("STRIPE_UNLIMITED_ANNUAL_SUBSCRIPTION_PRICE_ID"),
]))
def family_for(pid: str, nickname: str) -> str:
if pid in creators_ids or (nickname and 'createur' in nickname.lower()):
return 'CREATORS'
if pid in starter_ids or (nickname and 'starter' in nickname.lower()):
return 'STARTER'
if pid in standard_ids or (nickname and 'standard' in nickname.lower()):
return 'STANDARD'
if pid in unlimited_ids or (nickname and 'unlimit' in nickname.lower()):
return 'UNLIMITED'
return ''
                    # 2) List subscriptions (active + trialing) and aggregate them by the price's offer family.
                    # Stripe's /v1/subscriptions endpoint accepts a single `status` value, so each status
                    # is queried separately rather than repeating the parameter in one URL.
                    for sub_status in ("active", "trialing"):
                        starting_after = None
                        pages = 0
                        while pages < 3:  # pagination limit to avoid long loops
                            url = f"https://api.stripe.com/v1/subscriptions?limit=100&status={sub_status}"
                            if starting_after:
                                url += f"&starting_after={starting_after}"
                            code_s, out_s, _ = run_cmd(["curl", "-fsS", url, "-H", auth_h], timeout_seconds=8)
                            if code_s != 0 or not out_s:
                                break
                            d = json.loads(out_s) or {}
                            subs = d.get("data") or []
                            for sub in subs:
                                items = ((sub.get("items") or {}).get("data") or [])
                                for it in items:
                                    pid = ((it.get("price") or {}).get("id"))
                                    nick = stripe_prices_map.get(pid, '')
                                    fam = family_for(pid or '', nick)
                                    if not fam:
                                        continue
                                    stripe_by_offer[fam] = stripe_by_offer.get(fam, 0) + 1
                                    stripe_by_offer["TOTAL"] += 1
                                    if pid:
                                        stripe_price_counts[pid] = stripe_price_counts.get(pid, 0) + 1
                            if d.get("has_more") and subs:
                                starting_after = subs[-1].get('id')
                                pages += 1
                                continue
                            break
except Exception:
pass
            # OVH: run a signed test call when credentials are configured, otherwise report "missing"
ovh_test = {"provider": "OVH", "status": "missing"}
if externals_cfg.get("OVH"):
ovh_test = ovh_safe_check(env_map.get("OVH_APPLICATION_KEY", ""), env_map.get("OVH_APPLICATION_SECRET", ""), env_map.get("OVH_CONSUMER_KEY", ""), env_map.get("OVH_SERVICE_NAME", ""))
# Wallet balances via bitcoin-cli (signet)
def btc_wallet_balance(wallet: str) -> dict:
try:
if wallet:
btc_ensure_loaded(wallet)
code_b, out_b, _ = run_cmd(["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", f"-rpcwallet={wallet}", "getbalances"], timeout_seconds=6)
if code_b == 0 and out_b:
b = json.loads(out_b)
conf = ((b.get("mine") or {}).get("trusted") or 0) if isinstance(b.get("mine"), dict) else 0
unconf = ((b.get("mine") or {}).get("untrusted_pending") or 0) if isinstance(b.get("mine"), dict) else 0
imm = ((b.get("mine") or {}).get("immature") or 0) if isinstance(b.get("mine"), dict) else 0
# Convert BTC -> sats
to_sats = lambda v: int(float(v) * 100_000_000)
return {"confirmed_sat": to_sats(conf), "unconfirmed_sat": to_sats(unconf), "immature_sat": to_sats(imm)}
except Exception:
pass
return {"confirmed_sat": 0, "unconfirmed_sat": 0, "immature_sat": 0}
wallets = {}
# Detect known wallets from service envs
relay_env = get_container_env("sdk_relay")
# Try env, then file conf
relay_wallet = relay_env.get("WALLET_NAME") or relay_env.get("SDK_RELAY_WALLET_NAME")
if not relay_wallet:
relay_conf = get_file_in_container("sdk_relay", "/app/.conf")
relay_wallet = parse_wallet_name_from_conf(relay_conf)
if relay_wallet:
wallets["SDK Relay"] = btc_wallet_balance(relay_wallet)
            # Miner wallet: assume the default wallet name 'miner' (all wallets are enumerated below anyway)
miner_wallet = "miner"
wallets["Miner Signet"] = btc_wallet_balance(miner_wallet)
relay_bootstrap_wallet = env_map.get("RELAY_BOOTSTRAP_WALLET_NAME")
if relay_bootstrap_wallet:
wallets["Relay Bootstrap"] = btc_wallet_balance(relay_bootstrap_wallet)
# Enumerate all bitcoin wallets (load if necessary) and balances
try:
bitcoin_wallets = {}
loaded = set(btc_list_wallets())
all_in_dir = btc_list_walletdir()
for wname in (all_in_dir or []):
if wname not in loaded:
btc_ensure_loaded(wname)
loaded.add(wname)
for wname in loaded:
bitcoin_wallets[wname] = btc_wallet_balance(wname)
wallets["Bitcoin Signet Wallets"] = bitcoin_wallets
except Exception:
pass
response = {
"timestamp": datetime.now().isoformat(),
"services": services,
"external": external,
"runner": runner,
"integrations_configured": externals_cfg,
"deployment": {"total": total, "healthy": healthy, "percent": percent},
"integrations_test": {
"ovh": ovh_test,
"mailchimp": mailchimp_test,
"stripe_subscriptions_by_offer": stripe_by_offer,
"stripe_prices": {pid: {"nickname": stripe_prices_map.get(pid, ""), "count": cnt} for pid, cnt in stripe_price_counts.items()},
},
"wallets": wallets,
}
self.wfile.write(json.dumps(response, indent=2).encode())
else:
self.send_response(404)
self.end_headers()
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization')
self.end_headers()
if __name__ == '__main__':
server = HTTPServer(('0.0.0.0', 3006), StatusAPIHandler)
    print('🚀 Python status API started on http://0.0.0.0:3006')
server.serve_forever()
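# Illustrative check from the host (assumes the default bind above):
#   curl -s http://localhost:3006/api | python3 -m json.tool
# The response includes "services", "external", "runner", "deployment", "integrations_test"
# and "wallets", as assembled in StatusAPIHandler.do_GET above.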