base_server

This commit is contained in:
Sosthene00 2024-02-21 17:11:13 +01:00
parent 09b45d1488
commit eb6699baca
3 changed files with 1323 additions and 0 deletions

1170
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

15
Cargo.toml Normal file
View File

@ -0,0 +1,15 @@
[package]
name = "sdk_relay"
version = "0.1.0"
edition = "2021"
[dependencies]
bitcoincore-zmq = "1.4.0"
env_logger = "0.9"
futures-util = { version = "0.3.28", default-features = false, features = ["sink", "std"] }
log = "0.4.20"
serde = { version = "1.0.193", features = ["derive"]}
serde_json = "1.0"
tokio = { version = "1.0.0", features = ["io-util", "rt-multi-thread", "macros", "sync"] }
tokio-stream = "0.1"
tokio-tungstenite = "0.21.0"

138
src/main.rs Normal file
View File

@ -0,0 +1,138 @@
use std::{
collections::HashMap,
env,
io::Error as IoError,
net::SocketAddr,
sync::{Arc, Mutex},
};
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
use log::{debug, error};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::tungstenite::Message;
type Tx = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
// #[derive(Deserialize, Serialize, Debug)]
// struct AnkMessage {
// dest: SilentPaymentId,
// payload: String
// }
// impl TryFrom<Message> for AnkMessage {
// type Error = serde_json::Error;
// fn try_from(value: Message) -> Result<Self, Self::Error> {
// match value {
// Message::Text(s) => serde_json::from_str(&s),
// _ => Err(serde_json::Error::custom("Unsupported message type")),
// }
// }
// }
async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) {
debug!("Incoming TCP connection from: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
debug!("WebSocket connection established");
// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded_channel();
peer_map.lock().unwrap().insert(addr, tx);
let (outgoing, incoming) = ws_stream.split();
let broadcast_incoming = incoming.try_for_each({
let peer_map = peer_map.clone();
move |msg| {
let peers = peer_map.lock().unwrap();
// Broadcast message to other peers
peers
.iter()
.filter(|(peer_addr, _)| peer_addr != &&addr)
.for_each(|(_, peer_tx)| {
let _ = peer_tx.send(msg.clone());
});
future::ok(())
}
});
let receive_from_others = UnboundedReceiverStream::new(rx)
.map(Ok)
.forward(outgoing)
.map(|result| {
if let Err(e) = result {
debug!("Error sending message: {}", e);
}
});
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
debug!("{} disconnected", &addr);
peer_map.lock().unwrap().remove(&addr);
}
async fn handle_zmq(peer_map: PeerMap) {
tokio::task::spawn_blocking(move || {
debug!("Starting listening on Core");
for msg in bitcoincore_zmq::subscribe_receiver(&["tcp://127.0.0.1:29000"]).unwrap() {
match msg {
Ok(core_msg) => {
debug!("Received a {} message", core_msg.topic_str());
let peers = peer_map.lock().unwrap();
let vectors = core_msg.serialize_to_vecs();
let mut payload: Vec<u8> = Vec::with_capacity(vectors[0].len() + vectors[1].len() + vectors[2].len());
for v in vectors {
payload.extend(v);
}
for tx in peers.values() {
let _ = tx.send(Message::Binary(payload.clone()));
}
}
Err(e) => {
error!("Error receiving ZMQ message: {}", e);
continue;
}
}
}
});
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), IoError> {
env_logger::init();
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let state = PeerMap::new(Mutex::new(HashMap::new()));
// Subscribe to Bitcoin Core
tokio::spawn(handle_zmq(state.clone()));
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
debug!("Listening on: {}", addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(state.clone(), stream, addr));
}
Ok(())
}