Try adding store healthcheck

This commit is contained in:
asonix 2023-07-07 12:05:42 -05:00
parent 345aa7f3d6
commit 7274538b70
4 changed files with 49 additions and 2 deletions

View file

@ -1060,8 +1060,12 @@ async fn identifier<R: FullRepo, S: Store>(
}))) })))
} }
async fn healthz<R: FullRepo>(repo: web::Data<R>) -> Result<HttpResponse, Error> { async fn healthz<R: FullRepo, S: Store>(
repo: web::Data<R>,
store: web::Data<S>,
) -> Result<HttpResponse, Error> {
repo.health_check().await?; repo.health_check().await?;
store.health_check().await?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Ok().finish())
} }
@ -1104,7 +1108,7 @@ fn configure_endpoints<R: FullRepo + 'static, S: Store + 'static>(
.app_data(web::Data::new(repo)) .app_data(web::Data::new(repo))
.app_data(web::Data::new(store)) .app_data(web::Data::new(store))
.app_data(web::Data::new(client)) .app_data(web::Data::new(client))
.route("/healthz", web::get().to(healthz::<R>)) .route("/healthz", web::get().to(healthz::<R, S>))
.service( .service(
web::scope("/image") web::scope("/image")
.service( .service(

View file

@ -77,6 +77,8 @@ pub(crate) trait Store: Clone + Debug {
type Identifier: Identifier + 'static; type Identifier: Identifier + 'static;
type Stream: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static; type Stream: Stream<Item = std::io::Result<Bytes>> + Unpin + 'static;
async fn health_check(&self) -> Result<(), StoreError>;
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError> async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
where where
Reader: AsyncRead + Unpin + 'static; Reader: AsyncRead + Unpin + 'static;
@ -115,6 +117,10 @@ where
type Identifier = T::Identifier; type Identifier = T::Identifier;
type Stream = T::Stream; type Stream = T::Stream;
async fn health_check(&self) -> Result<(), StoreError> {
T::health_check(self).await
}
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError> async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
where where
Reader: AsyncRead + Unpin + 'static, Reader: AsyncRead + Unpin + 'static,
@ -170,6 +176,10 @@ where
type Identifier = T::Identifier; type Identifier = T::Identifier;
type Stream = T::Stream; type Stream = T::Stream;
async fn health_check(&self) -> Result<(), StoreError> {
T::health_check(self).await
}
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError> async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
where where
Reader: AsyncRead + Unpin + 'static, Reader: AsyncRead + Unpin + 'static,

View file

@ -54,6 +54,14 @@ impl Store for FileStore {
type Identifier = FileId; type Identifier = FileId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>; type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
async fn health_check(&self) -> Result<(), StoreError> {
tokio::fs::metadata(&self.root_dir)
.await
.map_err(FileError::from)?;
Ok(())
}
#[tracing::instrument(skip(reader))] #[tracing::instrument(skip(reader))]
async fn save_async_read<Reader>( async fn save_async_read<Reader>(
&self, &self,
@ -157,6 +165,10 @@ impl FileStore {
pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result<Self, StoreError> { pub(crate) async fn build(root_dir: PathBuf, repo: Repo) -> Result<Self, StoreError> {
let path_gen = init_generator(&repo).await?; let path_gen = init_generator(&repo).await?;
tokio::fs::create_dir_all(&root_dir)
.await
.map_err(FileError::from)?;
Ok(FileStore { Ok(FileStore {
root_dir, root_dir,
path_gen, path_gen,

View file

@ -169,6 +169,21 @@ impl Store for ObjectStore {
type Identifier = ObjectId; type Identifier = ObjectId;
type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>; type Stream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>>>>;
async fn health_check(&self) -> Result<(), StoreError> {
let response = self
.head_bucket_request()
.await?
.send()
.await
.map_err(ObjectError::from)?;
if !response.status().is_success() {
return Err(status_error(response).await);
}
Ok(())
}
async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError> async fn save_async_read<Reader>(&self, reader: Reader) -> Result<Self::Identifier, StoreError>
where where
Reader: AsyncRead + Unpin + 'static, Reader: AsyncRead + Unpin + 'static,
@ -438,6 +453,12 @@ impl ObjectStore {
}) })
} }
async fn head_bucket_request(&self) -> Result<ClientRequest, StoreError> {
let action = self.bucket.head_bucket(Some(&self.credentials));
Ok(self.build_request(action))
}
async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> { async fn put_object_request(&self) -> Result<(ClientRequest, ObjectId), StoreError> {
let path = self.next_file().await?; let path = self.next_file().await?;