k2v-client libary poll_range and CLI poll-range

This commit is contained in:
Alex Auvolat 2023-01-11 11:12:16 +01:00
parent de1111076b
commit 32aab06929
No known key found for this signature in database
GPG key ID: 0E496D15096376BE
5 changed files with 248 additions and 59 deletions

3
Cargo.lock generated
View file

@ -1752,12 +1752,13 @@ dependencies = [
[[package]] [[package]]
name = "k2v-client" name = "k2v-client"
version = "0.0.1" version = "0.1.1"
dependencies = [ dependencies = [
"base64", "base64",
"clap 3.1.18", "clap 3.1.18",
"garage_util", "garage_util",
"http", "http",
"hyper-rustls 0.23.0",
"log", "log",
"rusoto_core", "rusoto_core",
"rusoto_credential", "rusoto_credential",

View file

@ -32,7 +32,7 @@ args@{
ignoreLockHash, ignoreLockHash,
}: }:
let let
nixifiedLockHash = "b6aeefc112eb232904b24398f4e5da776c8ee2c13d427a26dbdf1732205d4fc9"; nixifiedLockHash = "8f036894ab81a528f76e97e904ff3e496a9b1500569312489d444f615fb781bf";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash lockHashIgnored = if ignoreLockHash
@ -65,7 +65,7 @@ in
garage_api = rustPackages.unknown.garage_api."0.8.1"; garage_api = rustPackages.unknown.garage_api."0.8.1";
garage_web = rustPackages.unknown.garage_web."0.8.1"; garage_web = rustPackages.unknown.garage_web."0.8.1";
garage = rustPackages.unknown.garage."0.8.1"; garage = rustPackages.unknown.garage."0.8.1";
k2v-client = rustPackages.unknown.k2v-client."0.0.1"; k2v-client = rustPackages.unknown.k2v-client."0.1.1";
}; };
"registry+https://github.com/rust-lang/crates.io-index".addr2line."0.17.0" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".addr2line."0.17.0" = overridableMkRustCrate (profileName: rec {
name = "addr2line"; name = "addr2line";
@ -2434,9 +2434,9 @@ in
}; };
}); });
"unknown".k2v-client."0.0.1" = overridableMkRustCrate (profileName: rec { "unknown".k2v-client."0.1.1" = overridableMkRustCrate (profileName: rec {
name = "k2v-client"; name = "k2v-client";
version = "0.0.1"; version = "0.1.1";
registry = "unknown"; registry = "unknown";
src = fetchCrateLocal (workspaceSrc + "/src/k2v-client"); src = fetchCrateLocal (workspaceSrc + "/src/k2v-client");
features = builtins.concatLists [ features = builtins.concatLists [
@ -2449,6 +2449,7 @@ in
${ if rootFeatures' ? "k2v-client/clap" || rootFeatures' ? "k2v-client/cli" then "clap" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".clap."3.1.18" { inherit profileName; }).out; ${ if rootFeatures' ? "k2v-client/clap" || rootFeatures' ? "k2v-client/cli" then "clap" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".clap."3.1.18" { inherit profileName; }).out;
${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/garage_util" then "garage_util" else null } = (rustPackages."unknown".garage_util."0.8.1" { inherit profileName; }).out; ${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/garage_util" then "garage_util" else null } = (rustPackages."unknown".garage_util."0.8.1" { inherit profileName; }).out;
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.8" { inherit profileName; }).out; http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.8" { inherit profileName; }).out;
hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.23.0" { inherit profileName; }).out;
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }).out; log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }).out;
rusoto_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_core."0.48.0" { inherit profileName; }).out; rusoto_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_core."0.48.0" { inherit profileName; }).out;
rusoto_credential = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_credential."0.48.0" { inherit profileName; }).out; rusoto_credential = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_credential."0.48.0" { inherit profileName; }).out;

View file

@ -1,6 +1,6 @@
[package] [package]
name = "k2v-client" name = "k2v-client"
version = "0.0.1" version = "0.1.1"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"] authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -15,6 +15,7 @@ log = "0.4"
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] } rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
rusoto_credential = "0.48.0" rusoto_credential = "0.48.0"
rusoto_signature = "0.48.0" rusoto_signature = "0.48.0"
hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12", "logging" ] }
serde = "1.0.137" serde = "1.0.137"
serde_json = "1.0.81" serde_json = "1.0.81"
thiserror = "1.0.31" thiserror = "1.0.31"

View file

@ -1,3 +1,5 @@
use std::collections::BTreeMap;
use std::process::exit;
use std::time::Duration; use std::time::Duration;
use k2v_client::*; use k2v_client::*;
@ -58,21 +60,38 @@ enum Command {
output_kind: ReadOutputKind, output_kind: ReadOutputKind,
}, },
/// Watch changes on a single value /// Watch changes on a single value
Poll { PollItem {
/// Partition key to delete from /// Partition key of item to watch
partition_key: String, partition_key: String,
/// Sort key to delete from /// Sort key of item to watch
sort_key: String, sort_key: String,
/// Causality information /// Causality information
#[clap(short, long)] #[clap(short, long)]
causality: String, causality: String,
/// Timeout, in seconds /// Timeout, in seconds
#[clap(short, long)] #[clap(short = 'T', long)]
timeout: Option<u64>, timeout: Option<u64>,
/// Output formating /// Output formating
#[clap(flatten)] #[clap(flatten)]
output_kind: ReadOutputKind, output_kind: ReadOutputKind,
}, },
/// Watch changes on a range of values
PollRange {
/// Partition key to poll from
partition_key: String,
/// Output only sort keys matching this filter
#[clap(flatten)]
filter: Filter,
/// Marker of data that had previously been seen by a PollRange
#[clap(short = 'S', long)]
seen_marker: Option<String>,
/// Timeout, in seconds
#[clap(short = 'T', long)]
timeout: Option<u64>,
/// Output formating
#[clap(flatten)]
output_kind: BatchOutputKind,
},
/// Delete a single value /// Delete a single value
Delete { Delete {
/// Partition key to delete from /// Partition key to delete from
@ -176,7 +195,6 @@ struct ReadOutputKind {
impl ReadOutputKind { impl ReadOutputKind {
fn display_output(&self, val: CausalValue) -> ! { fn display_output(&self, val: CausalValue) -> ! {
use std::io::Write; use std::io::Write;
use std::process::exit;
if self.json { if self.json {
let stdout = std::io::stdout(); let stdout = std::io::stdout();
@ -254,6 +272,83 @@ struct BatchOutputKind {
json: bool, json: bool,
} }
impl BatchOutputKind {
fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
for (key, values) in values {
println!("key: {}", key);
let causality: String = values.causality.into();
println!("causality: {}", causality);
for value in values.value {
match value {
K2vValue::Value(v) => {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" value(utf-8): {}", string);
} else {
println!(" value(base64): {}", base64::encode(&v));
}
}
K2vValue::Tombstone => {
println!(" tombstone");
}
}
}
}
exit(0);
}
fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> {
values
.into_iter()
.map(|(k, v)| {
let mut value = serde_json::to_value(v).unwrap();
value
.as_object_mut()
.unwrap()
.insert("sort_key".to_owned(), k.into());
value
})
.collect::<Vec<_>>()
}
fn display_poll_range_output(
&self,
seen_marker: String,
values: BTreeMap<String, CausalValue>,
) -> ! {
if self.json {
let json = serde_json::json!({
"values": self.values_json(values),
"seen_marker": seen_marker,
});
let stdout = std::io::stdout();
serde_json::to_writer_pretty(stdout, &json).unwrap();
exit(0)
} else {
println!("seen marker: {}", seen_marker);
self.display_human_output(values)
}
}
fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! {
if self.json {
let json = serde_json::json!({
"next_key": res.next_start,
"values": self.values_json(res.items),
});
let stdout = std::io::stdout();
serde_json::to_writer_pretty(stdout, &json).unwrap();
exit(0)
} else {
if let Some(next) = res.next_start {
println!("next key: {}", next);
}
self.display_human_output(res.items)
}
}
}
/// Filter for batch operations /// Filter for batch operations
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))] #[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> {
let res = client.read_item(&partition_key, &sort_key).await?; let res = client.read_item(&partition_key, &sort_key).await?;
output_kind.display_output(res); output_kind.display_output(res);
} }
Command::Poll { Command::PollItem {
partition_key, partition_key,
sort_key, sort_key,
causality, causality,
@ -355,10 +450,57 @@ async fn main() -> Result<(), Error> {
.await?; .await?;
if let Some(res) = res_opt { if let Some(res) = res_opt {
output_kind.display_output(res); output_kind.display_output(res);
} else {
if output_kind.json {
println!("null");
} else { } else {
println!("Delay expired and value didn't change."); println!("Delay expired and value didn't change.");
} }
} }
}
Command::PollRange {
partition_key,
filter,
seen_marker,
timeout,
output_kind,
} => {
if filter.conflicts_only
|| filter.tombstones
|| filter.reverse
|| filter.limit.is_some()
{
return Err(Error::Message(
"limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(),
));
}
let timeout = timeout.map(Duration::from_secs);
let res = client
.poll_range(
&partition_key,
Some(PollRangeFilter {
start: filter.start.as_deref(),
end: filter.end.as_deref(),
prefix: filter.prefix.as_deref(),
}),
seen_marker.as_deref(),
timeout,
)
.await?;
match res {
Some((items, seen_marker)) => {
output_kind.display_poll_range_output(seen_marker, items);
}
None => {
if output_kind.json {
println!("null");
} else {
println!("Delay expired and value didn't change.");
}
}
}
}
Command::ReadIndex { Command::ReadIndex {
output_kind, output_kind,
filter, filter,
@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> {
}; };
let mut res = client.read_batch(&[op]).await?; let mut res = client.read_batch(&[op]).await?;
let res = res.pop().unwrap(); let res = res.pop().unwrap();
if output_kind.json { output_kind.display_read_range_output(res);
let values = res
.items
.into_iter()
.map(|(k, v)| {
let mut value = serde_json::to_value(v).unwrap();
value
.as_object_mut()
.unwrap()
.insert("sort_key".to_owned(), k.into());
value
})
.collect::<Vec<_>>();
let json = serde_json::json!({
"next_key": res.next_start,
"values": values,
});
let stdout = std::io::stdout();
serde_json::to_writer_pretty(stdout, &json).unwrap();
} else {
if let Some(next) = res.next_start {
println!("next key: {}", next);
}
for (key, values) in res.items {
println!("key: {}", key);
let causality: String = values.causality.into();
println!("causality: {}", causality);
for value in values.value {
match value {
K2vValue::Value(v) => {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" value(utf-8): {}", string);
} else {
println!(" value(base64): {}", base64::encode(&v));
}
}
K2vValue::Tombstone => {
println!(" tombstone");
}
}
}
}
}
} }
Command::DeleteRange { Command::DeleteRange {
partition_key, partition_key,

View file

@ -40,7 +40,13 @@ impl K2vClient {
creds: AwsCredentials, creds: AwsCredentials,
user_agent: Option<String>, user_agent: Option<String>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let mut client = HttpClient::new()?; let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build();
let mut client = HttpClient::from_connector(connector);
if let Some(ua) = user_agent { if let Some(ua) = user_agent {
client.local_agent_prepend(ua); client.local_agent_prepend(ua);
} else { } else {
@ -153,6 +159,58 @@ impl K2vClient {
} }
} }
/// Perform a PollRange request, waiting for any change in a given range of keys
/// to occur
pub async fn poll_range(
&self,
partition_key: &str,
filter: Option<PollRangeFilter<'_>>,
seen_marker: Option<&str>,
timeout: Option<Duration>,
) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
let request = PollRangeRequest {
filter: filter.unwrap_or_default(),
seen_marker,
timeout: timeout.as_secs(),
};
let mut req = SignedRequest::new(
"POST",
SERVICE,
&self.region,
&format!("/{}/{}", self.bucket, partition_key),
);
req.add_param("poll_range", "");
let payload = serde_json::to_vec(&request)?;
req.set_payload(Some(payload));
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
if res.status == StatusCode::NOT_MODIFIED {
return Ok(None);
}
let resp: PollRangeResponse = serde_json::from_slice(&res.body)?;
let items = resp
.items
.into_iter()
.map(|BatchReadItem { sk, ct, v }| {
(
sk,
CausalValue {
causality: ct,
value: v,
},
)
})
.collect::<BTreeMap<_, _>>();
Ok(Some((items, resp.seen_marker)))
}
/// Perform an InsertItem request, inserting a value for a single pk+sk. /// Perform an InsertItem request, inserting a value for a single pk+sk.
pub async fn insert_item( pub async fn insert_item(
&self, &self,
@ -389,6 +447,12 @@ impl From<CausalityToken> for String {
} }
} }
impl AsRef<str> for CausalityToken {
fn as_ref(&self) -> &str {
&self.0
}
}
/// A value in K2V. can be either a binary value, or a tombstone. /// A value in K2V. can be either a binary value, or a tombstone.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum K2vValue { pub enum K2vValue {
@ -466,6 +530,29 @@ pub struct Filter<'a> {
pub reverse: bool, pub reverse: bool,
} }
#[derive(Debug, Default, Clone, Serialize)]
pub struct PollRangeFilter<'a> {
pub start: Option<&'a str>,
pub end: Option<&'a str>,
pub prefix: Option<&'a str>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct PollRangeRequest<'a> {
#[serde(flatten)]
filter: PollRangeFilter<'a>,
seen_marker: Option<&'a str>,
timeout: u64,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PollRangeResponse {
items: Vec<BatchReadItem>,
seen_marker: String,
}
impl<'a> Filter<'a> { impl<'a> Filter<'a> {
fn insert_params(&self, req: &mut SignedRequest) { fn insert_params(&self, req: &mut SignedRequest) {
if let Some(start) = &self.start { if let Some(start) = &self.start {