Trying with channels

This commit is contained in:
Rafael Caricio 2022-02-11 13:46:14 +01:00
parent 46e602ba36
commit f63fb5d01b
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
3 changed files with 44 additions and 27 deletions

1
Cargo.lock generated
View file

@ -284,6 +284,7 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic", "tonic",
"tracing",
"tracing-subscriber", "tracing-subscriber",
] ]

View file

@ -6,6 +6,7 @@ edition = "2021"
[dependencies] [dependencies]
eyre = "0.6.6" eyre = "0.6.6"
log = "0.4" log = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "fs"] } tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "fs"] }
google-authz = { version = "1.0.0-alpha.2", features = ["tonic"] } google-authz = { version = "1.0.0-alpha.2", features = ["tonic"] }
@ -19,3 +20,6 @@ google-api-proto = { version = "1.0.0-alpha", features = ["google-cloud-translat
tokio-stream = "0.1.8" tokio-stream = "0.1.8"
futures-util = "0.3" futures-util = "0.3"
async-stream = "*" async-stream = "*"
#glib = "0.15.4"
#gst = { package = "gstreamer", version = "0.18.3" }
#gstreamer-base = "0.18.0"

View file

@ -8,6 +8,7 @@ use google_authz::{Credentials, GoogleAuthz};
use log::{debug, info}; use log::{debug, info};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tonic::transport::Channel; use tonic::transport::Channel;
use tracing::Instrument;
#[tokio::main] #[tokio::main]
async fn main() -> eyre::Result<()> { async fn main() -> eyre::Result<()> {
@ -36,8 +37,10 @@ async fn main() -> eyre::Result<()> {
let mut client = SpeechClient::new(channel); let mut client = SpeechClient::new(channel);
let outbound = async_stream::stream! {
let request = StreamingRecognizeRequest { let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
sender.send(StreamingRecognizeRequest {
streaming_request: Some(StreamingRequest::StreamingConfig( streaming_request: Some(StreamingRequest::StreamingConfig(
StreamingRecognitionConfig { StreamingRecognitionConfig {
config: Some(RecognitionConfig { config: Some(RecognitionConfig {
@ -56,8 +59,9 @@ async fn main() -> eyre::Result<()> {
interim_results: false, interim_results: false,
}, },
)), )),
}; })?;
yield request;
tokio::spawn(async move {
let file = tokio::fs::File::open("some-audio.flac").await.unwrap(); let file = tokio::fs::File::open("some-audio.flac").await.unwrap();
let mut audio_file = tokio::io::BufReader::new(file); let mut audio_file = tokio::io::BufReader::new(file);
// read file chunk // read file chunk
@ -69,18 +73,26 @@ async fn main() -> eyre::Result<()> {
BytesMut::from(&buffer.as_slice()[..n]).freeze(), BytesMut::from(&buffer.as_slice()[..n]).freeze(),
)), )),
}; };
yield request; sender.send(request).unwrap();
// debug!("added a buffer to the sender queue: {} bytes", n); //debug!("added a buffer to the sender queue: {} bytes", n);
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
} }
}).instrument(tracing::info_span!("audio-source")).await?;
let message = async_stream::stream! {
while let Some(message) = receiver.recv().await {
debug!("drained message inside stream...");
yield message;
}
}; };
let response = client let response = client
.streaming_recognize(tonic::Request::new(outbound)) .streaming_recognize(tonic::Request::new(message))
.await?; .await?;
let mut inbound = response.into_inner(); let mut inbound = response.into_inner();
while let Some(response) = inbound.message().await? { while let Some(response) = inbound.message().instrument(tracing::info_span!("transcription-results")).await? {
let mut num_results = 0; let mut num_results = 0;
for res in &response.results { for res in &response.results {
num_results = num_results + 1; num_results = num_results + 1;