Merge pull request #1 from rafaelcaricio/with-channels

Trying with channels
This commit is contained in:
Rafael Caricio 2022-02-11 14:09:56 +01:00 committed by GitHub
commit 1a61841a0e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 30 deletions

1
Cargo.lock generated
View file

@@ -284,6 +284,7 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic", "tonic",
"tracing",
"tracing-subscriber", "tracing-subscriber",
] ]

View file

@@ -6,6 +6,7 @@ edition = "2021"
[dependencies] [dependencies]
eyre = "0.6.6" eyre = "0.6.6"
log = "0.4" log = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "fs"] } tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "fs"] }
google-authz = { version = "1.0.0-alpha.2", features = ["tonic"] } google-authz = { version = "1.0.0-alpha.2", features = ["tonic"] }
@@ -19,3 +20,6 @@ google-api-proto = { version = "1.0.0-alpha", features = ["google-cloud-translat
tokio-stream = "0.1.8" tokio-stream = "0.1.8"
futures-util = "0.3" futures-util = "0.3"
async-stream = "*" async-stream = "*"
#glib = "0.15.4"
#gst = { package = "gstreamer", version = "0.18.3" }
#gstreamer-base = "0.18.0"

View file

@@ -5,14 +5,16 @@ use google_api_proto::google::cloud::speech::v1::{
StreamingRecognitionConfig, StreamingRecognizeRequest, StreamingRecognitionConfig, StreamingRecognizeRequest,
}; };
use google_authz::{Credentials, GoogleAuthz}; use google_authz::{Credentials, GoogleAuthz};
use log::{debug, info}; use log::{debug, info, warn};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Channel; use tonic::transport::Channel;
use tracing::Instrument;
#[tokio::main] #[tokio::main]
async fn main() -> eyre::Result<()> { async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
//console_subscriber::init(); // console_subscriber::init();
debug!("starting..."); debug!("starting...");
@@ -36,28 +38,31 @@ async fn main() -> eyre::Result<()> {
let mut client = SpeechClient::new(channel); let mut client = SpeechClient::new(channel);
let outbound = async_stream::stream! {
let request = StreamingRecognizeRequest { let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
streaming_request: Some(StreamingRequest::StreamingConfig(
StreamingRecognitionConfig { sender.send(StreamingRecognizeRequest {
config: Some(RecognitionConfig { streaming_request: Some(StreamingRequest::StreamingConfig(
encoding: AudioEncoding::Flac.into(), // matching current example file StreamingRecognitionConfig {
sample_rate_hertz: 44_100, // matching current example file config: Some(RecognitionConfig {
audio_channel_count: 2, encoding: AudioEncoding::Flac.into(), // matching current example file
language_code: "en-US".to_string(), // we only support en-US to start with sample_rate_hertz: 44_100, // matching current example file
model: "video".to_string(), // dictate does not set this option audio_channel_count: 2,
use_enhanced: true, // dictate does not set this option language_code: "en-US".to_string(), // we only support en-US to start with
profanity_filter: true, // used by Dictate, so we also use it here model: "video".to_string(), // dictate does not set this option
enable_word_time_offsets: true, // important so we can get the spoken word time ranges use_enhanced: true, // dictate does not set this option
max_alternatives: 1, // make sure the default is used profanity_filter: true, // used by Dictate, so we also use it here
..Default::default() enable_word_time_offsets: true, // important so we can get the spoken word time ranges
}), max_alternatives: 1, // make sure the default is used
single_utterance: false, ..Default::default()
interim_results: false, }),
}, single_utterance: false,
)), interim_results: false,
}; },
yield request; )),
})?;
tokio::spawn(async move {
let file = tokio::fs::File::open("some-audio.flac").await.unwrap(); let file = tokio::fs::File::open("some-audio.flac").await.unwrap();
let mut audio_file = tokio::io::BufReader::new(file); let mut audio_file = tokio::io::BufReader::new(file);
// read file chunk // read file chunk
@@ -69,18 +74,18 @@ async fn main() -> eyre::Result<()> {
BytesMut::from(&buffer.as_slice()[..n]).freeze(), BytesMut::from(&buffer.as_slice()[..n]).freeze(),
)), )),
}; };
yield request; sender.send(request).unwrap();
// debug!("added a buffer to the sender queue: {} bytes", n); //debug!("added a buffer to the sender queue: {} bytes", n);
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
} }
}; });
let response = client let response = client
.streaming_recognize(tonic::Request::new(outbound)) .streaming_recognize(UnboundedReceiverStream::new(receiver))
.await?; .await?;
let mut inbound = response.into_inner(); let mut inbound = response.into_inner();
while let Some(response) = inbound.message().await? { while let Some(response) = inbound.message().instrument(tracing::info_span!("transcription-results")).await? {
let mut num_results = 0; let mut num_results = 0;
for res in &response.results { for res in &response.results {
num_results = num_results + 1; num_results = num_results + 1;