From e314b2e59182e1bbde42c6fdaca5cff430216d39 Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Thu, 31 Jul 2025 00:35:41 +0900 Subject: [PATCH] awstranslate: Handle multiple stream-start event Do not spawn new task loop with channel if we have one already Part-of: --- net/aws/src/translate/imp.rs | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/net/aws/src/translate/imp.rs b/net/aws/src/translate/imp.rs index 514d8a18d..3a51bb14a 100644 --- a/net/aws/src/translate/imp.rs +++ b/net/aws/src/translate/imp.rs @@ -187,6 +187,7 @@ struct State { seqnum: gst::Seqnum, chained_one: bool, current_speaker: Option, + task_started: bool, } impl Default for State { @@ -202,6 +203,7 @@ impl Default for State { seqnum: gst::Seqnum::next(), chained_one: false, current_speaker: None, + task_started: false, } } } @@ -255,7 +257,7 @@ impl Translate { let ret = gst::Pad::event_default(pad, Some(&*self.obj()), event); let _ = self.state.lock().unwrap().translate_tx.take(); let _ = self.srcpad.pause_task(); - self.disconnect(); + self.disconnect(false); ret } StreamStart(_) => { @@ -658,6 +660,11 @@ impl Translate { } fn start_srcpad_task(&self) -> Result<(), gst::LoggableError> { + if self.state.lock().unwrap().task_started { + gst::debug!(CAT, imp = self, "Task started already"); + return Ok(()); + } + gst::debug!(CAT, imp = self, "starting source pad task"); self.ensure_connection() @@ -746,12 +753,14 @@ impl Translate { return Err(gst::loggable_error!(CAT, "Failed to start pad task")); } + self.state.lock().unwrap().task_started = true; + gst::debug!(CAT, imp = self, "started source pad task"); Ok(()) } - fn disconnect(&self) { + fn disconnect(&self, stop_task: bool) { let mut state = self.state.lock().unwrap(); if let Some(abort_handle) = state.send_abort_handle.take() { @@ -759,7 +768,19 @@ impl Translate { abort_handle.abort(); } - *state = State::default(); + let mut task_started = state.task_started; + + if stop_task { + drop(state); + let _ = self.srcpad.stop_task(); + state = self.state.lock().unwrap(); + task_started = false; + } + + *state = State { + task_started, + ..Default::default() + }; } fn sink_chain( @@ -1194,8 +1215,7 @@ impl ElementImpl for Translate { })?; } gst::StateChange::PausedToReady => { - self.disconnect(); - let _ = self.srcpad.stop_task(); + self.disconnect(true); } _ => (), }