Migrate transcriber to aws sdk

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/759>
This commit is contained in:
Sanchayan Maity 2022-05-17 20:55:08 +05:30 committed by Sebastian Dröge
parent 768fad2445
commit 753425507a
2 changed files with 93 additions and 63 deletions

View file

@ -19,8 +19,11 @@ aws-config = "0.12.0"
aws-sdk-s3 = "0.12.0"
aws-sdk-transcribe = "0.12.0"
aws-types = "0.12.0"
aws-sig-auth = "0.12.0"
aws-smithy-http = { version = "0.42.0", features = [ "rt-tokio" ] }
aws-smithy-types = "0.42.0"
http = "0.2.7"
chrono = "0.4"
rusoto_core = "0.48"
rusoto_s3 = "0.48"
rusoto_credential = "0.48"

View file

@ -13,10 +13,14 @@ use gst::{element_error, error_msg, loggable_error};
use std::default::Default;
use rusoto_core::Region;
use rusoto_credential::{ChainProvider, ProvideAwsCredentials, StaticProvider};
use aws_sig_auth::signer::{self, HttpSignatureType, OperationSigningConfig, RequestConfig};
use aws_smithy_http::body::SdkBody;
use aws_types::region::{Region, SigningRegion};
use aws_types::{Credentials, SigningService};
use std::time::{Duration, SystemTime};
use rusoto_signature::signature::SignedRequest;
use chrono::prelude::*;
use http::Uri;
use async_tungstenite::tungstenite::error::Error as WsError;
use async_tungstenite::{tokio::connect_async, tungstenite::Message};
@ -863,86 +867,109 @@ impl Transcriber {
gst::info!(CAT, obj: element, "Connecting ..");
let creds = match (
settings.access_key.as_ref(),
settings.secret_access_key.as_ref(),
) {
(Some(access_key), Some(secret_access_key)) => {
let _enter = RUNTIME.enter();
futures::executor::block_on(
StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone())
.credentials()
.map_err(|err| {
gst::error!(
CAT,
obj: element,
"Failed to generate credentials: {}",
err
);
error_msg!(
gst::CoreError::Failed,
["Failed to generate credentials: {}", err]
)
}),
)?
}
_ => {
let _enter = RUNTIME.enter();
futures::executor::block_on(ChainProvider::new().credentials()).map_err(|err| {
gst::error!(CAT, obj: element, "Failed to generate credentials: {}", err);
error_msg!(
gst::CoreError::Failed,
["Failed to generate credentials: {}", err]
)
})?
}
};
if settings.access_key.is_none() || settings.secret_access_key.is_none() {
return Err(error_msg!(
gst::LibraryError::Settings,
["Access key and secret access key not provided"]
));
}
let access_key = settings.access_key.as_ref().unwrap().clone();
let secret_access_key = settings.secret_access_key.as_ref().unwrap().clone();
let credentials = Credentials::new(access_key, secret_access_key, None, None, "transcribe");
let region = Region::new("us-east-1");
let current_time = Utc::now();
let mut query_params = String::from("/stream-transcription-websocket?");
let language_code = settings
.language_code
.as_ref()
.expect("Language code is required");
let region = Region::UsEast1;
let mut signed = SignedRequest::new(
"GET",
"transcribe",
&region,
"/stream-transcription-websocket",
query_params.push_str(
format!(
"language-code={}&media-encoding=pcm&sample-rate={}",
language_code,
&sample_rate.to_string(),
)
.as_str(),
);
signed.set_hostname(Some(format!(
"transcribestreaming.{}.amazonaws.com:8443",
region.name()
)));
signed.add_param("language-code", language_code);
signed.add_param("media-encoding", "pcm");
signed.add_param("sample-rate", &sample_rate.to_string());
if let Some(ref vocabulary) = settings.vocabulary {
signed.add_param("vocabulary-name", vocabulary);
query_params.push_str(format!("&vocabulary-name={}", vocabulary).as_str());
}
if let Some(ref session_id) = settings.session_id {
gst::debug!(CAT, obj: element, "Using session ID: {}", session_id);
signed.add_param("session-id", session_id);
query_params.push_str(format!("&session-id={}", session_id).as_str());
}
signed.add_param("enable-partial-results-stabilization", "true");
signed.add_param(
"partial-results-stability",
match settings.results_stability {
AwsTranscriberResultStability::High => "high",
AwsTranscriberResultStability::Medium => "medium",
AwsTranscriberResultStability::Low => "low",
},
query_params.push_str("&enable-partial-results-stabilization=true");
query_params.push_str(
format!(
"&partial-results-stability={}",
match settings.results_stability {
AwsTranscriberResultStability::High => "high",
AwsTranscriberResultStability::Medium => "medium",
AwsTranscriberResultStability::Low => "low",
}
)
.as_str(),
);
drop(settings);
drop(state);
let url =
signed.generate_presigned_url(&creds, &std::time::Duration::from_secs(5 * 60), true);
let signer = signer::SigV4Signer::new();
let mut operation_config = OperationSigningConfig::default_config();
operation_config.signature_type = HttpSignatureType::HttpRequestQueryParams;
operation_config.expires_in = Some(Duration::from_secs(5 * 60)); // See commit a3db85d.
let request_config = RequestConfig {
request_ts: SystemTime::from(current_time),
region: &SigningRegion::from(region.clone()),
service: &SigningService::from_static("transcribe"),
payload_override: None,
};
let transcribe_uri = Uri::builder()
.scheme("https")
.authority(format!("transcribestreaming.{}.amazonaws.com:8443", region).as_str())
.path_and_query(query_params.clone())
.build()
.map_err(|err| {
gst::error!(
CAT,
obj: element,
"Failed to build HTTP request URI: {}",
err
);
return error_msg!(
gst::CoreError::Failed,
["Failed to build HTTP request URI: {}", err]
);
})?;
let mut request = http::Request::builder()
.uri(transcribe_uri)
.body(SdkBody::empty())
.expect("Failed to build valid request");
let _signature = signer
.sign(
&operation_config,
&request_config,
&credentials,
&mut request,
)
.map_err(|err| {
gst::error!(CAT, obj: element, "Failed to sign HTTP request: {}", err);
error_msg!(
gst::CoreError::Failed,
["Failed to sign HTTP request: {}", err]
)
})?;
let url = request.uri().to_string();
let (ws, _) = {
let _enter = RUNTIME.enter();