Refactor sharding logic; coming next: full replication with epidemic dissemination

commit 7131553c53 (parent 4ba54ccfca)
Author: Alex Auvolat
Date: 2020-04-19 13:22:28 +02:00

8 changed files with 218 additions and 162 deletions

src/main.rs

@@ -4,6 +4,7 @@ mod error;
 mod background;
 mod membership;
 mod table;
+mod table_sharded;
 mod table_sync;
 mod block;
@@ -22,12 +23,15 @@ use std::collections::HashSet;
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::sync::Arc;
+use std::time::Duration;
 use structopt::StructOpt;
 
 use error::Error;
 use membership::*;
 use rpc_client::*;
-use server::{TlsConfig, DEFAULT_TIMEOUT};
+use server::TlsConfig;
+
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
 
 #[derive(StructOpt, Debug)]
 #[structopt(name = "garage")]
@@ -158,11 +162,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
 	for adv in status.iter() {
 		if let Some(cfg) = config.members.get(&adv.id) {
 			println!(
-				"{}\t{}\t{}\t{}",
-				hex::encode(&adv.id),
-				cfg.datacenter,
-				cfg.n_tokens,
-				adv.addr
+				"{:?}\t{}\t{}\t{}",
+				adv.id, cfg.datacenter, cfg.n_tokens, adv.addr
 			);
 		}
 	}
@@ -176,7 +177,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
 		println!("\nFailed nodes:");
 		for (id, cfg) in config.members.iter() {
 			if !status.iter().any(|x| x.id == *id) {
-				println!("{}\t{}\t{}", hex::encode(&id), cfg.datacenter, cfg.n_tokens);
+				println!("{:?}\t{}\t{}", id, cfg.datacenter, cfg.n_tokens);
 			}
 		}
 	}
@@ -188,7 +189,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
 		println!("\nUnconfigured nodes:");
 		for adv in status.iter() {
 			if !config.members.contains_key(&adv.id) {
-				println!("{}\t{}", hex::encode(&adv.id), adv.addr);
+				println!("{:?}\t{}", adv.id, adv.addr);
 			}
 		}
 	}
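
Note: replacing hex::encode(&adv.id) with "{:?}" only reads well if the identifier's Debug impl prints hex. A minimal sketch of such an impl, standing in for whatever the crate's UUID type actually does (hypothetical, not part of this diff):

use std::fmt;

pub struct UUID(pub [u8; 32]);

impl fmt::Debug for UUID {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		// Print each byte as two lowercase hex digits, like hex::encode.
		for b in self.0.iter() {
			write!(f, "{:02x}", b)?;
		}
		Ok(())
	}
}

fn main() {
	let id = UUID([0xab; 32]);
	println!("{:?}", id); // prints "abab...ab", as hex::encode would
}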

src/membership.rs

@@ -198,7 +198,7 @@ impl Ring {
 		self.walk_ring_from_pos(start, n)
 	}
 
-	pub fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
+	fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
 		if n >= self.config.members.len() {
 			return self.config.members.keys().cloned().collect::<Vec<_>>();
 		}
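
Note: walk_ring_from_pos is the primitive behind walk_ring: starting from a position on the consistent-hash ring, it collects the next n distinct nodes. A toy model, with chars standing in for node UUIDs and u32 for ring positions (illustrative only, not the crate's implementation):

fn walk_ring_from_pos(ring: &[(u32, char)], start: usize, n: usize) -> Vec<char> {
	let mut out = Vec::new();
	// Walk at most once around the ring, skipping already-seen nodes.
	for i in start..start + ring.len() {
		let node = ring[i % ring.len()].1;
		if !out.contains(&node) {
			out.push(node);
		}
		if out.len() == n {
			break;
		}
	}
	out
}

fn main() {
	let ring = [(10, 'A'), (20, 'B'), (30, 'A'), (40, 'C')];
	// Starting at index 1, the next 2 distinct nodes are B, then A.
	assert_eq!(walk_ring_from_pos(&ring, 1, 2), vec!['B', 'A']);
}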

src/object_table.rs

@@ -5,6 +5,7 @@ use std::sync::Arc;
 use crate::background::BackgroundRunner;
 use crate::data::*;
 use crate::table::*;
+use crate::table_sharded::*;
 use crate::version_table::*;
@@ -90,7 +91,7 @@ impl Entry<String, String> for Object {
 pub struct ObjectTable {
 	pub background: Arc<BackgroundRunner>,
-	pub version_table: Arc<Table<VersionTable>>,
+	pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
 }
 
 #[async_trait]
src/server.rs

@@ -2,7 +2,6 @@ use std::io::{Read, Write};
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::sync::Arc;
-use std::time::Duration;
 
 pub use futures_util::future::FutureExt;
 use serde::Deserialize;
@@ -14,6 +13,7 @@ use crate::error::Error;
 use crate::membership::System;
 use crate::rpc_server::RpcServer;
 use crate::table::*;
+use crate::table_sharded::*;
 
 use crate::block::*;
 use crate::block_ref_table::*;
@@ -22,8 +22,6 @@ use crate::version_table::*;
 use crate::api_server;
 
-pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
-
 #[derive(Deserialize, Debug, Clone)]
 pub struct Config {
 	pub metadata_dir: PathBuf,
@@ -59,9 +57,9 @@ pub struct Garage {
 	pub system: Arc<System>,
 	pub block_manager: Arc<BlockManager>,
 
-	pub object_table: Arc<Table<ObjectTable>>,
-	pub version_table: Arc<Table<VersionTable>>,
-	pub block_ref_table: Arc<Table<BlockRefTable>>,
+	pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+	pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+	pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
 }
 
 impl Garage {
@@ -79,18 +77,16 @@ impl Garage {
 		let block_manager =
 			BlockManager::new(&db, config.data_dir.clone(), system.clone(), rpc_server);
 
-		let data_rep_param = TableReplicationParams {
+		let data_rep_param = TableShardedReplication {
 			replication_factor: system.config.data_replication_factor,
 			write_quorum: (system.config.data_replication_factor + 1) / 2,
 			read_quorum: 1,
-			timeout: DEFAULT_TIMEOUT,
 		};
 
-		let meta_rep_param = TableReplicationParams {
+		let meta_rep_param = TableShardedReplication {
 			replication_factor: system.config.meta_replication_factor,
 			write_quorum: (system.config.meta_replication_factor + 1) / 2,
 			read_quorum: (system.config.meta_replication_factor + 1) / 2,
-			timeout: DEFAULT_TIMEOUT,
 		};
 
 		println!("Initialize block_ref_table...");
@@ -99,10 +95,10 @@ impl Garage {
 				background: background.clone(),
 				block_manager: block_manager.clone(),
 			},
+			data_rep_param.clone(),
 			system.clone(),
 			&db,
 			"block_ref".to_string(),
-			data_rep_param.clone(),
 			rpc_server,
 		)
 		.await;
@@ -113,10 +109,10 @@ impl Garage {
 				background: background.clone(),
 				block_ref_table: block_ref_table.clone(),
 			},
+			meta_rep_param.clone(),
 			system.clone(),
 			&db,
 			"version".to_string(),
-			meta_rep_param.clone(),
 			rpc_server,
 		)
 		.await;
@@ -127,10 +123,10 @@ impl Garage {
 				background: background.clone(),
 				version_table: version_table.clone(),
 			},
+			meta_rep_param.clone(),
 			system.clone(),
 			&db,
 			"object".to_string(),
-			meta_rep_param.clone(),
 			rpc_server,
 		)
 		.await;
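
Note: the two parameter sets encode the usual quorum rule. For metadata, read_quorum = write_quorum = (n + 1) / 2, so with an odd replication factor any read quorum intersects any write quorum; for data blocks, read_quorum = 1 presumably suffices because blocks are immutable and addressed by hash, so any single replica returns the right bytes (that rationale is our gloss, not stated in the diff). A quick check of the arithmetic:

fn main() {
	// Illustrative values for a replication factor of 3, mirroring
	// the meta_rep_param computation above.
	let n = 3;
	let w = (n + 1) / 2; // 2
	let r = (n + 1) / 2; // 2
	// r + w > n: every read quorum overlaps every write quorum.
	assert!(r + w > n);
	// n - w write failures are tolerated before an insert errors out;
	// this is exactly max_write_errors() in TableShardedReplication below.
	assert_eq!(n - w, 1);
}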

src/table.rs

@@ -10,30 +10,23 @@ use serde_bytes::ByteBuf;
 use crate::data::*;
 use crate::error::Error;
-use crate::membership::System;
+use crate::membership::{Ring, System};
 use crate::rpc_client::*;
 use crate::rpc_server::*;
 use crate::table_sync::*;
 
-pub struct Table<F: TableSchema> {
+const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub struct Table<F: TableSchema, R: TableReplication> {
 	pub instance: F,
+	pub replication: R,
+
 	pub name: String,
 	pub rpc_client: Arc<RpcClient<TableRPC<F>>>,
 
 	pub system: Arc<System>,
 	pub store: sled::Tree,
-	pub syncer: ArcSwapOption<TableSyncer<F>>,
-
-	pub param: TableReplicationParams,
-}
-
-#[derive(Clone)]
-pub struct TableReplicationParams {
-	pub replication_factor: usize,
-	pub read_quorum: usize,
-	pub write_quorum: usize,
-	pub timeout: Duration,
+	pub syncer: ArcSwapOption<TableSyncer<F, R>>,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -112,15 +105,38 @@ pub trait TableSchema: Send + Sync {
 	}
 }
 
-impl<F: TableSchema + 'static> Table<F> {
+pub trait TableReplication: Send + Sync {
+	// See examples in table_sharded.rs and table_fullcopy.rs
+	// to understand the various replication methods
+
+	// Which nodes to send reads from
+	fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+	fn read_quorum(&self) -> usize;
+
+	// Which nodes to send writes to
+	fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+	fn write_quorum(&self) -> usize;
+	fn max_write_errors(&self) -> usize;
+	fn epidemic_writes(&self) -> bool;
+
+	// Which nodes actually replicate the data
+	fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
+	fn split_points(&self, ring: &Ring) -> Vec<Hash>;
+}
+
+impl<F, R> Table<F, R>
+where
+	F: TableSchema + 'static,
+	R: TableReplication + 'static,
+{
 	// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
 
 	pub async fn new(
 		instance: F,
+		replication: R,
 		system: Arc<System>,
 		db: &sled::Db,
 		name: String,
-		param: TableReplicationParams,
 		rpc_server: &mut RpcServer,
 	) -> Arc<Self> {
 		let store = db.open_tree(&name).expect("Unable to open DB tree");
@@ -130,11 +146,11 @@ impl<F: TableSchema + 'static> Table<F> {
 		let table = Arc::new(Self {
 			instance,
+			replication,
 			name,
 			rpc_client,
 			system,
 			store,
-			param,
 			syncer: ArcSwapOption::from(None),
 		});
 		table.clone().register_handler(rpc_server, rpc_path);
@@ -147,15 +163,19 @@ impl<F: TableSchema + 'static> Table<F> {
 	pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
 		let hash = e.partition_key().hash();
-		let ring = self.system.ring.borrow().clone();
-		let who = ring.walk_ring(&hash, self.param.replication_factor);
+		let who = self.replication.write_nodes(&hash, &self.system);
 		//eprintln!("insert who: {:?}", who);
 
 		let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
 		let rpc = TableRPC::<F>::Update(vec![e_enc]);
 
 		self.rpc_client
-			.try_call_many(&who[..], rpc, self.param.write_quorum, self.param.timeout)
+			.try_call_many(
+				&who[..],
+				rpc,
+				self.replication.write_quorum(),
+				TABLE_RPC_TIMEOUT,
+			)
 			.await?;
 		Ok(())
 	}
@@ -165,8 +185,7 @@ impl<F: TableSchema + 'static> Table<F> {
 		for entry in entries.iter() {
 			let hash = entry.partition_key().hash();
-			let ring = self.system.ring.borrow().clone();
-			let who = ring.walk_ring(&hash, self.param.replication_factor);
+			let who = self.replication.write_nodes(&hash, &self.system);
 			let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
 			for node in who {
 				if !call_list.contains_key(&node) {
@@ -179,7 +198,7 @@ impl<F: TableSchema + 'static> Table<F> {
 		let call_futures = call_list.drain().map(|(node, entries)| async move {
 			let rpc = TableRPC::<F>::Update(entries);
 
-			let resp = self.rpc_client.call(&node, rpc, self.param.timeout).await?;
+			let resp = self.rpc_client.call(&node, rpc, TABLE_RPC_TIMEOUT).await?;
 			Ok::<_, Error>((node, resp))
 		});
 		let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -190,7 +209,7 @@ impl<F: TableSchema + 'static> Table<F> {
 				errors.push(e);
 			}
 		}
-		if errors.len() > self.param.replication_factor - self.param.write_quorum {
+		if errors.len() > self.replication.max_write_errors() {
 			Err(Error::Message("Too many errors".into()))
 		} else {
 			Ok(())
@@ -203,14 +222,18 @@ impl<F: TableSchema + 'static> Table<F> {
 		sort_key: &F::S,
 	) -> Result<Option<F::E>, Error> {
 		let hash = partition_key.hash();
-		let ring = self.system.ring.borrow().clone();
-		let who = ring.walk_ring(&hash, self.param.replication_factor);
+		let who = self.replication.read_nodes(&hash, &self.system);
 		//eprintln!("get who: {:?}", who);
 
 		let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
 		let resps = self
 			.rpc_client
-			.try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout)
+			.try_call_many(
+				&who[..],
+				rpc,
+				self.replication.read_quorum(),
+				TABLE_RPC_TIMEOUT,
+			)
 			.await?;
 
 		let mut ret = None;
@@ -254,14 +277,18 @@ impl<F: TableSchema + 'static> Table<F> {
 		limit: usize,
 	) -> Result<Vec<F::E>, Error> {
 		let hash = partition_key.hash();
-		let ring = self.system.ring.borrow().clone();
-		let who = ring.walk_ring(&hash, self.param.replication_factor);
+		let who = self.replication.read_nodes(&hash, &self.system);
 
 		let rpc =
 			TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
 		let resps = self
 			.rpc_client
-			.try_call_many(&who[..], rpc, self.param.read_quorum, self.param.timeout)
+			.try_call_many(
+				&who[..],
+				rpc,
+				self.replication.read_quorum(),
+				TABLE_RPC_TIMEOUT,
+			)
 			.await?;
 
 		let mut ret = BTreeMap::new();
@@ -315,7 +342,7 @@ impl<F: TableSchema + 'static> Table<F> {
 				&who[..],
 				TableRPC::<F>::Update(vec![what_enc]),
 				who.len(),
-				self.param.timeout,
+				TABLE_RPC_TIMEOUT,
 			)
 			.await?;
 		Ok(())
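
Note: the trait comment above already points at a table_fullcopy.rs, matching the "coming next" part of the commit message. As a rough, hypothetical sketch (not code from this commit), a full-copy strategy might implement TableReplication along these lines: every node stores everything, reads are served locally, and writes are seeded to a few ring neighbors and then spread epidemically by the syncer.

use crate::data::*;
use crate::membership::{Ring, System};
use crate::table::*;

#[derive(Clone)]
pub struct TableFullCopyReplication {
	pub write_factor: usize,
	pub write_quorum: usize,
}

impl TableReplication for TableFullCopyReplication {
	fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
		// Every node holds a full copy, so read locally.
		vec![system.id.clone()]
	}
	fn read_quorum(&self) -> usize {
		1
	}
	fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
		// Seed the write on a few ring neighbors...
		let ring = system.ring.borrow().clone();
		ring.walk_ring(&hash, self.write_factor)
	}
	fn write_quorum(&self) -> usize {
		self.write_quorum
	}
	fn max_write_errors(&self) -> usize {
		self.write_factor - self.write_quorum
	}
	fn epidemic_writes(&self) -> bool {
		// ...and let the anti-entropy syncer disseminate them to everyone.
		true
	}
	fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> {
		ring.config.members.keys().cloned().collect::<Vec<_>>()
	}
	fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
		// A single partition covering the whole key space.
		vec![[0u8; 32].into(), [0xFFu8; 32].into()]
	}
}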

src/table_sharded.rs (new file, 55 lines)

@@ -0,0 +1,55 @@
+use crate::data::*;
+use crate::membership::{System, Ring};
+use crate::table::*;
+
+#[derive(Clone)]
+pub struct TableShardedReplication {
+	pub replication_factor: usize,
+	pub read_quorum: usize,
+	pub write_quorum: usize,
+}
+
+impl TableReplication for TableShardedReplication {
+	// Sharded replication schema:
+	// - based on the ring of nodes, a certain set of neighbors
+	//   store entries, given as a function of the position of the
+	//   entry's hash in the ring
+	// - reads are done on all of the nodes that replicate the data
+	// - writes as well
+
+	fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
+		let ring = system.ring.borrow().clone();
+		ring.walk_ring(&hash, self.replication_factor)
+	}
+	fn read_quorum(&self) -> usize {
+		self.read_quorum
+	}
+
+	fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
+		let ring = system.ring.borrow().clone();
+		ring.walk_ring(&hash, self.replication_factor)
+	}
+	fn write_quorum(&self) -> usize {
+		self.write_quorum
+	}
+	fn max_write_errors(&self) -> usize {
+		self.replication_factor - self.write_quorum
+	}
+	fn epidemic_writes(&self) -> bool {
+		false
+	}
+
+	fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> {
+		ring.walk_ring(&hash, self.replication_factor)
+	}
+	fn split_points(&self, ring: &Ring) -> Vec<Hash> {
+		let mut ret = vec![];
+		ret.push([0u8; 32].into());
+		for entry in ring.ring.iter() {
+			ret.push(entry.location.clone());
+		}
+		ret.push([0xFFu8; 32].into());
+		ret
+	}
+}
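
Note: split_points brackets the ring's token locations with the two extremes of the key space, so consecutive pairs tile the whole 256-bit range with no gaps or overlaps. A toy model with u8 values standing in for 32-byte hashes (illustrative only):

fn split_points(ring_tokens: &[u8]) -> Vec<u8> {
	let mut ret = vec![0x00]; // lower bound of the key space
	ret.extend_from_slice(ring_tokens);
	ret.push(0xFF); // upper bound
	ret
}

fn main() {
	let pts = split_points(&[0x40, 0xA0]);
	// Adjacent pairs become the sync partitions:
	let partitions: Vec<(u8, u8)> = pts.windows(2).map(|w| (w[0], w[1])).collect();
	assert_eq!(partitions, vec![(0x00, 0x40), (0x40, 0xA0), (0xA0, 0xFF)]);
}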

src/table_sync.rs

@@ -1,5 +1,5 @@
 use rand::Rng;
-use std::collections::{BTreeMap, BTreeSet, VecDeque};
+use std::collections::{BTreeMap, VecDeque};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -21,10 +21,12 @@ const MAX_DEPTH: usize = 16;
 const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
 const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
 
-pub struct TableSyncer<F: TableSchema> {
-	pub table: Arc<Table<F>>,
-	pub todo: Mutex<SyncTodo>,
-	pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+	table: Arc<Table<F, R>>,
+	todo: Mutex<SyncTodo>,
+	cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -36,21 +38,21 @@ pub enum SyncRPC {
 }
 
 pub struct SyncTodo {
-	pub todo: Vec<Partition>,
+	todo: Vec<TodoPartition>,
 }
 
 #[derive(Debug, Clone)]
-pub struct Partition {
-	pub begin: Hash,
-	pub end: Hash,
-	pub retain: bool,
+struct TodoPartition {
+	begin: Hash,
+	end: Hash,
+	retain: bool,
 }
 
 #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
 pub struct SyncRange {
-	pub begin: Vec<u8>,
-	pub end: Vec<u8>,
-	pub level: usize,
+	begin: Vec<u8>,
+	end: Vec<u8>,
+	level: usize,
 }
 
 impl std::cmp::PartialOrd for SyncRange {
@@ -66,16 +68,20 @@ impl std::cmp::Ord for SyncRange {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct RangeChecksum {
-	pub bounds: SyncRange,
-	pub children: Vec<(SyncRange, Hash)>,
-	pub found_limit: Option<Vec<u8>>,
+	bounds: SyncRange,
+	children: Vec<(SyncRange, Hash)>,
+	found_limit: Option<Vec<u8>>,
 
 	#[serde(skip, default = "std::time::Instant::now")]
-	pub time: Instant,
+	time: Instant,
 }
 
-impl<F: TableSchema + 'static> TableSyncer<F> {
-	pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
+impl<F, R> TableSyncer<F, R>
+where
+	F: TableSchema + 'static,
+	R: TableReplication + 'static,
+{
+	pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
 		let todo = SyncTodo { todo: Vec::new() };
 		let syncer = Arc::new(TableSyncer {
 			table: table.clone(),
@@ -166,7 +172,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
 	async fn sync_partition(
 		self: Arc<Self>,
-		partition: &Partition,
+		partition: &TodoPartition,
 		must_exit: &mut watch::Receiver<bool>,
 	) -> Result<(), Error> {
 		eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition);
@@ -175,8 +181,10 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
 			.await?;
 
 		let my_id = self.table.system.id.clone();
-		let ring = self.table.system.ring.borrow().clone();
-		let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor);
+		let nodes = self
+			.table
+			.replication
+			.write_nodes(&partition.begin, &self.table.system);
 		let mut sync_futures = nodes
 			.iter()
 			.filter(|node| **node != my_id)
@@ -349,7 +357,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
 	async fn do_sync_with(
 		self: Arc<Self>,
-		partition: Partition,
+		partition: TodoPartition,
 		root_ck: RangeChecksum,
 		who: UUID,
 		retain: bool,
@@ -367,7 +375,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
 					partition.begin.clone(),
 					partition.end.clone(),
 				)),
-				self.table.param.timeout,
+				TABLE_SYNC_RPC_TIMEOUT,
 			)
 			.await?;
 		if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
@@ -398,7 +406,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
 				.call(
 					&who,
 					&TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)),
-					self.table.param.timeout,
+					TABLE_SYNC_RPC_TIMEOUT,
 				)
 				.await?;
 			if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
@@ -456,11 +464,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
 		let rpc_resp = self
 			.table
 			.rpc_client
-			.call(
-				&who,
-				&TableRPC::<F>::Update(values),
-				self.table.param.timeout,
-			)
+			.call(&who, &TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
 			.await?;
 		if let TableRPC::<F>::Ok = rpc_resp {
 			Ok(())
@@ -490,7 +494,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
 		}
 	}
 
-	pub async fn handle_checksums_rpc(
+	async fn handle_checksums_rpc(
 		self: &Arc<Self>,
 		checksums: &[RangeChecksum],
 		retain: bool,
@@ -589,39 +593,20 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
 	}
 }
 
 impl SyncTodo {
-	fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) {
+	fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
 		let my_id = table.system.id.clone();
 
 		self.todo.clear();
 
-		let ring: Arc<Ring> = table.system.ring.borrow().clone();
+		let ring = table.system.ring.borrow().clone();
+		let split_points = table.replication.split_points(&ring);
 
-		for i in 0..ring.ring.len() {
-			let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor);
-			let begin = ring.ring[i].location.clone();
-
-			if i == 0 {
-				self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id);
-			}
-
-			if i == ring.ring.len() - 1 {
-				self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id);
-			} else {
-				let end = ring.ring[i + 1].location.clone();
-				self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id);
-			}
-		}
-	}
-
-	fn add_full_scan_aux<F: TableSchema>(
-		&mut self,
-		table: &Table<F>,
-		begin: Hash,
-		end: Hash,
-		nodes: &[UUID],
-		my_id: &UUID,
-	) {
-		let retain = nodes.contains(my_id);
-		if !retain {
-			// Check if we have some data to send, otherwise skip
-			if table
+		for i in 0..split_points.len() - 1 {
+			let begin = split_points[i].clone();
+			let end = split_points[i + 1].clone();
+			let nodes = table.replication.write_nodes_from_ring(&begin, &ring);
+			let retain = nodes.contains(&my_id);
+			if !retain {
+				// Check if we have some data to send, otherwise skip
+				if table
@@ -630,58 +615,58 @@ impl SyncTodo {
 				.next()
 				.is_none()
 			{
-				return;
+				continue;
 			}
 		}
 
-		self.todo.push(Partition { begin, end, retain });
+			self.todo.push(TodoPartition { begin, end, retain });
+		}
 	}
-	fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) {
+	fn add_ring_difference<F: TableSchema, R: TableReplication>(
+		&mut self,
+		table: &Table<F, R>,
+		old_ring: &Ring,
+		new_ring: &Ring,
+	) {
 		let my_id = table.system.id.clone();
 
-		let old_ring = ring_points(old);
-		let new_ring = ring_points(new);
-		let both_ring = old_ring.union(&new_ring).cloned().collect::<BTreeSet<_>>();
-
-		let prev_todo_begin = self
-			.todo
-			.iter()
-			.map(|x| x.begin.clone())
-			.collect::<BTreeSet<_>>();
-		let prev_todo_end = self
-			.todo
-			.iter()
-			.map(|x| x.end.clone())
-			.collect::<BTreeSet<_>>();
-		let prev_todo = prev_todo_begin
-			.union(&prev_todo_end)
-			.cloned()
-			.collect::<BTreeSet<_>>();
-
-		let all_points = both_ring.union(&prev_todo).cloned().collect::<Vec<_>>();
-
-		self.todo.sort_by(|x, y| x.begin.cmp(&y.begin));
+		let mut all_points = None
+			.into_iter()
+			.chain(table.replication.split_points(old_ring).drain(..))
+			.chain(table.replication.split_points(new_ring).drain(..))
+			.chain(self.todo.iter().map(|x| x.begin.clone()))
+			.chain(self.todo.iter().map(|x| x.end.clone()))
+			.collect::<Vec<_>>();
+		all_points.sort();
+		all_points.dedup();
+
+		let mut old_todo = std::mem::replace(&mut self.todo, vec![]);
+		old_todo.sort_by(|x, y| x.begin.cmp(&y.begin));
 
 		let mut new_todo = vec![];
 		for i in 0..all_points.len() - 1 {
 			let begin = all_points[i].clone();
 			let end = all_points[i + 1].clone();
-			let was_ours = old
-				.walk_ring(&begin, table.param.replication_factor)
+			let was_ours = table
+				.replication
+				.write_nodes_from_ring(&begin, &old_ring)
 				.contains(&my_id);
-			let is_ours = new
-				.walk_ring(&begin, table.param.replication_factor)
+			let is_ours = table
+				.replication
+				.write_nodes_from_ring(&begin, &new_ring)
 				.contains(&my_id);
-			let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) {
+
+			let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) {
 				Ok(_) => true,
 				Err(j) => {
-					(j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end)
-						|| (j < self.todo.len()
-							&& self.todo[j].begin < end && begin < self.todo[j].end)
+					(j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end)
+						|| (j < old_todo.len()
+							&& old_todo[j].begin < end && begin < old_todo[j].end)
 				}
 			};
 			if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
-				new_todo.push(Partition {
+				new_todo.push(TodoPartition {
 					begin,
 					end,
 					retain: is_ours,
@@ -692,7 +677,7 @@ impl SyncTodo {
 		self.todo = new_todo;
 	}
 
-	fn pop_task(&mut self) -> Option<Partition> {
+	fn pop_task(&mut self) -> Option<TodoPartition> {
 		if self.todo.is_empty() {
 			return None;
 		}
@@ -707,13 +692,3 @@ impl SyncTodo {
 		}
 	}
 }
-
-fn ring_points(ring: &Ring) -> BTreeSet<Hash> {
-	let mut ret = BTreeSet::new();
-	ret.insert([0u8; 32].into());
-	ret.insert([0xFFu8; 32].into());
-	for i in 0..ring.ring.len() {
-		ret.insert(ring.ring[i].location.clone());
-	}
-	ret
-}
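
Note: the reworked add_ring_difference merges three sources of boundaries: the split points of the old ring, those of the new ring, and the begin/end bounds of partitions already queued. It sorts and dedups the union, then re-derives candidate partitions from consecutive pairs. A toy model of that merge, again with u8 stand-ins for hashes (illustrative only):

fn main() {
	let old_split: Vec<u8> = vec![0x00, 0x40, 0xFF];
	let new_split: Vec<u8> = vec![0x00, 0x60, 0xFF];
	let todo_bounds: Vec<u8> = vec![0x40, 0x80];

	let mut all_points: Vec<u8> = old_split
		.into_iter()
		.chain(new_split)
		.chain(todo_bounds)
		.collect();
	all_points.sort();
	all_points.dedup();
	assert_eq!(all_points, vec![0x00, 0x40, 0x60, 0x80, 0xFF]);
	// Candidate partitions: (0x00,0x40), (0x40,0x60), (0x60,0x80), (0x80,0xFF).
	// Each is kept if it overlapped the old todo list or if its ownership
	// (was_ours vs is_ours) changed between the old and new ring.
}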

src/version_table.rs

@@ -5,6 +5,7 @@ use std::sync::Arc;
 use crate::background::BackgroundRunner;
 use crate::data::*;
 use crate::table::*;
+use crate::table_sharded::*;
 
 use crate::block_ref_table::*;
@@ -56,7 +57,7 @@ impl Entry<Hash, EmptySortKey> for Version {
 pub struct VersionTable {
 	pub background: Arc<BackgroundRunner>,
-	pub block_ref_table: Arc<Table<BlockRefTable>>,
+	pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
 }
 
 #[async_trait]