Add a message cache
This commit is contained in:
parent
539670d248
commit
ad026a783e
@ -294,7 +294,10 @@ impl Daemon {
|
|||||||
Ok(blockchain_info.chain)
|
Ok(blockchain_info.chain)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn test_mempool_accept(&self, tx: &Transaction) -> Result<crate::bitcoin_json::TestMempoolAcceptResult> {
|
pub(crate) fn test_mempool_accept(
|
||||||
|
&self,
|
||||||
|
tx: &Transaction,
|
||||||
|
) -> Result<crate::bitcoin_json::TestMempoolAcceptResult> {
|
||||||
let res = self.rpc.test_mempool_accept(&vec![tx])?;
|
let res = self.rpc.test_mempool_accept(&vec![tx])?;
|
||||||
|
|
||||||
Ok(res.get(0).unwrap().clone())
|
Ok(res.get(0).unwrap().clone())
|
||||||
|
89
src/main.rs
89
src/main.rs
@ -4,7 +4,8 @@ use std::{
|
|||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
sync::{Arc, Mutex, MutexGuard},
|
sync::{Arc, Mutex, MutexGuard, OnceLock},
|
||||||
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use bitcoincore_rpc::json::{self as bitcoin_json};
|
use bitcoincore_rpc::json::{self as bitcoin_json};
|
||||||
@ -29,7 +30,7 @@ use sp_client::silentpayments::sending::{generate_recipient_pubkeys, SilentPayme
|
|||||||
use sp_client::silentpayments::utils::receiving::{calculate_tweak_data, get_pubkey_from_input};
|
use sp_client::silentpayments::utils::receiving::{calculate_tweak_data, get_pubkey_from_input};
|
||||||
use sp_client::silentpayments::utils::sending::calculate_partial_secret;
|
use sp_client::silentpayments::utils::sending::calculate_partial_secret;
|
||||||
use sp_client::spclient::{derive_keys_from_seed, Recipient, SpClient, SpendKey};
|
use sp_client::spclient::{derive_keys_from_seed, Recipient, SpClient, SpendKey};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::{net::{TcpListener, TcpStream}, time};
|
||||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||||
use tokio_tungstenite::tungstenite::Message;
|
use tokio_tungstenite::tungstenite::Message;
|
||||||
@ -49,6 +50,66 @@ type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
|
|||||||
|
|
||||||
type SharedDaemon = Arc<Mutex<Daemon>>;
|
type SharedDaemon = Arc<Mutex<Daemon>>;
|
||||||
|
|
||||||
|
static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new();
|
||||||
|
|
||||||
|
const MESSAGECACHEDURATION: Duration = Duration::from_secs(10);
|
||||||
|
const MESSAGECACHEINTERVAL: Duration = Duration::from_secs(2);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct MessageCache {
|
||||||
|
store: Mutex<HashMap<String, Instant>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageCache {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
store: Mutex::new(HashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&self, key: String) {
|
||||||
|
let mut store = self.store.lock().unwrap();
|
||||||
|
store.insert(key.clone(), Instant::now());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn contains(&self, key: &str) -> bool {
|
||||||
|
let store = self.store.lock().unwrap();
|
||||||
|
store.contains_key(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn clean_up() {
|
||||||
|
let cache = MESSAGECACHE.get().unwrap();
|
||||||
|
|
||||||
|
let mut interval = time::interval(MESSAGECACHEINTERVAL);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
let mut store = cache.store.lock().unwrap();
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
let to_rm: Vec<String> = store
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(entry, entrytime)| {
|
||||||
|
if let Some(duration) = now.checked_duration_since(*entrytime) {
|
||||||
|
if duration > MESSAGECACHEDURATION {
|
||||||
|
Some(entry.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for key in to_rm {
|
||||||
|
store.remove(&key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const FAUCET_AMT: Amount = Amount::from_sat(100_000);
|
const FAUCET_AMT: Amount = Amount::from_sat(100_000);
|
||||||
|
|
||||||
pub(crate) trait MutexExt<T> {
|
pub(crate) trait MutexExt<T> {
|
||||||
@ -433,12 +494,20 @@ async fn handle_connection(
|
|||||||
let peers = peers.clone();
|
let peers = peers.clone();
|
||||||
if let Ok(raw_msg) = msg.to_text() {
|
if let Ok(raw_msg) = msg.to_text() {
|
||||||
debug!("Received msg: {}", raw_msg);
|
debug!("Received msg: {}", raw_msg);
|
||||||
|
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
|
||||||
|
if cache.contains(raw_msg) {
|
||||||
|
debug!("Message already processed, dropping");
|
||||||
|
return future::ok(());
|
||||||
|
} else {
|
||||||
|
cache.insert(raw_msg.to_owned());
|
||||||
|
}
|
||||||
let parsed = serde_json::from_str::<AnkNetworkMsg>(raw_msg);
|
let parsed = serde_json::from_str::<AnkNetworkMsg>(raw_msg);
|
||||||
match parsed {
|
match parsed {
|
||||||
Ok(ank_msg) => match ank_msg.flag {
|
Ok(ank_msg) => match ank_msg.flag {
|
||||||
AnkFlag::Faucet => {
|
AnkFlag::Faucet => {
|
||||||
debug!("Received a faucet message");
|
debug!("Received a faucet message");
|
||||||
if let Ok(mut content) = serde_json::from_str::<FaucetMessage>(&ank_msg.content)
|
if let Ok(mut content) =
|
||||||
|
serde_json::from_str::<FaucetMessage>(&ank_msg.content)
|
||||||
{
|
{
|
||||||
match handle_faucet_request(
|
match handle_faucet_request(
|
||||||
&content,
|
&content,
|
||||||
@ -454,7 +523,8 @@ async fn handle_connection(
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("Failed to send faucet tx: {}", e);
|
log::error!("Failed to send faucet tx: {}", e);
|
||||||
content.error = Some(e.into());
|
content.error = Some(e.into());
|
||||||
let payload = serde_json::to_string(&content).expect("Message type shouldn't fail");
|
let payload = serde_json::to_string(&content)
|
||||||
|
.expect("Message type shouldn't fail");
|
||||||
if let Err(e) = broadcast_message(
|
if let Err(e) = broadcast_message(
|
||||||
peers.clone(),
|
peers.clone(),
|
||||||
AnkFlag::Faucet,
|
AnkFlag::Faucet,
|
||||||
@ -471,7 +541,9 @@ async fn handle_connection(
|
|||||||
}
|
}
|
||||||
AnkFlag::NewTx => {
|
AnkFlag::NewTx => {
|
||||||
debug!("Received a new tx message");
|
debug!("Received a new tx message");
|
||||||
if let Ok(mut new_tx_msg) = serde_json::from_str::<NewTxMessage>(&ank_msg.content) {
|
if let Ok(mut new_tx_msg) =
|
||||||
|
serde_json::from_str::<NewTxMessage>(&ank_msg.content)
|
||||||
|
{
|
||||||
match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) {
|
match handle_new_tx_request(&mut new_tx_msg, shared_daemon.clone()) {
|
||||||
Ok(new_tx_msg) => {
|
Ok(new_tx_msg) => {
|
||||||
// Repeat the msg to all except sender
|
// Repeat the msg to all except sender
|
||||||
@ -491,7 +563,8 @@ async fn handle_connection(
|
|||||||
if let Err(e) = broadcast_message(
|
if let Err(e) = broadcast_message(
|
||||||
peers.clone(),
|
peers.clone(),
|
||||||
AnkFlag::NewTx,
|
AnkFlag::NewTx,
|
||||||
serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"),
|
serde_json::to_string(&new_tx_msg)
|
||||||
|
.expect("This shouldn't fail"),
|
||||||
BroadcastType::Sender(addr),
|
BroadcastType::Sender(addr),
|
||||||
) {
|
) {
|
||||||
log::error!("Failed to broadcast message: {}", e);
|
log::error!("Failed to broadcast message: {}", e);
|
||||||
@ -672,6 +745,10 @@ async fn main() -> Result<()> {
|
|||||||
.expect("Please provide either \"true\" or \"false\"");
|
.expect("Please provide either \"true\" or \"false\"");
|
||||||
let core_wallet: Option<String> = env::args().nth(4);
|
let core_wallet: Option<String> = env::args().nth(4);
|
||||||
|
|
||||||
|
MESSAGECACHE.set(MessageCache::new()).expect("Message Cache initialization failed");
|
||||||
|
|
||||||
|
tokio::spawn(clean_up());
|
||||||
|
|
||||||
let peers = PeerMap::new(Mutex::new(HashMap::new()));
|
let peers = PeerMap::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
// Connect the rpc daemon
|
// Connect the rpc daemon
|
||||||
|
Loading…
x
Reference in New Issue
Block a user