diff --git a/generic/threadshare/examples/standalone/sink/task/imp.rs b/generic/threadshare/examples/standalone/sink/task/imp.rs index c89107d3a..6d815a0d1 100644 --- a/generic/threadshare/examples/standalone/sink/task/imp.rs +++ b/generic/threadshare/examples/standalone/sink/task/imp.rs @@ -241,13 +241,14 @@ impl TaskSink { // Enable backpressure for items let (item_sender, item_receiver) = flume::bounded(0); + *self.item_sender.lock().unwrap() = Some(item_sender); + let is_main_elem = settings.is_main_elem; let task_impl = TaskSinkTask::new(&self.obj(), item_receiver, settings.is_main_elem, stats); self.task .prepare(task_impl, ts_ctx) .block_on_or_add_subtask_then(self.obj(), move |elem, res| { if res.is_ok() { - *elem.imp().item_sender.lock().unwrap() = Some(item_sender); debug_or_trace!(CAT, is_main_elem, obj = elem, "Prepared"); } }) diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 53f9d246b..265549020 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -373,11 +373,11 @@ impl AppSrc { *self.configured_caps.lock().unwrap() = None; let (sender, receiver) = mpsc::channel(max_buffers); + *self.sender.lock().unwrap() = Some(sender); self.task .prepare(AppSrcTask::new(self.obj().clone(), receiver), context) - .block_on_or_add_subtask_then(self.obj(), move |elem, res| { + .block_on_or_add_subtask_then(self.obj(), |elem, res| { if res.is_ok() { - *elem.imp().sender.lock().unwrap() = Some(sender); gst::debug!(CAT, obj = elem, "Prepared"); } }) diff --git a/generic/threadshare/src/inter/src/imp.rs b/generic/threadshare/src/inter/src/imp.rs index 3d04d03b1..d7a13d648 100644 --- a/generic/threadshare/src/inter/src/imp.rs +++ b/generic/threadshare/src/inter/src/imp.rs @@ -697,16 +697,14 @@ impl InterSrc { Some(settings.max_size_time) }, ); + *self.dataqueue.lock().unwrap() = Some(dataqueue.clone()); let inter_ctx_name = self.settings.lock().unwrap().inter_context.to_string(); let elem = self.obj().clone(); block_on_or_add_subtask(async move { let imp = elem.imp(); - let res = imp - .join_inter_ctx(&inter_ctx_name, ts_ctx, dataqueue.clone()) - .await; + let res = imp.join_inter_ctx(&inter_ctx_name, ts_ctx, dataqueue).await; if res.is_ok() { - *imp.dataqueue.lock().unwrap() = Some(dataqueue); gst::debug!(CAT, obj = elem, "Prepared"); } diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index c4047d930..55e174034 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -964,27 +964,22 @@ impl ProxySrc { }, ); + { + let mut shared_ctx = proxy_ctx.lock_shared(); + shared_ctx.dataqueue = Some(dataqueue.clone()); + + let mut proxy_src_pads = PROXY_SRC_PADS.lock().unwrap(); + assert!(!proxy_src_pads.contains_key(&settings.proxy_context)); + proxy_src_pads.insert(settings.proxy_context, self.src_pad.downgrade()); + } + + *self.proxy_ctx.lock().unwrap() = Some(proxy_ctx); + *self.dataqueue.lock().unwrap() = Some(dataqueue.clone()); + self.task - .prepare( - ProxySrcTask::new(self.obj().clone(), dataqueue.clone()), - ts_ctx, - ) - .block_on_or_add_subtask_then(self.obj(), move |elem, res| { + .prepare(ProxySrcTask::new(self.obj().clone(), dataqueue), ts_ctx) + .block_on_or_add_subtask_then(self.obj(), |elem, res| { if res.is_ok() { - let imp = elem.imp(); - - { - let mut shared_ctx = proxy_ctx.lock_shared(); - shared_ctx.dataqueue = Some(dataqueue.clone()); - - let mut proxy_src_pads = PROXY_SRC_PADS.lock().unwrap(); - assert!(!proxy_src_pads.contains_key(&settings.proxy_context)); - proxy_src_pads.insert(settings.proxy_context, imp.src_pad.downgrade()); - } - - *imp.proxy_ctx.lock().unwrap() = Some(proxy_ctx); - *imp.dataqueue.lock().unwrap() = Some(dataqueue); - gst::debug!(SRC_CAT, obj = elem, "Prepared"); } }) diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index 6f4e77087..281cb1411 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -580,14 +580,12 @@ impl Queue { ) })?; + *self.dataqueue.lock().unwrap() = Some(dataqueue.clone()); + self.task - .prepare( - QueueTask::new(self.obj().clone(), dataqueue.clone()), - context, - ) - .block_on_or_add_subtask_then(self.obj(), move |elem, res| { + .prepare(QueueTask::new(self.obj().clone(), dataqueue), context) + .block_on_or_add_subtask_then(self.obj(), |elem, res| { if res.is_ok() { - *elem.imp().dataqueue.lock().unwrap() = Some(dataqueue); gst::debug!(CAT, obj = elem, "Prepared"); } }) diff --git a/generic/threadshare/src/rtpdtmfsrc/imp.rs b/generic/threadshare/src/rtpdtmfsrc/imp.rs index a59ee9bb3..7dd857102 100644 --- a/generic/threadshare/src/rtpdtmfsrc/imp.rs +++ b/generic/threadshare/src/rtpdtmfsrc/imp.rs @@ -1068,14 +1068,14 @@ impl RTPDTMFSrc { drop(settings); let (dtmf_evt_tx, dtmf_evt_rx) = mpsc::channel(DEFAULT_DTMF_EVT_CHAN_CAPACITY); + *self.dtmf_evt_tx.lock().unwrap() = Some(dtmf_evt_tx); self.task .prepare( RTPDTMFSrcTask::new(self.obj().clone(), dtmf_evt_rx), context, ) - .block_on_or_add_subtask_then(self.obj(), move |elem, res| { + .block_on_or_add_subtask_then(self.obj(), |elem, res| { if res.is_ok() { - *elem.imp().dtmf_evt_tx.lock().unwrap() = Some(dtmf_evt_tx); gst::debug!(CAT, obj = elem, "Prepared"); } }) diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index 075042b2b..f311cae4e 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -819,16 +819,12 @@ impl UdpSrc { drop(settings); let (sender, receiver) = channel(1); + self.state.lock().unwrap().event_sender = Some(sender); *self.configured_caps.lock().unwrap() = None; self.task .prepare(UdpSrcTask::new(self.obj().clone(), receiver), context) - .block_on_or_add_subtask_then(self.obj(), move |elem, res| { - let imp = elem.imp(); - let mut state = imp.state.lock().unwrap(); - state.event_sender = Some(sender); - drop(state); - + .block_on_or_add_subtask_then(self.obj(), |elem, res| { if res.is_ok() { gst::debug!(CAT, obj = elem, "Prepared"); } diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index df441bcba..2296ef2df 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -224,14 +224,14 @@ mod imp_src { })?; let (sender, receiver) = mpsc::channel(1); + *self.sender.lock().unwrap() = Some(sender); self.task .prepare( ElementSrcTestTask::new(self.obj().clone(), receiver), context, ) - .block_on_or_add_subtask_then(self.obj(), move |elem, res| { + .block_on_or_add_subtask_then(self.obj(), |elem, res| { if res.is_ok() { - *elem.imp().sender.lock().unwrap() = Some(sender); gst::debug!(SRC_CAT, obj = elem, "Prepared"); } })