From 753425507a9b4f2fa4f35ae4c43155214e4f08a8 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Tue, 17 May 2022 20:55:08 +0530 Subject: [PATCH] Migrate transcriber to aws sdk Part-of: --- net/rusoto/Cargo.toml | 3 + net/rusoto/src/aws_transcriber/imp.rs | 153 +++++++++++++++----------- 2 files changed, 93 insertions(+), 63 deletions(-) diff --git a/net/rusoto/Cargo.toml b/net/rusoto/Cargo.toml index 6a724d01..e07f20f9 100644 --- a/net/rusoto/Cargo.toml +++ b/net/rusoto/Cargo.toml @@ -19,8 +19,11 @@ aws-config = "0.12.0" aws-sdk-s3 = "0.12.0" aws-sdk-transcribe = "0.12.0" aws-types = "0.12.0" +aws-sig-auth = "0.12.0" aws-smithy-http = { version = "0.42.0", features = [ "rt-tokio" ] } aws-smithy-types = "0.42.0" +http = "0.2.7" +chrono = "0.4" rusoto_core = "0.48" rusoto_s3 = "0.48" rusoto_credential = "0.48" diff --git a/net/rusoto/src/aws_transcriber/imp.rs b/net/rusoto/src/aws_transcriber/imp.rs index 19d1cd59..dc06e0c9 100644 --- a/net/rusoto/src/aws_transcriber/imp.rs +++ b/net/rusoto/src/aws_transcriber/imp.rs @@ -13,10 +13,14 @@ use gst::{element_error, error_msg, loggable_error}; use std::default::Default; -use rusoto_core::Region; -use rusoto_credential::{ChainProvider, ProvideAwsCredentials, StaticProvider}; +use aws_sig_auth::signer::{self, HttpSignatureType, OperationSigningConfig, RequestConfig}; +use aws_smithy_http::body::SdkBody; +use aws_types::region::{Region, SigningRegion}; +use aws_types::{Credentials, SigningService}; +use std::time::{Duration, SystemTime}; -use rusoto_signature::signature::SignedRequest; +use chrono::prelude::*; +use http::Uri; use async_tungstenite::tungstenite::error::Error as WsError; use async_tungstenite::{tokio::connect_async, tungstenite::Message}; @@ -863,86 +867,109 @@ impl Transcriber { gst::info!(CAT, obj: element, "Connecting .."); - let creds = match ( - settings.access_key.as_ref(), - settings.secret_access_key.as_ref(), - ) { - (Some(access_key), Some(secret_access_key)) => { - let _enter = RUNTIME.enter(); - futures::executor::block_on( - StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone()) - .credentials() - .map_err(|err| { - gst::error!( - CAT, - obj: element, - "Failed to generate credentials: {}", - err - ); - error_msg!( - gst::CoreError::Failed, - ["Failed to generate credentials: {}", err] - ) - }), - )? - } - _ => { - let _enter = RUNTIME.enter(); - futures::executor::block_on(ChainProvider::new().credentials()).map_err(|err| { - gst::error!(CAT, obj: element, "Failed to generate credentials: {}", err); - error_msg!( - gst::CoreError::Failed, - ["Failed to generate credentials: {}", err] - ) - })? - } - }; + if settings.access_key.is_none() || settings.secret_access_key.is_none() { + return Err(error_msg!( + gst::LibraryError::Settings, + ["Access key and secret access key not provided"] + )); + } + + let access_key = settings.access_key.as_ref().unwrap().clone(); + let secret_access_key = settings.secret_access_key.as_ref().unwrap().clone(); + let credentials = Credentials::new(access_key, secret_access_key, None, None, "transcribe"); + + let region = Region::new("us-east-1"); + let current_time = Utc::now(); + + let mut query_params = String::from("/stream-transcription-websocket?"); let language_code = settings .language_code .as_ref() .expect("Language code is required"); - let region = Region::UsEast1; - - let mut signed = SignedRequest::new( - "GET", - "transcribe", - ®ion, - "/stream-transcription-websocket", + query_params.push_str( + format!( + "language-code={}&media-encoding=pcm&sample-rate={}", + language_code, + &sample_rate.to_string(), + ) + .as_str(), ); - signed.set_hostname(Some(format!( - "transcribestreaming.{}.amazonaws.com:8443", - region.name() - ))); - signed.add_param("language-code", language_code); - signed.add_param("media-encoding", "pcm"); - signed.add_param("sample-rate", &sample_rate.to_string()); if let Some(ref vocabulary) = settings.vocabulary { - signed.add_param("vocabulary-name", vocabulary); + query_params.push_str(format!("&vocabulary-name={}", vocabulary).as_str()); } if let Some(ref session_id) = settings.session_id { gst::debug!(CAT, obj: element, "Using session ID: {}", session_id); - signed.add_param("session-id", session_id); + query_params.push_str(format!("&session-id={}", session_id).as_str()); } - signed.add_param("enable-partial-results-stabilization", "true"); - signed.add_param( - "partial-results-stability", - match settings.results_stability { - AwsTranscriberResultStability::High => "high", - AwsTranscriberResultStability::Medium => "medium", - AwsTranscriberResultStability::Low => "low", - }, + query_params.push_str("&enable-partial-results-stabilization=true"); + + query_params.push_str( + format!( + "&partial-results-stability={}", + match settings.results_stability { + AwsTranscriberResultStability::High => "high", + AwsTranscriberResultStability::Medium => "medium", + AwsTranscriberResultStability::Low => "low", + } + ) + .as_str(), ); drop(settings); drop(state); - let url = - signed.generate_presigned_url(&creds, &std::time::Duration::from_secs(5 * 60), true); + let signer = signer::SigV4Signer::new(); + let mut operation_config = OperationSigningConfig::default_config(); + operation_config.signature_type = HttpSignatureType::HttpRequestQueryParams; + operation_config.expires_in = Some(Duration::from_secs(5 * 60)); // See commit a3db85d. + + let request_config = RequestConfig { + request_ts: SystemTime::from(current_time), + region: &SigningRegion::from(region.clone()), + service: &SigningService::from_static("transcribe"), + payload_override: None, + }; + let transcribe_uri = Uri::builder() + .scheme("https") + .authority(format!("transcribestreaming.{}.amazonaws.com:8443", region).as_str()) + .path_and_query(query_params.clone()) + .build() + .map_err(|err| { + gst::error!( + CAT, + obj: element, + "Failed to build HTTP request URI: {}", + err + ); + return error_msg!( + gst::CoreError::Failed, + ["Failed to build HTTP request URI: {}", err] + ); + })?; + let mut request = http::Request::builder() + .uri(transcribe_uri) + .body(SdkBody::empty()) + .expect("Failed to build valid request"); + let _signature = signer + .sign( + &operation_config, + &request_config, + &credentials, + &mut request, + ) + .map_err(|err| { + gst::error!(CAT, obj: element, "Failed to sign HTTP request: {}", err); + error_msg!( + gst::CoreError::Failed, + ["Failed to sign HTTP request: {}", err] + ) + })?; + let url = request.uri().to_string(); let (ws, _) = { let _enter = RUNTIME.enter();