sdk_relay/src/main.rs
2024-03-15 12:39:04 +01:00

389 lines
16 KiB
Rust

use std::{
collections::HashMap, env, io::Error as IoError, net::SocketAddr, sync::{Arc, Mutex}
};
use bitcoin::{absolute::LockTime, consensus::deserialize, key::TapTweak, secp256k1::PublicKey, transaction::Version, Amount, OutPoint, ScriptBuf, Transaction, TxIn, TxOut, Txid, XOnlyPublicKey};
use bitcoin::secp256k1::{Message as Secp256k1Message, ThirtyTwoByteHash};
use bitcoincore_rpc::json as bitcoin_json;
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
use log::{debug, error};
use silentpayments::sending::SilentPaymentAddress;
use silentpayments::secp256k1::rand::{thread_rng, Rng};
use spclient::Recipient;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::tungstenite::Message;
use anyhow::{Result, Error};
mod sp;
mod daemon;
mod spclient;
mod constants;
mod db;
mod electrumclient;
use crate::daemon::Daemon;
use crate::sp::VinData;
use crate::spclient::{SpClient, SpendKey, OutputSpendStatus};
// Sending half of the unbounded channel used to push websocket messages to one peer.
type Tx = UnboundedSender<Message>;
// Shared, mutex-protected registry of connected peers, keyed by their socket address.
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
// Value (in satoshis) of the recipient output built by `faucet_send` when it falls back to
// spending from the Core wallet.
const FAUCET_AMT: Amount = Amount::from_sat(1000);
fn spend_from_core(dest: XOnlyPublicKey, daemon: Arc<Mutex<Daemon>>) -> Result<Transaction> {
let core = daemon.lock().map_err(|e| Error::msg(format!("Failed to lock daemon: {}", e.to_string())))?;
let unspent_list: Vec<bitcoin_json::ListUnspentResultEntry> = core.list_unspent_from_to(Some(101), None)?; // we're (probably) spending coinbase, so let's be extra cautious and not spend before 101 confirmations
// just take the first of the list
if let Some(unspent) = unspent_list.get(0) {
let network = core.get_network()?;
let spk = ScriptBuf::new_p2tr_tweaked(dest.dangerous_assume_tweaked());
let new_psbt = core.create_psbt(unspent.clone(), spk, network)?;
let processed_psbt = core.process_psbt(new_psbt)?;
let tx = core.finalize_psbt(processed_psbt)?;
let final_tx = deserialize::<bitcoin::Transaction>(&tx)?;
let _ = core.broadcast(&final_tx)?;
Ok(final_tx)
} else {
// we have 0 spendable outputs
Err(Error::msg("No spendable outputs"))
}
}
fn faucet_send(sp_address: SilentPaymentAddress, sp_client: Arc<Mutex<SpClient>>, daemon: Arc<Mutex<Daemon>>) -> Result<Txid> {
let wallet = sp_client.lock().map_err(|e| Error::msg(format!("{}", e.to_string())))?;
let final_tx: Transaction;
if let Some(utxo) = wallet.list_outpoints()
.into_iter()
// do we have a sp output available ?
.find(|o| o.spend_status == OutputSpendStatus::Unspent) {
// create a new transaction with an available output
let recipient = Recipient {
address: sp_address.into(),
amount: utxo.amount,
nb_outputs: 1
};
let mut new_psbt = wallet.create_new_psbt(vec![utxo], vec![recipient], None)?;
SpClient::set_fees(&mut new_psbt, 1, sp_address.into())?;
wallet.fill_sp_outputs(&mut new_psbt)?;
let mut signed = wallet.sign_psbt(new_psbt)?;
SpClient::finalize_psbt(&mut signed)?;
final_tx = signed.extract_tx()?;
} else {
drop(wallet); // we don't want to keep locking it
// let's try to spend directly from the mining address
let secp = bitcoin::secp256k1::Secp256k1::signing_only();
let keypair = bitcoin::secp256k1::Keypair::new(&secp, &mut thread_rng());
// we first spend from core to the pubkey we just created
let first_tx = spend_from_core(keypair.x_only_public_key().0, daemon.clone())?;
// check that the first output of the transaction pays to the key we just created
assert!(first_tx.output[0].script_pubkey == ScriptBuf::new_p2tr_tweaked(keypair.x_only_public_key().0.dangerous_assume_tweaked()));
// create a new transaction that spends the newly created UTXO to the sp_address
let mut faucet_tx = Transaction {
input: vec![
TxIn {
previous_output: OutPoint::new(first_tx.txid(), 0),
..Default::default()
}
],
output: vec![],
version: Version::TWO,
lock_time: LockTime::ZERO
};
// now do the silent payment operations with the final recipient address
let a_sum = SpClient::get_a_sum_secret_keys(&vec![keypair.secret_key()]);
let prev_outpoint = faucet_tx.input[0].previous_output;
let outpoints_hash = silentpayments::utils::hash_outpoints(&vec![(prev_outpoint.txid.to_string(), prev_outpoint.vout)], keypair.public_key())?;
let partial_secret = silentpayments::utils::sending::sender_calculate_partial_secret(a_sum, outpoints_hash)?;
let ext_output_key = silentpayments::sending::generate_recipient_pubkey(sp_address.into(), partial_secret)?;
let change_sp_address = sp_client.lock()
.map_err(|e| Error::msg(format!("Failed to lock sp_client: {}", e.to_string())))?
.get_receiving_address();
let change_output_key = silentpayments::sending::generate_recipient_pubkey(change_sp_address, partial_secret)?;
let ext_spk = ScriptBuf::new_p2tr_tweaked(ext_output_key.dangerous_assume_tweaked());
let change_spk = ScriptBuf::new_p2tr_tweaked(change_output_key.dangerous_assume_tweaked());
faucet_tx.output.push(TxOut {
value: FAUCET_AMT,
script_pubkey: ext_spk
});
faucet_tx.output.push(TxOut {
value: first_tx.output[0].value - FAUCET_AMT,
script_pubkey: change_spk
});
let first_tx_outputs = vec![first_tx.output[0].clone()];
let prevouts = bitcoin::sighash::Prevouts::All(&first_tx_outputs);
let hash_ty = bitcoin::TapSighashType::Default;
let mut cache = bitcoin::sighash::SighashCache::new(&faucet_tx);
let sighash = cache.taproot_key_spend_signature_hash(0, &prevouts, hash_ty)?;
let msg = Secp256k1Message::from_digest(sighash.into_32());
let sig = secp.sign_schnorr_with_rng(&msg, &keypair, &mut thread_rng());
let final_sig = bitcoin::taproot::Signature{ sig, hash_ty };
faucet_tx.input[0].witness.push(final_sig.to_vec());
final_tx = faucet_tx;
}
daemon.lock()
.map_err(|e| Error::msg(format!("{}", e.to_string())))?
.broadcast(&final_tx)?;
Ok(final_tx.txid())
}
async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr, sp_client: Arc<Mutex<SpClient>>, daemon: Arc<Mutex<Daemon>>) {
debug!("Incoming TCP connection from: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
debug!("WebSocket connection established");
// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded_channel();
peer_map.lock().unwrap().insert(addr, tx);
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each({
let peer_map = peer_map.clone();
move |msg| {
match msg.is_text() {
true => {
let msg_str = msg.to_string();
match msg_str.starts_with("faucet") {
true => {
match SilentPaymentAddress::try_from(&msg_str["faucet".len()..]) {
Ok(sp_address) => {
// send bootstrap coins to this sp_address
match faucet_send(sp_address, sp_client.clone(), daemon.clone()) {
Ok(txid) => {
log::info!("New faucet payment: {}", txid);
},
Err(e) => {
log::error!("faucet failed with error {}", e);
let peers = peer_map.lock().unwrap();
let (_, peer_tx) = peers
.iter()
.find(|(peer_addr, _)| peer_addr == &&addr)
.unwrap();
let _ = peer_tx.send(Message::Text(format!("RELAY_ERROR: {}", e)));
}
}
},
Err(_) => {
log::error!("faucet message with unparsable sp_address received from {}", addr);
}
}
},
false => {
let peers = peer_map.lock().unwrap();
// Broadcast message to other peers
peers
.iter()
.filter(|(peer_addr, _)| peer_addr != &&addr)
.for_each(|(_, peer_tx)| {
let _ = peer_tx.send(msg.clone());
});
}
}
},
false => {
// we don't care
log::debug!("Received non-text message from peer {}", addr);
}
}
future::ok(())
}
});
let receive_from_others = UnboundedReceiverStream::new(rx)
.map(Ok)
.forward(outgoing)
.map(|result| {
if let Err(e) = result {
debug!("Error sending message: {}", e);
}
});
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
debug!("{} disconnected", &addr);
peer_map.lock().unwrap().remove(&addr);
}
/// Concatenates all `parts`, in order, into a single byte vector.
fn flatten_msg(parts: &[Vec<u8>]) -> Vec<u8> {
    parts.concat()
}
fn process_raw_tx_message(core_msg: &bitcoincore_zmq::Message, daemon: Arc<Mutex<Daemon>>) -> Result<Vec<u8>> {
let tx: bitcoin::Transaction = deserialize(&core_msg.serialize_data_to_vec())?;
if tx.is_coinbase() {
return Err(Error::msg("Can't process coinbase transaction"));
}
let mut outpoints: Vec<(String, u32)> = Vec::with_capacity(tx.input.len());
let mut pubkeys: Vec<PublicKey> = Vec::with_capacity(tx.input.len());
for input in tx.input {
outpoints.push((input.previous_output.txid.to_string(), input.previous_output.vout));
let prev_tx = daemon.lock()
.map_err(|e| Error::msg(format!("Failed to lock the daemon: {}", e)))?
.get_transaction(&input.previous_output.txid, None)
.map_err(|_| Error::msg("Failed to find previous transaction"))?;
if let Some(output) = prev_tx.output.get(input.previous_output.vout as usize) {
let vin_data = VinData {
script_sig: input.script_sig.to_bytes().to_vec(),
txinwitness: input.witness.to_vec(),
script_pub_key: output.script_pubkey.to_bytes()
};
match sp::get_pubkey_from_input(&vin_data) {
Ok(Some(pubkey)) => pubkeys.push(pubkey),
Ok(None) => continue,
Err(e) => return Err(Error::msg(format!("Can't extract pubkey from input: {}", e))),
}
} else {
return Err(Error::msg("Transaction with a non-existing input"));
}
}
let input_pub_keys: Vec<&PublicKey> = pubkeys.iter().collect();
match silentpayments::utils::receiving::recipient_calculate_tweak_data(&input_pub_keys, &outpoints) {
Ok(partial_tweak) => {
let mut vecs = core_msg.serialize_to_vecs().to_vec();
vecs.push(partial_tweak.serialize().to_vec());
Ok(flatten_msg(&vecs))
},
Err(e) => Err(Error::msg(format!("Failed to compute tweak data: {}", e.to_string())))
}
}
async fn handle_zmq(peer_map: PeerMap, daemon: Arc<Mutex<Daemon>>) {
tokio::task::spawn_blocking(move || {
debug!("Starting listening on Core");
for msg in bitcoincore_zmq::subscribe_receiver(&["tcp://127.0.0.1:29000"]).unwrap() {
let core_msg = match msg {
Ok(core_msg) => core_msg,
Err(e) => {
error!("Error receiving ZMQ message: {}", e);
continue;
}
};
debug!("Received a {} message", core_msg.topic_str());
let peers = peer_map.lock().unwrap();
let payload: Vec<u8> = match core_msg.topic_str() {
"rawtx" => {
let processed = process_raw_tx_message(&core_msg, daemon.clone());
match processed {
Ok(p) => p,
Err(_) => continue
}
},
_ => {
flatten_msg(&core_msg.serialize_to_vecs())
}
};
for tx in peers.values() {
let _ = tx.send(Message::Binary(payload.clone()));
}
}
});
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), IoError> {
env_logger::init();
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let wallet_name = env::args()
.nth(2)
.unwrap_or_else(|| "default".to_owned());
let is_testnet: bool = env::args()
.nth(3)
.unwrap_or_else(|| "true".to_owned())
.parse()
.expect("Please provide either \"true\" or \"false\"");
let state = PeerMap::new(Mutex::new(HashMap::new()));
// Connect the rpc daemon
let daemon = Daemon::connect().unwrap();
let current_tip: u32 = daemon.get_current_height().expect("Failed to make rpc call").try_into().expect("block count is higher than u32::MAX");
// load an existing sp_wallet, or create a new one
let sp_client = match spclient::SpClient::try_init_from_disk(wallet_name.clone()) {
Ok(existing) => existing,
Err(_) => {
let mut seed = [0u8;64];
thread_rng().fill(&mut seed);
let (scan_sk, spend_sk) = spclient::derive_keys_from_seed(&seed, is_testnet).expect("Couldn't generate a new sp_wallet");
SpClient::new(
wallet_name,
scan_sk,
SpendKey::Secret(spend_sk),
None,
current_tip,
is_testnet
).expect("Failed to create a new SpClient")
}
};
log::info!("Using wallet {} with address {}", sp_client.label, sp_client.get_receiving_address());
let shared_sp_client = Arc::new(Mutex::new(sp_client));
let shared_daemon = Arc::new(Mutex::new(daemon));
// Subscribe to Bitcoin Core
tokio::spawn(handle_zmq(state.clone(), shared_daemon.clone()));
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
debug!("Listening on: {}", addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(state.clone(), stream, addr, shared_sp_client.clone(), shared_daemon.clone()));
}
Ok(())
}