threadshare: fix resources not available when preparing asynchronously

[MR 2494] introduced `task::TransitionStatus::block_on_or_add_subtask_then()`
which allows executing a function after the transition succeeds or fails,
whatever the executor being used.

In that MR, I moved the resource assignments into that function in `prepare()`,
the reasoning being that if the Task transition failed, the resources would be
useless. Examples for such resources include: dataqueues, channel senders, etc.

This was actually a bad idea: if the transition happens asynchronously, the
state change can complete and buffers can start flowing before the transition
completes, leading to resources not being available, while they will be a
moment later and could have buffered the incoming items.

This commit reverts the resource assignments before the Task transition requests
for those handled by `prepare()`.

[MR 2494]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2494

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2517>
This commit is contained in:
François Laignel 2025-08-28 19:16:42 +02:00
parent d59485ed5e
commit d976d2e376
8 changed files with 30 additions and 42 deletions

View file

@ -241,13 +241,14 @@ impl TaskSink {
// Enable backpressure for items // Enable backpressure for items
let (item_sender, item_receiver) = flume::bounded(0); let (item_sender, item_receiver) = flume::bounded(0);
*self.item_sender.lock().unwrap() = Some(item_sender);
let is_main_elem = settings.is_main_elem; let is_main_elem = settings.is_main_elem;
let task_impl = TaskSinkTask::new(&self.obj(), item_receiver, settings.is_main_elem, stats); let task_impl = TaskSinkTask::new(&self.obj(), item_receiver, settings.is_main_elem, stats);
self.task self.task
.prepare(task_impl, ts_ctx) .prepare(task_impl, ts_ctx)
.block_on_or_add_subtask_then(self.obj(), move |elem, res| { .block_on_or_add_subtask_then(self.obj(), move |elem, res| {
if res.is_ok() { if res.is_ok() {
*elem.imp().item_sender.lock().unwrap() = Some(item_sender);
debug_or_trace!(CAT, is_main_elem, obj = elem, "Prepared"); debug_or_trace!(CAT, is_main_elem, obj = elem, "Prepared");
} }
}) })

View file

@ -373,11 +373,11 @@ impl AppSrc {
*self.configured_caps.lock().unwrap() = None; *self.configured_caps.lock().unwrap() = None;
let (sender, receiver) = mpsc::channel(max_buffers); let (sender, receiver) = mpsc::channel(max_buffers);
*self.sender.lock().unwrap() = Some(sender);
self.task self.task
.prepare(AppSrcTask::new(self.obj().clone(), receiver), context) .prepare(AppSrcTask::new(self.obj().clone(), receiver), context)
.block_on_or_add_subtask_then(self.obj(), move |elem, res| { .block_on_or_add_subtask_then(self.obj(), |elem, res| {
if res.is_ok() { if res.is_ok() {
*elem.imp().sender.lock().unwrap() = Some(sender);
gst::debug!(CAT, obj = elem, "Prepared"); gst::debug!(CAT, obj = elem, "Prepared");
} }
}) })

View file

@ -697,16 +697,14 @@ impl InterSrc {
Some(settings.max_size_time) Some(settings.max_size_time)
}, },
); );
*self.dataqueue.lock().unwrap() = Some(dataqueue.clone());
let inter_ctx_name = self.settings.lock().unwrap().inter_context.to_string(); let inter_ctx_name = self.settings.lock().unwrap().inter_context.to_string();
let elem = self.obj().clone(); let elem = self.obj().clone();
block_on_or_add_subtask(async move { block_on_or_add_subtask(async move {
let imp = elem.imp(); let imp = elem.imp();
let res = imp let res = imp.join_inter_ctx(&inter_ctx_name, ts_ctx, dataqueue).await;
.join_inter_ctx(&inter_ctx_name, ts_ctx, dataqueue.clone())
.await;
if res.is_ok() { if res.is_ok() {
*imp.dataqueue.lock().unwrap() = Some(dataqueue);
gst::debug!(CAT, obj = elem, "Prepared"); gst::debug!(CAT, obj = elem, "Prepared");
} }

View file

@ -964,27 +964,22 @@ impl ProxySrc {
}, },
); );
{
let mut shared_ctx = proxy_ctx.lock_shared();
shared_ctx.dataqueue = Some(dataqueue.clone());
let mut proxy_src_pads = PROXY_SRC_PADS.lock().unwrap();
assert!(!proxy_src_pads.contains_key(&settings.proxy_context));
proxy_src_pads.insert(settings.proxy_context, self.src_pad.downgrade());
}
*self.proxy_ctx.lock().unwrap() = Some(proxy_ctx);
*self.dataqueue.lock().unwrap() = Some(dataqueue.clone());
self.task self.task
.prepare( .prepare(ProxySrcTask::new(self.obj().clone(), dataqueue), ts_ctx)
ProxySrcTask::new(self.obj().clone(), dataqueue.clone()), .block_on_or_add_subtask_then(self.obj(), |elem, res| {
ts_ctx,
)
.block_on_or_add_subtask_then(self.obj(), move |elem, res| {
if res.is_ok() { if res.is_ok() {
let imp = elem.imp();
{
let mut shared_ctx = proxy_ctx.lock_shared();
shared_ctx.dataqueue = Some(dataqueue.clone());
let mut proxy_src_pads = PROXY_SRC_PADS.lock().unwrap();
assert!(!proxy_src_pads.contains_key(&settings.proxy_context));
proxy_src_pads.insert(settings.proxy_context, imp.src_pad.downgrade());
}
*imp.proxy_ctx.lock().unwrap() = Some(proxy_ctx);
*imp.dataqueue.lock().unwrap() = Some(dataqueue);
gst::debug!(SRC_CAT, obj = elem, "Prepared"); gst::debug!(SRC_CAT, obj = elem, "Prepared");
} }
}) })

View file

@ -580,14 +580,12 @@ impl Queue {
) )
})?; })?;
*self.dataqueue.lock().unwrap() = Some(dataqueue.clone());
self.task self.task
.prepare( .prepare(QueueTask::new(self.obj().clone(), dataqueue), context)
QueueTask::new(self.obj().clone(), dataqueue.clone()), .block_on_or_add_subtask_then(self.obj(), |elem, res| {
context,
)
.block_on_or_add_subtask_then(self.obj(), move |elem, res| {
if res.is_ok() { if res.is_ok() {
*elem.imp().dataqueue.lock().unwrap() = Some(dataqueue);
gst::debug!(CAT, obj = elem, "Prepared"); gst::debug!(CAT, obj = elem, "Prepared");
} }
}) })

View file

@ -1068,14 +1068,14 @@ impl RTPDTMFSrc {
drop(settings); drop(settings);
let (dtmf_evt_tx, dtmf_evt_rx) = mpsc::channel(DEFAULT_DTMF_EVT_CHAN_CAPACITY); let (dtmf_evt_tx, dtmf_evt_rx) = mpsc::channel(DEFAULT_DTMF_EVT_CHAN_CAPACITY);
*self.dtmf_evt_tx.lock().unwrap() = Some(dtmf_evt_tx);
self.task self.task
.prepare( .prepare(
RTPDTMFSrcTask::new(self.obj().clone(), dtmf_evt_rx), RTPDTMFSrcTask::new(self.obj().clone(), dtmf_evt_rx),
context, context,
) )
.block_on_or_add_subtask_then(self.obj(), move |elem, res| { .block_on_or_add_subtask_then(self.obj(), |elem, res| {
if res.is_ok() { if res.is_ok() {
*elem.imp().dtmf_evt_tx.lock().unwrap() = Some(dtmf_evt_tx);
gst::debug!(CAT, obj = elem, "Prepared"); gst::debug!(CAT, obj = elem, "Prepared");
} }
}) })

View file

@ -819,16 +819,12 @@ impl UdpSrc {
drop(settings); drop(settings);
let (sender, receiver) = channel(1); let (sender, receiver) = channel(1);
self.state.lock().unwrap().event_sender = Some(sender);
*self.configured_caps.lock().unwrap() = None; *self.configured_caps.lock().unwrap() = None;
self.task self.task
.prepare(UdpSrcTask::new(self.obj().clone(), receiver), context) .prepare(UdpSrcTask::new(self.obj().clone(), receiver), context)
.block_on_or_add_subtask_then(self.obj(), move |elem, res| { .block_on_or_add_subtask_then(self.obj(), |elem, res| {
let imp = elem.imp();
let mut state = imp.state.lock().unwrap();
state.event_sender = Some(sender);
drop(state);
if res.is_ok() { if res.is_ok() {
gst::debug!(CAT, obj = elem, "Prepared"); gst::debug!(CAT, obj = elem, "Prepared");
} }

View file

@ -224,14 +224,14 @@ mod imp_src {
})?; })?;
let (sender, receiver) = mpsc::channel(1); let (sender, receiver) = mpsc::channel(1);
*self.sender.lock().unwrap() = Some(sender);
self.task self.task
.prepare( .prepare(
ElementSrcTestTask::new(self.obj().clone(), receiver), ElementSrcTestTask::new(self.obj().clone(), receiver),
context, context,
) )
.block_on_or_add_subtask_then(self.obj(), move |elem, res| { .block_on_or_add_subtask_then(self.obj(), |elem, res| {
if res.is_ok() { if res.is_ok() {
*elem.imp().sender.lock().unwrap() = Some(sender);
gst::debug!(SRC_CAT, obj = elem, "Prepared"); gst::debug!(SRC_CAT, obj = elem, "Prepared");
} }
}) })