Add save processes and members to disk

This commit is contained in:
NicolasCantu 2025-01-09 16:11:14 +01:00 committed by Nicolas Cantu
parent 5391dca929
commit 21711edd39
2 changed files with 103 additions and 24 deletions

View File

@ -18,7 +18,7 @@ use sdk_common::process::{lock_processes, Process, ProcessState, CACHEDPROCESSES
use serde_json::json;
use serde_json::Value;
use crate::{lock_freezed_utxos, MutexExt, DAEMON, WALLET};
use crate::{lock_freezed_utxos, MutexExt, DAEMON, STORAGE, WALLET};
pub(crate) fn handle_commit_request(commit_msg: CommitMessage) -> Result<OutPoint> {
// Attempt to process the initial transaction or reference
@ -96,6 +96,9 @@ fn handle_initial_transaction(tx: Transaction, commit_msg: &CommitMessage) -> Re
new_process,
);
// Dump to disk
dump_cached_processes()?;
// Add to frozen UTXOs
lock_freezed_utxos()?.insert(root_commitment);
@ -146,10 +149,7 @@ fn handle_member_list(pcd_commitment: &Value, roles: &HashMap<String, RoleDefini
// Handle the case where `init_tx` is a reference to an existing outpoint
fn handle_existing_commitment(outpoint: OutPoint, commit_msg: CommitMessage) -> Result<OutPoint> {
let mut commitments = CACHEDPROCESSES
.get()
.ok_or(Error::msg("CACHEDPROCESSES not initialized"))?
.lock_anyhow()?;
let mut commitments = lock_processes()?;
let commitment = commitments
.get_mut(&outpoint)
.ok_or(Error::msg("Commitment not found"))?;
@ -161,6 +161,20 @@ fn handle_existing_commitment(outpoint: OutPoint, commit_msg: CommitMessage) ->
}
}
pub fn dump_cached_processes() -> Result<(), anyhow::Error> {
let processes = lock_processes()?.clone();
let storage = STORAGE.get().ok_or(Error::msg("STORAGE is not initialized"))?.lock_anyhow()?;
let processes_file = &storage.processes_file;
processes_file.save(&processes)?;
log::debug!("saved processes");
Ok(())
}
// Register a new state when validation tokens are empty
fn register_new_state(commitment: &mut Process, commit_msg: CommitMessage) -> Result<OutPoint> {
let concurrent_states = commitment.get_latest_concurrent_states()?;

View File

@ -11,11 +11,12 @@ use std::{
};
use bitcoincore_rpc::json::{self as bitcoin_json};
use commit::MEMBERLIST;
use futures_util::{future, pin_mut, stream::TryStreamExt, FutureExt, StreamExt};
use log::{debug, error, warn};
use message::{broadcast_message, process_message, BroadcastType, MessageCache, MESSAGECACHE};
use scan::{check_transaction_alone, compute_partial_tweak_to_transaction};
use sdk_common::{network::InitMessage, process::CACHEDPROCESSES, sp_client::{bitcoin::{
use sdk_common::{network::InitMessage, pcd::Member, process::{Process, CACHEDPROCESSES}, sp_client::{bitcoin::{
consensus::deserialize,
hex::{DisplayHex, FromHex},
Amount, Network, Transaction,
@ -31,6 +32,7 @@ use sdk_common::{
};
use sdk_common::sp_client::spclient::{derive_keys_from_seed, SpClient, SpendKey};
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -75,11 +77,11 @@ pub struct PublicLists {
pub static LISTS: OnceLock<Mutex<PublicLists>> = OnceLock::new();
#[derive(Debug)]
struct WalletFile {
pub struct StateFile {
path: PathBuf,
}
impl WalletFile {
impl StateFile {
fn new(path: PathBuf) -> Self {
Self { path }
}
@ -95,46 +97,56 @@ impl WalletFile {
return Err(Error::msg("wallet file has no parent dir"));
}
// check that parent exists
// Ensure the parent directory exists
if !parent.exists() {
fs::create_dir_all(parent)?;
}
fs::File::create_new(&self.path)?;
// Create a new file
fs::File::create(&self.path)?;
Ok(())
}
fn save(&self, new_value: &SpWallet) -> Result<()> {
fn save<T: Serialize>(&self, data: &T) -> Result<()> {
let mut f = fs::File::options()
.write(true)
.truncate(true)
.open(&self.path)?;
let json = serde_json::to_string(new_value)?;
let json = serde_json::to_string(data)?;
f.write_all(json.as_bytes())?;
Ok(())
}
fn load(&self) -> Result<SpWallet> {
fn load<T: for<'de> Deserialize<'de>>(&self) -> Result<T> {
let mut f = fs::File::open(&self.path)?;
let mut content = vec![];
f.read_to_end(&mut content)?;
let res: SpWallet = serde_json::from_slice(&content)?;
let res: T = serde_json::from_slice(&content)?;
Ok(res)
}
}
#[derive(Debug)]
pub struct DiskStorage {
// wallet_file: StateFile,
pub processes_file: StateFile,
pub members_file: StateFile,
}
pub static STORAGE: OnceLock<Mutex<DiskStorage>> = OnceLock::new();
const FAUCET_AMT: Amount = Amount::from_sat(100_000);
#[derive(Debug)]
struct SilentPaymentWallet {
sp_wallet: Mutex<SpWallet>,
storage: Mutex<WalletFile>,
storage: Mutex<StateFile>,
}
impl SilentPaymentWallet {
@ -143,7 +155,7 @@ impl SilentPaymentWallet {
}
pub fn save(&self) -> Result<()> {
let wallet = self.sp_wallet.lock_anyhow()?;
let wallet = self.sp_wallet.lock_anyhow()?.clone();
self.storage.lock_anyhow()?.save(&wallet)
}
}
@ -329,10 +341,6 @@ async fn main() -> Result<()> {
warn!("Running on mainnet, you're on your own");
}
CACHEDPROCESSES
.set(Mutex::new(HashMap::new()))
.expect("CachedProcesses initialization failed");
MESSAGECACHE
.set(MessageCache::new())
.expect("Message Cache initialization failed");
@ -361,19 +369,34 @@ async fn main() -> Result<()> {
app_dir.push(".4nk");
let mut wallet_file = app_dir.clone();
wallet_file.push(&config.wallet_name);
let mut processes_file = app_dir.clone();
processes_file.push("processes");
let mut members_file = app_dir.clone();
members_file.push("members");
let wallet_file = WalletFile::new(wallet_file);
let wallet_file = StateFile::new(wallet_file);
let processes_file = StateFile::new(processes_file);
let members_file = StateFile::new(members_file);
// load an existing sp_wallet, or create a new one
let sp_wallet = match wallet_file.load() {
Ok(wallet) => {
// TODO: Verify the wallet is compatible with the current network
wallet
}
Err(_) => {
// Create a new wallet file if it doesn't exist or fails to load
wallet_file.create()?;
// Generate a seed and derive keys
let mut seed = [0u8; 64];
thread_rng().fill(&mut seed);
let (scan_sk, spend_sk) = derive_keys_from_seed(&seed, config.network)
.expect("Couldn't generate a new sp_wallet");
let new_client = SpClient::new(
config.wallet_name,
config.wallet_name.clone(),
scan_sk,
SpendKey::Secret(spend_sk),
None,
@ -381,16 +404,41 @@ async fn main() -> Result<()> {
)
.expect("Failed to create a new SpClient");
// Initialize a new wallet with the generated client
let mut wallet = SpWallet::new(new_client, None, vec![])?;
// set birthday to avoid unnecessary scanning
// Set birthday and update scan information
let outputs = wallet.get_mut_outputs();
outputs.set_birthday(current_tip);
outputs.update_last_scan(current_tip);
// Save the newly created wallet to disk
wallet_file.save(&wallet)?;
wallet
}
Ok(wallet) => wallet, // TODO check network
};
let cached_processes: HashMap<OutPoint, Process> = match processes_file.load() {
Ok(processes) => {
processes
}
Err(_) => {
debug!("creating process file at {}", processes_file.path.display());
processes_file.create()?;
HashMap::new()
}
};
let members: HashMap<Member, OutPoint> = match members_file.load() {
Ok(members) => members,
Err(_) => {
debug!("creating members file at {}", members_file.path.display());
members_file.create()?;
HashMap::new()
}
};
let our_sp_address = sp_wallet.get_client().get_receiving_address();
@ -418,6 +466,23 @@ async fn main() -> Result<()> {
WALLET.get().unwrap().save().unwrap();
CACHEDPROCESSES
.set(Mutex::new(cached_processes))
.expect("Failed to initialize CACHEDPROCESSES");
MEMBERLIST
.set(Mutex::new(members))
.expect("Failed to initialize MEMBERLIST");
let storage = DiskStorage {
processes_file,
members_file,
};
STORAGE
.set(Mutex::new(storage))
.unwrap();
if last_scan < current_tip {
log::info!("Scanning for our outputs");
scan_blocks(current_tip - last_scan, &config.electrum_url)?;