Try sending using async_stream

Kuba Clark 2022-02-11 12:12:51 +01:00
parent f851c7952c
commit 247edebe36
3 changed files with 43 additions and 92 deletions

Cargo.lock (generated, 1 addition)

@@ -272,6 +272,7 @@ dependencies = [
 name = "google-cloud-playground"
 version = "0.1.0"
 dependencies = [
+ "async-stream",
  "bytes",
  "eyre",
  "futures-util",

Cargo.toml (1 addition)

@@ -18,3 +18,4 @@ prost-types = { version = "0.9" }
 google-api-proto = { version = "1.0.0-alpha", features = ["google-cloud-translation-v3", "google-cloud-speech-v1"] }
 tokio-stream = "0.1.8"
 futures-util = "0.3"
+async-stream = "*"
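
Note on the new dependency: "*" is a wildcard requirement, so cargo resolves it to the newest async-stream release available at build time, and only the Cargo.lock entry above pins the exact version. Pinning at least the minor version in Cargo.toml (e.g. async-stream = "0.3", the current release line at the time of this commit) would keep fresh checkouts reproducible.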

src/main.rs (41 additions, 92 deletions)

@@ -1,20 +1,12 @@
-use std::io::Cursor;
 use bytes::BytesMut;
-use std::path::PathBuf;
-use std::pin::Pin;
 use google_api_proto::google::cloud::speech::v1::streaming_recognize_request::StreamingRequest;
 use google_api_proto::google::cloud::speech::v1::{
     recognition_config::AudioEncoding, speech_client::SpeechClient, RecognitionConfig,
     StreamingRecognitionConfig, StreamingRecognizeRequest,
 };
-use google_api_proto::google::cloud::translation::v3::{TranslateTextRequest, Translation};
-use google_api_proto::google::cloud::translation::v3::translation_service_client::TranslationServiceClient;
 use google_authz::{Credentials, GoogleAuthz};
-use log::{debug, info};
-use prost_types::Duration;
+use log::debug;
 use tokio::io::AsyncReadExt;
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::IntoStreamingRequest;
 use tonic::transport::Channel;
 
 #[tokio::main]
@@ -42,95 +34,52 @@ async fn main() -> eyre::Result<()> {
     debug!("authenticated channel created!");
 
-    // let mut translate = TranslationServiceClient::new(channel_translate);
-    // let resp = translate.translate_text(TranslateTextRequest {
-    //     contents: vec!["Que palhacada danada".to_string()],
-    //     mime_type: "text/plain".to_string(),
-    //     target_language_code: "en_US".to_string(),
-    //     ..Default::default()
-    // }).await.unwrap();
-    // debug!("requested translation");
-    //
-    // for trans in resp.into_inner().translations.iter() {
-    //     debug!("translation = {} // {}", trans.translated_text, trans.detected_language_code);
-    // }
-
     let mut client = SpeechClient::new(channel);
-    let (sender, receiver) = tokio::sync::mpsc::channel(1024);
-    let receiver_stream = Box::pin(ReceiverStream::new(receiver));
-    let mut stream = client.streaming_recognize(receiver_stream).await?.into_inner();
-    debug!("Called the streaming_recognize method");
-
-    sender.try_send(StreamingRecognizeRequest {
-        streaming_request: Some(StreamingRequest::StreamingConfig(
-            StreamingRecognitionConfig {
-                config: Some(RecognitionConfig {
-                    encoding: AudioEncoding::Flac.into(), // matching current example file
-                    sample_rate_hertz: 48000, // matching current example file
-                    language_code: "en-US".to_string(), // we only support en-US to start with
-                    model: "video".to_string(), // dictate does not set this option
-                    use_enhanced: true, // dictate does not set this option
-                    profanity_filter: true, // used by Dictate, so we also use it here
-                    enable_word_time_offsets: true, // important so we can get the spoken word time ranges
-                    max_alternatives: 1, // make sure the default is used
-                    ..Default::default()
-                }),
-                single_utterance: false,
-                interim_results: false,
-            },
-        )),
-    })?;
-
-    debug!("sent streaming request configurations");
-
-    let file = tokio::fs::File::open("some-audio.flac").await?;
-    let mut audio_file = tokio::io::BufReader::new(file);
-
-    // spawn task reading from file and uploading to Google Speech API
-    tokio::spawn(async move {
-        // read file chunk
-        let mut buffer = [0; 1024 * 5];
-        while let Ok(_) = audio_file.read(&mut buffer).await {
-            // send to server
-            sender
-                .try_send(StreamingRecognizeRequest {
-                    streaming_request: Some(StreamingRequest::AudioContent(
-                        BytesMut::from(buffer.as_slice()).freeze(),
-                    )),
-                })
-                .unwrap();
-            debug!("added a buffer to the sender queue");
-        }
-    })
-    .await?;
-
-    debug!("waiting for responses...");
-    // continuous receiving the transcribed response
-    while let Some(response) = stream.message().await? {
-        let mut num_results = 0;
-        for res in &response.results {
-            num_results = num_results + 1;
-            info!("Result {} {{", num_results);
-            if let Some(rec) = res.alternatives.first() {
-                info!("\tTranscription: {}", rec.transcript);
-                for word_info in &rec.words {
-                    // let start_time: WordTimestamp = word_info.start_time.into();
-                    let start_time = word_info.start_time.as_ref().unwrap();
-                    let end_time = word_info.end_time.as_ref().unwrap();
-                    info!(
-                        "\t - {}: [{}.{} - {}.{}]",
-                        word_info.word,
-                        start_time.seconds,
-                        start_time.nanos,
-                        end_time.seconds,
-                        end_time.nanos
-                    );
-                }
-            }
-            info!("}}");
-        }
+    let outbound = async_stream::stream! {
+        let request = StreamingRecognizeRequest {
+            streaming_request: Some(StreamingRequest::StreamingConfig(
+                StreamingRecognitionConfig {
+                    config: Some(RecognitionConfig {
+                        encoding: AudioEncoding::Flac.into(), // matching current example file
+                        sample_rate_hertz: 48000, // matching current example file
+                        language_code: "en-US".to_string(), // we only support en-US to start with
+                        model: "video".to_string(), // dictate does not set this option
+                        use_enhanced: true, // dictate does not set this option
+                        profanity_filter: true, // used by Dictate, so we also use it here
+                        enable_word_time_offsets: true, // important so we can get the spoken word time ranges
+                        max_alternatives: 1, // make sure the default is used
+                        ..Default::default()
+                    }),
+                    single_utterance: false,
+                    interim_results: false,
+                },
+            )),
+        };
+        yield request;
+
+        let file = tokio::fs::File::open("some-audio.flac").await.unwrap();
+        let mut audio_file = tokio::io::BufReader::new(file);
+        // read file chunk
+        let mut buffer = [0; 1024 * 5];
+        while audio_file.read(&mut buffer).await.is_ok() {
+            // send to server
+            let request = StreamingRecognizeRequest {
+                streaming_request: Some(StreamingRequest::AudioContent(
+                    BytesMut::from(buffer.as_slice()).freeze(),
+                )),
+            };
+            yield request;
+            debug!("added a buffer to the sender queue");
+        }
+    };
+
+    let response = client
+        .streaming_recognize(tonic::Request::new(outbound))
+        .await?;
+    let mut inbound = response.into_inner();
+
+    while let Some(msg) = inbound.message().await? {
+        debug!("Got a message: {:?}", msg);
     }
 
     Ok(())
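
The shape of the outbound stream matches what the Speech API streaming protocol expects: the first StreamingRecognizeRequest carries only the streaming config, and every later request carries only audio bytes. Building it with async_stream::stream! also drops the bounded mpsc channel and the spawned uploader task, so chunks are now produced only as fast as tonic polls the stream, rather than being pushed with try_send, which could fail (and panic through unwrap) whenever the 1024-slot queue filled up.

One caveat survives from the old version: AsyncReadExt::read returns Ok(0) at end of file rather than an error, so `while audio_file.read(&mut buffer).await.is_ok()` never breaks and keeps re-yielding the final buffer, and it always sends the full 5 KiB array even when read filled only part of it. A minimal sketch of a terminating loop, factored into a helper (the audio_requests name and the n byte count are illustrative, not from this commit; error handling kept as unwrap to match the diff):

    use bytes::BytesMut;
    use google_api_proto::google::cloud::speech::v1::streaming_recognize_request::StreamingRequest;
    use google_api_proto::google::cloud::speech::v1::StreamingRecognizeRequest;
    use tokio::io::AsyncReadExt;

    // Yields one AudioContent request per chunk and stops cleanly at end of file.
    fn audio_requests(path: &'static str) -> impl futures_util::Stream<Item = StreamingRecognizeRequest> {
        async_stream::stream! {
            let file = tokio::fs::File::open(path).await.unwrap();
            let mut reader = tokio::io::BufReader::new(file);
            let mut buffer = [0u8; 1024 * 5];
            loop {
                // read() returns the number of bytes read; Ok(0) means EOF
                let n = reader.read(&mut buffer).await.unwrap();
                if n == 0 {
                    break;
                }
                // forward only the bytes actually read, not the whole buffer
                yield StreamingRecognizeRequest {
                    streaming_request: Some(StreamingRequest::AudioContent(
                        BytesMut::from(&buffer[..n]).freeze(),
                    )),
                };
            }
        }
    }

Chaining this after the config message, e.g. futures_util::stream::once(async { config_request }).chain(audio_requests("some-audio.flac")), would keep the current behavior while letting the stream end at EOF, so the server can finalize the transcription and close the response side.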