606 lines
29 KiB
Python
606 lines
29 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import subprocess
|
|
import time
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from datetime import datetime
|
|
import re
|
|
import time as _time
|
|
import hashlib
|
|
import hmac
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
|
|
def run_cmd(command: list[str], timeout_seconds: int = 5) -> tuple[int, str, str]:
    """Run *command* (no shell) and capture what it prints.

    Returns:
        ``(exit_code, stdout, stderr)`` with both streams decoded as text and
        stripped of surrounding whitespace. A timeout is reported as
        ``(124, "", "timeout")``; any other failure to start or run the
        command is reported as ``(1, "", <error text>)``.
    """
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        return 124, "", "timeout"
    except Exception as error:
        # E.g. the executable does not exist: surface the message, never raise.
        return 1, "", str(error)
    else:
        captured_out = completed.stdout.strip()
        captured_err = completed.stderr.strip()
        return completed.returncode, captured_out, captured_err
|
|
|
|
|
|
def get_container_inspect(container_name: str) -> dict:
    """Return the parsed ``docker inspect`` JSON for *container_name*.

    An empty dict is returned when the command fails, prints nothing, or
    prints something that is not valid JSON.
    """
    inspect_cmd = ["docker", "inspect", container_name, "--format", "{{json .}}"]
    exit_code, raw_json, _ = run_cmd(inspect_cmd, timeout_seconds=4)
    if exit_code == 0 and raw_json:
        try:
            return json.loads(raw_json)
        except Exception:
            pass
    return {}
|
|
|
|
|
|
def compute_uptime(started_at: str) -> str:
    """Format the time elapsed since *started_at* as a short uptime string.

    Args:
        started_at: ISO-8601 timestamp, optionally ending in ``Z`` and
            optionally carrying a fractional-second part of any length.

    Returns:
        ``"<d>j <h>h <m>m"`` when at least one day has elapsed,
        ``"<h>h <m>m"`` when at least one hour has elapsed, otherwise
        ``"<m>m"``. ``"N/A"`` is returned when the timestamp cannot be parsed.
        A start time in the future is treated as zero elapsed time.
    """
    try:
        # Before Python 3.11, ``datetime.fromisoformat`` only accepts 3 or 6
        # fractional digits, so pad/truncate the fraction to microseconds.
        normalized = re.sub(
            r"\.(\d+)",
            lambda match: "." + match.group(1)[:6].ljust(6, "0"),
            started_at.strip(),
        ).replace("Z", "+00:00")
        start = datetime.fromisoformat(normalized)
        delta = datetime.now(start.tzinfo) - start
    except (ValueError, TypeError, AttributeError, OverflowError):
        return "N/A"

    # Clamp so a start time slightly in the future (clock skew) reads as "0m"
    # instead of the wrapped values negative divmod would produce.
    total_seconds = max(0, int(delta.total_seconds()))
    days, rem = divmod(total_seconds, 86400)
    hours, rem = divmod(rem, 3600)
    minutes = rem // 60
    if days > 0:
        return f"{days}j {hours}h {minutes}m"
    if hours > 0:
        return f"{hours}h {minutes}m"
    return f"{minutes}m"
|
|
|
|
|
|
def http_probe(url: str) -> tuple[str, str]:
    """Probe *url* with ``curl`` and report whether it answered successfully.

    Returns ``("running", "ok")`` when curl exits with status 0 and
    ``("error", "unreachable")`` otherwise.
    """
    # curl is executed by this process, i.e. from wherever the API itself runs.
    curl_cmd = ["curl", "-fsS", "--max-time", "5", url]
    exit_code = run_cmd(curl_cmd, timeout_seconds=6)[0]
    return ("running", "ok") if exit_code == 0 else ("error", "unreachable")
|
|
|
|
|
|
def get_container_env(container_name: str) -> dict:
    """Return the container's configured environment as a ``{name: value}`` dict.

    Entries of ``Config.Env`` without an ``=`` are ignored; values are split
    on the first ``=`` only.
    """
    config_section = get_container_inspect(container_name).get("Config") or {}
    env_entries = config_section.get("Env") or []
    name_value_pairs = (entry.split("=", 1) for entry in env_entries if "=" in entry)
    return {name: value for name, value in name_value_pairs}
|
|
|
|
|
|
def get_file_in_container(container: str, path: str) -> str:
    """Return the content of *path* inside *container*, or ``""``.

    The file is read with ``docker exec ... sh -c``; a missing file yields an
    empty string, as does a failing command.
    """
    # NOTE(review): *path* is interpolated unquoted into the shell snippet, so
    # callers must only pass trusted paths without spaces or metacharacters.
    shell_snippet = f"[ -f {path} ] && cat {path} || true"
    exit_code, content, _ = run_cmd(
        ["docker", "exec", container, "sh", "-c", shell_snippet],
        timeout_seconds=6,
    )
    if exit_code != 0:
        return ""
    return content
|
|
|
|
|
|
def parse_wallet_name_from_conf(conf_text: str) -> str:
    """Extract the ``wallet_name`` setting from ``key=value`` configuration text.

    Accepts lines such as ``wallet_name="default"`` or ``wallet_name=default``
    (surrounding whitespace and single/double quotes are removed). Lines that
    start with ``#`` or ``;`` are treated as comments, and only a key that is
    exactly ``wallet_name`` matches, so keys that merely contain that text are
    not mistaken for the setting.

    Returns:
        The first non-empty wallet name found, or ``""`` when there is none or
        *conf_text* is not a string.
    """
    try:
        lines = conf_text.splitlines()
    except AttributeError:
        # Not a string (e.g. None): nothing to parse.
        return ""

    for raw_line in lines:
        line = raw_line.strip()
        if not line or line.startswith(("#", ";")):
            continue
        key, separator, value = line.partition("=")
        if not separator or key.strip() != "wallet_name":
            continue
        name = value.strip().strip('"\'')
        if name:
            return name
    return ""
|
|
|
|
|
|
def btc_list_wallets() -> list:
    """Return the decoded output of ``bitcoin-cli -signet listwallets``.

    The command is run inside the ``bitcoin-signet`` container. An empty list
    is returned when it fails, prints nothing, or prints invalid JSON.
    """
    list_cmd = ["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "listwallets"]
    exit_code, raw_json, _ = run_cmd(list_cmd, timeout_seconds=6)
    if exit_code != 0 or not raw_json:
        return []
    try:
        return json.loads(raw_json) or []
    except Exception:
        return []
|
|
|
|
|
|
def btc_list_walletdir() -> list:
    """Return the wallet names listed by ``bitcoin-cli -signet listwalletdir``.

    The command is run inside the ``bitcoin-signet`` container. Entries with
    no (or an empty) ``name`` are skipped; any failure yields an empty list.
    """
    dir_cmd = ["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "listwalletdir"]
    exit_code, raw_json, _ = run_cmd(dir_cmd, timeout_seconds=6)
    if exit_code != 0 or not raw_json:
        return []
    try:
        listing = json.loads(raw_json) or {}
        wallet_names = []
        for entry in listing.get("wallets") or []:
            if entry.get("name"):
                wallet_names.append(entry.get("name"))
        return wallet_names
    except Exception:
        return []
|
|
|
|
|
|
def btc_ensure_loaded(wallet: str) -> None:
    """Ask bitcoind (in the ``bitcoin-signet`` container) to load *wallet*.

    The outcome is ignored on purpose: ``loadwallet`` reports an error when
    the wallet is already loaded, which is fine for the caller.
    """
    load_cmd = ["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", "loadwallet", wallet]
    try:
        run_cmd(load_cmd, timeout_seconds=6)
    except Exception:
        return
|
|
|
|
|
|
def ws_placeholder(url: str) -> tuple[str, str]:
    """Stand-in for a WebSocket check: always reports the endpoint as running.

    *url* is accepted for signature parity with the real probes but is not
    contacted.
    """
    placeholder_status = "running"
    placeholder_detail = "N/A (WebSocket)"
    return placeholder_status, placeholder_detail
|
|
|
|
|
|
def exec_health(container: str, script: str) -> str:
    """Run the health *script* with ``sh`` inside *container* and return its stdout.

    Whatever the script printed is returned even when it exits non-zero; the
    result is ``""`` only when nothing was printed.
    """
    exit_code, output, _ = run_cmd(
        ["docker", "exec", container, "sh", script],
        timeout_seconds=6,
    )
    if exit_code != 0 and not output:
        return ""
    return output
|
|
|
|
|
|
def blindbit_scan_progress(container: str) -> str:
    """Summarise the latest scan-related line from the container's recent logs.

    The last 200 log lines are searched newest-first for a line mentioning one
    of the progress keywords. If that line contains a 64-character hex token,
    its first 15 characters (lower-cased) followed by ``...`` are returned;
    otherwise the line itself, cut to 220 characters. ``""`` means the logs
    could not be read or no line matched.
    """
    exit_code, log_text, _ = run_cmd(
        ["docker", "logs", "--tail", "200", container],
        timeout_seconds=6,
    )
    if exit_code != 0 or not log_text:
        return ""

    keywords = ("scan", "scanning", "index", "indexed", "sync", "block", "height")
    for raw_line in reversed(log_text.splitlines()):
        # Remove ANSI colour/erase sequences before inspecting the text.
        plain = re.sub(r"\x1B\[[0-9;]*[mK]", "", raw_line)
        lowered = plain.lower()
        if not any(word in lowered for word in keywords):
            continue

        # Prefer a 64-hex token (e.g. a block hash) when the line has one.
        hash_match = re.search(r"\b[0-9a-fA-F]{64}\b", plain)
        if hash_match is not None:
            return f"{hash_match.group(0).lower()[:15]}..."

        # Otherwise fall back to the trimmed message itself.
        message = plain.strip()
        ellipsis = "…" if len(message) > 220 else ""
        return message[:220] + ellipsis
    return ""
|
|
|
|
|
|
def miner_detailed_state(container: str) -> str:
    """Summarise the latest mining-related line from the container's recent logs.

    The last 200 log lines are searched newest-first for a line mentioning a
    mining keyword. If that line contains a 64-character hex token, its first
    15 characters (lower-cased) followed by ``...`` are returned; otherwise the
    line itself, cut to 200 characters. ``""`` means the logs could not be
    read or no line matched.
    """
    exit_code, log_text, _ = run_cmd(
        ["docker", "logs", "--tail", "200", container],
        timeout_seconds=6,
    )
    if exit_code != 0 or not log_text:
        return ""

    markers = ["mining", "processed block", "new block", "candidate", "hash", "submit"]
    for raw_line in reversed(log_text.splitlines()):
        # Drop ANSI colour/erase sequences and surrounding whitespace.
        message = re.sub(r"\x1B\[[0-9;]*[mK]", "", raw_line).strip()
        lowered = message.lower()
        if not any(marker in lowered for marker in markers):
            continue

        hash_match = re.search(r"\b[0-9a-fA-F]{64}\b", message)
        if hash_match is not None:
            return f"{hash_match.group(0).lower()[:15]}..."

        ellipsis = "…" if len(message) > 200 else ""
        return message[:200] + ellipsis
    return ""
|
|
|
|
|
|
def image_info(image_ref: str) -> dict:
    """Return selected metadata of the Docker image *image_ref*.

    Runs ``docker image inspect`` and extracts a small summary.

    Returns:
        A dict with keys ``id``, ``created``, ``tags`` and ``digest`` (the
        first repo digest, or ``None`` when there is none). An empty dict is
        returned when the command fails or its output cannot be interpreted.
    """
    code, out, _ = run_cmd([
        "docker", "image", "inspect", image_ref, "--format", "{{json .}}"
    ], timeout_seconds=4)
    if code != 0 or not out:
        return {}
    # The parsing below used to sit after the final ``return`` of
    # get_storage_size_bytes, where it never ran, so this function returned
    # None for every successfully inspected image.
    try:
        data = json.loads(out)
        return {
            "id": data.get("Id"),
            "created": data.get("Created"),
            "tags": data.get("RepoTags"),
            "digest": (data.get("RepoDigests") or [None])[0],
        }
    except (ValueError, AttributeError, TypeError, KeyError, IndexError):
        # Invalid JSON or an unexpected document shape.
        return {}


def get_storage_size_bytes(container: str) -> int:
    """Return the size in bytes of the container's storage directory.

    A fixed list of candidate directories is tried in order with ``du -sb``
    inside the container; the first one that yields a number wins.

    Returns:
        The size reported by ``du`` for the first usable directory, or ``0``
        when none of the candidates produced a numeric result.
    """
    # Try common storage paths.
    for path in ("/app/data", "/app/storage", "/home/bitcoin/.4nk/storage"):
        # ``cut`` is used rather than awk to keep braces out of the f-string.
        code, out, _ = run_cmd(
            [
                "docker", "exec", container, "sh", "-c",
                f"[ -d {path} ] && du -sb {path} 2>/dev/null | cut -f1",
            ],
            timeout_seconds=6,
        )
        size_text = out.strip()
        if code == 0 and size_text.isdigit():
            try:
                return int(size_text)
            except ValueError:
                # ``isdigit`` accepts a few characters ``int`` rejects.
                continue
    return 0
|
|
|
|
|
|
def docker_ps_names() -> set:
    """Return the names printed by ``docker ps`` as a set.

    An empty set is returned when the command fails or prints nothing.
    """
    exit_code, listing, _ = run_cmd(
        ["docker", "ps", "--format", "{{.Names}}"],
        timeout_seconds=6,
    )
    if exit_code != 0 or not listing:
        return set()
    return {name.strip() for name in listing.splitlines() if name.strip()}
|
|
|
|
def ovh_safe_check(app_key: str, app_secret: str, consumer_key: str, service_name: str, base_url: str = "https://eu.api.ovh.com/1.0") -> dict:
    """Verify OVH API credentials with a signed, read-only request.

    The server time is fetched from ``{base_url}/auth/time`` and used as the
    timestamp of a signed ``GET {base_url}/sms/{service_name}/senders``.

    Args:
        app_key: Value sent as ``X-Ovh-Application``.
        app_secret: Secret mixed into the request signature.
        consumer_key: Value sent as ``X-Ovh-Consumer``.
        service_name: SMS service name inserted in the request path.
        base_url: API root, without trailing slash.

    Returns:
        ``{"provider": "OVH", "status": "ok"}`` on HTTP 200;
        ``{"provider": "OVH", "status": "error", "code": <status>}`` for any
        other status that does not raise; ``{"provider": "OVH", "status":
        "error"}`` when anything fails (network error, HTTP error, bad reply).
    """
    try:
        # Both calls are bounded: without a timeout an unresponsive endpoint
        # would block the calling request handler indefinitely.
        with urllib.request.urlopen(f"{base_url}/auth/time", timeout=6) as resp:
            server_time = int(resp.read().decode().strip())

        method = "GET"
        path = f"/sms/{service_name}/senders"
        url = f"{base_url}{path}"
        body = ""
        # Signature: "$1$" + sha1(appSecret + '+' + consumerKey + '+' + method
        #            + '+' + url + '+' + body + '+' + timestamp)
        to_sign = "+".join([app_secret, consumer_key, method, url, body, str(server_time)])
        signature = "$1$" + hashlib.sha1(to_sign.encode()).hexdigest()

        req = urllib.request.Request(url)
        req.add_header("X-Ovh-Application", app_key)
        req.add_header("X-Ovh-Consumer", consumer_key)
        req.add_header("X-Ovh-Signature", signature)
        req.add_header("X-Ovh-Timestamp", str(server_time))
        with urllib.request.urlopen(req, timeout=6) as r2:
            status_code = r2.getcode()

        if status_code == 200:
            return {"provider": "OVH", "status": "ok"}
        return {"provider": "OVH", "status": "error", "code": status_code}
    except Exception:
        # Deliberately broad: this is a best-effort status probe and must
        # never raise into the caller.
        return {"provider": "OVH", "status": "error"}
|
|
|
|
|
|
class StatusAPIHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing an aggregated status report as JSON.

    ``GET /api`` returns a JSON document describing the Docker services, a few
    external endpoints, the latest Gitea workflow run, third-party integration
    checks (OVH, Mailchimp/Mandrill, Stripe) and Bitcoin signet wallet
    balances. Any other path returns 404. ``OPTIONS`` answers CORS preflight
    requests.

    The report is rebuilt on every request by running ``docker`` and ``curl``
    commands through the module-level helpers.
    """

    def do_GET(self):
        """Serve the status report on ``/api``; reply 404 for any other path."""
        if self.path != '/api':
            self.send_response(404)
            self.end_headers()
            return

        # Build the payload before emitting any header so that an unexpected
        # failure produces a 500 instead of a "200 OK" with an empty body.
        try:
            body = json.dumps(self._build_report(), indent=2).encode()
        except Exception:
            self.send_error(500, "Status report generation failed")
            return

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def do_OPTIONS(self):
        """Answer a CORS preflight request with permissive headers."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization')
        self.end_headers()

    # ------------------------------------------------------------------
    # Report assembly
    # ------------------------------------------------------------------

    def _build_report(self) -> dict:
        """Collect every section of the report and return it as one dict."""
        service_defs = self._service_definitions()
        services = [self._describe_service(sdef) for sdef in service_defs]
        external = self._check_external_endpoints()

        # Integration settings are read from the back-end container's env.
        back = get_container_inspect("lecoffre-back") or {}
        env_list = (back.get("Config") or {}).get("Env") or []
        env_map = dict(e.split("=", 1) for e in env_list if "=" in e)
        externals_cfg = {
            "OVH": bool(env_map.get("OVH_APPLICATION_KEY")),
            "Stripe": bool(env_map.get("STRIPE_SECRET_KEY")),
            "Mailchimp": bool(env_map.get("MAILCHIMP_API_KEY")),
        }

        runner = self._latest_gitea_run(env_map)
        deployment = self._deployment_summary(services, service_defs, docker_ps_names())
        mailchimp_test = self._mailchimp_ping(env_map)
        stripe_by_offer, stripe_prices_map, stripe_price_counts = (
            self._stripe_subscription_stats(env_map)
        )

        ovh_test = {"provider": "OVH", "status": "missing"}
        if externals_cfg.get("OVH"):
            ovh_test = ovh_safe_check(
                env_map.get("OVH_APPLICATION_KEY", ""),
                env_map.get("OVH_APPLICATION_SECRET", ""),
                env_map.get("OVH_CONSUMER_KEY", ""),
                env_map.get("OVH_SERVICE_NAME", ""),
            )

        wallets = self._collect_wallets(env_map)

        return {
            "timestamp": datetime.now().isoformat(),
            "services": services,
            "external": external,
            "runner": runner,
            "integrations_configured": externals_cfg,
            "deployment": deployment,
            "integrations_test": {
                "ovh": ovh_test,
                "mailchimp": mailchimp_test,
                "stripe_subscriptions_by_offer": stripe_by_offer,
                "stripe_prices": {
                    pid: {"nickname": stripe_prices_map.get(pid, ""), "count": cnt}
                    for pid, cnt in stripe_price_counts.items()
                },
            },
            "wallets": wallets,
        }

    # ------------------------------------------------------------------
    # Docker services
    # ------------------------------------------------------------------

    @staticmethod
    def _service_definitions() -> list:
        """Return the monitored services with their container and optional checks.

        ``health`` is a callable returning the text printed by a health script;
        ``probe`` is a callable returning a ``(status, detail)`` pair.
        """
        return [
            {"name": "Tor Proxy", "container": "tor-proxy", "protocol": "SOCKS", "port": 9050, "health": lambda: exec_health("tor-proxy", "/scripts/healthchecks/tor-progress.sh")},
            {"name": "Bitcoin Signet", "container": "bitcoin-signet", "protocol": "RPC", "port": 8332, "health": lambda: exec_health("bitcoin-signet", "/scripts/healthchecks/bitcoin-progress.sh")},
            {"name": "BlindBit Oracle", "container": "blindbit-oracle", "protocol": "HTTP", "port": 8000, "health": lambda: exec_health("blindbit-oracle", "/scripts/healthchecks/blindbit-progress.sh")},
            {"name": "SDK Relay", "container": "sdk_relay", "protocol": "WebSocket", "port": 8090, "health": lambda: exec_health("sdk_relay", "/scripts/healthchecks/sdk-relay-progress.sh")},
            {"name": "SDK Storage", "container": "sdk_storage", "protocol": "HTTP", "port": 8080, "probe": lambda: http_probe("http://sdk_storage:8080/health")},
            {"name": "LeCoffre Frontend", "container": "lecoffre-front", "protocol": "HTTP", "port": 3000},
            {"name": "IHM Client", "container": "ihm_client", "protocol": "HTTP", "port": 3003},
            {"name": "Grafana", "container": "grafana", "protocol": "HTTP", "port": 3000, "probe": lambda: http_probe("http://grafana:3000/api/health")},
            {"name": "Loki", "container": "loki", "protocol": "HTTP", "port": 3100, "probe": lambda: http_probe("http://loki:3100/ready")},
            {"name": "Promtail", "container": "promtail", "protocol": "HTTP", "port": 9080},
            {"name": "Miner Signet", "container": "signet_miner", "protocol": "Bitcoin", "port": None},
        ]

    @staticmethod
    def _describe_service(sdef: dict) -> dict:
        """Build the report entry for one service definition."""
        inspect = get_container_inspect(sdef["container"]) or {}
        state = inspect.get("State") or {}
        status = state.get("Status", "stopped")
        started_at = state.get("StartedAt", "")
        uptime = compute_uptime(started_at) if status == "running" else "N/A"
        # ``Config`` may be present but null, hence ``or {}`` rather than a
        # ``get`` default.
        image_ref = (inspect.get("Config") or {}).get("Image") or ""
        img = (image_info(image_ref) or {}) if image_ref else {}

        # Health comes from a health script when defined, else from a probe.
        health_text = ""
        health = "unknown"
        try:
            if "health" in sdef:
                health_text = sdef["health"]() or ""
                health = "healthy" if "ready" in health_text or "Synced" in health_text else "starting"
            elif "probe" in sdef:
                hstatus, _ = sdef["probe"]()
                health = "healthy" if hstatus == "running" else "error"

            if sdef.get("name") == "BlindBit Oracle":
                progress = blindbit_scan_progress("blindbit-oracle")
                if progress and progress not in health_text:
                    health_text += (" | " if health_text else "") + f"Scan: {progress}"

            if sdef.get("name") == "Miner Signet":
                mstate = miner_detailed_state("signet_miner")
                if mstate:
                    health_text += (" | " if health_text else "") + f"Miner: {mstate}"
        except Exception:
            # A failing check must not drop the service from the report.
            health = "unknown"

        # SDK Storage extra: size of its data directory.
        data_size_bytes = 0
        if sdef["name"] == "SDK Storage" and status == "running":
            try:
                data_size_bytes = get_storage_size_bytes(sdef["container"]) or 0
            except Exception:
                data_size_bytes = 0

        return {
            "name": sdef["name"],
            "status": status,
            "image": image_ref,
            "ip": (inspect.get("NetworkSettings") or {}).get("IPAddress"),
            "port": sdef.get("port"),
            "protocol": sdef.get("protocol"),
            "uptime": uptime,
            "health": health,
            "health_text": health_text,
            "image_info": img,
            "data_size_bytes": data_size_bytes,
        }

    @staticmethod
    def _deployment_summary(services: list, service_defs: list, running_names) -> dict:
        """Count the services considered ready.

        A service is ready when its container name is in *running_names* and
        its entry is either ``healthy`` or has status ``running``.

        Returns:
            ``{"total": <services>, "healthy": <ready services>,
            "percent": <integer percentage>}``.
        """
        container_by_name = {}
        for definition in service_defs:
            container_by_name.setdefault(definition["name"], definition["container"])

        ready = 0
        for entry in services:
            container = container_by_name.get(entry.get("name"))
            present = bool(container) and container in running_names
            if present and (entry.get("health") == "healthy" or entry.get("status") == "running"):
                ready += 1

        total = len(services)
        percent = ready * 100 // total if total else 0
        # The response previously referenced an undefined name ``healthy``
        # here, which made every /api request fail with NameError.
        return {"total": total, "healthy": ready, "percent": percent}

    # ------------------------------------------------------------------
    # External endpoints and CI
    # ------------------------------------------------------------------

    @staticmethod
    def _check_external_endpoints() -> list:
        """Check the external endpoints and return one entry per endpoint."""
        ext_defs = [
            {"name": "Mempool Signet", "url": "https://mempool2.4nkweb.com", "protocol": "HTTPS", "check": lambda: http_probe("https://mempool2.4nkweb.com/fr/docs/api/rest")},
            {"name": "Relay Bootstrap", "url": "wss://dev3.4nkweb.com/ws/", "protocol": "WebSocket", "check": lambda: ws_placeholder("wss://dev3.4nkweb.com/ws/")},
            {"name": "Signer Bootstrap", "url": "https://dev3.4nkweb.com", "protocol": "HTTPS", "check": lambda: http_probe("https://dev3.4nkweb.com")},
            {"name": "Git Repository", "url": "git.4nkweb.com", "protocol": "SSH", "check": lambda: ("running", "N/A (SSH)")},
        ]
        external = []
        for ext in ext_defs:
            status, detail = ext["check"]()
            external.append({
                "name": ext["name"],
                "url": ext["url"],
                "protocol": ext["protocol"],
                "status": status,
                # Holds the check's detail text (e.g. "ok"), not a duration.
                "response_time": detail,
            })
        return external

    @staticmethod
    def _latest_gitea_run(env_map: dict) -> dict:
        """Return the most recent Gitea Actions run across the owners' repos.

        Requires ``GIT_TOKEN`` or ``GITEA_TOKEN`` in *env_map*; returns ``{}``
        when no token is configured, no run is found, or anything fails.
        """
        gitea_token = env_map.get("GIT_TOKEN") or env_map.get("GITEA_TOKEN")
        gitea_base = env_map.get("GITEA_BASE_URL", "https://git.4nkweb.com").rstrip('/')
        owners_raw = env_map.get("GITEA_OWNER", "") or "nicolas.cantu,Omar"
        owners = [o.strip() for o in owners_raw.split(",") if o.strip()]
        if not (gitea_token and owners):
            return {}

        # NOTE(review): the token is passed on curl's command line and is
        # therefore visible in the process list while curl runs.
        auth_header = f"Authorization: token {gitea_token}"

        def fetch(url: str) -> str:
            """Return the response body, or "" when curl fails or prints nothing."""
            code, out, _ = run_cmd(
                ["curl", "-fsS", url, "-H", auth_header, "-H", "accept: application/json"],
                timeout_seconds=6,
            )
            return out if code == 0 and out else ""

        def run_timestamp(run: dict) -> str:
            return run.get("created_at") or run.get("started_at") or ""

        latest = None
        latest_repo = None
        try:
            for owner in owners:
                # List the owner's repos; fall back to the orgs endpoint.
                repos_json = (
                    fetch(f"{gitea_base}/api/v1/users/{owner}/repos?limit=100")
                    or fetch(f"{gitea_base}/api/v1/orgs/{owner}/repos?limit=100")
                )
                if not repos_json:
                    continue
                for repo in json.loads(repos_json):
                    name = repo.get("name")
                    if not name:
                        continue
                    runs_json = fetch(f"{gitea_base}/api/v1/repos/{owner}/{name}/actions/runs?limit=1")
                    if not runs_json:
                        continue
                    data = json.loads(runs_json)
                    runs = data.get("workflow_runs") or data.get("data") or []
                    if not runs:
                        continue
                    candidate = runs[0]
                    ts = run_timestamp(candidate)
                    # Timestamps are compared as strings.
                    if ts and (latest is None or ts > run_timestamp(latest)):
                        latest = candidate
                        latest_repo = f"{owner}/{name}"
        except Exception:
            # Best-effort section: any failure leaves the runner info empty.
            return {}

        if not (latest and latest_repo):
            return {}
        return {
            "name": latest_repo,
            "status": latest.get("status") or latest.get("conclusion"),
            "started_at": latest.get("created_at") or latest.get("started_at"),
            "uptime": "",
            "url": latest.get("html_url") or latest.get("url"),
        }

    # ------------------------------------------------------------------
    # Third-party integrations
    # ------------------------------------------------------------------

    @staticmethod
    def _mailchimp_ping(env_map: dict) -> dict:
        """Ping the Mandrill API with ``MAILCHIMP_API_KEY`` and report the result.

        Returns status ``missing`` when no key is configured, ``ok`` when the
        reply contains ``PONG`` and ``error`` otherwise.
        """
        api_key = env_map.get("MAILCHIMP_API_KEY")
        if not api_key:
            return {"provider": "Mailchimp", "status": "missing"}
        # NOTE(review): the key is passed on curl's command line and is
        # therefore visible in the process list while curl runs.
        code_mc, out_mc, _ = run_cmd([
            "curl", "-fsS", "-X", "POST",
            "https://mandrillapp.com/api/1.0/users/ping.json",
            "-H", "Content-Type: application/json",
            "-d", json.dumps({"key": api_key}),
        ], timeout_seconds=6)
        if code_mc == 0 and 'PONG' in out_mc:
            return {"provider": "Mailchimp", "status": "ok"}
        return {"provider": "Mailchimp", "status": "error"}

    @staticmethod
    def _stripe_subscription_stats(env_map: dict) -> tuple:
        """Aggregate Stripe subscriptions per offer family and per price.

        Returns:
            ``(by_offer, prices_map, price_counts)`` where *by_offer* maps a
            family name (plus ``TOTAL``) to a count of subscription items,
            *prices_map* maps price id to nickname and *price_counts* maps
            price id to the number of subscription items using it. Counters
            stay at zero when ``STRIPE_SECRET_KEY`` is absent; on failure the
            values aggregated so far are returned.
        """
        by_offer = {"CREATORS": 0, "STARTER": 0, "STANDARD": 0, "UNLIMITED": 0, "TOTAL": 0}
        prices_map = {}
        price_counts = {}
        if not env_map.get("STRIPE_SECRET_KEY"):
            return by_offer, prices_map, price_counts

        try:
            # NOTE(review): the secret key is passed on curl's command line
            # and is therefore visible in the process list while curl runs.
            auth_h = f"Authorization: Bearer {env_map.get('STRIPE_SECRET_KEY')}"

            # 1) List active prices (at most 100) to map price id -> nickname.
            code_p, out_p, _ = run_cmd([
                "curl", "-fsS", "https://api.stripe.com/v1/prices?limit=100&active=true",
                "-H", auth_h,
            ], timeout_seconds=6)
            if code_p == 0 and out_p:
                for pr in (json.loads(out_p) or {}).get("data") or []:
                    pid = pr.get('id')
                    prices_map[pid] = pr.get('nickname') or ''
                    price_counts[pid] = 0

            # Families are identified by the price ids configured in the
            # environment when present, otherwise by a nickname substring.
            def configured_ids(*env_names: str) -> set:
                return {env_map[n] for n in env_names if env_map.get(n)}

            families = (
                ('CREATORS', configured_ids("STRIPE_CREATORS_PRICE_ID"), 'createur'),
                ('STARTER', configured_ids(
                    "STRIPE_STARTER_ANNUAL_PRICE_ID",
                    "STRIPE_STARTER_MONTHLY_YEAR_PRICE_ID",
                    "STRIPE_STARTER_MONTHLY_MONTH_PRICE_ID",
                ), 'starter'),
                ('STANDARD', configured_ids(
                    "STRIPE_STANDARD_SUBSCRIPTION_PRICE_ID",
                    "STRIPE_STANDARD_ANNUAL_SUBSCRIPTION_PRICE_ID",
                    "STRIPE_STANDARD_MONTHLY_YEAR_PRICE_ID",
                    "STRIPE_STANDARD_MONTHLY_MONTH_PRICE_ID",
                ), 'standard'),
                ('UNLIMITED', configured_ids(
                    "STRIPE_UNLIMITED_SUBSCRIPTION_PRICE_ID",
                    "STRIPE_UNLIMITED_ANNUAL_SUBSCRIPTION_PRICE_ID",
                ), 'unlimit'),
            )

            def family_for(pid: str, nickname: str) -> str:
                lowered = (nickname or '').lower()
                for family, ids, marker in families:
                    if pid in ids or marker in lowered:
                        return family
                return ''

            # 2) Walk the subscriptions and aggregate by price family.
            starting_after = None
            for _ in range(3):  # page cap to keep the request bounded
                url = "https://api.stripe.com/v1/subscriptions?limit=100&status=active&status=trialing"
                if starting_after:
                    url += f"&starting_after={starting_after}"
                code_s, out_s, _ = run_cmd(["curl", "-fsS", url, "-H", auth_h], timeout_seconds=8)
                if code_s != 0 or not out_s:
                    break
                page = json.loads(out_s) or {}
                subs = page.get("data") or []
                for sub in subs:
                    for item in (sub.get("items") or {}).get("data") or []:
                        pid = (item.get("price") or {}).get("id")
                        fam = family_for(pid or '', prices_map.get(pid, ''))
                        if not fam:
                            continue
                        by_offer[fam] = by_offer.get(fam, 0) + 1
                        by_offer["TOTAL"] += 1
                        if pid:
                            price_counts[pid] = price_counts.get(pid, 0) + 1
                if not (page.get("has_more") and subs):
                    break
                starting_after = subs[-1].get('id')
        except Exception:
            # Best-effort section: keep whatever was aggregated so far.
            pass
        return by_offer, prices_map, price_counts

    # ------------------------------------------------------------------
    # Bitcoin wallets
    # ------------------------------------------------------------------

    @staticmethod
    def _btc_to_sats(value) -> int:
        """Convert a BTC amount (number or numeric string) to satoshis.

        The product is rounded rather than truncated: binary floats make
        ``0.29 * 1e8`` evaluate to 28999999.999999996, which ``int`` would
        turn into 28999999.
        """
        return round(float(value) * 100_000_000)

    @classmethod
    def _btc_wallet_balance(cls, wallet: str) -> dict:
        """Return the wallet's balances in satoshis via ``bitcoin-cli getbalances``.

        The wallet is loaded first when a name is given. All three amounts are
        zero when the call fails or its output cannot be interpreted.
        """
        try:
            if wallet:
                btc_ensure_loaded(wallet)
            code_b, out_b, _ = run_cmd(
                ["docker", "exec", "bitcoin-signet", "bitcoin-cli", "-signet", f"-rpcwallet={wallet}", "getbalances"],
                timeout_seconds=6,
            )
            if code_b == 0 and out_b:
                mine = json.loads(out_b).get("mine")
                if not isinstance(mine, dict):
                    mine = {}
                return {
                    "confirmed_sat": cls._btc_to_sats(mine.get("trusted") or 0),
                    "unconfirmed_sat": cls._btc_to_sats(mine.get("untrusted_pending") or 0),
                    "immature_sat": cls._btc_to_sats(mine.get("immature") or 0),
                }
        except Exception:
            # Best-effort: fall through to the zero balances below.
            pass
        return {"confirmed_sat": 0, "unconfirmed_sat": 0, "immature_sat": 0}

    @classmethod
    def _collect_wallets(cls, env_map: dict) -> dict:
        """Return balances for the known wallets and for every signet wallet."""
        wallets = {}

        # SDK Relay wallet: container env first, then its configuration file.
        relay_env = get_container_env("sdk_relay")
        relay_wallet = relay_env.get("WALLET_NAME") or relay_env.get("SDK_RELAY_WALLET_NAME")
        if not relay_wallet:
            relay_conf = get_file_in_container("sdk_relay", "/app/.conf")
            relay_wallet = parse_wallet_name_from_conf(relay_conf)
        if relay_wallet:
            wallets["SDK Relay"] = cls._btc_wallet_balance(relay_wallet)

        # The miner wallet is assumed to be named "miner".
        wallets["Miner Signet"] = cls._btc_wallet_balance("miner")

        relay_bootstrap_wallet = env_map.get("RELAY_BOOTSTRAP_WALLET_NAME")
        if relay_bootstrap_wallet:
            wallets["Relay Bootstrap"] = cls._btc_wallet_balance(relay_bootstrap_wallet)

        # Enumerate every wallet known to bitcoind, loading those not loaded.
        try:
            loaded = set(btc_list_wallets())
            for wname in btc_list_walletdir() or []:
                if wname not in loaded:
                    btc_ensure_loaded(wname)
                    loaded.add(wname)
            wallets["Bitcoin Signet Wallets"] = {
                wname: cls._btc_wallet_balance(wname) for wname in loaded
            }
        except Exception:
            # Best-effort: the named wallets above are still reported.
            pass
        return wallets
|
|
|
|
|
|
if __name__ == '__main__':
    # Listen on every interface, port 3006, and serve until the process stops.
    listen_address = ('0.0.0.0', 3006)
    server = HTTPServer(listen_address, StatusAPIHandler)
    print('🚀 API Status Python démarrée sur http://0.0.0.0:3006')
    server.serve_forever()
|