Getting some words

This commit is contained in:
Rafael Caricio 2022-03-31 11:21:15 +02:00
parent daab674c23
commit 8f85e53ca2
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
2 changed files with 55 additions and 28 deletions

View file

@ -6,6 +6,8 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
extern crate core;
use gst::glib; use gst::glib;
mod transcriber; mod transcriber;

View file

@ -518,7 +518,12 @@ impl Transcriber {
if ret { if ret {
let (_, min, _) = peer_query.result(); let (_, min, _) = peer_query.result();
let our_latency = self.settings.lock().unwrap().latency; let our_latency = self.settings.lock().unwrap().latency;
gst_info!(CAT, obj: element, "Replying to latency query: {}", our_latency + min); gst_info!(
CAT,
obj: element,
"Replying to latency query: {}",
our_latency + min
);
// We never drop buffers, so our max latency is set to infinity // We never drop buffers, so our max latency is set to infinity
q.set(true, our_latency + min, gst::ClockTime::NONE); q.set(true, our_latency + min, gst::ClockTime::NONE);
} }
@ -624,44 +629,59 @@ impl Transcriber {
buffer: Option<gst::Buffer>, buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut delay = None; let mut delay = None;
{
let state = self.state.lock().unwrap();
if let Some(buffer) = &buffer { if let Some(buffer) = &buffer {
let state = self.state.lock().unwrap();
let running_time = state.in_segment.to_running_time(buffer.pts()); let running_time = state.in_segment.to_running_time(buffer.pts());
let now = element.current_running_time(); let now = element.current_running_time();
delay = running_time.opt_checked_sub(now).ok().flatten(); delay = running_time.opt_checked_sub(now).ok().flatten();
} }
}
// Wait until now is close enough to the buffer's PTS // Wait until now is close enough to the buffer's PTS
if let Some(delay) = delay { if let Some(delay) = delay {
gst_trace!(
CAT,
obj: element,
"Waiting {:?} before sending buffer",
delay
);
tokio::time::sleep(delay.into()).await; tokio::time::sleep(delay.into()).await;
} }
if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() { if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() {
if let Some(buffer) = buffer { if let Some(buffer) = buffer {
let data = buffer.map_readable().unwrap(); let data = buffer.map_readable().unwrap();
for chunk in data.chunks(8000) {
gst_trace!(CAT, obj: element, "Sending {} bytes", data.len());
ws_sink ws_sink
.send(Message::Binary(chunk.to_vec())) .send(Message::Binary(data.to_vec()))
.await .await
.map_err(|err| { .map_err(|err| {
gst_error!(CAT, obj: element, "Failed sending audio packet to server: {}", err); gst_error!(
CAT,
obj: element,
"Failed sending audio packet to server: {}",
err
);
gst::FlowError::Error gst::FlowError::Error
})?; })?;
}
gst_trace!(CAT, obj: element, "Sent complete buffer to server!");
} else { } else {
// Send end of stream gst_info!(
gst_info!(CAT, obj: element, "Closing transcription session to Vosk server"); CAT,
obj: element,
"Closing transcription session to Vosk server"
);
ws_sink ws_sink
.send(Message::Text("{\"eof\": 1}".to_string())) .send(Message::Text("{\"eof\": 1}".to_string()))
.await .await
.map_err(|err| { .map_err(|err| {
gst_error!(CAT, obj: element, "Failed sending EOF packet to server: {}", err); gst_error!(
CAT,
obj: element,
"Failed sending EOF packet to server: {}",
err
);
gst::FlowError::Error gst::FlowError::Error
})?; })?;
} }
@ -676,7 +696,7 @@ impl Transcriber {
element: &super::Transcriber, element: &super::Transcriber,
buffer: Option<gst::Buffer>, buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: element, "Handling {:?}", buffer); gst_trace!(CAT, obj: element, "Handling {:?}", buffer);
self.ensure_connection(element).map_err(|err| { self.ensure_connection(element).map_err(|err| {
element_error!( element_error!(
@ -763,7 +783,12 @@ impl Transcriber {
let s = in_caps.structure(0).unwrap(); let s = in_caps.structure(0).unwrap();
let sample_rate = s.get::<i32>("rate").unwrap(); let sample_rate = s.get::<i32>("rate").unwrap();
gst_debug!(CAT, obj: &element, "Configuring transcription session, sample_rate={}", sample_rate); gst_debug!(
CAT,
obj: &element,
"Configuring transcription session, sample_rate={}",
sample_rate
);
let config = Configuration::new(sample_rate); let config = Configuration::new(sample_rate);
let packet = serde_json::to_vec(&config).unwrap(); let packet = serde_json::to_vec(&config).unwrap();
@ -788,7 +813,7 @@ impl Transcriber {
let transcribe = element.imp(); let transcribe = element.imp();
let msg = match ws_stream.next().await { let msg = match ws_stream.next().await {
Some(msg) => msg, Some(msg) => msg,
None => continue None => continue,
}; };
let msg = match msg { let msg = match msg {
@ -929,7 +954,7 @@ impl ObjectImpl for Transcriber {
"Address of the Vosk websocket server", "Address of the Vosk websocket server",
Some(DEFAULT_SERVER_ADDRESS), Some(DEFAULT_SERVER_ADDRESS),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
) ),
] ]
}); });
@ -1011,8 +1036,8 @@ impl ElementImpl for Transcriber {
.unwrap(); .unwrap();
let sink_caps = gst::Caps::builder("audio/x-raw") let sink_caps = gst::Caps::builder("audio/x-raw")
.field("format", "S16BE") .field("format", "S16LE")
.field("rate", gst::IntRange::new(8000_i32, 48000)) .field("rate", 16000)
.field("channels", 1) .field("channels", 1)
.build(); .build();
let sink_pad_template = gst::PadTemplate::new( let sink_pad_template = gst::PadTemplate::new(