From 849ae7c84544b54c4bde688665afca6c45d7265e Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 14 Nov 2024 16:52:30 +0100 Subject: [PATCH] speechmatics: fix hang when one source pad errors out We still want to push translations / transcripts on the other pads, and prior to that patch as the pad only paused itself but kept its mpsc channel alive and stopped reading from it, it would block further messages from being processed by the other source pads. Part-of: --- audio/speechmatics/src/transcriber/imp.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/audio/speechmatics/src/transcriber/imp.rs b/audio/speechmatics/src/transcriber/imp.rs index 740ca187..73e334ac 100644 --- a/audio/speechmatics/src/transcriber/imp.rs +++ b/audio/speechmatics/src/transcriber/imp.rs @@ -336,7 +336,7 @@ impl TranscriberSrcPad { /* We're EOS, we can pause and exit early */ if send_eos { - let _ = self.obj().pause_task(); + let _ = self.pause_task(); return self .obj() @@ -511,7 +511,7 @@ impl TranscriberSrcPad { Some(msg) => msg, /* Sender was closed */ None => { - let _ = self.obj().pause_task(); + let _ = self.pause_task(); return Ok(()); } }; @@ -679,7 +679,7 @@ impl TranscriberSrcPad { if !self.dequeue() { gst::info!(CAT, imp = self, "Failed to push gap event, pausing"); - let _ = self.obj().pause_task(); + let _ = self.pause_task(); } Ok(()) } @@ -687,7 +687,7 @@ impl TranscriberSrcPad { if !self.dequeue() { gst::info!(CAT, imp = self, "Failed to push gap event, pausing"); - let _ = self.obj().pause_task(); + let _ = self.pause_task(); } res } @@ -707,7 +707,7 @@ impl TranscriberSrcPad { let res = self.obj().start_task(move || { let Some(this) = this_weak.upgrade() else { if let Some(pad) = pad_weak.upgrade() { - let _ = pad.pause_task(); + let _ = pad.imp().pause_task(); } return; }; @@ -723,7 +723,7 @@ impl TranscriberSrcPad { gst::StreamError::Failed, ["Streaming failed: {}", err] ); - let _ = this.obj().pause_task(); + let _ = this.pause_task(); } }); if res.is_err() { @@ -732,6 +732,12 @@ impl TranscriberSrcPad { Ok(()) } + fn pause_task(&self) -> Result<(), glib::BoolError> { + self.state.lock().unwrap().sender = None; + + self.obj().pause_task() + } + fn stop_task(&self) -> Result<(), glib::BoolError> { self.state.lock().unwrap().sender = None;