Refactoring

This commit is contained in:
Sosthene00 2024-03-21 18:07:22 +01:00
parent 6db81ee769
commit 306949e9f0
3 changed files with 127 additions and 118 deletions

2
Cargo.lock generated
View File

@ -1197,7 +1197,7 @@ dependencies = [
[[package]]
name = "sp_backend"
version = "0.1.0"
source = "git+https://github.com/Sosthene00/sp-backend?branch=sp_client#0213188a95921081f5c74e5099ac46e6737a07d0"
source = "git+https://github.com/Sosthene00/sp-backend?branch=sp_client#32967c214df9a25daef551a372b89c400f2369f8"
dependencies = [
"anyhow",
"bitcoin 0.31.1",

View File

@ -1,11 +1,12 @@
use std::{
collections::HashMap,
env,
fmt::Debug,
net::SocketAddr,
ops::Deref,
path::PathBuf,
str::FromStr,
sync::{Arc, Mutex},
sync::{Arc, Mutex, MutexGuard},
};
use bitcoincore_rpc::json::{self as bitcoin_json};
@ -56,6 +57,8 @@ type Tx = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
type SharedDaemon = Arc<Mutex<Daemon>>;
const FAUCET_AMT: Amount = Amount::from_sat(1000);
pub(crate) trait MutexExt<T> {
@ -216,19 +219,15 @@ fn find_owned_outputs(
fn faucet_send(
sp_address: SilentPaymentAddress,
sp_client: Arc<Mutex<SpClient>>,
sp_outputs: Arc<Mutex<OutputList>>,
daemon: Arc<Mutex<Daemon>>,
sp_wallet: Arc<SilentPaymentWallet>,
shared_daemon: SharedDaemon,
) -> Result<Txid> {
let mut first_tx: Option<Transaction> = None;
let final_tx: Transaction;
let mut new_outpoints: HashMap<OutPoint, OwnedOutput>;
// do we have a sp output available ?
let available_outpoints = sp_outputs
.lock()
.map_err(|e| Error::msg(e.to_string()))?
.to_spendable_list();
let available_outpoints = sp_wallet.get_outputs()?.to_spendable_list();
let available_amt = available_outpoints
.iter()
@ -252,9 +251,8 @@ fn faucet_send(
nb_outputs: 1,
};
let fee_estimate = daemon
.lock()
.map_err(|e| Error::msg(format!("{}", e.to_string())))?
let fee_estimate = shared_daemon
.lock_anyhow()?
.estimate_fee(6)?
.unwrap_or(Amount::from_sat(1000))
.checked_div(1000)
@ -262,7 +260,7 @@ fn faucet_send(
log::debug!("fee estimate for 6 blocks: {}", fee_estimate);
let wallet = sp_client.lock().map_err(|e| Error::msg(e.to_string()))?;
let wallet = sp_wallet.get_client()?;
let mut new_psbt = wallet.create_new_psbt(inputs.clone(), vec![recipient], None)?;
log::debug!("Created psbt: {}", new_psbt);
@ -317,7 +315,8 @@ fn faucet_send(
let keypair = Keypair::new(&secp, &mut thread_rng());
// we first spend from core to the pubkey we just created
let (core_tx, fee_rate) = spend_from_core(keypair.x_only_public_key().0, daemon.clone())?;
let (core_tx, fee_rate) =
spend_from_core(keypair.x_only_public_key().0, shared_daemon.clone())?;
// check that the first output of the transaction pays to the key we just created
assert!(
@ -352,10 +351,7 @@ fn faucet_send(
.get(0)
.expect("Failed to generate keys")
.to_owned();
let change_sp_address = sp_client
.lock()
.map_err(|e| Error::msg(format!("Failed to lock sp_client: {}", e.to_string())))?
.get_receiving_address();
let change_sp_address = sp_wallet.get_client()?.get_receiving_address();
let change_output_key: XOnlyPublicKey =
generate_recipient_pubkeys(vec![change_sp_address], partial_secret)?
.into_values()
@ -410,9 +406,7 @@ fn faucet_send(
first_tx = Some(core_tx);
let client = sp_client
.lock()
.map_err(|e| Error::msg(format!("{}", e.to_string())))?;
let client = sp_wallet.get_client()?;
let input_pubkey = &keypair.public_key();
@ -431,9 +425,9 @@ fn faucet_send(
new_outpoints = find_owned_outputs(&final_tx, ours)?;
}
if let Ok(core) = daemon.lock() {
if let Ok(daemon) = shared_daemon.lock() {
// get current blockheight
let blkheight: u32 = core.get_current_height()?.try_into()?;
let blkheight: u32 = daemon.get_current_height()?.try_into()?;
// update the new outpoints
for o in new_outpoints.iter_mut() {
o.1.blockheight = blkheight;
@ -441,34 +435,32 @@ fn faucet_send(
// broadcast one or two transactions
if first_tx.is_some() {
core.broadcast(&first_tx.unwrap())?;
daemon.broadcast(&first_tx.unwrap())?;
}
core.broadcast(&final_tx)?;
daemon.broadcast(&final_tx)?;
} else {
return Err(Error::msg("Failed to lock daemon"));
}
// update our sp_client with the change output(s)
let mut outputs = sp_outputs
.lock()
.map_err(|e| Error::msg(format!("{}", e.to_string())))?;
let mut outputs = sp_wallet.get_outputs()?;
outputs.extend_from(new_outpoints);
log::debug!("{:?}", outputs.to_outpoints_list());
// save to disk
sp_wallet.get_storage()?.save(outputs.deref())?;
Ok(final_tx.txid())
}
fn handle_faucet_request(
msg: &str,
sp_client: Arc<Mutex<SpClient>>,
sp_outputs: Arc<Mutex<OutputList>>,
daemon: Arc<Mutex<Daemon>>,
sp_wallet: Arc<SilentPaymentWallet>,
shared_daemon: SharedDaemon,
) -> Result<Txid> {
if let Ok(sp_address) = SilentPaymentAddress::try_from(&msg["faucet".len()..]) {
// send bootstrap coins to this sp_address
faucet_send(sp_address, sp_client, sp_outputs, daemon)
faucet_send(sp_address, sp_wallet, shared_daemon)
} else {
Err(Error::msg(format!(
"faucet message with unparsable sp_address"
@ -477,12 +469,11 @@ fn handle_faucet_request(
}
async fn handle_connection(
peer_map: PeerMap,
peers: PeerMap,
shared_daemon: SharedDaemon,
sp_wallet: Arc<SilentPaymentWallet>,
raw_stream: TcpStream,
addr: SocketAddr,
sp_client: Arc<Mutex<SpClient>>,
sp_outputs: Arc<Mutex<OutputList>>,
daemon: Arc<Mutex<Daemon>>,
) {
debug!("Incoming TCP connection from: {}", addr);
@ -493,24 +484,29 @@ async fn handle_connection(
// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded_channel();
peer_map.lock().unwrap().insert(addr, tx);
match peers.lock_anyhow() {
Ok(mut peer_map) => peer_map.insert(addr, tx),
Err(e) => {
log::error!("{}", e);
panic!();
}
};
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each({
let peer_map = peer_map.clone();
let peers = peers.clone();
move |msg| {
if msg.is_text() {
if msg.to_string().starts_with("faucet") {
match handle_faucet_request(
&msg.to_string(),
sp_client.clone(),
sp_outputs.clone(),
daemon.clone(),
sp_wallet.clone(),
shared_daemon.clone(),
) {
Ok(txid) => {
if let Err(e) = broadcast_message(
peer_map.clone(),
peers.clone(),
Message::Text(format!("faucet{}", txid.to_string())),
BroadcastType::Sender(addr),
) {
@ -521,7 +517,7 @@ async fn handle_connection(
}
Err(e) => {
if let Err(e) = broadcast_message(
peer_map.clone(),
peers.clone(),
Message::Text(e.to_string()),
BroadcastType::Sender(addr),
) {
@ -532,7 +528,7 @@ async fn handle_connection(
} else {
// we just send it `as is` to everyone except sender
if let Err(e) =
broadcast_message(peer_map.clone(), msg, BroadcastType::ExcludeSender(addr))
broadcast_message(peers.clone(), msg, BroadcastType::ExcludeSender(addr))
{
log::error!("Failed to broadcast message: {}", e);
}
@ -558,7 +554,7 @@ async fn handle_connection(
future::select(broadcast_incoming, receive_from_others).await;
debug!("{} disconnected", &addr);
peer_map.lock().unwrap().remove(&addr);
peers.lock().unwrap().remove(&addr);
}
fn flatten_msg(parts: &[Vec<u8>]) -> Vec<u8> {
@ -628,7 +624,11 @@ fn process_raw_tx_message(
}
}
async fn handle_zmq(peer_map: PeerMap, daemon: Arc<Mutex<Daemon>>) {
async fn handle_zmq(
peers: PeerMap,
shared_daemon: SharedDaemon,
sp_wallet: Arc<SilentPaymentWallet>,
) {
tokio::task::spawn_blocking(move || {
debug!("Starting listening on Core");
for msg in bitcoincore_zmq::subscribe_receiver(&["tcp://127.0.0.1:29000"]).unwrap() {
@ -643,22 +643,50 @@ async fn handle_zmq(peer_map: PeerMap, daemon: Arc<Mutex<Daemon>>) {
let payload: Vec<u8> = match core_msg.topic_str() {
"rawtx" => {
let processed = process_raw_tx_message(&core_msg, daemon.clone());
let processed = process_raw_tx_message(&core_msg, shared_daemon.clone());
match processed {
Ok(p) => p,
Err(_) => continue,
}
}
},
"rawblock" => {
// scan the block for our outputs
match scan_blocks(shared_daemon.clone(), sp_wallet.clone(), 1) {
Ok(()) => {
let updated = match sp_wallet.get_outputs() {
Ok(sp_outputs) => sp_outputs,
Err(e) => {
log::error!("{}", e);
continue;
}
};
match sp_wallet.get_storage() {
Ok(storage) => {
if let Err(e) = storage.save(updated.deref()) {
log::error!("{}", e);
}
}
Err(e) => {
log::error!("{}", e);
}
}
}
Err(e) => log::error!("{}", e),
};
flatten_msg(&core_msg.serialize_to_vecs())
},
_ => flatten_msg(&core_msg.serialize_to_vecs()),
};
if let Err(e) = broadcast_message(
peer_map.clone(),
peers.clone(),
Message::Binary(payload),
BroadcastType::ToAll,
) {
log::error!("{}", e.to_string());
}
}
});
}
@ -678,16 +706,15 @@ async fn main() -> Result<()> {
.expect("Please provide either \"true\" or \"false\"");
let core_wallet: Option<String> = env::args().nth(4);
let state = PeerMap::new(Mutex::new(HashMap::new()));
let peers = PeerMap::new(Mutex::new(HashMap::new()));
// Connect the rpc daemon
let daemon = Daemon::connect(core_wallet).unwrap();
let shared_daemon = Arc::new(Mutex::new(Daemon::connect(core_wallet)?));
let current_tip: u32 = daemon
.get_current_height()
.expect("Failed to make rpc call")
.try_into()
.expect("block count is higher than u32::MAX");
let current_tip: u32 = shared_daemon
.lock_anyhow()?
.get_current_height()?
.try_into()?;
let mut config_dir = PathBuf::from_str(&env::var("HOME")?)?;
config_dir.push(".4nk");
@ -753,30 +780,31 @@ async fn main() -> Result<()> {
let last_scan = sp_outputs.get_last_scan();
let shared_daemon = Arc::new(Mutex::new(daemon));
let shared_sp_client = Arc::new(Mutex::new(sp_client));
let shared_sp_outputs = Arc::new(Mutex::new(sp_outputs));
let shared_sp_client = Mutex::new(sp_client);
let shared_sp_outputs = Mutex::new(sp_outputs);
let shared_outputs_storage = Mutex::new(sp_outputs_file);
let sp_wallet = Arc::new(SilentPaymentWallet {
sp_client: shared_sp_client,
sp_outputs: shared_sp_outputs,
storage: shared_outputs_storage,
});
if last_scan < current_tip {
log::info!("Scanning for our outputs");
match scan_blocks(
shared_sp_client.clone(),
scan_blocks(
shared_daemon.clone(),
shared_sp_outputs.clone(),
sp_wallet.clone(),
current_tip - last_scan,
) {
Ok(()) => {
let updated = shared_sp_outputs
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))?;
sp_outputs_file.save(updated.deref())?;
}
Err(e) => return Err(e),
};
)?;
}
// Subscribe to Bitcoin Core
tokio::spawn(handle_zmq(state.clone(), shared_daemon.clone()));
tokio::spawn(handle_zmq(
peers.clone(),
shared_daemon.clone(),
sp_wallet.clone(),
));
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
@ -786,12 +814,11 @@ async fn main() -> Result<()> {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(
state.clone(),
peers.clone(),
shared_daemon.clone(),
sp_wallet.clone(),
stream,
addr,
shared_sp_client.clone(),
shared_sp_outputs.clone(),
shared_daemon.clone(),
));
}

View File

@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use anyhow::{Error, Result};
use electrum_client::ElectrumApi;
@ -9,11 +10,12 @@ use sp_backend::bitcoin::bip158::BlockFilter;
use sp_backend::bitcoin::hex::DisplayHex;
use sp_backend::bitcoin::secp256k1::{All, PublicKey, Scalar, Secp256k1, SecretKey};
use sp_backend::bitcoin::{BlockHash, OutPoint, Transaction, TxOut, XOnlyPublicKey};
use sp_backend::db::Storage;
use sp_backend::silentpayments::receiving::Receiver;
use sp_backend::spclient::{OutputList, OutputSpendStatus, OwnedOutput, SpClient};
use sp_backend::spclient::{OutputSpendStatus, OwnedOutput};
use tokio::time::Instant;
use crate::{electrumclient, Daemon};
use crate::{electrumclient, MutexExt, SharedDaemon, SilentPaymentWallet};
fn get_script_to_secret_map(
sp_receiver: &Receiver,
@ -172,23 +174,17 @@ fn scan_block_inputs(
}
pub fn scan_blocks(
sp_client: Arc<Mutex<SpClient>>,
daemon: Arc<Mutex<Daemon>>,
sp_outputs: Arc<Mutex<OutputList>>,
shared_daemon: SharedDaemon,
sp_wallet: Arc<SilentPaymentWallet>,
mut n_blocks_to_scan: u32,
) -> anyhow::Result<()> {
log::info!("Starting a rescan");
let electrum_client = electrumclient::create_electrum_client()?;
let core = daemon
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?;
let core = shared_daemon.lock_anyhow()?;
let secp = Secp256k1::new();
let scan_height = sp_outputs
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?
.get_last_scan();
let scan_height = sp_wallet.get_outputs()?.get_last_scan();
let tip_height: u32 = core.get_current_height()?.try_into()?;
// 0 means scan to tip
@ -215,16 +211,9 @@ pub fn scan_blocks(
let mut tweak_data_map = electrum_client.sp_tweaks(start as usize)?;
let scan_sk = sp_client
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?
.get_scan_key();
let scan_sk = sp_wallet.get_client()?.get_scan_key();
let sp_receiver = sp_client
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?
.sp_receiver
.clone();
let sp_receiver = sp_wallet.get_client()?.sp_receiver.clone();
let start_time = Instant::now();
for (blkheight, blkhash, blkfilter) in filters {
@ -239,10 +228,8 @@ pub fn scan_blocks(
let candidate_spks: Vec<&[u8; 34]> = spk2secret.keys().collect();
// check if owned inputs are spent
let our_outputs: HashMap<OutPoint, OwnedOutput> = sp_outputs
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?
.to_outpoints_list();
let our_outputs: HashMap<OutPoint, OwnedOutput> =
sp_wallet.get_outputs()?.to_outpoints_list();
let owned_spks: Result<Vec<Vec<u8>>> = our_outputs
.iter()
@ -261,25 +248,18 @@ pub fn scan_blocks(
let utxo_created_in_block =
scan_block_outputs(&sp_receiver, &blk.txdata, blkheight.into(), spk2secret)?;
if !utxo_created_in_block.is_empty() {
sp_outputs
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))?
.extend_from(utxo_created_in_block);
sp_wallet.get_outputs()?.extend_from(utxo_created_in_block);
}
// update the list of outputs just in case
// utxos may be created and destroyed in the same block
let updated_outputs: HashMap<OutPoint, OwnedOutput> = sp_outputs
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?
.to_outpoints_list();
let updated_outputs: HashMap<OutPoint, OwnedOutput> =
sp_wallet.get_outputs()?.to_outpoints_list();
// search inputs and mark as mined
let utxo_destroyed_in_block = scan_block_inputs(updated_outputs, blk.txdata)?;
if !utxo_destroyed_in_block.is_empty() {
let mut outputs = sp_outputs
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))?;
let mut outputs = sp_wallet.get_outputs()?;
for outpoint in utxo_destroyed_in_block {
outputs.mark_mined(outpoint, blkhash)?;
}
@ -294,9 +274,11 @@ pub fn scan_blocks(
);
// update last_scan height
sp_outputs
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e)))?
.update_last_scan(end);
let mut updated = sp_wallet.get_outputs()?;
updated.update_last_scan(end);
sp_wallet.get_storage()?.save(updated.deref())?;
Ok(())
}