awstranslate: Handle multiple stream-start event

Do not spawn new task loop with channel if we have one already

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2404>
This commit is contained in:
Seungha Yang 2025-07-31 00:35:41 +09:00 committed by GStreamer Marge Bot
parent 00b7ebab48
commit e314b2e591

View file

@ -187,6 +187,7 @@ struct State {
seqnum: gst::Seqnum, seqnum: gst::Seqnum,
chained_one: bool, chained_one: bool,
current_speaker: Option<String>, current_speaker: Option<String>,
task_started: bool,
} }
impl Default for State { impl Default for State {
@ -202,6 +203,7 @@ impl Default for State {
seqnum: gst::Seqnum::next(), seqnum: gst::Seqnum::next(),
chained_one: false, chained_one: false,
current_speaker: None, current_speaker: None,
task_started: false,
} }
} }
} }
@ -255,7 +257,7 @@ impl Translate {
let ret = gst::Pad::event_default(pad, Some(&*self.obj()), event); let ret = gst::Pad::event_default(pad, Some(&*self.obj()), event);
let _ = self.state.lock().unwrap().translate_tx.take(); let _ = self.state.lock().unwrap().translate_tx.take();
let _ = self.srcpad.pause_task(); let _ = self.srcpad.pause_task();
self.disconnect(); self.disconnect(false);
ret ret
} }
StreamStart(_) => { StreamStart(_) => {
@ -658,6 +660,11 @@ impl Translate {
} }
fn start_srcpad_task(&self) -> Result<(), gst::LoggableError> { fn start_srcpad_task(&self) -> Result<(), gst::LoggableError> {
if self.state.lock().unwrap().task_started {
gst::debug!(CAT, imp = self, "Task started already");
return Ok(());
}
gst::debug!(CAT, imp = self, "starting source pad task"); gst::debug!(CAT, imp = self, "starting source pad task");
self.ensure_connection() self.ensure_connection()
@ -746,12 +753,14 @@ impl Translate {
return Err(gst::loggable_error!(CAT, "Failed to start pad task")); return Err(gst::loggable_error!(CAT, "Failed to start pad task"));
} }
self.state.lock().unwrap().task_started = true;
gst::debug!(CAT, imp = self, "started source pad task"); gst::debug!(CAT, imp = self, "started source pad task");
Ok(()) Ok(())
} }
fn disconnect(&self) { fn disconnect(&self, stop_task: bool) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if let Some(abort_handle) = state.send_abort_handle.take() { if let Some(abort_handle) = state.send_abort_handle.take() {
@ -759,7 +768,19 @@ impl Translate {
abort_handle.abort(); abort_handle.abort();
} }
*state = State::default(); let mut task_started = state.task_started;
if stop_task {
drop(state);
let _ = self.srcpad.stop_task();
state = self.state.lock().unwrap();
task_started = false;
}
*state = State {
task_started,
..Default::default()
};
} }
fn sink_chain( fn sink_chain(
@ -1194,8 +1215,7 @@ impl ElementImpl for Translate {
})?; })?;
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
self.disconnect(); self.disconnect(true);
let _ = self.srcpad.stop_task();
} }
_ => (), _ => (),
} }