Merge branch 'ws-server' into dev
This commit is contained in:
commit
0d9e8ba4e5
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
/target
|
1907
Cargo.lock
generated
Normal file
1907
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
21
Cargo.toml
Normal file
21
Cargo.toml
Normal file
@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "sdk_relay"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
bitcoincore-rpc = { version = "0.18" }
|
||||
electrum-client = { git = "https://github.com/cygnet3/rust-electrum-client", branch = "sp_tweaks" }
|
||||
env_logger = "0.9"
|
||||
futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
|
||||
hex = "0.4.3"
|
||||
log = "0.4.20"
|
||||
sdk_common = { git = "https://git.4nkweb.com/4nk/sdk_common.git", branch = "demo" }
|
||||
serde = { version = "1.0.193", features = ["derive"]}
|
||||
serde_json = "1.0"
|
||||
serde_with = "3.6.0"
|
||||
tokio = { version = "1.0.0", features = ["io-util", "rt-multi-thread", "macros", "sync"] }
|
||||
tokio-stream = "0.1"
|
||||
tokio-tungstenite = "0.21.0"
|
||||
zeromq = "0.3.5"
|
450
src/daemon.rs
Normal file
450
src/daemon.rs
Normal file
@ -0,0 +1,450 @@
|
||||
use anyhow::{Context, Error, Result};
|
||||
|
||||
use bitcoincore_rpc::json::{
|
||||
CreateRawTransactionInput, ListUnspentQueryOptions, ListUnspentResultEntry,
|
||||
WalletCreateFundedPsbtOptions,
|
||||
};
|
||||
use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi};
|
||||
use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
|
||||
use sdk_common::sp_client::bitcoin::{
|
||||
block, Address, Amount, Block, BlockHash, Network, OutPoint, Psbt, ScriptBuf, Sequence,
|
||||
Transaction, TxIn, TxOut, Txid,
|
||||
};
|
||||
use sdk_common::sp_client::bitcoin::{consensus::deserialize, hashes::hex::FromHex};
|
||||
// use crossbeam_channel::Receiver;
|
||||
// use parking_lot::Mutex;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::FAUCET_AMT;
|
||||
|
||||
pub struct SensitiveAuth(pub Auth);
|
||||
|
||||
impl SensitiveAuth {
|
||||
pub(crate) fn get_auth(&self) -> Auth {
|
||||
self.0.clone()
|
||||
}
|
||||
}
|
||||
|
||||
enum PollResult {
|
||||
Done(Result<()>),
|
||||
Retry,
|
||||
}
|
||||
|
||||
fn rpc_poll(client: &mut Client, skip_block_download_wait: bool) -> PollResult {
|
||||
match client.get_blockchain_info() {
|
||||
Ok(info) => {
|
||||
if skip_block_download_wait {
|
||||
// bitcoind RPC is available, don't wait for block download to finish
|
||||
return PollResult::Done(Ok(()));
|
||||
}
|
||||
let left_blocks = info.headers - info.blocks;
|
||||
if info.initial_block_download || left_blocks > 0 {
|
||||
log::info!(
|
||||
"waiting for {} blocks to download{}",
|
||||
left_blocks,
|
||||
if info.initial_block_download {
|
||||
" (IBD)"
|
||||
} else {
|
||||
""
|
||||
}
|
||||
);
|
||||
return PollResult::Retry;
|
||||
}
|
||||
PollResult::Done(Ok(()))
|
||||
}
|
||||
Err(err) => {
|
||||
if let Some(e) = extract_bitcoind_error(&err) {
|
||||
if e.code == -28 {
|
||||
log::debug!("waiting for RPC warmup: {}", e.message);
|
||||
return PollResult::Retry;
|
||||
}
|
||||
}
|
||||
PollResult::Done(Err(err).context("daemon not available"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn read_cookie(path: &Path) -> Result<(String, String)> {
|
||||
// Load username and password from bitcoind cookie file:
|
||||
// * https://github.com/bitcoin/bitcoin/pull/6388/commits/71cbeaad9a929ba6a7b62d9b37a09b214ae00c1a
|
||||
// * https://bitcoin.stackexchange.com/questions/46782/rpc-cookie-authentication
|
||||
let mut file = File::open(path)
|
||||
.with_context(|| format!("failed to open bitcoind cookie file: {}", path.display()))?;
|
||||
let mut contents = String::new();
|
||||
file.read_to_string(&mut contents)
|
||||
.with_context(|| format!("failed to read bitcoind cookie from {}", path.display()))?;
|
||||
|
||||
let parts: Vec<&str> = contents.splitn(2, ':').collect();
|
||||
anyhow::ensure!(
|
||||
parts.len() == 2,
|
||||
"failed to parse bitcoind cookie - missing ':' separator"
|
||||
);
|
||||
Ok((parts[0].to_owned(), parts[1].to_owned()))
|
||||
}
|
||||
|
||||
fn rpc_connect(rpcwallet: Option<String>, network: Network, mut rpc_url: String) -> Result<Client> {
|
||||
match rpcwallet {
|
||||
Some(rpcwallet) => rpc_url.push_str(&rpcwallet),
|
||||
None => (),
|
||||
}
|
||||
|
||||
// Allow `wait_for_new_block` to take a bit longer before timing out.
|
||||
// See https://github.com/romanz/electrs/issues/495 for more details.
|
||||
let builder = jsonrpc::simple_http::SimpleHttpTransport::builder()
|
||||
.url(&rpc_url)?
|
||||
.timeout(Duration::from_secs(30));
|
||||
let home = env::var("HOME")?;
|
||||
let mut cookie_path = PathBuf::from_str(&home)?;
|
||||
cookie_path.push(".bitcoin");
|
||||
cookie_path.push(network.to_core_arg());
|
||||
cookie_path.push(".cookie");
|
||||
let daemon_auth = SensitiveAuth(Auth::CookieFile(cookie_path));
|
||||
let builder = match daemon_auth.get_auth() {
|
||||
Auth::None => builder,
|
||||
Auth::UserPass(user, pass) => builder.auth(user, Some(pass)),
|
||||
Auth::CookieFile(path) => {
|
||||
let (user, pass) = read_cookie(&path)?;
|
||||
builder.auth(user, Some(pass))
|
||||
}
|
||||
};
|
||||
Ok(Client::from_jsonrpc(jsonrpc::Client::with_transport(
|
||||
builder.build(),
|
||||
)))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Daemon {
|
||||
// p2p: Mutex<Connection>,
|
||||
rpc: Client,
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
pub(crate) fn connect(
|
||||
rpcwallet: Option<String>,
|
||||
rpc_url: String,
|
||||
network: Network,
|
||||
// config: &Config,
|
||||
// exit_flag: &ExitFlag,
|
||||
// metrics: &Metrics,
|
||||
) -> Result<Self> {
|
||||
let mut rpc = rpc_connect(rpcwallet, network, rpc_url)?;
|
||||
|
||||
loop {
|
||||
match rpc_poll(&mut rpc, false) {
|
||||
PollResult::Done(result) => {
|
||||
result.context("bitcoind RPC polling failed")?;
|
||||
break; // on success, finish polling
|
||||
}
|
||||
PollResult::Retry => {
|
||||
std::thread::sleep(std::time::Duration::from_secs(1)); // wait a bit before polling
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let network_info = rpc.get_network_info()?;
|
||||
// if network_info.version < 21_00_00 {
|
||||
// bail!("electrs requires bitcoind 0.21+");
|
||||
// }
|
||||
if !network_info.network_active {
|
||||
anyhow::bail!("electrs requires active bitcoind p2p network");
|
||||
}
|
||||
let info = rpc.get_blockchain_info()?;
|
||||
if info.pruned {
|
||||
anyhow::bail!("electrs requires non-pruned bitcoind node");
|
||||
}
|
||||
|
||||
// let p2p = tokio::sync::Mutex::new(Connection::connect(
|
||||
// config.network,
|
||||
// config.daemon_p2p_addr,
|
||||
// metrics,
|
||||
// config.signet_magic,
|
||||
// )?);
|
||||
Ok(Self { rpc })
|
||||
}
|
||||
|
||||
pub(crate) fn estimate_fee(&self, nblocks: u16) -> Result<Amount> {
|
||||
let res = self
|
||||
.rpc
|
||||
.estimate_smart_fee(nblocks, None)
|
||||
.context("failed to estimate fee")?;
|
||||
if res.errors.is_some() {
|
||||
Err(Error::msg(serde_json::to_string(&res.errors.unwrap())?))
|
||||
} else {
|
||||
Ok(res.fee_rate.unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_relay_fee(&self) -> Result<Amount> {
|
||||
Ok(self
|
||||
.rpc
|
||||
.get_network_info()
|
||||
.context("failed to get relay fee")?
|
||||
.relay_fee)
|
||||
}
|
||||
|
||||
pub(crate) fn get_current_height(&self) -> Result<u64> {
|
||||
Ok(self
|
||||
.rpc
|
||||
.get_block_count()
|
||||
.context("failed to get block count")?)
|
||||
}
|
||||
|
||||
pub(crate) fn get_block(&self, block_hash: BlockHash) -> Result<Block> {
|
||||
Ok(self
|
||||
.rpc
|
||||
.get_block(&block_hash)
|
||||
.context("failed to get block")?)
|
||||
}
|
||||
|
||||
pub(crate) fn get_filters(&self, block_height: u32) -> Result<(u32, BlockHash, BlockFilter)> {
|
||||
let block_hash = self.rpc.get_block_hash(block_height.try_into()?)?;
|
||||
let filter = self
|
||||
.rpc
|
||||
.get_block_filter(&block_hash)
|
||||
.context("failed to get block filter")?
|
||||
.into_filter();
|
||||
Ok((block_height, block_hash, filter))
|
||||
}
|
||||
|
||||
pub(crate) fn list_unspent_from_to(
|
||||
&self,
|
||||
minamt: Option<Amount>,
|
||||
) -> Result<Vec<json::ListUnspentResultEntry>> {
|
||||
let minimum_sum_amount = if minamt.is_none() || minamt <= FAUCET_AMT.checked_mul(2) {
|
||||
FAUCET_AMT.checked_mul(2)
|
||||
} else {
|
||||
minamt
|
||||
};
|
||||
Ok(self.rpc.list_unspent(
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Some(false),
|
||||
Some(ListUnspentQueryOptions {
|
||||
minimum_sum_amount,
|
||||
..Default::default()
|
||||
}),
|
||||
)?)
|
||||
}
|
||||
|
||||
pub(crate) fn create_psbt(
|
||||
&self,
|
||||
unspents: &[ListUnspentResultEntry],
|
||||
spk: ScriptBuf,
|
||||
network: Network,
|
||||
) -> Result<String> {
|
||||
let inputs: Vec<CreateRawTransactionInput> = unspents
|
||||
.iter()
|
||||
.map(|utxo| CreateRawTransactionInput {
|
||||
txid: utxo.txid,
|
||||
vout: utxo.vout,
|
||||
sequence: None,
|
||||
})
|
||||
.collect();
|
||||
let address = Address::from_script(&spk, network)?;
|
||||
let total_amt = unspents
|
||||
.iter()
|
||||
.fold(Amount::from_sat(0), |acc, x| acc + x.amount);
|
||||
|
||||
if total_amt < FAUCET_AMT {
|
||||
return Err(Error::msg("Not enought funds"));
|
||||
}
|
||||
|
||||
let mut outputs = HashMap::new();
|
||||
outputs.insert(address.to_string(), total_amt);
|
||||
|
||||
let options = WalletCreateFundedPsbtOptions {
|
||||
subtract_fee_from_outputs: vec![0],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let wallet_create_funded_result =
|
||||
self.rpc
|
||||
.wallet_create_funded_psbt(&inputs, &outputs, None, Some(options), None)?;
|
||||
|
||||
Ok(wallet_create_funded_result.psbt.to_string())
|
||||
}
|
||||
|
||||
pub(crate) fn process_psbt(&self, psbt: String) -> Result<String> {
|
||||
let processed_psbt = self.rpc.wallet_process_psbt(&psbt, None, None, None)?;
|
||||
match processed_psbt.complete {
|
||||
true => Ok(processed_psbt.psbt),
|
||||
false => Err(Error::msg("Failed to complete the psbt")),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn finalize_psbt(&self, psbt: String) -> Result<String> {
|
||||
let final_tx = self.rpc.finalize_psbt(&psbt, Some(false))?;
|
||||
|
||||
match final_tx.complete {
|
||||
true => Ok(final_tx
|
||||
.psbt
|
||||
.expect("We shouldn't have an empty psbt for a complete return")),
|
||||
false => Err(Error::msg("Failed to finalize psbt")),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_network(&self) -> Result<Network> {
|
||||
let blockchain_info = self.rpc.get_blockchain_info()?;
|
||||
|
||||
Ok(blockchain_info.chain)
|
||||
}
|
||||
|
||||
pub(crate) fn test_mempool_accept(
|
||||
&self,
|
||||
tx: &Transaction,
|
||||
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult> {
|
||||
let res = self.rpc.test_mempool_accept(&vec![tx])?;
|
||||
|
||||
Ok(res.get(0).unwrap().clone())
|
||||
}
|
||||
|
||||
pub(crate) fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
|
||||
let txid = self.rpc.send_raw_transaction(tx)?;
|
||||
|
||||
Ok(txid)
|
||||
}
|
||||
|
||||
pub(crate) fn get_transaction_info(
|
||||
&self,
|
||||
txid: &Txid,
|
||||
blockhash: Option<BlockHash>,
|
||||
) -> Result<Value> {
|
||||
// No need to parse the resulting JSON, just return it as-is to the client.
|
||||
self.rpc
|
||||
.call(
|
||||
"getrawtransaction",
|
||||
&[json!(txid), json!(true), json!(blockhash)],
|
||||
)
|
||||
.context("failed to get transaction info")
|
||||
}
|
||||
|
||||
pub(crate) fn get_transaction_hex(
|
||||
&self,
|
||||
txid: &Txid,
|
||||
blockhash: Option<BlockHash>,
|
||||
) -> Result<Value> {
|
||||
use sdk_common::sp_client::bitcoin::consensus::serde::{hex::Lower, Hex, With};
|
||||
|
||||
let tx = self.get_transaction(txid, blockhash)?;
|
||||
#[derive(serde::Serialize)]
|
||||
#[serde(transparent)]
|
||||
struct TxAsHex(#[serde(with = "With::<Hex<Lower>>")] Transaction);
|
||||
serde_json::to_value(TxAsHex(tx)).map_err(Into::into)
|
||||
}
|
||||
|
||||
pub(crate) fn get_transaction(
|
||||
&self,
|
||||
txid: &Txid,
|
||||
blockhash: Option<BlockHash>,
|
||||
) -> Result<Transaction> {
|
||||
self.rpc
|
||||
.get_raw_transaction(txid, blockhash.as_ref())
|
||||
.context("failed to get transaction")
|
||||
}
|
||||
|
||||
pub(crate) fn get_block_txids(&self, blockhash: BlockHash) -> Result<Vec<Txid>> {
|
||||
Ok(self
|
||||
.rpc
|
||||
.get_block_info(&blockhash)
|
||||
.context("failed to get block txids")?
|
||||
.tx)
|
||||
}
|
||||
|
||||
pub(crate) fn get_mempool_txids(&self) -> Result<Vec<Txid>> {
|
||||
self.rpc
|
||||
.get_raw_mempool()
|
||||
.context("failed to get mempool txids")
|
||||
}
|
||||
|
||||
pub(crate) fn get_mempool_entries(
|
||||
&self,
|
||||
txids: &[Txid],
|
||||
) -> Result<Vec<Result<json::GetMempoolEntryResult>>> {
|
||||
let client = self.rpc.get_jsonrpc_client();
|
||||
log::debug!("getting {} mempool entries", txids.len());
|
||||
let args: Vec<_> = txids
|
||||
.iter()
|
||||
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
|
||||
.collect();
|
||||
let reqs: Vec<_> = args
|
||||
.iter()
|
||||
.map(|a| client.build_request("getmempoolentry", a))
|
||||
.collect();
|
||||
let res = client.send_batch(&reqs).context("batch request failed")?;
|
||||
log::debug!("got {} mempool entries", res.len());
|
||||
Ok(res
|
||||
.into_iter()
|
||||
.map(|r| {
|
||||
r.context("missing response")?
|
||||
.result::<json::GetMempoolEntryResult>()
|
||||
.context("invalid response")
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub(crate) fn get_mempool_transactions(
|
||||
&self,
|
||||
txids: &[Txid],
|
||||
) -> Result<Vec<Result<Transaction>>> {
|
||||
let client = self.rpc.get_jsonrpc_client();
|
||||
log::debug!("getting {} transactions", txids.len());
|
||||
let args: Vec<_> = txids
|
||||
.iter()
|
||||
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
|
||||
.collect();
|
||||
let reqs: Vec<_> = args
|
||||
.iter()
|
||||
.map(|a| client.build_request("getrawtransaction", a))
|
||||
.collect();
|
||||
let res = client.send_batch(&reqs).context("batch request failed")?;
|
||||
log::debug!("got {} mempool transactions", res.len());
|
||||
Ok(res
|
||||
.into_iter()
|
||||
.map(|r| -> Result<Transaction> {
|
||||
let tx_hex = r
|
||||
.context("missing response")?
|
||||
.result::<String>()
|
||||
.context("invalid response")?;
|
||||
let tx_bytes = Vec::from_hex(&tx_hex).context("non-hex transaction")?;
|
||||
deserialize(&tx_bytes).context("invalid transaction")
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
// pub(crate) fn get_new_headers(&self, chain: &Chain) -> Result<Vec<NewHeader>> {
|
||||
// self.p2p.lock().get_new_headers(chain)
|
||||
// }
|
||||
|
||||
// pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
|
||||
// where
|
||||
// B: IntoIterator<Item = BlockHash>,
|
||||
// F: FnMut(BlockHash, SerBlock),
|
||||
// {
|
||||
// self.p2p.lock().for_blocks(blockhashes, func)
|
||||
// }
|
||||
|
||||
// pub(crate) fn new_block_notification(&self) -> Receiver<()> {
|
||||
// self.p2p.lock().new_block_notification()
|
||||
// }
|
||||
}
|
||||
|
||||
pub(crate) type RpcError = bitcoincore_rpc::jsonrpc::error::RpcError;
|
||||
|
||||
pub(crate) fn extract_bitcoind_error(err: &bitcoincore_rpc::Error) -> Option<&RpcError> {
|
||||
use bitcoincore_rpc::{
|
||||
jsonrpc::error::Error::Rpc as ServerError, Error::JsonRpc as JsonRpcError,
|
||||
};
|
||||
match err {
|
||||
JsonRpcError(ServerError(e)) => Some(e),
|
||||
_ => None,
|
||||
}
|
||||
}
|
15
src/electrumclient.rs
Normal file
15
src/electrumclient.rs
Normal file
@ -0,0 +1,15 @@
|
||||
use electrum_client::{Client, ConfigBuilder};
|
||||
use log::info;
|
||||
|
||||
const ELECTRS_URI: &str = "ssl://silentpayments.dev:51002";
|
||||
const VALIDATE_DOMAIN: bool = false; // self-signed cert, so we don't validate
|
||||
|
||||
pub fn create_electrum_client() -> anyhow::Result<Client> {
|
||||
let config = ConfigBuilder::new()
|
||||
.validate_domain(VALIDATE_DOMAIN)
|
||||
.build();
|
||||
let electrum_client = Client::from_config(ELECTRS_URI, config)?;
|
||||
info!("ssl client {}", ELECTRS_URI);
|
||||
|
||||
Ok(electrum_client)
|
||||
}
|
884
src/main.rs
Normal file
884
src/main.rs
Normal file
@ -0,0 +1,884 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
env,
|
||||
fmt::Debug,
|
||||
fs,
|
||||
net::SocketAddr,
|
||||
path::PathBuf,
|
||||
str::FromStr,
|
||||
sync::{Arc, Mutex, MutexGuard, OnceLock},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use bitcoincore_rpc::json::{self as bitcoin_json};
|
||||
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
|
||||
use log::{debug, error, info, warn};
|
||||
use sdk_common::sp_client::bitcoin::{
|
||||
absolute::LockTime,
|
||||
consensus::{deserialize, serialize},
|
||||
hex::{DisplayHex, FromHex},
|
||||
key::TapTweak,
|
||||
script::PushBytesBuf,
|
||||
sighash::{Prevouts, SighashCache},
|
||||
taproot::Signature,
|
||||
transaction::Version,
|
||||
Amount, Network, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Witness,
|
||||
XOnlyPublicKey,
|
||||
};
|
||||
use sdk_common::sp_client::{
|
||||
bitcoin::secp256k1::{
|
||||
rand::{thread_rng, Rng},
|
||||
Keypair, Message as Secp256k1Message, PublicKey, Secp256k1, ThirtyTwoByteHash,
|
||||
},
|
||||
spclient::SpWallet,
|
||||
};
|
||||
use sdk_common::{
|
||||
error::AnkError,
|
||||
network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage},
|
||||
silentpayments::create_transaction_for_address_with_shared_secret,
|
||||
};
|
||||
|
||||
use sdk_common::sp_client::db::{JsonFile, Storage};
|
||||
use sdk_common::sp_client::silentpayments::sending::{
|
||||
generate_recipient_pubkeys, SilentPaymentAddress,
|
||||
};
|
||||
use sdk_common::sp_client::silentpayments::utils::receiving::{
|
||||
calculate_tweak_data, get_pubkey_from_input,
|
||||
};
|
||||
use sdk_common::sp_client::silentpayments::utils::sending::calculate_partial_secret;
|
||||
use sdk_common::sp_client::spclient::{derive_keys_from_seed, Recipient, SpClient, SpendKey};
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||
use tokio::{
|
||||
net::{TcpListener, TcpStream},
|
||||
time,
|
||||
};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
use zeromq::{Socket, SocketRecv};
|
||||
|
||||
mod daemon;
|
||||
mod electrumclient;
|
||||
mod scan;
|
||||
|
||||
use crate::{daemon::Daemon, scan::scan_blocks};
|
||||
|
||||
type Tx = UnboundedSender<Message>;
|
||||
|
||||
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
|
||||
|
||||
type SharedDaemon = Arc<Mutex<Daemon>>;
|
||||
|
||||
static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new();
|
||||
|
||||
const MESSAGECACHEDURATION: Duration = Duration::from_secs(10);
|
||||
const MESSAGECACHEINTERVAL: Duration = Duration::from_secs(2);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MessageCache {
|
||||
store: Mutex<HashMap<String, Instant>>,
|
||||
}
|
||||
|
||||
impl MessageCache {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
store: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn insert(&self, key: String) {
|
||||
let mut store = self.store.lock().unwrap();
|
||||
store.insert(key.clone(), Instant::now());
|
||||
}
|
||||
|
||||
fn contains(&self, key: &str) -> bool {
|
||||
let store = self.store.lock().unwrap();
|
||||
store.contains_key(key)
|
||||
}
|
||||
}
|
||||
|
||||
async fn clean_up() {
|
||||
let cache = MESSAGECACHE.get().unwrap();
|
||||
|
||||
let mut interval = time::interval(MESSAGECACHEINTERVAL);
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let mut store = cache.store.lock().unwrap();
|
||||
|
||||
let now = Instant::now();
|
||||
let to_rm: Vec<String> = store
|
||||
.iter()
|
||||
.filter_map(|(entry, entrytime)| {
|
||||
if let Some(duration) = now.checked_duration_since(*entrytime) {
|
||||
if duration > MESSAGECACHEDURATION {
|
||||
Some(entry.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for key in to_rm {
|
||||
store.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const FAUCET_AMT: Amount = Amount::from_sat(100_000);
|
||||
|
||||
pub(crate) trait MutexExt<T> {
|
||||
fn lock_anyhow(&self) -> Result<MutexGuard<T>, Error>;
|
||||
}
|
||||
|
||||
impl<T: Debug> MutexExt<T> for Mutex<T> {
|
||||
fn lock_anyhow(&self) -> Result<MutexGuard<T>, Error> {
|
||||
self.lock()
|
||||
.map_err(|e| Error::msg(format!("Failed to lock: {}", e)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SilentPaymentWallet {
|
||||
sp_wallet: Mutex<SpWallet>,
|
||||
storage: Mutex<JsonFile>,
|
||||
}
|
||||
|
||||
impl SilentPaymentWallet {
|
||||
pub fn get_wallet(&self) -> Result<MutexGuard<SpWallet>> {
|
||||
self.sp_wallet.lock_anyhow()
|
||||
}
|
||||
|
||||
pub fn save(&self) -> Result<()> {
|
||||
self.storage.lock_anyhow()?.save(&self.sp_wallet)
|
||||
}
|
||||
}
|
||||
|
||||
enum BroadcastType {
|
||||
Sender(SocketAddr),
|
||||
#[allow(dead_code)]
|
||||
ExcludeSender(SocketAddr),
|
||||
#[allow(dead_code)]
|
||||
ToAll,
|
||||
}
|
||||
|
||||
fn broadcast_message(
|
||||
peers: PeerMap,
|
||||
flag: AnkFlag,
|
||||
payload: String,
|
||||
broadcast: BroadcastType,
|
||||
) -> Result<()> {
|
||||
let ank_msg = AnkNetworkMsg {
|
||||
flag,
|
||||
content: payload,
|
||||
};
|
||||
let msg = Message::Text(serde_json::to_string(&ank_msg)?);
|
||||
log::debug!("Broadcasting message: {}", msg);
|
||||
match broadcast {
|
||||
BroadcastType::Sender(addr) => {
|
||||
peers
|
||||
.lock()
|
||||
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
|
||||
.iter()
|
||||
.find(|(peer_addr, _)| peer_addr == &&addr)
|
||||
.ok_or(Error::msg("Failed to find the sender in the peer_map"))?
|
||||
.1
|
||||
.send(msg)?;
|
||||
}
|
||||
BroadcastType::ExcludeSender(addr) => {
|
||||
peers
|
||||
.lock()
|
||||
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
|
||||
.iter()
|
||||
.filter(|(peer_addr, _)| peer_addr != &&addr)
|
||||
.for_each(|(_, peer_tx)| {
|
||||
let _ = peer_tx.send(msg.clone());
|
||||
});
|
||||
}
|
||||
BroadcastType::ToAll => {
|
||||
peers
|
||||
.lock()
|
||||
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
|
||||
.iter()
|
||||
.for_each(|(_, peer_tx)| {
|
||||
let _ = peer_tx.send(msg.clone());
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spend_from_core(
|
||||
dest: XOnlyPublicKey,
|
||||
daemon: Arc<Mutex<Daemon>>,
|
||||
) -> Result<(Transaction, Amount)> {
|
||||
let core = daemon
|
||||
.lock()
|
||||
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?;
|
||||
let unspent_list: Vec<bitcoin_json::ListUnspentResultEntry> =
|
||||
core.list_unspent_from_to(None)?;
|
||||
|
||||
if !unspent_list.is_empty() {
|
||||
let network = core.get_network()?;
|
||||
|
||||
let spk = ScriptBuf::new_p2tr_tweaked(dest.dangerous_assume_tweaked());
|
||||
|
||||
let new_psbt = core.create_psbt(&unspent_list, spk, network)?;
|
||||
let processed_psbt = core.process_psbt(new_psbt)?;
|
||||
let finalize_psbt_result = core.finalize_psbt(processed_psbt)?;
|
||||
let final_psbt = Psbt::from_str(&finalize_psbt_result)?;
|
||||
let total_fee = final_psbt.fee()?;
|
||||
let final_tx = final_psbt.extract_tx()?;
|
||||
let fee_rate = total_fee
|
||||
.checked_div(final_tx.weight().to_vbytes_ceil())
|
||||
.unwrap();
|
||||
|
||||
Ok((final_tx, fee_rate))
|
||||
} else {
|
||||
// we don't have enough available coins to pay for this faucet request
|
||||
Err(Error::msg("No spendable outputs"))
|
||||
}
|
||||
}
|
||||
|
||||
fn faucet_send(
|
||||
sp_address: SilentPaymentAddress,
|
||||
commitment: &str,
|
||||
sp_wallet: Arc<SilentPaymentWallet>,
|
||||
shared_daemon: SharedDaemon,
|
||||
) -> Result<Transaction> {
|
||||
let mut first_tx: Option<Transaction> = None;
|
||||
let final_tx: Transaction;
|
||||
|
||||
// do we have a sp output available ?
|
||||
let available_outpoints = sp_wallet.get_wallet()?.get_outputs().to_spendable_list();
|
||||
|
||||
let available_amt = available_outpoints
|
||||
.iter()
|
||||
.fold(Amount::from_sat(0), |acc, (_, x)| acc + x.amount);
|
||||
|
||||
// If we don't have at least 4 times the amount we need to send, we take some reserves out
|
||||
if available_amt > FAUCET_AMT.checked_mul(4).unwrap() {
|
||||
let mut total_amt = Amount::from_sat(0);
|
||||
let mut inputs = HashMap::new();
|
||||
for (outpoint, output) in available_outpoints {
|
||||
total_amt += output.amount;
|
||||
inputs.insert(outpoint, output);
|
||||
if total_amt >= FAUCET_AMT {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let recipient = Recipient {
|
||||
address: sp_address.into(),
|
||||
amount: FAUCET_AMT,
|
||||
nb_outputs: 1,
|
||||
};
|
||||
|
||||
let fee_estimate = shared_daemon
|
||||
.lock_anyhow()?
|
||||
.estimate_fee(6)
|
||||
.unwrap_or(Amount::from_sat(1000))
|
||||
.checked_div(1000)
|
||||
.unwrap();
|
||||
|
||||
log::debug!("fee estimate for 6 blocks: {}", fee_estimate);
|
||||
|
||||
let wallet = sp_wallet.get_wallet()?;
|
||||
|
||||
let signed_psbt = create_transaction_for_address_with_shared_secret(
|
||||
recipient,
|
||||
&wallet,
|
||||
Some(commitment),
|
||||
fee_estimate,
|
||||
)?;
|
||||
|
||||
let psbt = Psbt::from_str(&signed_psbt)?;
|
||||
|
||||
final_tx = psbt.extract_tx()?;
|
||||
} else {
|
||||
// let's try to spend directly from the mining address
|
||||
let secp = Secp256k1::signing_only();
|
||||
let keypair = Keypair::new(&secp, &mut thread_rng());
|
||||
|
||||
// we first spend from core to the pubkey we just created
|
||||
let (core_tx, fee_rate) =
|
||||
spend_from_core(keypair.x_only_public_key().0, shared_daemon.clone())?;
|
||||
|
||||
// check that the first output of the transaction pays to the key we just created
|
||||
debug_assert!(
|
||||
core_tx.output[0].script_pubkey
|
||||
== ScriptBuf::new_p2tr_tweaked(
|
||||
keypair.x_only_public_key().0.dangerous_assume_tweaked()
|
||||
)
|
||||
);
|
||||
|
||||
// This is ugly and can be streamlined
|
||||
// create a new transaction that spends the newly created UTXO to the sp_address
|
||||
let mut faucet_tx = Transaction {
|
||||
input: vec![TxIn {
|
||||
previous_output: OutPoint::new(core_tx.txid(), 0),
|
||||
..Default::default()
|
||||
}],
|
||||
output: vec![],
|
||||
version: Version::TWO,
|
||||
lock_time: LockTime::ZERO,
|
||||
};
|
||||
|
||||
// now do the silent payment operations with the final recipient address
|
||||
let partial_secret = calculate_partial_secret(
|
||||
&[(keypair.secret_key(), true)],
|
||||
&[(core_tx.txid().to_string(), 0)],
|
||||
)?;
|
||||
|
||||
let ext_output_key: XOnlyPublicKey =
|
||||
generate_recipient_pubkeys(vec![sp_address.into()], partial_secret)?
|
||||
.into_values()
|
||||
.flatten()
|
||||
.collect::<Vec<XOnlyPublicKey>>()
|
||||
.get(0)
|
||||
.expect("Failed to generate keys")
|
||||
.to_owned();
|
||||
let change_sp_address = sp_wallet.get_wallet()?.get_client().get_receiving_address();
|
||||
let change_output_key: XOnlyPublicKey =
|
||||
generate_recipient_pubkeys(vec![change_sp_address], partial_secret)?
|
||||
.into_values()
|
||||
.flatten()
|
||||
.collect::<Vec<XOnlyPublicKey>>()
|
||||
.get(0)
|
||||
.expect("Failed to generate keys")
|
||||
.to_owned();
|
||||
|
||||
let ext_spk = ScriptBuf::new_p2tr_tweaked(ext_output_key.dangerous_assume_tweaked());
|
||||
let change_spk = ScriptBuf::new_p2tr_tweaked(change_output_key.dangerous_assume_tweaked());
|
||||
|
||||
let mut op_return = PushBytesBuf::new();
|
||||
op_return.extend_from_slice(&Vec::from_hex(commitment)?)?;
|
||||
let data_spk = ScriptBuf::new_op_return(op_return);
|
||||
|
||||
// Take some margin to pay for the fees
|
||||
if core_tx.output[0].value < FAUCET_AMT * 4 {
|
||||
return Err(Error::msg("Not enough funds"));
|
||||
}
|
||||
|
||||
let change_amt = core_tx.output[0].value.checked_sub(FAUCET_AMT).unwrap();
|
||||
|
||||
faucet_tx.output.push(TxOut {
|
||||
value: FAUCET_AMT,
|
||||
script_pubkey: ext_spk,
|
||||
});
|
||||
faucet_tx.output.push(TxOut {
|
||||
value: change_amt,
|
||||
script_pubkey: change_spk,
|
||||
});
|
||||
faucet_tx.output.push(TxOut {
|
||||
value: Amount::from_sat(0),
|
||||
script_pubkey: data_spk,
|
||||
});
|
||||
|
||||
// dummy signature only used for fee estimation
|
||||
faucet_tx.input[0].witness.push([1; 64].to_vec());
|
||||
|
||||
let abs_fee = fee_rate
|
||||
.checked_mul(faucet_tx.weight().to_vbytes_ceil())
|
||||
.ok_or_else(|| Error::msg("Fee rate multiplication overflowed"))?;
|
||||
|
||||
// reset the witness to empty
|
||||
faucet_tx.input[0].witness = Witness::new();
|
||||
|
||||
faucet_tx.output[1].value -= abs_fee;
|
||||
|
||||
let first_tx_outputs = vec![core_tx.output[0].clone()];
|
||||
let prevouts = Prevouts::All(&first_tx_outputs);
|
||||
|
||||
let hash_ty = TapSighashType::Default;
|
||||
|
||||
let mut cache = SighashCache::new(&faucet_tx);
|
||||
|
||||
let sighash = cache.taproot_key_spend_signature_hash(0, &prevouts, hash_ty)?;
|
||||
|
||||
let msg = Secp256k1Message::from_digest(sighash.into_32());
|
||||
|
||||
let sig = secp.sign_schnorr_with_rng(&msg, &keypair, &mut thread_rng());
|
||||
let final_sig = Signature { sig, hash_ty };
|
||||
|
||||
faucet_tx.input[0].witness.push(final_sig.to_vec());
|
||||
|
||||
first_tx = Some(core_tx);
|
||||
|
||||
final_tx = faucet_tx;
|
||||
}
|
||||
|
||||
if let Ok(daemon) = shared_daemon.lock() {
|
||||
// broadcast one or two transactions
|
||||
if first_tx.is_some() {
|
||||
daemon.broadcast(&first_tx.unwrap())?;
|
||||
}
|
||||
let txid = daemon.broadcast(&final_tx)?;
|
||||
debug!("Sent tx {}", txid);
|
||||
} else {
|
||||
return Err(Error::msg("Failed to lock daemon"));
|
||||
}
|
||||
|
||||
Ok(final_tx)
|
||||
}
|
||||
|
||||
fn handle_faucet_request(
|
||||
msg: &FaucetMessage,
|
||||
sp_wallet: Arc<SilentPaymentWallet>,
|
||||
shared_daemon: SharedDaemon,
|
||||
) -> Result<NewTxMessage> {
|
||||
let sp_address = SilentPaymentAddress::try_from(msg.sp_address.as_str())?;
|
||||
debug!("Sending bootstrap coins to {}", sp_address);
|
||||
// send bootstrap coins to this sp_address
|
||||
let tx = faucet_send(
|
||||
sp_address,
|
||||
&msg.commitment,
|
||||
sp_wallet.clone(),
|
||||
shared_daemon.clone(),
|
||||
)?;
|
||||
|
||||
// get the tweak
|
||||
let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?;
|
||||
|
||||
// get current blockheight
|
||||
let blkheight: u32 = shared_daemon
|
||||
.lock_anyhow()?
|
||||
.get_current_height()?
|
||||
.try_into()?;
|
||||
|
||||
// update our sp_client with the change output(s)
|
||||
sp_wallet
|
||||
.get_wallet()?
|
||||
.update_wallet_with_transaction(&tx, blkheight, partial_tweak)?;
|
||||
|
||||
debug!("{:?}", sp_wallet);
|
||||
|
||||
debug!("updated the wallet");
|
||||
// save to disk
|
||||
sp_wallet.save()?;
|
||||
|
||||
debug!("saved the wallet");
|
||||
Ok(NewTxMessage::new(
|
||||
serialize(&tx).to_lower_hex_string(),
|
||||
Some(partial_tweak.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn handle_new_tx_request(new_tx_msg: &mut NewTxMessage, shared_daemon: SharedDaemon) -> Result<()> {
|
||||
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx_msg.transaction)?)?;
|
||||
let mempool_accept = shared_daemon.lock_anyhow()?.test_mempool_accept(&tx)?;
|
||||
if !mempool_accept.allowed {
|
||||
return Err(AnkError::NewTxError(mempool_accept.reject_reason.unwrap()))?;
|
||||
}
|
||||
if new_tx_msg.tweak_data.is_none() {
|
||||
// we add the tweak_data
|
||||
let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?;
|
||||
new_tx_msg.tweak_data = Some(partial_tweak.to_string());
|
||||
}
|
||||
|
||||
// we try to broadcast it
|
||||
shared_daemon.lock_anyhow()?.broadcast(&tx)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
peers: PeerMap,
|
||||
shared_daemon: SharedDaemon,
|
||||
sp_wallet: Arc<SilentPaymentWallet>,
|
||||
raw_stream: TcpStream,
|
||||
addr: SocketAddr,
|
||||
) {
|
||||
debug!("Incoming TCP connection from: {}", addr);
|
||||
|
||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
|
||||
.await
|
||||
.expect("Error during the websocket handshake occurred");
|
||||
debug!("WebSocket connection established");
|
||||
|
||||
// Insert the write part of this peer to the peer map.
|
||||
let (tx, rx) = unbounded_channel();
|
||||
match peers.lock_anyhow() {
|
||||
Ok(mut peer_map) => peer_map.insert(addr, tx),
|
||||
Err(e) => {
|
||||
log::error!("{}", e);
|
||||
panic!();
|
||||
}
|
||||
};
|
||||
|
||||
let (outgoing, incoming) = ws_stream.split();
|
||||
|
||||
let broadcast_incoming = incoming.try_for_each(|msg| {
|
||||
let peers = peers.clone();
|
||||
if let Ok(raw_msg) = msg.to_text() {
|
||||
debug!("Received msg: {}", raw_msg);
|
||||
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
|
||||
if cache.contains(raw_msg) {
|
||||
debug!("Message already processed, dropping");
|
||||
return future::ok(());
|
||||
} else {
|
||||
cache.insert(raw_msg.to_owned());
|
||||
}
|
||||
let parsed = serde_json::from_str::<AnkNetworkMsg>(raw_msg);
|
||||
match parsed {
|
||||
Ok(ank_msg) => match ank_msg.flag {
|
||||
AnkFlag::Faucet => {
|
||||
debug!("Received a faucet message");
|
||||
if let Ok(mut content) =
|
||||
serde_json::from_str::<FaucetMessage>(&ank_msg.content)
|
||||
{
|
||||
match handle_faucet_request(
|
||||
&content,
|
||||
sp_wallet.clone(),
|
||||
shared_daemon.clone(),
|
||||
) {
|
||||
Ok(new_tx_msg) => {
|
||||
debug!(
|
||||
"Obtained new_tx_msg: {}",
|
||||
serde_json::to_string(&new_tx_msg).unwrap()
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to send faucet tx: {}", e);
|
||||
content.error = Some(e.into());
|
||||
let payload = serde_json::to_string(&content)
|
||||
.expect("Message type shouldn't fail");
|
||||
if let Err(e) = broadcast_message(
|
||||
peers.clone(),
|
||||
AnkFlag::Faucet,
|
||||
payload,
|
||||
BroadcastType::Sender(addr),
|
||||
) {
|
||||
log::error!("Failed to broadcast message: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::error!("Invalid content for faucet message");
|
||||
}
|
||||
}
|
||||
AnkFlag::NewTx => {
|
||||
debug!("Received a new tx message");
|
||||
if let Ok(mut new_tx_msg) =
|
||||
serde_json::from_str::<NewTxMessage>(&ank_msg.content)
|
||||
{
|
||||
match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) {
|
||||
Ok(new_tx_msg) => {
|
||||
// Repeat the msg to all except sender
|
||||
if let Err(e) = broadcast_message(
|
||||
peers.clone(),
|
||||
AnkFlag::NewTx,
|
||||
serde_json::to_string(&new_tx_msg)
|
||||
.expect("This should not fail"),
|
||||
BroadcastType::ExcludeSender(addr),
|
||||
) {
|
||||
log::error!("Failed to send message with error: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("handle_new_tx_request returned error: {}", e);
|
||||
new_tx_msg.error = Some(e.into());
|
||||
if let Err(e) = broadcast_message(
|
||||
peers.clone(),
|
||||
AnkFlag::NewTx,
|
||||
serde_json::to_string(&new_tx_msg)
|
||||
.expect("This shouldn't fail"),
|
||||
BroadcastType::Sender(addr),
|
||||
) {
|
||||
log::error!("Failed to broadcast message: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log::error!("Invalid content for new_tx message");
|
||||
}
|
||||
}
|
||||
AnkFlag::Cipher => {
|
||||
// For now we just send it to everyone
|
||||
debug!("Received a cipher message");
|
||||
if let Err(e) = broadcast_message(
|
||||
peers.clone(),
|
||||
AnkFlag::Cipher,
|
||||
ank_msg.content,
|
||||
BroadcastType::ExcludeSender(addr),
|
||||
) {
|
||||
log::error!("Failed to send message with error: {}", e);
|
||||
}
|
||||
}
|
||||
AnkFlag::Unknown => {
|
||||
debug!("Received an unknown message");
|
||||
if let Err(e) = broadcast_message(
|
||||
peers.clone(),
|
||||
AnkFlag::Unknown,
|
||||
ank_msg.content,
|
||||
BroadcastType::ExcludeSender(addr),
|
||||
) {
|
||||
log::error!("Failed to send message with error: {}", e);
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(_) => log::error!("Failed to parse network message"),
|
||||
}
|
||||
} else {
|
||||
// we don't care
|
||||
log::debug!("Received non-text message {} from peer {}", msg, addr);
|
||||
}
|
||||
future::ok(())
|
||||
});
|
||||
|
||||
let receive_from_others = UnboundedReceiverStream::new(rx)
|
||||
.map(Ok)
|
||||
.forward(outgoing)
|
||||
.map(|result| {
|
||||
if let Err(e) = result {
|
||||
debug!("Error sending message: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
pin_mut!(broadcast_incoming, receive_from_others);
|
||||
future::select(broadcast_incoming, receive_from_others).await;
|
||||
|
||||
debug!("{} disconnected", &addr);
|
||||
peers.lock().unwrap().remove(&addr);
|
||||
}
|
||||
|
||||
fn compute_partial_tweak_to_transaction(
|
||||
tx: &Transaction,
|
||||
daemon: Arc<Mutex<Daemon>>,
|
||||
) -> Result<PublicKey> {
|
||||
let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len());
|
||||
let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len());
|
||||
for input in tx.input.iter() {
|
||||
outpoints.push((
|
||||
input.previous_output.txid.to_string(),
|
||||
input.previous_output.vout,
|
||||
));
|
||||
let prev_tx = daemon
|
||||
.lock_anyhow()?
|
||||
.get_transaction(&input.previous_output.txid, None)
|
||||
.map_err(|e| Error::msg(format!("Failed to find previous transaction: {}", e)))?;
|
||||
|
||||
if let Some(output) = prev_tx.output.get(input.previous_output.vout as usize) {
|
||||
match get_pubkey_from_input(
|
||||
&input.script_sig.to_bytes(),
|
||||
&input.witness.to_vec(),
|
||||
&output.script_pubkey.to_bytes(),
|
||||
) {
|
||||
Ok(Some(pubkey)) => pubkeys.push(pubkey),
|
||||
Ok(None) => continue,
|
||||
Err(e) => {
|
||||
return Err(Error::msg(format!(
|
||||
"Can't extract pubkey from input: {}",
|
||||
e
|
||||
)))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(Error::msg("Transaction with a non-existing input"));
|
||||
}
|
||||
}
|
||||
|
||||
let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect();
|
||||
let partial_tweak = calculate_tweak_data(&input_pub_keys, &outpoints)?;
|
||||
Ok(partial_tweak)
|
||||
}
|
||||
|
||||
fn create_new_tx_message(transaction: Vec<u8>, daemon: Arc<Mutex<Daemon>>) -> Result<NewTxMessage> {
|
||||
// debug!("Creating tx message");
|
||||
let tx: Transaction = deserialize(&transaction)?;
|
||||
|
||||
if tx.is_coinbase() {
|
||||
return Err(Error::msg("Can't process coinbase transaction"));
|
||||
}
|
||||
|
||||
let partial_tweak = compute_partial_tweak_to_transaction(&tx, daemon)?;
|
||||
Ok(NewTxMessage::new(
|
||||
transaction.to_lower_hex_string(),
|
||||
Some(partial_tweak.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon) {
|
||||
debug!("Starting listening on Core");
|
||||
let mut socket = zeromq::SubSocket::new();
|
||||
socket.connect("tcp://127.0.0.1:29100").await.unwrap();
|
||||
socket.subscribe("rawtx").await.unwrap();
|
||||
// socket.subscribe("hashblock");
|
||||
loop {
|
||||
let core_msg = match socket.recv().await {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
error!("Zmq error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
debug!("Received a message");
|
||||
|
||||
let payload: String = if let (Some(topic), Some(data)) = (core_msg.get(0), core_msg.get(1))
|
||||
{
|
||||
// debug!("topic: {}", std::str::from_utf8(&topic).unwrap());
|
||||
match std::str::from_utf8(&topic) {
|
||||
Ok("rawtx") => match create_new_tx_message(data.to_vec(), shared_daemon.clone()) {
|
||||
Ok(m) => {
|
||||
debug!("Created message");
|
||||
serde_json::to_string(&m).expect("This shouldn't fail")
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Ok("hashblock") => todo!(),
|
||||
_ => {
|
||||
error!("Unexpected message in zmq");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("Empty message");
|
||||
continue;
|
||||
};
|
||||
|
||||
debug!("Broadcasting message {}", payload);
|
||||
if let Err(e) =
|
||||
broadcast_message(peers.clone(), AnkFlag::NewTx, payload, BroadcastType::ToAll)
|
||||
{
|
||||
log::error!("{}", e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() -> Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
// This is rudimentary, if you change the network don't forget to change rpc_url either, we won't do that for you
|
||||
let rpc_url = env::args()
|
||||
.nth(1)
|
||||
.unwrap_or_else(|| "127.0.0.1:38332".to_owned());
|
||||
let listening_addr = env::args()
|
||||
.nth(2)
|
||||
.unwrap_or_else(|| "127.0.0.1:8090".to_string());
|
||||
let wallet_name = env::args().nth(3).unwrap_or_else(|| "default".to_owned());
|
||||
let network_arg: String = env::args().nth(4).unwrap_or_else(|| "signet".to_owned());
|
||||
let core_wallet: Option<String> = env::args().nth(5);
|
||||
|
||||
let network = Network::from_core_arg(&network_arg)?;
|
||||
|
||||
if network == Network::Bitcoin {
|
||||
warn!("Running on mainnet, you're on your own");
|
||||
}
|
||||
|
||||
MESSAGECACHE
|
||||
.set(MessageCache::new())
|
||||
.expect("Message Cache initialization failed");
|
||||
|
||||
tokio::spawn(clean_up());
|
||||
|
||||
let peers = PeerMap::new(Mutex::new(HashMap::new()));
|
||||
|
||||
// Connect the rpc daemon
|
||||
let shared_daemon = Arc::new(Mutex::new(Daemon::connect(core_wallet, rpc_url, network)?));
|
||||
|
||||
let current_tip: u32 = shared_daemon
|
||||
.lock_anyhow()?
|
||||
.get_current_height()?
|
||||
.try_into()?;
|
||||
|
||||
let mut config_dir = PathBuf::from_str(&env::var("HOME")?)?;
|
||||
config_dir.push(".4nk");
|
||||
let sp_wallet_file = JsonFile::new(&config_dir, &PathBuf::from_str(&wallet_name)?);
|
||||
fs::create_dir_all(config_dir)?;
|
||||
|
||||
// load an existing sp_wallet, or create a new one
|
||||
let is_testnet = if network == Network::Bitcoin {
|
||||
false
|
||||
} else {
|
||||
true
|
||||
};
|
||||
let sp_wallet = match <JsonFile as Storage<SpWallet>>::load(&sp_wallet_file) {
|
||||
Err(_) => {
|
||||
let mut seed = [0u8; 64];
|
||||
thread_rng().fill(&mut seed);
|
||||
let (scan_sk, spend_sk) = derive_keys_from_seed(&seed, is_testnet)
|
||||
.expect("Couldn't generate a new sp_wallet");
|
||||
let new_client = SpClient::new(
|
||||
wallet_name,
|
||||
scan_sk,
|
||||
SpendKey::Secret(spend_sk),
|
||||
None,
|
||||
is_testnet,
|
||||
)
|
||||
.expect("Failed to create a new SpClient");
|
||||
|
||||
let mut wallet = SpWallet::new(new_client, None)?;
|
||||
|
||||
// set birthday to avoid unnecessary scanning
|
||||
let outputs = wallet.get_mut_outputs();
|
||||
outputs.set_birthday(current_tip);
|
||||
outputs.update_last_scan(current_tip);
|
||||
|
||||
wallet
|
||||
}
|
||||
Ok(wallet) => wallet,
|
||||
};
|
||||
|
||||
log::info!(
|
||||
"Using wallet {} with address {}",
|
||||
sp_wallet.get_client().label,
|
||||
sp_wallet.get_client().get_receiving_address()
|
||||
);
|
||||
|
||||
log::info!(
|
||||
"Found {} outputs for a total balance of {}",
|
||||
sp_wallet.get_outputs().to_spendable_list().len(),
|
||||
sp_wallet.get_outputs().get_balance()
|
||||
);
|
||||
|
||||
let last_scan = sp_wallet.get_outputs().get_last_scan();
|
||||
|
||||
let shared_sp_wallet = Mutex::new(sp_wallet);
|
||||
let shared_wallet_storage = Mutex::new(sp_wallet_file);
|
||||
|
||||
let sp_wallet = Arc::new(SilentPaymentWallet {
|
||||
sp_wallet: shared_sp_wallet,
|
||||
storage: shared_wallet_storage,
|
||||
});
|
||||
|
||||
sp_wallet.save()?;
|
||||
|
||||
if last_scan < current_tip {
|
||||
log::info!("Scanning for our outputs");
|
||||
scan_blocks(
|
||||
shared_daemon.clone(),
|
||||
sp_wallet.clone(),
|
||||
current_tip - last_scan,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Subscribe to Bitcoin Core
|
||||
tokio::spawn(handle_zmq(peers.clone(), shared_daemon.clone()));
|
||||
|
||||
// Create the event loop and TCP listener we'll accept connections on.
|
||||
let try_socket = TcpListener::bind(&listening_addr).await;
|
||||
let listener = try_socket.expect("Failed to bind");
|
||||
debug!("Listening on: {}", listening_addr);
|
||||
|
||||
// Let's spawn the handling of each connection in a separate task.
|
||||
while let Ok((stream, addr)) = listener.accept().await {
|
||||
tokio::spawn(handle_connection(
|
||||
peers.clone(),
|
||||
shared_daemon.clone(),
|
||||
sp_wallet.clone(),
|
||||
stream,
|
||||
addr,
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
286
src/scan.rs
Normal file
286
src/scan.rs
Normal file
@ -0,0 +1,286 @@
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
use electrum_client::ElectrumApi;
|
||||
use hex::FromHex;
|
||||
use sdk_common::sp_client::bitcoin::bip158::BlockFilter;
|
||||
use sdk_common::sp_client::bitcoin::hex::DisplayHex;
|
||||
use sdk_common::sp_client::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey};
|
||||
use sdk_common::sp_client::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey};
|
||||
use sdk_common::sp_client::silentpayments::receiving::Receiver;
|
||||
use sdk_common::sp_client::spclient::{OutputSpendStatus, OwnedOutput};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::{electrumclient, MutexExt, SharedDaemon, SilentPaymentWallet};
|
||||
|
||||
fn get_script_to_secret_map(
|
||||
sp_receiver: &Receiver,
|
||||
tweak_data_vec: Vec<String>,
|
||||
scan_key_scalar: Scalar,
|
||||
secp: &Secp256k1<All>,
|
||||
) -> Result<HashMap<[u8; 34], PublicKey>> {
|
||||
let mut res = HashMap::new();
|
||||
let shared_secrets: Result<Vec<PublicKey>> = tweak_data_vec
|
||||
.into_iter()
|
||||
.map(|s| {
|
||||
let x = PublicKey::from_str(&s).map_err(Error::new)?;
|
||||
x.mul_tweak(secp, &scan_key_scalar).map_err(Error::new)
|
||||
})
|
||||
.collect();
|
||||
let shared_secrets = shared_secrets?;
|
||||
|
||||
for shared_secret in shared_secrets {
|
||||
let spks = sp_receiver.get_spks_from_shared_secret(&shared_secret)?;
|
||||
|
||||
for spk in spks.into_values() {
|
||||
res.insert(spk, shared_secret);
|
||||
}
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn check_block(
|
||||
blkfilter: BlockFilter,
|
||||
blkhash: BlockHash,
|
||||
candidate_spks: Vec<&[u8; 34]>,
|
||||
owned_spks: Vec<Vec<u8>>,
|
||||
) -> Result<bool> {
|
||||
// check output scripts
|
||||
let mut scripts_to_match: Vec<_> = candidate_spks.into_iter().map(|spk| spk.as_ref()).collect();
|
||||
|
||||
// check input scripts
|
||||
scripts_to_match.extend(owned_spks.iter().map(|spk| spk.as_slice()));
|
||||
|
||||
// note: match will always return true for an empty query!
|
||||
if !scripts_to_match.is_empty() {
|
||||
Ok(blkfilter.match_any(&blkhash, &mut scripts_to_match.into_iter())?)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
fn scan_block_outputs(
|
||||
sp_receiver: &Receiver,
|
||||
txdata: &Vec<Transaction>,
|
||||
blkheight: u64,
|
||||
spk2secret: HashMap<[u8; 34], PublicKey>,
|
||||
) -> Result<HashMap<OutPoint, OwnedOutput>> {
|
||||
let mut res: HashMap<OutPoint, OwnedOutput> = HashMap::new();
|
||||
|
||||
// loop over outputs
|
||||
for tx in txdata {
|
||||
let txid = tx.txid();
|
||||
|
||||
// collect all taproot outputs from transaction
|
||||
let p2tr_outs: Vec<(usize, &TxOut)> = tx
|
||||
.output
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, o)| o.script_pubkey.is_p2tr())
|
||||
.collect();
|
||||
|
||||
if p2tr_outs.is_empty() {
|
||||
continue;
|
||||
}; // no taproot output
|
||||
|
||||
let mut secret: Option<PublicKey> = None;
|
||||
// Does this transaction contains one of the outputs we already found?
|
||||
for spk in p2tr_outs.iter().map(|(_, o)| &o.script_pubkey) {
|
||||
if let Some(s) = spk2secret.get(spk.as_bytes()) {
|
||||
// we might have at least one output in this transaction
|
||||
secret = Some(*s);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if secret.is_none() {
|
||||
continue;
|
||||
}; // we don't have a secret that matches any of the keys
|
||||
|
||||
// Now we can just run sp_receiver on all the p2tr outputs
|
||||
let xonlykeys: Result<Vec<XOnlyPublicKey>> = p2tr_outs
|
||||
.iter()
|
||||
.map(|(_, o)| {
|
||||
XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]).map_err(Error::new)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let ours = sp_receiver.scan_transaction(&secret.unwrap(), xonlykeys?)?;
|
||||
for (label, map) in ours {
|
||||
res.extend(p2tr_outs.iter().filter_map(|(i, o)| {
|
||||
match XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]) {
|
||||
Ok(key) => {
|
||||
if let Some(scalar) = map.get(&key) {
|
||||
match SecretKey::from_slice(&scalar.to_be_bytes()) {
|
||||
Ok(tweak) => {
|
||||
let outpoint = OutPoint {
|
||||
txid,
|
||||
vout: *i as u32,
|
||||
};
|
||||
let label_str: Option<String>;
|
||||
if let Some(l) = &label {
|
||||
label_str =
|
||||
Some(l.as_inner().to_be_bytes().to_lower_hex_string());
|
||||
} else {
|
||||
label_str = None;
|
||||
}
|
||||
return Some((
|
||||
outpoint,
|
||||
OwnedOutput {
|
||||
blockheight: blkheight as u32,
|
||||
tweak: hex::encode(tweak.secret_bytes()),
|
||||
amount: o.value,
|
||||
script: hex::encode(o.script_pubkey.as_bytes()),
|
||||
label: label_str,
|
||||
spend_status: OutputSpendStatus::Unspent,
|
||||
},
|
||||
));
|
||||
}
|
||||
Err(_) => {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
Err(_) => None,
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn scan_block_inputs(
|
||||
our_outputs: HashMap<OutPoint, OwnedOutput>,
|
||||
txdata: Vec<Transaction>,
|
||||
) -> Result<Vec<OutPoint>> {
|
||||
let mut found = vec![];
|
||||
|
||||
for tx in txdata {
|
||||
for input in tx.input {
|
||||
let prevout = input.previous_output;
|
||||
|
||||
if our_outputs.contains_key(&prevout) {
|
||||
found.push(prevout);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(found)
|
||||
}
|
||||
|
||||
pub fn scan_blocks(
|
||||
shared_daemon: SharedDaemon,
|
||||
sp_wallet: Arc<SilentPaymentWallet>,
|
||||
mut n_blocks_to_scan: u32,
|
||||
) -> anyhow::Result<()> {
|
||||
log::info!("Starting a rescan");
|
||||
let electrum_client = electrumclient::create_electrum_client()?;
|
||||
|
||||
let core = shared_daemon.lock_anyhow()?;
|
||||
|
||||
let secp = Secp256k1::new();
|
||||
let scan_height = sp_wallet.get_wallet()?.get_outputs().get_last_scan();
|
||||
let tip_height: u32 = core.get_current_height()?.try_into()?;
|
||||
|
||||
// 0 means scan to tip
|
||||
if n_blocks_to_scan == 0 {
|
||||
n_blocks_to_scan = tip_height - scan_height;
|
||||
}
|
||||
|
||||
let start = scan_height + 1;
|
||||
let end = if scan_height + n_blocks_to_scan <= tip_height {
|
||||
scan_height + n_blocks_to_scan
|
||||
} else {
|
||||
tip_height
|
||||
};
|
||||
|
||||
if start > end {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
log::info!("start: {} end: {}", start, end);
|
||||
let mut filters: Vec<(u32, BlockHash, BlockFilter)> = vec![];
|
||||
for blkheight in start..=end {
|
||||
filters.push(core.get_filters(blkheight)?);
|
||||
}
|
||||
|
||||
let mut tweak_data_map = electrum_client.sp_tweaks(start as usize)?;
|
||||
|
||||
let scan_sk = sp_wallet.get_wallet()?.get_client().get_scan_key();
|
||||
|
||||
let sp_receiver = sp_wallet.get_wallet()?.get_client().sp_receiver.clone();
|
||||
let start_time = Instant::now();
|
||||
|
||||
for (blkheight, blkhash, blkfilter) in filters {
|
||||
let spk2secret = match tweak_data_map.remove(&(&blkheight)) {
|
||||
Some(tweak_data_vec) => {
|
||||
get_script_to_secret_map(&sp_receiver, tweak_data_vec, scan_sk.into(), &secp)?
|
||||
}
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
// check if new possible outputs are payments to us
|
||||
let candidate_spks: Vec<&[u8; 34]> = spk2secret.keys().collect();
|
||||
|
||||
// check if owned inputs are spent
|
||||
let our_outputs: HashMap<OutPoint, OwnedOutput> =
|
||||
sp_wallet.get_wallet()?.get_outputs().to_outpoints_list();
|
||||
|
||||
let owned_spks: Result<Vec<Vec<u8>>> = our_outputs
|
||||
.iter()
|
||||
.map(|(_, output)| {
|
||||
let script = Vec::from_hex(&output.script).map_err(|e| Error::new(e));
|
||||
script
|
||||
})
|
||||
.collect();
|
||||
|
||||
let matched = check_block(blkfilter, blkhash, candidate_spks, owned_spks?)?;
|
||||
|
||||
if matched {
|
||||
let blk = core.get_block(blkhash)?;
|
||||
|
||||
// scan block for new outputs, and add them to our list
|
||||
let utxo_created_in_block =
|
||||
scan_block_outputs(&sp_receiver, &blk.txdata, blkheight.into(), spk2secret)?;
|
||||
if !utxo_created_in_block.is_empty() {
|
||||
sp_wallet
|
||||
.get_wallet()?
|
||||
.get_mut_outputs()
|
||||
.extend_from(utxo_created_in_block);
|
||||
}
|
||||
|
||||
// update the list of outputs just in case
|
||||
// utxos may be created and destroyed in the same block
|
||||
let updated_outputs: HashMap<OutPoint, OwnedOutput> =
|
||||
sp_wallet.get_wallet()?.get_outputs().to_outpoints_list();
|
||||
|
||||
// search inputs and mark as mined
|
||||
let utxo_destroyed_in_block = scan_block_inputs(updated_outputs, blk.txdata)?;
|
||||
if !utxo_destroyed_in_block.is_empty() {
|
||||
let mut wallet = sp_wallet.get_wallet()?;
|
||||
let outputs = wallet.get_mut_outputs();
|
||||
for outpoint in utxo_destroyed_in_block {
|
||||
outputs.mark_mined(outpoint, blkhash)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// time elapsed for the scan
|
||||
log::info!(
|
||||
"Scan complete in {} seconds",
|
||||
start_time.elapsed().as_secs()
|
||||
);
|
||||
|
||||
// update last_scan height
|
||||
sp_wallet
|
||||
.get_wallet()?
|
||||
.get_mut_outputs()
|
||||
.update_last_scan(end);
|
||||
sp_wallet.save()?;
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user