Compare commits


135 Commits
main ... cicd

Author SHA1 Message Date
omaroughriss
cf93dde1c0 Update
All checks were successful
Build and Push to Registry / build-and-push (push) Successful in 1m50s
2025-07-03 13:40:59 +02:00
omaroughriss
833ea55233 Update
All checks were successful
Build and Push to Registry / build-and-push (push) Successful in 1m51s
2025-07-03 13:29:56 +02:00
omaroughriss
8dd41ead13 Update
Some checks failed
Build and Push to Registry / build-and-push (push) Failing after 17s
2025-07-03 13:26:26 +02:00
omaroughriss
bfb920029d Update
Some checks failed
Build and Push to Registry / build-and-push (push) Failing after 17s
2025-07-03 13:22:04 +02:00
omaroughriss
9d365dc862 Update conf
All checks were successful
Build and Push to Registry / build-and-push (push) Successful in 1m49s
2025-07-03 13:09:19 +02:00
omaroughriss
e4a9a1473f Update Dockerfile
All checks were successful
Build and Push to Registry / build-and-push (push) Successful in 1m51s
2025-07-03 13:02:28 +02:00
omaroughriss
60aa257f04 Update Dockerfile
Some checks failed
Build and Push to Registry / build-and-push (push) Failing after 24s
2025-07-03 13:01:01 +02:00
omaroughriss
740bddc0e9 Fix dep
All checks were successful
Build and Push to Registry / build-and-push (push) Successful in 1m53s
2025-07-03 12:33:45 +02:00
omaroughriss
b9fed4a386 Fix path error
Some checks failed
Build and Push to Registry / build-and-push (push) Failing after 41s
2025-07-03 12:30:30 +02:00
omaroughriss
475064a835 Use git secret CONF
Some checks failed
Build and Push to Registry / build-and-push (push) Failing after 18s
2025-07-03 12:27:52 +02:00
omaroughriss
f7742c8aee Add CICD
Some checks failed
Build and Push to Registry / build-and-push (push) Failing after 18s
2025-07-03 12:19:38 +02:00
omaroughriss
07ac93313f Update Dockerfile to use SSH 2025-07-03 12:19:27 +02:00
omaroughriss
0ba8bec07b Uset git SHH for deps 2025-07-03 12:19:06 +02:00
omaroughriss
3faa8ea746 Add sp_client package source 2025-07-03 12:18:44 +02:00
Sosthene
04ce0376a1 Add Dockerfile 2025-06-30 16:19:41 +02:00
Sosthene
bedabd4e41 Implement SpScanner with blindbit 2025-06-24 15:07:16 +02:00
Sosthene
66248a4600 [bug] fix improper mark_output_mined 2025-06-24 15:07:01 +02:00
Sosthene
60a497cdde Update to new compression 2025-06-24 15:06:38 +02:00
Sosthene
6de9f31ab1 [bug] Get only unspent outputs to craft transactions 2025-06-24 10:16:06 +02:00
Sosthene
870085d342 Set blindbit new block sync attempts at 4 2025-06-24 10:16:06 +02:00
Sosthene
13e92d4afb Rm electrs and related dependencies 2025-06-24 10:16:06 +02:00
Sosthene
7d5375e5df Cargo fmt 2025-06-24 10:16:06 +02:00
Sosthene
2dadcef03c Scan block with blindbit instead of electrs 2025-06-24 10:16:05 +02:00
760a377a3e Replace electrum with blindbit 2025-06-24 10:15:05 +02:00
fbabe71e76 Add blindbit config option 2025-06-24 10:13:13 +02:00
6ab48f0b19 Add blindbit_url to conf file 2025-06-24 10:13:13 +02:00
c3a58ed020 Add chain_tip to handshake message 2025-06-24 10:13:13 +02:00
Sosthene
7633c07e2f Minor update to latest sp_client 2025-06-23 18:00:07 +02:00
Sosthene
b783649d42 Update Cargo.lock 2025-06-23 17:59:16 +02:00
931c6b827b Cargo fmt 2025-06-03 18:42:12 +02:00
bf6f90f754 Update Cargo.lock 2025-06-03 18:41:53 +02:00
d1cd4491f5 Update commit to latest Pcd definition 2025-06-03 18:32:26 +02:00
12a8b9aab5 Refactoring to update to latest common 2025-04-08 16:03:12 +02:00
ebf27a0a8d process_validation accepts empty state 2025-03-13 14:40:08 +01:00
a390115b3e Update dependencies 2025-03-12 10:28:34 +01:00
818395177c Update to latest common 2025-03-12 10:28:21 +01:00
00f01e5f48 [bug] prevent freezed utxos being accidentally unlocked 2025-03-12 10:26:29 +01:00
69afa2695b Handle public_data in commitments message 2025-03-03 23:21:46 +01:00
02189bf3db udpate sdk_common dependency 2025-02-20 11:38:07 +01:00
7c278e9eb2 Update tests 2025-02-20 11:38:07 +01:00
fd763e6193 Minor fixes in commit 2025-02-20 11:38:07 +01:00
7a060c123d Roles in process state 2025-02-20 11:38:07 +01:00
f7260d6dce Update dependencies 2025-02-20 11:38:07 +01:00
fd8c40e09a Refactor commit 2025-02-05 14:36:33 +01:00
efa1129e45 [bug] quick fix by removing commit msg from cache to allow retry 2025-02-04 10:47:31 +01:00
8b978b91e9 list_unspent also list unsafe outputs 2025-02-04 10:39:29 +01:00
de3808eb1e Update dependencies 2025-02-04 10:39:14 +01:00
2cc026077e Add an output to update for pairing process 2025-02-03 16:24:20 +01:00
f722dee8f6 Change criteria for a pairing process 2025-02-03 15:57:35 +01:00
159fd78a3a [bug] deadlock when spending from core for the faucet 2025-01-28 21:41:45 +01:00
e69ae1a21a configurable data_dir 2025-01-24 17:43:39 +01:00
6fc08f3fdd Broadcast new process 2025-01-24 17:35:52 +01:00
4839c95e93 Send a partial update when a new member is created 2025-01-24 13:33:54 +01:00
c3aba61be2 Better handling of process updates 2025-01-24 12:42:38 +01:00
3ea25e542a Optimize handshake 2025-01-21 11:07:11 +01:00
eea95b8342 Update process when commiting new state 2025-01-17 18:15:57 +01:00
04b9785531 minor fixes 2025-01-17 09:23:35 +01:00
dbca46a739 Refactor process_validation 2025-01-17 09:22:52 +01:00
5d18fd7688 Refactor to prevent accidental double spends 2025-01-17 09:21:29 +01:00
1d1b3546d6 SilentPaymentWallet::save() takes an already unlocked wallet as argument 2025-01-17 09:19:01 +01:00
8d7a6c7400 Load freezed utxos at startup 2025-01-12 14:11:18 +01:00
d99cab0a26 [bug] fix saving/loading files 2025-01-10 16:27:05 +01:00
17c3fefa88 Rm PublicLists 2025-01-10 16:25:38 +01:00
3fd014edd9 Less verbosity 2025-01-10 16:24:59 +01:00
7eb6304f6e [bug] Missing use statements in main 2025-01-10 09:35:41 +01:00
1e433b63e7 Write to disk Member when process is pairing 2025-01-09 17:31:06 +01:00
a55159449a Update to HandshakeMessage 2025-01-09 17:30:39 +01:00
9db6b9f957 [bug] deadlock in handle_initial_transaction 2025-01-09 16:53:02 +01:00
960c0808fa Update AnkFlag::Init to Handshake 2025-01-09 16:12:25 +01:00
a09eb404e2 Add save processes and members to disk 2025-01-09 16:11:14 +01:00
fdcd234838 Add MEMBERLIST and update it with pairing processes 2025-01-09 11:15:41 +01:00
292b7135ab [bug] deadlock in commit 2025-01-03 12:38:13 +01:00
4ec25d1494 [bug] doesn't fail if checking already scanned transaction 2025-01-02 22:36:41 +01:00
4a72fd3129 Return peers and processes list in init message 2025-01-02 14:23:24 +01:00
21c0882a34 Update to latest common dev 2025-01-02 14:22:35 +01:00
2b1bccb687 Add check_transaction_alone and scan transactions in mempool 2025-01-01 11:48:51 +01:00
1271e492f5 Remove useless verbosity 2024-12-19 14:09:02 +01:00
616f20da8f [bug] Set default fees for commit transaction in case Core fee estimation fails 2024-12-19 14:08:44 +01:00
a6bb827c56 Add the stateId when registering a new state 2024-12-19 14:08:07 +01:00
Sosthene
e708372223 Add Init message at connection with client 2024-12-17 13:42:38 +01:00
Sosthene
cc2368996f Update Cargo.lock 2024-12-17 13:41:52 +01:00
Sosthene
072acb57bc Update commit tests 2024-12-17 13:41:38 +01:00
Sosthene
25bbc64e6a Update to latest sdk_common 2024-12-03 10:45:49 +01:00
Sosthene
2788159c4d [bug] comment out roles check 2024-12-02 10:05:15 +01:00
Sosthene
465f32c4fe [bug] handling new_tx doesn't end up in sending empty message 2024-11-22 14:50:35 +01:00
Sosthene
398ee47612 [test] handle_commit_{new_process, new_state} 2024-11-18 15:35:13 +01:00
Sosthene
1cb20527da Refactor commitment 2024-11-18 15:34:10 +01:00
Sosthene
39a8ff87a9 Abstract Daemon methods in RpcCall trait 2024-11-18 15:21:52 +01:00
Sosthene
8eae4408f2 Refactor commit 2024-11-12 23:24:14 +01:00
Sosthene
ce2c158de2 Update zmq dependency 2024-11-12 23:23:22 +01:00
Sosthene
c468d455ca update Cargo.lock 2024-10-07 16:59:01 +02:00
Sosthene
80b2cf8c4c Update to latest common dev 2024-10-07 11:25:52 +02:00
Sosthene
bc625955e9 Add commit logic 2024-10-04 09:22:03 +02:00
Sosthene
0f726d70be Modify MessageCache constants 2024-10-04 09:21:38 +02:00
Sosthene
7b34096940 More efficient clean_up of MessageCache 2024-10-04 09:21:13 +02:00
Sosthene
fc726dbcf5 Modity create_transaction inputs 2024-10-04 09:20:27 +02:00
Sosthene
a5ea8485d6 Import MutexExt from common 2024-09-23 12:45:15 +02:00
Sosthene
d013c4e20f Update AnkNetworkMsg to Envelope 2024-08-28 10:40:37 +02:00
Sosthene
32916f1588 Save wallet to disk at creation 2024-08-12 16:38:16 +02:00
Sosthene
56321d89df Update to latest common dev 2024-08-12 12:22:28 +02:00
Sosthene
da04149923 scan at each new block 2024-06-25 11:23:15 +02:00
Sosthene
31c17e908d Move shared resources to static variables 2024-06-25 11:21:14 +02:00
Sosthene
fbd7a1c1ea Refactor message processing 2024-06-21 22:43:05 +02:00
ank
9f2c4ed2e1 Add config file + bug fix 2024-06-21 22:42:07 +02:00
Sosthene
e5e7496611 Update to latest sdk_common dev + minor refactoring 2024-06-17 13:48:20 +02:00
Sosthene
8e50727880 import SilentPaymentAddress from utils 2024-06-03 18:18:33 +02:00
Sosthene
a192edb761 sdk_common on branch dev 2024-05-29 15:23:43 +02:00
Sosthene
0d9e8ba4e5 Merge branch 'ws-server' into dev 2024-05-28 11:12:27 +02:00
Sosthene
7e5dc17841 Add an argument for core rpc and network 2024-05-28 11:11:17 +02:00
Sosthene
0d5149eb96 import sp_client through sdk_common 2024-05-28 11:10:48 +02:00
Sosthene
ad026a783e Add a message cache 2024-05-28 11:09:37 +02:00
Sosthene
539670d248 Handle new message types and better error management 2024-05-27 12:10:39 +02:00
Sosthene
de84c8a1bf Faucet refactoring 2024-05-22 10:18:02 +02:00
Sosthene
083843a94a [bug] estimate_fee 2024-05-18 16:36:49 +02:00
Sosthene
4455c76663 Process Unknown message 2024-05-06 10:48:56 +02:00
Sosthene00
ec8e4ebef8 refactoring 2024-04-29 16:03:09 +02:00
Sosthene00
94b96320d7 spend refactoring 2024-04-18 00:34:10 +02:00
Sosthene00
ee5fcb4932 Update sp_backend to sp_client 2024-04-17 21:57:58 +02:00
Sosthene00
a287db7cf8 [bug fix] set floor fees when no estimation from core 2024-04-17 08:25:52 +02:00
Sosthene00
8bbee83aac Return whole tx for faucet 2024-04-17 08:25:06 +02:00
Sosthene00
4d3dc8123a Simplify zmq & one track for all messaging 2024-04-17 08:24:12 +02:00
Sosthene00
459756815f abort if core doesn't have enough funds 2024-04-17 08:18:33 +02:00
Sosthene00
c1d1c0a4b5 Add sdk_common dependency 2024-04-09 14:18:28 +02:00
Sosthene00
7dba477f33 Refactoring 2024-04-08 18:15:26 +02:00
Sosthene00
306949e9f0 Refactoring 2024-03-21 18:08:09 +01:00
Sosthene00
6db81ee769 Add SilentPaymentWallet 2024-03-21 18:06:39 +01:00
Sosthene00
d33c3e9735 Add MutexExt trait 2024-03-21 18:06:17 +01:00
Sosthene00
a28f40fa0c derive Debug for Daemon 2024-03-21 18:05:46 +01:00
Sosthene00
d816115929 use sp_backend 2024-03-20 17:28:12 +01:00
Sosthene00
3ebc319a26 Add DUST constant 2024-03-15 12:39:17 +01:00
Sosthene00
ed37accb67 Add electrum_client 2024-03-15 12:39:04 +01:00
Sosthene00
7774207e01 Faucet spending 2024-03-08 20:40:25 +01:00
Sosthene00
5f4efa5aa3 Add the sp wallet + daemon logic, some refactoring 2024-03-08 15:37:29 +01:00
Sosthene00
2d044ec2c2 Analyze rawtx msg and add sp_tweak 2024-03-07 09:36:59 +01:00
Sosthene00
eb6699baca base_server 2024-02-21 17:11:13 +01:00
13 changed files with 5983 additions and 0 deletions

7
.conf.model Normal file

@@ -0,0 +1,7 @@
core_url=""
ws_url=""
wallet_name="default"
network="signet"
electrum_url="tcp://localhost:60601"
blindbit_url="tcp://localhost:8000"
zmq_url=""
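A filled-in sketch for a local signet setup (all values below are illustrative placeholders; note that electrum_url still appears in the model but is no longer read by the parser in src/config.rs, and the optional data_dir key defaults to ".4nk" when absent):

core_url="http://127.0.0.1:38332"
ws_url="0.0.0.0:8090"
wallet_name="default"
network="signet"
blindbit_url="tcp://localhost:8000"
zmq_url="tcp://127.0.0.1:29000"
data_dir=".4nk"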

43
.github/workflows/cicd.yml vendored Normal file

@@ -0,0 +1,43 @@
name: Build and Push to Registry
on:
push:
branches: [ cicd ]
env:
REGISTRY: git.4nkweb.com
IMAGE_NAME: 4nk/sdk_relay
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up SSH agent
uses: webfactory/ssh-agent@v0.9.1
with:
ssh-private-key: ${{ secrets.SSH_PRIVATE_KEY }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ secrets.USER }}
password: ${{ secrets.TOKEN }}
- name: Build and push
uses: docker/build-push-action@v5
with:
context: .
push: true
ssh: default
build-args: |
CONF=${{ secrets.CONF }}
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ gitea.sha }}
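Reproduced locally, the build step amounts to roughly this (a sketch: it assumes an ssh-agent already holding a key authorized on git.4nkweb.com, as the webfactory/ssh-agent step provides, and a local .conf file standing in for the CONF secret):

docker build --ssh default --build-arg CONF="$(cat .conf)" -t git.4nkweb.com/4nk/sdk_relay:latest .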

2
.gitignore vendored Normal file

@@ -0,0 +1,2 @@
/target
.conf

2828
Cargo.lock generated Normal file

File diff suppressed because it is too large

24
Cargo.toml Normal file

@@ -0,0 +1,24 @@
[package]
name = "sdk_relay"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
bitcoincore-rpc = { version = "0.18" }
env_logger = "0.9"
futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
hex = "0.4.3"
log = "0.4.20"
sdk_common = { git = "ssh://git@git.4nkweb.com/4nk/sdk_common.git", branch = "blindbit" }
serde = { version = "1.0.193", features = ["derive"]}
serde_json = "1.0"
serde_with = "3.6.0"
tokio = { version = "1.0.0", features = ["io-util", "rt-multi-thread", "macros", "sync"] }
tokio-stream = "0.1"
tokio-tungstenite = "0.21.0"
zeromq = "0.4.1"
[dev-dependencies]
mockall = "0.13.0"
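Note that sdk_common is fetched over SSH from the private registry; this is why the workflow above loads a key with webfactory/ssh-agent and the Dockerfile below builds with --mount=type=ssh.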

46
Dockerfile Normal file

@@ -0,0 +1,46 @@
# syntax=docker/dockerfile:1.4
FROM rust:latest AS builder
WORKDIR /app
# Configure SSH so git can reach the private registry
RUN mkdir -p /root/.ssh && \
ssh-keyscan git.4nkweb.com >> /root/.ssh/known_hosts
# Copy the sdk_relay sources
COPY Cargo.toml Cargo.lock ./
COPY src/ src/
# Build with SSH support to fetch the private dependencies
RUN --mount=type=ssh cargo build --release
# ---- final image ----
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y ca-certificates strace
# Create the bitcoin user
RUN useradd -m -d /home/bitcoin -u 1000 bitcoin
COPY --from=builder /app/target/release/sdk_relay /usr/local/bin/sdk_relay
RUN chown bitcoin:bitcoin /usr/local/bin/sdk_relay && \
chmod 755 /usr/local/bin/sdk_relay
# Configuration via build arg
ARG CONF
RUN echo "$CONF" > /home/bitcoin/.conf && \
chown bitcoin:bitcoin /home/bitcoin/.conf && \
chmod 644 /home/bitcoin/.conf
# Create the .4nk directory with the right permissions
RUN mkdir -p /home/bitcoin/.4nk && \
chown -R bitcoin:bitcoin /home/bitcoin/.4nk && \
chmod 755 /home/bitcoin/.4nk
WORKDIR /home/bitcoin
USER bitcoin
ENV HOME=/home/bitcoin
VOLUME ["/home/bitcoin/.4nk"]
VOLUME ["/home/bitcoin/.bitcoin"]
EXPOSE 8090 8091
ENTRYPOINT ["sdk_relay", "--config", "/home/bitcoin/.conf"]
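A matching run invocation might look like this (a sketch; host paths are placeholders, and the .bitcoin mount matters because the daemon reads bitcoind's RPC cookie from $HOME/.bitcoin, as src/daemon.rs shows below):

docker run -v $HOME/.4nk:/home/bitcoin/.4nk -v $HOME/.bitcoin:/home/bitcoin/.bitcoin -p 8090:8090 -p 8091:8091 git.4nkweb.com/4nk/sdk_relay:latest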

769
src/commit.rs Normal file

@@ -0,0 +1,769 @@
use std::{
collections::HashMap,
sync::{Mutex, MutexGuard, OnceLock},
};
use anyhow::{Error, Result};
use bitcoincore_rpc::bitcoin::hex::DisplayHex;
use sdk_common::network::{AnkFlag, CommitMessage, HandshakeMessage};
use sdk_common::process::{lock_processes, Process, ProcessState};
use sdk_common::serialization::{OutPointMemberMap, OutPointProcessMap};
use sdk_common::silentpayments::create_transaction;
use sdk_common::sp_client::bitcoin::{Amount, OutPoint};
use sdk_common::sp_client::{FeeRate, Recipient};
use sdk_common::{
pcd::Member,
silentpayments::sign_transaction,
sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress},
};
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
use crate::{
message::{broadcast_message, BroadcastType},
CHAIN_TIP,
};
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
let mut processes = lock_processes()?;
if let Some(process) = processes.get_mut(&commit_msg.process_id) {
handle_existing_commitment(process, &commit_msg)?;
} else {
let new_process = handle_new_process(&commit_msg)?;
// Cache the process
processes.insert(commit_msg.process_id, new_process);
}
// Dump to disk
dump_cached_processes(processes.clone())?;
// Add to frozen UTXOs
lock_freezed_utxos()?.insert(commit_msg.process_id);
// Send an update to all connected clients
let our_sp_address = WALLET
.get()
.ok_or(Error::msg("Wallet not initialized"))?
.lock_anyhow()?
.get_sp_client()
.get_receiving_address();
let mut new_process_map = HashMap::new();
let new_process = processes.get(&commit_msg.process_id).unwrap().clone();
new_process_map.insert(commit_msg.process_id, new_process);
let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst);
let init_msg = HandshakeMessage::new(
our_sp_address.to_string(),
OutPointMemberMap(HashMap::new()),
OutPointProcessMap(new_process_map),
current_tip.into(),
);
if let Err(e) = broadcast_message(
AnkFlag::Handshake,
format!("{}", init_msg.to_string()),
BroadcastType::ToAll,
) {
log::error!("Failed to send handshake message: {}", e);
}
Ok(commit_msg.process_id)
}
fn handle_new_process(commit_msg: &CommitMessage) -> Result<Process> {
let pcd_commitment = &commit_msg.pcd_commitment;
let merkle_root_bin = pcd_commitment.create_merkle_tree()?.root().unwrap();
if let Ok(pairing_process_id) = handle_member_list(&commit_msg) {
dump_cached_members()?;
// Send a handshake message to every connected client
if let Some(new_member) = lock_members().unwrap().get(&pairing_process_id) {
let our_sp_address = WALLET
.get()
.ok_or(Error::msg("Wallet not initialized"))?
.lock_anyhow()?
.get_sp_client()
.get_receiving_address();
let mut new_member_map = HashMap::new();
new_member_map.insert(pairing_process_id, new_member.clone());
let init_msg = HandshakeMessage::new(
our_sp_address.into(),
OutPointMemberMap(new_member_map),
OutPointProcessMap(HashMap::new()),
CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst).into(),
);
if let Err(e) = broadcast_message(
AnkFlag::Handshake,
format!("{}", init_msg.to_string()),
BroadcastType::ToAll,
) {
log::error!("Failed to send handshake message: {}", e);
}
} else {
log::error!(
"Failed to find new member with process id {}",
pairing_process_id
);
}
}
let mut new_process = Process::new(commit_msg.process_id);
let init_state = ProcessState {
commited_in: commit_msg.process_id,
roles: commit_msg.roles.clone(),
pcd_commitment: commit_msg.pcd_commitment.clone(),
state_id: merkle_root_bin,
public_data: commit_msg.public_data.clone(),
..Default::default()
};
new_process.insert_concurrent_state(init_state)?;
Ok(new_process)
}
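// Global member list, keyed by the outpoint of the pairing process that created each member.
// Lazily initialized on first access through lock_members() below.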
pub static MEMBERLIST: OnceLock<Mutex<HashMap<OutPoint, Member>>> = OnceLock::new();
pub fn lock_members() -> Result<MutexGuard<'static, HashMap<OutPoint, Member>>, anyhow::Error> {
MEMBERLIST
.get_or_init(|| Mutex::new(HashMap::new()))
.lock_anyhow()
}
fn handle_member_list(commit_msg: &CommitMessage) -> Result<OutPoint> {
// Check that this is a pairing process: exactly one role, named "pairing", with no members yet
if commit_msg.roles.len() != 1 {
return Err(Error::msg("Process is not a pairing process"));
}
if let Some(pairing_role) = commit_msg.roles.get("pairing") {
if !pairing_role.members.is_empty() {
return Err(Error::msg("Process is not a pairing process"));
}
} else {
return Err(Error::msg("Process is not a pairing process"));
}
if let Ok(paired_addresses) = commit_msg.public_data.get_as_json("pairedAddresses") {
let paired_addresses: Vec<SilentPaymentAddress> = serde_json::from_value(paired_addresses.clone())?;
let mut memberlist = lock_members()?;
memberlist.insert(commit_msg.process_id, Member::new(paired_addresses));
return Ok(commit_msg.process_id);
}
Err(Error::msg("Process is not a pairing process"))
}
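For the branch above to succeed, the commit's public_data must carry a "pairedAddresses" entry that deserializes into a list of silent payment addresses, i.e. JSON shaped like this (addresses elided):

{ "pairedAddresses": ["sprt1qq…", "sprt1qq…"] }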
fn handle_existing_commitment(
process_to_update: &mut Process,
commit_msg: &CommitMessage,
) -> Result<()> {
let process_id = process_to_update.get_process_id()?;
match register_new_state(process_to_update, commit_msg) {
Ok(new_state_id) => log::debug!(
"Registering new state for process {} with state id {}",
process_id,
new_state_id.to_lower_hex_string()
),
Err(existing_state_id) => log::debug!("State {} already exists", existing_state_id),
}
if !commit_msg.validation_tokens.is_empty() {
log::debug!(
"Received commit_msg with {} validation tokens for process {}",
commit_msg.validation_tokens.len(),
process_id
);
// If the validation succeeds, process_validation commits a new tip
process_validation(process_to_update, commit_msg)?;
}
Ok(())
}
pub fn dump_cached_members() -> Result<(), anyhow::Error> {
let members = lock_members()?.clone();
let storage = STORAGE
.get()
.ok_or(Error::msg("STORAGE is not initialized"))?
.lock_anyhow()?;
let members_file = &storage.members_file;
let members_map = OutPointMemberMap(members);
let json = serde_json::to_value(&members_map)?;
members_file.save(&json)?;
log::debug!("saved members");
Ok(())
}
pub fn dump_cached_processes(processes: HashMap<OutPoint, Process>) -> Result<(), anyhow::Error> {
let storage = STORAGE
.get()
.ok_or(Error::msg("STORAGE is not initialized"))?
.lock_anyhow()?;
let processes_file = &storage.processes_file;
let outpoints_map = OutPointProcessMap(processes);
let json = serde_json::to_value(&outpoints_map)?;
processes_file.save(&json)?;
log::debug!("saved processes");
Ok(())
}
// Register a new state
fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Result<[u8; 32]> {
let last_commited_state = process.get_latest_commited_state();
let new_state_id = commit_msg
.pcd_commitment
.create_merkle_tree()?
.root()
.unwrap();
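// A duplicate state is reported through Err carrying the existing state id as its message;
// handle_existing_commitment logs it as "State {} already exists".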
if let Some(state) = last_commited_state {
if new_state_id == state.state_id {
return Err(Error::msg(format!(
"{}",
new_state_id.to_lower_hex_string()
)));
}
}
let concurrent_states = process.get_latest_concurrent_states()?;
let (empty_state, actual_states) = concurrent_states.split_last().unwrap();
let current_outpoint = empty_state.commited_in;
// Ensure no duplicate states
if actual_states
.iter()
.any(|state| state.state_id == new_state_id)
{
return Err(Error::msg(format!(
"{}",
new_state_id.to_lower_hex_string()
)));
}
// Add the new state
let new_state = ProcessState {
commited_in: current_outpoint,
pcd_commitment: commit_msg.pcd_commitment.clone(),
state_id: new_state_id.clone(),
roles: commit_msg.roles.clone(),
public_data: commit_msg.public_data.clone(),
..Default::default()
};
process.insert_concurrent_state(new_state)?;
Ok(new_state_id)
}
// Process validation for a state with validation tokens
fn process_validation(
updated_process: &mut Process,
commit_msg: &CommitMessage,
) -> Result<OutPoint> {
let new_state_id = if commit_msg.pcd_commitment.is_empty() {
// We're dealing with an obliteration attempt
[0u8; 32]
} else {
commit_msg
.pcd_commitment
.create_merkle_tree()?
.root()
.ok_or(Error::msg("Invalid merkle tree"))?
};
{
let state_to_update = updated_process.get_state_for_id_mut(&new_state_id)?;
// Complete with the received tokens
state_to_update
.validation_tokens
.extend(commit_msg.validation_tokens.iter());
state_to_update.validation_tokens.sort_unstable();
state_to_update.validation_tokens.dedup();
}
let state_to_validate = updated_process.get_state_for_id(&new_state_id)?;
let members = lock_members()?.clone();
state_to_validate.is_valid(
updated_process.get_latest_commited_state(),
&OutPointMemberMap(members),
)?;
let commited_in = commit_new_transaction(updated_process, state_to_validate.clone())?;
Ok(commited_in)
}
// Commit the new transaction and update the process state
fn commit_new_transaction(
updated_process: &mut Process,
state_to_commit: ProcessState,
) -> Result<OutPoint> {
let sp_wallet = WALLET
.get()
.ok_or(Error::msg("Wallet not initialized"))?
.lock_anyhow()?;
let commitment_payload = Vec::from(state_to_commit.state_id);
let mut recipients = vec![];
recipients.push(Recipient {
address: RecipientAddress::SpAddress(sp_wallet.get_sp_client().get_receiving_address()),
amount: Amount::from_sat(1000),
});
// TODO not sure if this is still used
// If the process is a pairing, we add another output that directly pays the owner of the process
// We can find out simply by looking at the members list
if let Some(member) = lock_members()?.get(&updated_process.get_process_id().unwrap()) {
// We pick one of the member's devices and pay to it; the member can then share the private key between all its devices
// For now we take the first address
let address: SilentPaymentAddress =
member.get_addresses().get(0).unwrap().as_str().try_into()?;
recipients.push(Recipient {
address: RecipientAddress::SpAddress(address),
amount: Amount::from_sat(1000),
});
}
// This output is used to generate publicly available public keys without having to jump through too many hoops
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
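// estimate_fee returns an amount per kvB; dividing by 1000 yields sat/vB,
// with a fallback of 1 sat/vB when estimation fails.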
let fee_rate = daemon
.estimate_fee(6)
.unwrap_or(Amount::from_sat(1000))
.checked_div(1000)
.unwrap();
let mut freezed_utxos = lock_freezed_utxos()?;
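// The process tip is the outpoint anchoring the next commitment: it must already be
// frozen, it is spent first by the commitment transaction, and vout 0 of the new
// transaction replaces it as the tip at the end of this function.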
let next_commited_in = updated_process.get_process_tip()?;
if !freezed_utxos.contains(&next_commited_in) {
return Err(Error::msg(format!(
"Missing next commitment outpoint for process {}",
updated_process.get_process_id()?
)));
};
let unspent_outputs = sp_wallet.get_unspent_outputs();
let mut available_outpoints = vec![];
// We push the next_commited_in to the top of the available outpoints
if let Some(output) = unspent_outputs.get(&next_commited_in) {
available_outpoints.push((next_commited_in, output.clone()));
}
// We filter out the frozen UTXOs
for (outpoint, output) in unspent_outputs {
if !freezed_utxos.contains(&outpoint) {
available_outpoints.push((outpoint, output));
}
}
let unsigned_transaction = create_transaction(
available_outpoints,
sp_wallet.get_sp_client(),
recipients,
Some(commitment_payload),
FeeRate::from_sat_per_vb(fee_rate.to_sat() as f32),
)?;
let final_tx = sign_transaction(sp_wallet.get_sp_client(), unsigned_transaction)?;
daemon.test_mempool_accept(&final_tx)?;
let txid = daemon.broadcast(&final_tx)?;
let commited_in = OutPoint::new(txid, 0);
freezed_utxos.insert(commited_in);
freezed_utxos.remove(&next_commited_in);
updated_process.remove_all_concurrent_states()?;
updated_process.insert_concurrent_state(state_to_commit)?;
updated_process.update_states_tip(commited_in)?;
Ok(commited_in)
}
// TODO tests are broken, we need a complete overhaul to make them work again
#[cfg(test)]
mod tests {
use super::*;
use crate::daemon::RpcCall;
use crate::DiskStorage;
use crate::StateFile;
use bitcoincore_rpc::bitcoin::consensus::deserialize;
use bitcoincore_rpc::bitcoin::hex::DisplayHex;
use bitcoincore_rpc::bitcoin::*;
use mockall::mock;
use mockall::predicate::*;
use sdk_common::pcd::Member;
use sdk_common::pcd::Pcd;
use sdk_common::pcd::PcdCommitments;
use sdk_common::pcd::RoleDefinition;
use sdk_common::pcd::Roles;
use sdk_common::pcd::ValidationRule;
use sdk_common::process::CACHEDPROCESSES;
use sdk_common::sp_client::bitcoin::consensus::serialize;
use sdk_common::sp_client::bitcoin::hex::FromHex;
use sdk_common::sp_client::silentpayments::SilentPaymentAddress;
use serde_json::json;
use serde_json::{Map, Value};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Mutex;
use std::sync::OnceLock;
const LOCAL_ADDRESS: &str = "sprt1qq222dhaxlzmjft2pa7qtspw2aw55vwfmtnjyllv5qrsqwm3nufxs6q7t88jf9asvd7rxhczt87de68du3jhem54xvqxy80wc6ep7lauxacsrq79v";
const INIT_TRANSACTION: &str = "02000000000102b01b832bf34cf87583c628839c5316546646dcd4939e339c1d83e693216cdfa00100000000fdffffffdd1ca865b199accd4801634488fca87e0cf81b36ee7e9bec526a8f922539b8670000000000fdffffff0200e1f505000000001600140798fac9f310cefad436ea928f0bdacf03a11be544e0f5050000000016001468a66f38e7c2c9e367577d6fad8532ae2c728ed2014043764b77de5041f80d19e3d872f205635f87486af015c00d2a3b205c694a0ae1cbc60e70b18bcd4470abbd777de63ae52600aba8f5ad1334cdaa6bcd931ab78b0140b56dd8e7ac310d6dcbc3eff37f111ced470990d911b55cd6ff84b74b579c17d0bba051ec23b738eeeedba405a626d95f6bdccb94c626db74c57792254bfc5a7c00000000";
const TMP_WALLET: &str = "/tmp/.4nk/wallet";
const TMP_PROCESSES: &str = "/tmp/.4nk/processes";
const TMP_MEMBERS: &str = "/tmp/.4nk/members";
// Define the mock for Daemon with the required methods
mock! {
#[derive(Debug)]
pub Daemon {}
impl RpcCall for Daemon {
fn connect(
rpcwallet: Option<String>,
rpc_url: String,
network: bitcoincore_rpc::bitcoin::Network,
) -> Result<Self> where Self: Sized;
fn estimate_fee(&self, nblocks: u16) -> Result<Amount>;
fn get_relay_fee(&self) -> Result<Amount>;
fn get_current_height(&self) -> Result<u64>;
fn get_block(&self, block_hash: BlockHash) -> Result<Block>;
fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, bip158::BlockFilter)>;
fn list_unspent_from_to(
&self,
minamt: Option<Amount>,
) -> Result<Vec<bitcoincore_rpc::json::ListUnspentResultEntry>>;
fn create_psbt(
&self,
unspents: &[bitcoincore_rpc::json::ListUnspentResultEntry],
spk: ScriptBuf,
network: Network,
) -> Result<String>;
fn process_psbt(&self, psbt: String) -> Result<String>;
fn finalize_psbt(&self, psbt: String) -> Result<String>;
fn get_network(&self) -> Result<Network>;
fn test_mempool_accept(
&self,
tx: &Transaction,
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult>;
fn broadcast(&self, tx: &Transaction) -> Result<Txid>;
fn get_transaction_info(
&self,
txid: &Txid,
blockhash: Option<BlockHash>,
) -> Result<Value>;
fn get_transaction_hex(
&self,
txid: &Txid,
blockhash: Option<BlockHash>,
) -> Result<Value>;
fn get_transaction(
&self,
txid: &Txid,
blockhash: Option<BlockHash>,
) -> Result<Transaction>;
fn get_block_txids(&self, blockhash: BlockHash) -> Result<Vec<Txid>>;
fn get_mempool_txids(&self) -> Result<Vec<Txid>>;
fn get_mempool_entries(
&self,
txids: &[Txid],
) -> Result<Vec<Result<bitcoincore_rpc::json::GetMempoolEntryResult>>>;
fn get_mempool_transactions(
&self,
txids: &[Txid],
) -> Result<Vec<Result<Transaction>>>;
}
}
mock! {
#[derive(Debug)]
pub SpWallet {
fn get_receiving_address(&self) -> Result<String>;
}
}
mock! {
#[derive(Debug)]
pub SilentPaymentWallet {
fn get_sp_wallet(&self) -> Result<MockSpWallet>;
}
}
static WALLET: OnceLock<MockSilentPaymentWallet> = OnceLock::new();
pub fn initialize_static_variables() {
if DAEMON.get().is_none() {
let mut daemon = MockDaemon::new();
daemon
.expect_broadcast()
.withf(|tx: &Transaction| serialize(tx).to_lower_hex_string() == INIT_TRANSACTION)
.returning(|tx| Ok(tx.txid()));
DAEMON
.set(Mutex::new(Box::new(daemon)))
.expect("DAEMON should only be initialized once");
println!("Initialized DAEMON");
}
if WALLET.get().is_none() {
let mut wallet = MockSilentPaymentWallet::new();
wallet
.expect_get_sp_wallet()
.returning(|| Ok(MockSpWallet::new()));
WALLET
.set(wallet)
.expect("WALLET should only be initialized once");
println!("Initialized WALLET");
}
if CACHEDPROCESSES.get().is_none() {
CACHEDPROCESSES
.set(Mutex::new(HashMap::new()))
.expect("CACHEDPROCESSES should only be initialized once");
println!("Initialized CACHEDPROCESSES");
}
if STORAGE.get().is_none() {
let wallet_file = StateFile::new(PathBuf::from_str(TMP_WALLET).unwrap());
let processes_file = StateFile::new(PathBuf::from_str(TMP_PROCESSES).unwrap());
let members_file = StateFile::new(PathBuf::from_str(TMP_MEMBERS).unwrap());
wallet_file.create().unwrap();
processes_file.create().unwrap();
members_file.create().unwrap();
let disk_storage = DiskStorage {
wallet_file,
processes_file,
members_file,
};
STORAGE
.set(Mutex::new(disk_storage))
.expect("STORAGE should initialize only once");
println!("Initialized STORAGE");
}
}
fn mock_commit_msg(process_id: OutPoint) -> CommitMessage {
let field_names = [
"a".to_owned(),
"b".to_owned(),
"pub_a".to_owned(),
"roles".to_owned(),
];
let pairing_id = OutPoint::from_str(
"b0c8378ee68e9a73836b04423ddb6de9fc0e2e715e04ffe6aa34117bb1025f01:0",
)
.unwrap();
let member = Member::new(vec![SilentPaymentAddress::try_from(LOCAL_ADDRESS).unwrap()]);
let validation_rule = ValidationRule::new(1.0, Vec::from(field_names), 1.0).unwrap();
let role_def = RoleDefinition {
members: vec![pairing_id],
validation_rules: vec![validation_rule],
storages: vec![],
};
let roles = Roles::new(BTreeMap::from([(String::from("role_name"), role_def)]));
let public_data = TryInto::<Pcd>::try_into(json!({"pub_a": Value::Null})).unwrap();
let clear_state =
TryInto::<Pcd>::try_into(json!({"a": Value::Null, "b": Value::Null})).unwrap();
let pcd_commitments = PcdCommitments::new(
&process_id,
&Pcd::new(public_data.clone().into_iter().chain(clear_state).collect()),
&roles,
)
.unwrap();
let commit_msg = CommitMessage {
process_id,
roles,
public_data,
validation_tokens: vec![],
pcd_commitment: pcd_commitments,
error: None,
};
commit_msg
}
#[test]
fn test_handle_commit_new_process() {
initialize_static_variables();
let init_tx =
deserialize::<Transaction>(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap();
let init_txid = init_tx.txid();
let process_id = OutPoint::new(init_txid, 0);
let commit_msg = mock_commit_msg(process_id);
let roles = commit_msg.roles.clone();
let pcd_commitment = commit_msg.pcd_commitment.clone();
let empty_state = ProcessState {
commited_in: process_id,
..Default::default()
};
let result = handle_commit_request(commit_msg);
assert_eq!(result.unwrap(), process_id);
let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap();
let updated_process = cache.get(&process_id);
assert!(updated_process.is_some());
let concurrent_states = updated_process
.unwrap()
.get_latest_concurrent_states()
.unwrap();
// Constructing the roles_map that was inserted in the process
let roles_object = serde_json::to_value(roles).unwrap();
let mut roles_map = Map::new();
roles_map.insert("roles".to_owned(), roles_object);
let new_state = ProcessState {
commited_in: process_id,
pcd_commitment,
..Default::default()
};
let target = vec![&empty_state, &new_state];
assert_eq!(concurrent_states, target);
}
#[test]
fn test_handle_commit_new_state() {
initialize_static_variables();
let init_tx =
deserialize::<Transaction>(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap();
let init_txid = init_tx.txid();
let process_id = OutPoint::new(init_txid, 0);
let commit_msg = mock_commit_msg(process_id);
let roles = commit_msg.roles.clone();
let pcd_commitment = commit_msg.pcd_commitment.clone();
let process = Process::new(process_id);
CACHEDPROCESSES
.get()
.unwrap()
.lock()
.unwrap()
.insert(process_id, process);
let result = handle_commit_request(commit_msg);
assert_eq!(result.unwrap(), process_id);
let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap();
let updated_process = cache.get(&process_id);
assert!(updated_process.is_some());
let concurrent_states = updated_process
.unwrap()
.get_latest_concurrent_states()
.unwrap();
let roles_object = serde_json::to_value(roles).unwrap();
let mut roles_map = Map::new();
roles_map.insert("roles".to_owned(), roles_object);
let new_state = ProcessState {
commited_in: process_id,
pcd_commitment,
..Default::default()
};
let empty_state = ProcessState {
commited_in: process_id,
..Default::default()
};
let target = vec![&empty_state, &new_state];
assert_eq!(concurrent_states, target);
}
// #[test]
// fn test_handle_commit_request_invalid_init_tx() {
// let commit_msg = CommitMessage {
// init_tx: "invalid_tx_hex".to_string(),
// roles: HashMap::new(),
// validation_tokens: vec![],
// pcd_commitment: json!({"roles": "expected_roles"}).as_object().unwrap().clone(),
// };
// // Call the function under test
// let result = handle_commit_request(commit_msg);
// // Assertions for error
// assert!(result.is_err());
// assert_eq!(result.unwrap_err().to_string(), "init_tx must be a valid transaction or txid");
// }
// // Example test for adding a new state to an existing commitment
// #[test]
// fn test_handle_commit_request_add_state() {
// // Set up data for adding a state to an existing commitment
// let commit_msg = CommitMessage {
// init_tx: "existing_outpoint_hex".to_string(),
// roles: HashMap::new(),
// validation_tokens: vec![],
// pcd_commitment: json!({"roles": "expected_roles"}).as_object().unwrap().clone(),
// };
// // Mock daemon and cache initialization
// let mut daemon = MockDaemon::new();
// daemon.expect_broadcast().returning(|_| Ok(Txid::new()));
// DAEMON.set(Arc::new(Mutex::new(daemon))).unwrap();
// let process_state = Process::new(vec![], vec![]);
// CACHEDPROCESSES.lock().unwrap().insert(OutPoint::new("mock_txid", 0), process_state);
// // Run the function
// let result = handle_commit_request(commit_msg);
// // Assert success and that a new state was added
// assert!(result.is_ok());
// assert_eq!(result.unwrap(), OutPoint::new("mock_txid", 0));
// }
// // Additional tests for errors and validation tokens would follow a similar setup
}

81
src/config.rs Normal file

@@ -0,0 +1,81 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufRead};
use anyhow::{Error, Result};
use sdk_common::sp_client::bitcoin::Network;
#[derive(Debug)]
pub struct Config {
pub core_url: String,
pub core_wallet: Option<String>,
pub ws_url: String,
pub wallet_name: String,
pub network: Network,
pub blindbit_url: String,
pub zmq_url: String,
pub data_dir: String,
}
impl Config {
pub fn read_from_file(filename: &str) -> Result<Self> {
let mut file_content = HashMap::new();
if let Ok(file) = File::open(filename) {
let reader = io::BufReader::new(file);
// Read the file line by line
for line in reader.lines() {
if let Ok(l) = line {
// Ignore comments and empty lines
if l.starts_with('#') || l.trim().is_empty() {
continue;
}
// Split the line into key and value
if let Some((k, v)) = l.split_once('=') {
file_content.insert(k.to_owned(), v.trim_matches('\"').to_owned());
}
}
}
} else {
return Err(anyhow::Error::msg("Failed to find conf file"));
}
// Now set the Config
let config = Config {
core_url: file_content
.remove("core_url")
.ok_or(Error::msg("No \"core_url\""))?
.to_owned(),
core_wallet: file_content.remove("core_wallet").map(|s| s.to_owned()),
ws_url: file_content
.remove("ws_url")
.ok_or(Error::msg("No \"ws_url\""))?
.to_owned(),
wallet_name: file_content
.remove("wallet_name")
.ok_or(Error::msg("No \"wallet_name\""))?
.to_owned(),
network: Network::from_core_arg(
&file_content
.remove("network")
.ok_or(Error::msg("no \"network\""))?
.trim_matches('\"'),
)?,
blindbit_url: file_content
.remove("blindbit_url")
.ok_or(Error::msg("No \"blindbit_url\""))?
.to_owned(),
zmq_url: file_content
.remove("zmq_url")
.ok_or(Error::msg("No \"zmq_url\""))?
.to_owned(),
data_dir: file_content
.remove("data_dir")
.unwrap_or_else(|| ".4nk".to_string()),
};
Ok(config)
}
}
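A minimal sketch of how this loader is used at startup (the path matches the Dockerfile's ENTRYPOINT; the init_config name is hypothetical and error handling is simplified):

use crate::config::Config;

fn init_config() -> anyhow::Result<Config> {
// The container launches the binary with `--config /home/bitcoin/.conf`.
let config = Config::read_from_file("/home/bitcoin/.conf")?;
log::info!("relay configured for network {}", config.network);
Ok(config)
}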

460
src/daemon.rs Normal file

@@ -0,0 +1,460 @@
use anyhow::{Context, Error, Result};
use bitcoincore_rpc::json::{
CreateRawTransactionInput, ListUnspentQueryOptions, ListUnspentResultEntry,
WalletCreateFundedPsbtOptions,
};
use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi};
use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
use sdk_common::sp_client::bitcoin::{
block, Address, Amount, Block, BlockHash, Network, OutPoint, Psbt, ScriptBuf, Sequence,
Transaction, TxIn, TxOut, Txid,
};
use sdk_common::sp_client::bitcoin::{consensus::deserialize, hashes::hex::FromHex};
// use crossbeam_channel::Receiver;
// use parking_lot::Mutex;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::env;
use std::fs::File;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
use crate::FAUCET_AMT;
pub struct SensitiveAuth(pub Auth);
impl SensitiveAuth {
pub(crate) fn get_auth(&self) -> Auth {
self.0.clone()
}
}
enum PollResult {
Done(Result<()>),
Retry,
}
fn rpc_poll(client: &mut Client, skip_block_download_wait: bool) -> PollResult {
match client.get_blockchain_info() {
Ok(info) => {
if skip_block_download_wait {
// bitcoind RPC is available, don't wait for block download to finish
return PollResult::Done(Ok(()));
}
let left_blocks = info.headers - info.blocks;
if info.initial_block_download || left_blocks > 0 {
log::info!(
"waiting for {} blocks to download{}",
left_blocks,
if info.initial_block_download {
" (IBD)"
} else {
""
}
);
return PollResult::Retry;
}
PollResult::Done(Ok(()))
}
Err(err) => {
if let Some(e) = extract_bitcoind_error(&err) {
if e.code == -28 {
log::debug!("waiting for RPC warmup: {}", e.message);
return PollResult::Retry;
}
}
PollResult::Done(Err(err).context("daemon not available"))
}
}
}
fn read_cookie(path: &Path) -> Result<(String, String)> {
// Load username and password from bitcoind cookie file:
// * https://github.com/bitcoin/bitcoin/pull/6388/commits/71cbeaad9a929ba6a7b62d9b37a09b214ae00c1a
// * https://bitcoin.stackexchange.com/questions/46782/rpc-cookie-authentication
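// The cookie file holds a single "user:password" line that bitcoind regenerates at startup.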
let mut file = File::open(path)
.with_context(|| format!("failed to open bitcoind cookie file: {}", path.display()))?;
let mut contents = String::new();
file.read_to_string(&mut contents)
.with_context(|| format!("failed to read bitcoind cookie from {}", path.display()))?;
let parts: Vec<&str> = contents.splitn(2, ':').collect();
anyhow::ensure!(
parts.len() == 2,
"failed to parse bitcoind cookie - missing ':' separator"
);
Ok((parts[0].to_owned(), parts[1].to_owned()))
}
fn rpc_connect(rpcwallet: Option<String>, network: Network, mut rpc_url: String) -> Result<Client> {
match rpcwallet {
Some(rpcwallet) => rpc_url.push_str(&rpcwallet),
None => (),
}
// Allow `wait_for_new_block` to take a bit longer before timing out.
// See https://github.com/romanz/electrs/issues/495 for more details.
let builder = jsonrpc::simple_http::SimpleHttpTransport::builder()
.url(&rpc_url)?
.timeout(Duration::from_secs(30));
let home = env::var("HOME")?;
let mut cookie_path = PathBuf::from_str(&home)?;
cookie_path.push(".bitcoin");
cookie_path.push(network.to_core_arg());
cookie_path.push(".cookie");
let daemon_auth = SensitiveAuth(Auth::CookieFile(cookie_path));
let builder = match daemon_auth.get_auth() {
Auth::None => builder,
Auth::UserPass(user, pass) => builder.auth(user, Some(pass)),
Auth::CookieFile(path) => {
let (user, pass) = read_cookie(&path)?;
builder.auth(user, Some(pass))
}
};
Ok(Client::from_jsonrpc(jsonrpc::Client::with_transport(
builder.build(),
)))
}
#[derive(Debug)]
pub struct Daemon {
rpc: Client,
}
impl RpcCall for Daemon {
fn connect(rpcwallet: Option<String>, rpc_url: String, network: Network) -> Result<Self> {
let mut rpc = rpc_connect(rpcwallet, network, rpc_url)?;
loop {
match rpc_poll(&mut rpc, false) {
PollResult::Done(result) => {
result.context("bitcoind RPC polling failed")?;
break; // on success, finish polling
}
PollResult::Retry => {
std::thread::sleep(std::time::Duration::from_secs(1)); // wait a bit before polling
}
}
}
let network_info = rpc.get_network_info()?;
if !network_info.network_active {
anyhow::bail!("electrs requires active bitcoind p2p network");
}
let info = rpc.get_blockchain_info()?;
if info.pruned {
anyhow::bail!("electrs requires non-pruned bitcoind node");
}
Ok(Self { rpc })
}
fn estimate_fee(&self, nblocks: u16) -> Result<Amount> {
let res = self
.rpc
.estimate_smart_fee(nblocks, None)
.context("failed to estimate fee")?;
if res.errors.is_some() {
Err(Error::msg(serde_json::to_string(&res.errors.unwrap())?))
} else {
Ok(res.fee_rate.unwrap())
}
}
fn get_relay_fee(&self) -> Result<Amount> {
Ok(self
.rpc
.get_network_info()
.context("failed to get relay fee")?
.relay_fee)
}
fn get_current_height(&self) -> Result<u64> {
Ok(self
.rpc
.get_block_count()
.context("failed to get block count")?)
}
fn get_block(&self, block_hash: BlockHash) -> Result<Block> {
Ok(self
.rpc
.get_block(&block_hash)
.context("failed to get block")?)
}
fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, BlockFilter)> {
let block_hash = self.rpc.get_block_hash(block_height.try_into()?)?;
let filter = self
.rpc
.get_block_filter(&block_hash)
.context("failed to get block filter")?
.into_filter();
Ok((block_height, block_hash, filter))
}
fn list_unspent_from_to(
&self,
minamt: Option<Amount>,
) -> Result<Vec<json::ListUnspentResultEntry>> {
let minimum_sum_amount = if minamt.is_none() || minamt <= FAUCET_AMT.checked_mul(2) {
FAUCET_AMT.checked_mul(2)
} else {
minamt
};
Ok(self.rpc.list_unspent(
None,
None,
None,
Some(true),
Some(ListUnspentQueryOptions {
minimum_sum_amount,
..Default::default()
}),
)?)
}
fn create_psbt(
&self,
unspents: &[ListUnspentResultEntry],
spk: ScriptBuf,
network: Network,
) -> Result<String> {
let inputs: Vec<CreateRawTransactionInput> = unspents
.iter()
.map(|utxo| CreateRawTransactionInput {
txid: utxo.txid,
vout: utxo.vout,
sequence: None,
})
.collect();
let address = Address::from_script(&spk, network)?;
let total_amt = unspents
.iter()
.fold(Amount::from_sat(0), |acc, x| acc + x.amount);
if total_amt < FAUCET_AMT {
return Err(Error::msg("Not enought funds"));
}
let mut outputs = HashMap::new();
outputs.insert(address.to_string(), total_amt);
let options = WalletCreateFundedPsbtOptions {
subtract_fee_from_outputs: vec![0],
..Default::default()
};
let wallet_create_funded_result =
self.rpc
.wallet_create_funded_psbt(&inputs, &outputs, None, Some(options), None)?;
Ok(wallet_create_funded_result.psbt.to_string())
}
fn process_psbt(&self, psbt: String) -> Result<String> {
let processed_psbt = self.rpc.wallet_process_psbt(&psbt, None, None, None)?;
match processed_psbt.complete {
true => Ok(processed_psbt.psbt),
false => Err(Error::msg("Failed to complete the psbt")),
}
}
fn finalize_psbt(&self, psbt: String) -> Result<String> {
let final_tx = self.rpc.finalize_psbt(&psbt, Some(false))?;
match final_tx.complete {
true => Ok(final_tx
.psbt
.expect("We shouldn't have an empty psbt for a complete return")),
false => Err(Error::msg("Failed to finalize psbt")),
}
}
fn get_network(&self) -> Result<Network> {
let blockchain_info = self.rpc.get_blockchain_info()?;
Ok(blockchain_info.chain)
}
fn test_mempool_accept(
&self,
tx: &Transaction,
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult> {
let res = self.rpc.test_mempool_accept(&vec![tx])?;
Ok(res.get(0).unwrap().clone())
}
fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
let txid = self.rpc.send_raw_transaction(tx)?;
Ok(txid)
}
fn get_transaction_info(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Value> {
// No need to parse the resulting JSON, just return it as-is to the client.
self.rpc
.call(
"getrawtransaction",
&[json!(txid), json!(true), json!(blockhash)],
)
.context("failed to get transaction info")
}
fn get_transaction_hex(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Value> {
use sdk_common::sp_client::bitcoin::consensus::serde::{hex::Lower, Hex, With};
let tx = self.get_transaction(txid, blockhash)?;
#[derive(serde::Serialize)]
#[serde(transparent)]
struct TxAsHex(#[serde(with = "With::<Hex<Lower>>")] Transaction);
serde_json::to_value(TxAsHex(tx)).map_err(Into::into)
}
fn get_transaction(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Transaction> {
self.rpc
.get_raw_transaction(txid, blockhash.as_ref())
.context("failed to get transaction")
}
fn get_block_txids(&self, blockhash: BlockHash) -> Result<Vec<Txid>> {
Ok(self
.rpc
.get_block_info(&blockhash)
.context("failed to get block txids")?
.tx)
}
fn get_mempool_txids(&self) -> Result<Vec<Txid>> {
self.rpc
.get_raw_mempool()
.context("failed to get mempool txids")
}
fn get_mempool_entries(
&self,
txids: &[Txid],
) -> Result<Vec<Result<json::GetMempoolEntryResult>>> {
let client = self.rpc.get_jsonrpc_client();
log::debug!("getting {} mempool entries", txids.len());
let args: Vec<_> = txids
.iter()
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
.collect();
let reqs: Vec<_> = args
.iter()
.map(|a| client.build_request("getmempoolentry", a))
.collect();
let res = client.send_batch(&reqs).context("batch request failed")?;
log::debug!("got {} mempool entries", res.len());
Ok(res
.into_iter()
.map(|r| {
r.context("missing response")?
.result::<json::GetMempoolEntryResult>()
.context("invalid response")
})
.collect())
}
fn get_mempool_transactions(&self, txids: &[Txid]) -> Result<Vec<Result<Transaction>>> {
let client = self.rpc.get_jsonrpc_client();
log::debug!("getting {} transactions", txids.len());
let args: Vec<_> = txids
.iter()
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
.collect();
let reqs: Vec<_> = args
.iter()
.map(|a| client.build_request("getrawtransaction", a))
.collect();
let res = client.send_batch(&reqs).context("batch request failed")?;
log::debug!("got {} mempool transactions", res.len());
Ok(res
.into_iter()
.map(|r| -> Result<Transaction> {
let tx_hex = r
.context("missing response")?
.result::<String>()
.context("invalid response")?;
let tx_bytes = Vec::from_hex(&tx_hex).context("non-hex transaction")?;
deserialize(&tx_bytes).context("invalid transaction")
})
.collect())
}
}
pub(crate) trait RpcCall: Send + Sync + std::fmt::Debug {
fn connect(rpcwallet: Option<String>, rpc_url: String, network: Network) -> Result<Self>
where
Self: Sized;
fn estimate_fee(&self, nblocks: u16) -> Result<Amount>;
fn get_relay_fee(&self) -> Result<Amount>;
fn get_current_height(&self) -> Result<u64>;
fn get_block(&self, block_hash: BlockHash) -> Result<Block>;
fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, BlockFilter)>;
fn list_unspent_from_to(
&self,
minamt: Option<Amount>,
) -> Result<Vec<json::ListUnspentResultEntry>>;
fn create_psbt(
&self,
unspents: &[ListUnspentResultEntry],
spk: ScriptBuf,
network: Network,
) -> Result<String>;
fn process_psbt(&self, psbt: String) -> Result<String>;
fn finalize_psbt(&self, psbt: String) -> Result<String>;
fn get_network(&self) -> Result<Network>;
fn test_mempool_accept(
&self,
tx: &Transaction,
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult>;
fn broadcast(&self, tx: &Transaction) -> Result<Txid>;
fn get_transaction_info(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Value>;
fn get_transaction_hex(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Value>;
fn get_transaction(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Transaction>;
fn get_block_txids(&self, blockhash: BlockHash) -> Result<Vec<Txid>>;
fn get_mempool_txids(&self) -> Result<Vec<Txid>>;
fn get_mempool_entries(
&self,
txids: &[Txid],
) -> Result<Vec<Result<json::GetMempoolEntryResult>>>;
fn get_mempool_transactions(&self, txids: &[Txid]) -> Result<Vec<Result<Transaction>>>;
}
pub(crate) type RpcError = bitcoincore_rpc::jsonrpc::error::RpcError;
pub(crate) fn extract_bitcoind_error(err: &bitcoincore_rpc::Error) -> Option<&RpcError> {
use bitcoincore_rpc::{
jsonrpc::error::Error::Rpc as ServerError, Error::JsonRpc as JsonRpcError,
};
match err {
JsonRpcError(ServerError(e)) => Some(e),
_ => None,
}
}

274
src/faucet.rs Normal file

@@ -0,0 +1,274 @@
use std::{collections::HashMap, str::FromStr};
use bitcoincore_rpc::bitcoin::secp256k1::PublicKey;
use bitcoincore_rpc::json::{self as bitcoin_json};
use sdk_common::silentpayments::sign_transaction;
use sdk_common::sp_client::bitcoin::secp256k1::{
rand::thread_rng, Keypair, Message as Secp256k1Message, Secp256k1, ThirtyTwoByteHash,
};
use sdk_common::sp_client::bitcoin::{
absolute::LockTime,
consensus::serialize,
hex::{DisplayHex, FromHex},
key::TapTweak,
script::PushBytesBuf,
sighash::{Prevouts, SighashCache},
taproot::Signature,
transaction::Version,
Amount, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Witness,
XOnlyPublicKey,
};
use sdk_common::{
network::{FaucetMessage, NewTxMessage},
silentpayments::create_transaction,
};
use sdk_common::sp_client::silentpayments::sending::generate_recipient_pubkeys;
use sdk_common::sp_client::silentpayments::utils::sending::calculate_partial_secret;
use sdk_common::sp_client::{FeeRate, OwnedOutput, Recipient, RecipientAddress};
use anyhow::{Error, Result};
use crate::lock_freezed_utxos;
use crate::scan::check_transaction_alone;
use crate::{
scan::compute_partial_tweak_to_transaction, MutexExt, SilentPaymentAddress, DAEMON, FAUCET_AMT,
WALLET,
};
fn spend_from_core(dest: XOnlyPublicKey) -> Result<(Transaction, Amount)> {
let core = DAEMON
.get()
.ok_or(Error::msg("DAEMON not initialized"))?
.lock_anyhow()?;
let unspent_list: Vec<bitcoin_json::ListUnspentResultEntry> =
core.list_unspent_from_to(None)?;
if !unspent_list.is_empty() {
let network = core.get_network()?;
let spk = ScriptBuf::new_p2tr_tweaked(dest.dangerous_assume_tweaked());
let new_psbt = core.create_psbt(&unspent_list, spk, network)?;
let processed_psbt = core.process_psbt(new_psbt)?;
let finalize_psbt_result = core.finalize_psbt(processed_psbt)?;
let final_psbt = Psbt::from_str(&finalize_psbt_result)?;
let total_fee = final_psbt.fee()?;
let final_tx = final_psbt.extract_tx()?;
let fee_rate = total_fee
.checked_div(final_tx.weight().to_vbytes_ceil())
.unwrap();
Ok((final_tx, fee_rate))
} else {
// we don't have enough available coins to pay for this faucet request
Err(Error::msg("No spendable outputs"))
}
}
fn faucet_send(
sp_address: SilentPaymentAddress,
commitment: &str,
) -> Result<(Transaction, PublicKey)> {
let sp_wallet = WALLET
.get()
.ok_or(Error::msg("Wallet not initialized"))?
.lock_anyhow()?;
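// estimate_fee returns an amount per kvB; dividing by 1000 yields sat/vB (1 sat/vB fallback).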
let fee_estimate = DAEMON
.get()
.ok_or(Error::msg("DAEMON not initialized"))?
.lock_anyhow()?
.estimate_fee(6)
.unwrap_or(Amount::from_sat(1000))
.checked_div(1000)
.unwrap();
log::debug!("fee estimate for 6 blocks: {}", fee_estimate);
let recipient = Recipient {
address: RecipientAddress::SpAddress(sp_address),
amount: FAUCET_AMT,
};
let freezed_utxos = lock_freezed_utxos()?;
// We filter out the frozen UTXOs from the available list
let available_outpoints: Vec<(OutPoint, OwnedOutput)> = sp_wallet
.get_unspent_outputs()
.iter()
.filter_map(|(outpoint, output)| {
if !freezed_utxos.contains(&outpoint) {
Some((*outpoint, output.clone()))
} else {
None
}
})
.collect();
// If we had mandatory inputs, we would make sure to put them at the top of the list
// We don't care for the faucet though
// We try to pay the faucet amount
if let Ok(unsigned_transaction) = create_transaction(
available_outpoints,
sp_wallet.get_sp_client(),
vec![recipient],
Some(Vec::from_hex(commitment).unwrap()),
FeeRate::from_sat_per_vb(fee_estimate.to_sat() as f32),
) {
let final_tx = sign_transaction(sp_wallet.get_sp_client(), unsigned_transaction)?;
let partial_tweak = compute_partial_tweak_to_transaction(&final_tx)?;
let daemon = DAEMON
.get()
.ok_or(Error::msg("DAEMON not initialized"))?
.lock_anyhow()?;
// First check that mempool accept it
daemon.test_mempool_accept(&final_tx)?;
let txid = daemon.broadcast(&final_tx)?;
log::debug!("Sent tx {}", txid);
// We immediately add the new tx to our wallet to prevent an accidental double spend
check_transaction_alone(sp_wallet, &final_tx, &partial_tweak)?;
Ok((final_tx, partial_tweak))
} else {
// let's try to spend directly from the mining address
let secp = Secp256k1::signing_only();
let keypair = Keypair::new(&secp, &mut thread_rng());
// we first spend from core to the pubkey we just created
let (core_tx, fee_rate) = spend_from_core(keypair.x_only_public_key().0)?;
// check that the first output of the transaction pays to the key we just created
debug_assert!(
core_tx.output[0].script_pubkey
== ScriptBuf::new_p2tr_tweaked(
keypair.x_only_public_key().0.dangerous_assume_tweaked()
)
);
// This is ugly and can be streamlined
// create a new transaction that spends the newly created UTXO to the sp_address
let mut faucet_tx = Transaction {
input: vec![TxIn {
previous_output: OutPoint::new(core_tx.txid(), 0),
..Default::default()
}],
output: vec![],
version: Version::TWO,
lock_time: LockTime::ZERO,
};
// now do the silent payment operations with the final recipient address
let partial_secret = calculate_partial_secret(
&[(keypair.secret_key(), true)],
&[(core_tx.txid().to_string(), 0)],
)?;
let ext_output_key: XOnlyPublicKey =
generate_recipient_pubkeys(vec![sp_address.into()], partial_secret)?
.into_values()
.flatten()
.collect::<Vec<XOnlyPublicKey>>()
.get(0)
.expect("Failed to generate keys")
.to_owned();
let change_sp_address = sp_wallet.get_sp_client().get_receiving_address();
let change_output_key: XOnlyPublicKey =
generate_recipient_pubkeys(vec![change_sp_address], partial_secret)?
.into_values()
.flatten()
.collect::<Vec<XOnlyPublicKey>>()
.get(0)
.expect("Failed to generate keys")
.to_owned();
let ext_spk = ScriptBuf::new_p2tr_tweaked(ext_output_key.dangerous_assume_tweaked());
let change_spk = ScriptBuf::new_p2tr_tweaked(change_output_key.dangerous_assume_tweaked());
let mut op_return = PushBytesBuf::new();
op_return.extend_from_slice(&Vec::from_hex(commitment)?)?;
let data_spk = ScriptBuf::new_op_return(op_return);
// Take some margin to pay for the fees
if core_tx.output[0].value < FAUCET_AMT * 4 {
return Err(Error::msg("Not enough funds"));
}
let change_amt = core_tx.output[0].value.checked_sub(FAUCET_AMT).unwrap();
faucet_tx.output.push(TxOut {
value: FAUCET_AMT,
script_pubkey: ext_spk,
});
faucet_tx.output.push(TxOut {
value: change_amt,
script_pubkey: change_spk,
});
faucet_tx.output.push(TxOut {
value: Amount::from_sat(0),
script_pubkey: data_spk,
});
// dummy signature only used for fee estimation
faucet_tx.input[0].witness.push([1; 64].to_vec());
let abs_fee = fee_rate
.checked_mul(faucet_tx.weight().to_vbytes_ceil())
.ok_or_else(|| Error::msg("Fee rate multiplication overflowed"))?;
// reset the witness to empty
faucet_tx.input[0].witness = Witness::new();
faucet_tx.output[1].value -= abs_fee;
let first_tx_outputs = vec![core_tx.output[0].clone()];
let prevouts = Prevouts::All(&first_tx_outputs);
let hash_ty = TapSighashType::Default;
let mut cache = SighashCache::new(&faucet_tx);
let sighash = cache.taproot_key_spend_signature_hash(0, &prevouts, hash_ty)?;
let msg = Secp256k1Message::from_digest(sighash.into_32());
let sig = secp.sign_schnorr_with_rng(&msg, &keypair, &mut thread_rng());
let final_sig = Signature { sig, hash_ty };
faucet_tx.input[0].witness.push(final_sig.to_vec());
{
let daemon = DAEMON
.get()
.ok_or(Error::msg("DAEMON not initialized"))?
.lock_anyhow()?;
// We don't worry about core_tx being refused by core
daemon.broadcast(&core_tx)?;
daemon.test_mempool_accept(&faucet_tx)?;
let txid = daemon.broadcast(&faucet_tx)?;
log::debug!("Sent tx {}", txid);
}
let partial_tweak = compute_partial_tweak_to_transaction(&faucet_tx)?;
check_transaction_alone(sp_wallet, &faucet_tx, &partial_tweak)?;
Ok((faucet_tx, partial_tweak))
}
}
pub fn handle_faucet_request(msg: &FaucetMessage) -> Result<NewTxMessage> {
let sp_address = SilentPaymentAddress::try_from(msg.sp_address.as_str())?;
log::debug!("Sending bootstrap coins to {}", sp_address);
// send bootstrap coins to this sp_address
let (tx, partial_tweak) = faucet_send(sp_address, &msg.commitment)?;
Ok(NewTxMessage::new(
serialize(&tx).to_lower_hex_string(),
Some(partial_tweak.to_string()),
))
}
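A note on the fee estimation in faucet_send above: the transaction is weighed with a placeholder 64-byte witness item, since a taproot key-spend Schnorr signature with the default sighash type is always 64 bytes. A minimal standalone sketch of the same pattern (the helper name is illustrative, and fee_rate is assumed to be satoshis per virtual byte expressed as an Amount, as in faucet_send):

use sdk_common::sp_client::bitcoin::{Amount, Transaction, Witness};

// Weigh a single-input taproot key-spend with a dummy signature in place,
// then clear the witness so the real signature can be attached afterwards.
fn estimate_keyspend_fee(tx: &mut Transaction, fee_rate: Amount) -> Option<Amount> {
    tx.input[0].witness.push([1u8; 64].to_vec()); // placeholder signature
    let abs_fee = fee_rate.checked_mul(tx.weight().to_vbytes_ceil());
    tx.input[0].witness = Witness::new(); // reset to empty
    abs_fee
}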

src/main.rs Normal file
@@ -0,0 +1,599 @@
use std::{
collections::{HashMap, HashSet},
env,
fmt::Debug,
fs,
io::{Read, Write},
net::SocketAddr,
path::PathBuf,
str::FromStr,
sync::{atomic::AtomicU32, Arc, Mutex, MutexGuard, OnceLock},
};
use bitcoincore_rpc::{
bitcoin::{hashes::Hash, secp256k1::SecretKey},
json::{self as bitcoin_json},
};
use commit::{lock_members, MEMBERLIST};
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
use log::{debug, error, warn};
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
use sdk_common::network::{AnkFlag, NewTxMessage};
use sdk_common::{
network::HandshakeMessage,
pcd::Member,
process::{lock_processes, Process, CACHEDPROCESSES},
serialization::{OutPointMemberMap, OutPointProcessMap},
silentpayments::SpWallet,
sp_client::{
bitcoin::{
consensus::deserialize,
hex::{DisplayHex, FromHex},
Amount, Network, Transaction,
},
silentpayments::SilentPaymentAddress,
},
MutexExt,
};
use sdk_common::{
sp_client::{
bitcoin::{secp256k1::rand::thread_rng, OutPoint},
OutputSpendStatus, SpClient, SpendKey,
},
updates::{init_update_sink, NativeUpdateSink, StateUpdate},
};
use serde_json::Value;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::tungstenite::Message;
use anyhow::{Error, Result};
use zeromq::{Socket, SocketRecv};
mod commit;
mod config;
mod daemon;
mod faucet;
mod message;
mod scan;
use crate::config::Config;
use crate::{
daemon::{Daemon, RpcCall},
scan::scan_blocks,
};
pub const WITH_CUTTHROUGH: bool = false; // We'd rather catch everything for this use case
type Tx = UnboundedSender<Message>;
type PeerMap = Mutex<HashMap<SocketAddr, Tx>>;
pub(crate) static PEERMAP: OnceLock<PeerMap> = OnceLock::new();
pub(crate) static DAEMON: OnceLock<Mutex<Box<dyn RpcCall>>> = OnceLock::new();
static CHAIN_TIP: AtomicU32 = AtomicU32::new(0);
pub static FREEZED_UTXOS: OnceLock<Mutex<HashSet<OutPoint>>> = OnceLock::new();
pub fn lock_freezed_utxos() -> Result<MutexGuard<'static, HashSet<OutPoint>>, Error> {
FREEZED_UTXOS
.get_or_init(|| Mutex::new(HashSet::new()))
.lock_anyhow()
}
#[derive(Debug)]
pub struct StateFile {
path: PathBuf,
}
impl StateFile {
fn new(path: PathBuf) -> Self {
Self { path }
}
fn create(&self) -> Result<()> {
let parent: PathBuf;
if let Some(dir) = self.path.parent() {
if !dir.ends_with(".4nk") {
return Err(Error::msg("parent dir must be \".4nk\""));
}
parent = dir.to_path_buf();
} else {
return Err(Error::msg("wallet file has no parent dir"));
}
// Ensure the parent directory exists
if !parent.exists() {
fs::create_dir_all(parent)?;
}
// Create a new file
fs::File::create(&self.path)?;
Ok(())
}
fn save(&self, json: &Value) -> Result<()> {
let mut f = fs::File::options()
.write(true)
.truncate(true)
.open(&self.path)?;
let stringified = serde_json::to_string(&json)?;
let bin = stringified.as_bytes();
f.write_all(bin)?;
Ok(())
}
fn load(&self) -> Result<Value> {
let mut f = fs::File::open(&self.path)?;
let mut content = vec![];
f.read_to_end(&mut content)?;
let res: Value = serde_json::from_slice(&content)?;
Ok(res)
}
}
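// Illustrative StateFile round-trip (the path is hypothetical; note that
// create() insists on a parent directory named ".4nk"):
//
//     let file = StateFile::new(PathBuf::from("/home/user/.4nk/wallet"));
//     file.create()?;
//     file.save(&serde_json::json!({ "birthday": 0 }))?;
//     let state = file.load()?;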
#[derive(Debug)]
pub struct DiskStorage {
pub wallet_file: StateFile,
pub processes_file: StateFile,
pub members_file: StateFile,
}
pub static STORAGE: OnceLock<Mutex<DiskStorage>> = OnceLock::new();
const FAUCET_AMT: Amount = Amount::from_sat(10_000);
pub(crate) static WALLET: OnceLock<Mutex<SpWallet>> = OnceLock::new();
fn handle_new_tx_request(new_tx_msg: &NewTxMessage) -> Result<()> {
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx_msg.transaction)?)?;
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
daemon.test_mempool_accept(&tx)?;
daemon.broadcast(&tx)?;
Ok(())
}
async fn handle_connection(
raw_stream: TcpStream,
addr: SocketAddr,
our_sp_address: SilentPaymentAddress,
) {
debug!("Incoming TCP connection from: {}", addr);
let peers = PEERMAP.get().expect("Peer Map not initialized");
let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await {
Ok(stream) => {
debug!("WebSocket connection established");
stream
}
Err(e) => {
log::error!("WebSocket handshake failed for {}: {}", addr, e);
return;
}
};
// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded_channel();
match peers.lock_anyhow() {
Ok(mut peer_map) => peer_map.insert(addr, tx),
Err(e) => {
log::error!("{}", e);
panic!();
}
};
let processes = lock_processes().unwrap().clone();
let members = lock_members().unwrap().clone();
let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst);
let init_msg = HandshakeMessage::new(
our_sp_address.to_string(),
OutPointMemberMap(members),
OutPointProcessMap(processes),
current_tip.into(),
);
if let Err(e) = broadcast_message(
AnkFlag::Handshake,
format!("{}", init_msg.to_string()),
BroadcastType::Sender(addr),
) {
log::error!("Failed to send init message: {}", e);
return;
}
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each(|msg| {
if let Ok(raw_msg) = msg.to_text() {
// debug!("Received msg: {}", raw_msg);
process_message(raw_msg, addr);
} else {
debug!("Received non-text message {} from peer {}", msg, addr);
}
future::ok(())
});
let receive_from_others = UnboundedReceiverStream::new(rx)
.map(Ok)
.forward(outgoing)
.map(|result| {
if let Err(e) = result {
debug!("Error sending message: {}", e);
}
});
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
debug!("{} disconnected", &addr);
peers.lock().unwrap().remove(&addr);
}
fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
let tx: Transaction = deserialize(&transaction)?;
if tx.is_coinbase() {
return Err(Error::msg("Can't process coinbase transaction"));
}
let partial_tweak = compute_partial_tweak_to_transaction(&tx)?;
let sp_wallet = WALLET
.get()
.ok_or_else(|| Error::msg("Wallet not initialized"))?
.lock_anyhow()?;
check_transaction_alone(sp_wallet, &tx, &partial_tweak)?;
Ok(NewTxMessage::new(
transaction.to_lower_hex_string(),
Some(partial_tweak.to_string()),
))
}
async fn handle_scan_updates(
scan_rx: std::sync::mpsc::Receiver<sdk_common::updates::ScanProgress>,
) {
while let Ok(update) = scan_rx.recv() {
log::debug!("Received scan update: {:?}", update);
}
}
async fn handle_state_updates(
state_rx: std::sync::mpsc::Receiver<sdk_common::updates::StateUpdate>,
) {
while let Ok(update) = state_rx.recv() {
match update {
StateUpdate::Update {
blkheight,
blkhash,
found_outputs,
found_inputs,
} => {
// We update the wallet with found outputs and inputs
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
// inputs first
for outpoint in found_inputs {
sp_wallet.mark_output_mined(&outpoint, blkhash);
}
sp_wallet.get_mut_outputs().extend(found_outputs);
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
STORAGE
.get()
.unwrap()
.lock_anyhow()
.unwrap()
.wallet_file
.save(&json)
.unwrap();
}
StateUpdate::NoUpdate { blkheight } => {
// We just keep the current height to update the last_scan
debug!("No update, setting last scan at {}", blkheight);
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
STORAGE
.get()
.unwrap()
.lock_anyhow()
.unwrap()
.wallet_file
.save(&json)
.unwrap();
}
}
}
}
async fn handle_zmq(zmq_url: String, blindbit_url: String) {
debug!("Starting listening on Core");
let mut socket = zeromq::SubSocket::new();
socket.connect(&zmq_url).await.unwrap();
socket.subscribe("rawtx").await.unwrap();
socket.subscribe("hashblock").await.unwrap();
loop {
let core_msg = match socket.recv().await {
Ok(m) => m,
Err(e) => {
error!("Zmq error: {}", e);
continue;
}
};
debug!("Received a message");
let payload: String = if let (Some(topic), Some(data)) = (core_msg.get(0), core_msg.get(1))
{
debug!("topic: {}", std::str::from_utf8(&topic).unwrap());
match std::str::from_utf8(&topic) {
Ok("rawtx") => match create_new_tx_message(data.to_vec()) {
Ok(m) => {
debug!("Created message");
serde_json::to_string(&m).expect("This shouldn't fail")
}
Err(e) => {
error!("{}", e);
continue;
}
},
Ok("hashblock") => {
let current_height = DAEMON
.get()
.unwrap()
.lock_anyhow()
.unwrap()
.get_current_height()
.unwrap();
CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst);
// Add retry logic for hashblock processing
let mut retry_count = 0;
const MAX_RETRIES: u32 = 4;
const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay
loop {
match scan_blocks(0, &blindbit_url).await {
Ok(_) => {
debug!("Successfully scanned blocks after {} retries", retry_count);
break;
}
Err(e) => {
retry_count += 1;
if retry_count >= MAX_RETRIES {
error!(
"Failed to scan blocks after {} retries: {}",
MAX_RETRIES, e
);
break;
}
// Exponential backoff: 1s, 2s, 4s
let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1));
warn!(
"Scan failed (attempt {}/{}), retrying in {}ms: {}",
retry_count, MAX_RETRIES, delay_ms, e
);
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms))
.await;
}
}
}
continue;
}
_ => {
error!("Unexpected message in zmq");
continue;
}
}
} else {
error!("Empty message");
continue;
};
if let Err(e) = broadcast_message(AnkFlag::NewTx, payload, BroadcastType::ToAll) {
log::error!("{}", e.to_string());
}
}
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
env_logger::init();
// TODO: take the path to the conf file as an argument,
// defaulting to "./.conf"
let config = Config::read_from_file(".conf")?;
if config.network == Network::Bitcoin {
warn!("Running on mainnet, you're on your own");
}
MESSAGECACHE
.set(MessageCache::new())
.expect("Message Cache initialization failed");
PEERMAP
.set(PeerMap::new(HashMap::new()))
.expect("PeerMap initialization failed");
// Connect the rpc daemon
DAEMON
.set(Mutex::new(Box::new(Daemon::connect(
config.core_wallet,
config.core_url,
config.network,
)?)))
.expect("DAEMON initialization failed");
let current_tip: u32 = DAEMON
.get()
.unwrap()
.lock_anyhow()?
.get_current_height()?
.try_into()?;
// Set CHAIN_TIP
CHAIN_TIP.store(current_tip, std::sync::atomic::Ordering::SeqCst);
let mut app_dir = PathBuf::from_str(&env::var("HOME")?)?;
app_dir.push(config.data_dir);
let mut wallet_file = app_dir.clone();
wallet_file.push(&config.wallet_name);
let mut processes_file = app_dir.clone();
processes_file.push("processes");
let mut members_file = app_dir.clone();
members_file.push("members");
let wallet_file = StateFile::new(wallet_file);
let processes_file = StateFile::new(processes_file);
let members_file = StateFile::new(members_file);
// load an existing sp_wallet, or create a new one
let sp_wallet: SpWallet = match wallet_file.load() {
Ok(wallet) => {
// TODO: Verify the wallet is compatible with the current network
serde_json::from_value(wallet)?
}
Err(_) => {
// Create a new wallet file if it doesn't exist or fails to load
wallet_file.create()?;
let mut rng = thread_rng();
let new_client = SpClient::new(
SecretKey::new(&mut rng),
SpendKey::Secret(SecretKey::new(&mut rng)),
config.network,
)
.expect("Failed to create a new SpClient");
let mut sp_wallet = SpWallet::new(new_client);
// Set birthday and update scan information
sp_wallet.set_birthday(current_tip);
sp_wallet.set_last_scan(current_tip);
// Save the newly created wallet to disk
let json = serde_json::to_value(sp_wallet.clone())?;
wallet_file.save(&json)?;
sp_wallet
}
};
let cached_processes: HashMap<OutPoint, Process> = match processes_file.load() {
Ok(processes) => {
let deserialized: OutPointProcessMap = serde_json::from_value(processes)?;
deserialized.0
}
Err(_) => {
debug!("creating process file at {}", processes_file.path.display());
processes_file.create()?;
HashMap::new()
}
};
let members: HashMap<OutPoint, Member> = match members_file.load() {
Ok(members) => {
let deserialized: OutPointMemberMap = serde_json::from_value(members)?;
deserialized.0
}
Err(_) => {
debug!("creating members file at {}", members_file.path.display());
members_file.create()?;
HashMap::new()
}
};
{
let utxo_to_freeze: HashSet<OutPoint> = cached_processes
.iter()
.map(|(_, process)| process.get_last_unspent_outpoint().unwrap())
.collect();
let mut freezed_utxos = lock_freezed_utxos()?;
*freezed_utxos = utxo_to_freeze;
}
let our_sp_address = sp_wallet.get_sp_client().get_receiving_address();
log::info!("Using wallet with address {}", our_sp_address,);
log::info!(
"Found {} outputs for a total balance of {}",
sp_wallet.get_outputs().len(),
sp_wallet.get_balance()
);
let last_scan = sp_wallet.get_last_scan();
WALLET
.set(Mutex::new(sp_wallet))
.expect("Failed to initialize WALLET");
CACHEDPROCESSES
.set(Mutex::new(cached_processes))
.expect("Failed to initialize CACHEDPROCESSES");
MEMBERLIST
.set(Mutex::new(members))
.expect("Failed to initialize MEMBERLIST");
let storage = DiskStorage {
wallet_file,
processes_file,
members_file,
};
STORAGE.set(Mutex::new(storage)).unwrap();
let (sink, scan_rx, state_rx) = NativeUpdateSink::new();
init_update_sink(Arc::new(sink));
// Spawn the update handlers
tokio::spawn(handle_scan_updates(scan_rx));
tokio::spawn(handle_state_updates(state_rx));
if last_scan < current_tip {
log::info!("Scanning for our outputs");
scan_blocks(current_tip - last_scan, &config.blindbit_url).await?;
}
// Subscribe to Bitcoin Core
let zmq_url = config.zmq_url.clone();
let blindbit_url = config.blindbit_url.clone();
tokio::spawn(async move {
handle_zmq(zmq_url, blindbit_url).await;
});
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(config.ws_url).await;
let listener = try_socket.expect("Failed to bind");
tokio::spawn(MessageCache::clean_up());
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(stream, addr, our_sp_address));
}
Ok(())
}
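main() reads its settings from a .conf file in the working directory. The parser lives in src/config.rs, which is not part of this hunk, so the exact syntax below is an assumption; a plausible sketch covering the eight fields main() actually consumes might look like:

network=regtest
core_url=http://127.0.0.1:18443
core_wallet=faucet
zmq_url=tcp://127.0.0.1:29000
ws_url=127.0.0.1:8090
blindbit_url=http://127.0.0.1:8000
data_dir=.4nk
wallet_name=faucet_wallet

Each key maps one-to-one to a config.* field referenced above; the values are placeholders for a regtest setup.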

src/message.rs Normal file
@@ -0,0 +1,234 @@
use anyhow::{Error, Result};
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Mutex, OnceLock},
time::{Duration, Instant},
};
use tokio::time;
use tokio_tungstenite::tungstenite::Message;
use sdk_common::network::{AnkFlag, CommitMessage, Envelope, FaucetMessage, NewTxMessage};
use crate::{
commit::handle_commit_request, faucet::handle_faucet_request, handle_new_tx_request, PEERMAP,
};
pub(crate) static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new();
const MESSAGECACHEDURATION: Duration = Duration::from_secs(20);
const MESSAGECACHEINTERVAL: Duration = Duration::from_secs(5);
#[derive(Debug)]
pub(crate) struct MessageCache {
store: Mutex<HashMap<String, Instant>>,
}
impl MessageCache {
pub fn new() -> Self {
Self {
store: Mutex::new(HashMap::new()),
}
}
fn insert(&self, key: String) {
let mut store = self.store.lock().unwrap();
store.insert(key, Instant::now());
}
fn remove(&self, key: &str) {
let mut store = self.store.lock().unwrap();
store.remove(key);
}
fn contains(&self, key: &str) -> bool {
let store = self.store.lock().unwrap();
store.contains_key(key)
}
pub async fn clean_up() {
let cache = MESSAGECACHE.get().unwrap();
let mut interval = time::interval(MESSAGECACHEINTERVAL);
loop {
interval.tick().await;
let mut store = cache.store.lock().unwrap();
let now = Instant::now();
store.retain(|_, entrytime| now.duration_since(*entrytime) <= MESSAGECACHEDURATION);
}
}
}
pub(crate) enum BroadcastType {
Sender(SocketAddr),
#[allow(dead_code)]
ExcludeSender(SocketAddr),
#[allow(dead_code)]
ToAll,
}
pub(crate) fn broadcast_message(
flag: AnkFlag,
payload: String,
broadcast: BroadcastType,
) -> Result<()> {
let peers = PEERMAP.get().ok_or(Error::msg("Uninitialized peer map"))?;
let ank_msg = Envelope {
flag,
content: payload,
};
let msg = Message::Text(serde_json::to_string(&ank_msg)?);
match ank_msg.flag {
AnkFlag::Cipher => log::debug!("Broadcasting cipher"),
AnkFlag::Handshake => log::debug!("Broadcasting handshake"),
_ => log::debug!("Broadcasting {} message: {}", ank_msg.flag.as_str(), msg),
}
match broadcast {
BroadcastType::Sender(addr) => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e)))?
.iter()
.find(|(peer_addr, _)| peer_addr == &&addr)
.ok_or(Error::msg("Failed to find the sender in the peer_map"))?
.1
.send(msg)?;
}
BroadcastType::ExcludeSender(addr) => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e)))?
.iter()
.filter(|(peer_addr, _)| peer_addr != &&addr)
.for_each(|(_, peer_tx)| {
let _ = peer_tx.send(msg.clone());
});
}
BroadcastType::ToAll => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e)))?
.iter()
.for_each(|(_, peer_tx)| {
let _ = peer_tx.send(msg.clone());
});
}
}
Ok(())
}
fn process_faucet_message(ank_msg: Envelope, addr: SocketAddr) {
log::debug!("Received a faucet message");
if let Ok(mut content) = serde_json::from_str::<FaucetMessage>(&ank_msg.content) {
match handle_faucet_request(&content) {
Ok(new_tx_msg) => {
log::debug!(
"Obtained new_tx_msg: {}",
serde_json::to_string(&new_tx_msg).unwrap()
);
}
Err(e) => {
log::error!("Failed to send faucet tx: {}", e);
content.error = Some(e.into());
let payload = serde_json::to_string(&content).expect("Message type shouldn't fail");
if let Err(e) =
broadcast_message(AnkFlag::Faucet, payload, BroadcastType::Sender(addr))
{
log::error!("Failed to broadcast message: {}", e);
}
}
}
} else {
log::error!("Invalid content for faucet message");
}
}
fn process_new_tx_message(ank_msg: Envelope, addr: SocketAddr) {
log::debug!("Received a new tx message");
if let Ok(mut new_tx_msg) = serde_json::from_str::<NewTxMessage>(&ank_msg.content) {
if let Err(e) = handle_new_tx_request(&mut new_tx_msg) {
log::error!("handle_new_tx_request returned error: {}", e);
new_tx_msg.error = Some(e.into());
if let Err(e) = broadcast_message(
AnkFlag::NewTx,
serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"),
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e);
}
}
} else {
log::error!("Invalid content for new_tx message");
}
}
fn process_cipher_message(ank_msg: Envelope, addr: SocketAddr) {
// For now we just send it to everyone
log::debug!("Received a cipher message");
if let Err(e) = broadcast_message(
AnkFlag::Cipher,
ank_msg.content,
BroadcastType::ExcludeSender(addr),
) {
log::error!("Failed to send message with error: {}", e);
}
}
fn process_commit_message(ank_msg: Envelope, addr: SocketAddr) {
if let Ok(mut commit_msg) = serde_json::from_str::<CommitMessage>(&ank_msg.content) {
match handle_commit_request(commit_msg.clone()) {
Ok(new_outpoint) => log::debug!("Processed commit msg for outpoint {}", new_outpoint),
Err(e) => {
log::error!("handle_commit_request returned error: {}", e);
// Temporary fix: we remove the message from the cache so the client can try again
// (this assumes the Envelope serializes back to the exact raw string that was cached)
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
cache.remove(ank_msg.to_string().as_str());
commit_msg.error = Some(e.into());
if let Err(e) = broadcast_message(
AnkFlag::Commit,
serde_json::to_string(&commit_msg).expect("This shouldn't fail"),
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e);
}
}
};
}
}
fn process_unknown_message(ank_msg: Envelope, addr: SocketAddr) {
log::debug!("Received an unknown message");
if let Err(e) = broadcast_message(
AnkFlag::Unknown,
ank_msg.content,
BroadcastType::ExcludeSender(addr),
) {
log::error!("Failed to send message with error: {}", e);
}
}
pub fn process_message(raw_msg: &str, addr: SocketAddr) {
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
if cache.contains(raw_msg) {
log::debug!("Message already processed, dropping");
return;
} else {
cache.insert(raw_msg.to_owned());
}
match serde_json::from_str::<Envelope>(raw_msg) {
Ok(ank_msg) => match ank_msg.flag {
AnkFlag::Faucet => process_faucet_message(ank_msg, addr),
AnkFlag::NewTx => process_new_tx_message(ank_msg, addr),
AnkFlag::Cipher => process_cipher_message(ank_msg, addr),
AnkFlag::Commit => process_commit_message(ank_msg, addr),
AnkFlag::Unknown => process_unknown_message(ank_msg, addr),
AnkFlag::Sync => todo!(),
AnkFlag::Handshake => log::debug!("Received init message from {}", addr),
},
Err(_) => log::error!("Failed to parse network message"),
}
}
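Every frame exchanged over the WebSocket is an Envelope serialized to JSON. A hedged sketch of building one by hand, mirroring the construction in broadcast_message (faucet_msg is an assumed value, and the exact tag serde emits for AnkFlag::Faucet depends on its derive attributes):

let envelope = Envelope {
    flag: AnkFlag::Faucet,
    content: serde_json::to_string(&faucet_msg)?,
};
// roughly: {"flag":"Faucet","content":"{\"sp_address\":\"sp1q...\",...}"}
let frame = Message::Text(serde_json::to_string(&envelope)?);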

src/scan.rs Normal file
@@ -0,0 +1,616 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;
use anyhow::bail;
use anyhow::{Error, Result};
use bitcoincore_rpc::bitcoin::absolute::Height;
use bitcoincore_rpc::bitcoin::hashes::sha256;
use bitcoincore_rpc::bitcoin::hashes::Hash;
use bitcoincore_rpc::bitcoin::Amount;
use futures_util::Stream;
use log::info;
use sdk_common::silentpayments::SpWallet;
use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
use sdk_common::sp_client::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey};
use sdk_common::sp_client::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey};
use sdk_common::sp_client::silentpayments::receiving::Receiver;
use sdk_common::sp_client::silentpayments::utils::receiving::{
calculate_tweak_data, get_pubkey_from_input,
};
use sdk_common::sp_client::BlockData;
use sdk_common::sp_client::ChainBackend;
use sdk_common::sp_client::FilterData;
use sdk_common::sp_client::SpClient;
use sdk_common::sp_client::Updater;
use sdk_common::sp_client::{BlindbitBackend, OutputSpendStatus, OwnedOutput, SpScanner};
use sdk_common::updates::StateUpdater;
use tokio::time::Instant;
use crate::CHAIN_TIP;
use crate::{MutexExt, DAEMON, STORAGE, WALLET, WITH_CUTTHROUGH};
pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result<PublicKey> {
let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?;
let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len());
let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len());
// TODO: cache transactions to avoid multiple RPC requests when a transaction spends several outputs of the same prev tx
for input in tx.input.iter() {
outpoints.push((
input.previous_output.txid.to_string(),
input.previous_output.vout,
));
let prev_tx = daemon
.lock_anyhow()?
.get_transaction(&input.previous_output.txid, None)
.map_err(|e| Error::msg(format!("Failed to find previous transaction: {}", e)))?;
if let Some(output) = prev_tx.output.get(input.previous_output.vout as usize) {
match get_pubkey_from_input(
&input.script_sig.to_bytes(),
&input.witness.to_vec(),
&output.script_pubkey.to_bytes(),
) {
Ok(Some(pubkey)) => pubkeys.push(pubkey),
Ok(None) => continue,
Err(e) => {
return Err(Error::msg(format!(
"Can't extract pubkey from input: {}",
e
)))
}
}
} else {
return Err(Error::msg("Transaction with a non-existing input"));
}
}
let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect();
let partial_tweak = calculate_tweak_data(&input_pub_keys, &outpoints)?;
Ok(partial_tweak)
}
fn get_script_to_secret_map(
sp_receiver: &Receiver,
tweak_data_vec: Vec<String>,
scan_key_scalar: Scalar,
secp: &Secp256k1<All>,
) -> Result<HashMap<[u8; 34], PublicKey>> {
let mut res = HashMap::new();
let shared_secrets: Result<Vec<PublicKey>> = tweak_data_vec
.into_iter()
.map(|s| {
let x = PublicKey::from_str(&s).map_err(Error::new)?;
x.mul_tweak(secp, &scan_key_scalar).map_err(Error::new)
})
.collect();
let shared_secrets = shared_secrets?;
for shared_secret in shared_secrets {
let spks = sp_receiver.get_spks_from_shared_secret(&shared_secret)?;
for spk in spks.into_values() {
res.insert(spk, shared_secret);
}
}
Ok(res)
}
pub fn check_transaction_alone(
mut wallet: MutexGuard<SpWallet>,
tx: &Transaction,
tweak_data: &PublicKey,
) -> Result<HashMap<OutPoint, OwnedOutput>> {
let updates = match wallet.update_with_transaction(tx, tweak_data, 0) {
Ok(updates) => updates,
Err(e) => {
log::debug!("Error while checking transaction: {}", e);
HashMap::new()
}
};
if !updates.is_empty() {
let storage = STORAGE
.get()
.ok_or_else(|| Error::msg("Failed to get STORAGE"))?;
storage
.lock_anyhow()?
.wallet_file
.save(&serde_json::to_value(wallet.clone())?)?;
}
Ok(updates)
}
fn check_block(
blkfilter: BlockFilter,
blkhash: BlockHash,
candidate_spks: Vec<&[u8; 34]>,
owned_spks: Vec<Vec<u8>>,
) -> Result<bool> {
// check output scripts
let mut scripts_to_match: Vec<_> = candidate_spks.into_iter().map(|spk| spk.as_ref()).collect();
// check input scripts
scripts_to_match.extend(owned_spks.iter().map(|spk| spk.as_slice()));
// note: match will always return true for an empty query!
if !scripts_to_match.is_empty() {
Ok(blkfilter.match_any(&blkhash, &mut scripts_to_match.into_iter())?)
} else {
Ok(false)
}
}
fn scan_block_outputs(
sp_receiver: &Receiver,
txdata: &Vec<Transaction>,
blkheight: u64,
spk2secret: HashMap<[u8; 34], PublicKey>,
) -> Result<HashMap<OutPoint, OwnedOutput>> {
let mut res: HashMap<OutPoint, OwnedOutput> = HashMap::new();
// loop over outputs
for tx in txdata {
let txid = tx.txid();
// collect all taproot outputs from transaction
let p2tr_outs: Vec<(usize, &TxOut)> = tx
.output
.iter()
.enumerate()
.filter(|(_, o)| o.script_pubkey.is_p2tr())
.collect();
if p2tr_outs.is_empty() {
continue; // no taproot output
}
let mut secret: Option<PublicKey> = None;
// Does this transaction contain one of the outputs we already found?
for spk in p2tr_outs.iter().map(|(_, o)| &o.script_pubkey) {
if let Some(s) = spk2secret.get(spk.as_bytes()) {
// we might have at least one output in this transaction
secret = Some(*s);
break;
}
}
if secret.is_none() {
continue; // we don't have a secret that matches any of the keys
}
// Now we can just run sp_receiver on all the p2tr outputs
let xonlykeys: Result<Vec<XOnlyPublicKey>> = p2tr_outs
.iter()
.map(|(_, o)| {
XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]).map_err(Error::new)
})
.collect();
let ours = sp_receiver.scan_transaction(&secret.unwrap(), xonlykeys?)?;
let height = Height::from_consensus(blkheight as u32)?;
for (label, map) in ours {
res.extend(p2tr_outs.iter().filter_map(|(i, o)| {
match XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]) {
Ok(key) => {
if let Some(scalar) = map.get(&key) {
match SecretKey::from_slice(&scalar.to_be_bytes()) {
Ok(tweak) => {
let outpoint = OutPoint {
txid,
vout: *i as u32,
};
return Some((
outpoint,
OwnedOutput {
blockheight: height,
tweak: tweak.secret_bytes(),
amount: o.value,
script: o.script_pubkey.clone(),
label: label.clone(),
spend_status: OutputSpendStatus::Unspent,
},
));
}
Err(_) => {
return None;
}
}
}
None
}
Err(_) => None,
}
}));
}
}
Ok(res)
}
fn scan_block_inputs(
our_outputs: &HashMap<OutPoint, OwnedOutput>,
txdata: Vec<Transaction>,
) -> Result<Vec<OutPoint>> {
let mut found = vec![];
for tx in txdata {
for input in tx.input {
let prevout = input.previous_output;
if our_outputs.contains_key(&prevout) {
found.push(prevout);
}
}
}
Ok(found)
}
pub struct NativeSpScanner<'a> {
updater: Box<dyn Updater + Sync + Send>,
backend: Box<dyn ChainBackend + Sync + Send>,
client: SpClient,
keep_scanning: &'a AtomicBool, // used to interrupt scanning
owned_outpoints: HashSet<OutPoint>, // used to scan block inputs
}
impl<'a> NativeSpScanner<'a> {
pub fn new(
client: SpClient,
updater: Box<dyn Updater + Sync + Send>,
backend: Box<dyn ChainBackend + Sync + Send>,
owned_outpoints: HashSet<OutPoint>,
keep_scanning: &'a AtomicBool,
) -> Self {
Self {
client,
updater,
backend,
owned_outpoints,
keep_scanning,
}
}
pub async fn process_blocks(
&mut self,
start: Height,
end: Height,
block_data_stream: impl Stream<Item = Result<BlockData>> + Unpin + Send,
) -> Result<()> {
use sdk_common::sp_client::futures::StreamExt;
use std::time::{Duration, Instant};
let mut update_time = Instant::now();
let mut stream = block_data_stream;
while let Some(blockdata) = stream.next().await {
let blockdata = blockdata?;
let blkheight = blockdata.blkheight;
let blkhash = blockdata.blkhash;
// stop scanning and return if interrupted
if self.should_interrupt() {
self.save_state()?;
return Ok(());
}
let mut save_to_storage = false;
// always save on last block or after 30 seconds since last save
if blkheight == end || update_time.elapsed() > Duration::from_secs(30) {
save_to_storage = true;
}
let (found_outputs, found_inputs) = self.process_block(blockdata).await?;
if !found_outputs.is_empty() {
save_to_storage = true;
self.record_outputs(blkheight, blkhash, found_outputs)?;
}
if !found_inputs.is_empty() {
save_to_storage = true;
self.record_inputs(blkheight, blkhash, found_inputs)?;
}
// tell the updater we scanned this block
self.record_progress(start, blkheight, end)?;
if save_to_storage {
self.save_state()?;
update_time = Instant::now();
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl<'a> SpScanner for NativeSpScanner<'a> {
async fn scan_blocks(
&mut self,
start: Height,
end: Height,
dust_limit: Amount,
with_cutthrough: bool,
) -> Result<()> {
if start > end {
bail!("bigger start than end: {} > {}", start, end);
}
info!("start: {} end: {}", start, end);
let start_time: Instant = Instant::now();
// get block data stream
let range = start.to_consensus_u32()..=end.to_consensus_u32();
let block_data_stream = self.get_block_data_stream(range, dust_limit, with_cutthrough);
// process blocks using block data stream
self.process_blocks(start, end, block_data_stream).await?;
// time elapsed for the scan
info!(
"Blindbit scan complete in {} seconds",
start_time.elapsed().as_secs()
);
Ok(())
}
async fn process_block(
&mut self,
blockdata: BlockData,
) -> Result<(HashMap<OutPoint, OwnedOutput>, HashSet<OutPoint>)> {
let BlockData {
blkheight,
tweaks,
new_utxo_filter,
spent_filter,
..
} = blockdata;
let outs = self
.process_block_outputs(blkheight, tweaks, new_utxo_filter)
.await?;
// after processing outputs, we add the found outputs to our list
self.owned_outpoints.extend(outs.keys());
let ins = self.process_block_inputs(blkheight, spent_filter).await?;
// after processing inputs, we remove the found inputs
self.owned_outpoints.retain(|item| !ins.contains(item));
Ok((outs, ins))
}
async fn process_block_outputs(
&self,
blkheight: Height,
tweaks: Vec<PublicKey>,
new_utxo_filter: FilterData,
) -> Result<HashMap<OutPoint, OwnedOutput>> {
let mut res = HashMap::new();
if !tweaks.is_empty() {
let secrets_map = self.client.get_script_to_secret_map(tweaks)?;
//last_scan = last_scan.max(n as u32);
let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect();
//get block gcs & check match
let blkfilter = BlockFilter::new(&new_utxo_filter.data);
let blkhash = new_utxo_filter.block_hash;
let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?;
//if match: fetch and scan utxos
if matched_outputs {
info!("matched outputs on: {}", blkheight);
let found = self.scan_utxos(blkheight, secrets_map).await?;
if !found.is_empty() {
for (label, utxo, tweak) in found {
let outpoint = OutPoint {
txid: utxo.txid,
vout: utxo.vout,
};
let out = OwnedOutput {
blockheight: blkheight,
tweak: tweak.to_be_bytes(),
amount: utxo.value,
script: utxo.scriptpubkey,
label,
spend_status: OutputSpendStatus::Unspent,
};
res.insert(outpoint, out);
}
}
}
}
Ok(res)
}
async fn process_block_inputs(
&self,
blkheight: Height,
spent_filter: FilterData,
) -> Result<HashSet<OutPoint>> {
let mut res = HashSet::new();
let blkhash = spent_filter.block_hash;
// first get the 8-byte hashes used to construct the input filter
let input_hashes_map = self.get_input_hashes(blkhash)?;
// check against filter
let blkfilter = BlockFilter::new(&spent_filter.data);
let matched_inputs = self.check_block_inputs(
blkfilter,
blkhash,
input_hashes_map.keys().cloned().collect(),
)?;
// if match: download spent data, collect the outpoints that are spent
if matched_inputs {
info!("matched inputs on: {}", blkheight);
let spent = self.backend.spent_index(blkheight).await?.data;
for spent in spent {
let hex: &[u8] = spent.as_ref();
if let Some(outpoint) = input_hashes_map.get(hex) {
res.insert(*outpoint);
}
}
}
Ok(res)
}
fn get_block_data_stream(
&self,
range: std::ops::RangeInclusive<u32>,
dust_limit: Amount,
with_cutthrough: bool,
) -> std::pin::Pin<Box<dyn Stream<Item = Result<BlockData>> + Send>> {
self.backend
.get_block_data_for_range(range, dust_limit, with_cutthrough)
}
fn should_interrupt(&self) -> bool {
!self
.keep_scanning
.load(std::sync::atomic::Ordering::Relaxed)
}
fn save_state(&mut self) -> Result<()> {
self.updater.save_to_persistent_storage()
}
fn record_outputs(
&mut self,
height: Height,
block_hash: BlockHash,
outputs: HashMap<OutPoint, OwnedOutput>,
) -> Result<()> {
self.updater.record_block_outputs(height, block_hash, outputs)
}
fn record_inputs(
&mut self,
height: Height,
block_hash: BlockHash,
inputs: HashSet<OutPoint>,
) -> Result<()> {
self.updater.record_block_inputs(height, block_hash, inputs)
}
fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> {
self.updater.record_scan_progress(start, current, end)
}
fn client(&self) -> &SpClient {
&self.client
}
fn backend(&self) -> &dyn ChainBackend {
self.backend.as_ref()
}
fn updater(&mut self) -> &mut dyn Updater {
self.updater.as_mut()
}
// Override the default get_input_hashes implementation to use owned_outpoints
fn get_input_hashes(&self, blkhash: BlockHash) -> Result<HashMap<[u8; 8], OutPoint>> {
let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new();
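// For each owned outpoint, the preimage is txid (32 bytes) || vout (4 bytes,
// little-endian) || block hash (32 bytes); the 8-byte filter key is the first
// 8 bytes of the sha256 of that 68-byte buffer.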
for outpoint in &self.owned_outpoints {
let mut arr = [0u8; 68];
arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array());
arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes());
arr[36..].copy_from_slice(&blkhash.to_byte_array());
let hash = sha256::Hash::hash(&arr);
let mut res = [0u8; 8];
res.copy_from_slice(&hash[..8]);
map.insert(res, *outpoint);
}
Ok(map)
}
}
pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyhow::Result<()> {
log::info!("Starting a rescan");
// Get all the data we need upfront, before any async operations
let (sp_wallet, scan_height, tip_height) = {
let sp_wallet = WALLET
.get()
.ok_or(Error::msg("Wallet not initialized"))?
.lock_anyhow()?;
let scan_height = sp_wallet.get_last_scan();
let tip_height: u32 = CHAIN_TIP.load(Ordering::Relaxed).try_into()?;
(sp_wallet.clone(), scan_height, tip_height)
};
// 0 means scan to tip
if n_blocks_to_scan == 0 {
n_blocks_to_scan = tip_height - scan_height;
}
let start = scan_height + 1;
let end = if scan_height + n_blocks_to_scan <= tip_height {
scan_height + n_blocks_to_scan
} else {
tip_height
};
if start > end {
return Ok(());
}
let updater = StateUpdater::new();
let backend = BlindbitBackend::new(blindbit_url.to_string())?;
let owned_outpoints = sp_wallet.get_unspent_outputs().keys().copied().collect();
let keep_scanning = Arc::new(AtomicBool::new(true));
log::info!("start: {} end: {}", start, end);
let start_time = Instant::now();
let mut scanner = NativeSpScanner::new(
sp_wallet.get_sp_client().clone(),
Box::new(updater),
Box::new(backend),
owned_outpoints,
&keep_scanning,
);
let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case
scanner
.scan_blocks(
Height::from_consensus(start)?,
Height::from_consensus(end)?,
dust_limit,
WITH_CUTTHROUGH,
)
.await?;
// time elapsed for the scan
log::info!(
"Scan complete in {} seconds",
start_time.elapsed().as_secs()
);
Ok(())
}
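As seen above, scan_blocks has two call sites: main() passes the number of blocks missed since last_scan at startup, and the hashblock handler passes 0 to scan straight to the chain tip. A minimal call-site sketch (the URL is a placeholder):

// 0 means "scan from last_scan + 1 up to the current chain tip"
scan_blocks(0, "http://127.0.0.1:8000").await?;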