264 lines
9.1 KiB
Rust
264 lines
9.1 KiB
Rust
use async_std::fs::{create_dir_all, read_dir, read_to_string, remove_file, File};
|
|
use async_std::io::WriteExt;
|
|
use async_std::path::Path;
|
|
use async_std::stream::StreamExt;
|
|
use serde::{Deserialize, Serialize};
|
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
|
use tide::{log, Request, Response, StatusCode};
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct StorageService {
|
|
storage_dir: String,
|
|
}
|
|
|
|
impl StorageService {
|
|
pub fn new<S: Into<String>>(storage_dir: S) -> Self {
|
|
Self {
|
|
storage_dir: storage_dir.into(),
|
|
}
|
|
}
|
|
|
|
fn get_file_path(&self, key: &str) -> String {
|
|
let dir_name = format!("{}/{}", self.storage_dir, &key[..2]);
|
|
let file_path = format!("{}/{}", dir_name, &key[2..]);
|
|
file_path
|
|
}
|
|
|
|
pub async fn store_data(
|
|
&self,
|
|
key: &str,
|
|
value: &[u8],
|
|
expires_at: Option<SystemTime>,
|
|
) -> Result<(), tide::Error> {
|
|
let file_name = self.get_file_path(key);
|
|
let file_path = Path::new(&file_name);
|
|
|
|
if file_path.exists().await {
|
|
return Err(tide::Error::from_str(StatusCode::Conflict, "Key already exists"));
|
|
}
|
|
|
|
create_dir_all(file_path.parent().ok_or(tide::Error::from_str(
|
|
StatusCode::InternalServerError,
|
|
"File path doesn't have parent",
|
|
))?)
|
|
.await
|
|
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
|
|
|
|
let metadata_path = format!("{}.meta", file_name);
|
|
|
|
let mut file = File::create(&file_path)
|
|
.await
|
|
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
|
|
file.write_all(value)
|
|
.await
|
|
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
|
|
|
|
let metadata = Metadata {
|
|
expires_at: expires_at.map(system_time_to_unix),
|
|
};
|
|
|
|
let metadata_json = serde_json::to_string(&metadata)
|
|
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
|
|
let mut meta_file = File::create(&metadata_path)
|
|
.await
|
|
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
|
|
meta_file
|
|
.write_all(metadata_json.as_bytes())
|
|
.await
|
|
.map_err(|e| tide::Error::new(StatusCode::InternalServerError, e))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn retrieve_data(&self, key: &str) -> Result<Vec<u8>, String> {
|
|
let file_path = format!("{}/{}/{}", self.storage_dir, &key[..2], &key[2..]);
|
|
|
|
let mut file = File::open(&file_path)
|
|
.await
|
|
.map_err(|_| "Key not found.".to_string())?;
|
|
let mut buffer = Vec::new();
|
|
async_std::io::ReadExt::read_to_end(&mut file, &mut buffer)
|
|
.await
|
|
.map_err(|e| e.to_string())?;
|
|
Ok(buffer)
|
|
}
|
|
|
|
pub async fn cleanup_expired_files_once(&self) -> Result<(), String> {
|
|
let mut entries = read_dir(&self.storage_dir)
|
|
.await
|
|
.map_err(|e| format!("Failed to read storage dir: {}", e))?;
|
|
let now = system_time_to_unix(SystemTime::now());
|
|
while let Some(entry) = entries.next().await {
|
|
let e = entry.map_err(|e| format!("entry returned error: {}", e))?;
|
|
let path = e.path();
|
|
if path.is_dir().await {
|
|
if let Ok(mut sub_entries) = read_dir(&path).await {
|
|
while let Some(sub_entry) = sub_entries.next().await {
|
|
if let Ok(sub_entry) = sub_entry {
|
|
let file_path = sub_entry.path();
|
|
if file_path.extension() == Some("meta".as_ref()) {
|
|
self.handle_file_cleanup(now, &file_path).await?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_file_cleanup(&self, now: u64, meta_path: &Path) -> Result<(), String> {
|
|
let meta_content = read_to_string(meta_path)
|
|
.await
|
|
.map_err(|e| format!("Failed to read metadata: {}", e.to_string()))?;
|
|
let metadata: Metadata = serde_json::from_str(&meta_content)
|
|
.map_err(|e| format!("Failed to parse metadata: {}", e.to_string()))?;
|
|
|
|
if metadata.expires_at.is_some() && metadata.expires_at.unwrap() < now {
|
|
let data_file_path = meta_path.with_extension("");
|
|
remove_file(&data_file_path)
|
|
.await
|
|
.map_err(|e| format!("Failed to remove data file: {}", e.to_string()))?;
|
|
remove_file(meta_path)
|
|
.await
|
|
.map_err(|e| format!("Failed to remove metadata file: {}", e.to_string()))?;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Deserialize, Serialize)]
|
|
pub struct Metadata {
|
|
pub expires_at: Option<u64>,
|
|
}
|
|
|
|
pub fn system_time_to_unix(system_time: SystemTime) -> u64 {
|
|
system_time
|
|
.duration_since(UNIX_EPOCH)
|
|
.expect("SystemTime before UNIX_EPOCH!")
|
|
.as_secs()
|
|
}
|
|
|
|
pub fn unix_to_system_time(unix_timestamp: u64) -> SystemTime {
|
|
UNIX_EPOCH + Duration::from_secs(unix_timestamp)
|
|
}
|
|
|
|
#[derive(Deserialize, Debug)]
|
|
pub struct StoreRequest {
|
|
pub key: String,
|
|
pub value: String,
|
|
pub ttl: Option<u64>,
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
pub struct ApiResponse { pub message: String }
|
|
|
|
#[derive(Serialize)]
|
|
pub struct RetrieveResponse { pub key: String, pub value: String }
|
|
|
|
pub async fn handle_health(_req: Request<StorageService>) -> tide::Result<Response> {
|
|
Ok(Response::builder(StatusCode::Ok)
|
|
.body(serde_json::to_value(&ApiResponse { message: "ok".into() })?)
|
|
.build())
|
|
}
|
|
|
|
pub async fn handle_store(mut req: Request<StorageService>, no_ttl_permanent: bool) -> tide::Result<Response> {
|
|
// Extract key from URL parameter
|
|
let key: String = req.param("key")?.to_string();
|
|
|
|
// Validate key format
|
|
if key.len() != 64 || !key.chars().all(|c| c.is_ascii_hexdigit()) {
|
|
return Ok(Response::builder(StatusCode::BadRequest)
|
|
.body("Invalid key: must be a 32 bytes hex string.".to_string())
|
|
.build());
|
|
}
|
|
|
|
// Get TTL from query parameter (optional)
|
|
let ttl: Option<u64> = req.url().query_pairs()
|
|
.find(|(key, _)| key == "ttl")
|
|
.and_then(|(_, value)| value.parse().ok());
|
|
log::info!("ttl: {:?}", ttl);
|
|
|
|
let live_for: Option<Duration> = if let Some(ttl) = ttl {
|
|
if ttl < 60 {
|
|
return Ok(Response::builder(StatusCode::BadRequest)
|
|
.body(format!("Invalid ttl: must be at least {} seconds.", 60))
|
|
.build());
|
|
} else if ttl > 31_536_000 {
|
|
return Ok(Response::builder(StatusCode::BadRequest)
|
|
.body(format!("Invalid ttl: must be at most {} seconds.", 31_536_000))
|
|
.build());
|
|
}
|
|
Some(Duration::from_secs(ttl))
|
|
} else if no_ttl_permanent {
|
|
None
|
|
} else {
|
|
Some(Duration::from_secs(86_400))
|
|
};
|
|
|
|
let expires_at: Option<SystemTime> = match live_for {
|
|
Some(lf) => Some(
|
|
SystemTime::now()
|
|
.checked_add(lf)
|
|
.ok_or(tide::Error::from_str(StatusCode::BadRequest, "Invalid ttl"))?
|
|
),
|
|
None => None,
|
|
};
|
|
|
|
// Read binary data directly from request body
|
|
let value_bytes = match req.body_bytes().await {
|
|
Ok(bytes) => bytes,
|
|
Err(e) => {
|
|
return Ok(Response::builder(StatusCode::BadRequest)
|
|
.body(format!("Failed to read request body: {}", e))
|
|
.build());
|
|
}
|
|
};
|
|
|
|
log::info!("received {} bytes", value_bytes.len());
|
|
|
|
let svc = req.state();
|
|
match svc.store_data(&key, &value_bytes, expires_at).await {
|
|
Ok(()) => Ok(Response::builder(StatusCode::Ok)
|
|
.body(serde_json::to_value(&ApiResponse {
|
|
message: "Data stored successfully.".to_string(),
|
|
})?)
|
|
.build()),
|
|
Err(e) => Ok(Response::builder(e.status())
|
|
.body(serde_json::to_value(&ApiResponse {
|
|
message: e.to_string(),
|
|
})?)
|
|
.build()),
|
|
}
|
|
}
|
|
|
|
pub async fn handle_retrieve(req: Request<StorageService>) -> tide::Result<Response> {
|
|
let key: String = req.param("key")?.to_string();
|
|
|
|
if key.len() != 64 || !key.chars().all(|c| c.is_ascii_hexdigit()) {
|
|
return Ok(Response::builder(StatusCode::BadRequest)
|
|
.body("Invalid key: must be a 32 bytes hex string.".to_string())
|
|
.build());
|
|
}
|
|
|
|
let svc = req.state();
|
|
match svc.retrieve_data(&key).await {
|
|
Ok(value) => {
|
|
Ok(Response::builder(StatusCode::Ok)
|
|
.header("Content-Type", "application/octet-stream")
|
|
.body(value)
|
|
.build())
|
|
}
|
|
Err(e) => Ok(Response::builder(StatusCode::NotFound).body(e).build()),
|
|
}
|
|
}
|
|
|
|
pub fn create_app(no_ttl_permanent: bool, storage_dir: impl Into<String>) -> tide::Server<StorageService> {
|
|
let svc = StorageService::new(storage_dir);
|
|
let mut app = tide::with_state(svc);
|
|
app.at("/health").get(handle_health);
|
|
app.at("/store/:key").post(move |req| handle_store(req, no_ttl_permanent));
|
|
app.at("/retrieve/:key").get(handle_retrieve);
|
|
app
|
|
}
|