speechmatics: fix hang when one source pad errors out

We still want to push translations / transcripts on the other pads, and
prior to that patch as the pad only paused itself but kept its mpsc
channel alive and stopped reading from it, it would block further messages
from being processed by the other source pads.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1925>
This commit is contained in:
Mathieu Duponchelle 2024-11-14 16:52:30 +01:00
parent 169f7b762e
commit 849ae7c845

View file

@ -336,7 +336,7 @@ impl TranscriberSrcPad {
/* We're EOS, we can pause and exit early */ /* We're EOS, we can pause and exit early */
if send_eos { if send_eos {
let _ = self.obj().pause_task(); let _ = self.pause_task();
return self return self
.obj() .obj()
@ -511,7 +511,7 @@ impl TranscriberSrcPad {
Some(msg) => msg, Some(msg) => msg,
/* Sender was closed */ /* Sender was closed */
None => { None => {
let _ = self.obj().pause_task(); let _ = self.pause_task();
return Ok(()); return Ok(());
} }
}; };
@ -679,7 +679,7 @@ impl TranscriberSrcPad {
if !self.dequeue() { if !self.dequeue() {
gst::info!(CAT, imp = self, "Failed to push gap event, pausing"); gst::info!(CAT, imp = self, "Failed to push gap event, pausing");
let _ = self.obj().pause_task(); let _ = self.pause_task();
} }
Ok(()) Ok(())
} }
@ -687,7 +687,7 @@ impl TranscriberSrcPad {
if !self.dequeue() { if !self.dequeue() {
gst::info!(CAT, imp = self, "Failed to push gap event, pausing"); gst::info!(CAT, imp = self, "Failed to push gap event, pausing");
let _ = self.obj().pause_task(); let _ = self.pause_task();
} }
res res
} }
@ -707,7 +707,7 @@ impl TranscriberSrcPad {
let res = self.obj().start_task(move || { let res = self.obj().start_task(move || {
let Some(this) = this_weak.upgrade() else { let Some(this) = this_weak.upgrade() else {
if let Some(pad) = pad_weak.upgrade() { if let Some(pad) = pad_weak.upgrade() {
let _ = pad.pause_task(); let _ = pad.imp().pause_task();
} }
return; return;
}; };
@ -723,7 +723,7 @@ impl TranscriberSrcPad {
gst::StreamError::Failed, gst::StreamError::Failed,
["Streaming failed: {}", err] ["Streaming failed: {}", err]
); );
let _ = this.obj().pause_task(); let _ = this.pause_task();
} }
}); });
if res.is_err() { if res.is_err() {
@ -732,6 +732,12 @@ impl TranscriberSrcPad {
Ok(()) Ok(())
} }
fn pause_task(&self) -> Result<(), glib::BoolError> {
self.state.lock().unwrap().sender = None;
self.obj().pause_task()
}
fn stop_task(&self) -> Result<(), glib::BoolError> { fn stop_task(&self) -> Result<(), glib::BoolError> {
self.state.lock().unwrap().sender = None; self.state.lock().unwrap().sender = None;