mirror of
https://git.deuxfleurs.fr/Deuxfleurs/garage.git
synced 2024-11-25 01:21:01 +00:00
More aggressive sync timings & improve other stuff
This commit is contained in:
parent
e325c7f47a
commit
ea75564851
13 changed files with 113 additions and 44 deletions
|
@ -18,7 +18,7 @@ pub enum AdminRPC {
|
||||||
BucketOperation(BucketOperation),
|
BucketOperation(BucketOperation),
|
||||||
|
|
||||||
// Replies
|
// Replies
|
||||||
Ok,
|
Ok(String),
|
||||||
BucketList(Vec<String>),
|
BucketList(Vec<String>),
|
||||||
BucketInfo(Bucket),
|
BucketInfo(Bucket),
|
||||||
}
|
}
|
||||||
|
@ -86,13 +86,13 @@ impl AdminRpcHandler {
|
||||||
self.garage
|
self.garage
|
||||||
.bucket_table
|
.bucket_table
|
||||||
.insert(&Bucket {
|
.insert(&Bucket {
|
||||||
name: query.name,
|
name: query.name.clone(),
|
||||||
timestamp: new_time,
|
timestamp: new_time,
|
||||||
deleted: false,
|
deleted: false,
|
||||||
authorized_keys: vec![],
|
authorized_keys: vec![],
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
Ok(AdminRPC::Ok)
|
Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
|
||||||
}
|
}
|
||||||
BucketOperation::Delete(query) => {
|
BucketOperation::Delete(query) => {
|
||||||
let bucket = match self
|
let bucket = match self
|
||||||
|
@ -129,13 +129,13 @@ impl AdminRpcHandler {
|
||||||
self.garage
|
self.garage
|
||||||
.bucket_table
|
.bucket_table
|
||||||
.insert(&Bucket {
|
.insert(&Bucket {
|
||||||
name: query.name,
|
name: query.name.clone(),
|
||||||
timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()),
|
timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()),
|
||||||
deleted: true,
|
deleted: true,
|
||||||
authorized_keys: vec![],
|
authorized_keys: vec![],
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
Ok(AdminRPC::Ok)
|
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::net::{Ipv6Addr, SocketAddr};
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
|
@ -26,7 +26,7 @@ pub async fn run_api_server(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let addr = (Ipv6Addr::LOCALHOST, garage.system.config.api_port).into();
|
let addr = &garage.system.config.api_bind_addr;
|
||||||
|
|
||||||
let service = make_service_fn(|conn: &AddrStream| {
|
let service = make_service_fn(|conn: &AddrStream| {
|
||||||
let garage = garage.clone();
|
let garage = garage.clone();
|
||||||
|
|
|
@ -78,6 +78,8 @@ impl BackgroundRunner {
|
||||||
workers.push(tokio::spawn(async move {
|
workers.push(tokio::spawn(async move {
|
||||||
if let Err(e) = worker(stop_signal).await {
|
if let Err(e) = worker(stop_signal).await {
|
||||||
eprintln!("Worker stopped with error: {}", e);
|
eprintln!("Worker stopped with error: {}", e);
|
||||||
|
} else {
|
||||||
|
println!("A worker exited successfully (which one?)");
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
|
@ -193,7 +193,7 @@ impl BlockManager {
|
||||||
let old_rc = self.rc.get(&hash)?;
|
let old_rc = self.rc.get(&hash)?;
|
||||||
self.rc.merge(&hash, vec![1])?;
|
self.rc.merge(&hash, vec![1])?;
|
||||||
if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
|
if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
|
||||||
self.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT.as_millis() as u64)?;
|
self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -201,7 +201,7 @@ impl BlockManager {
|
||||||
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
|
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
|
||||||
let new_rc = self.rc.merge(&hash, vec![0])?;
|
let new_rc = self.rc.merge(&hash, vec![0])?;
|
||||||
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
|
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
|
||||||
self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
|
self.put_to_resync(&hash, 0)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,3 +83,9 @@ impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {
|
||||||
Error::Message(format!("Watch send error"))
|
Error::Message(format!("Watch send error"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
|
||||||
|
fn from(_e: tokio::sync::mpsc::error::SendError<T>) -> Error {
|
||||||
|
Error::Message(format!("MPSC send error"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
#![recursion_limit = "1024"]
|
||||||
|
|
||||||
mod data;
|
mod data;
|
||||||
mod error;
|
mod error;
|
||||||
|
|
||||||
|
@ -387,8 +389,8 @@ async fn cmd_admin(
|
||||||
args: AdminRPC,
|
args: AdminRPC,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? {
|
match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? {
|
||||||
AdminRPC::Ok => {
|
AdminRPC::Ok(msg) => {
|
||||||
println!("Ok.");
|
println!("{}", msg);
|
||||||
}
|
}
|
||||||
AdminRPC::BucketList(bl) => {
|
AdminRPC::BucketList(bl) => {
|
||||||
println!("List of buckets:");
|
println!("List of buckets:");
|
||||||
|
|
|
@ -362,7 +362,7 @@ impl System {
|
||||||
let ring = self.ring.borrow().clone();
|
let ring = self.ring.borrow().clone();
|
||||||
Message::Ping(PingMessage {
|
Message::Ping(PingMessage {
|
||||||
id: self.id.clone(),
|
id: self.id.clone(),
|
||||||
rpc_port: self.config.rpc_port,
|
rpc_port: self.config.rpc_bind_addr.port(),
|
||||||
status_hash: status.hash.clone(),
|
status_hash: status.hash.clone(),
|
||||||
config_version: ring.config.version,
|
config_version: ring.config.version,
|
||||||
state_info: self.state_info.clone(),
|
state_info: self.state_info.clone(),
|
||||||
|
@ -539,7 +539,7 @@ impl System {
|
||||||
for node in adv.iter() {
|
for node in adv.iter() {
|
||||||
if node.id == self.id {
|
if node.id == self.id {
|
||||||
// learn our own ip address
|
// learn our own ip address
|
||||||
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
|
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port());
|
||||||
let old_self = status.nodes.insert(
|
let old_self = status.nodes.insert(
|
||||||
node.id.clone(),
|
node.id.clone(),
|
||||||
StatusEntry {
|
StatusEntry {
|
||||||
|
|
|
@ -145,10 +145,7 @@ impl RpcServer {
|
||||||
match socket {
|
match socket {
|
||||||
Ok(stream) => match tls_acceptor.clone().accept(stream).await {
|
Ok(stream) => match tls_acceptor.clone().accept(stream).await {
|
||||||
Ok(x) => Some(Ok::<_, hyper::Error>(x)),
|
Ok(x) => Some(Ok::<_, hyper::Error>(x)),
|
||||||
Err(e) => {
|
Err(_e) => None,
|
||||||
eprintln!("RPC server TLS error: {}", e);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Err(_) => None,
|
Err(_) => None,
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub use futures_util::future::FutureExt;
|
use futures_util::future::*;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
@ -30,8 +30,8 @@ pub struct Config {
|
||||||
pub metadata_dir: PathBuf,
|
pub metadata_dir: PathBuf,
|
||||||
pub data_dir: PathBuf,
|
pub data_dir: PathBuf,
|
||||||
|
|
||||||
pub api_port: u16,
|
pub api_bind_addr: SocketAddr,
|
||||||
pub rpc_port: u16,
|
pub rpc_bind_addr: SocketAddr,
|
||||||
|
|
||||||
pub bootstrap_peers: Vec<SocketAddr>,
|
pub bootstrap_peers: Vec<SocketAddr>,
|
||||||
|
|
||||||
|
@ -252,8 +252,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
||||||
let db = sled::open(db_path).expect("Unable to open DB");
|
let db = sled::open(db_path).expect("Unable to open DB");
|
||||||
|
|
||||||
println!("Initialize RPC server...");
|
println!("Initialize RPC server...");
|
||||||
let rpc_bind_addr = ([0, 0, 0, 0, 0, 0, 0, 0], config.rpc_port).into();
|
let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone());
|
||||||
let mut rpc_server = RpcServer::new(rpc_bind_addr, config.rpc_tls.clone());
|
|
||||||
|
|
||||||
println!("Initializing background runner...");
|
println!("Initializing background runner...");
|
||||||
let (send_cancel, watch_cancel) = watch::channel(false);
|
let (send_cancel, watch_cancel) = watch::channel(false);
|
||||||
|
@ -266,11 +265,26 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
||||||
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
|
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
|
||||||
|
|
||||||
futures::try_join!(
|
futures::try_join!(
|
||||||
garage.system.clone().bootstrap().map(Ok),
|
garage.system.clone().bootstrap().map(|rv| {
|
||||||
run_rpc_server,
|
println!("Bootstrap done");
|
||||||
api_server,
|
Ok(rv)
|
||||||
background.run().map(Ok),
|
}),
|
||||||
|
run_rpc_server.map(|rv| {
|
||||||
|
println!("RPC server exited");
|
||||||
|
rv
|
||||||
|
}),
|
||||||
|
api_server.map(|rv| {
|
||||||
|
println!("API server exited");
|
||||||
|
rv
|
||||||
|
}),
|
||||||
|
background.run().map(|rv| {
|
||||||
|
println!("Background runner exited");
|
||||||
|
Ok(rv)
|
||||||
|
}),
|
||||||
shutdown_signal(send_cancel),
|
shutdown_signal(send_cancel),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
println!("Cleaning up...");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
12
src/table.rs
12
src/table.rs
|
@ -436,6 +436,7 @@ where
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
mut entries: Vec<Arc<ByteBuf>>,
|
mut entries: Vec<Arc<ByteBuf>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
let syncer = self.syncer.load_full().unwrap();
|
||||||
let mut epidemic_propagate = vec![];
|
let mut epidemic_propagate = vec![];
|
||||||
|
|
||||||
for update_bytes in entries.drain(..) {
|
for update_bytes in entries.drain(..) {
|
||||||
|
@ -469,9 +470,9 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
self.instance.updated(old_entry, Some(new_entry)).await;
|
self.instance.updated(old_entry, Some(new_entry)).await;
|
||||||
|
self.system
|
||||||
let syncer = self.syncer.load_full().unwrap();
|
.background
|
||||||
self.system.background.spawn(syncer.invalidate(tree_key));
|
.spawn(syncer.clone().invalidate(tree_key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -486,6 +487,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
|
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
|
||||||
|
let syncer = self.syncer.load_full().unwrap();
|
||||||
|
|
||||||
eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
|
eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
|
while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
|
||||||
|
@ -495,6 +498,9 @@ where
|
||||||
if let Some(old_val) = self.store.remove(&key)? {
|
if let Some(old_val) = self.store.remove(&key)? {
|
||||||
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
|
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
|
||||||
self.instance.updated(Some(old_entry), None).await;
|
self.instance.updated(Some(old_entry), None).await;
|
||||||
|
self.system
|
||||||
|
.background
|
||||||
|
.spawn(syncer.clone().invalidate(key.to_vec()));
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,8 @@ use futures_util::future::*;
|
||||||
use futures_util::stream::*;
|
use futures_util::stream::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
use serde_bytes::ByteBuf;
|
||||||
use tokio::sync::watch;
|
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
use tokio::sync::{mpsc, watch};
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
@ -18,9 +18,8 @@ use crate::membership::Ring;
|
||||||
use crate::table::*;
|
use crate::table::*;
|
||||||
|
|
||||||
const MAX_DEPTH: usize = 16;
|
const MAX_DEPTH: usize = 16;
|
||||||
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
|
const SCAN_INTERVAL: Duration = Duration::from_secs(60);
|
||||||
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
|
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
|
||||||
|
|
||||||
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10);
|
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
|
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
|
||||||
|
@ -91,18 +90,24 @@ where
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let s1 = syncer.clone();
|
let s1 = syncer.clone();
|
||||||
table
|
table
|
||||||
.system
|
.system
|
||||||
.background
|
.background
|
||||||
.spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit))
|
.spawn_worker(move |must_exit: watch::Receiver<bool>| {
|
||||||
|
s1.watcher_task(must_exit, busy_rx)
|
||||||
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let s2 = syncer.clone();
|
let s2 = syncer.clone();
|
||||||
table
|
table
|
||||||
.system
|
.system
|
||||||
.background
|
.background
|
||||||
.spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit))
|
.spawn_worker(move |must_exit: watch::Receiver<bool>| {
|
||||||
|
s2.syncer_task(must_exit, busy_tx)
|
||||||
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
syncer
|
syncer
|
||||||
|
@ -111,25 +116,20 @@ where
|
||||||
async fn watcher_task(
|
async fn watcher_task(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut must_exit: watch::Receiver<bool>,
|
mut must_exit: watch::Receiver<bool>,
|
||||||
|
mut busy_rx: mpsc::UnboundedReceiver<bool>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
tokio::time::delay_for(Duration::from_secs(10)).await;
|
|
||||||
|
|
||||||
self.todo.lock().await.add_full_scan(&self.table);
|
|
||||||
let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
|
|
||||||
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
|
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
|
||||||
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
|
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
|
||||||
|
let mut nothing_to_do_since = Some(Instant::now());
|
||||||
|
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
let s_ring_recv = ring_recv.recv().fuse();
|
let s_ring_recv = ring_recv.recv().fuse();
|
||||||
|
let s_busy = busy_rx.recv().fuse();
|
||||||
let s_must_exit = must_exit.recv().fuse();
|
let s_must_exit = must_exit.recv().fuse();
|
||||||
pin_mut!(s_ring_recv, s_must_exit);
|
let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
|
||||||
|
pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
|
||||||
|
|
||||||
select! {
|
select! {
|
||||||
_ = next_full_scan => {
|
|
||||||
next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
|
|
||||||
eprintln!("({}) Adding full scan to syncer todo list", self.table.name);
|
|
||||||
self.todo.lock().await.add_full_scan(&self.table);
|
|
||||||
}
|
|
||||||
new_ring_r = s_ring_recv => {
|
new_ring_r = s_ring_recv => {
|
||||||
if let Some(new_ring) = new_ring_r {
|
if let Some(new_ring) = new_ring_r {
|
||||||
eprintln!("({}) Adding ring difference to syncer todo list", self.table.name);
|
eprintln!("({}) Adding ring difference to syncer todo list", self.table.name);
|
||||||
|
@ -137,11 +137,29 @@ where
|
||||||
prev_ring = new_ring;
|
prev_ring = new_ring;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
busy_opt = s_busy => {
|
||||||
|
if let Some(busy) = busy_opt {
|
||||||
|
if busy {
|
||||||
|
nothing_to_do_since = None;
|
||||||
|
} else {
|
||||||
|
if nothing_to_do_since.is_none() {
|
||||||
|
nothing_to_do_since = Some(Instant::now());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
must_exit_v = s_must_exit => {
|
must_exit_v = s_must_exit => {
|
||||||
if must_exit_v.unwrap_or(false) {
|
if must_exit_v.unwrap_or(false) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
_ = s_timeout => {
|
||||||
|
if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
|
||||||
|
nothing_to_do_since = None;
|
||||||
|
eprintln!("({}) Adding full scan to syncer todo list", self.table.name);
|
||||||
|
self.todo.lock().await.add_full_scan(&self.table);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -150,9 +168,11 @@ where
|
||||||
async fn syncer_task(
|
async fn syncer_task(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut must_exit: watch::Receiver<bool>,
|
mut must_exit: watch::Receiver<bool>,
|
||||||
|
busy_tx: mpsc::UnboundedSender<bool>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
if let Some(partition) = self.todo.lock().await.pop_task() {
|
if let Some(partition) = self.todo.lock().await.pop_task() {
|
||||||
|
busy_tx.send(true)?;
|
||||||
let res = self
|
let res = self
|
||||||
.clone()
|
.clone()
|
||||||
.sync_partition(&partition, &mut must_exit)
|
.sync_partition(&partition, &mut must_exit)
|
||||||
|
@ -164,6 +184,7 @@ where
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
busy_tx.send(false)?;
|
||||||
tokio::time::delay_for(Duration::from_secs(1)).await;
|
tokio::time::delay_for(Duration::from_secs(1)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
13
test_read.sh
Executable file
13
test_read.sh
Executable file
|
@ -0,0 +1,13 @@
|
||||||
|
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
for FILE in $(find target/debug/deps); do
|
||||||
|
SHA2=$(curl localhost:3900/$FILE -H 'Host: garage' 2>/dev/null | sha256sum | cut -d ' ' -f 1)
|
||||||
|
SHA2REF=$(sha256sum $FILE | cut -d ' ' -f 1)
|
||||||
|
if [ "$SHA2" = "$SHA2REF" ]; then
|
||||||
|
echo "OK $FILE"
|
||||||
|
else
|
||||||
|
echo "!!!! ERROR $FILE !!!!"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
8
test_write.sh
Executable file
8
test_write.sh
Executable file
|
@ -0,0 +1,8 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
for FILE in $(find target/debug/deps); do
|
||||||
|
echo
|
||||||
|
echo $FILE
|
||||||
|
curl -v localhost:3900/$FILE -X PUT -H 'Host: garage' -H 'Content-Type: application/blob' --data-binary "@$FILE"
|
||||||
|
done
|
||||||
|
|
Loading…
Reference in a new issue