Merge pull request 'various fixes for v0.8.0' (#380) from various-fixes-for-0.8 into main

Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/380
This commit is contained in:
Alex 2022-09-13 16:49:05 +02:00
commit 80fdbfb0aa
23 changed files with 81 additions and 59 deletions

View file

@ -295,7 +295,6 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
let mut ret = None; let mut ret = None;
for item in cbc.children() { for item in cbc.children() {
println!("{:?}", item);
if item.has_tag_name("LocationConstraint") { if item.has_tag_name("LocationConstraint") {
if ret != None { if ret != None {
return None; return None;

View file

@ -609,7 +609,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
} }
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CopyObjectResult { pub struct CopyObjectResult {
#[serde(rename = "LastModified")] #[serde(rename = "LastModified")]
pub last_modified: s3_xml::Value, pub last_modified: s3_xml::Value,
@ -617,7 +617,7 @@ pub struct CopyObjectResult {
pub etag: s3_xml::Value, pub etag: s3_xml::Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CopyPartResult { pub struct CopyPartResult {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -662,7 +662,6 @@ mod tests {
last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()), last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()),
etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()), etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()),
}; };
println!("{}", to_xml_with_header(&v)?);
assert_eq!(to_xml_with_header(&v)?, expected_retval); assert_eq!(to_xml_with_header(&v)?, expected_retval);

View file

@ -25,7 +25,7 @@ impl From<&str> for Value {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct IntValue(#[serde(rename = "$value")] pub i64); pub struct IntValue(#[serde(rename = "$value")] pub i64);
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Bucket { pub struct Bucket {
#[serde(rename = "CreationDate")] #[serde(rename = "CreationDate")]
pub creation_date: Value, pub creation_date: Value,
@ -33,7 +33,7 @@ pub struct Bucket {
pub name: Value, pub name: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Owner { pub struct Owner {
#[serde(rename = "DisplayName")] #[serde(rename = "DisplayName")]
pub display_name: Value, pub display_name: Value,
@ -41,13 +41,13 @@ pub struct Owner {
pub id: Value, pub id: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct BucketList { pub struct BucketList {
#[serde(rename = "Bucket")] #[serde(rename = "Bucket")]
pub entries: Vec<Bucket>, pub entries: Vec<Bucket>,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListAllMyBucketsResult { pub struct ListAllMyBucketsResult {
#[serde(rename = "Buckets")] #[serde(rename = "Buckets")]
pub buckets: BucketList, pub buckets: BucketList,
@ -55,7 +55,7 @@ pub struct ListAllMyBucketsResult {
pub owner: Owner, pub owner: Owner,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct LocationConstraint { pub struct LocationConstraint {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -63,7 +63,7 @@ pub struct LocationConstraint {
pub region: String, pub region: String,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Deleted { pub struct Deleted {
#[serde(rename = "Key")] #[serde(rename = "Key")]
pub key: Value, pub key: Value,
@ -73,7 +73,7 @@ pub struct Deleted {
pub delete_marker_version_id: Value, pub delete_marker_version_id: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Error { pub struct Error {
#[serde(rename = "Code")] #[serde(rename = "Code")]
pub code: Value, pub code: Value,
@ -85,7 +85,7 @@ pub struct Error {
pub region: Option<Value>, pub region: Option<Value>,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct DeleteError { pub struct DeleteError {
#[serde(rename = "Code")] #[serde(rename = "Code")]
pub code: Value, pub code: Value,
@ -97,7 +97,7 @@ pub struct DeleteError {
pub version_id: Option<Value>, pub version_id: Option<Value>,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct DeleteResult { pub struct DeleteResult {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -107,7 +107,7 @@ pub struct DeleteResult {
pub errors: Vec<DeleteError>, pub errors: Vec<DeleteError>,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct InitiateMultipartUploadResult { pub struct InitiateMultipartUploadResult {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -119,7 +119,7 @@ pub struct InitiateMultipartUploadResult {
pub upload_id: Value, pub upload_id: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CompleteMultipartUploadResult { pub struct CompleteMultipartUploadResult {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -133,7 +133,7 @@ pub struct CompleteMultipartUploadResult {
pub etag: Value, pub etag: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct Initiator { pub struct Initiator {
#[serde(rename = "DisplayName")] #[serde(rename = "DisplayName")]
pub display_name: Value, pub display_name: Value,
@ -141,7 +141,7 @@ pub struct Initiator {
pub id: Value, pub id: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListMultipartItem { pub struct ListMultipartItem {
#[serde(rename = "Initiated")] #[serde(rename = "Initiated")]
pub initiated: Value, pub initiated: Value,
@ -157,7 +157,7 @@ pub struct ListMultipartItem {
pub storage_class: Value, pub storage_class: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListMultipartUploadsResult { pub struct ListMultipartUploadsResult {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -187,7 +187,7 @@ pub struct ListMultipartUploadsResult {
pub encoding_type: Option<Value>, pub encoding_type: Option<Value>,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct PartItem { pub struct PartItem {
#[serde(rename = "ETag")] #[serde(rename = "ETag")]
pub etag: Value, pub etag: Value,
@ -199,7 +199,7 @@ pub struct PartItem {
pub size: IntValue, pub size: IntValue,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListPartsResult { pub struct ListPartsResult {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -227,7 +227,7 @@ pub struct ListPartsResult {
pub storage_class: Value, pub storage_class: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListBucketItem { pub struct ListBucketItem {
#[serde(rename = "Key")] #[serde(rename = "Key")]
pub key: Value, pub key: Value,
@ -241,13 +241,13 @@ pub struct ListBucketItem {
pub storage_class: Value, pub storage_class: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct CommonPrefix { pub struct CommonPrefix {
#[serde(rename = "Prefix")] #[serde(rename = "Prefix")]
pub prefix: Value, pub prefix: Value,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct ListBucketResult { pub struct ListBucketResult {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -281,7 +281,7 @@ pub struct ListBucketResult {
pub common_prefixes: Vec<CommonPrefix>, pub common_prefixes: Vec<CommonPrefix>,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct VersioningConfiguration { pub struct VersioningConfiguration {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),
@ -289,7 +289,7 @@ pub struct VersioningConfiguration {
pub status: Option<Value>, pub status: Option<Value>,
} }
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq, Eq)]
pub struct PostObject { pub struct PostObject {
#[serde(serialize_with = "xmlns_tag")] #[serde(serialize_with = "xmlns_tag")]
pub xmlns: (), pub xmlns: (),

View file

@ -7,7 +7,7 @@ use garage_table::*;
/// The bucket alias table holds the names given to buckets /// The bucket alias table holds the names given to buckets
/// in the global namespace. /// in the global namespace.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketAlias { pub struct BucketAlias {
name: String, name: String,
pub state: crdt::Lww<Option<Uuid>>, pub state: crdt::Lww<Option<Uuid>>,

View file

@ -12,7 +12,7 @@ use crate::permission::BucketKeyPerm;
/// Its parameters are not directly accessible as: /// Its parameters are not directly accessible as:
/// - It must be possible to merge paramaters, hence the use of a LWW CRDT. /// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket { pub struct Bucket {
/// ID of the bucket /// ID of the bucket
pub id: Uuid, pub id: Uuid,
@ -21,7 +21,7 @@ pub struct Bucket {
} }
/// Configuration for a bucket /// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams { pub struct BucketParams {
/// Bucket's creation date /// Bucket's creation date
pub creation_date: u64, pub creation_date: u64,

View file

@ -6,7 +6,7 @@ use garage_db as db;
use garage_util::background::*; use garage_util::background::*;
use garage_util::config::*; use garage_util::config::*;
use garage_util::error::Error; use garage_util::error::*;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -76,9 +76,14 @@ pub struct GarageK2V {
impl Garage { impl Garage {
/// Create and run garage /// Create and run garage
pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> { pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
// Create meta dir and data dir if they don't exist already
std::fs::create_dir_all(&config.metadata_dir)
.ok_or_message("Unable to create Garage metadata directory")?;
std::fs::create_dir_all(&config.data_dir)
.ok_or_message("Unable to create Garage data directory")?;
info!("Opening database..."); info!("Opening database...");
let mut db_path = config.metadata_dir.clone(); let mut db_path = config.metadata_dir.clone();
std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
let db = match config.db_engine.as_str() { let db = match config.db_engine.as_str() {
// ---- Sled DB ---- // ---- Sled DB ----
#[cfg(feature = "sled")] #[cfg(feature = "sled")]
@ -164,7 +169,7 @@ impl Garage {
background.clone(), background.clone(),
replication_mode.replication_factor(), replication_mode.replication_factor(),
&config, &config,
); )?;
let data_rep_param = TableShardedReplication { let data_rep_param = TableShardedReplication {
system: system.clone(), system: system.clone(),

View file

@ -81,7 +81,7 @@ impl<T: CountedItem> CounterEntry<T> {
} }
/// A counter entry in the global table /// A counter entry in the global table
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct CounterValue { pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>, pub node_values: BTreeMap<Uuid, (u64, i64)>,
} }

View file

@ -9,7 +9,7 @@ use crate::permission::BucketKeyPerm;
use crate::prev::v051::key_table as old; use crate::prev::v051::key_table as old;
/// An api key /// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Key { pub struct Key {
/// The id of the key (immutable), used as partition key /// The id of the key (immutable), used as partition key
pub key_id: String, pub key_id: String,
@ -19,7 +19,7 @@ pub struct Key {
} }
/// Configuration for a key /// Configuration for a key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct KeyParams { pub struct KeyParams {
/// The secret_key associated (immutable) /// The secret_key associated (immutable)
pub secret_key: String, pub secret_key: String,

View file

@ -10,7 +10,7 @@ use super::key_table::PermissionSet;
/// Its parameters are not directly accessible as: /// Its parameters are not directly accessible as:
/// - It must be possible to merge paramaters, hence the use of a LWW CRDT. /// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket { pub struct Bucket {
/// Name of the bucket /// Name of the bucket
pub name: String, pub name: String,
@ -19,7 +19,7 @@ pub struct Bucket {
} }
/// State of a bucket /// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState { pub enum BucketState {
/// The bucket is deleted /// The bucket is deleted
Deleted, Deleted,
@ -41,7 +41,7 @@ impl Crdt for BucketState {
} }
/// Configuration for a bucket /// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams { pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give /// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LwwMap<String, PermissionSet>, pub authorized_keys: crdt::LwwMap<String, PermissionSet>,

View file

@ -4,7 +4,7 @@ use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
/// An api key /// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Key { pub struct Key {
/// The id of the key (immutable), used as partition key /// The id of the key (immutable), used as partition key
pub key_id: String, pub key_id: String,

View file

@ -6,7 +6,7 @@ use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
/// An object /// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object { pub struct Object {
/// The bucket in which the object is stored, used as partition key /// The bucket in which the object is stored, used as partition key
pub bucket: String, pub bucket: String,
@ -26,7 +26,7 @@ impl Object {
} }
/// Informations about a version of an object /// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion { pub struct ObjectVersion {
/// Id of the version /// Id of the version
pub uuid: Uuid, pub uuid: Uuid,
@ -37,7 +37,7 @@ pub struct ObjectVersion {
} }
/// State of an object version /// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState { pub enum ObjectVersionState {
/// The version is being received /// The version is being received
Uploading(ObjectVersionHeaders), Uploading(ObjectVersionHeaders),

View file

@ -6,7 +6,7 @@ use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
/// A version of an object /// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version { pub struct Version {
/// UUID of the version, used as partition key /// UUID of the version, used as partition key
pub uuid: Uuid, pub uuid: Uuid,

View file

@ -10,7 +10,7 @@ use garage_table::*;
use garage_block::manager::*; use garage_block::manager::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef { pub struct BlockRef {
/// Hash (blake2 sum) of the block, used as partition key /// Hash (blake2 sum) of the block, used as partition key
pub block: Hash, pub block: Hash,

View file

@ -21,7 +21,7 @@ pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes"; pub const BYTES: &str = "bytes";
/// An object /// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object { pub struct Object {
/// The bucket in which the object is stored, used as partition key /// The bucket in which the object is stored, used as partition key
pub bucket_id: Uuid, pub bucket_id: Uuid,
@ -70,7 +70,7 @@ impl Object {
} }
/// Informations about a version of an object /// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion { pub struct ObjectVersion {
/// Id of the version /// Id of the version
pub uuid: Uuid, pub uuid: Uuid,
@ -81,7 +81,7 @@ pub struct ObjectVersion {
} }
/// State of an object version /// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState { pub enum ObjectVersionState {
/// The version is being received /// The version is being received
Uploading(ObjectVersionHeaders), Uploading(ObjectVersionHeaders),

View file

@ -15,7 +15,7 @@ use crate::s3::block_ref_table::*;
use crate::prev::v051::version_table as old; use crate::prev::v051::version_table as old;
/// A version of an object /// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version { pub struct Version {
/// UUID of the version, used as partition key /// UUID of the version, used as partition key
pub uuid: Uuid, pub uuid: Uuid,

View file

@ -56,7 +56,7 @@ pub async fn get_kubernetes_nodes(
let mut ret = Vec::with_capacity(nodes.items.len()); let mut ret = Vec::with_capacity(nodes.items.len());
for node in nodes { for node in nodes {
println!("Found Pod: {:?}", node.metadata.name); info!("Found Pod: {:?}", node.metadata.name);
let pubkey = &node let pubkey = &node
.metadata .metadata

View file

@ -198,7 +198,7 @@ impl System {
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
replication_factor: usize, replication_factor: usize,
config: &Config, config: &Config,
) -> Arc<Self> { ) -> Result<Arc<Self>, Error> {
let node_key = let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!( info!(
@ -206,11 +206,21 @@ impl System {
hex::encode(&node_key.public_key()[..8]) hex::encode(&node_key.public_key()[..8])
); );
let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout"); let persist_cluster_layout: Persister<ClusterLayout> =
Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
let cluster_layout = match persist_cluster_layout.load() { let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => x, Ok(x) => {
if x.replication_factor != replication_factor {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
x.replication_factor,
replication_factor
)));
}
x
}
Err(e) => { Err(e) => {
info!( info!(
"No valid previous cluster layout stored ({}), starting fresh.", "No valid previous cluster layout stored ({}), starting fresh.",
@ -303,7 +313,7 @@ impl System {
metadata_dir: config.metadata_dir.clone(), metadata_dir: config.metadata_dir.clone(),
}); });
sys.system_endpoint.set_handler(sys.clone()); sys.system_endpoint.set_handler(sys.clone());
sys Ok(sys)
} }
/// Perform bootstraping, starting the ping loop /// Perform bootstraping, starting the ping loop
@ -485,7 +495,7 @@ impl System {
let local_info = self.local_status.load(); let local_info = self.local_status.load();
if local_info.replication_factor < info.replication_factor { if local_info.replication_factor < info.replication_factor {
error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs", error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.",
info.replication_factor, info.replication_factor,
local_info.replication_factor); local_info.replication_factor);
std::process::exit(1); std::process::exit(1);
@ -513,6 +523,16 @@ impl System {
self: &Arc<Self>, self: &Arc<Self>,
adv: &ClusterLayout, adv: &ClusterLayout,
) -> Result<SystemRpc, Error> { ) -> Result<SystemRpc, Error> {
if adv.replication_factor != self.replication_factor {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.replication_factor,
self.replication_factor
);
error!("{}", msg);
return Err(Error::Message(msg));
}
let update_ring = self.update_ring.lock().await; let update_ring = self.update_ring.lock().await;
let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); let mut layout: ClusterLayout = self.ring.borrow().layout.clone();

View file

@ -113,7 +113,6 @@ where
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash(); let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash); let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRpc::<F>::Update(vec![e_enc]); let rpc = TableRpc::<F>::Update(vec![e_enc]);

View file

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::crdt::crdt::*; use crate::crdt::crdt::*;
/// Boolean, where `true` is an absorbing state /// Boolean, where `true` is an absorbing state
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Bool(bool); pub struct Bool(bool);
impl Bool { impl Bool {

View file

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::crdt::crdt::*; use crate::crdt::crdt::*;
/// Deletable object (once deleted, cannot go back) /// Deletable object (once deleted, cannot go back)
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Deletable<T> { pub enum Deletable<T> {
Present(T), Present(T),
Deleted, Deleted,

View file

@ -37,7 +37,7 @@ use crate::crdt::crdt::*;
/// ///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing /// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in enterprise when reconciliating databases with ad-hoc scripts. /// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Lww<T> { pub struct Lww<T> {
ts: u64, ts: u64,
v: T, v: T,

View file

@ -23,7 +23,7 @@ use crate::crdt::crdt::*;
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, /// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are /// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here. /// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct LwwMap<K, V> { pub struct LwwMap<K, V> {
vals: Vec<(K, u64, V)>, vals: Vec<(K, u64, V)>,
} }

View file

@ -16,7 +16,7 @@ use crate::crdt::crdt::*;
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, /// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are /// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here. /// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Map<K, V> { pub struct Map<K, V> {
vals: Vec<(K, V)>, vals: Vec<(K, V)>,
} }