Compare commits
135 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
cf93dde1c0 | ||
![]() |
833ea55233 | ||
![]() |
8dd41ead13 | ||
![]() |
bfb920029d | ||
![]() |
9d365dc862 | ||
![]() |
e4a9a1473f | ||
![]() |
60aa257f04 | ||
![]() |
740bddc0e9 | ||
![]() |
b9fed4a386 | ||
![]() |
475064a835 | ||
![]() |
f7742c8aee | ||
![]() |
07ac93313f | ||
![]() |
0ba8bec07b | ||
![]() |
3faa8ea746 | ||
![]() |
04ce0376a1 | ||
![]() |
bedabd4e41 | ||
![]() |
66248a4600 | ||
![]() |
60a497cdde | ||
![]() |
6de9f31ab1 | ||
![]() |
870085d342 | ||
![]() |
13e92d4afb | ||
![]() |
7d5375e5df | ||
![]() |
2dadcef03c | ||
760a377a3e | |||
fbabe71e76 | |||
6ab48f0b19 | |||
c3a58ed020 | |||
![]() |
7633c07e2f | ||
![]() |
b783649d42 | ||
931c6b827b | |||
bf6f90f754 | |||
d1cd4491f5 | |||
12a8b9aab5 | |||
ebf27a0a8d | |||
a390115b3e | |||
818395177c | |||
00f01e5f48 | |||
69afa2695b | |||
02189bf3db | |||
7c278e9eb2 | |||
fd763e6193 | |||
7a060c123d | |||
f7260d6dce | |||
fd8c40e09a | |||
efa1129e45 | |||
8b978b91e9 | |||
de3808eb1e | |||
2cc026077e | |||
f722dee8f6 | |||
159fd78a3a | |||
e69ae1a21a | |||
6fc08f3fdd | |||
4839c95e93 | |||
c3aba61be2 | |||
3ea25e542a | |||
eea95b8342 | |||
04b9785531 | |||
dbca46a739 | |||
5d18fd7688 | |||
1d1b3546d6 | |||
8d7a6c7400 | |||
d99cab0a26 | |||
17c3fefa88 | |||
3fd014edd9 | |||
7eb6304f6e | |||
1e433b63e7 | |||
a55159449a | |||
9db6b9f957 | |||
960c0808fa | |||
a09eb404e2 | |||
fdcd234838 | |||
292b7135ab | |||
4ec25d1494 | |||
4a72fd3129 | |||
21c0882a34 | |||
2b1bccb687 | |||
1271e492f5 | |||
616f20da8f | |||
a6bb827c56 | |||
![]() |
e708372223 | ||
![]() |
cc2368996f | ||
![]() |
072acb57bc | ||
![]() |
25bbc64e6a | ||
![]() |
2788159c4d | ||
![]() |
465f32c4fe | ||
![]() |
398ee47612 | ||
![]() |
1cb20527da | ||
![]() |
39a8ff87a9 | ||
![]() |
8eae4408f2 | ||
![]() |
ce2c158de2 | ||
![]() |
c468d455ca | ||
![]() |
80b2cf8c4c | ||
![]() |
bc625955e9 | ||
![]() |
0f726d70be | ||
![]() |
7b34096940 | ||
![]() |
fc726dbcf5 | ||
![]() |
a5ea8485d6 | ||
![]() |
d013c4e20f | ||
![]() |
32916f1588 | ||
![]() |
56321d89df | ||
![]() |
da04149923 | ||
![]() |
31c17e908d | ||
![]() |
fbd7a1c1ea | ||
![]() |
9f2c4ed2e1 | ||
![]() |
e5e7496611 | ||
![]() |
8e50727880 | ||
![]() |
a192edb761 | ||
![]() |
0d9e8ba4e5 | ||
![]() |
7e5dc17841 | ||
![]() |
0d5149eb96 | ||
![]() |
ad026a783e | ||
![]() |
539670d248 | ||
![]() |
de84c8a1bf | ||
![]() |
083843a94a | ||
![]() |
4455c76663 | ||
![]() |
ec8e4ebef8 | ||
![]() |
94b96320d7 | ||
![]() |
ee5fcb4932 | ||
![]() |
a287db7cf8 | ||
![]() |
8bbee83aac | ||
![]() |
4d3dc8123a | ||
![]() |
459756815f | ||
![]() |
c1d1c0a4b5 | ||
![]() |
7dba477f33 | ||
![]() |
306949e9f0 | ||
![]() |
6db81ee769 | ||
![]() |
d33c3e9735 | ||
![]() |
a28f40fa0c | ||
![]() |
d816115929 | ||
![]() |
3ebc319a26 | ||
![]() |
ed37accb67 | ||
![]() |
7774207e01 | ||
![]() |
5f4efa5aa3 | ||
![]() |
2d044ec2c2 | ||
![]() |
eb6699baca |
7
.conf.model
Normal file
7
.conf.model
Normal file
@ -0,0 +1,7 @@
|
||||
core_url=""
|
||||
ws_url=""
|
||||
wallet_name="default"
|
||||
network="signet"
|
||||
electrum_url="tcp://localhost:60601"
|
||||
blindbit_url="tcp://localhost:8000"
|
||||
zmq_url=""
|
43
.github/workflows/cicd.yml
vendored
Normal file
43
.github/workflows/cicd.yml
vendored
Normal file
@ -0,0 +1,43 @@
|
||||
name: Build and Push to Registry
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ cicd ]
|
||||
|
||||
env:
|
||||
REGISTRY: git.4nkweb.com
|
||||
IMAGE_NAME: 4nk/sdk_relay
|
||||
|
||||
jobs:
|
||||
build-and-push:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up SSH agent
|
||||
uses: webfactory/ssh-agent@v0.9.1
|
||||
with:
|
||||
ssh-private-key: ${{ secrets.SSH_PRIVATE_KEY }}
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Login to Container Registry
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ secrets.USER }}
|
||||
password: ${{ secrets.TOKEN }}
|
||||
|
||||
- name: Build and push
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
ssh: default
|
||||
build-args: |
|
||||
CONF=${{ secrets.CONF }}
|
||||
tags: |
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ gitea.sha }}
|
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
/target
|
||||
.conf
|
2828
Cargo.lock
generated
Normal file
2828
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
24
Cargo.toml
Normal file
24
Cargo.toml
Normal file
@ -0,0 +1,24 @@
|
||||
[package]
|
||||
name = "sdk_relay"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
async-trait = "0.1"
|
||||
bitcoincore-rpc = { version = "0.18" }
|
||||
env_logger = "0.9"
|
||||
futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
|
||||
hex = "0.4.3"
|
||||
log = "0.4.20"
|
||||
sdk_common = { git = "ssh://git@git.4nkweb.com/4nk/sdk_common.git", branch = "blindbit" }
|
||||
serde = { version = "1.0.193", features = ["derive"]}
|
||||
serde_json = "1.0"
|
||||
serde_with = "3.6.0"
|
||||
tokio = { version = "1.0.0", features = ["io-util", "rt-multi-thread", "macros", "sync"] }
|
||||
tokio-stream = "0.1"
|
||||
tokio-tungstenite = "0.21.0"
|
||||
zeromq = "0.4.1"
|
||||
|
||||
[dev-dependencies]
|
||||
mockall = "0.13.0"
|
46
Dockerfile
Normal file
46
Dockerfile
Normal file
@ -0,0 +1,46 @@
|
||||
# syntax=docker/dockerfile:1.4
|
||||
FROM rust:latest AS builder
|
||||
WORKDIR /app
|
||||
|
||||
# Configuration de git pour utiliser SSH
|
||||
RUN mkdir -p /root/.ssh && \
|
||||
ssh-keyscan git.4nkweb.com >> /root/.ssh/known_hosts
|
||||
|
||||
# Copie des fichiers de sdk_relay
|
||||
COPY Cargo.toml Cargo.lock ./
|
||||
COPY src/ src/
|
||||
|
||||
# Build avec support SSH pour récupérer les dépendances
|
||||
RUN --mount=type=ssh cargo build --release
|
||||
|
||||
# ---- image finale ----
|
||||
FROM debian:bookworm-slim
|
||||
RUN apt-get update && apt-get install -y ca-certificates strace
|
||||
|
||||
# Créer l'utilisateur bitcoin
|
||||
RUN useradd -m -d /home/bitcoin -u 1000 bitcoin
|
||||
|
||||
COPY --from=builder /app/target/release/sdk_relay /usr/local/bin/sdk_relay
|
||||
RUN chown bitcoin:bitcoin /usr/local/bin/sdk_relay && \
|
||||
chmod 755 /usr/local/bin/sdk_relay
|
||||
|
||||
# Configuration via build arg
|
||||
ARG CONF
|
||||
RUN echo "$CONF" > /home/bitcoin/.conf && \
|
||||
chown bitcoin:bitcoin /home/bitcoin/.conf && \
|
||||
chmod 644 /home/bitcoin/.conf
|
||||
|
||||
# Créer le répertoire .4nk avec les bonnes permissions
|
||||
RUN mkdir -p /home/bitcoin/.4nk && \
|
||||
chown -R bitcoin:bitcoin /home/bitcoin/.4nk && \
|
||||
chmod 755 /home/bitcoin/.4nk
|
||||
|
||||
WORKDIR /home/bitcoin
|
||||
USER bitcoin
|
||||
ENV HOME=/home/bitcoin
|
||||
|
||||
VOLUME ["/home/bitcoin/.4nk"]
|
||||
VOLUME ["/home/bitcoin/.bitcoin"]
|
||||
|
||||
EXPOSE 8090 8091
|
||||
ENTRYPOINT ["sdk_relay", "--config", "/home/bitcoin/.conf"]
|
769
src/commit.rs
Normal file
769
src/commit.rs
Normal file
@ -0,0 +1,769 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Mutex, MutexGuard, OnceLock},
|
||||
};
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
|
||||
use bitcoincore_rpc::bitcoin::hex::DisplayHex;
|
||||
use sdk_common::network::{AnkFlag, CommitMessage, HandshakeMessage};
|
||||
use sdk_common::process::{lock_processes, Process, ProcessState};
|
||||
use sdk_common::serialization::{OutPointMemberMap, OutPointProcessMap};
|
||||
use sdk_common::silentpayments::create_transaction;
|
||||
use sdk_common::sp_client::bitcoin::{Amount, OutPoint};
|
||||
use sdk_common::sp_client::{FeeRate, Recipient};
|
||||
use sdk_common::{
|
||||
pcd::Member,
|
||||
silentpayments::sign_transaction,
|
||||
sp_client::{silentpayments::SilentPaymentAddress, RecipientAddress},
|
||||
};
|
||||
|
||||
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
|
||||
use crate::{
|
||||
message::{broadcast_message, BroadcastType},
|
||||
CHAIN_TIP,
|
||||
};
|
||||
|
||||
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
|
||||
let mut processes = lock_processes()?;
|
||||
if let Some(process) = processes.get_mut(&commit_msg.process_id) {
|
||||
handle_existing_commitment(process, &commit_msg)?;
|
||||
} else {
|
||||
let new_process = handle_new_process(&commit_msg)?;
|
||||
// Cache the process
|
||||
processes.insert(commit_msg.process_id, new_process);
|
||||
}
|
||||
|
||||
// Dump to disk
|
||||
dump_cached_processes(processes.clone())?;
|
||||
|
||||
// Add to frozen UTXOs
|
||||
lock_freezed_utxos()?.insert(commit_msg.process_id);
|
||||
|
||||
// Update processes with the
|
||||
// Send an update to all connected client
|
||||
let our_sp_address = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.lock_anyhow()?
|
||||
.get_sp_client()
|
||||
.get_receiving_address();
|
||||
let mut new_process_map = HashMap::new();
|
||||
let new_process = processes.get(&commit_msg.process_id).unwrap().clone();
|
||||
new_process_map.insert(commit_msg.process_id, new_process);
|
||||
let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst);
|
||||
let init_msg = HandshakeMessage::new(
|
||||
our_sp_address.to_string(),
|
||||
OutPointMemberMap(HashMap::new()),
|
||||
OutPointProcessMap(new_process_map),
|
||||
current_tip.into(),
|
||||
);
|
||||
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::Handshake,
|
||||
format!("{}", init_msg.to_string()),
|
||||
BroadcastType::ToAll,
|
||||
) {
|
||||
log::error!("Failed to send handshake message: {}", e);
|
||||
}
|
||||
|
||||
Ok(commit_msg.process_id)
|
||||
}
|
||||
|
||||
fn handle_new_process(commit_msg: &CommitMessage) -> Result<Process> {
|
||||
let pcd_commitment = &commit_msg.pcd_commitment;
|
||||
|
||||
let merkle_root_bin = pcd_commitment.create_merkle_tree()?.root().unwrap();
|
||||
|
||||
if let Ok(pairing_process_id) = handle_member_list(&commit_msg) {
|
||||
dump_cached_members()?;
|
||||
// Send a handshake message to every connected client
|
||||
if let Some(new_member) = lock_members().unwrap().get(&pairing_process_id) {
|
||||
let our_sp_address = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.lock_anyhow()?
|
||||
.get_sp_client()
|
||||
.get_receiving_address();
|
||||
let mut new_member_map = HashMap::new();
|
||||
new_member_map.insert(pairing_process_id, new_member.clone());
|
||||
let init_msg = HandshakeMessage::new(
|
||||
our_sp_address.into(),
|
||||
OutPointMemberMap(new_member_map),
|
||||
OutPointProcessMap(HashMap::new()),
|
||||
CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst).into(),
|
||||
);
|
||||
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::Handshake,
|
||||
format!("{}", init_msg.to_string()),
|
||||
BroadcastType::ToAll,
|
||||
) {
|
||||
log::error!("Failed to send handshake message: {}", e);
|
||||
}
|
||||
} else {
|
||||
log::error!(
|
||||
"Failed to find new member with process id {}",
|
||||
pairing_process_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let mut new_process = Process::new(commit_msg.process_id);
|
||||
let init_state = ProcessState {
|
||||
commited_in: commit_msg.process_id,
|
||||
roles: commit_msg.roles.clone(),
|
||||
pcd_commitment: commit_msg.pcd_commitment.clone(),
|
||||
state_id: merkle_root_bin,
|
||||
public_data: commit_msg.public_data.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
new_process.insert_concurrent_state(init_state)?;
|
||||
|
||||
Ok(new_process)
|
||||
}
|
||||
|
||||
pub static MEMBERLIST: OnceLock<Mutex<HashMap<OutPoint, Member>>> = OnceLock::new();
|
||||
|
||||
pub fn lock_members() -> Result<MutexGuard<'static, HashMap<OutPoint, Member>>, anyhow::Error> {
|
||||
MEMBERLIST
|
||||
.get_or_init(|| Mutex::new(HashMap::new()))
|
||||
.lock_anyhow()
|
||||
}
|
||||
|
||||
fn handle_member_list(commit_msg: &CommitMessage) -> Result<OutPoint> {
|
||||
//Check if there is one role with one member
|
||||
if commit_msg.roles.len() != 1 {
|
||||
return Err(Error::msg("Process is not a pairing process"));
|
||||
}
|
||||
|
||||
if let Some(pairing_role) = commit_msg.roles.get("pairing") {
|
||||
if !pairing_role.members.is_empty() {
|
||||
return Err(Error::msg("Process is not a pairing process"));
|
||||
}
|
||||
} else {
|
||||
return Err(Error::msg("Process is not a pairing process"));
|
||||
}
|
||||
|
||||
if let Ok(paired_addresses) = commit_msg.public_data.get_as_json("pairedAddresses") {
|
||||
let paired_addresses: Vec<SilentPaymentAddress> = serde_json::from_value(paired_addresses.clone())?;
|
||||
let mut memberlist = lock_members()?;
|
||||
memberlist.insert(commit_msg.process_id, Member::new(paired_addresses));
|
||||
return Ok(commit_msg.process_id);
|
||||
}
|
||||
|
||||
Err(Error::msg("Process is not a pairing process"))
|
||||
}
|
||||
|
||||
fn handle_existing_commitment(
|
||||
process_to_udpate: &mut Process,
|
||||
commit_msg: &CommitMessage,
|
||||
) -> Result<()> {
|
||||
let process_id = process_to_udpate.get_process_id()?;
|
||||
match register_new_state(process_to_udpate, &commit_msg) {
|
||||
Ok(new_state_id) => log::debug!(
|
||||
"Registering new state for process {} with state id {}",
|
||||
process_id,
|
||||
new_state_id.to_lower_hex_string()
|
||||
),
|
||||
Err(existing_state_id) => log::debug!("State {} already exists", existing_state_id),
|
||||
}
|
||||
|
||||
if commit_msg.validation_tokens.len() > 0 {
|
||||
log::debug!(
|
||||
"Received commit_msg with {} validation tokens for process {}",
|
||||
commit_msg.validation_tokens.len(),
|
||||
process_id
|
||||
);
|
||||
// If the validation succeed, we return a new tip
|
||||
process_validation(process_to_udpate, commit_msg)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn dump_cached_members() -> Result<(), anyhow::Error> {
|
||||
let members = lock_members()?.clone();
|
||||
|
||||
let storage = STORAGE
|
||||
.get()
|
||||
.ok_or(Error::msg("STORAGE is not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
|
||||
let members_file = &storage.members_file;
|
||||
|
||||
let members_map = OutPointMemberMap(members);
|
||||
let json = serde_json::to_value(&members_map)?;
|
||||
members_file.save(&json)?;
|
||||
|
||||
log::debug!("saved members");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn dump_cached_processes(processes: HashMap<OutPoint, Process>) -> Result<(), anyhow::Error> {
|
||||
let storage = STORAGE
|
||||
.get()
|
||||
.ok_or(Error::msg("STORAGE is not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
|
||||
let processes_file = &storage.processes_file;
|
||||
|
||||
let outpoints_map = OutPointProcessMap(processes);
|
||||
let json = serde_json::to_value(&outpoints_map)?;
|
||||
processes_file.save(&json)?;
|
||||
|
||||
log::debug!("saved processes");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Register a new state
|
||||
fn register_new_state(process: &mut Process, commit_msg: &CommitMessage) -> Result<[u8; 32]> {
|
||||
let last_commited_state = process.get_latest_commited_state();
|
||||
|
||||
let new_state_id = commit_msg
|
||||
.pcd_commitment
|
||||
.create_merkle_tree()?
|
||||
.root()
|
||||
.unwrap();
|
||||
|
||||
if let Some(state) = last_commited_state {
|
||||
if new_state_id == state.state_id {
|
||||
return Err(Error::msg(format!(
|
||||
"{}",
|
||||
new_state_id.to_lower_hex_string()
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let concurrent_states = process.get_latest_concurrent_states()?;
|
||||
let (empty_state, actual_states) = concurrent_states.split_last().unwrap();
|
||||
let current_outpoint = empty_state.commited_in;
|
||||
|
||||
// Ensure no duplicate states
|
||||
if actual_states
|
||||
.iter()
|
||||
.any(|state| state.state_id == new_state_id)
|
||||
{
|
||||
return Err(Error::msg(format!(
|
||||
"{}",
|
||||
new_state_id.to_lower_hex_string()
|
||||
)));
|
||||
}
|
||||
|
||||
// Add the new state
|
||||
let new_state = ProcessState {
|
||||
commited_in: current_outpoint,
|
||||
pcd_commitment: commit_msg.pcd_commitment.clone(),
|
||||
state_id: new_state_id.clone(),
|
||||
roles: commit_msg.roles.clone(),
|
||||
public_data: commit_msg.public_data.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
process.insert_concurrent_state(new_state)?;
|
||||
|
||||
Ok(new_state_id)
|
||||
}
|
||||
|
||||
// Process validation for a state with validation tokens
|
||||
fn process_validation(
|
||||
updated_process: &mut Process,
|
||||
commit_msg: &CommitMessage,
|
||||
) -> Result<OutPoint> {
|
||||
let new_state_id = if commit_msg.pcd_commitment.is_empty() {
|
||||
// We're dealing with an obliteration attempt
|
||||
[0u8; 32]
|
||||
} else {
|
||||
commit_msg
|
||||
.pcd_commitment
|
||||
.create_merkle_tree()?
|
||||
.root()
|
||||
.ok_or(Error::msg("Invalid merkle tree"))?
|
||||
};
|
||||
|
||||
{
|
||||
let state_to_update = updated_process.get_state_for_id_mut(&new_state_id)?;
|
||||
|
||||
// Complete with the received tokens
|
||||
state_to_update
|
||||
.validation_tokens
|
||||
.extend(commit_msg.validation_tokens.iter());
|
||||
|
||||
state_to_update.validation_tokens.sort_unstable();
|
||||
state_to_update.validation_tokens.dedup();
|
||||
}
|
||||
|
||||
let state_to_validate = updated_process.get_state_for_id(&new_state_id)?;
|
||||
let members = lock_members()?.clone();
|
||||
state_to_validate.is_valid(
|
||||
updated_process.get_latest_commited_state(),
|
||||
&OutPointMemberMap(members),
|
||||
)?;
|
||||
|
||||
let commited_in = commit_new_transaction(updated_process, state_to_validate.clone())?;
|
||||
|
||||
Ok(commited_in)
|
||||
}
|
||||
|
||||
// Commit the new transaction and update the process state
|
||||
fn commit_new_transaction(
|
||||
updated_process: &mut Process,
|
||||
state_to_commit: ProcessState,
|
||||
) -> Result<OutPoint> {
|
||||
let sp_wallet = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
|
||||
let commitment_payload = Vec::from(state_to_commit.state_id);
|
||||
|
||||
let mut recipients = vec![];
|
||||
recipients.push(Recipient {
|
||||
address: RecipientAddress::SpAddress(sp_wallet.get_sp_client().get_receiving_address()),
|
||||
amount: Amount::from_sat(1000),
|
||||
});
|
||||
|
||||
// TODO not sure if this is still used
|
||||
// If the process is a pairing, we add another output that directly pays the owner of the process
|
||||
// We can find out simply by looking at the members list
|
||||
if let Some(member) = lock_members()?.get(&updated_process.get_process_id().unwrap()) {
|
||||
// We just pick one of the devices of the member at random en pay to it, member can then share the private key between all devices
|
||||
// For now we take the first address
|
||||
let address: SilentPaymentAddress =
|
||||
member.get_addresses().get(0).unwrap().as_str().try_into()?;
|
||||
recipients.push(Recipient {
|
||||
address: RecipientAddress::SpAddress(address),
|
||||
amount: Amount::from_sat(1000),
|
||||
});
|
||||
}
|
||||
// This output is used to generate publicly available public keys without having to go through too many loops
|
||||
|
||||
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
||||
let fee_rate = daemon
|
||||
.estimate_fee(6)
|
||||
.unwrap_or(Amount::from_sat(1000))
|
||||
.checked_div(1000)
|
||||
.unwrap();
|
||||
|
||||
let mut freezed_utxos = lock_freezed_utxos()?;
|
||||
|
||||
let next_commited_in = updated_process.get_process_tip()?;
|
||||
if !freezed_utxos.contains(&next_commited_in) {
|
||||
return Err(Error::msg(format!(
|
||||
"Missing next commitment outpoint for process {}",
|
||||
updated_process.get_process_id()?
|
||||
)));
|
||||
};
|
||||
|
||||
let unspent_outputs = sp_wallet.get_unspent_outputs();
|
||||
let mut available_outpoints = vec![];
|
||||
// We push the next_commited_in at the top of the available outpoints
|
||||
if let Some(output) = unspent_outputs.get(&next_commited_in) {
|
||||
available_outpoints.push((next_commited_in, output.clone()));
|
||||
}
|
||||
|
||||
// We filter out freezed utxos
|
||||
for (outpoint, output) in unspent_outputs {
|
||||
if !freezed_utxos.contains(&outpoint) {
|
||||
available_outpoints.push((outpoint, output));
|
||||
}
|
||||
}
|
||||
|
||||
let unsigned_transaction = create_transaction(
|
||||
available_outpoints,
|
||||
sp_wallet.get_sp_client(),
|
||||
recipients,
|
||||
Some(commitment_payload),
|
||||
FeeRate::from_sat_per_vb(fee_rate.to_sat() as f32),
|
||||
)?;
|
||||
|
||||
let final_tx = sign_transaction(sp_wallet.get_sp_client(), unsigned_transaction)?;
|
||||
|
||||
daemon.test_mempool_accept(&final_tx)?;
|
||||
let txid = daemon.broadcast(&final_tx)?;
|
||||
let commited_in = OutPoint::new(txid, 0);
|
||||
|
||||
freezed_utxos.insert(commited_in);
|
||||
freezed_utxos.remove(&next_commited_in);
|
||||
updated_process.remove_all_concurrent_states()?;
|
||||
updated_process.insert_concurrent_state(state_to_commit)?;
|
||||
updated_process.update_states_tip(commited_in)?;
|
||||
|
||||
Ok(commited_in)
|
||||
}
|
||||
|
||||
// TODO tests are broken, we need a complete overhaul to make it work again
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use crate::daemon::RpcCall;
|
||||
use crate::DiskStorage;
|
||||
use crate::StateFile;
|
||||
use bitcoincore_rpc::bitcoin::consensus::deserialize;
|
||||
use bitcoincore_rpc::bitcoin::hex::DisplayHex;
|
||||
use bitcoincore_rpc::bitcoin::*;
|
||||
use mockall::mock;
|
||||
use mockall::predicate::*;
|
||||
use sdk_common::pcd::Member;
|
||||
use sdk_common::pcd::Pcd;
|
||||
use sdk_common::pcd::PcdCommitments;
|
||||
use sdk_common::pcd::RoleDefinition;
|
||||
use sdk_common::pcd::Roles;
|
||||
use sdk_common::pcd::ValidationRule;
|
||||
use sdk_common::process::CACHEDPROCESSES;
|
||||
use sdk_common::sp_client::bitcoin::consensus::serialize;
|
||||
use sdk_common::sp_client::bitcoin::hex::FromHex;
|
||||
use sdk_common::sp_client::silentpayments::SilentPaymentAddress;
|
||||
use serde_json::json;
|
||||
use serde_json::{Map, Value};
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
const LOCAL_ADDRESS: &str = "sprt1qq222dhaxlzmjft2pa7qtspw2aw55vwfmtnjyllv5qrsqwm3nufxs6q7t88jf9asvd7rxhczt87de68du3jhem54xvqxy80wc6ep7lauxacsrq79v";
|
||||
const INIT_TRANSACTION: &str = "02000000000102b01b832bf34cf87583c628839c5316546646dcd4939e339c1d83e693216cdfa00100000000fdffffffdd1ca865b199accd4801634488fca87e0cf81b36ee7e9bec526a8f922539b8670000000000fdffffff0200e1f505000000001600140798fac9f310cefad436ea928f0bdacf03a11be544e0f5050000000016001468a66f38e7c2c9e367577d6fad8532ae2c728ed2014043764b77de5041f80d19e3d872f205635f87486af015c00d2a3b205c694a0ae1cbc60e70b18bcd4470abbd777de63ae52600aba8f5ad1334cdaa6bcd931ab78b0140b56dd8e7ac310d6dcbc3eff37f111ced470990d911b55cd6ff84b74b579c17d0bba051ec23b738eeeedba405a626d95f6bdccb94c626db74c57792254bfc5a7c00000000";
|
||||
const TMP_WALLET: &str = "/tmp/.4nk/wallet";
|
||||
const TMP_PROCESSES: &str = "/tmp/.4nk/processes";
|
||||
const TMP_MEMBERS: &str = "/tmp/.4nk/members";
|
||||
|
||||
// Define the mock for Daemon with the required methods
|
||||
mock! {
|
||||
#[derive(Debug)]
|
||||
pub Daemon {}
|
||||
|
||||
impl RpcCall for Daemon {
|
||||
fn connect(
|
||||
rpcwallet: Option<String>,
|
||||
rpc_url: String,
|
||||
network: bitcoincore_rpc::bitcoin::Network,
|
||||
) -> Result<Self> where Self: Sized;
|
||||
|
||||
fn estimate_fee(&self, nblocks: u16) -> Result<Amount>;
|
||||
|
||||
fn get_relay_fee(&self) -> Result<Amount>;
|
||||
|
||||
fn get_current_height(&self) -> Result<u64>;
|
||||
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Block>;
|
||||
|
||||
fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, bip158::BlockFilter)>;
|
||||
|
||||
fn list_unspent_from_to(
|
||||
&self,
|
||||
minamt: Option<Amount>,
|
||||
) -> Result<Vec<bitcoincore_rpc::json::ListUnspentResultEntry>>;
|
||||
|
||||
fn create_psbt(
|
||||
&self,
|
||||
unspents: &[bitcoincore_rpc::json::ListUnspentResultEntry],
|
||||
spk: ScriptBuf,
|
||||
network: Network,
|
||||
) -> Result<String>;
|
||||
|
||||
fn process_psbt(&self, psbt: String) -> Result<String>;
|
||||
|
||||
fn finalize_psbt(&self, psbt: String) -> Result<String>;
|
||||
|
||||
fn get_network(&self) -> Result<Network>;
|
||||
|
||||
fn test_mempool_accept(
|
||||
&self,
|
||||
tx: &Transaction,
|
||||
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult>;
|
||||
|
||||
fn broadcast(&self, tx: &Transaction) -> Result<Txid>;
|
||||
|
||||
fn get_transaction_info(
|
||||
&self,
|
||||
txid: &Txid,
|
||||
blockhash: Option<BlockHash>,
|
||||
) -> Result<Value>;
|
||||
|
||||
fn get_transaction_hex(
|
||||
&self,
|
||||
txid: &Txid,
|
||||
blockhash: Option<BlockHash>,
|
||||
) -> Result<Value>;
|
||||
|
||||
fn get_transaction(
|
||||
&self,
|
||||
txid: &Txid,
|
||||
blockhash: Option<BlockHash>,
|
||||
) -> Result<Transaction>;
|
||||
|
||||
fn get_block_txids(&self, blockhash: BlockHash) -> Result<Vec<Txid>>;
|
||||
|
||||
fn get_mempool_txids(&self) -> Result<Vec<Txid>>;
|
||||
|
||||
fn get_mempool_entries(
|
||||
&self,
|
||||
txids: &[Txid],
|
||||
) -> Result<Vec<Result<bitcoincore_rpc::json::GetMempoolEntryResult>>>;
|
||||
|
||||
fn get_mempool_transactions(
|
||||
&self,
|
||||
txids: &[Txid],
|
||||
) -> Result<Vec<Result<Transaction>>>;
|
||||
}
|
||||
}
|
||||
|
||||
mock! {
|
||||
#[derive(Debug)]
|
||||
pub SpWallet {
|
||||
fn get_receiving_address(&self) -> Result<String>;
|
||||
}
|
||||
}
|
||||
|
||||
mock! {
|
||||
#[derive(Debug)]
|
||||
pub SilentPaymentWallet {
|
||||
fn get_sp_wallet(&self) -> Result<MockSpWallet>;
|
||||
}
|
||||
}
|
||||
|
||||
static WALLET: OnceLock<MockSilentPaymentWallet> = OnceLock::new();
|
||||
|
||||
pub fn initialize_static_variables() {
|
||||
if DAEMON.get().is_none() {
|
||||
let mut daemon = MockDaemon::new();
|
||||
daemon
|
||||
.expect_broadcast()
|
||||
.withf(|tx: &Transaction| serialize(tx).to_lower_hex_string() == INIT_TRANSACTION)
|
||||
.returning(|tx| Ok(tx.txid()));
|
||||
DAEMON
|
||||
.set(Mutex::new(Box::new(daemon)))
|
||||
.expect("DAEMON should only be initialized once");
|
||||
println!("Initialized DAEMON");
|
||||
}
|
||||
|
||||
if WALLET.get().is_none() {
|
||||
let mut wallet = MockSilentPaymentWallet::new();
|
||||
wallet
|
||||
.expect_get_sp_wallet()
|
||||
.returning(|| Ok(MockSpWallet::new()));
|
||||
WALLET
|
||||
.set(wallet)
|
||||
.expect("WALLET should only be initialized once");
|
||||
println!("Initialized WALLET");
|
||||
}
|
||||
|
||||
if CACHEDPROCESSES.get().is_none() {
|
||||
CACHEDPROCESSES
|
||||
.set(Mutex::new(HashMap::new()))
|
||||
.expect("CACHEDPROCESSES should only be initialized once");
|
||||
|
||||
println!("Initialized CACHEDPROCESSES");
|
||||
}
|
||||
|
||||
if STORAGE.get().is_none() {
|
||||
let wallet_file = StateFile::new(PathBuf::from_str(TMP_WALLET).unwrap());
|
||||
let processes_file = StateFile::new(PathBuf::from_str(TMP_PROCESSES).unwrap());
|
||||
let members_file = StateFile::new(PathBuf::from_str(TMP_MEMBERS).unwrap());
|
||||
|
||||
wallet_file.create().unwrap();
|
||||
processes_file.create().unwrap();
|
||||
members_file.create().unwrap();
|
||||
|
||||
let disk_storage = DiskStorage {
|
||||
wallet_file,
|
||||
processes_file,
|
||||
members_file,
|
||||
};
|
||||
STORAGE
|
||||
.set(Mutex::new(disk_storage))
|
||||
.expect("STORAGE should initialize only once");
|
||||
|
||||
println!("Initialized STORAGE");
|
||||
}
|
||||
}
|
||||
|
||||
fn mock_commit_msg(process_id: OutPoint) -> CommitMessage {
|
||||
let field_names = [
|
||||
"a".to_owned(),
|
||||
"b".to_owned(),
|
||||
"pub_a".to_owned(),
|
||||
"roles".to_owned(),
|
||||
];
|
||||
let pairing_id = OutPoint::from_str(
|
||||
"b0c8378ee68e9a73836b04423ddb6de9fc0e2e715e04ffe6aa34117bb1025f01:0",
|
||||
)
|
||||
.unwrap();
|
||||
let member = Member::new(vec![SilentPaymentAddress::try_from(LOCAL_ADDRESS).unwrap()]);
|
||||
let validation_rule = ValidationRule::new(1.0, Vec::from(field_names), 1.0).unwrap();
|
||||
|
||||
let role_def = RoleDefinition {
|
||||
members: vec![pairing_id],
|
||||
validation_rules: vec![validation_rule],
|
||||
storages: vec![],
|
||||
};
|
||||
let roles = Roles::new(BTreeMap::from([(String::from("role_name"), role_def)]));
|
||||
let public_data = TryInto::<Pcd>::try_into(json!({"pub_a": Value::Null})).unwrap();
|
||||
let clear_state =
|
||||
TryInto::<Pcd>::try_into(json!({"a": Value::Null, "b": Value::Null})).unwrap();
|
||||
let pcd_commitments = PcdCommitments::new(
|
||||
&process_id,
|
||||
&Pcd::new(public_data.clone().into_iter().chain(clear_state).collect()),
|
||||
&roles,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let commit_msg = CommitMessage {
|
||||
process_id,
|
||||
roles,
|
||||
public_data,
|
||||
validation_tokens: vec![],
|
||||
pcd_commitment: pcd_commitments,
|
||||
error: None,
|
||||
};
|
||||
|
||||
commit_msg
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_handle_commit_new_process() {
|
||||
initialize_static_variables();
|
||||
let init_tx =
|
||||
deserialize::<Transaction>(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap();
|
||||
let init_txid = init_tx.txid();
|
||||
let process_id = OutPoint::new(init_txid, 0);
|
||||
|
||||
let commit_msg = mock_commit_msg(process_id);
|
||||
|
||||
let roles = commit_msg.roles.clone();
|
||||
let pcd_commitment = commit_msg.pcd_commitment.clone();
|
||||
|
||||
let empty_state = ProcessState {
|
||||
commited_in: process_id,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let result = handle_commit_request(commit_msg);
|
||||
|
||||
assert_eq!(result.unwrap(), process_id);
|
||||
|
||||
let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap();
|
||||
let updated_process = cache.get(&process_id);
|
||||
|
||||
assert!(updated_process.is_some());
|
||||
let concurrent_states = updated_process
|
||||
.unwrap()
|
||||
.get_latest_concurrent_states()
|
||||
.unwrap();
|
||||
|
||||
// Constructing the roles_map that was inserted in the process
|
||||
let roles_object = serde_json::to_value(roles).unwrap();
|
||||
let mut roles_map = Map::new();
|
||||
roles_map.insert("roles".to_owned(), roles_object);
|
||||
let new_state = ProcessState {
|
||||
commited_in: process_id,
|
||||
pcd_commitment,
|
||||
..Default::default()
|
||||
};
|
||||
let target = vec![&empty_state, &new_state];
|
||||
|
||||
assert_eq!(concurrent_states, target);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_handle_commit_new_state() {
|
||||
initialize_static_variables();
|
||||
let init_tx =
|
||||
deserialize::<Transaction>(&Vec::from_hex(INIT_TRANSACTION).unwrap()).unwrap();
|
||||
let init_txid = init_tx.txid();
|
||||
let process_id = OutPoint::new(init_txid, 0);
|
||||
|
||||
let commit_msg = mock_commit_msg(process_id);
|
||||
|
||||
let roles = commit_msg.roles.clone();
|
||||
let pcd_commitment = commit_msg.pcd_commitment.clone();
|
||||
|
||||
let process = Process::new(process_id);
|
||||
CACHEDPROCESSES
|
||||
.get()
|
||||
.unwrap()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(process_id, process);
|
||||
|
||||
let result = handle_commit_request(commit_msg);
|
||||
|
||||
assert_eq!(result.unwrap(), process_id);
|
||||
|
||||
let cache = CACHEDPROCESSES.get().unwrap().lock().unwrap();
|
||||
let updated_process = cache.get(&process_id);
|
||||
|
||||
assert!(updated_process.is_some());
|
||||
let concurrent_states = updated_process
|
||||
.unwrap()
|
||||
.get_latest_concurrent_states()
|
||||
.unwrap();
|
||||
|
||||
let roles_object = serde_json::to_value(roles).unwrap();
|
||||
let mut roles_map = Map::new();
|
||||
roles_map.insert("roles".to_owned(), roles_object);
|
||||
let new_state = ProcessState {
|
||||
commited_in: process_id,
|
||||
pcd_commitment,
|
||||
..Default::default()
|
||||
};
|
||||
let empty_state = ProcessState {
|
||||
commited_in: process_id,
|
||||
..Default::default()
|
||||
};
|
||||
let target = vec![&empty_state, &new_state];
|
||||
|
||||
assert_eq!(concurrent_states, target);
|
||||
}
|
||||
|
||||
// #[test]
|
||||
// fn test_handle_commit_request_invalid_init_tx() {
|
||||
// let commit_msg = CommitMessage {
|
||||
// init_tx: "invalid_tx_hex".to_string(),
|
||||
// roles: HashMap::new(),
|
||||
// validation_tokens: vec![],
|
||||
// pcd_commitment: json!({"roles": "expected_roles"}).as_object().unwrap().clone(),
|
||||
// };
|
||||
|
||||
// // Call the function under test
|
||||
// let result = handle_commit_request(commit_msg);
|
||||
|
||||
// // Assertions for error
|
||||
// assert!(result.is_err());
|
||||
// assert_eq!(result.unwrap_err().to_string(), "init_tx must be a valid transaction or txid");
|
||||
// }
|
||||
|
||||
// // Example test for adding a new state to an existing commitment
|
||||
// #[test]
|
||||
// fn test_handle_commit_request_add_state() {
|
||||
// // Set up data for adding a state to an existing commitment
|
||||
// let commit_msg = CommitMessage {
|
||||
// init_tx: "existing_outpoint_hex".to_string(),
|
||||
// roles: HashMap::new(),
|
||||
// validation_tokens: vec![],
|
||||
// pcd_commitment: json!({"roles": "expected_roles"}).as_object().unwrap().clone(),
|
||||
// };
|
||||
|
||||
// // Mock daemon and cache initialization
|
||||
// let mut daemon = MockDaemon::new();
|
||||
// daemon.expect_broadcast().returning(|_| Ok(Txid::new()));
|
||||
// DAEMON.set(Arc::new(Mutex::new(daemon))).unwrap();
|
||||
|
||||
// let process_state = Process::new(vec![], vec![]);
|
||||
// CACHEDPROCESSES.lock().unwrap().insert(OutPoint::new("mock_txid", 0), process_state);
|
||||
|
||||
// // Run the function
|
||||
// let result = handle_commit_request(commit_msg);
|
||||
|
||||
// // Assert success and that a new state was added
|
||||
// assert!(result.is_ok());
|
||||
// assert_eq!(result.unwrap(), OutPoint::new("mock_txid", 0));
|
||||
// }
|
||||
|
||||
// // Additional tests for errors and validation tokens would follow a similar setup
|
||||
}
|
81
src/config.rs
Normal file
81
src/config.rs
Normal file
@ -0,0 +1,81 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufRead};
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
|
||||
use sdk_common::sp_client::bitcoin::Network;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
pub core_url: String,
|
||||
pub core_wallet: Option<String>,
|
||||
pub ws_url: String,
|
||||
pub wallet_name: String,
|
||||
pub network: Network,
|
||||
pub blindbit_url: String,
|
||||
pub zmq_url: String,
|
||||
pub data_dir: String,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn read_from_file(filename: &str) -> Result<Self> {
|
||||
let mut file_content = HashMap::new();
|
||||
if let Ok(file) = File::open(filename) {
|
||||
let reader = io::BufReader::new(file);
|
||||
|
||||
// Read the file line by line
|
||||
for line in reader.lines() {
|
||||
if let Ok(l) = line {
|
||||
// Ignore comments and empty lines
|
||||
if l.starts_with('#') || l.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Split the line into key and value
|
||||
if let Some((k, v)) = l.split_once('=') {
|
||||
file_content.insert(k.to_owned(), v.trim_matches('\"').to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(anyhow::Error::msg("Failed to find conf file"));
|
||||
}
|
||||
|
||||
// Now set the Config
|
||||
let config = Config {
|
||||
core_url: file_content
|
||||
.remove("core_url")
|
||||
.ok_or(Error::msg("No \"core_url\""))?
|
||||
.to_owned(),
|
||||
core_wallet: file_content.remove("core_wallet").map(|s| s.to_owned()),
|
||||
ws_url: file_content
|
||||
.remove("ws_url")
|
||||
.ok_or(Error::msg("No \"ws_url\""))?
|
||||
.to_owned(),
|
||||
wallet_name: file_content
|
||||
.remove("wallet_name")
|
||||
.ok_or(Error::msg("No \"wallet_name\""))?
|
||||
.to_owned(),
|
||||
network: Network::from_core_arg(
|
||||
&file_content
|
||||
.remove("network")
|
||||
.ok_or(Error::msg("no \"network\""))?
|
||||
.trim_matches('\"'),
|
||||
)?,
|
||||
blindbit_url: file_content
|
||||
.remove("blindbit_url")
|
||||
.ok_or(Error::msg("No \"blindbit_url\""))?
|
||||
.to_owned(),
|
||||
zmq_url: file_content
|
||||
.remove("zmq_url")
|
||||
.ok_or(Error::msg("No \"zmq_url\""))?
|
||||
.to_owned(),
|
||||
data_dir: file_content
|
||||
.remove("data_dir")
|
||||
.unwrap_or_else(|| ".4nk".to_string()),
|
||||
};
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
}
|
460
src/daemon.rs
Normal file
460
src/daemon.rs
Normal file
@ -0,0 +1,460 @@
|
||||
use anyhow::{Context, Error, Result};
|
||||
|
||||
use bitcoincore_rpc::json::{
|
||||
CreateRawTransactionInput, ListUnspentQueryOptions, ListUnspentResultEntry,
|
||||
WalletCreateFundedPsbtOptions,
|
||||
};
|
||||
use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi};
|
||||
use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
|
||||
use sdk_common::sp_client::bitcoin::{
|
||||
block, Address, Amount, Block, BlockHash, Network, OutPoint, Psbt, ScriptBuf, Sequence,
|
||||
Transaction, TxIn, TxOut, Txid,
|
||||
};
|
||||
use sdk_common::sp_client::bitcoin::{consensus::deserialize, hashes::hex::FromHex};
|
||||
// use crossbeam_channel::Receiver;
|
||||
// use parking_lot::Mutex;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::FAUCET_AMT;
|
||||
|
||||
pub struct SensitiveAuth(pub Auth);
|
||||
|
||||
impl SensitiveAuth {
|
||||
pub(crate) fn get_auth(&self) -> Auth {
|
||||
self.0.clone()
|
||||
}
|
||||
}
|
||||
|
||||
enum PollResult {
|
||||
Done(Result<()>),
|
||||
Retry,
|
||||
}
|
||||
|
||||
fn rpc_poll(client: &mut Client, skip_block_download_wait: bool) -> PollResult {
|
||||
match client.get_blockchain_info() {
|
||||
Ok(info) => {
|
||||
if skip_block_download_wait {
|
||||
// bitcoind RPC is available, don't wait for block download to finish
|
||||
return PollResult::Done(Ok(()));
|
||||
}
|
||||
let left_blocks = info.headers - info.blocks;
|
||||
if info.initial_block_download || left_blocks > 0 {
|
||||
log::info!(
|
||||
"waiting for {} blocks to download{}",
|
||||
left_blocks,
|
||||
if info.initial_block_download {
|
||||
" (IBD)"
|
||||
} else {
|
||||
""
|
||||
}
|
||||
);
|
||||
return PollResult::Retry;
|
||||
}
|
||||
PollResult::Done(Ok(()))
|
||||
}
|
||||
Err(err) => {
|
||||
if let Some(e) = extract_bitcoind_error(&err) {
|
||||
if e.code == -28 {
|
||||
log::debug!("waiting for RPC warmup: {}", e.message);
|
||||
return PollResult::Retry;
|
||||
}
|
||||
}
|
||||
PollResult::Done(Err(err).context("daemon not available"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn read_cookie(path: &Path) -> Result<(String, String)> {
|
||||
// Load username and password from bitcoind cookie file:
|
||||
// * https://github.com/bitcoin/bitcoin/pull/6388/commits/71cbeaad9a929ba6a7b62d9b37a09b214ae00c1a
|
||||
// * https://bitcoin.stackexchange.com/questions/46782/rpc-cookie-authentication
|
||||
let mut file = File::open(path)
|
||||
.with_context(|| format!("failed to open bitcoind cookie file: {}", path.display()))?;
|
||||
let mut contents = String::new();
|
||||
file.read_to_string(&mut contents)
|
||||
.with_context(|| format!("failed to read bitcoind cookie from {}", path.display()))?;
|
||||
|
||||
let parts: Vec<&str> = contents.splitn(2, ':').collect();
|
||||
anyhow::ensure!(
|
||||
parts.len() == 2,
|
||||
"failed to parse bitcoind cookie - missing ':' separator"
|
||||
);
|
||||
Ok((parts[0].to_owned(), parts[1].to_owned()))
|
||||
}
|
||||
|
||||
fn rpc_connect(rpcwallet: Option<String>, network: Network, mut rpc_url: String) -> Result<Client> {
|
||||
match rpcwallet {
|
||||
Some(rpcwallet) => rpc_url.push_str(&rpcwallet),
|
||||
None => (),
|
||||
}
|
||||
|
||||
// Allow `wait_for_new_block` to take a bit longer before timing out.
|
||||
// See https://github.com/romanz/electrs/issues/495 for more details.
|
||||
let builder = jsonrpc::simple_http::SimpleHttpTransport::builder()
|
||||
.url(&rpc_url)?
|
||||
.timeout(Duration::from_secs(30));
|
||||
let home = env::var("HOME")?;
|
||||
let mut cookie_path = PathBuf::from_str(&home)?;
|
||||
cookie_path.push(".bitcoin");
|
||||
cookie_path.push(network.to_core_arg());
|
||||
cookie_path.push(".cookie");
|
||||
let daemon_auth = SensitiveAuth(Auth::CookieFile(cookie_path));
|
||||
let builder = match daemon_auth.get_auth() {
|
||||
Auth::None => builder,
|
||||
Auth::UserPass(user, pass) => builder.auth(user, Some(pass)),
|
||||
Auth::CookieFile(path) => {
|
||||
let (user, pass) = read_cookie(&path)?;
|
||||
builder.auth(user, Some(pass))
|
||||
}
|
||||
};
|
||||
Ok(Client::from_jsonrpc(jsonrpc::Client::with_transport(
|
||||
builder.build(),
|
||||
)))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Daemon {
|
||||
rpc: Client,
|
||||
}
|
||||
|
||||
impl RpcCall for Daemon {
|
||||
fn connect(rpcwallet: Option<String>, rpc_url: String, network: Network) -> Result<Self> {
|
||||
let mut rpc = rpc_connect(rpcwallet, network, rpc_url)?;
|
||||
|
||||
loop {
|
||||
match rpc_poll(&mut rpc, false) {
|
||||
PollResult::Done(result) => {
|
||||
result.context("bitcoind RPC polling failed")?;
|
||||
break; // on success, finish polling
|
||||
}
|
||||
PollResult::Retry => {
|
||||
std::thread::sleep(std::time::Duration::from_secs(1)); // wait a bit before polling
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let network_info = rpc.get_network_info()?;
|
||||
if !network_info.network_active {
|
||||
anyhow::bail!("electrs requires active bitcoind p2p network");
|
||||
}
|
||||
let info = rpc.get_blockchain_info()?;
|
||||
if info.pruned {
|
||||
anyhow::bail!("electrs requires non-pruned bitcoind node");
|
||||
}
|
||||
|
||||
Ok(Self { rpc })
|
||||
}
|
||||
|
||||
fn estimate_fee(&self, nblocks: u16) -> Result<Amount> {
|
||||
let res = self
|
||||
.rpc
|
||||
.estimate_smart_fee(nblocks, None)
|
||||
.context("failed to estimate fee")?;
|
||||
if res.errors.is_some() {
|
||||
Err(Error::msg(serde_json::to_string(&res.errors.unwrap())?))
|
||||
} else {
|
||||
Ok(res.fee_rate.unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
fn get_relay_fee(&self) -> Result<Amount> {
|
||||
Ok(self
|
||||
.rpc
|
||||
.get_network_info()
|
||||
.context("failed to get relay fee")?
|
||||
.relay_fee)
|
||||
}
|
||||
|
||||
fn get_current_height(&self) -> Result<u64> {
|
||||
Ok(self
|
||||
.rpc
|
||||
.get_block_count()
|
||||
.context("failed to get block count")?)
|
||||
}
|
||||
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Block> {
|
||||
Ok(self
|
||||
.rpc
|
||||
.get_block(&block_hash)
|
||||
.context("failed to get block")?)
|
||||
}
|
||||
|
||||
fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, BlockFilter)> {
|
||||
let block_hash = self.rpc.get_block_hash(block_height.try_into()?)?;
|
||||
let filter = self
|
||||
.rpc
|
||||
.get_block_filter(&block_hash)
|
||||
.context("failed to get block filter")?
|
||||
.into_filter();
|
||||
Ok((block_height, block_hash, filter))
|
||||
}
|
||||
|
||||
fn list_unspent_from_to(
|
||||
&self,
|
||||
minamt: Option<Amount>,
|
||||
) -> Result<Vec<json::ListUnspentResultEntry>> {
|
||||
let minimum_sum_amount = if minamt.is_none() || minamt <= FAUCET_AMT.checked_mul(2) {
|
||||
FAUCET_AMT.checked_mul(2)
|
||||
} else {
|
||||
minamt
|
||||
};
|
||||
Ok(self.rpc.list_unspent(
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Some(true),
|
||||
Some(ListUnspentQueryOptions {
|
||||
minimum_sum_amount,
|
||||
..Default::default()
|
||||
}),
|
||||
)?)
|
||||
}
|
||||
|
||||
fn create_psbt(
|
||||
&self,
|
||||
unspents: &[ListUnspentResultEntry],
|
||||
spk: ScriptBuf,
|
||||
network: Network,
|
||||
) -> Result<String> {
|
||||
let inputs: Vec<CreateRawTransactionInput> = unspents
|
||||
.iter()
|
||||
.map(|utxo| CreateRawTransactionInput {
|
||||
txid: utxo.txid,
|
||||
vout: utxo.vout,
|
||||
sequence: None,
|
||||
})
|
||||
.collect();
|
||||
let address = Address::from_script(&spk, network)?;
|
||||
let total_amt = unspents
|
||||
.iter()
|
||||
.fold(Amount::from_sat(0), |acc, x| acc + x.amount);
|
||||
|
||||
if total_amt < FAUCET_AMT {
|
||||
return Err(Error::msg("Not enought funds"));
|
||||
}
|
||||
|
||||
let mut outputs = HashMap::new();
|
||||
outputs.insert(address.to_string(), total_amt);
|
||||
|
||||
let options = WalletCreateFundedPsbtOptions {
|
||||
subtract_fee_from_outputs: vec![0],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let wallet_create_funded_result =
|
||||
self.rpc
|
||||
.wallet_create_funded_psbt(&inputs, &outputs, None, Some(options), None)?;
|
||||
|
||||
Ok(wallet_create_funded_result.psbt.to_string())
|
||||
}
|
||||
|
||||
fn process_psbt(&self, psbt: String) -> Result<String> {
|
||||
let processed_psbt = self.rpc.wallet_process_psbt(&psbt, None, None, None)?;
|
||||
match processed_psbt.complete {
|
||||
true => Ok(processed_psbt.psbt),
|
||||
false => Err(Error::msg("Failed to complete the psbt")),
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize_psbt(&self, psbt: String) -> Result<String> {
|
||||
let final_tx = self.rpc.finalize_psbt(&psbt, Some(false))?;
|
||||
|
||||
match final_tx.complete {
|
||||
true => Ok(final_tx
|
||||
.psbt
|
||||
.expect("We shouldn't have an empty psbt for a complete return")),
|
||||
false => Err(Error::msg("Failed to finalize psbt")),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_network(&self) -> Result<Network> {
|
||||
let blockchain_info = self.rpc.get_blockchain_info()?;
|
||||
|
||||
Ok(blockchain_info.chain)
|
||||
}
|
||||
|
||||
fn test_mempool_accept(
|
||||
&self,
|
||||
tx: &Transaction,
|
||||
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult> {
|
||||
let res = self.rpc.test_mempool_accept(&vec![tx])?;
|
||||
|
||||
Ok(res.get(0).unwrap().clone())
|
||||
}
|
||||
|
||||
fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
|
||||
let txid = self.rpc.send_raw_transaction(tx)?;
|
||||
|
||||
Ok(txid)
|
||||
}
|
||||
|
||||
fn get_transaction_info(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Value> {
|
||||
// No need to parse the resulting JSON, just return it as-is to the client.
|
||||
self.rpc
|
||||
.call(
|
||||
"getrawtransaction",
|
||||
&[json!(txid), json!(true), json!(blockhash)],
|
||||
)
|
||||
.context("failed to get transaction info")
|
||||
}
|
||||
|
||||
fn get_transaction_hex(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Value> {
|
||||
use sdk_common::sp_client::bitcoin::consensus::serde::{hex::Lower, Hex, With};
|
||||
|
||||
let tx = self.get_transaction(txid, blockhash)?;
|
||||
#[derive(serde::Serialize)]
|
||||
#[serde(transparent)]
|
||||
struct TxAsHex(#[serde(with = "With::<Hex<Lower>>")] Transaction);
|
||||
serde_json::to_value(TxAsHex(tx)).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn get_transaction(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Transaction> {
|
||||
self.rpc
|
||||
.get_raw_transaction(txid, blockhash.as_ref())
|
||||
.context("failed to get transaction")
|
||||
}
|
||||
|
||||
fn get_block_txids(&self, blockhash: BlockHash) -> Result<Vec<Txid>> {
|
||||
Ok(self
|
||||
.rpc
|
||||
.get_block_info(&blockhash)
|
||||
.context("failed to get block txids")?
|
||||
.tx)
|
||||
}
|
||||
|
||||
fn get_mempool_txids(&self) -> Result<Vec<Txid>> {
|
||||
self.rpc
|
||||
.get_raw_mempool()
|
||||
.context("failed to get mempool txids")
|
||||
}
|
||||
|
||||
fn get_mempool_entries(
|
||||
&self,
|
||||
txids: &[Txid],
|
||||
) -> Result<Vec<Result<json::GetMempoolEntryResult>>> {
|
||||
let client = self.rpc.get_jsonrpc_client();
|
||||
log::debug!("getting {} mempool entries", txids.len());
|
||||
let args: Vec<_> = txids
|
||||
.iter()
|
||||
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
|
||||
.collect();
|
||||
let reqs: Vec<_> = args
|
||||
.iter()
|
||||
.map(|a| client.build_request("getmempoolentry", a))
|
||||
.collect();
|
||||
let res = client.send_batch(&reqs).context("batch request failed")?;
|
||||
log::debug!("got {} mempool entries", res.len());
|
||||
Ok(res
|
||||
.into_iter()
|
||||
.map(|r| {
|
||||
r.context("missing response")?
|
||||
.result::<json::GetMempoolEntryResult>()
|
||||
.context("invalid response")
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
fn get_mempool_transactions(&self, txids: &[Txid]) -> Result<Vec<Result<Transaction>>> {
|
||||
let client = self.rpc.get_jsonrpc_client();
|
||||
log::debug!("getting {} transactions", txids.len());
|
||||
let args: Vec<_> = txids
|
||||
.iter()
|
||||
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
|
||||
.collect();
|
||||
let reqs: Vec<_> = args
|
||||
.iter()
|
||||
.map(|a| client.build_request("getrawtransaction", a))
|
||||
.collect();
|
||||
let res = client.send_batch(&reqs).context("batch request failed")?;
|
||||
log::debug!("got {} mempool transactions", res.len());
|
||||
Ok(res
|
||||
.into_iter()
|
||||
.map(|r| -> Result<Transaction> {
|
||||
let tx_hex = r
|
||||
.context("missing response")?
|
||||
.result::<String>()
|
||||
.context("invalid response")?;
|
||||
let tx_bytes = Vec::from_hex(&tx_hex).context("non-hex transaction")?;
|
||||
deserialize(&tx_bytes).context("invalid transaction")
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait RpcCall: Send + Sync + std::fmt::Debug {
|
||||
fn connect(rpcwallet: Option<String>, rpc_url: String, network: Network) -> Result<Self>
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
fn estimate_fee(&self, nblocks: u16) -> Result<Amount>;
|
||||
|
||||
fn get_relay_fee(&self) -> Result<Amount>;
|
||||
|
||||
fn get_current_height(&self) -> Result<u64>;
|
||||
|
||||
fn get_block(&self, block_hash: BlockHash) -> Result<Block>;
|
||||
|
||||
fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, BlockFilter)>;
|
||||
|
||||
fn list_unspent_from_to(
|
||||
&self,
|
||||
minamt: Option<Amount>,
|
||||
) -> Result<Vec<json::ListUnspentResultEntry>>;
|
||||
|
||||
fn create_psbt(
|
||||
&self,
|
||||
unspents: &[ListUnspentResultEntry],
|
||||
spk: ScriptBuf,
|
||||
network: Network,
|
||||
) -> Result<String>;
|
||||
|
||||
fn process_psbt(&self, psbt: String) -> Result<String>;
|
||||
|
||||
fn finalize_psbt(&self, psbt: String) -> Result<String>;
|
||||
|
||||
fn get_network(&self) -> Result<Network>;
|
||||
|
||||
fn test_mempool_accept(
|
||||
&self,
|
||||
tx: &Transaction,
|
||||
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult>;
|
||||
|
||||
fn broadcast(&self, tx: &Transaction) -> Result<Txid>;
|
||||
|
||||
fn get_transaction_info(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Value>;
|
||||
|
||||
fn get_transaction_hex(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Value>;
|
||||
|
||||
fn get_transaction(&self, txid: &Txid, blockhash: Option<BlockHash>) -> Result<Transaction>;
|
||||
|
||||
fn get_block_txids(&self, blockhash: BlockHash) -> Result<Vec<Txid>>;
|
||||
|
||||
fn get_mempool_txids(&self) -> Result<Vec<Txid>>;
|
||||
|
||||
fn get_mempool_entries(
|
||||
&self,
|
||||
txids: &[Txid],
|
||||
) -> Result<Vec<Result<json::GetMempoolEntryResult>>>;
|
||||
|
||||
fn get_mempool_transactions(&self, txids: &[Txid]) -> Result<Vec<Result<Transaction>>>;
|
||||
}
|
||||
|
||||
pub(crate) type RpcError = bitcoincore_rpc::jsonrpc::error::RpcError;
|
||||
|
||||
pub(crate) fn extract_bitcoind_error(err: &bitcoincore_rpc::Error) -> Option<&RpcError> {
|
||||
use bitcoincore_rpc::{
|
||||
jsonrpc::error::Error::Rpc as ServerError, Error::JsonRpc as JsonRpcError,
|
||||
};
|
||||
match err {
|
||||
JsonRpcError(ServerError(e)) => Some(e),
|
||||
_ => None,
|
||||
}
|
||||
}
|
274
src/faucet.rs
Normal file
274
src/faucet.rs
Normal file
@ -0,0 +1,274 @@
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
|
||||
use bitcoincore_rpc::bitcoin::secp256k1::PublicKey;
|
||||
use bitcoincore_rpc::json::{self as bitcoin_json};
|
||||
use sdk_common::silentpayments::sign_transaction;
|
||||
use sdk_common::sp_client::bitcoin::secp256k1::{
|
||||
rand::thread_rng, Keypair, Message as Secp256k1Message, Secp256k1, ThirtyTwoByteHash,
|
||||
};
|
||||
use sdk_common::sp_client::bitcoin::{
|
||||
absolute::LockTime,
|
||||
consensus::serialize,
|
||||
hex::{DisplayHex, FromHex},
|
||||
key::TapTweak,
|
||||
script::PushBytesBuf,
|
||||
sighash::{Prevouts, SighashCache},
|
||||
taproot::Signature,
|
||||
transaction::Version,
|
||||
Amount, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Witness,
|
||||
XOnlyPublicKey,
|
||||
};
|
||||
use sdk_common::{
|
||||
network::{FaucetMessage, NewTxMessage},
|
||||
silentpayments::create_transaction,
|
||||
};
|
||||
|
||||
use sdk_common::sp_client::silentpayments::sending::generate_recipient_pubkeys;
|
||||
use sdk_common::sp_client::silentpayments::utils::sending::calculate_partial_secret;
|
||||
use sdk_common::sp_client::{FeeRate, OwnedOutput, Recipient, RecipientAddress};
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
|
||||
use crate::lock_freezed_utxos;
|
||||
use crate::scan::check_transaction_alone;
|
||||
use crate::{
|
||||
scan::compute_partial_tweak_to_transaction, MutexExt, SilentPaymentAddress, DAEMON, FAUCET_AMT,
|
||||
WALLET,
|
||||
};
|
||||
|
||||
fn spend_from_core(dest: XOnlyPublicKey) -> Result<(Transaction, Amount)> {
|
||||
let core = DAEMON
|
||||
.get()
|
||||
.ok_or(Error::msg("DAEMON not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
let unspent_list: Vec<bitcoin_json::ListUnspentResultEntry> =
|
||||
core.list_unspent_from_to(None)?;
|
||||
|
||||
if !unspent_list.is_empty() {
|
||||
let network = core.get_network()?;
|
||||
|
||||
let spk = ScriptBuf::new_p2tr_tweaked(dest.dangerous_assume_tweaked());
|
||||
|
||||
let new_psbt = core.create_psbt(&unspent_list, spk, network)?;
|
||||
let processed_psbt = core.process_psbt(new_psbt)?;
|
||||
let finalize_psbt_result = core.finalize_psbt(processed_psbt)?;
|
||||
let final_psbt = Psbt::from_str(&finalize_psbt_result)?;
|
||||
let total_fee = final_psbt.fee()?;
|
||||
let final_tx = final_psbt.extract_tx()?;
|
||||
let fee_rate = total_fee
|
||||
.checked_div(final_tx.weight().to_vbytes_ceil())
|
||||
.unwrap();
|
||||
|
||||
Ok((final_tx, fee_rate))
|
||||
} else {
|
||||
// we don't have enough available coins to pay for this faucet request
|
||||
Err(Error::msg("No spendable outputs"))
|
||||
}
|
||||
}
|
||||
|
||||
fn faucet_send(
|
||||
sp_address: SilentPaymentAddress,
|
||||
commitment: &str,
|
||||
) -> Result<(Transaction, PublicKey)> {
|
||||
let sp_wallet = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
|
||||
let fee_estimate = DAEMON
|
||||
.get()
|
||||
.ok_or(Error::msg("DAEMON not initialized"))?
|
||||
.lock_anyhow()?
|
||||
.estimate_fee(6)
|
||||
.unwrap_or(Amount::from_sat(1000))
|
||||
.checked_div(1000)
|
||||
.unwrap();
|
||||
|
||||
log::debug!("fee estimate for 6 blocks: {}", fee_estimate);
|
||||
|
||||
let recipient = Recipient {
|
||||
address: RecipientAddress::SpAddress(sp_address),
|
||||
amount: FAUCET_AMT,
|
||||
};
|
||||
|
||||
let freezed_utxos = lock_freezed_utxos()?;
|
||||
|
||||
// We filter out the freezed utxos from available list
|
||||
let available_outpoints: Vec<(OutPoint, OwnedOutput)> = sp_wallet
|
||||
.get_unspent_outputs()
|
||||
.iter()
|
||||
.filter_map(|(outpoint, output)| {
|
||||
if !freezed_utxos.contains(&outpoint) {
|
||||
Some((*outpoint, output.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// If we had mandatory inputs, we would make sure to put them at the top of the list
|
||||
// We don't care for faucet though
|
||||
|
||||
// We try to pay the faucet amount
|
||||
if let Ok(unsigned_transaction) = create_transaction(
|
||||
available_outpoints,
|
||||
sp_wallet.get_sp_client(),
|
||||
vec![recipient],
|
||||
Some(Vec::from_hex(commitment).unwrap()),
|
||||
FeeRate::from_sat_per_vb(fee_estimate.to_sat() as f32),
|
||||
) {
|
||||
let final_tx = sign_transaction(sp_wallet.get_sp_client(), unsigned_transaction)?;
|
||||
|
||||
let partial_tweak = compute_partial_tweak_to_transaction(&final_tx)?;
|
||||
|
||||
let daemon = DAEMON
|
||||
.get()
|
||||
.ok_or(Error::msg("DAEMON not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
// First check that mempool accept it
|
||||
daemon.test_mempool_accept(&final_tx)?;
|
||||
let txid = daemon.broadcast(&final_tx)?;
|
||||
log::debug!("Sent tx {}", txid);
|
||||
|
||||
// We immediately add the new tx to our wallet to prevent accidental double spend
|
||||
check_transaction_alone(sp_wallet, &final_tx, &partial_tweak)?;
|
||||
|
||||
Ok((final_tx, partial_tweak))
|
||||
} else {
|
||||
// let's try to spend directly from the mining address
|
||||
let secp = Secp256k1::signing_only();
|
||||
let keypair = Keypair::new(&secp, &mut thread_rng());
|
||||
|
||||
// we first spend from core to the pubkey we just created
|
||||
let (core_tx, fee_rate) = spend_from_core(keypair.x_only_public_key().0)?;
|
||||
|
||||
// check that the first output of the transaction pays to the key we just created
|
||||
debug_assert!(
|
||||
core_tx.output[0].script_pubkey
|
||||
== ScriptBuf::new_p2tr_tweaked(
|
||||
keypair.x_only_public_key().0.dangerous_assume_tweaked()
|
||||
)
|
||||
);
|
||||
|
||||
// This is ugly and can be streamlined
|
||||
// create a new transaction that spends the newly created UTXO to the sp_address
|
||||
let mut faucet_tx = Transaction {
|
||||
input: vec![TxIn {
|
||||
previous_output: OutPoint::new(core_tx.txid(), 0),
|
||||
..Default::default()
|
||||
}],
|
||||
output: vec![],
|
||||
version: Version::TWO,
|
||||
lock_time: LockTime::ZERO,
|
||||
};
|
||||
|
||||
// now do the silent payment operations with the final recipient address
|
||||
let partial_secret = calculate_partial_secret(
|
||||
&[(keypair.secret_key(), true)],
|
||||
&[(core_tx.txid().to_string(), 0)],
|
||||
)?;
|
||||
|
||||
let ext_output_key: XOnlyPublicKey =
|
||||
generate_recipient_pubkeys(vec![sp_address.into()], partial_secret)?
|
||||
.into_values()
|
||||
.flatten()
|
||||
.collect::<Vec<XOnlyPublicKey>>()
|
||||
.get(0)
|
||||
.expect("Failed to generate keys")
|
||||
.to_owned();
|
||||
let change_sp_address = sp_wallet.get_sp_client().get_receiving_address();
|
||||
let change_output_key: XOnlyPublicKey =
|
||||
generate_recipient_pubkeys(vec![change_sp_address], partial_secret)?
|
||||
.into_values()
|
||||
.flatten()
|
||||
.collect::<Vec<XOnlyPublicKey>>()
|
||||
.get(0)
|
||||
.expect("Failed to generate keys")
|
||||
.to_owned();
|
||||
|
||||
let ext_spk = ScriptBuf::new_p2tr_tweaked(ext_output_key.dangerous_assume_tweaked());
|
||||
let change_spk = ScriptBuf::new_p2tr_tweaked(change_output_key.dangerous_assume_tweaked());
|
||||
|
||||
let mut op_return = PushBytesBuf::new();
|
||||
op_return.extend_from_slice(&Vec::from_hex(commitment)?)?;
|
||||
let data_spk = ScriptBuf::new_op_return(op_return);
|
||||
|
||||
// Take some margin to pay for the fees
|
||||
if core_tx.output[0].value < FAUCET_AMT * 4 {
|
||||
return Err(Error::msg("Not enough funds"));
|
||||
}
|
||||
|
||||
let change_amt = core_tx.output[0].value.checked_sub(FAUCET_AMT).unwrap();
|
||||
|
||||
faucet_tx.output.push(TxOut {
|
||||
value: FAUCET_AMT,
|
||||
script_pubkey: ext_spk,
|
||||
});
|
||||
faucet_tx.output.push(TxOut {
|
||||
value: change_amt,
|
||||
script_pubkey: change_spk,
|
||||
});
|
||||
faucet_tx.output.push(TxOut {
|
||||
value: Amount::from_sat(0),
|
||||
script_pubkey: data_spk,
|
||||
});
|
||||
|
||||
// dummy signature only used for fee estimation
|
||||
faucet_tx.input[0].witness.push([1; 64].to_vec());
|
||||
|
||||
let abs_fee = fee_rate
|
||||
.checked_mul(faucet_tx.weight().to_vbytes_ceil())
|
||||
.ok_or_else(|| Error::msg("Fee rate multiplication overflowed"))?;
|
||||
|
||||
// reset the witness to empty
|
||||
faucet_tx.input[0].witness = Witness::new();
|
||||
|
||||
faucet_tx.output[1].value -= abs_fee;
|
||||
|
||||
let first_tx_outputs = vec![core_tx.output[0].clone()];
|
||||
let prevouts = Prevouts::All(&first_tx_outputs);
|
||||
|
||||
let hash_ty = TapSighashType::Default;
|
||||
|
||||
let mut cache = SighashCache::new(&faucet_tx);
|
||||
|
||||
let sighash = cache.taproot_key_spend_signature_hash(0, &prevouts, hash_ty)?;
|
||||
|
||||
let msg = Secp256k1Message::from_digest(sighash.into_32());
|
||||
|
||||
let sig = secp.sign_schnorr_with_rng(&msg, &keypair, &mut thread_rng());
|
||||
let final_sig = Signature { sig, hash_ty };
|
||||
|
||||
faucet_tx.input[0].witness.push(final_sig.to_vec());
|
||||
|
||||
{
|
||||
let daemon = DAEMON
|
||||
.get()
|
||||
.ok_or(Error::msg("DAEMON not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
// We don't worry about core_tx being refused by core
|
||||
daemon.broadcast(&core_tx)?;
|
||||
daemon.test_mempool_accept(&faucet_tx)?;
|
||||
let txid = daemon.broadcast(&faucet_tx)?;
|
||||
log::debug!("Sent tx {}", txid);
|
||||
}
|
||||
|
||||
let partial_tweak = compute_partial_tweak_to_transaction(&faucet_tx)?;
|
||||
|
||||
check_transaction_alone(sp_wallet, &faucet_tx, &partial_tweak)?;
|
||||
|
||||
Ok((faucet_tx, partial_tweak))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_faucet_request(msg: &FaucetMessage) -> Result<NewTxMessage> {
|
||||
let sp_address = SilentPaymentAddress::try_from(msg.sp_address.as_str())?;
|
||||
log::debug!("Sending bootstrap coins to {}", sp_address);
|
||||
// send bootstrap coins to this sp_address
|
||||
let (tx, partial_tweak) = faucet_send(sp_address, &msg.commitment)?;
|
||||
|
||||
Ok(NewTxMessage::new(
|
||||
serialize(&tx).to_lower_hex_string(),
|
||||
Some(partial_tweak.to_string()),
|
||||
))
|
||||
}
|
599
src/main.rs
Normal file
599
src/main.rs
Normal file
@ -0,0 +1,599 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
env,
|
||||
fmt::Debug,
|
||||
fs,
|
||||
io::{Read, Write},
|
||||
net::SocketAddr,
|
||||
path::PathBuf,
|
||||
str::FromStr,
|
||||
sync::{atomic::AtomicU32, Arc, Mutex, MutexGuard, OnceLock},
|
||||
};
|
||||
|
||||
use bitcoincore_rpc::{
|
||||
bitcoin::{hashes::Hash, secp256k1::SecretKey},
|
||||
json::{self as bitcoin_json},
|
||||
};
|
||||
use commit::{lock_members, MEMBERLIST};
|
||||
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
|
||||
use log::{debug, error, warn};
|
||||
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
|
||||
use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
|
||||
use sdk_common::network::{AnkFlag, NewTxMessage};
|
||||
use sdk_common::{
|
||||
network::HandshakeMessage,
|
||||
pcd::Member,
|
||||
process::{lock_processes, Process, CACHEDPROCESSES},
|
||||
serialization::{OutPointMemberMap, OutPointProcessMap},
|
||||
silentpayments::SpWallet,
|
||||
sp_client::{
|
||||
bitcoin::{
|
||||
consensus::deserialize,
|
||||
hex::{DisplayHex, FromHex},
|
||||
Amount, Network, Transaction,
|
||||
},
|
||||
silentpayments::SilentPaymentAddress,
|
||||
},
|
||||
MutexExt,
|
||||
};
|
||||
use sdk_common::{
|
||||
sp_client::{
|
||||
bitcoin::{secp256k1::rand::thread_rng, OutPoint},
|
||||
OutputSpendStatus, SpClient, SpendKey,
|
||||
},
|
||||
updates::{init_update_sink, NativeUpdateSink, StateUpdate},
|
||||
};
|
||||
|
||||
use serde_json::Value;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
use zeromq::{Socket, SocketRecv};
|
||||
|
||||
mod commit;
|
||||
mod config;
|
||||
mod daemon;
|
||||
mod faucet;
|
||||
mod message;
|
||||
mod scan;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::{
|
||||
daemon::{Daemon, RpcCall},
|
||||
scan::scan_blocks,
|
||||
};
|
||||
|
||||
pub const WITH_CUTTHROUGH: bool = false; // We'd rather catch everything for this use case
|
||||
|
||||
type Tx = UnboundedSender<Message>;
|
||||
|
||||
type PeerMap = Mutex<HashMap<SocketAddr, Tx>>;
|
||||
|
||||
pub(crate) static PEERMAP: OnceLock<PeerMap> = OnceLock::new();
|
||||
|
||||
pub(crate) static DAEMON: OnceLock<Mutex<Box<dyn RpcCall>>> = OnceLock::new();
|
||||
|
||||
static CHAIN_TIP: AtomicU32 = AtomicU32::new(0);
|
||||
|
||||
pub static FREEZED_UTXOS: OnceLock<Mutex<HashSet<OutPoint>>> = OnceLock::new();
|
||||
|
||||
pub fn lock_freezed_utxos() -> Result<MutexGuard<'static, HashSet<OutPoint>>, Error> {
|
||||
FREEZED_UTXOS
|
||||
.get_or_init(|| Mutex::new(HashSet::new()))
|
||||
.lock_anyhow()
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StateFile {
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl StateFile {
|
||||
fn new(path: PathBuf) -> Self {
|
||||
Self { path }
|
||||
}
|
||||
|
||||
fn create(&self) -> Result<()> {
|
||||
let parent: PathBuf;
|
||||
if let Some(dir) = self.path.parent() {
|
||||
if !dir.ends_with(".4nk") {
|
||||
return Err(Error::msg("parent dir must be \".4nk\""));
|
||||
}
|
||||
parent = dir.to_path_buf();
|
||||
} else {
|
||||
return Err(Error::msg("wallet file has no parent dir"));
|
||||
}
|
||||
|
||||
// Ensure the parent directory exists
|
||||
if !parent.exists() {
|
||||
fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
// Create a new file
|
||||
fs::File::create(&self.path)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn save(&self, json: &Value) -> Result<()> {
|
||||
let mut f = fs::File::options()
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(&self.path)?;
|
||||
|
||||
let stringified = serde_json::to_string(&json)?;
|
||||
let bin = stringified.as_bytes();
|
||||
f.write_all(bin)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load(&self) -> Result<Value> {
|
||||
let mut f = fs::File::open(&self.path)?;
|
||||
|
||||
let mut content = vec![];
|
||||
f.read_to_end(&mut content)?;
|
||||
|
||||
let res: Value = serde_json::from_slice(&content)?;
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DiskStorage {
|
||||
pub wallet_file: StateFile,
|
||||
pub processes_file: StateFile,
|
||||
pub members_file: StateFile,
|
||||
}
|
||||
|
||||
pub static STORAGE: OnceLock<Mutex<DiskStorage>> = OnceLock::new();
|
||||
|
||||
const FAUCET_AMT: Amount = Amount::from_sat(10_000);
|
||||
|
||||
pub(crate) static WALLET: OnceLock<Mutex<SpWallet>> = OnceLock::new();
|
||||
|
||||
fn handle_new_tx_request(new_tx_msg: &NewTxMessage) -> Result<()> {
|
||||
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx_msg.transaction)?)?;
|
||||
|
||||
let daemon = DAEMON.get().unwrap().lock_anyhow()?;
|
||||
daemon.test_mempool_accept(&tx)?;
|
||||
daemon.broadcast(&tx)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
raw_stream: TcpStream,
|
||||
addr: SocketAddr,
|
||||
our_sp_address: SilentPaymentAddress,
|
||||
) {
|
||||
debug!("Incoming TCP connection from: {}", addr);
|
||||
|
||||
let peers = PEERMAP.get().expect("Peer Map not initialized");
|
||||
|
||||
let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await {
|
||||
Ok(stream) => {
|
||||
debug!("WebSocket connection established");
|
||||
stream
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("WebSocket handshake failed for {}: {}", addr, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Insert the write part of this peer to the peer map.
|
||||
let (tx, rx) = unbounded_channel();
|
||||
match peers.lock_anyhow() {
|
||||
Ok(mut peer_map) => peer_map.insert(addr, tx),
|
||||
Err(e) => {
|
||||
log::error!("{}", e);
|
||||
panic!();
|
||||
}
|
||||
};
|
||||
|
||||
let processes = lock_processes().unwrap().clone();
|
||||
let members = lock_members().unwrap().clone();
|
||||
let current_tip = CHAIN_TIP.load(std::sync::atomic::Ordering::SeqCst);
|
||||
|
||||
let init_msg = HandshakeMessage::new(
|
||||
our_sp_address.to_string(),
|
||||
OutPointMemberMap(members),
|
||||
OutPointProcessMap(processes),
|
||||
current_tip.into(),
|
||||
);
|
||||
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::Handshake,
|
||||
format!("{}", init_msg.to_string()),
|
||||
BroadcastType::Sender(addr),
|
||||
) {
|
||||
log::error!("Failed to send init message: {}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
let (outgoing, incoming) = ws_stream.split();
|
||||
|
||||
let broadcast_incoming = incoming.try_for_each(|msg| {
|
||||
if let Ok(raw_msg) = msg.to_text() {
|
||||
// debug!("Received msg: {}", raw_msg);
|
||||
process_message(raw_msg, addr);
|
||||
} else {
|
||||
debug!("Received non-text message {} from peer {}", msg, addr);
|
||||
}
|
||||
future::ok(())
|
||||
});
|
||||
|
||||
let receive_from_others = UnboundedReceiverStream::new(rx)
|
||||
.map(Ok)
|
||||
.forward(outgoing)
|
||||
.map(|result| {
|
||||
if let Err(e) = result {
|
||||
debug!("Error sending message: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
pin_mut!(broadcast_incoming, receive_from_others);
|
||||
future::select(broadcast_incoming, receive_from_others).await;
|
||||
|
||||
debug!("{} disconnected", &addr);
|
||||
peers.lock().unwrap().remove(&addr);
|
||||
}
|
||||
|
||||
fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
|
||||
let tx: Transaction = deserialize(&transaction)?;
|
||||
|
||||
if tx.is_coinbase() {
|
||||
return Err(Error::msg("Can't process coinbase transaction"));
|
||||
}
|
||||
|
||||
let partial_tweak = compute_partial_tweak_to_transaction(&tx)?;
|
||||
|
||||
let sp_wallet = WALLET
|
||||
.get()
|
||||
.ok_or_else(|| Error::msg("Wallet not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
check_transaction_alone(sp_wallet, &tx, &partial_tweak)?;
|
||||
|
||||
Ok(NewTxMessage::new(
|
||||
transaction.to_lower_hex_string(),
|
||||
Some(partial_tweak.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
async fn handle_scan_updates(
|
||||
scan_rx: std::sync::mpsc::Receiver<sdk_common::updates::ScanProgress>,
|
||||
) {
|
||||
while let Ok(update) = scan_rx.recv() {
|
||||
log::debug!("Received scan update: {:?}", update);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_state_updates(
|
||||
state_rx: std::sync::mpsc::Receiver<sdk_common::updates::StateUpdate>,
|
||||
) {
|
||||
while let Ok(update) = state_rx.recv() {
|
||||
match update {
|
||||
StateUpdate::Update {
|
||||
blkheight,
|
||||
blkhash,
|
||||
found_outputs,
|
||||
found_inputs,
|
||||
} => {
|
||||
// We update the wallet with found outputs and inputs
|
||||
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
||||
// inputs first
|
||||
for outpoint in found_inputs {
|
||||
sp_wallet.mark_output_mined(&outpoint, blkhash);
|
||||
}
|
||||
sp_wallet.get_mut_outputs().extend(found_outputs);
|
||||
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
||||
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
||||
STORAGE
|
||||
.get()
|
||||
.unwrap()
|
||||
.lock_anyhow()
|
||||
.unwrap()
|
||||
.wallet_file
|
||||
.save(&json)
|
||||
.unwrap();
|
||||
}
|
||||
StateUpdate::NoUpdate { blkheight } => {
|
||||
// We just keep the current height to update the last_scan
|
||||
debug!("No update, setting last scan at {}", blkheight);
|
||||
let mut sp_wallet = WALLET.get().unwrap().lock_anyhow().unwrap();
|
||||
sp_wallet.set_last_scan(blkheight.to_consensus_u32());
|
||||
let json = serde_json::to_value(sp_wallet.clone()).unwrap();
|
||||
STORAGE
|
||||
.get()
|
||||
.unwrap()
|
||||
.lock_anyhow()
|
||||
.unwrap()
|
||||
.wallet_file
|
||||
.save(&json)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_zmq(zmq_url: String, blindbit_url: String) {
|
||||
debug!("Starting listening on Core");
|
||||
let mut socket = zeromq::SubSocket::new();
|
||||
socket.connect(&zmq_url).await.unwrap();
|
||||
socket.subscribe("rawtx").await.unwrap();
|
||||
socket.subscribe("hashblock").await.unwrap();
|
||||
loop {
|
||||
let core_msg = match socket.recv().await {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
error!("Zmq error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
debug!("Received a message");
|
||||
|
||||
let payload: String = if let (Some(topic), Some(data)) = (core_msg.get(0), core_msg.get(1))
|
||||
{
|
||||
debug!("topic: {}", std::str::from_utf8(&topic).unwrap());
|
||||
match std::str::from_utf8(&topic) {
|
||||
Ok("rawtx") => match create_new_tx_message(data.to_vec()) {
|
||||
Ok(m) => {
|
||||
debug!("Created message");
|
||||
serde_json::to_string(&m).expect("This shouldn't fail")
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Ok("hashblock") => {
|
||||
let current_height = DAEMON
|
||||
.get()
|
||||
.unwrap()
|
||||
.lock_anyhow()
|
||||
.unwrap()
|
||||
.get_current_height()
|
||||
.unwrap();
|
||||
CHAIN_TIP.store(current_height as u32, std::sync::atomic::Ordering::SeqCst);
|
||||
|
||||
// Add retry logic for hashblock processing
|
||||
let mut retry_count = 0;
|
||||
const MAX_RETRIES: u32 = 4;
|
||||
const RETRY_DELAY_MS: u64 = 1000; // 1 second initial delay
|
||||
|
||||
loop {
|
||||
match scan_blocks(0, &blindbit_url).await {
|
||||
Ok(_) => {
|
||||
debug!("Successfully scanned blocks after {} retries", retry_count);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
retry_count += 1;
|
||||
if retry_count >= MAX_RETRIES {
|
||||
error!(
|
||||
"Failed to scan blocks after {} retries: {}",
|
||||
MAX_RETRIES, e
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
// Exponential backoff: 1s, 2s, 4s
|
||||
let delay_ms = RETRY_DELAY_MS * (1 << (retry_count - 1));
|
||||
warn!(
|
||||
"Scan failed (attempt {}/{}), retrying in {}ms: {}",
|
||||
retry_count, MAX_RETRIES, delay_ms, e
|
||||
);
|
||||
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
_ => {
|
||||
error!("Unexpected message in zmq");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("Empty message");
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Err(e) = broadcast_message(AnkFlag::NewTx, payload, BroadcastType::ToAll) {
|
||||
log::error!("{}", e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() -> Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
// todo: take the path to conf file as argument
|
||||
// default to "./.conf"
|
||||
let config = Config::read_from_file(".conf")?;
|
||||
|
||||
if config.network == Network::Bitcoin {
|
||||
warn!("Running on mainnet, you're on your own");
|
||||
}
|
||||
|
||||
MESSAGECACHE
|
||||
.set(MessageCache::new())
|
||||
.expect("Message Cache initialization failed");
|
||||
|
||||
PEERMAP
|
||||
.set(PeerMap::new(HashMap::new()))
|
||||
.expect("PeerMap initialization failed");
|
||||
|
||||
// Connect the rpc daemon
|
||||
DAEMON
|
||||
.set(Mutex::new(Box::new(Daemon::connect(
|
||||
config.core_wallet,
|
||||
config.core_url,
|
||||
config.network,
|
||||
)?)))
|
||||
.expect("DAEMON initialization failed");
|
||||
|
||||
let current_tip: u32 = DAEMON
|
||||
.get()
|
||||
.unwrap()
|
||||
.lock_anyhow()?
|
||||
.get_current_height()?
|
||||
.try_into()?;
|
||||
|
||||
// Set CHAIN_TIP
|
||||
CHAIN_TIP.store(current_tip, std::sync::atomic::Ordering::SeqCst);
|
||||
|
||||
let mut app_dir = PathBuf::from_str(&env::var("HOME")?)?;
|
||||
app_dir.push(config.data_dir);
|
||||
let mut wallet_file = app_dir.clone();
|
||||
wallet_file.push(&config.wallet_name);
|
||||
let mut processes_file = app_dir.clone();
|
||||
processes_file.push("processes");
|
||||
let mut members_file = app_dir.clone();
|
||||
members_file.push("members");
|
||||
|
||||
let wallet_file = StateFile::new(wallet_file);
|
||||
let processes_file = StateFile::new(processes_file);
|
||||
let members_file = StateFile::new(members_file);
|
||||
|
||||
// load an existing sp_wallet, or create a new one
|
||||
let sp_wallet: SpWallet = match wallet_file.load() {
|
||||
Ok(wallet) => {
|
||||
// TODO: Verify the wallet is compatible with the current network
|
||||
serde_json::from_value(wallet)?
|
||||
}
|
||||
Err(_) => {
|
||||
// Create a new wallet file if it doesn't exist or fails to load
|
||||
wallet_file.create()?;
|
||||
|
||||
let mut rng = thread_rng();
|
||||
|
||||
let new_client = SpClient::new(
|
||||
SecretKey::new(&mut rng),
|
||||
SpendKey::Secret(SecretKey::new(&mut rng)),
|
||||
config.network,
|
||||
)
|
||||
.expect("Failed to create a new SpClient");
|
||||
|
||||
let mut sp_wallet = SpWallet::new(new_client);
|
||||
|
||||
// Set birthday and update scan information
|
||||
sp_wallet.set_birthday(current_tip);
|
||||
sp_wallet.set_last_scan(current_tip);
|
||||
|
||||
// Save the newly created wallet to disk
|
||||
let json = serde_json::to_value(sp_wallet.clone())?;
|
||||
wallet_file.save(&json)?;
|
||||
|
||||
sp_wallet
|
||||
}
|
||||
};
|
||||
|
||||
let cached_processes: HashMap<OutPoint, Process> = match processes_file.load() {
|
||||
Ok(processes) => {
|
||||
let deserialized: OutPointProcessMap = serde_json::from_value(processes)?;
|
||||
deserialized.0
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("creating process file at {}", processes_file.path.display());
|
||||
processes_file.create()?;
|
||||
|
||||
HashMap::new()
|
||||
}
|
||||
};
|
||||
|
||||
let members: HashMap<OutPoint, Member> = match members_file.load() {
|
||||
Ok(members) => {
|
||||
let deserialized: OutPointMemberMap = serde_json::from_value(members)?;
|
||||
deserialized.0
|
||||
}
|
||||
Err(_) => {
|
||||
debug!("creating members file at {}", members_file.path.display());
|
||||
members_file.create()?;
|
||||
|
||||
HashMap::new()
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
let utxo_to_freeze: HashSet<OutPoint> = cached_processes
|
||||
.iter()
|
||||
.map(|(_, process)| process.get_last_unspent_outpoint().unwrap())
|
||||
.collect();
|
||||
|
||||
let mut freezed_utxos = lock_freezed_utxos()?;
|
||||
*freezed_utxos = utxo_to_freeze;
|
||||
}
|
||||
|
||||
let our_sp_address = sp_wallet.get_sp_client().get_receiving_address();
|
||||
|
||||
log::info!("Using wallet with address {}", our_sp_address,);
|
||||
|
||||
log::info!(
|
||||
"Found {} outputs for a total balance of {}",
|
||||
sp_wallet.get_outputs().len(),
|
||||
sp_wallet.get_balance()
|
||||
);
|
||||
|
||||
let last_scan = sp_wallet.get_last_scan();
|
||||
|
||||
WALLET
|
||||
.set(Mutex::new(sp_wallet))
|
||||
.expect("Failed to initialize WALLET");
|
||||
|
||||
CACHEDPROCESSES
|
||||
.set(Mutex::new(cached_processes))
|
||||
.expect("Failed to initialize CACHEDPROCESSES");
|
||||
|
||||
MEMBERLIST
|
||||
.set(Mutex::new(members))
|
||||
.expect("Failed to initialize MEMBERLIST");
|
||||
|
||||
let storage = DiskStorage {
|
||||
wallet_file,
|
||||
processes_file,
|
||||
members_file,
|
||||
};
|
||||
|
||||
STORAGE.set(Mutex::new(storage)).unwrap();
|
||||
|
||||
let (sink, scan_rx, state_rx) = NativeUpdateSink::new();
|
||||
init_update_sink(Arc::new(sink));
|
||||
|
||||
// Spawn the update handlers
|
||||
tokio::spawn(handle_scan_updates(scan_rx));
|
||||
tokio::spawn(handle_state_updates(state_rx));
|
||||
|
||||
if last_scan < current_tip {
|
||||
log::info!("Scanning for our outputs");
|
||||
scan_blocks(current_tip - last_scan, &config.blindbit_url).await?;
|
||||
}
|
||||
|
||||
// Subscribe to Bitcoin Core
|
||||
let zmq_url = config.zmq_url.clone();
|
||||
let blindbit_url = config.blindbit_url.clone();
|
||||
tokio::spawn(async move {
|
||||
handle_zmq(zmq_url, blindbit_url).await;
|
||||
});
|
||||
|
||||
// Create the event loop and TCP listener we'll accept connections on.
|
||||
let try_socket = TcpListener::bind(config.ws_url).await;
|
||||
let listener = try_socket.expect("Failed to bind");
|
||||
|
||||
tokio::spawn(MessageCache::clean_up());
|
||||
|
||||
// Let's spawn the handling of each connection in a separate task.
|
||||
while let Ok((stream, addr)) = listener.accept().await {
|
||||
tokio::spawn(handle_connection(stream, addr, our_sp_address));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
234
src/message.rs
Normal file
234
src/message.rs
Normal file
@ -0,0 +1,234 @@
|
||||
use anyhow::{Error, Result};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::SocketAddr,
|
||||
sync::{Mutex, OnceLock},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::time;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
use sdk_common::network::{AnkFlag, CommitMessage, Envelope, FaucetMessage, NewTxMessage};
|
||||
|
||||
use crate::{
|
||||
commit::handle_commit_request, faucet::handle_faucet_request, handle_new_tx_request, PEERMAP,
|
||||
};
|
||||
|
||||
pub(crate) static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new();
|
||||
|
||||
const MESSAGECACHEDURATION: Duration = Duration::from_secs(20);
|
||||
const MESSAGECACHEINTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct MessageCache {
|
||||
store: Mutex<HashMap<String, Instant>>,
|
||||
}
|
||||
|
||||
impl MessageCache {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
store: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn insert(&self, key: String) {
|
||||
let mut store = self.store.lock().unwrap();
|
||||
store.insert(key, Instant::now());
|
||||
}
|
||||
|
||||
fn remove(&self, key: &str) {
|
||||
let mut store = self.store.lock().unwrap();
|
||||
store.remove(key);
|
||||
}
|
||||
|
||||
fn contains(&self, key: &str) -> bool {
|
||||
let store = self.store.lock().unwrap();
|
||||
store.contains_key(key)
|
||||
}
|
||||
|
||||
pub async fn clean_up() {
|
||||
let cache = MESSAGECACHE.get().unwrap();
|
||||
|
||||
let mut interval = time::interval(MESSAGECACHEINTERVAL);
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let mut store = cache.store.lock().unwrap();
|
||||
|
||||
let now = Instant::now();
|
||||
store.retain(|_, entrytime| now.duration_since(*entrytime) <= MESSAGECACHEDURATION);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum BroadcastType {
|
||||
Sender(SocketAddr),
|
||||
#[allow(dead_code)]
|
||||
ExcludeSender(SocketAddr),
|
||||
#[allow(dead_code)]
|
||||
ToAll,
|
||||
}
|
||||
|
||||
pub(crate) fn broadcast_message(
|
||||
flag: AnkFlag,
|
||||
payload: String,
|
||||
broadcast: BroadcastType,
|
||||
) -> Result<()> {
|
||||
let peers = PEERMAP.get().ok_or(Error::msg("Unitialized peer map"))?;
|
||||
let ank_msg = Envelope {
|
||||
flag,
|
||||
content: payload,
|
||||
};
|
||||
let msg = Message::Text(serde_json::to_string(&ank_msg)?);
|
||||
match ank_msg.flag {
|
||||
AnkFlag::Cipher => log::debug!("Broadcasting cipher"),
|
||||
AnkFlag::Handshake => log::debug!("Broadcasting handshake"),
|
||||
_ => log::debug!("Broadcasting {} message: {}", ank_msg.flag.as_str(), msg),
|
||||
}
|
||||
match broadcast {
|
||||
BroadcastType::Sender(addr) => {
|
||||
peers
|
||||
.lock()
|
||||
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
|
||||
.iter()
|
||||
.find(|(peer_addr, _)| peer_addr == &&addr)
|
||||
.ok_or(Error::msg("Failed to find the sender in the peer_map"))?
|
||||
.1
|
||||
.send(msg)?;
|
||||
}
|
||||
BroadcastType::ExcludeSender(addr) => {
|
||||
peers
|
||||
.lock()
|
||||
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
|
||||
.iter()
|
||||
.filter(|(peer_addr, _)| peer_addr != &&addr)
|
||||
.for_each(|(_, peer_tx)| {
|
||||
let _ = peer_tx.send(msg.clone());
|
||||
});
|
||||
}
|
||||
BroadcastType::ToAll => {
|
||||
peers
|
||||
.lock()
|
||||
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
|
||||
.iter()
|
||||
.for_each(|(_, peer_tx)| {
|
||||
let _ = peer_tx.send(msg.clone());
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_faucet_message(ank_msg: Envelope, addr: SocketAddr) {
|
||||
log::debug!("Received a faucet message");
|
||||
if let Ok(mut content) = serde_json::from_str::<FaucetMessage>(&ank_msg.content) {
|
||||
match handle_faucet_request(&content) {
|
||||
Ok(new_tx_msg) => {
|
||||
log::debug!(
|
||||
"Obtained new_tx_msg: {}",
|
||||
serde_json::to_string(&new_tx_msg).unwrap()
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to send faucet tx: {}", e);
|
||||
content.error = Some(e.into());
|
||||
let payload = serde_json::to_string(&content).expect("Message type shouldn't fail");
|
||||
if let Err(e) =
|
||||
broadcast_message(AnkFlag::Faucet, payload, BroadcastType::Sender(addr))
|
||||
{
|
||||
log::error!("Failed to broadcast message: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::error!("Invalid content for faucet message");
|
||||
}
|
||||
}
|
||||
|
||||
fn process_new_tx_message(ank_msg: Envelope, addr: SocketAddr) {
|
||||
log::debug!("Received a new tx message");
|
||||
if let Ok(mut new_tx_msg) = serde_json::from_str::<NewTxMessage>(&ank_msg.content) {
|
||||
if let Err(e) = handle_new_tx_request(&mut new_tx_msg) {
|
||||
log::error!("handle_new_tx_request returned error: {}", e);
|
||||
new_tx_msg.error = Some(e.into());
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::NewTx,
|
||||
serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"),
|
||||
BroadcastType::Sender(addr),
|
||||
) {
|
||||
log::error!("Failed to broadcast message: {}", e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::error!("Invalid content for new_tx message");
|
||||
}
|
||||
}
|
||||
|
||||
fn process_cipher_message(ank_msg: Envelope, addr: SocketAddr) {
|
||||
// For now we just send it to everyone
|
||||
log::debug!("Received a cipher message");
|
||||
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::Cipher,
|
||||
ank_msg.content,
|
||||
BroadcastType::ExcludeSender(addr),
|
||||
) {
|
||||
log::error!("Failed to send message with error: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
fn process_commit_message(ank_msg: Envelope, addr: SocketAddr) {
|
||||
if let Ok(mut commit_msg) = serde_json::from_str::<CommitMessage>(&ank_msg.content) {
|
||||
match handle_commit_request(commit_msg.clone()) {
|
||||
Ok(new_outpoint) => log::debug!("Processed commit msg for outpoint {}", new_outpoint),
|
||||
Err(e) => {
|
||||
log::error!("handle_commit_request returned error: {}", e);
|
||||
// Temporary fix: we remove the message from the cache in case the client wants to try again
|
||||
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
|
||||
cache.remove(ank_msg.to_string().as_str());
|
||||
commit_msg.error = Some(e.into());
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::Commit,
|
||||
serde_json::to_string(&commit_msg).expect("This shouldn't fail"),
|
||||
BroadcastType::Sender(addr),
|
||||
) {
|
||||
log::error!("Failed to broadcast message: {}", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn process_unknown_message(ank_msg: Envelope, addr: SocketAddr) {
|
||||
log::debug!("Received an unknown message");
|
||||
if let Err(e) = broadcast_message(
|
||||
AnkFlag::Unknown,
|
||||
ank_msg.content,
|
||||
BroadcastType::ExcludeSender(addr),
|
||||
) {
|
||||
log::error!("Failed to send message with error: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process_message(raw_msg: &str, addr: SocketAddr) {
|
||||
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
|
||||
if cache.contains(raw_msg) {
|
||||
log::debug!("Message already processed, dropping");
|
||||
return;
|
||||
} else {
|
||||
cache.insert(raw_msg.to_owned());
|
||||
}
|
||||
match serde_json::from_str::<Envelope>(raw_msg) {
|
||||
Ok(ank_msg) => match ank_msg.flag {
|
||||
AnkFlag::Faucet => process_faucet_message(ank_msg, addr),
|
||||
AnkFlag::NewTx => process_new_tx_message(ank_msg, addr),
|
||||
AnkFlag::Cipher => process_cipher_message(ank_msg, addr),
|
||||
AnkFlag::Commit => process_commit_message(ank_msg, addr),
|
||||
AnkFlag::Unknown => process_unknown_message(ank_msg, addr),
|
||||
AnkFlag::Sync => todo!(),
|
||||
AnkFlag::Handshake => log::debug!("Received init message from {}", addr),
|
||||
},
|
||||
Err(_) => log::error!("Failed to parse network message"),
|
||||
}
|
||||
}
|
616
src/scan.rs
Normal file
616
src/scan.rs
Normal file
@ -0,0 +1,616 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::MutexGuard;
|
||||
|
||||
use anyhow::bail;
|
||||
use anyhow::{Error, Result};
|
||||
use bitcoincore_rpc::bitcoin::absolute::Height;
|
||||
use bitcoincore_rpc::bitcoin::hashes::sha256;
|
||||
use bitcoincore_rpc::bitcoin::hashes::Hash;
|
||||
use bitcoincore_rpc::bitcoin::Amount;
|
||||
use futures_util::Stream;
|
||||
use log::info;
|
||||
use sdk_common::silentpayments::SpWallet;
|
||||
use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
|
||||
use sdk_common::sp_client::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey};
|
||||
use sdk_common::sp_client::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey};
|
||||
use sdk_common::sp_client::silentpayments::receiving::Receiver;
|
||||
use sdk_common::sp_client::silentpayments::utils::receiving::{
|
||||
calculate_tweak_data, get_pubkey_from_input,
|
||||
};
|
||||
use sdk_common::sp_client::BlockData;
|
||||
use sdk_common::sp_client::ChainBackend;
|
||||
use sdk_common::sp_client::FilterData;
|
||||
use sdk_common::sp_client::SpClient;
|
||||
use sdk_common::sp_client::Updater;
|
||||
use sdk_common::sp_client::{BlindbitBackend, OutputSpendStatus, OwnedOutput, SpScanner};
|
||||
use sdk_common::updates::StateUpdater;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::CHAIN_TIP;
|
||||
use crate::{MutexExt, DAEMON, STORAGE, WALLET, WITH_CUTTHROUGH};
|
||||
|
||||
pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result<PublicKey> {
|
||||
let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?;
|
||||
let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len());
|
||||
let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len());
|
||||
// TODO we should cache transactions to prevent multiple rpc request when transaction spends multiple outputs from the same tx
|
||||
for input in tx.input.iter() {
|
||||
outpoints.push((
|
||||
input.previous_output.txid.to_string(),
|
||||
input.previous_output.vout,
|
||||
));
|
||||
let prev_tx = daemon
|
||||
.lock_anyhow()?
|
||||
.get_transaction(&input.previous_output.txid, None)
|
||||
.map_err(|e| Error::msg(format!("Failed to find previous transaction: {}", e)))?;
|
||||
|
||||
if let Some(output) = prev_tx.output.get(input.previous_output.vout as usize) {
|
||||
match get_pubkey_from_input(
|
||||
&input.script_sig.to_bytes(),
|
||||
&input.witness.to_vec(),
|
||||
&output.script_pubkey.to_bytes(),
|
||||
) {
|
||||
Ok(Some(pubkey)) => pubkeys.push(pubkey),
|
||||
Ok(None) => continue,
|
||||
Err(e) => {
|
||||
return Err(Error::msg(format!(
|
||||
"Can't extract pubkey from input: {}",
|
||||
e
|
||||
)))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(Error::msg("Transaction with a non-existing input"));
|
||||
}
|
||||
}
|
||||
|
||||
let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect();
|
||||
let partial_tweak = calculate_tweak_data(&input_pub_keys, &outpoints)?;
|
||||
Ok(partial_tweak)
|
||||
}
|
||||
|
||||
fn get_script_to_secret_map(
|
||||
sp_receiver: &Receiver,
|
||||
tweak_data_vec: Vec<String>,
|
||||
scan_key_scalar: Scalar,
|
||||
secp: &Secp256k1<All>,
|
||||
) -> Result<HashMap<[u8; 34], PublicKey>> {
|
||||
let mut res = HashMap::new();
|
||||
let shared_secrets: Result<Vec<PublicKey>> = tweak_data_vec
|
||||
.into_iter()
|
||||
.map(|s| {
|
||||
let x = PublicKey::from_str(&s).map_err(Error::new)?;
|
||||
x.mul_tweak(secp, &scan_key_scalar).map_err(Error::new)
|
||||
})
|
||||
.collect();
|
||||
let shared_secrets = shared_secrets?;
|
||||
|
||||
for shared_secret in shared_secrets {
|
||||
let spks = sp_receiver.get_spks_from_shared_secret(&shared_secret)?;
|
||||
|
||||
for spk in spks.into_values() {
|
||||
res.insert(spk, shared_secret);
|
||||
}
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub fn check_transaction_alone(
|
||||
mut wallet: MutexGuard<SpWallet>,
|
||||
tx: &Transaction,
|
||||
tweak_data: &PublicKey,
|
||||
) -> Result<HashMap<OutPoint, OwnedOutput>> {
|
||||
let updates = match wallet.update_with_transaction(tx, tweak_data, 0) {
|
||||
Ok(updates) => updates,
|
||||
Err(e) => {
|
||||
log::debug!("Error while checking transaction: {}", e);
|
||||
HashMap::new()
|
||||
}
|
||||
};
|
||||
|
||||
if updates.len() > 0 {
|
||||
let storage = STORAGE
|
||||
.get()
|
||||
.ok_or_else(|| Error::msg("Failed to get STORAGE"))?;
|
||||
storage
|
||||
.lock_anyhow()?
|
||||
.wallet_file
|
||||
.save(&serde_json::to_value(wallet.clone())?)?;
|
||||
}
|
||||
|
||||
Ok(updates)
|
||||
}
|
||||
|
||||
fn check_block(
|
||||
blkfilter: BlockFilter,
|
||||
blkhash: BlockHash,
|
||||
candidate_spks: Vec<&[u8; 34]>,
|
||||
owned_spks: Vec<Vec<u8>>,
|
||||
) -> Result<bool> {
|
||||
// check output scripts
|
||||
let mut scripts_to_match: Vec<_> = candidate_spks.into_iter().map(|spk| spk.as_ref()).collect();
|
||||
|
||||
// check input scripts
|
||||
scripts_to_match.extend(owned_spks.iter().map(|spk| spk.as_slice()));
|
||||
|
||||
// note: match will always return true for an empty query!
|
||||
if !scripts_to_match.is_empty() {
|
||||
Ok(blkfilter.match_any(&blkhash, &mut scripts_to_match.into_iter())?)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
fn scan_block_outputs(
|
||||
sp_receiver: &Receiver,
|
||||
txdata: &Vec<Transaction>,
|
||||
blkheight: u64,
|
||||
spk2secret: HashMap<[u8; 34], PublicKey>,
|
||||
) -> Result<HashMap<OutPoint, OwnedOutput>> {
|
||||
let mut res: HashMap<OutPoint, OwnedOutput> = HashMap::new();
|
||||
|
||||
// loop over outputs
|
||||
for tx in txdata {
|
||||
let txid = tx.txid();
|
||||
|
||||
// collect all taproot outputs from transaction
|
||||
let p2tr_outs: Vec<(usize, &TxOut)> = tx
|
||||
.output
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, o)| o.script_pubkey.is_p2tr())
|
||||
.collect();
|
||||
|
||||
if p2tr_outs.is_empty() {
|
||||
continue;
|
||||
}; // no taproot output
|
||||
|
||||
let mut secret: Option<PublicKey> = None;
|
||||
// Does this transaction contains one of the outputs we already found?
|
||||
for spk in p2tr_outs.iter().map(|(_, o)| &o.script_pubkey) {
|
||||
if let Some(s) = spk2secret.get(spk.as_bytes()) {
|
||||
// we might have at least one output in this transaction
|
||||
secret = Some(*s);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if secret.is_none() {
|
||||
continue;
|
||||
}; // we don't have a secret that matches any of the keys
|
||||
|
||||
// Now we can just run sp_receiver on all the p2tr outputs
|
||||
let xonlykeys: Result<Vec<XOnlyPublicKey>> = p2tr_outs
|
||||
.iter()
|
||||
.map(|(_, o)| {
|
||||
XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]).map_err(Error::new)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let ours = sp_receiver.scan_transaction(&secret.unwrap(), xonlykeys?)?;
|
||||
let height = Height::from_consensus(blkheight as u32)?;
|
||||
for (label, map) in ours {
|
||||
res.extend(p2tr_outs.iter().filter_map(|(i, o)| {
|
||||
match XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]) {
|
||||
Ok(key) => {
|
||||
if let Some(scalar) = map.get(&key) {
|
||||
match SecretKey::from_slice(&scalar.to_be_bytes()) {
|
||||
Ok(tweak) => {
|
||||
let outpoint = OutPoint {
|
||||
txid,
|
||||
vout: *i as u32,
|
||||
};
|
||||
return Some((
|
||||
outpoint,
|
||||
OwnedOutput {
|
||||
blockheight: height,
|
||||
tweak: tweak.secret_bytes(),
|
||||
amount: o.value,
|
||||
script: o.script_pubkey.clone(),
|
||||
label: label.clone(),
|
||||
spend_status: OutputSpendStatus::Unspent,
|
||||
},
|
||||
));
|
||||
}
|
||||
Err(_) => {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
Err(_) => None,
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn scan_block_inputs(
|
||||
our_outputs: &HashMap<OutPoint, OwnedOutput>,
|
||||
txdata: Vec<Transaction>,
|
||||
) -> Result<Vec<OutPoint>> {
|
||||
let mut found = vec![];
|
||||
|
||||
for tx in txdata {
|
||||
for input in tx.input {
|
||||
let prevout = input.previous_output;
|
||||
|
||||
if our_outputs.contains_key(&prevout) {
|
||||
found.push(prevout);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(found)
|
||||
}
|
||||
|
||||
pub struct NativeSpScanner<'a> {
|
||||
updater: Box<dyn Updater + Sync + Send>,
|
||||
backend: Box<dyn ChainBackend + Sync + Send>,
|
||||
client: SpClient,
|
||||
keep_scanning: &'a AtomicBool, // used to interrupt scanning
|
||||
owned_outpoints: HashSet<OutPoint>, // used to scan block inputs
|
||||
}
|
||||
|
||||
impl<'a> NativeSpScanner<'a> {
|
||||
pub fn new(
|
||||
client: SpClient,
|
||||
updater: Box<dyn Updater + Sync + Send>,
|
||||
backend: Box<dyn ChainBackend + Sync + Send>,
|
||||
owned_outpoints: HashSet<OutPoint>,
|
||||
keep_scanning: &'a AtomicBool,
|
||||
) -> Self {
|
||||
Self {
|
||||
client,
|
||||
updater,
|
||||
backend,
|
||||
owned_outpoints,
|
||||
keep_scanning,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_blocks(
|
||||
&mut self,
|
||||
start: Height,
|
||||
end: Height,
|
||||
block_data_stream: impl Stream<Item = Result<BlockData>> + Unpin + Send,
|
||||
) -> Result<()> {
|
||||
use sdk_common::sp_client::futures::StreamExt;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
let mut update_time = Instant::now();
|
||||
let mut stream = block_data_stream;
|
||||
|
||||
while let Some(blockdata) = stream.next().await {
|
||||
let blockdata = blockdata?;
|
||||
let blkheight = blockdata.blkheight;
|
||||
let blkhash = blockdata.blkhash;
|
||||
|
||||
// stop scanning and return if interrupted
|
||||
if self.should_interrupt() {
|
||||
self.save_state()?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut save_to_storage = false;
|
||||
|
||||
// always save on last block or after 30 seconds since last save
|
||||
if blkheight == end || update_time.elapsed() > Duration::from_secs(30) {
|
||||
save_to_storage = true;
|
||||
}
|
||||
|
||||
let (found_outputs, found_inputs) = self.process_block(blockdata).await?;
|
||||
|
||||
if !found_outputs.is_empty() {
|
||||
save_to_storage = true;
|
||||
self.record_outputs(blkheight, blkhash, found_outputs)?;
|
||||
}
|
||||
|
||||
if !found_inputs.is_empty() {
|
||||
save_to_storage = true;
|
||||
self.record_inputs(blkheight, blkhash, found_inputs)?;
|
||||
}
|
||||
|
||||
// tell the updater we scanned this block
|
||||
self.record_progress(start, blkheight, end)?;
|
||||
|
||||
if save_to_storage {
|
||||
self.save_state()?;
|
||||
update_time = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<'a> SpScanner for NativeSpScanner<'a> {
|
||||
async fn scan_blocks(
|
||||
&mut self,
|
||||
start: Height,
|
||||
end: Height,
|
||||
dust_limit: Amount,
|
||||
with_cutthrough: bool,
|
||||
) -> Result<()> {
|
||||
if start > end {
|
||||
bail!("bigger start than end: {} > {}", start, end);
|
||||
}
|
||||
|
||||
info!("start: {} end: {}", start, end);
|
||||
let start_time: Instant = Instant::now();
|
||||
|
||||
// get block data stream
|
||||
let range = start.to_consensus_u32()..=end.to_consensus_u32();
|
||||
let block_data_stream = self.get_block_data_stream(range, dust_limit, with_cutthrough);
|
||||
|
||||
// process blocks using block data stream
|
||||
self.process_blocks(start, end, block_data_stream).await?;
|
||||
|
||||
// time elapsed for the scan
|
||||
info!(
|
||||
"Blindbit scan complete in {} seconds",
|
||||
start_time.elapsed().as_secs()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_block(
|
||||
&mut self,
|
||||
blockdata: BlockData,
|
||||
) -> Result<(HashMap<OutPoint, OwnedOutput>, HashSet<OutPoint>)> {
|
||||
let BlockData {
|
||||
blkheight,
|
||||
tweaks,
|
||||
new_utxo_filter,
|
||||
spent_filter,
|
||||
..
|
||||
} = blockdata;
|
||||
|
||||
let outs = self
|
||||
.process_block_outputs(blkheight, tweaks, new_utxo_filter)
|
||||
.await?;
|
||||
|
||||
// after processing outputs, we add the found outputs to our list
|
||||
self.owned_outpoints.extend(outs.keys());
|
||||
|
||||
let ins = self.process_block_inputs(blkheight, spent_filter).await?;
|
||||
|
||||
// after processing inputs, we remove the found inputs
|
||||
self.owned_outpoints.retain(|item| !ins.contains(item));
|
||||
|
||||
Ok((outs, ins))
|
||||
}
|
||||
|
||||
async fn process_block_outputs(
|
||||
&self,
|
||||
blkheight: Height,
|
||||
tweaks: Vec<PublicKey>,
|
||||
new_utxo_filter: FilterData,
|
||||
) -> Result<HashMap<OutPoint, OwnedOutput>> {
|
||||
let mut res = HashMap::new();
|
||||
|
||||
if !tweaks.is_empty() {
|
||||
let secrets_map = self.client.get_script_to_secret_map(tweaks)?;
|
||||
|
||||
//last_scan = last_scan.max(n as u32);
|
||||
let candidate_spks: Vec<&[u8; 34]> = secrets_map.keys().collect();
|
||||
|
||||
//get block gcs & check match
|
||||
let blkfilter = BlockFilter::new(&new_utxo_filter.data);
|
||||
let blkhash = new_utxo_filter.block_hash;
|
||||
|
||||
let matched_outputs = Self::check_block_outputs(blkfilter, blkhash, candidate_spks)?;
|
||||
|
||||
//if match: fetch and scan utxos
|
||||
if matched_outputs {
|
||||
info!("matched outputs on: {}", blkheight);
|
||||
let found = self.scan_utxos(blkheight, secrets_map).await?;
|
||||
|
||||
if !found.is_empty() {
|
||||
for (label, utxo, tweak) in found {
|
||||
let outpoint = OutPoint {
|
||||
txid: utxo.txid,
|
||||
vout: utxo.vout,
|
||||
};
|
||||
|
||||
let out = OwnedOutput {
|
||||
blockheight: blkheight,
|
||||
tweak: tweak.to_be_bytes(),
|
||||
amount: utxo.value,
|
||||
script: utxo.scriptpubkey,
|
||||
label,
|
||||
spend_status: OutputSpendStatus::Unspent,
|
||||
};
|
||||
|
||||
res.insert(outpoint, out);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn process_block_inputs(
|
||||
&self,
|
||||
blkheight: Height,
|
||||
spent_filter: FilterData,
|
||||
) -> Result<HashSet<OutPoint>> {
|
||||
let mut res = HashSet::new();
|
||||
|
||||
let blkhash = spent_filter.block_hash;
|
||||
|
||||
// first get the 8-byte hashes used to construct the input filter
|
||||
let input_hashes_map = self.get_input_hashes(blkhash)?;
|
||||
|
||||
// check against filter
|
||||
let blkfilter = BlockFilter::new(&spent_filter.data);
|
||||
let matched_inputs = self.check_block_inputs(
|
||||
blkfilter,
|
||||
blkhash,
|
||||
input_hashes_map.keys().cloned().collect(),
|
||||
)?;
|
||||
|
||||
// if match: download spent data, collect the outpoints that are spent
|
||||
if matched_inputs {
|
||||
info!("matched inputs on: {}", blkheight);
|
||||
let spent = self.backend.spent_index(blkheight).await?.data;
|
||||
|
||||
for spent in spent {
|
||||
let hex: &[u8] = spent.as_ref();
|
||||
|
||||
if let Some(outpoint) = input_hashes_map.get(hex) {
|
||||
res.insert(*outpoint);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn get_block_data_stream(
|
||||
&self,
|
||||
range: std::ops::RangeInclusive<u32>,
|
||||
dust_limit: Amount,
|
||||
with_cutthrough: bool,
|
||||
) -> std::pin::Pin<Box<dyn Stream<Item = Result<BlockData>> + Send>> {
|
||||
self.backend
|
||||
.get_block_data_for_range(range, dust_limit, with_cutthrough)
|
||||
}
|
||||
|
||||
fn should_interrupt(&self) -> bool {
|
||||
!self
|
||||
.keep_scanning
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn save_state(&mut self) -> Result<()> {
|
||||
self.updater.save_to_persistent_storage()
|
||||
}
|
||||
|
||||
fn record_outputs(
|
||||
&mut self,
|
||||
height: Height,
|
||||
block_hash: BlockHash,
|
||||
outputs: HashMap<OutPoint, OwnedOutput>,
|
||||
) -> Result<()> {
|
||||
self.updater.record_block_outputs(height, block_hash, outputs)
|
||||
}
|
||||
|
||||
fn record_inputs(
|
||||
&mut self,
|
||||
height: Height,
|
||||
block_hash: BlockHash,
|
||||
inputs: HashSet<OutPoint>,
|
||||
) -> Result<()> {
|
||||
self.updater.record_block_inputs(height, block_hash, inputs)
|
||||
}
|
||||
|
||||
fn record_progress(&mut self, start: Height, current: Height, end: Height) -> Result<()> {
|
||||
self.updater.record_scan_progress(start, current, end)
|
||||
}
|
||||
|
||||
fn client(&self) -> &SpClient {
|
||||
&self.client
|
||||
}
|
||||
|
||||
fn backend(&self) -> &dyn ChainBackend {
|
||||
self.backend.as_ref()
|
||||
}
|
||||
|
||||
fn updater(&mut self) -> &mut dyn Updater {
|
||||
self.updater.as_mut()
|
||||
}
|
||||
|
||||
// Override the default get_input_hashes implementation to use owned_outpoints
|
||||
fn get_input_hashes(&self, blkhash: BlockHash) -> Result<HashMap<[u8; 8], OutPoint>> {
|
||||
let mut map: HashMap<[u8; 8], OutPoint> = HashMap::new();
|
||||
|
||||
for outpoint in &self.owned_outpoints {
|
||||
let mut arr = [0u8; 68];
|
||||
arr[..32].copy_from_slice(&outpoint.txid.to_raw_hash().to_byte_array());
|
||||
arr[32..36].copy_from_slice(&outpoint.vout.to_le_bytes());
|
||||
arr[36..].copy_from_slice(&blkhash.to_byte_array());
|
||||
let hash = sha256::Hash::hash(&arr);
|
||||
|
||||
let mut res = [0u8; 8];
|
||||
res.copy_from_slice(&hash[..8]);
|
||||
|
||||
map.insert(res, outpoint.clone());
|
||||
}
|
||||
|
||||
Ok(map)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn scan_blocks(mut n_blocks_to_scan: u32, blindbit_url: &str) -> anyhow::Result<()> {
|
||||
log::info!("Starting a rescan");
|
||||
|
||||
// Get all the data we need upfront, before any async operations
|
||||
let (sp_wallet, scan_height, tip_height) = {
|
||||
let sp_wallet = WALLET
|
||||
.get()
|
||||
.ok_or(Error::msg("Wallet not initialized"))?
|
||||
.lock_anyhow()?;
|
||||
let scan_height = sp_wallet.get_last_scan();
|
||||
let tip_height: u32 = CHAIN_TIP.load(Ordering::Relaxed).try_into()?;
|
||||
(sp_wallet.clone(), scan_height, tip_height)
|
||||
};
|
||||
|
||||
// 0 means scan to tip
|
||||
if n_blocks_to_scan == 0 {
|
||||
n_blocks_to_scan = tip_height - scan_height;
|
||||
}
|
||||
|
||||
let start = scan_height + 1;
|
||||
let end = if scan_height + n_blocks_to_scan <= tip_height {
|
||||
scan_height + n_blocks_to_scan
|
||||
} else {
|
||||
tip_height
|
||||
};
|
||||
|
||||
if start > end {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let updater = StateUpdater::new();
|
||||
let backend = BlindbitBackend::new(blindbit_url.to_string())?;
|
||||
|
||||
let owned_outpoints = sp_wallet.get_unspent_outputs().keys().map(|o| *o).collect();
|
||||
|
||||
let keep_scanning = Arc::new(AtomicBool::new(true));
|
||||
|
||||
log::info!("start: {} end: {}", start, end);
|
||||
let start_time = Instant::now();
|
||||
let mut scanner = NativeSpScanner::new(
|
||||
sp_wallet.get_sp_client().clone(),
|
||||
Box::new(updater),
|
||||
Box::new(backend),
|
||||
owned_outpoints,
|
||||
&keep_scanning,
|
||||
);
|
||||
|
||||
let dust_limit = Amount::from_sat(0); // We don't really have a dust limit for this use case
|
||||
scanner
|
||||
.scan_blocks(
|
||||
Height::from_consensus(start)?,
|
||||
Height::from_consensus(end)?,
|
||||
dust_limit,
|
||||
WITH_CUTTHROUGH,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// time elapsed for the scan
|
||||
log::info!(
|
||||
"Scan complete in {} seconds",
|
||||
start_time.elapsed().as_secs()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user