sdk_relay/src/message.rs

235 lines
7.9 KiB
Rust

use anyhow::{Error, Result};
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Mutex, OnceLock},
time::{Duration, Instant},
};
use tokio::time;
use tokio_tungstenite::tungstenite::Message;
use sdk_common::network::{AnkFlag, CommitMessage, Envelope, FaucetMessage, NewTxMessage};
use crate::{
commit::handle_commit_request, faucet::handle_faucet_request, handle_new_tx_request, PEERMAP,
};
pub(crate) static MESSAGECACHE: OnceLock<MessageCache> = OnceLock::new();
const MESSAGECACHEDURATION: Duration = Duration::from_secs(20);
const MESSAGECACHEINTERVAL: Duration = Duration::from_secs(5);
#[derive(Debug)]
pub(crate) struct MessageCache {
store: Mutex<HashMap<String, Instant>>,
}
impl MessageCache {
pub fn new() -> Self {
Self {
store: Mutex::new(HashMap::new()),
}
}
fn insert(&self, key: String) {
let mut store = self.store.lock().unwrap();
store.insert(key, Instant::now());
}
fn remove(&self, key: &str) {
let mut store = self.store.lock().unwrap();
store.remove(key);
}
fn contains(&self, key: &str) -> bool {
let store = self.store.lock().unwrap();
store.contains_key(key)
}
pub async fn clean_up() {
let cache = MESSAGECACHE.get().unwrap();
let mut interval = time::interval(MESSAGECACHEINTERVAL);
loop {
interval.tick().await;
let mut store = cache.store.lock().unwrap();
let now = Instant::now();
store.retain(|_, entrytime| now.duration_since(*entrytime) <= MESSAGECACHEDURATION);
}
}
}
pub(crate) enum BroadcastType {
Sender(SocketAddr),
#[allow(dead_code)]
ExcludeSender(SocketAddr),
#[allow(dead_code)]
ToAll,
}
pub(crate) fn broadcast_message(
flag: AnkFlag,
payload: String,
broadcast: BroadcastType,
) -> Result<()> {
let peers = PEERMAP.get().ok_or(Error::msg("Unitialized peer map"))?;
let ank_msg = Envelope {
flag,
content: payload,
};
let msg = Message::Text(serde_json::to_string(&ank_msg)?);
match ank_msg.flag {
AnkFlag::Cipher => log::debug!("Broadcasting cipher"),
AnkFlag::Handshake => log::debug!("Broadcasting handshake"),
_ => log::debug!("Broadcasting {} message: {}", ank_msg.flag.as_str(), msg),
}
match broadcast {
BroadcastType::Sender(addr) => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
.iter()
.find(|(peer_addr, _)| peer_addr == &&addr)
.ok_or(Error::msg("Failed to find the sender in the peer_map"))?
.1
.send(msg)?;
}
BroadcastType::ExcludeSender(addr) => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
.iter()
.filter(|(peer_addr, _)| peer_addr != &&addr)
.for_each(|(_, peer_tx)| {
let _ = peer_tx.send(msg.clone());
});
}
BroadcastType::ToAll => {
peers
.lock()
.map_err(|e| Error::msg(format!("Failed to lock peers: {}", e.to_string())))?
.iter()
.for_each(|(_, peer_tx)| {
let _ = peer_tx.send(msg.clone());
});
}
}
Ok(())
}
fn process_faucet_message(ank_msg: Envelope, addr: SocketAddr) {
log::debug!("Received a faucet message");
if let Ok(mut content) = serde_json::from_str::<FaucetMessage>(&ank_msg.content) {
match handle_faucet_request(&content) {
Ok(new_tx_msg) => {
log::debug!(
"Obtained new_tx_msg: {}",
serde_json::to_string(&new_tx_msg).unwrap()
);
}
Err(e) => {
log::error!("Failed to send faucet tx: {}", e);
content.error = Some(e.into());
let payload = serde_json::to_string(&content).expect("Message type shouldn't fail");
if let Err(e) =
broadcast_message(AnkFlag::Faucet, payload, BroadcastType::Sender(addr))
{
log::error!("Failed to broadcast message: {}", e);
}
}
}
} else {
log::error!("Invalid content for faucet message");
}
}
fn process_new_tx_message(ank_msg: Envelope, addr: SocketAddr) {
log::debug!("Received a new tx message");
if let Ok(mut new_tx_msg) = serde_json::from_str::<NewTxMessage>(&ank_msg.content) {
if let Err(e) = handle_new_tx_request(&mut new_tx_msg) {
log::error!("handle_new_tx_request returned error: {}", e);
new_tx_msg.error = Some(e.into());
if let Err(e) = broadcast_message(
AnkFlag::NewTx,
serde_json::to_string(&new_tx_msg).expect("This shouldn't fail"),
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e);
}
}
} else {
log::error!("Invalid content for new_tx message");
}
}
fn process_cipher_message(ank_msg: Envelope, addr: SocketAddr) {
// For now we just send it to everyone
log::debug!("Received a cipher message");
if let Err(e) = broadcast_message(
AnkFlag::Cipher,
ank_msg.content,
BroadcastType::ExcludeSender(addr),
) {
log::error!("Failed to send message with error: {}", e);
}
}
fn process_commit_message(ank_msg: Envelope, addr: SocketAddr) {
if let Ok(mut commit_msg) = serde_json::from_str::<CommitMessage>(&ank_msg.content) {
match handle_commit_request(commit_msg.clone()) {
Ok(new_outpoint) => log::debug!("Processed commit msg for outpoint {}", new_outpoint),
Err(e) => {
log::error!("handle_commit_request returned error: {}", e);
// Temporary fix: we remove the message from the cache in case the client wants to try again
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
cache.remove(ank_msg.to_string().as_str());
commit_msg.error = Some(e.into());
if let Err(e) = broadcast_message(
AnkFlag::Commit,
serde_json::to_string(&commit_msg).expect("This shouldn't fail"),
BroadcastType::Sender(addr),
) {
log::error!("Failed to broadcast message: {}", e);
}
}
};
}
}
fn process_unknown_message(ank_msg: Envelope, addr: SocketAddr) {
log::debug!("Received an unknown message");
if let Err(e) = broadcast_message(
AnkFlag::Unknown,
ank_msg.content,
BroadcastType::ExcludeSender(addr),
) {
log::error!("Failed to send message with error: {}", e);
}
}
pub fn process_message(raw_msg: &str, addr: SocketAddr) {
let cache = MESSAGECACHE.get().expect("Cache should be initialized");
if cache.contains(raw_msg) {
log::debug!("Message already processed, dropping");
return;
} else {
cache.insert(raw_msg.to_owned());
}
match serde_json::from_str::<Envelope>(raw_msg) {
Ok(ank_msg) => match ank_msg.flag {
AnkFlag::Faucet => process_faucet_message(ank_msg, addr),
AnkFlag::NewTx => process_new_tx_message(ank_msg, addr),
AnkFlag::Cipher => process_cipher_message(ank_msg, addr),
AnkFlag::Commit => process_commit_message(ank_msg, addr),
AnkFlag::Unknown => process_unknown_message(ank_msg, addr),
AnkFlag::Sync => todo!(),
AnkFlag::Handshake => log::debug!("Received init message from {}", addr),
},
Err(_) => log::error!("Failed to parse network message"),
}
}