sdk_relay/src/main.rs
2024-03-21 18:06:39 +01:00

800 lines
28 KiB
Rust

use std::{
    collections::HashMap,
    env,
    net::SocketAddr,
    ops::Deref,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, Mutex, MutexGuard},
};
use bitcoincore_rpc::json::{self as bitcoin_json};
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
use log::{debug, error};
use sp_backend::bitcoin::secp256k1::{
rand::{thread_rng, Rng},
Error as Secp256k1Error, Keypair, Message as Secp256k1Message, PublicKey, Scalar, Secp256k1,
SecretKey, ThirtyTwoByteHash,
};
use sp_backend::bitcoin::{
absolute::LockTime,
consensus::deserialize,
hex::DisplayHex,
key::TapTweak,
sighash::{Prevouts, SighashCache},
taproot::Signature,
transaction::Version,
Amount, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Txid, Witness,
XOnlyPublicKey,
};
use sp_backend::spclient::OutputList;
use sp_backend::db::{JsonFile, Storage};
use sp_backend::silentpayments::receiving::Label;
use sp_backend::silentpayments::sending::{generate_recipient_pubkeys, SilentPaymentAddress};
use sp_backend::silentpayments::utils::receiving::{
calculate_shared_secret, calculate_tweak_data, get_pubkey_from_input,
};
use sp_backend::silentpayments::utils::sending::calculate_partial_secret;
use sp_backend::spclient::{
derive_keys_from_seed, OutputSpendStatus, OwnedOutput, Recipient, SpClient, SpendKey,
};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::tungstenite::Message;
use anyhow::{Error, Result};
mod daemon;
mod electrumclient;
mod scan;
use crate::{daemon::Daemon, scan::scan_blocks};
/// Sending half of a peer's unbounded channel; messages pushed here are forwarded to that
/// peer's websocket by `handle_connection`.
type Tx = UnboundedSender<Message>;
/// Connected peers keyed by their socket address, shared between tasks behind a mutex.
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
/// Amount paid to the recipient of each faucet request.
const FAUCET_AMT: Amount = Amount::from_sat(1000);
pub(crate) trait MutexExt<T> {
fn lock_anyhow(&self) -> Result<MutexGuard<T>, Error>;
}
impl<T: Debug> MutexExt<T> for Mutex<T> {
fn lock_anyhow(&self) -> Result<MutexGuard<T>, Error> {
self.lock()
.map_err(|e| Error::msg(format!("Failed to lock: {}", e)))
}
}
/// Groups the silent payment client, the list of outputs and the storage handle,
/// each guarded by its own mutex so they can be locked independently.
#[derive(Debug)]
pub(crate) struct SilentPaymentWallet {
// Silent payment client, exposed through `get_client`.
sp_client: Mutex<SpClient>,
// Output list, exposed through `get_outputs`.
sp_outputs: Mutex<OutputList>,
// `JsonFile` storage handle, exposed through `get_storage`.
storage: Mutex<JsonFile>,
}
impl SilentPaymentWallet {
    /// Locks and returns the silent payment client.
    ///
    /// # Errors
    /// Fails if the underlying mutex cannot be locked (see `MutexExt::lock_anyhow`).
    pub fn get_client(&self) -> Result<MutexGuard<'_, SpClient>> {
        self.sp_client.lock_anyhow()
    }

    /// Locks and returns the list of outputs.
    ///
    /// # Errors
    /// Fails if the underlying mutex cannot be locked (see `MutexExt::lock_anyhow`).
    pub fn get_outputs(&self) -> Result<MutexGuard<'_, OutputList>> {
        self.sp_outputs.lock_anyhow()
    }

    /// Locks and returns the storage handle.
    ///
    /// # Errors
    /// Fails if the underlying mutex cannot be locked (see `MutexExt::lock_anyhow`).
    pub fn get_storage(&self) -> Result<MutexGuard<'_, JsonFile>> {
        self.storage.lock_anyhow()
    }
}
/// Selects which connected peers receive a message in `broadcast_message`.
enum BroadcastType {
/// Deliver only to the peer registered under this address.
Sender(SocketAddr),
/// Deliver to every peer except the one registered under this address.
ExcludeSender(SocketAddr),
/// Deliver to every connected peer.
#[allow(dead_code)]
ToAll,
}
fn broadcast_message(peers: PeerMap, msg: Message, broadcast: BroadcastType) -> Result<()> {
// log::debug!("Broadcasting message: {}", msg);
match broadcast {
BroadcastType::Sender(addr) => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
.iter()
.find(|(peer_addr, _)| peer_addr == &&addr)
.ok_or(Error::msg("Failed to find the sender in the peer_map"))?
.1
.send(msg.clone())?;
}
BroadcastType::ExcludeSender(addr) => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
.iter()
.filter(|(peer_addr, _)| peer_addr != &&addr)
.for_each(|(_, peer_tx)| {
let _ = peer_tx.send(msg.clone());
});
}
BroadcastType::ToAll => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
.iter()
.for_each(|(_, peer_tx)| {
let _ = peer_tx.send(msg.clone());
});
}
}
Ok(())
}
fn spend_from_core(
dest: XOnlyPublicKey,
daemon: Arc<Mutex<Daemon>>,
) -> Result<(Transaction, Amount)> {
let core = daemon
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?;
let unspent_list: Vec<bitcoin_json::ListUnspentResultEntry> =
core.list_unspent_from_to(None)?;
if !unspent_list.is_empty() {
let network = core.get_network()?;
let spk = ScriptBuf::new_p2tr_tweaked(dest.dangerous_assume_tweaked());
let new_psbt = core.create_psbt(&unspent_list, spk, network)?;
let processed_psbt = core.process_psbt(new_psbt)?;
let finalize_psbt_result = core.finalize_psbt(processed_psbt)?;
let final_psbt = Psbt::from_str(&finalize_psbt_result)?;
let total_fee = final_psbt.fee()?;
let final_tx = final_psbt.extract_tx()?;
let fee_rate = total_fee
.checked_div(final_tx.weight().to_vbytes_ceil())
.unwrap();
Ok((final_tx, fee_rate))
} else {
// we don't have enough available coins to pay for this faucet request
Err(Error::msg("No spendable outputs"))
}
}
/// Builds an [`OwnedOutput`] entry for every output of `tx` whose key appears in `ours`.
///
/// `ours` maps an optional label to the `(output key -> tweak scalar)` pairs found for
/// that label. Each output's candidate key is read from its script after skipping the
/// first two bytes. Outputs that are too short, that don't decode as an x-only key, or
/// whose key is not in the map are skipped.
///
/// Entries are created with `blockheight: 0` and `OutputSpendStatus::Unspent`; the
/// caller is expected to set the real height.
fn find_owned_outputs(
    tx: &Transaction,
    ours: HashMap<Option<Label>, HashMap<XOnlyPublicKey, Scalar>>,
) -> Result<HashMap<OutPoint, OwnedOutput>> {
    // Computed once: it is the same for every output of the transaction.
    let txid = tx.txid();
    let mut res: HashMap<OutPoint, OwnedOutput> = HashMap::new();

    for (label, map) in ours {
        let label_str: Option<String> = label
            .as_ref()
            .map(|l| l.as_inner().to_be_bytes().to_lower_hex_string());

        res.extend(tx.output.iter().enumerate().filter_map(|(i, o)| {
            // `get` instead of indexing: a script shorter than two bytes must be skipped,
            // not panic.
            let key_bytes = o.script_pubkey.as_bytes().get(2..)?;
            let key = XOnlyPublicKey::from_slice(key_bytes).ok()?;
            let scalar = map.get(&key)?;
            let tweak = SecretKey::from_slice(&scalar.to_be_bytes()).ok()?;
            let vout = u32::try_from(i).ok()?;

            Some((
                OutPoint { txid, vout },
                OwnedOutput {
                    blockheight: 0,
                    tweak: tweak.secret_bytes().to_lower_hex_string(),
                    amount: o.value,
                    script: o.script_pubkey.as_bytes().to_lower_hex_string(),
                    label: label_str.clone(),
                    spend_status: OutputSpendStatus::Unspent,
                },
            ))
        }));
    }

    Ok(res)
}
/// Pays `FAUCET_AMT` to `sp_address` and returns the txid of the paying transaction.
///
/// Two funding strategies are used:
/// * if the spendable silent payment outputs hold more than four times `FAUCET_AMT`,
///   the payment is built and signed from those outputs;
/// * otherwise the daemon's wallet first funds a freshly generated temporary key, and a
///   second transaction spends that output to the recipient, with the change going to
///   our own silent payment address.
///
/// In both cases the outputs of the final transaction that belong to us are detected,
/// stamped with the current block height and added to `sp_outputs` after broadcasting.
///
/// # Errors
/// Fails if a lock cannot be taken, if an RPC call or a signing step fails, or if the
/// funding transaction doesn't have the expected shape or value. These last cases are
/// reported as errors instead of panicking, since they depend on what the daemon returns.
fn faucet_send(
    sp_address: SilentPaymentAddress,
    sp_client: Arc<Mutex<SpClient>>,
    sp_outputs: Arc<Mutex<OutputList>>,
    daemon: Arc<Mutex<Daemon>>,
) -> Result<Txid> {
    let mut first_tx: Option<Transaction> = None;
    let final_tx: Transaction;
    let mut new_outpoints: HashMap<OutPoint, OwnedOutput>;

    // do we have a sp output available ?
    let available_outpoints = sp_outputs
        .lock()
        .map_err(|e| Error::msg(e.to_string()))?
        .to_spendable_list();

    let available_amt = available_outpoints
        .iter()
        .fold(Amount::from_sat(0), |acc, (_, x)| acc + x.amount);

    // We only spend our own outputs when we hold more than 4 times the faucet amount;
    // otherwise we fall back to the daemon's wallet.
    let reserve_threshold = FAUCET_AMT
        .checked_mul(4)
        .expect("4 * FAUCET_AMT fits in an Amount");

    if available_amt > reserve_threshold {
        // Select outputs until the faucet amount is covered.
        let mut total_amt = Amount::from_sat(0);
        let mut inputs = HashMap::new();
        for (outpoint, output) in available_outpoints {
            total_amt += output.amount;
            inputs.insert(outpoint, output);
            if total_amt >= FAUCET_AMT {
                break;
            }
        }

        let recipient = Recipient {
            address: sp_address.into(),
            amount: FAUCET_AMT,
            nb_outputs: 1,
        };

        let fee_estimate = daemon
            .lock()
            .map_err(|e| Error::msg(e.to_string()))?
            .estimate_fee(6)?
            .unwrap_or(Amount::from_sat(1000))
            .checked_div(1000)
            .expect("division by a non-zero constant");

        log::debug!("fee estimate for 6 blocks: {}", fee_estimate);

        let wallet = sp_client.lock().map_err(|e| Error::msg(e.to_string()))?;

        let mut new_psbt = wallet.create_new_psbt(inputs.clone(), vec![recipient], None)?;
        log::debug!("Created psbt: {}", new_psbt);
        // The recipient address is passed as the fee payer.
        SpClient::set_fees(&mut new_psbt, fee_estimate, sp_address.into())?;
        wallet.fill_sp_outputs(&mut new_psbt)?;
        log::debug!("Definitive psbt: {}", new_psbt);

        let mut aux_rand = [0u8; 32];
        thread_rng().fill(&mut aux_rand);
        let mut signed = wallet.sign_psbt(new_psbt, &aux_rand)?;
        log::debug!("signed psbt: {}", signed);
        SpClient::finalize_psbt(&mut signed)?;

        final_tx = signed.extract_tx()?;

        // take all we need to register the new sp output
        let outpoints: Vec<(String, u32)> = final_tx
            .input
            .iter()
            .map(|i| (i.previous_output.txid.to_string(), i.previous_output.vout))
            .collect();

        let our_sp_address: SilentPaymentAddress =
            wallet.sp_receiver.get_receiving_address().try_into()?;
        let our_spend_pubkey = our_sp_address.get_spend_key();

        // Each input key is our spend key multiplied by that output's tweak.
        let secp = Secp256k1::verification_only();
        let input_pubkeys: Result<Vec<PublicKey>, Secp256k1Error> = inputs
            .iter()
            .map(|(_, o)| {
                let tweak = SecretKey::from_str(&o.tweak)?;
                our_spend_pubkey.mul_tweak(&secp, &tweak.into())
            })
            .collect();
        let input_pubkeys = input_pubkeys?;
        let input_pubkeys: Vec<&PublicKey> = input_pubkeys.iter().collect();

        let partial_tweak = calculate_tweak_data(&input_pubkeys, &outpoints)?;
        let ecdh_shared_secret = calculate_shared_secret(partial_tweak, wallet.get_scan_key())?;

        let outputs_to_check: Result<Vec<XOnlyPublicKey>, Secp256k1Error> = final_tx
            .output
            .iter()
            .map(|o| XOnlyPublicKey::from_slice(&o.script_pubkey.as_bytes()[2..]))
            .collect();

        let ours = wallet
            .sp_receiver
            .scan_transaction(&ecdh_shared_secret, outputs_to_check?)?;
        new_outpoints = find_owned_outputs(&final_tx, ours)?;
        // NOTE(review): the inputs spent above are not marked as spent in `sp_outputs`
        // by this function — confirm that this is handled elsewhere (e.g. by the scanner).
    } else {
        // let's try to spend directly from the mining address
        let secp = Secp256k1::signing_only();
        let keypair = Keypair::new(&secp, &mut thread_rng());
        let funding_key = keypair.x_only_public_key().0;

        // we first spend from core to the pubkey we just created
        let (core_tx, fee_rate) = spend_from_core(funding_key, daemon.clone())?;

        // check that the first output of the transaction pays to the key we just created;
        // the transaction comes from the daemon, so a mismatch is an error, not a bug.
        let funding_output = core_tx
            .output
            .first()
            .cloned()
            .ok_or_else(|| Error::msg("Funding transaction has no outputs"))?;
        if funding_output.script_pubkey
            != ScriptBuf::new_p2tr_tweaked(funding_key.dangerous_assume_tweaked())
        {
            return Err(Error::msg(
                "First output of the funding transaction doesn't pay to the temporary key",
            ));
        }

        // create a new transaction that spends the newly created UTXO to the sp_address
        let mut faucet_tx = Transaction {
            input: vec![TxIn {
                previous_output: OutPoint::new(core_tx.txid(), 0),
                ..Default::default()
            }],
            output: vec![],
            version: Version::TWO,
            lock_time: LockTime::ZERO,
        };

        // now do the silent payment operations with the final recipient address
        let partial_secret = calculate_partial_secret(
            &[(keypair.secret_key(), true)],
            &[(core_tx.txid().to_string(), 0)],
        )?;

        let ext_output_key: XOnlyPublicKey =
            generate_recipient_pubkeys(vec![sp_address.into()], partial_secret)?
                .into_values()
                .flatten()
                .next()
                .ok_or_else(|| Error::msg("Failed to generate keys"))?;

        let change_sp_address = sp_client
            .lock()
            .map_err(|e| Error::msg(format!("Failed to lock sp_client: {}", e)))?
            .get_receiving_address();
        let change_output_key: XOnlyPublicKey =
            generate_recipient_pubkeys(vec![change_sp_address], partial_secret)?
                .into_values()
                .flatten()
                .next()
                .ok_or_else(|| Error::msg("Failed to generate keys"))?;

        let ext_spk = ScriptBuf::new_p2tr_tweaked(ext_output_key.dangerous_assume_tweaked());
        let change_spk = ScriptBuf::new_p2tr_tweaked(change_output_key.dangerous_assume_tweaked());

        // `Amount` subtraction panics on underflow, so use the checked variant.
        let change_amt = funding_output
            .value
            .checked_sub(FAUCET_AMT)
            .ok_or_else(|| Error::msg("Funding output is smaller than the faucet amount"))?;

        faucet_tx.output.push(TxOut {
            value: FAUCET_AMT,
            script_pubkey: ext_spk,
        });
        faucet_tx.output.push(TxOut {
            value: change_amt,
            script_pubkey: change_spk,
        });

        // dummy signature only used for fee estimation
        faucet_tx.input[0].witness.push([1; 64].to_vec());

        let abs_fee = fee_rate
            .checked_mul(faucet_tx.weight().to_vbytes_ceil())
            .ok_or_else(|| Error::msg("Fee rate multiplication overflowed"))?;

        // reset the witness to empty
        faucet_tx.input[0].witness = Witness::new();

        // The fee is taken from the change output.
        faucet_tx.output[1].value = change_amt
            .checked_sub(abs_fee)
            .ok_or_else(|| Error::msg("Change is too small to pay for the fees"))?;

        let first_tx_outputs = vec![funding_output];
        let prevouts = Prevouts::All(&first_tx_outputs);

        let hash_ty = TapSighashType::Default;

        let mut cache = SighashCache::new(&faucet_tx);

        let sighash = cache.taproot_key_spend_signature_hash(0, &prevouts, hash_ty)?;

        let msg = Secp256k1Message::from_digest(sighash.into_32());

        let sig = secp.sign_schnorr_with_rng(&msg, &keypair, &mut thread_rng());
        let final_sig = Signature { sig, hash_ty };

        faucet_tx.input[0].witness.push(final_sig.to_vec());

        // take all we need to register the new sp output
        let outpoints: Vec<(String, u32)> = vec![(core_tx.txid().to_string(), 0)];

        first_tx = Some(core_tx);

        let client = sp_client
            .lock()
            .map_err(|e| Error::msg(e.to_string()))?;

        let input_pubkey = &keypair.public_key();

        let input_pub_keys: Vec<&PublicKey> = vec![input_pubkey];

        let partial_tweak = calculate_tweak_data(&input_pub_keys, &outpoints)?;
        let ecdh_shared_secret = calculate_shared_secret(partial_tweak, client.get_scan_key())?;

        let p2tr_outs = vec![ext_output_key, change_output_key];

        let ours = client
            .sp_receiver
            .scan_transaction(&ecdh_shared_secret, p2tr_outs)?;

        final_tx = faucet_tx;
        new_outpoints = find_owned_outputs(&final_tx, ours)?;
    }

    // Scoped so the daemon lock is released before `sp_outputs` is locked below.
    {
        let core = daemon
            .lock()
            .map_err(|_| Error::msg("Failed to lock daemon"))?;

        // get current blockheight
        let blkheight: u32 = core.get_current_height()?.try_into()?;

        // update the new outpoints
        for output in new_outpoints.values_mut() {
            output.blockheight = blkheight;
        }

        // broadcast one or two transactions
        if let Some(funding_tx) = &first_tx {
            core.broadcast(funding_tx)?;
        }
        core.broadcast(&final_tx)?;
    }

    // update our sp_client with the change output(s)
    let mut outputs = sp_outputs
        .lock()
        .map_err(|e| Error::msg(e.to_string()))?;
    outputs.extend_from(new_outpoints);
    log::debug!("{:?}", outputs.to_outpoints_list());

    Ok(final_tx.txid())
}
fn handle_faucet_request(
msg: &str,
sp_client: Arc<Mutex<SpClient>>,
sp_outputs: Arc<Mutex<OutputList>>,
daemon: Arc<Mutex<Daemon>>,
) -> Result<Txid> {
if let Ok(sp_address) = SilentPaymentAddress::try_from(&msg["faucet".len()..]) {
// send bootstrap coins to this sp_address
faucet_send(sp_address, sp_client, sp_outputs, daemon)
} else {
Err(Error::msg(format!(
"faucet message with unparsable sp_address"
)))
}
}
async fn handle_connection(
peer_map: PeerMap,
raw_stream: TcpStream,
addr: SocketAddr,
sp_client: Arc<Mutex<SpClient>>,
sp_outputs: Arc<Mutex<OutputList>>,
daemon: Arc<Mutex<Daemon>>,
) {
debug!("Incoming TCP connection from: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
debug!("WebSocket connection established");
// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded_channel();
peer_map.lock().unwrap().insert(addr, tx);
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each({
let peer_map = peer_map.clone();
move |msg| {
if msg.is_text() {
if msg.to_string().starts_with("faucet") {
match handle_faucet_request(
&msg.to_string(),
sp_client.clone(),
sp_outputs.clone(),
daemon.clone(),
) {
Ok(txid) => {
if let Err(e) = broadcast_message(
peer_map.clone(),
Message::Text(format!("faucet{}", txid.to_string())),
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e.to_string());
} else {
log::debug!("Successfully broadcasted message: {}", txid);
}
}
Err(e) => {
if let Err(e) = broadcast_message(
peer_map.clone(),
Message::Text(e.to_string()),
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e);
}
}
}
} else {
// we just send it `as is` to everyone except sender
if let Err(e) =
broadcast_message(peer_map.clone(), msg, BroadcastType::ExcludeSender(addr))
{
log::error!("Failed to broadcast message: {}", e);
}
}
} else {
// we don't care
log::debug!("Received non-text message {} from peer {}", msg, addr);
}
future::ok(())
}
});
let receive_from_others = UnboundedReceiverStream::new(rx)
.map(Ok)
.forward(outgoing)
.map(|result| {
if let Err(e) = result {
debug!("Error sending message: {}", e);
}
});
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
debug!("{} disconnected", &addr);
peer_map.lock().unwrap().remove(&addr);
}
/// Joins all byte chunks of `parts`, in order, into one contiguous buffer.
fn flatten_msg(parts: &[Vec<u8>]) -> Vec<u8> {
    parts.concat()
}
fn process_raw_tx_message(
core_msg: &bitcoincore_zmq::Message,
daemon: Arc<Mutex<Daemon>>,
) -> Result<Vec<u8>> {
let tx: Transaction = deserialize(&core_msg.serialize_data_to_vec())?;
if tx.is_coinbase() {
return Err(Error::msg("Can't process coinbase transaction"));
}
let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len());
let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len());
for input in tx.input {
outpoints.push((
input.previous_output.txid.to_string(),
input.previous_output.vout,
));
let prev_tx = daemon
.lock()
.map_err(|e| Error::msg(format!("Failed to lock the daemon: {}", e)))?
.get_transaction(&input.previous_output.txid, None)
.map_err(|e| Error::msg(format!("Failed to find previous transaction: {}", e)))?;
if let Some(output) = prev_tx.output.get(input.previous_output.vout as usize) {
match get_pubkey_from_input(
&input.script_sig.to_bytes(),
&input.witness.to_vec(),
&output.script_pubkey.to_bytes(),
) {
Ok(Some(pubkey)) => pubkeys.push(pubkey),
Ok(None) => continue,
Err(e) => {
return Err(Error::msg(format!(
"Can't extract pubkey from input: {}",
e
)))
}
}
} else {
return Err(Error::msg("Transaction with a non-existing input"));
}
}
let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect();
match calculate_tweak_data(&input_pub_keys, &outpoints) {
Ok(partial_tweak) => {
let mut vecs = core_msg.serialize_to_vecs().to_vec();
vecs.push(partial_tweak.serialize().to_vec());
Ok(flatten_msg(&vecs))
}
Err(e) => Err(Error::msg(format!(
"Failed to compute tweak data: {}",
e.to_string()
))),
}
}
async fn handle_zmq(peer_map: PeerMap, daemon: Arc<Mutex<Daemon>>) {
tokio::task::spawn_blocking(move || {
debug!("Starting listening on Core");
for msg in bitcoincore_zmq::subscribe_receiver(&["tcp://127.0.0.1:29000"]).unwrap() {
let core_msg = match msg {
Ok(core_msg) => core_msg,
Err(e) => {
error!("Error receiving ZMQ message: {}", e);
continue;
}
};
debug!("Received a {} message", core_msg.topic_str());
let payload: Vec<u8> = match core_msg.topic_str() {
"rawtx" => {
let processed = process_raw_tx_message(&core_msg, daemon.clone());
match processed {
Ok(p) => p,
Err(_) => continue,
}
}
_ => flatten_msg(&core_msg.serialize_to_vecs()),
};
if let Err(e) = broadcast_message(
peer_map.clone(),
Message::Binary(payload),
BroadcastType::ToAll,
) {
log::error!("{}", e.to_string());
}
}
});
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
env_logger::init();
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8090".to_string());
let wallet_name = env::args().nth(2).unwrap_or_else(|| "default".to_owned());
let is_testnet: bool = env::args()
.nth(3)
.unwrap_or_else(|| "true".to_owned())
.parse()
.expect("Please provide either \"true\" or \"false\"");
let core_wallet: Option<String> = env::args().nth(4);
let state = PeerMap::new(Mutex::new(HashMap::new()));
// Connect the rpc daemon
let daemon = Daemon::connect(core_wallet).unwrap();
let current_tip: u32 = daemon
.get_current_height()
.expect("Failed to make rpc call")
.try_into()
.expect("block count is higher than u32::MAX");
let mut config_dir = PathBuf::from_str(&env::var("HOME")?)?;
config_dir.push(".4nk");
let sp_client_file = JsonFile::new(&config_dir, wallet_name.clone())?;
let sp_outputs_file = JsonFile::new(&config_dir, format!("{}.db", wallet_name))?;
// load an existing sp_wallet, or create a new one
let (sp_client, sp_outputs) = match <JsonFile as Storage<SpClient>>::load(&sp_client_file) {
Ok(existing) => {
if let Ok(our_outputs) = <JsonFile as Storage<OutputList>>::load(&sp_outputs_file) {
(existing, our_outputs)
} else {
let our_address = SilentPaymentAddress::try_from(existing.get_receiving_address())?;
let new_outputs = OutputList::new(
our_address.get_scan_key(),
our_address.get_spend_key(),
current_tip,
);
sp_outputs_file.save(&new_outputs)?;
(existing, new_outputs)
}
}
Err(_) => {
let mut seed = [0u8; 64];
thread_rng().fill(&mut seed);
let (scan_sk, spend_sk) = derive_keys_from_seed(&seed, is_testnet)
.expect("Couldn't generate a new sp_wallet");
let new_client = SpClient::new(
wallet_name,
scan_sk,
SpendKey::Secret(spend_sk),
None,
is_testnet,
)
.expect("Failed to create a new SpClient");
let new_address = SilentPaymentAddress::try_from(new_client.get_receiving_address())?;
let new_outputs = OutputList::new(
new_address.get_scan_key(),
new_address.get_spend_key(),
current_tip,
);
sp_client_file.save(&new_client)?;
sp_outputs_file.save(&new_outputs)?;
(new_client, new_outputs)
}
};
log::info!(
"Using wallet {} with address {}",
sp_client.label,
sp_client.get_receiving_address()
);
log::info!(
"Found {} outputs for a total balance of {}",
sp_outputs.to_spendable_list().len(),
sp_outputs.get_balance()
);
let last_scan = sp_outputs.get_last_scan();
let shared_daemon = Arc::new(Mutex::new(daemon));
let shared_sp_client = Arc::new(Mutex::new(sp_client));
let shared_sp_outputs = Arc::new(Mutex::new(sp_outputs));
if last_scan < current_tip {
log::info!("Scanning for our outputs");
match scan_blocks(
shared_sp_client.clone(),
shared_daemon.clone(),
shared_sp_outputs.clone(),
current_tip - last_scan,
) {
Ok(()) => {
let updated = shared_sp_outputs
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))?;
sp_outputs_file.save(updated.deref())?;
}
Err(e) => return Err(e),
};
}
// Subscribe to Bitcoin Core
tokio::spawn(handle_zmq(state.clone(), shared_daemon.clone()));
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
debug!("Listening on: {}", addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(
state.clone(),
stream,
addr,
shared_sp_client.clone(),
shared_sp_outputs.clone(),
shared_daemon.clone(),
));
}
Ok(())
}