Not working

This commit is contained in:
Rafael Caricio 2022-03-28 00:07:49 +02:00
parent 8760c7d734
commit 2d9692797a
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947

View file

@ -79,9 +79,6 @@ struct State {
/// Flag to indicate that we need to send the required stream initialization events to the src pad /// Flag to indicate that we need to send the required stream initialization events to the src pad
send_events: bool, send_events: bool,
/// Flag to indicate that we need to send the EOS event to the src pad
send_eos: bool,
/// Flag to indicate that we need to send a discontinuity buffer in the src pad /// Flag to indicate that we need to send a discontinuity buffer in the src pad
send_discontinuity: bool, send_discontinuity: bool,
@ -111,7 +108,6 @@ impl Default for State {
buffers: VecDeque::new(), buffers: VecDeque::new(),
start_time: None, start_time: None,
send_events: true, send_events: true,
send_eos: false,
send_discontinuity: true, send_discontinuity: true,
in_segment: gst::FormattedSegment::new(), in_segment: gst::FormattedSegment::new(),
seqnum: gst::Seqnum::next(), seqnum: gst::Seqnum::next(),
@ -165,8 +161,6 @@ impl Transcriber {
let start_time = state.start_time.unwrap(); let start_time = state.start_time.unwrap();
let mut last_position = state.out_segment.position().unwrap(); let mut last_position = state.out_segment.position().unwrap();
let send_eos = state.send_eos && state.buffers.is_empty();
while let Some(buf) = state.buffers.front() { while let Some(buf) = state.buffers.front() {
let pts = buf.pts().unwrap(); let pts = buf.pts().unwrap();
gst_trace!( gst_trace!(
@ -201,15 +195,6 @@ impl Transcriber {
drop(state); drop(state);
// We're EOS, we can pause and exit early
if send_eos {
let _ = self.srcpad.pause_task();
return self
.srcpad
.push_event(gst::event::Eos::builder().seqnum(seqnum).build());
}
for mut buf in items.drain(..) { for mut buf in items.drain(..) {
let mut pts = buf.pts().unwrap(); let mut pts = buf.pts().unwrap();
let mut duration = buf.duration().unwrap(); let mut duration = buf.duration().unwrap();
@ -314,7 +299,7 @@ impl Transcriber {
gst_debug!( gst_debug!(
CAT, CAT,
obj: element, obj: element,
"Item is ready for queuing: {}, PTS {}", "Item is ready for queuing: \"{}\", PTS {}",
item.word, item.word,
start_time start_time
); );
@ -405,7 +390,7 @@ impl Transcriber {
} }
}; };
gst_trace!( gst_debug!(
CAT, CAT,
obj: element, obj: element,
"result: {}", "result: {}",
@ -533,6 +518,7 @@ impl Transcriber {
if ret { if ret {
let (_, min, _) = peer_query.result(); let (_, min, _) = peer_query.result();
let our_latency = self.settings.lock().unwrap().latency; let our_latency = self.settings.lock().unwrap().latency;
gst_info!(CAT, obj: element, "Replying to latency query: {}", our_latency + min);
// We never drop buffers, so our max latency is set to infinity // We never drop buffers, so our max latency is set to infinity
q.set(true, our_latency + min, gst::ClockTime::NONE); q.set(true, our_latency + min, gst::ClockTime::NONE);
} }
@ -658,22 +644,24 @@ impl Transcriber {
if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() { if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() {
if let Some(buffer) = buffer { if let Some(buffer) = buffer {
let data = buffer.map_readable().unwrap(); let data = buffer.map_readable().unwrap();
for chunk in data.chunks(8192) { for chunk in data.chunks(8000) {
ws_sink ws_sink
.send(Message::Binary(chunk.to_vec())) .send(Message::Binary(chunk.to_vec()))
.await .await
.map_err(|err| { .map_err(|err| {
gst_error!(CAT, obj: element, "Failed sending packet: {}", err); gst_error!(CAT, obj: element, "Failed sending audio packet to server: {}", err);
gst::FlowError::Error gst::FlowError::Error
})?; })?;
} }
gst_trace!(CAT, obj: element, "Sent complete buffer to server!");
} else { } else {
// Send end of stream // Send end of stream
gst_info!(CAT, obj: element, "Closing transcription session to Vosk server");
ws_sink ws_sink
.send(Message::Text("{\"eof\": 1}".to_string())) .send(Message::Text("{\"eof\": 1}".to_string()))
.await .await
.map_err(|err| { .map_err(|err| {
gst_error!(CAT, obj: element, "Failed sending packet: {}", err); gst_error!(CAT, obj: element, "Failed sending EOF packet to server: {}", err);
gst::FlowError::Error gst::FlowError::Error
})?; })?;
} }
@ -724,11 +712,13 @@ impl Transcriber {
} }
fn ensure_connection(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> { fn ensure_connection(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> {
{
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if state.connected { if state.connected {
return Ok(()); return Ok(());
} }
}
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
if settings.latency <= 2 * GRANULARITY { if settings.latency <= 2 * GRANULARITY {
@ -748,7 +738,6 @@ impl Transcriber {
let url = settings.server_address.clone(); let url = settings.server_address.clone();
drop(settings); drop(settings);
drop(state);
let (ws, _) = { let (ws, _) = {
let _enter = RUNTIME.enter(); let _enter = RUNTIME.enter();
@ -774,6 +763,8 @@ impl Transcriber {
let s = in_caps.structure(0).unwrap(); let s = in_caps.structure(0).unwrap();
let sample_rate = s.get::<i32>("rate").unwrap(); let sample_rate = s.get::<i32>("rate").unwrap();
gst_debug!(CAT, obj: &element, "Configuring transcription session, sample_rate={}", sample_rate);
let config = Configuration::new(sample_rate); let config = Configuration::new(sample_rate);
let packet = serde_json::to_vec(&config).unwrap(); let packet = serde_json::to_vec(&config).unwrap();
let msg = Message::Binary(packet); let msg = Message::Binary(packet);
@ -788,6 +779,8 @@ impl Transcriber {
return; return;
} }
} else {
return;
} }
} }
@ -795,11 +788,7 @@ impl Transcriber {
let transcribe = element.imp(); let transcribe = element.imp();
let msg = match ws_stream.next().await { let msg = match ws_stream.next().await {
Some(msg) => msg, Some(msg) => msg,
None => { None => continue
let mut state = transcribe.state.lock().unwrap();
state.send_eos = true;
break;
}
}; };
let msg = match msg { let msg = match msg {
@ -818,9 +807,12 @@ impl Transcriber {
let mut sender = transcribe.state.lock().unwrap().sender.clone(); let mut sender = transcribe.state.lock().unwrap().sender.clone();
if let Some(sender) = sender.as_mut() { if let Some(sender) = sender.as_mut() {
if sender.send(msg).await.is_err() { if let Err(err) = sender.send(msg).await {
gst_error!(CAT, obj: &element, "Stopped RECV handler: {}", err);
break; break;
} }
} else {
break;
} }
} }
}; };
@ -921,7 +913,8 @@ impl ObjectSubclass for Transcriber {
impl ObjectImpl for Transcriber { impl ObjectImpl for Transcriber {
fn properties() -> &'static [glib::ParamSpec] { fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecUInt::new( vec![
glib::ParamSpecUInt::new(
"latency", "latency",
"Latency", "Latency",
"Amount of milliseconds to allow Vosk to transcribe", "Amount of milliseconds to allow Vosk to transcribe",
@ -929,7 +922,15 @@ impl ObjectImpl for Transcriber {
std::u32::MAX, std::u32::MAX,
DEFAULT_LATENCY.mseconds() as u32, DEFAULT_LATENCY.mseconds() as u32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
)] ),
glib::ParamSpecString::new(
"server-address",
"Server Address",
"Address of the Vosk websocket server",
Some(DEFAULT_SERVER_ADDRESS),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
)
]
}); });
PROPERTIES.as_ref() PROPERTIES.as_ref()
@ -957,6 +958,10 @@ impl ObjectImpl for Transcriber {
value.get::<u32>().expect("type checked upstream").into(), value.get::<u32>().expect("type checked upstream").into(),
); );
} }
"server-address" => {
let mut settings = self.settings.lock().unwrap();
settings.server_address = value.get().expect("type checked upstream")
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -967,6 +972,10 @@ impl ObjectImpl for Transcriber {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
(settings.latency.mseconds() as u32).to_value() (settings.latency.mseconds() as u32).to_value()
} }
"server-address" => {
let settings = self.settings.lock().unwrap();
settings.server_address.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -1002,7 +1011,7 @@ impl ElementImpl for Transcriber {
.unwrap(); .unwrap();
let sink_caps = gst::Caps::builder("audio/x-raw") let sink_caps = gst::Caps::builder("audio/x-raw")
.field("format", "S16LE") .field("format", "S16BE")
.field("rate", gst::IntRange::new(8000_i32, 48000)) .field("rate", gst::IntRange::new(8000_i32, 48000))
.field("channels", 1) .field("channels", 1)
.build(); .build();