diff --git a/Cargo.lock b/Cargo.lock
index a3ba129..ed23933 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -272,6 +272,7 @@ dependencies = [
 name = "google-cloud-playground"
 version = "0.1.0"
 dependencies = [
+ "async-stream",
 "bytes",
 "eyre",
 "futures-util",
diff --git a/Cargo.toml b/Cargo.toml
index af070ad..7a3251b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,3 +18,4 @@ prost-types = { version = "0.9" }
 google-api-proto = { version = "1.0.0-alpha", features = ["google-cloud-translation-v3", "google-cloud-speech-v1"] }
 tokio-stream = "0.1.8"
 futures-util = "0.3"
+async-stream = "0.3"
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index 97878bc..ceec708 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,20 +1,12 @@
-use std::io::Cursor;
 use bytes::BytesMut;
-use std::path::PathBuf;
-use std::pin::Pin;
 use google_api_proto::google::cloud::speech::v1::streaming_recognize_request::StreamingRequest;
 use google_api_proto::google::cloud::speech::v1::{
     recognition_config::AudioEncoding, speech_client::SpeechClient, RecognitionConfig,
     StreamingRecognitionConfig, StreamingRecognizeRequest,
 };
-use google_api_proto::google::cloud::translation::v3::{TranslateTextRequest, Translation};
-use google_api_proto::google::cloud::translation::v3::translation_service_client::TranslationServiceClient;
 use google_authz::{Credentials, GoogleAuthz};
-use log::{debug, info};
-use prost_types::Duration;
+use log::debug;
 use tokio::io::AsyncReadExt;
-use tokio_stream::wrappers::ReceiverStream;
-use tonic::IntoStreamingRequest;
 use tonic::transport::Channel;
 
 #[tokio::main]
@@ -42,95 +34,52 @@ async fn main() -> eyre::Result<()> {
     debug!("authenticated channel created!");
-
-    // let mut translate = TranslationServiceClient::new(channel_translate);
-    // let resp = translate.translate_text(TranslateTextRequest {
-    //     contents: vec!["Que palhacada danada".to_string()],
-    //     mime_type: "text/plain".to_string(),
-    //     target_language_code: "en_US".to_string(),
-    //
..Default::default() - // }).await.unwrap(); - // debug!("requested translation"); - // - // for trans in resp.into_inner().translations.iter() { - // debug!("translation = {} // {}", trans.translated_text, trans.detected_language_code); - // } - let mut client = SpeechClient::new(channel); - let (sender, receiver) = tokio::sync::mpsc::channel(1024); - let receiver_stream = Box::pin(ReceiverStream::new(receiver)); - let mut stream = client.streaming_recognize(receiver_stream).await?.into_inner(); - debug!("Called the streaming_recognize method"); - - sender.try_send(StreamingRecognizeRequest { - streaming_request: Some(StreamingRequest::StreamingConfig( - StreamingRecognitionConfig { - config: Some(RecognitionConfig { - encoding: AudioEncoding::Flac.into(), // matching current example file - sample_rate_hertz: 48000, // matching current example file - language_code: "en-US".to_string(), // we only support en-US to start with - model: "video".to_string(), // dictate does not set this option - use_enhanced: true, // dictate does not set this option - profanity_filter: true, // used by Dictate, so we also use it here - enable_word_time_offsets: true, // important so we can get the spoken word time ranges - max_alternatives: 1, // make sure the default is used - ..Default::default() - }), - single_utterance: false, - interim_results: false, - }, - )), - })?; - - debug!("sent streaming request configurations"); - - let file = tokio::fs::File::open("some-audio.flac").await?; - let mut audio_file = tokio::io::BufReader::new(file); - - // spawn task reading from file and uploading to Google Speech API - tokio::spawn(async move { - // read file chunk - let mut buffer = [0; 1024 * 5]; - while let Ok(_) = audio_file.read(&mut buffer).await { - // send to server - sender - .try_send(StreamingRecognizeRequest { + let outbound = async_stream::stream! 
{
+        let request = StreamingRecognizeRequest {
+            streaming_request: Some(StreamingRequest::StreamingConfig(
+                StreamingRecognitionConfig {
+                    config: Some(RecognitionConfig {
+                        encoding: AudioEncoding::Flac.into(), // matching current example file
+                        sample_rate_hertz: 48000, // matching current example file
+                        language_code: "en-US".to_string(), // we only support en-US to start with
+                        model: "video".to_string(), // dictate does not set this option
+                        use_enhanced: true, // dictate does not set this option
+                        profanity_filter: true, // used by Dictate, so we also use it here
+                        enable_word_time_offsets: true, // important so we can get the spoken word time ranges
+                        max_alternatives: 1, // make sure the default is used
+                        ..Default::default()
+                    }),
+                    single_utterance: false,
+                    interim_results: false,
+                },
+            )),
+        };
+        yield request;
+        let file = tokio::fs::File::open("some-audio.flac").await.unwrap();
+        let mut audio_file = tokio::io::BufReader::new(file);
+        // read file chunk
+        let mut buffer = [0; 1024 * 5];
+        while let Ok(n) = audio_file.read(&mut buffer).await { if n == 0 { break; } // read() returns Ok(0) at EOF — must break or this loop yields stale chunks forever
+            // send to server (NOTE(review): the full 5 KiB buffer is sent even on a short read, so trailing bytes may be stale — confirm acceptable for this API)
+            let request = StreamingRecognizeRequest {
                 streaming_request: Some(StreamingRequest::AudioContent(
                     BytesMut::from(buffer.as_slice()).freeze(),
                 )),
-                })
-                .unwrap();
+            };
+            yield request;
+            debug!("added a buffer to the sender queue");
         }
-    })
-    .await?;
-
-    debug!("waiting for responses...");
-    // continuous receiving the transcribed response
-    while let Some(response) = stream.message().await?
{ - let mut num_results = 0; - for res in &response.results { - num_results = num_results + 1; - info!("Result {} {{", num_results); - if let Some(rec) = res.alternatives.first() { - info!("\tTranscription: {}", rec.transcript); - for word_info in &rec.words { - // let start_time: WordTimestamp = word_info.start_time.into(); - let start_time = word_info.start_time.as_ref().unwrap(); - let end_time = word_info.end_time.as_ref().unwrap(); - info!( - "\t - {}: [{}.{} - {}.{}]", - word_info.word, - start_time.seconds, - start_time.nanos, - end_time.seconds, - end_time.nanos - ); - } + }; + yield request; + debug!("added a buffer to the sender queue"); } - info!("}}"); - } + }; + + let response = client + .streaming_recognize(tonic::Request::new(outbound)) + .await?; + let mut inbound = response.into_inner(); + + while let Some(msg) = inbound.message().await? { + debug!("Got a message: {:?}", msg); } Ok(())