Simplify zmq & one track for all messaging

This commit is contained in:
Sosthene00 2024-04-17 08:24:12 +02:00
parent 459756815f
commit 4d3dc8123a
3 changed files with 505 additions and 394 deletions

653
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,6 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
bitcoincore-rpc = { version = "0.18" } bitcoincore-rpc = { version = "0.18" }
bitcoincore-zmq = "1.4.0"
electrum-client = { git = "https://github.com/cygnet3/rust-electrum-client", branch = "sp_tweaks" } electrum-client = { git = "https://github.com/cygnet3/rust-electrum-client", branch = "sp_tweaks" }
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] } futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
@ -20,3 +19,4 @@ sp_backend = { git = "https://github.com/Sosthene00/sp-backend", branch = "sp_cl
tokio = { version = "1.0.0", features = ["io-util", "rt-multi-thread", "macros", "sync"] } tokio = { version = "1.0.0", features = ["io-util", "rt-multi-thread", "macros", "sync"] }
tokio-stream = "0.1" tokio-stream = "0.1"
tokio-tungstenite = "0.21.0" tokio-tungstenite = "0.21.0"
zeromq = "0.3.5"

View File

@ -1,28 +1,22 @@
use std::{ use std::{
collections::HashMap, any::Any, collections::HashMap, env, fmt::Debug, net::SocketAddr, str::FromStr, sync::{Arc, Mutex, MutexGuard}
env,
fmt::Debug,
net::SocketAddr,
ops::Deref,
str::FromStr,
sync::{Arc, Mutex, MutexGuard},
}; };
use bitcoincore_rpc::json::{self as bitcoin_json}; use bitcoincore_rpc::json::{self as bitcoin_json};
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt}; use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
use log::{debug, error}; use log::{debug, error};
use sdk_common::network::{AnkFlag, AnkNetworkMsg, FaucetMessage, NewTxMessage};
use sp_backend::bitcoin::{ use sp_backend::bitcoin::{
absolute::LockTime, absolute::LockTime,
consensus::deserialize, consensus::{deserialize, serialize},
hex::DisplayHex, hex::DisplayHex,
key::TapTweak, key::TapTweak,
sighash::{Prevouts, SighashCache}, sighash::{Prevouts, SighashCache},
taproot::Signature, taproot::Signature,
transaction::Version, transaction::Version,
Amount, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Txid, Witness, Amount, OutPoint, Psbt, ScriptBuf, TapSighashType, Transaction, TxIn, TxOut, Witness,
XOnlyPublicKey, XOnlyPublicKey,
}; };
use sp_backend::spclient::OutputList;
use sp_backend::{ use sp_backend::{
bitcoin::secp256k1::{ bitcoin::secp256k1::{
rand::{thread_rng, Rng}, rand::{thread_rng, Rng},
@ -48,6 +42,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use zeromq::{Socket, SocketRecv};
mod daemon; mod daemon;
mod electrumclient; mod electrumclient;
@ -97,8 +92,18 @@ enum BroadcastType {
ToAll, ToAll,
} }
fn broadcast_message(peers: PeerMap, msg: Message, broadcast: BroadcastType) -> Result<()> { fn broadcast_message(
peers: PeerMap,
flag: AnkFlag,
payload: String,
broadcast: BroadcastType,
) -> Result<()> {
// log::debug!("Broadcasting message: {}", msg); // log::debug!("Broadcasting message: {}", msg);
let ank_msg = AnkNetworkMsg {
flag,
content: payload,
};
let msg = Message::Text(serde_json::to_string(&ank_msg)?);
match broadcast { match broadcast {
BroadcastType::Sender(addr) => { BroadcastType::Sender(addr) => {
peers peers
@ -505,28 +510,40 @@ async fn handle_connection(
let broadcast_incoming = incoming.try_for_each({ let broadcast_incoming = incoming.try_for_each({
let peers = peers.clone(); let peers = peers.clone();
move |msg| { move |msg| {
if msg.is_text() { if let Ok(raw_msg) = msg.to_text() {
if msg.to_string().starts_with("faucet") { debug!("Received msg: {}", raw_msg);
let parsed = serde_json::from_str::<AnkNetworkMsg>(raw_msg);
match parsed {
Ok(ank_msg) => match ank_msg.flag {
AnkFlag::Faucet => {
debug!("Received a faucet message");
if let Ok(content) =
serde_json::from_str::<FaucetMessage>(&ank_msg.content)
{
match handle_faucet_request( match handle_faucet_request(
&msg.to_string(), &content.sp_address,
sp_wallet.clone(), sp_wallet.clone(),
shared_daemon.clone(), shared_daemon.clone(),
) { ) {
Ok(txid) => { Ok(new_tx_msg) => {
if let Err(e) = broadcast_message( if let Err(e) = broadcast_message(
peers.clone(), peers.clone(),
Message::Text(format!("faucet{}", txid.to_string())), AnkFlag::NewTx,
serde_json::to_string(&new_tx_msg)
.expect("This should not fail"),
BroadcastType::Sender(addr), BroadcastType::Sender(addr),
) { ) {
log::error!("Failed to broadcast message: {}", e.to_string()); log::error!(
} else { "Failed to broadcast message: {}",
log::debug!("Successfully broadcasted message: {}", txid); e.to_string()
);
} }
} }
Err(e) => { Err(e) => {
if let Err(e) = broadcast_message( if let Err(e) = broadcast_message(
peers.clone(), peers.clone(),
Message::Text(e.to_string()), AnkFlag::Error,
e.to_string(),
BroadcastType::Sender(addr), BroadcastType::Sender(addr),
) { ) {
log::error!("Failed to broadcast message: {}", e); log::error!("Failed to broadcast message: {}", e);
@ -534,13 +551,15 @@ async fn handle_connection(
} }
} }
} else { } else {
// we just send it `as is` to everyone except sender log::error!("Invalid content for faucet message");
if let Err(e) =
broadcast_message(peers.clone(), msg, BroadcastType::ExcludeSender(addr))
{
log::error!("Failed to broadcast message: {}", e);
} }
} }
AnkFlag::NewTx => unimplemented!(),
AnkFlag::Error => unimplemented!(),
AnkFlag::Unknown => unimplemented!(),
},
Err(_) => log::error!("Failed to parse network message"),
}
} else { } else {
// we don't care // we don't care
log::debug!("Received non-text message {} from peer {}", msg, addr); log::debug!("Received non-text message {} from peer {}", msg, addr);
@ -565,26 +584,10 @@ async fn handle_connection(
peers.lock().unwrap().remove(&addr); peers.lock().unwrap().remove(&addr);
} }
fn flatten_msg(parts: &[Vec<u8>]) -> Vec<u8> { fn compute_partial_tweak_to_transaction(
let total_len = parts.iter().fold(0, |acc, v| acc + v.len()); tx: Transaction,
let mut final_vec = Vec::with_capacity(total_len);
for p in parts {
final_vec.extend(p);
}
final_vec
}
fn process_raw_tx_message(
core_msg: &bitcoincore_zmq::Message,
daemon: Arc<Mutex<Daemon>>, daemon: Arc<Mutex<Daemon>>,
) -> Result<Vec<u8>> { ) -> Result<PublicKey> {
let tx: Transaction = deserialize(&core_msg.serialize_data_to_vec())?;
if tx.is_coinbase() {
return Err(Error::msg("Can't process coinbase transaction"));
}
let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len()); let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len());
let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len()); let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len());
for input in tx.input { for input in tx.input {
@ -619,64 +622,73 @@ fn process_raw_tx_message(
} }
let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect(); let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect();
match calculate_tweak_data(&input_pub_keys, &outpoints) { let partial_tweak = calculate_tweak_data(&input_pub_keys, &outpoints)?;
Ok(partial_tweak) => { Ok(partial_tweak)
let mut vecs = core_msg.serialize_to_vecs().to_vec();
vecs.push(partial_tweak.serialize().to_vec());
Ok(flatten_msg(&vecs))
}
Err(e) => Err(Error::msg(format!(
"Failed to compute tweak data: {}",
e.to_string()
))),
}
} }
async fn handle_zmq( fn create_new_tx_message(transaction: Vec<u8>, daemon: Arc<Mutex<Daemon>>) -> Result<NewTxMessage> {
peers: PeerMap, let tx: Transaction = deserialize(&transaction)?;
shared_daemon: SharedDaemon,
sp_wallet: Arc<SilentPaymentWallet>, if tx.is_coinbase() {
) { return Err(Error::msg("Can't process coinbase transaction"));
tokio::task::spawn_blocking(move || { }
let partial_tweak = compute_partial_tweak_to_transaction(tx, daemon)?;
Ok(NewTxMessage::new(
transaction.to_lower_hex_string(),
Some(partial_tweak.to_string()),
))
}
async fn handle_zmq(peers: PeerMap, shared_daemon: SharedDaemon) {
debug!("Starting listening on Core"); debug!("Starting listening on Core");
for msg in bitcoincore_zmq::subscribe_receiver(&["tcp://127.0.0.1:29000"]).unwrap() { let mut socket = zeromq::SubSocket::new();
let core_msg = match msg { socket.connect("tcp://127.0.0.1:29100").await.unwrap();
Ok(core_msg) => core_msg, socket.subscribe("rawtx").await.unwrap();
// socket.subscribe("hashblock");
debug!("{:?}", socket.type_id());
loop {
let core_msg = match socket.recv().await {
Ok(m) => m,
Err(e) => { Err(e) => {
error!("Error receiving ZMQ message: {}", e); error!("Zmq error: {}", e);
continue; continue;
} }
}; };
debug!("Received a {} message", core_msg.topic_str()); debug!("Received a message");
let payload: Vec<u8> = match core_msg.topic_str() { let payload: String =
"rawtx" => { if let (Some(topic), Some(data)) = (core_msg.get(0), core_msg.get(1)) {
let processed = process_raw_tx_message(&core_msg, shared_daemon.clone()); match std::str::from_utf8(&topic) {
match processed { Ok("rawtx") => {
Ok(p) => p, match create_new_tx_message(data.to_vec(), shared_daemon.clone()) {
Err(_) => continue, Ok(m) => serde_json::to_string(&m).expect("This shouldn't fail"),
Err(e) => {
error!("{}", e);
continue;
} }
} }
"rawblock" => {
// scan the block for our outputs
if let Err(e) = scan_blocks(shared_daemon.clone(), sp_wallet.clone(), 1) {
log::error!("{}", e);
} }
Ok("hashblock") => todo!(),
flatten_msg(&core_msg.serialize_to_vecs()) _ => {
error!("Unexpected message in zmq");
continue;
} }
_ => flatten_msg(&core_msg.serialize_to_vecs()), }
} else {
error!("Empty message");
continue;
}; };
if let Err(e) = broadcast_message( if let Err(e) = broadcast_message(
peers.clone(), peers.clone(),
Message::Binary(payload), AnkFlag::NewTx,
payload,
BroadcastType::ToAll, BroadcastType::ToAll,
) { ) {
log::error!("{}", e.to_string()); log::error!("{}", e.to_string());
} }
} }
});
} }
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
@ -770,11 +782,7 @@ async fn main() -> Result<()> {
} }
// Subscribe to Bitcoin Core // Subscribe to Bitcoin Core
tokio::spawn(handle_zmq( tokio::spawn(handle_zmq(peers.clone(), shared_daemon.clone()));
peers.clone(),
shared_daemon.clone(),
sp_wallet.clone(),
));
// Create the event loop and TCP listener we'll accept connections on. // Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await; let try_socket = TcpListener::bind(&addr).await;