diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index 9931992cc..6ec3bc9b0 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -638,19 +638,12 @@ impl Default for State { } } -#[derive(Debug)] -struct PreparationSet { - join_handle: JoinHandle>, - context: Context, -} - #[derive(Debug)] struct UdpSrc { src_pad: PadSrc, src_pad_handler: UdpSrcPadHandler, state: Mutex, settings: Mutex, - preparation_set: Mutex>, } lazy_static! { @@ -663,42 +656,18 @@ lazy_static! { impl UdpSrc { async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let _state = self.state.lock().await; + let mut state = self.state.lock().await; + let mut settings = self.settings.lock().await.clone(); + gst_debug!(CAT, obj: element, "Preparing"); - let context = { - let settings = self.settings.lock().await.clone(); - - { - let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); - src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address; - src_pad_handler_state.caps = settings.caps.clone(); - } - + let context = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, ["Failed to acquire Context: {}", err] ) - })? - }; - - // UdpSocket needs to be instantiated in the thread of its I/O reactor - *self.preparation_set.lock().await = Some(PreparationSet { - join_handle: context.spawn(Self::prepare_socket(element.clone())), - context, - }); - - gst_debug!(CAT, obj: element, "Prepared"); - - Ok(()) - } - - async fn prepare_socket(element: gst::Element) -> Result<(), gst::ErrorMessage> { - let this = Self::from_instance(&element); - - let mut settings = this.settings.lock().await.clone(); - gst_debug!(CAT, obj: &element, "Preparing Socket"); + })?; let socket = if let Some(ref wrapped_socket) = settings.socket { use std::net::UdpSocket; @@ -714,11 +683,13 @@ impl UdpSrc { socket = wrapped_socket.get() } - let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to setup socket for tokio: {}", err] - ) + let socket = context.enter(|| { + tokio::net::UdpSocket::from_std(socket).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup socket for tokio: {}", err] + ) + }) })?; settings.used_socket = Some(wrapped_socket.clone()); @@ -755,7 +726,7 @@ impl UdpSrc { let saddr = SocketAddr::new(bind_addr, port as u16); gst_debug!( CAT, - obj: &element, + obj: element, "Binding to {:?} for multicast group {:?}", saddr, addr @@ -764,7 +735,7 @@ impl UdpSrc { saddr } else { let saddr = SocketAddr::new(addr, port as u16); - gst_debug!(CAT, obj: &element, "Binding to {:?}", saddr); + gst_debug!(CAT, obj: element, "Binding to {:?}", saddr); saddr }; @@ -807,11 +778,13 @@ impl UdpSrc { ) })?; - let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to setup socket for tokio: {}", err] - ) + let socket = context.enter(|| { + tokio::net::UdpSocket::from_std(socket).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup socket for tokio: {}", err] + ) + }) })?; if addr.is_multicast() { @@ -911,35 +884,20 @@ impl UdpSrc { ) })?; - *this.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream); + { + let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); + src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address; + src_pad_handler_state.caps = settings.caps.clone(); + } - this.state.lock().await.socket = Some(socket); - - gst_debug!(CAT, obj: &element, "Socket Prepared"); drop(settings); + *self.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream); + + state.socket = Some(socket); + element.notify("used-socket"); - Ok(()) - } - - async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - gst_debug!(CAT, obj: element, "Completing preparation"); - - let PreparationSet { - join_handle, - context, - } = self - .preparation_set - .lock() - .await - .take() - .expect("preparation_set already taken"); - - join_handle - .await - .expect("The socket preparation has panicked")?; - self.src_pad .prepare(context, &self.src_pad_handler) .await @@ -950,7 +908,7 @@ impl UdpSrc { ) })?; - gst_debug!(CAT, obj: element, "Preparation completed"); + gst_debug!(CAT, obj: element, "Prepared"); Ok(()) } @@ -1073,7 +1031,6 @@ impl ObjectSubclass for UdpSrc { src_pad_handler: UdpSrcPadHandler::default(), state: Mutex::new(State::default()), settings: Mutex::new(Settings::default()), - preparation_set: Mutex::new(None), } } } @@ -1179,12 +1136,6 @@ impl ElementImpl for UdpSrc { gst::StateChangeError })?; } - gst::StateChange::ReadyToPaused => { - runtime::executor::block_on(self.complete_preparation(element)).map_err(|err| { - element.post_error_message(&err); - gst::StateChangeError - })?; - } gst::StateChange::PlayingToPaused => { runtime::executor::block_on(self.pause(element)) .map_err(|_| gst::StateChangeError)?;