Move shared resources to static variables

This commit is contained in:
Sosthene 2024-06-25 11:21:14 +02:00
parent fbd7a1c1ea
commit 31c17e908d
5 changed files with 103 additions and 179 deletions

View File

@ -3,11 +3,11 @@ use log::info;
const VALIDATE_DOMAIN: bool = false; // self-signed cert, so we don't validate const VALIDATE_DOMAIN: bool = false; // self-signed cert, so we don't validate
pub fn create_electrum_client(electrum_url: String) -> anyhow::Result<Client> { pub fn create_electrum_client(electrum_url: &str) -> anyhow::Result<Client> {
let config = ConfigBuilder::new() let config = ConfigBuilder::new()
.validate_domain(VALIDATE_DOMAIN) .validate_domain(VALIDATE_DOMAIN)
.build(); .build();
let electrum_client = Client::from_config(&electrum_url, config)?; let electrum_client = Client::from_config(electrum_url, config)?;
info!("ssl client {}", electrum_url); info!("ssl client {}", electrum_url);
Ok(electrum_client) Ok(electrum_client)

View File

@ -1,8 +1,4 @@
use std::{ use std::{collections::HashMap, str::FromStr};
collections::HashMap,
str::FromStr,
sync::{Arc, Mutex},
};
use bitcoincore_rpc::json::{self as bitcoin_json}; use bitcoincore_rpc::json::{self as bitcoin_json};
use sdk_common::sp_client::bitcoin::secp256k1::{ use sdk_common::sp_client::bitcoin::secp256k1::{
@ -32,18 +28,13 @@ use sdk_common::sp_client::spclient::Recipient;
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use crate::{ use crate::{scan::compute_partial_tweak_to_transaction, MutexExt, DAEMON, FAUCET_AMT, WALLET};
daemon::Daemon, scan::compute_partial_tweak_to_transaction, MutexExt, SharedDaemon,
SilentPaymentWallet, FAUCET_AMT,
};
fn spend_from_core( fn spend_from_core(dest: XOnlyPublicKey) -> Result<(Transaction, Amount)> {
dest: XOnlyPublicKey, let core = DAEMON
daemon: Arc<Mutex<Daemon>>, .get()
) -> Result<(Transaction, Amount)> { .ok_or(Error::msg("DAEMON not initialized"))?
let core = daemon .lock_anyhow()?;
.lock()
.map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?;
let unspent_list: Vec<bitcoin_json::ListUnspentResultEntry> = let unspent_list: Vec<bitcoin_json::ListUnspentResultEntry> =
core.list_unspent_from_to(None)?; core.list_unspent_from_to(None)?;
@ -69,15 +60,11 @@ fn spend_from_core(
} }
} }
fn faucet_send( fn faucet_send(sp_address: SilentPaymentAddress, commitment: &str) -> Result<Transaction> {
sp_address: SilentPaymentAddress,
commitment: &str,
sp_wallet: Arc<SilentPaymentWallet>,
shared_daemon: SharedDaemon,
) -> Result<Transaction> {
let mut first_tx: Option<Transaction> = None; let mut first_tx: Option<Transaction> = None;
let final_tx: Transaction; let final_tx: Transaction;
let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?;
// do we have a sp output available ? // do we have a sp output available ?
let available_outpoints = sp_wallet.get_wallet()?.get_outputs().to_spendable_list(); let available_outpoints = sp_wallet.get_wallet()?.get_outputs().to_spendable_list();
@ -103,7 +90,9 @@ fn faucet_send(
nb_outputs: 1, nb_outputs: 1,
}; };
let fee_estimate = shared_daemon let fee_estimate = DAEMON
.get()
.ok_or(Error::msg("DAEMON not initialized"))?
.lock_anyhow()? .lock_anyhow()?
.estimate_fee(6) .estimate_fee(6)
.unwrap_or(Amount::from_sat(1000)) .unwrap_or(Amount::from_sat(1000))
@ -130,8 +119,7 @@ fn faucet_send(
let keypair = Keypair::new(&secp, &mut thread_rng()); let keypair = Keypair::new(&secp, &mut thread_rng());
// we first spend from core to the pubkey we just created // we first spend from core to the pubkey we just created
let (core_tx, fee_rate) = let (core_tx, fee_rate) = spend_from_core(keypair.x_only_public_key().0)?;
spend_from_core(keypair.x_only_public_key().0, shared_daemon.clone())?;
// check that the first output of the transaction pays to the key we just created // check that the first output of the transaction pays to the key we just created
debug_assert!( debug_assert!(
@ -237,44 +225,41 @@ fn faucet_send(
final_tx = faucet_tx; final_tx = faucet_tx;
} }
if let Ok(daemon) = shared_daemon.lock() { {
let daemon = DAEMON
.get()
.ok_or(Error::msg("DAEMON not initialized"))?
.lock_anyhow()?;
// broadcast one or two transactions // broadcast one or two transactions
if first_tx.is_some() { if first_tx.is_some() {
daemon.broadcast(&first_tx.unwrap())?; daemon.broadcast(&first_tx.unwrap())?;
} }
let txid = daemon.broadcast(&final_tx)?; let txid = daemon.broadcast(&final_tx)?;
log::debug!("Sent tx {}", txid); log::debug!("Sent tx {}", txid);
} else {
return Err(Error::msg("Failed to lock daemon"));
} }
Ok(final_tx) Ok(final_tx)
} }
pub fn handle_faucet_request( pub fn handle_faucet_request(msg: &FaucetMessage) -> Result<NewTxMessage> {
msg: &FaucetMessage,
sp_wallet: Arc<SilentPaymentWallet>,
shared_daemon: SharedDaemon,
) -> Result<NewTxMessage> {
let sp_address = SilentPaymentAddress::try_from(msg.sp_address.as_str())?; let sp_address = SilentPaymentAddress::try_from(msg.sp_address.as_str())?;
log::debug!("Sending bootstrap coins to {}", sp_address); log::debug!("Sending bootstrap coins to {}", sp_address);
// send bootstrap coins to this sp_address // send bootstrap coins to this sp_address
let tx = faucet_send( let tx = faucet_send(sp_address, &msg.commitment)?;
sp_address,
&msg.commitment,
sp_wallet.clone(),
shared_daemon.clone(),
)?;
// get the tweak // get the tweak
let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; let partial_tweak = compute_partial_tweak_to_transaction(&tx)?;
// get current blockheight // get current blockheight
let blkheight: u32 = shared_daemon let blkheight: u32 = DAEMON
.get()
.unwrap()
.lock_anyhow()? .lock_anyhow()?
.get_current_height()? .get_current_height()?
.try_into()?; .try_into()?;
let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?;
// update our sp_client with the change output(s) // update our sp_client with the change output(s)
sp_wallet sp_wallet
.get_wallet()? .get_wallet()?

View File

@ -7,7 +7,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
path::PathBuf, path::PathBuf,
str::FromStr, str::FromStr,
sync::{Arc, Mutex, MutexGuard}, sync::{Mutex, MutexGuard, OnceLock},
}; };
use bitcoincore_rpc::json::{self as bitcoin_json}; use bitcoincore_rpc::json::{self as bitcoin_json};
@ -50,9 +50,13 @@ use crate::{daemon::Daemon, scan::scan_blocks};
type Tx = UnboundedSender<Message>; type Tx = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>; type PeerMap = Mutex<HashMap<SocketAddr, Tx>>;
type SharedDaemon = Arc<Mutex<Daemon>>; pub(crate) static PEERMAP: OnceLock<PeerMap> = OnceLock::new();
type SharedDaemon = Mutex<Daemon>;
pub(crate) static DAEMON: OnceLock<SharedDaemon> = OnceLock::new();
#[derive(Debug)] #[derive(Debug)]
struct WalletFile { struct WalletFile {
@ -139,33 +143,35 @@ impl SilentPaymentWallet {
} }
} }
fn handle_new_tx_request(new_tx_msg: &mut NewTxMessage, shared_daemon: SharedDaemon) -> Result<()> { pub(crate) static WALLET: OnceLock<SilentPaymentWallet> = OnceLock::new();
fn handle_new_tx_request(new_tx_msg: &mut NewTxMessage) -> Result<()> {
let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx_msg.transaction)?)?; let tx = deserialize::<Transaction>(&Vec::from_hex(&new_tx_msg.transaction)?)?;
let mempool_accept = shared_daemon.lock_anyhow()?.test_mempool_accept(&tx)?; let mempool_accept = DAEMON
.get()
.unwrap()
.lock_anyhow()?
.test_mempool_accept(&tx)?;
if !mempool_accept.allowed { if !mempool_accept.allowed {
return Err(AnkError::NewTxError(mempool_accept.reject_reason.unwrap()))?; return Err(AnkError::NewTxError(mempool_accept.reject_reason.unwrap()))?;
} }
if new_tx_msg.tweak_data.is_none() { if new_tx_msg.tweak_data.is_none() {
// we add the tweak_data // we add the tweak_data
let partial_tweak = compute_partial_tweak_to_transaction(&tx, shared_daemon.clone())?; let partial_tweak = compute_partial_tweak_to_transaction(&tx)?;
new_tx_msg.tweak_data = Some(partial_tweak.to_string()); new_tx_msg.tweak_data = Some(partial_tweak.to_string());
} }
// we try to broadcast it // we try to broadcast it
shared_daemon.lock_anyhow()?.broadcast(&tx)?; DAEMON.get().unwrap().lock_anyhow()?.broadcast(&tx)?;
Ok(()) Ok(())
} }
async fn handle_connection( async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) {
peers: PeerMap,
shared_daemon: SharedDaemon,
sp_wallet: Arc<SilentPaymentWallet>,
raw_stream: TcpStream,
addr: SocketAddr,
) {
debug!("Incoming TCP connection from: {}", addr); debug!("Incoming TCP connection from: {}", addr);
let peers = PEERMAP.get().expect("Peer Map not initialized");
let ws_stream = tokio_tungstenite::accept_async(raw_stream) let ws_stream = tokio_tungstenite::accept_async(raw_stream)
.await .await
.expect("Error during the websocket handshake occurred"); .expect("Error during the websocket handshake occurred");
@ -186,13 +192,7 @@ async fn handle_connection(
let broadcast_incoming = incoming.try_for_each(|msg| { let broadcast_incoming = incoming.try_for_each(|msg| {
if let Ok(raw_msg) = msg.to_text() { if let Ok(raw_msg) = msg.to_text() {
debug!("Received msg: {}", raw_msg); debug!("Received msg: {}", raw_msg);
process_message( process_message(raw_msg, addr);
raw_msg,
peers.clone(),
sp_wallet.clone(),
shared_daemon.clone(),
addr,
);
} else { } else {
debug!("Received non-text message {} from peer {}", msg, addr); debug!("Received non-text message {} from peer {}", msg, addr);
} }
@ -215,7 +215,7 @@ async fn handle_connection(
peers.lock().unwrap().remove(&addr); peers.lock().unwrap().remove(&addr);
} }
fn create_new_tx_message(transaction: Vec<u8>, daemon: Arc<Mutex<Daemon>>) -> Result<NewTxMessage> { fn create_new_tx_message(transaction: Vec<u8>) -> Result<NewTxMessage> {
// debug!("Creating tx message"); // debug!("Creating tx message");
let tx: Transaction = deserialize(&transaction)?; let tx: Transaction = deserialize(&transaction)?;
@ -223,14 +223,14 @@ fn create_new_tx_message(transaction: Vec<u8>, daemon: Arc<Mutex<Daemon>>) -> Re
return Err(Error::msg("Can't process coinbase transaction")); return Err(Error::msg("Can't process coinbase transaction"));
} }
let partial_tweak = compute_partial_tweak_to_transaction(&tx, daemon)?; let partial_tweak = compute_partial_tweak_to_transaction(&tx)?;
Ok(NewTxMessage::new( Ok(NewTxMessage::new(
transaction.to_lower_hex_string(), transaction.to_lower_hex_string(),
Some(partial_tweak.to_string()), Some(partial_tweak.to_string()),
)) ))
} }
async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon, zmq_url: String) { async fn handle_zmq(zmq_url: String, electrum_url: String) {
debug!("Starting listening on Core"); debug!("Starting listening on Core");
let mut socket = zeromq::SubSocket::new(); let mut socket = zeromq::SubSocket::new();
socket.connect(&zmq_url).await.unwrap(); socket.connect(&zmq_url).await.unwrap();
@ -250,7 +250,7 @@ async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon, zmq_url: String
{ {
debug!("topic: {}", std::str::from_utf8(&topic).unwrap()); debug!("topic: {}", std::str::from_utf8(&topic).unwrap());
match std::str::from_utf8(&topic) { match std::str::from_utf8(&topic) {
Ok("rawtx") => match create_new_tx_message(data.to_vec(), shared_daemon.clone()) { Ok("rawtx") => match create_new_tx_message(data.to_vec()) {
Ok(m) => { Ok(m) => {
debug!("Created message"); debug!("Created message");
serde_json::to_string(&m).expect("This shouldn't fail") serde_json::to_string(&m).expect("This shouldn't fail")
@ -272,9 +272,7 @@ async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon, zmq_url: String
}; };
debug!("Broadcasting message {}", payload); debug!("Broadcasting message {}", payload);
if let Err(e) = if let Err(e) = broadcast_message(AnkFlag::NewTx, payload, BroadcastType::ToAll) {
broadcast_message(peers.clone(), AnkFlag::NewTx, payload, BroadcastType::ToAll)
{
log::error!("{}", e.to_string()); log::error!("{}", e.to_string());
} }
} }
@ -294,16 +292,22 @@ async fn main() -> Result<()> {
.set(MessageCache::new()) .set(MessageCache::new())
.expect("Message Cache initialization failed"); .expect("Message Cache initialization failed");
let peers = PeerMap::new(Mutex::new(HashMap::new())); PEERMAP
.set(PeerMap::new(HashMap::new()))
.expect("PeerMap initialization failed");
// Connect the rpc daemon // Connect the rpc daemon
let shared_daemon = Arc::new(Mutex::new(Daemon::connect( DAEMON
config.core_wallet, .set(Mutex::new(Daemon::connect(
config.core_url, config.core_wallet,
config.network, config.core_url,
)?)); config.network,
)?))
.expect("DAEMON initialization failed");
let current_tip: u32 = shared_daemon let current_tip: u32 = DAEMON
.get()
.unwrap()
.lock_anyhow()? .lock_anyhow()?
.get_current_height()? .get_current_height()?
.try_into()?; .try_into()?;
@ -358,32 +362,20 @@ async fn main() -> Result<()> {
let last_scan = sp_wallet.get_outputs().get_last_scan(); let last_scan = sp_wallet.get_outputs().get_last_scan();
let shared_sp_wallet = Mutex::new(sp_wallet); WALLET
let shared_wallet_storage = Mutex::new(wallet_file); .set(SilentPaymentWallet {
sp_wallet: Mutex::new(sp_wallet),
let sp_wallet = Arc::new(SilentPaymentWallet { storage: Mutex::new(wallet_file),
sp_wallet: shared_sp_wallet, })
storage: shared_wallet_storage, .expect("Failed to initialize WALLET");
});
sp_wallet.save()?;
if last_scan < current_tip { if last_scan < current_tip {
log::info!("Scanning for our outputs"); log::info!("Scanning for our outputs");
scan_blocks( scan_blocks(current_tip - last_scan, &config.electrum_url)?;
shared_daemon.clone(),
sp_wallet.clone(),
current_tip - last_scan,
config.electrum_url,
)?;
} }
// Subscribe to Bitcoin Core // Subscribe to Bitcoin Core
tokio::spawn(handle_zmq( tokio::spawn(handle_zmq(config.zmq_url, config.electrum_url));
peers.clone(),
shared_daemon.clone(),
config.zmq_url,
));
// Create the event loop and TCP listener we'll accept connections on. // Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind("127.0.0.1:9090").await; let try_socket = TcpListener::bind("127.0.0.1:9090").await;
@ -393,13 +385,7 @@ async fn main() -> Result<()> {
// Let's spawn the handling of each connection in a separate task. // Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await { while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection( tokio::spawn(handle_connection(stream, addr));
peers.clone(),
shared_daemon.clone(),
sp_wallet.clone(),
stream,
addr,
));
} }
Ok(()) Ok(())

View File

@ -2,7 +2,7 @@ use anyhow::{Error, Result};
use std::{ use std::{
collections::HashMap, collections::HashMap,
net::SocketAddr, net::SocketAddr,
sync::{Arc, Mutex, OnceLock}, sync::{Mutex, OnceLock},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::time; use tokio::time;
@ -10,10 +10,7 @@ use tokio_tungstenite::tungstenite::Message;
use sdk_common::network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage}; use sdk_common::network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage};
use crate::{ use crate::{faucet::handle_faucet_request, handle_new_tx_request, PEERMAP};
daemon::Daemon, faucet::handle_faucet_request, handle_new_tx_request, PeerMap,
SilentPaymentWallet,
};
pub(crate) static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new(); pub(crate) static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new();
@ -84,11 +81,11 @@ pub(crate) enum BroadcastType {
} }
pub(crate) fn broadcast_message( pub(crate) fn broadcast_message(
peers: PeerMap,
flag: AnkFlag, flag: AnkFlag,
payload: String, payload: String,
broadcast: BroadcastType, broadcast: BroadcastType,
) -> Result<()> { ) -> Result<()> {
let peers = PEERMAP.get().ok_or(Error::msg("Unitialized peer map"))?;
let ank_msg = AnkNetworkMsg { let ank_msg = AnkNetworkMsg {
flag, flag,
content: payload, content: payload,
@ -129,16 +126,10 @@ pub(crate) fn broadcast_message(
Ok(()) Ok(())
} }
fn process_faucet_message( fn process_faucet_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) {
ank_msg: AnkNetworkMsg,
peers: PeerMap,
sp_wallet: Arc<SilentPaymentWallet>,
shared_daemon: Arc<Mutex<Daemon>>,
addr: SocketAddr,
) {
log::debug!("Received a faucet message"); log::debug!("Received a faucet message");
if let Ok(mut content) = serde_json::from_str::<FaucetMessage>(&ank_msg.content) { if let Ok(mut content) = serde_json::from_str::<FaucetMessage>(&ank_msg.content) {
match handle_faucet_request(&content, sp_wallet.clone(), shared_daemon.clone()) { match handle_faucet_request(&content) {
Ok(new_tx_msg) => { Ok(new_tx_msg) => {
log::debug!( log::debug!(
"Obtained new_tx_msg: {}", "Obtained new_tx_msg: {}",
@ -149,12 +140,9 @@ fn process_faucet_message(
log::error!("Failed to send faucet tx: {}", e); log::error!("Failed to send faucet tx: {}", e);
content.error = Some(e.into()); content.error = Some(e.into());
let payload = serde_json::to_string(&content).expect("Message type shouldn't fail"); let payload = serde_json::to_string(&content).expect("Message type shouldn't fail");
if let Err(e) = broadcast_message( if let Err(e) =
peers.clone(), broadcast_message(AnkFlag::Faucet, payload, BroadcastType::Sender(addr))
AnkFlag::Faucet, {
payload,
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e); log::error!("Failed to broadcast message: {}", e);
} }
} }
@ -164,19 +152,13 @@ fn process_faucet_message(
} }
} }
fn process_new_tx_message( fn process_new_tx_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) {
ank_msg: AnkNetworkMsg,
peers: PeerMap,
shared_daemon: Arc<Mutex<Daemon>>,
addr: SocketAddr,
) {
log::debug!("Received a new tx message"); log::debug!("Received a new tx message");
if let Ok(mut new_tx_msg) = serde_json::from_str::<NewTxMessage>(&ank_msg.content) { if let Ok(mut new_tx_msg) = serde_json::from_str::<NewTxMessage>(&ank_msg.content) {
match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) { match handle_new_tx_request(&mut new_tx_msg) {
Ok(new_tx_msg) => { Ok(new_tx_msg) => {
// Repeat the msg to all except sender // Repeat the msg to all except sender
if let Err(e) = broadcast_message( if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::NewTx, AnkFlag::NewTx,
serde_json::to_string(&new_tx_msg).expect("This should not fail"), serde_json::to_string(&new_tx_msg).expect("This should not fail"),
BroadcastType::ExcludeSender(addr), BroadcastType::ExcludeSender(addr),
@ -188,7 +170,6 @@ fn process_new_tx_message(
log::error!("handle_new_tx_request returned error: {}", e); log::error!("handle_new_tx_request returned error: {}", e);
new_tx_msg.error = Some(e.into()); new_tx_msg.error = Some(e.into());
if let Err(e) = broadcast_message( if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::NewTx, AnkFlag::NewTx,
serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"), serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"),
BroadcastType::Sender(addr), BroadcastType::Sender(addr),
@ -202,16 +183,11 @@ fn process_new_tx_message(
} }
} }
fn process_cipher_message( fn process_cipher_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) {
ank_msg: AnkNetworkMsg,
peers: PeerMap,
addr: SocketAddr,
) {
// For now we just send it to everyone // For now we just send it to everyone
log::debug!("Received a cipher message"); log::debug!("Received a cipher message");
if let Err(e) = broadcast_message( if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::Cipher, AnkFlag::Cipher,
ank_msg.content, ank_msg.content,
BroadcastType::ExcludeSender(addr), BroadcastType::ExcludeSender(addr),
@ -220,14 +196,9 @@ fn process_cipher_message(
} }
} }
fn process_unknown_message( fn process_unknown_message(ank_msg: AnkNetworkMsg, addr: SocketAddr) {
ank_msg: AnkNetworkMsg,
peers: PeerMap,
addr: SocketAddr,
) {
log::debug!("Received an unknown message"); log::debug!("Received an unknown message");
if let Err(e) = broadcast_message( if let Err(e) = broadcast_message(
peers.clone(),
AnkFlag::Unknown, AnkFlag::Unknown,
ank_msg.content, ank_msg.content,
BroadcastType::ExcludeSender(addr), BroadcastType::ExcludeSender(addr),
@ -236,13 +207,7 @@ fn process_unknown_message(
} }
} }
pub fn process_message( pub fn process_message(raw_msg: &str, addr: SocketAddr) {
raw_msg: &str,
peers: PeerMap,
sp_wallet: Arc<SilentPaymentWallet>,
shared_daemon: Arc<Mutex<Daemon>>,
addr: SocketAddr,
) {
log::debug!("Received msg: {}", raw_msg); log::debug!("Received msg: {}", raw_msg);
let cache = MESSAGECACHE.get().expect("Cache should be initialized"); let cache = MESSAGECACHE.get().expect("Cache should be initialized");
if cache.contains(raw_msg) { if cache.contains(raw_msg) {
@ -253,18 +218,10 @@ pub fn process_message(
} }
match serde_json::from_str::<AnkNetworkMsg>(raw_msg) { match serde_json::from_str::<AnkNetworkMsg>(raw_msg) {
Ok(ank_msg) => match ank_msg.flag { Ok(ank_msg) => match ank_msg.flag {
AnkFlag::Faucet => { AnkFlag::Faucet => process_faucet_message(ank_msg, addr),
process_faucet_message(ank_msg, peers, sp_wallet, shared_daemon, addr) AnkFlag::NewTx => process_new_tx_message(ank_msg, addr),
} AnkFlag::Cipher => process_cipher_message(ank_msg, addr),
AnkFlag::NewTx => { AnkFlag::Unknown => process_unknown_message(ank_msg, addr),
process_new_tx_message(ank_msg, peers, shared_daemon, addr)
}
AnkFlag::Cipher => {
process_cipher_message(ank_msg, peers, addr)
}
AnkFlag::Unknown => {
process_unknown_message(ank_msg, peers, addr)
}
}, },
Err(_) => log::error!("Failed to parse network message"), Err(_) => log::error!("Failed to parse network message"),
} }

View File

@ -1,6 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Mutex};
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use electrum_client::ElectrumApi; use electrum_client::ElectrumApi;
@ -16,13 +15,10 @@ use sdk_common::sp_client::silentpayments::utils::receiving::{
use sdk_common::sp_client::spclient::{OutputSpendStatus, OwnedOutput}; use sdk_common::sp_client::spclient::{OutputSpendStatus, OwnedOutput};
use tokio::time::Instant; use tokio::time::Instant;
use crate::daemon::Daemon; use crate::{electrumclient, MutexExt, DAEMON, WALLET};
use crate::{electrumclient, MutexExt, SharedDaemon, SilentPaymentWallet};
pub fn compute_partial_tweak_to_transaction( pub fn compute_partial_tweak_to_transaction(tx: &Transaction) -> Result<PublicKey> {
tx: &Transaction, let daemon = DAEMON.get().ok_or(Error::msg("DAEMON not initialized"))?;
daemon: Arc<Mutex<Daemon>>,
) -> Result<PublicKey> {
let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len()); let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len());
let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len()); let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len());
for input in tx.input.iter() { for input in tx.input.iter() {
@ -216,16 +212,16 @@ fn scan_block_inputs(
Ok(found) Ok(found)
} }
pub fn scan_blocks( pub fn scan_blocks(mut n_blocks_to_scan: u32, electrum_url: &str) -> anyhow::Result<()> {
shared_daemon: SharedDaemon,
sp_wallet: Arc<SilentPaymentWallet>,
mut n_blocks_to_scan: u32,
electrum_url: String,
) -> anyhow::Result<()> {
log::info!("Starting a rescan"); log::info!("Starting a rescan");
let electrum_client = electrumclient::create_electrum_client(electrum_url)?; let electrum_client = electrumclient::create_electrum_client(electrum_url)?;
let core = shared_daemon.lock_anyhow()?; let sp_wallet = WALLET.get().ok_or(Error::msg("Wallet not initialized"))?;
let core = DAEMON
.get()
.ok_or(Error::msg("DAEMON not initialized"))?
.lock_anyhow()?;
let secp = Secp256k1::new(); let secp = Secp256k1::new();
let scan_height = sp_wallet.get_wallet()?.get_outputs().get_last_scan(); let scan_height = sp_wallet.get_wallet()?.get_outputs().get_last_scan();