sdk_storage/src/main.rs
2024-12-18 10:50:19 +01:00

297 lines
9.5 KiB
Rust

use async_std::fs::{create_dir_all, read_dir, read_to_string, remove_file, File};
use sdk_common::sp_client::bitcoin::hex::{DisplayHex, FromHex};
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_std::io::WriteExt;
use async_std::path::Path;
use async_std::stream::{IntoStream, StreamExt};
use async_std::task;
use base64;
use sdk_common::log;
use serde_json::json;
use tide::{Request, Response, StatusCode};
/// Root directory under which all entries are stored; each key is placed in a
/// sub-directory named after its first two characters.
const STORAGE_DIR: &str = "./storage";
/// Smallest `ttl` (in seconds) a store request may ask for.
const MIN_TTL: u64 = 60; // 1 minute
/// `ttl` (in seconds) applied when a store request does not specify one.
const DEFAULT_TTL: u64 = 86400; // 1 day
/// Largest `ttl` (in seconds) a store request may ask for.
const MAX_TTL: u64 = 31_536_000; // 1 year, to be discussed
/// Scans storage and removes expired files
async fn cleanup_expired_files() {
loop {
log::debug!("Running cleanup routine...");
// Traverse storage directory
let mut entries = match read_dir(STORAGE_DIR).await {
Ok(entry) => entry,
Err(e) => {
log::error!("Failed to read storage dir: {}", e);
task::sleep(Duration::from_secs(60)).await;
continue;
}
};
let now = system_time_to_unix(SystemTime::now());
while let Some(entry) = entries.next().await {
let e = match entry {
Ok(e) => e,
Err(e) => {
log::error!("entry returned error: {}", e);
continue;
}
};
let path = e.path();
if path.is_dir().await {
if let Ok(mut sub_entries) = read_dir(&path).await {
while let Some(sub_entry) = sub_entries.next().await {
if let Ok(sub_entry) = sub_entry {
let file_path = sub_entry.path();
if file_path.extension() == Some("meta".as_ref()) {
if let Err(err) = handle_file_cleanup(now, &file_path).await {
log::error!("Error cleaning file {:?}: {}", file_path, err);
}
}
}
}
}
}
}
// Sleep for 1 minute before next cleanup
// task::sleep(Duration::from_secs(60)).await;
task::sleep(Duration::from_secs(10)).await;
}
}
/// Contents of the `.meta` file written next to every stored data file.
#[derive(Debug, Deserialize, Serialize)]
struct Metadata {
/// Expiry time as a UNIX timestamp in seconds; the cleanup routine removes the
/// entry once this value is smaller than the current time.
expires_at: u64,
}
/// Returns the number of whole seconds between the UNIX epoch and `system_time`.
///
/// # Panics
///
/// Panics if `system_time` lies before the UNIX epoch.
fn system_time_to_unix(system_time: SystemTime) -> u64 {
    let since_epoch = system_time
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime before UNIX_EPOCH!");
    since_epoch.as_secs()
}
/// Builds the `SystemTime` that lies `unix_timestamp` seconds after the UNIX epoch.
fn unix_to_system_time(unix_timestamp: u64) -> SystemTime {
    let offset = Duration::from_secs(unix_timestamp);
    let mut instant = UNIX_EPOCH;
    instant += offset;
    instant
}
/// JSON body accepted by the `/store` endpoint.
#[derive(Deserialize)]
struct StoreRequest {
/// Storage key; the handler requires exactly 64 ASCII hex digits.
key: String,
/// Payload to store, hex-encoded; the handler decodes it to raw bytes before writing.
value: String,
/// Optional lifetime in seconds; the handler rejects values outside
/// `MIN_TTL..=MAX_TTL` and falls back to `DEFAULT_TTL` when absent.
ttl: Option<u64>,
}
/// JSON body returned by the `/store` endpoint once the storage attempt has run,
/// whether it succeeded or failed.
#[derive(Serialize)]
struct ApiResponse {
/// Human-readable outcome: a success notice or the error text.
message: String,
}
/// JSON body returned by the `/retrieve/:key` endpoint on success.
#[derive(Serialize)]
struct RetrieveResponse {
/// The key that was requested.
key: String,
/// The stored bytes, encoded as a lowercase hex string.
value: String,
}
/// Maps a key to the location of its data file below `STORAGE_DIR`.
///
/// The first two characters of `key` name the sub-directory and the remainder is
/// the file name.
///
/// # Panics
///
/// Panics if `key` cannot be split after its second byte (shorter than two bytes,
/// or byte 2 is not a character boundary).
async fn get_file_path(key: &str) -> String {
    format!("{}/{}/{}", STORAGE_DIR, &key[..2], &key[2..])
}
/// Store data on the filesystem
async fn store_data(key: &str, value: &[u8], expires_at: SystemTime) -> Result<(), tide::Error> {
let file_name = get_file_path(key).await;
let file_path = Path::new(&file_name);
// Check if key exists
if file_path.exists().await {
return Err(tide::Error::from_str(
StatusCode::Conflict,
"Key already exists",
));
}
create_dir_all(file_path.parent().ok_or(tide::Error::from_str(
StatusCode::InternalServerError,
"File path doesn't have parent",
))?)
.await
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
let metadata_path = format!("{}.meta", file_name);
let mut file = File::create(&file_path)
.await
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
file.write_all(value)
.await
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
let metadata = Metadata {
expires_at: system_time_to_unix(expires_at),
};
let metadata_json = serde_json::to_string(&metadata)
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
let mut meta_file = File::create(&metadata_path)
.await
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
meta_file
.write_all(metadata_json.as_bytes())
.await
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
Ok(())
}
async fn retrieve_data(key: &str) -> Result<Vec<u8>, String> {
let file_path = format!("{}/{}/{}", STORAGE_DIR, &key[..2], &key[2..]);
let mut file = File::open(&file_path)
.await
.map_err(|_| "Key not found.".to_string())?;
let mut buffer = Vec::new();
async_std::io::ReadExt::read_to_end(&mut file, &mut buffer)
.await
.map_err(|e| e.to_string())?;
Ok(buffer)
}
/// Handler for the /store endpoint
async fn handle_store(mut req: Request<()>) -> tide::Result<Response> {
// Parse the JSON body
let data: StoreRequest = match req.body_json().await {
Ok(data) => data,
Err(e) => {
return Ok(Response::builder(StatusCode::BadRequest)
.body(format!("Invalid request: {}", e))
.build());
}
};
// Validate the key
if data.key.len() != 64 || !data.key.chars().all(|c| c.is_ascii_hexdigit()) {
return Ok(Response::builder(StatusCode::BadRequest)
.body("Invalid key: must be a 32 bytes hex string.".to_string())
.build());
}
// Validate the ttl
let live_for = if let Some(ttl) = data.ttl {
if ttl < MIN_TTL {
return Ok(Response::builder(StatusCode::BadRequest)
.body(format!(
"Invalid ttl: must be at least {} seconds.",
MIN_TTL
))
.build());
} else if ttl > MAX_TTL {
return Ok(Response::builder(StatusCode::BadRequest)
.body(format!("Invalid ttl: must be at most {} seconds.", MAX_TTL))
.build());
}
Duration::from_secs(ttl)
} else {
Duration::from_secs(DEFAULT_TTL)
};
let now = SystemTime::now();
let expires_at = now
.checked_add(live_for)
.ok_or(tide::Error::from_str(StatusCode::BadRequest, "Invalid ttl"))?;
// Decode the value from Base64
let value_bytes = match Vec::from_hex(&data.value) {
Ok(value) => value,
Err(e) => {
return Ok(Response::builder(StatusCode::BadRequest)
.body(format!("Invalid request: {}", e))
.build());
}
};
// Store the data
match store_data(&data.key, &value_bytes, expires_at).await {
Ok(()) => Ok(Response::builder(StatusCode::Ok)
.body(serde_json::to_value(&ApiResponse {
message: "Data stored successfully.".to_string(),
})?)
.build()),
Err(e) => Ok(Response::builder(e.status())
.body(serde_json::to_value(&ApiResponse {
message: e.to_string(),
})?)
.build()),
}
}
/// Handler for the `/retrieve/:key` endpoint.
///
/// Answers `400 Bad Request` when the key is not 64 hex digits, `404 Not Found`
/// (with the error text as body) when `retrieve_data` fails, and otherwise
/// `200 OK` with a JSON [`RetrieveResponse`] carrying the hex-encoded value.
async fn handle_retrieve(req: Request<()>) -> tide::Result<Response> {
    let key: String = req.param("key")?.to_string();

    let well_formed = key.len() == 64 && key.chars().all(|c| c.is_ascii_hexdigit());
    if !well_formed {
        let response = Response::builder(StatusCode::BadRequest)
            .body("Invalid key: must be a 32 bytes hex string.".to_string())
            .build();
        return Ok(response);
    }

    let stored = match retrieve_data(&key).await {
        Ok(bytes) => bytes,
        Err(reason) => {
            return Ok(Response::builder(StatusCode::NotFound).body(reason).build());
        }
    };

    let payload = RetrieveResponse {
        value: stored.to_lower_hex_string(),
        key,
    };
    let response = Response::builder(StatusCode::Ok)
        .body(serde_json::to_value(&payload)?)
        .build();
    Ok(response)
}
/// Checks a metadata file and deletes the associated entry if it has expired.
///
/// `now` is the current time as a UNIX timestamp in seconds and `meta_path` is
/// the path of a `.meta` file. When the recorded `expires_at` is smaller than
/// `now`, the data file (the same path without the `.meta` extension) and the
/// metadata file are both removed.
///
/// A data file that is already gone is not treated as an error: the metadata is
/// still removed, otherwise the orphaned `.meta` file would fail again on every
/// cleanup pass.
///
/// # Errors
///
/// Returns a description of the failure when the metadata cannot be read or
/// parsed, or when a file cannot be removed.
async fn handle_file_cleanup(now: u64, meta_path: &Path) -> Result<(), String> {
    let meta_content = read_to_string(meta_path)
        .await
        .map_err(|e| format!("Failed to read metadata: {}", e))?;
    let metadata: Metadata = serde_json::from_str(&meta_content)
        .map_err(|e| format!("Failed to parse metadata: {}", e))?;

    if metadata.expires_at >= now {
        return Ok(());
    }

    let data_file_path = meta_path.with_extension("");
    match remove_file(&data_file_path).await {
        Ok(()) => {}
        // Already deleted: nothing left to remove besides the metadata.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(format!("Failed to remove data file: {}", e)),
    }
    remove_file(meta_path)
        .await
        .map_err(|e| format!("Failed to remove metadata file: {}", e))?;

    log::debug!("Removed expired file: {:?}", data_file_path);
    Ok(())
}
/// Entry point: prepares the storage directory, starts the background cleanup
/// task and serves the HTTP API on `0.0.0.0:8080`.
///
/// # Panics
///
/// Panics if the storage directory cannot be created.
#[async_std::main]
async fn main() -> tide::Result<()> {
    sdk_common::env_logger::init();

    create_dir_all(STORAGE_DIR)
        .await
        .expect("Failed to create storage directory.");

    // Runs for the lifetime of the process; the handle is intentionally dropped.
    task::spawn(cleanup_expired_files());

    let mut app = tide::new();
    app.at("/store").post(handle_store);
    app.at("/retrieve/:key").get(handle_retrieve);

    // `listen` only returns once the server has stopped, so the address has to
    // be announced before awaiting it or the message never shows while serving.
    log::info!("Server running at http://0.0.0.0:8080");
    app.listen("0.0.0.0:8080").await?;
    Ok(())
}