ts-udpsrc: prepare socket immediately

Now that we can `Context::enter`, it is no longer necessary to spawn
a `Future` to prepare the `UdpSocket` and beneficiate from the
`Context`'s IO driver.
This commit is contained in:
François Laignel 2020-01-07 20:19:46 +01:00
parent a15d60105b
commit 7a4fea8669

View file

@ -638,19 +638,12 @@ impl Default for State {
} }
} }
#[derive(Debug)]
struct PreparationSet {
join_handle: JoinHandle<Result<(), gst::ErrorMessage>>,
context: Context,
}
#[derive(Debug)] #[derive(Debug)]
struct UdpSrc { struct UdpSrc {
src_pad: PadSrc, src_pad: PadSrc,
src_pad_handler: UdpSrcPadHandler, src_pad_handler: UdpSrcPadHandler,
state: Mutex<State>, state: Mutex<State>,
settings: Mutex<Settings>, settings: Mutex<Settings>,
preparation_set: Mutex<Option<PreparationSet>>,
} }
lazy_static! { lazy_static! {
@ -663,42 +656,18 @@ lazy_static! {
impl UdpSrc { impl UdpSrc {
async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let _state = self.state.lock().await; let mut state = self.state.lock().await;
let mut settings = self.settings.lock().await.clone();
gst_debug!(CAT, obj: element, "Preparing"); gst_debug!(CAT, obj: element, "Preparing");
let context = { let context =
let settings = self.settings.lock().await.clone();
{
let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap();
src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address;
src_pad_handler_state.caps = settings.caps.clone();
}
Context::acquire(&settings.context, settings.context_wait).map_err(|err| { Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err] ["Failed to acquire Context: {}", err]
) )
})? })?;
};
// UdpSocket needs to be instantiated in the thread of its I/O reactor
*self.preparation_set.lock().await = Some(PreparationSet {
join_handle: context.spawn(Self::prepare_socket(element.clone())),
context,
});
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
async fn prepare_socket(element: gst::Element) -> Result<(), gst::ErrorMessage> {
let this = Self::from_instance(&element);
let mut settings = this.settings.lock().await.clone();
gst_debug!(CAT, obj: &element, "Preparing Socket");
let socket = if let Some(ref wrapped_socket) = settings.socket { let socket = if let Some(ref wrapped_socket) = settings.socket {
use std::net::UdpSocket; use std::net::UdpSocket;
@ -714,11 +683,13 @@ impl UdpSrc {
socket = wrapped_socket.get() socket = wrapped_socket.get()
} }
let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| { let socket = context.enter(|| {
gst_error_msg!( tokio::net::UdpSocket::from_std(socket).map_err(|err| {
gst::ResourceError::OpenRead, gst_error_msg!(
["Failed to setup socket for tokio: {}", err] gst::ResourceError::OpenRead,
) ["Failed to setup socket for tokio: {}", err]
)
})
})?; })?;
settings.used_socket = Some(wrapped_socket.clone()); settings.used_socket = Some(wrapped_socket.clone());
@ -755,7 +726,7 @@ impl UdpSrc {
let saddr = SocketAddr::new(bind_addr, port as u16); let saddr = SocketAddr::new(bind_addr, port as u16);
gst_debug!( gst_debug!(
CAT, CAT,
obj: &element, obj: element,
"Binding to {:?} for multicast group {:?}", "Binding to {:?} for multicast group {:?}",
saddr, saddr,
addr addr
@ -764,7 +735,7 @@ impl UdpSrc {
saddr saddr
} else { } else {
let saddr = SocketAddr::new(addr, port as u16); let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(CAT, obj: &element, "Binding to {:?}", saddr); gst_debug!(CAT, obj: element, "Binding to {:?}", saddr);
saddr saddr
}; };
@ -807,11 +778,13 @@ impl UdpSrc {
) )
})?; })?;
let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| { let socket = context.enter(|| {
gst_error_msg!( tokio::net::UdpSocket::from_std(socket).map_err(|err| {
gst::ResourceError::OpenRead, gst_error_msg!(
["Failed to setup socket for tokio: {}", err] gst::ResourceError::OpenRead,
) ["Failed to setup socket for tokio: {}", err]
)
})
})?; })?;
if addr.is_multicast() { if addr.is_multicast() {
@ -911,35 +884,20 @@ impl UdpSrc {
) )
})?; })?;
*this.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream); {
let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap();
src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address;
src_pad_handler_state.caps = settings.caps.clone();
}
this.state.lock().await.socket = Some(socket);
gst_debug!(CAT, obj: &element, "Socket Prepared");
drop(settings); drop(settings);
*self.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream);
state.socket = Some(socket);
element.notify("used-socket"); element.notify("used-socket");
Ok(())
}
async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Completing preparation");
let PreparationSet {
join_handle,
context,
} = self
.preparation_set
.lock()
.await
.take()
.expect("preparation_set already taken");
join_handle
.await
.expect("The socket preparation has panicked")?;
self.src_pad self.src_pad
.prepare(context, &self.src_pad_handler) .prepare(context, &self.src_pad_handler)
.await .await
@ -950,7 +908,7 @@ impl UdpSrc {
) )
})?; })?;
gst_debug!(CAT, obj: element, "Preparation completed"); gst_debug!(CAT, obj: element, "Prepared");
Ok(()) Ok(())
} }
@ -1073,7 +1031,6 @@ impl ObjectSubclass for UdpSrc {
src_pad_handler: UdpSrcPadHandler::default(), src_pad_handler: UdpSrcPadHandler::default(),
state: Mutex::new(State::default()), state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()), settings: Mutex::new(Settings::default()),
preparation_set: Mutex::new(None),
} }
} }
} }
@ -1179,12 +1136,6 @@ impl ElementImpl for UdpSrc {
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::ReadyToPaused => {
runtime::executor::block_on(self.complete_preparation(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
runtime::executor::block_on(self.pause(element)) runtime::executor::block_on(self.pause(element))
.map_err(|_| gst::StateChangeError)?; .map_err(|_| gst::StateChangeError)?;