mirror of
https://git.deuxfleurs.fr/Deuxfleurs/garage.git
synced 2024-11-25 17:41:01 +00:00
Merge pull request 'Improved bootstraping procedure' (#56) from better_bootstrap into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/56
This commit is contained in:
commit
8225fa2e4b
12 changed files with 258 additions and 100 deletions
|
@ -36,7 +36,9 @@ steps:
|
|||
commands:
|
||||
- apt-get update
|
||||
- apt-get install --yes libsodium-dev
|
||||
- rustup component add rustfmt
|
||||
- pwd
|
||||
- cargo fmt -- --check
|
||||
- cargo build
|
||||
|
||||
- name: cargo-test
|
||||
|
@ -107,6 +109,8 @@ steps:
|
|||
endpoint: https://garage.deuxfleurs.fr
|
||||
region: garage
|
||||
when:
|
||||
event:
|
||||
- push
|
||||
branch:
|
||||
- main
|
||||
repo:
|
||||
|
@ -114,6 +118,6 @@ steps:
|
|||
|
||||
---
|
||||
kind: signature
|
||||
hmac: 2ecc0db1a006186c6e3b9bcb458ba40b3e8bf88493bfae40fde2fe4136d0db69
|
||||
hmac: bfe75f47e5eecdd1f6dd8fd3cf1ea359b0215243d06ac767c51a4b4e363e963e
|
||||
|
||||
...
|
||||
|
|
2
Makefile
2
Makefile
|
@ -2,7 +2,7 @@ BIN=target/release/garage
|
|||
DOCKER=lxpz/garage_amd64
|
||||
|
||||
all:
|
||||
clear; RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --no-default-features
|
||||
clear; cargo build
|
||||
|
||||
$(BIN):
|
||||
RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features
|
||||
|
|
|
@ -246,15 +246,13 @@ impl AdminRpcHandler {
|
|||
)))
|
||||
}
|
||||
KeyOperation::Import(query) => {
|
||||
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id)
|
||||
.await?;
|
||||
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
|
||||
if prev_key.is_some() {
|
||||
return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
|
||||
}
|
||||
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
|
||||
self.garage.key_table.insert(&imported_key).await?;
|
||||
Ok(AdminRPC::KeyInfo(imported_key))
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,8 +5,8 @@ use std::path::PathBuf;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use structopt::StructOpt;
|
||||
|
||||
use garage_util::error::Error;
|
||||
use garage_util::data::UUID;
|
||||
use garage_util::error::Error;
|
||||
use garage_util::time::*;
|
||||
|
||||
use garage_rpc::membership::*;
|
||||
|
@ -384,7 +384,10 @@ pub async fn cmd_status(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn find_matching_node(cand: impl std::iter::Iterator<Item=UUID>, pattern: &str) -> Result<UUID, Error> {
|
||||
pub fn find_matching_node(
|
||||
cand: impl std::iter::Iterator<Item = UUID>,
|
||||
pattern: &str,
|
||||
) -> Result<UUID, Error> {
|
||||
let mut candidates = vec![];
|
||||
for c in cand {
|
||||
if hex::encode(&c).starts_with(&pattern) {
|
||||
|
@ -428,7 +431,10 @@ pub async fn cmd_configure(
|
|||
for replaced in args.replace.iter() {
|
||||
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
|
||||
if config.members.remove(&replaced_node).is_none() {
|
||||
return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node)));
|
||||
return Err(Error::Message(format!(
|
||||
"Cannot replace node {:?} as it is not in current configuration",
|
||||
replaced_node
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||
info!("Initializing Garage main data store...");
|
||||
let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
|
||||
let bootstrap = garage.system.clone().bootstrap(
|
||||
&config.bootstrap_peers[..],
|
||||
config.bootstrap_peers,
|
||||
config.consul_host,
|
||||
config.consul_service_name,
|
||||
);
|
||||
|
|
|
@ -147,7 +147,9 @@ impl Entry<String, String> for Object {
|
|||
&self.key
|
||||
}
|
||||
fn is_tombstone(&self) -> bool {
|
||||
self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
|
||||
self.versions.len() == 1
|
||||
&& self.versions[0].state
|
||||
== ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,13 +11,13 @@ use futures::future::join_all;
|
|||
use futures::select;
|
||||
use futures_util::future::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use garage_util::background::BackgroundRunner;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
use garage_util::persister::Persister;
|
||||
use garage_util::time::*;
|
||||
|
||||
use crate::consul::get_consul_nodes;
|
||||
|
@ -26,7 +26,7 @@ use crate::rpc_client::*;
|
|||
use crate::rpc_server::*;
|
||||
|
||||
const PING_INTERVAL: Duration = Duration::from_secs(10);
|
||||
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
|
||||
|
||||
|
@ -69,7 +69,8 @@ pub struct AdvertisedNode {
|
|||
pub struct System {
|
||||
pub id: UUID,
|
||||
|
||||
metadata_dir: PathBuf,
|
||||
persist_config: Persister<NetworkConfig>,
|
||||
persist_status: Persister<Vec<AdvertisedNode>>,
|
||||
rpc_local_port: u16,
|
||||
|
||||
state_info: StateInfo,
|
||||
|
@ -80,11 +81,16 @@ pub struct System {
|
|||
pub(crate) status: watch::Receiver<Arc<Status>>,
|
||||
pub ring: watch::Receiver<Arc<Ring>>,
|
||||
|
||||
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
|
||||
update_lock: Mutex<Updaters>,
|
||||
|
||||
pub background: Arc<BackgroundRunner>,
|
||||
}
|
||||
|
||||
struct Updaters {
|
||||
update_status: watch::Sender<Arc<Status>>,
|
||||
update_ring: watch::Sender<Arc<Ring>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Status {
|
||||
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
|
||||
|
@ -144,6 +150,25 @@ impl Status {
|
|||
debug!("END --");
|
||||
self.hash = blake2sum(nodes_txt.as_bytes());
|
||||
}
|
||||
|
||||
fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
|
||||
let mut mem = vec![];
|
||||
for (node, status) in self.nodes.iter() {
|
||||
let state_info = if *node == system.id {
|
||||
system.state_info.clone()
|
||||
} else {
|
||||
status.state_info.clone()
|
||||
};
|
||||
mem.push(AdvertisedNode {
|
||||
id: *node,
|
||||
addr: status.addr,
|
||||
is_up: status.is_up(),
|
||||
last_seen: status.last_seen,
|
||||
state_info,
|
||||
});
|
||||
}
|
||||
mem
|
||||
}
|
||||
}
|
||||
|
||||
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
||||
|
@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
|||
}
|
||||
}
|
||||
|
||||
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
|
||||
let mut path = metadata_dir.clone();
|
||||
path.push("network_config");
|
||||
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(path.as_path())?;
|
||||
|
||||
let mut net_config_bytes = vec![];
|
||||
file.read_to_end(&mut net_config_bytes)?;
|
||||
|
||||
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
|
||||
.expect("Unable to parse network configuration file (has version format changed?).");
|
||||
|
||||
Ok(net_config)
|
||||
}
|
||||
|
||||
impl System {
|
||||
pub fn new(
|
||||
metadata_dir: PathBuf,
|
||||
|
@ -196,7 +204,10 @@ impl System {
|
|||
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
|
||||
info!("Node ID: {}", hex::encode(&id));
|
||||
|
||||
let net_config = match read_network_config(&metadata_dir) {
|
||||
let persist_config = Persister::new(&metadata_dir, "network_config");
|
||||
let persist_status = Persister::new(&metadata_dir, "peer_info");
|
||||
|
||||
let net_config = match persist_config.load() {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
info!(
|
||||
|
@ -206,6 +217,7 @@ impl System {
|
|||
NetworkConfig::new()
|
||||
}
|
||||
};
|
||||
|
||||
let mut status = Status {
|
||||
nodes: HashMap::new(),
|
||||
hash: Hash::default(),
|
||||
|
@ -231,14 +243,18 @@ impl System {
|
|||
|
||||
let sys = Arc::new(System {
|
||||
id,
|
||||
metadata_dir,
|
||||
persist_config,
|
||||
persist_status,
|
||||
rpc_local_port: rpc_server.bind_addr.port(),
|
||||
state_info,
|
||||
rpc_http_client,
|
||||
rpc_client,
|
||||
status,
|
||||
ring,
|
||||
update_lock: Mutex::new((update_status, update_ring)),
|
||||
update_lock: Mutex::new(Updaters {
|
||||
update_status,
|
||||
update_ring,
|
||||
}),
|
||||
background,
|
||||
});
|
||||
sys.clone().register_handler(rpc_server, rpc_path);
|
||||
|
@ -272,14 +288,11 @@ impl System {
|
|||
}
|
||||
|
||||
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
||||
let mut path = self.metadata_dir.clone();
|
||||
path.push("network_config");
|
||||
|
||||
let ring = self.ring.borrow().clone();
|
||||
let data = rmp_to_vec_all_named(&ring.config)?;
|
||||
|
||||
let mut f = tokio::fs::File::create(path.as_path()).await?;
|
||||
f.write_all(&data[..]).await?;
|
||||
self.persist_config
|
||||
.save_async(&ring.config)
|
||||
.await
|
||||
.expect("Cannot save current cluster configuration");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -308,26 +321,21 @@ impl System {
|
|||
|
||||
pub async fn bootstrap(
|
||||
self: Arc<Self>,
|
||||
peers: &[SocketAddr],
|
||||
peers: Vec<SocketAddr>,
|
||||
consul_host: Option<String>,
|
||||
consul_service_name: Option<String>,
|
||||
) {
|
||||
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
|
||||
self.clone().ping_nodes(bootstrap_peers).await;
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("discovery loop"), |stop_signal| {
|
||||
self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
|
||||
});
|
||||
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("ping loop"), |stop_signal| {
|
||||
self2.ping_loop(stop_signal)
|
||||
});
|
||||
|
||||
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("Consul loop"), |stop_signal| {
|
||||
self2.consul_loop(stop_signal, consul_host, consul_service_name)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
|
||||
|
@ -394,9 +402,7 @@ impl System {
|
|||
if has_changes {
|
||||
status.recalculate_hash();
|
||||
}
|
||||
if let Err(e) = update_locked.0.send(Arc::new(status)) {
|
||||
error!("In ping_nodes: could not save status update ({})", e);
|
||||
}
|
||||
self.update_status(&update_locked, status).await;
|
||||
drop(update_locked);
|
||||
|
||||
if to_advertise.len() > 0 {
|
||||
|
@ -420,7 +426,7 @@ impl System {
|
|||
let status_hash = status.hash;
|
||||
let config_version = self.ring.borrow().config.version;
|
||||
|
||||
update_locked.0.send(Arc::new(status))?;
|
||||
self.update_status(&update_locked, status).await;
|
||||
drop(update_locked);
|
||||
|
||||
if is_new || status_hash != ping.status_hash {
|
||||
|
@ -436,23 +442,9 @@ impl System {
|
|||
}
|
||||
|
||||
fn handle_pull_status(&self) -> Result<Message, Error> {
|
||||
let status = self.status.borrow().clone();
|
||||
let mut mem = vec![];
|
||||
for (node, status) in status.nodes.iter() {
|
||||
let state_info = if *node == self.id {
|
||||
self.state_info.clone()
|
||||
} else {
|
||||
status.state_info.clone()
|
||||
};
|
||||
mem.push(AdvertisedNode {
|
||||
id: *node,
|
||||
addr: status.addr,
|
||||
is_up: status.is_up(),
|
||||
last_seen: status.last_seen,
|
||||
state_info,
|
||||
});
|
||||
}
|
||||
Ok(Message::AdvertiseNodesUp(mem))
|
||||
Ok(Message::AdvertiseNodesUp(
|
||||
self.status.borrow().to_serializable_membership(self),
|
||||
))
|
||||
}
|
||||
|
||||
fn handle_pull_config(&self) -> Result<Message, Error> {
|
||||
|
@ -502,7 +494,7 @@ impl System {
|
|||
if has_changed {
|
||||
status.recalculate_hash();
|
||||
}
|
||||
update_lock.0.send(Arc::new(status))?;
|
||||
self.update_status(&update_lock, status).await;
|
||||
drop(update_lock);
|
||||
|
||||
if to_ping.len() > 0 {
|
||||
|
@ -522,7 +514,7 @@ impl System {
|
|||
|
||||
if adv.version > ring.config.version {
|
||||
let ring = Ring::new(adv.clone());
|
||||
update_lock.1.send(Arc::new(ring))?;
|
||||
update_lock.update_ring.send(Arc::new(ring))?;
|
||||
drop(update_lock);
|
||||
|
||||
self.background.spawn_cancellable(
|
||||
|
@ -537,7 +529,7 @@ impl System {
|
|||
}
|
||||
|
||||
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
|
||||
loop {
|
||||
while !*stop_signal.borrow() {
|
||||
let restart_at = tokio::time::sleep(PING_INTERVAL);
|
||||
|
||||
let status = self.status.borrow().clone();
|
||||
|
@ -552,34 +544,64 @@ impl System {
|
|||
|
||||
select! {
|
||||
_ = restart_at.fuse() => (),
|
||||
_ = stop_signal.changed().fuse() => {
|
||||
if *stop_signal.borrow() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ = stop_signal.changed().fuse() => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn consul_loop(
|
||||
async fn discovery_loop(
|
||||
self: Arc<Self>,
|
||||
bootstrap_peers: Vec<SocketAddr>,
|
||||
consul_host: Option<String>,
|
||||
consul_service_name: Option<String>,
|
||||
mut stop_signal: watch::Receiver<bool>,
|
||||
consul_host: String,
|
||||
consul_service_name: String,
|
||||
) {
|
||||
while !*stop_signal.borrow() {
|
||||
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
|
||||
let consul_config = match (consul_host, consul_service_name) {
|
||||
(Some(ch), Some(csn)) => Some((ch, csn)),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
match get_consul_nodes(&consul_host, &consul_service_name).await {
|
||||
Ok(mut node_list) => {
|
||||
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
|
||||
self.clone().ping_nodes(ping_addrs).await;
|
||||
while !*stop_signal.borrow() {
|
||||
let not_configured = self.ring.borrow().config.members.len() == 0;
|
||||
let no_peers = self.status.borrow().nodes.len() < 3;
|
||||
let bad_peers = self
|
||||
.status
|
||||
.borrow()
|
||||
.nodes
|
||||
.iter()
|
||||
.filter(|(_, v)| v.is_up())
|
||||
.count() != self.ring.borrow().config.members.len();
|
||||
|
||||
if not_configured || no_peers || bad_peers {
|
||||
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
|
||||
|
||||
let mut ping_list = bootstrap_peers
|
||||
.iter()
|
||||
.map(|ip| (*ip, None))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match self.persist_status.load_async().await {
|
||||
Ok(peers) => {
|
||||
ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Could not retrieve node list from Consul: {}", e);
|
||||
|
||||
if let Some((consul_host, consul_service_name)) = &consul_config {
|
||||
match get_consul_nodes(consul_host, consul_service_name).await {
|
||||
Ok(node_list) => {
|
||||
ping_list.extend(node_list.iter().map(|a| (*a, None)));
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Could not retrieve node list from Consul: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.clone().ping_nodes(ping_list).await;
|
||||
}
|
||||
|
||||
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
||||
select! {
|
||||
_ = restart_at.fuse() => (),
|
||||
_ = stop_signal.changed().fuse() => (),
|
||||
|
@ -611,4 +633,35 @@ impl System {
|
|||
let _: Result<_, _> = self.handle_advertise_config(&config).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
|
||||
if status.hash != self.status.borrow().hash {
|
||||
let mut list = status.to_serializable_membership(&self);
|
||||
|
||||
// Combine with old peer list to make sure no peer is lost
|
||||
match self.persist_status.load_async().await {
|
||||
Ok(old_list) => {
|
||||
for pp in old_list {
|
||||
if !list.iter().any(|np| pp.id == np.id) {
|
||||
list.push(pp);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
if list.len() > 0 {
|
||||
info!("Persisting new peer list ({} peers)", list.len());
|
||||
self.persist_status
|
||||
.save_async(&list)
|
||||
.await
|
||||
.expect("Unable to persist peer list");
|
||||
}
|
||||
}
|
||||
|
||||
updaters
|
||||
.update_status
|
||||
.send(Arc::new(status))
|
||||
.expect("Could not update internal membership status");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,13 @@ where
|
|||
F: TableSchema,
|
||||
R: TableReplication,
|
||||
{
|
||||
pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
|
||||
pub fn new(
|
||||
system: Arc<System>,
|
||||
name: String,
|
||||
instance: F,
|
||||
replication: R,
|
||||
db: &sled::Db,
|
||||
) -> Arc<Self> {
|
||||
let store = db
|
||||
.open_tree(&format!("{}:table", name))
|
||||
.expect("Unable to open DB tree");
|
||||
|
|
|
@ -157,7 +157,12 @@ where
|
|||
if errs.is_empty() {
|
||||
Ok(true)
|
||||
} else {
|
||||
Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", ")))
|
||||
Err(Error::Message(
|
||||
errs.into_iter()
|
||||
.map(|x| format!("{}", x))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", "),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -200,12 +200,13 @@ where
|
|||
let subnode = self.read_node_txn(tx, &key_sub)?;
|
||||
match subnode {
|
||||
MerkleNode::Empty => {
|
||||
warn!("({}) Single subnode in tree is empty Merkle node", self.data.name);
|
||||
warn!(
|
||||
"({}) Single subnode in tree is empty Merkle node",
|
||||
self.data.name
|
||||
);
|
||||
Some(MerkleNode::Empty)
|
||||
}
|
||||
MerkleNode::Intermediate(_) => {
|
||||
Some(MerkleNode::Intermediate(children))
|
||||
}
|
||||
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
|
||||
x @ MerkleNode::Leaf(_, _) => {
|
||||
tx.remove(key_sub.encode())?;
|
||||
Some(x)
|
||||
|
@ -239,14 +240,24 @@ where
|
|||
|
||||
{
|
||||
let exlf_subkey = key.next_key(&exlf_khash);
|
||||
let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap();
|
||||
let exlf_sub_hash = self
|
||||
.update_item_rec(
|
||||
tx,
|
||||
&exlf_k[..],
|
||||
&exlf_khash,
|
||||
&exlf_subkey,
|
||||
Some(exlf_vhash),
|
||||
)?
|
||||
.unwrap();
|
||||
intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
|
||||
assert_eq!(int.len(), 1);
|
||||
}
|
||||
|
||||
{
|
||||
let key2 = key.next_key(khash);
|
||||
let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap();
|
||||
let subhash = self
|
||||
.update_item_rec(tx, k, khash, &key2, new_vhash)?
|
||||
.unwrap();
|
||||
intermediate_set_child(&mut int, key2.prefix[i], subhash);
|
||||
if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
|
||||
assert_eq!(int.len(), 1);
|
||||
|
|
|
@ -5,4 +5,5 @@ pub mod background;
|
|||
pub mod config;
|
||||
pub mod data;
|
||||
pub mod error;
|
||||
pub mod persister;
|
||||
pub mod time;
|
||||
|
|
72
src/util/persister.rs
Normal file
72
src/util/persister.rs
Normal file
|
@ -0,0 +1,72 @@
|
|||
use std::io::{Read, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::data::*;
|
||||
use crate::error::Error;
|
||||
|
||||
pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
|
||||
path: PathBuf,
|
||||
|
||||
_marker: std::marker::PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> Persister<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
pub fn new(base_dir: &PathBuf, file_name: &str) -> Self {
|
||||
let mut path = base_dir.clone();
|
||||
path.push(file_name);
|
||||
Self {
|
||||
path,
|
||||
_marker: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load(&self) -> Result<T, Error> {
|
||||
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
|
||||
|
||||
let mut bytes = vec![];
|
||||
file.read_to_end(&mut bytes)?;
|
||||
|
||||
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub fn save(&self, t: &T) -> Result<(), Error> {
|
||||
let bytes = rmp_to_vec_all_named(t)?;
|
||||
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(&self.path)?;
|
||||
|
||||
file.write_all(&bytes[..])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn load_async(&self) -> Result<T, Error> {
|
||||
let mut file = tokio::fs::File::open(&self.path).await?;
|
||||
|
||||
let mut bytes = vec![];
|
||||
file.read_to_end(&mut bytes).await?;
|
||||
|
||||
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
|
||||
let bytes = rmp_to_vec_all_named(t)?;
|
||||
|
||||
let mut file = tokio::fs::File::create(&self.path).await?;
|
||||
file.write_all(&bytes[..]).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue