From a1b89c1fb96ca84899e599aad4d81c5ccc4d293a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Fri, 24 Jun 2022 11:48:04 +0200 Subject: [PATCH] ts/udpsrc: reduce sync primitives in async hot path - Moved UdpSrcPadHandlerState and related funtions to UdpSrcTask. - Moved Socket preparation in UdpSrcTask. No longer need for Context::enter. --- generic/threadshare/src/udpsrc/imp.rs | 705 ++++++++++++-------------- 1 file changed, 323 insertions(+), 382 deletions(-) diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index cd5cf5fe..df887616 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -18,7 +18,6 @@ // SPDX-License-Identifier: LGPL-2.1-or-later use futures::future::BoxFuture; -use futures::lock::Mutex as FutMutex; use futures::prelude::*; use gst::glib; @@ -31,13 +30,12 @@ use once_cell::sync::Lazy; use std::i32; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; -use std::sync::Arc; -use std::sync::Mutex as StdMutex; +use std::sync::Mutex; use std::time::Duration; use std::u16; use crate::runtime::prelude::*; -use crate::runtime::{Async, Context, PadSrc, PadSrcRef, PadSrcWeak, Task}; +use crate::runtime::{Async, Context, PadSrc, PadSrcRef, Task}; use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead}; @@ -109,91 +107,8 @@ impl SocketRead for UdpReader { } } -#[derive(Debug)] -struct UdpSrcPadHandlerState { - retrieve_sender_address: bool, - need_initial_events: bool, - need_segment: bool, - caps: Option, -} - -impl Default for UdpSrcPadHandlerState { - fn default() -> Self { - UdpSrcPadHandlerState { - retrieve_sender_address: true, - need_initial_events: true, - need_segment: true, - caps: None, - } - } -} - -#[derive(Debug, Default)] -struct UdpSrcPadHandlerInner { - state: FutMutex, - configured_caps: StdMutex>, -} - -#[derive(Clone, Debug, Default)] -struct UdpSrcPadHandler(Arc); - -impl UdpSrcPadHandler { - fn prepare(&self, caps: Option, retrieve_sender_address: bool) { - let mut state = self.0.state.try_lock().expect("State locked elsewhere"); - - state.caps = caps; - state.retrieve_sender_address = retrieve_sender_address; - } - - async fn reset_state(&self) { - *self.0.state.lock().await = Default::default(); - } - - async fn set_need_segment(&self) { - self.0.state.lock().await.need_segment = true; - } - - async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::UdpSrc) { - let mut state = self.0.state.lock().await; - if state.need_initial_events { - gst::debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); - - let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - let stream_start_evt = gst::event::StreamStart::builder(&stream_id) - .group_id(gst::GroupId::next()) - .build(); - pad.push_event(stream_start_evt).await; - - if let Some(ref caps) = state.caps { - pad.push_event(gst::event::Caps::new(caps)).await; - *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); - } - - state.need_initial_events = false; - } - - if state.need_segment { - let segment_evt = - gst::event::Segment::new(&gst::FormattedSegment::::new()); - pad.push_event(segment_evt).await; - - state.need_segment = false; - } - } - - async fn push_buffer( - &self, - pad: &PadSrcRef<'_>, - element: &super::UdpSrc, - buffer: gst::Buffer, - ) -> Result { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); - - self.push_prelude(pad, element).await; - - pad.push(buffer).await - } -} +#[derive(Clone, Debug)] +struct UdpSrcPadHandler; impl PadSrcHandler for UdpSrcPadHandler { type ElementImpl = UdpSrc; @@ -229,7 +144,7 @@ impl PadSrcHandler for UdpSrcPadHandler { fn src_query( &self, pad: &PadSrcRef, - _udpsrc: &UdpSrc, + udpsrc: &UdpSrc, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { @@ -248,7 +163,7 @@ impl PadSrcHandler for UdpSrcPadHandler { true } QueryViewMut::Caps(q) => { - let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { + let caps = if let Some(caps) = udpsrc.configured_caps.lock().unwrap().as_ref() { q.filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -277,32 +192,296 @@ impl PadSrcHandler for UdpSrcPadHandler { struct UdpSrcTask { element: super::UdpSrc, - src_pad: PadSrcWeak, - src_pad_handler: UdpSrcPadHandler, - socket: Socket, + socket: Option>, + retrieve_sender_address: bool, + need_initial_events: bool, + need_segment: bool, } impl UdpSrcTask { - fn new( - element: &super::UdpSrc, - src_pad: &PadSrc, - src_pad_handler: &UdpSrcPadHandler, - socket: Socket, - ) -> Self { + fn new(element: super::UdpSrc) -> Self { UdpSrcTask { - element: element.clone(), - src_pad: src_pad.downgrade(), - src_pad_handler: src_pad_handler.clone(), - socket, + element, + socket: None, + retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS, + need_initial_events: true, + need_segment: true, } } + + async fn push_buffer( + &mut self, + buffer: gst::Buffer, + ) -> Result { + gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer); + let udpsrc = self.element.imp(); + + if self.need_initial_events { + gst::debug!(CAT, obj: &self.element, "Pushing initial events"); + + let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); + let stream_start_evt = gst::event::StreamStart::builder(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + udpsrc.src_pad.push_event(stream_start_evt).await; + + let caps = udpsrc.settings.lock().unwrap().caps.clone(); + if let Some(caps) = caps { + udpsrc + .src_pad + .push_event(gst::event::Caps::new(&caps)) + .await; + *udpsrc.configured_caps.lock().unwrap() = Some(caps); + } + + self.need_initial_events = false; + } + + if self.need_segment { + let segment_evt = + gst::event::Segment::new(&gst::FormattedSegment::::new()); + udpsrc.src_pad.push_event(segment_evt).await; + + self.need_segment = false; + } + + let res = udpsrc.src_pad.push(buffer).await; + match res { + Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"), + Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"), + Err(gst::FlowError::Eos) => { + gst::debug!(CAT, obj: &self.element, "EOS"); + udpsrc.src_pad.push_event(gst::event::Eos::new()).await; + } + Err(err) => { + gst::error!(CAT, obj: &self.element, "Got error {}", err); + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + } + + res + } } impl TaskImpl for UdpSrcTask { + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + let udpsrc = self.element.imp(); + let mut settings = udpsrc.settings.lock().unwrap(); + + gst::debug!(CAT, obj: &self.element, "Preparing Task"); + + self.retrieve_sender_address = settings.retrieve_sender_address; + + let socket = if let Some(ref wrapped_socket) = settings.socket { + let socket: UdpSocket; + + #[cfg(unix)] + { + socket = wrapped_socket.get() + } + #[cfg(windows)] + { + socket = wrapped_socket.get() + } + + let socket = Async::::try_from(socket).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup Async socket: {}", err] + ) + })?; + + settings.used_socket = Some(wrapped_socket.clone()); + + socket + } else { + let addr: IpAddr = match settings.address { + None => { + return Err(gst::error_msg!( + gst::ResourceError::Settings, + ["No address set"] + )); + } + Some(ref addr) => match addr.parse() { + Err(err) => { + return Err(gst::error_msg!( + gst::ResourceError::Settings, + ["Invalid address '{}' set: {}", addr, err] + )); + } + Ok(addr) => addr, + }, + }; + let port = settings.port; + + // TODO: TTL, multicast loopback, etc + let saddr = if addr.is_multicast() { + let bind_addr = if addr.is_ipv4() { + IpAddr::V4(Ipv4Addr::UNSPECIFIED) + } else { + IpAddr::V6(Ipv6Addr::UNSPECIFIED) + }; + + let saddr = SocketAddr::new(bind_addr, port as u16); + gst::debug!( + CAT, + obj: &self.element, + "Binding to {:?} for multicast group {:?}", + saddr, + addr + ); + + saddr + } else { + let saddr = SocketAddr::new(addr, port as u16); + gst::debug!(CAT, obj: &self.element, "Binding to {:?}", saddr); + + saddr + }; + + let socket = if addr.is_ipv4() { + socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + ) + } else { + socket2::Socket::new( + socket2::Domain::IPV6, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + ) + } + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to create socket: {}", err] + ) + })?; + + socket.set_reuse_address(settings.reuse).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to set reuse_address: {}", err] + ) + })?; + + #[cfg(unix)] + { + socket.set_reuse_port(settings.reuse).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to set reuse_port: {}", err] + ) + })?; + } + + socket.bind(&saddr.into()).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to bind socket: {}", err] + ) + })?; + + let socket = Async::::try_from(socket).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to setup Async socket: {}", err] + ) + })?; + + if addr.is_multicast() { + // TODO: Multicast interface configuration, going to be tricky + match addr { + IpAddr::V4(addr) => { + socket + .as_ref() + .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to join multicast group: {}", err] + ) + })?; + } + IpAddr::V6(addr) => { + socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to join multicast group: {}", err] + ) + })?; + } + } + } + + settings.used_socket = Some(wrap_socket(&socket)?); + + socket + }; + + let port: i32 = socket.as_ref().local_addr().unwrap().port().into(); + if settings.port != port { + settings.port = port; + drop(settings); + self.element.notify("port"); + + settings = udpsrc.settings.lock().unwrap(); + }; + + let buffer_pool = gst::BufferPool::new(); + let mut config = buffer_pool.config(); + config.set_params(None, settings.mtu, 0, 0); + buffer_pool.set_config(config).map_err(|err| { + gst::error_msg!( + gst::ResourceError::Settings, + ["Failed to configure buffer pool {:?}", err] + ) + })?; + + self.socket = Some( + Socket::try_new( + self.element.clone().upcast(), + buffer_pool, + UdpReader::new(socket), + ) + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::OpenRead, + ["Failed to prepare socket {:?}", err] + ) + })?, + ); + + self.element.notify("used-socket"); + + Ok(()) + } + .boxed() + } + + fn unprepare(&mut self) -> BoxFuture<'_, ()> { + async move { + gst::debug!(CAT, obj: &self.element, "Unpreparing Task"); + let udpsrc = self.element.imp(); + udpsrc.settings.lock().unwrap().used_socket = None; + self.element.notify("used-socket"); + } + .boxed() + } + fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Starting task"); self.socket + .as_mut() + .unwrap() .set_clock(self.element.clock(), self.element.base_time()); gst::log!(CAT, obj: &self.element, "Task started"); Ok(()) @@ -312,47 +491,36 @@ impl TaskImpl for UdpSrcTask { fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - let item = self.socket.next().await; + let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| { + gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); + gst::FlowError::Flushing + })?; - let (mut buffer, saddr) = match item { - Some(Ok((buffer, saddr))) => (buffer, saddr), - Some(Err(err)) => { - gst::error!(CAT, obj: &self.element, "Got error {:?}", err); - match err { - SocketError::Gst(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - SocketError::Io(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); - } + let (mut buffer, saddr) = item.map_err(|err| { + gst::error!(CAT, obj: &self.element, "Got error {:?}", err); + match err { + SocketError::Gst(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + SocketError::Io(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); } - return Err(gst::FlowError::Error); } - None => { - gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); - return Err(gst::FlowError::Flushing); - } - }; + gst::FlowError::Error + })?; if let Some(saddr) = saddr { - if self - .src_pad_handler - .0 - .state - .lock() - .await - .retrieve_sender_address - { + if self.retrieve_sender_address { NetAddressMeta::add( buffer.get_mut().unwrap(), &gio::InetSocketAddress::from(saddr), @@ -360,30 +528,7 @@ impl TaskImpl for UdpSrcTask { } } - let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); - let res = self - .src_pad_handler - .push_buffer(&pad, &self.element, buffer) - .await; - match res { - Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"), - Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"), - Err(gst::FlowError::Eos) => { - gst::debug!(CAT, obj: &self.element, "EOS"); - pad.push_event(gst::event::Eos::new()).await; - } - Err(err) => { - gst::error!(CAT, obj: &self.element, "Got error {}", err); - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - } - - res.map(drop) + self.push_buffer(buffer).await.map(drop) } .boxed() } @@ -391,7 +536,8 @@ impl TaskImpl for UdpSrcTask { fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task"); - self.src_pad_handler.reset_state().await; + self.need_initial_events = true; + self.need_segment = true; gst::log!(CAT, obj: &self.element, "Task stopped"); Ok(()) } @@ -401,7 +547,7 @@ impl TaskImpl for UdpSrcTask { fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task flush"); - self.src_pad_handler.set_need_segment().await; + self.need_segment = true; gst::log!(CAT, obj: &self.element, "Stopped task flush"); Ok(()) } @@ -411,9 +557,9 @@ impl TaskImpl for UdpSrcTask { pub struct UdpSrc { src_pad: PadSrc, - src_pad_handler: UdpSrcPadHandler, task: Task, - settings: StdMutex, + configured_caps: Mutex>, + settings: Mutex, } static CAT: Lazy = Lazy::new(|| { @@ -426,219 +572,21 @@ static CAT: Lazy = Lazy::new(|| { impl UdpSrc { fn prepare(&self, element: &super::UdpSrc) -> Result<(), gst::ErrorMessage> { - let mut settings_guard = self.settings.lock().unwrap(); - gst::debug!(CAT, obj: element, "Preparing"); - let context = Context::acquire(&settings_guard.context, settings_guard.context_wait) - .map_err(|err| { + let settings = self.settings.lock().unwrap(); + let context = + Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, ["Failed to acquire Context: {}", err] ) })?; + drop(settings); - let socket = if let Some(ref wrapped_socket) = settings_guard.socket { - let socket: UdpSocket; - - #[cfg(unix)] - { - socket = wrapped_socket.get() - } - #[cfg(windows)] - { - socket = wrapped_socket.get() - } - - let socket = context.enter(|| { - Async::::try_from(socket).map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to setup Async socket: {}", err] - ) - }) - })?; - - settings_guard.used_socket = Some(wrapped_socket.clone()); - - socket - } else { - let addr: IpAddr = match settings_guard.address { - None => { - return Err(gst::error_msg!( - gst::ResourceError::Settings, - ["No address set"] - )); - } - Some(ref addr) => match addr.parse() { - Err(err) => { - return Err(gst::error_msg!( - gst::ResourceError::Settings, - ["Invalid address '{}' set: {}", addr, err] - )); - } - Ok(addr) => addr, - }, - }; - let port = settings_guard.port; - - // TODO: TTL, multicast loopback, etc - let saddr = if addr.is_multicast() { - let bind_addr = if addr.is_ipv4() { - IpAddr::V4(Ipv4Addr::UNSPECIFIED) - } else { - IpAddr::V6(Ipv6Addr::UNSPECIFIED) - }; - - let saddr = SocketAddr::new(bind_addr, port as u16); - gst::debug!( - CAT, - obj: element, - "Binding to {:?} for multicast group {:?}", - saddr, - addr - ); - - saddr - } else { - let saddr = SocketAddr::new(addr, port as u16); - gst::debug!(CAT, obj: element, "Binding to {:?}", saddr); - - saddr - }; - - let socket = if addr.is_ipv4() { - socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - ) - } else { - socket2::Socket::new( - socket2::Domain::IPV6, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - ) - } - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to create socket: {}", err] - ) - })?; - - socket - .set_reuse_address(settings_guard.reuse) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to set reuse_address: {}", err] - ) - })?; - - #[cfg(unix)] - { - socket.set_reuse_port(settings_guard.reuse).map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to set reuse_port: {}", err] - ) - })?; - } - - socket.bind(&saddr.into()).map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to bind socket: {}", err] - ) - })?; - - let socket = context.enter(|| { - Async::::try_from(socket).map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to setup Async socket: {}", err] - ) - }) - })?; - - if addr.is_multicast() { - // TODO: Multicast interface configuration, going to be tricky - match addr { - IpAddr::V4(addr) => { - socket - .as_ref() - .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to join multicast group: {}", err] - ) - })?; - } - IpAddr::V6(addr) => { - socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to join multicast group: {}", err] - ) - })?; - } - } - } - - settings_guard.used_socket = Some(wrap_socket(&socket)?); - - socket - }; - - let port: i32 = socket.as_ref().local_addr().unwrap().port().into(); - let settings = if settings_guard.port != port { - settings_guard.port = port; - let settings = settings_guard.clone(); - drop(settings_guard); - element.notify("port"); - - settings - } else { - let settings = settings_guard.clone(); - drop(settings_guard); - - settings - }; - - let buffer_pool = gst::BufferPool::new(); - let mut config = buffer_pool.config(); - config.set_params(None, settings.mtu, 0, 0); - buffer_pool.set_config(config).map_err(|err| { - gst::error_msg!( - gst::ResourceError::Settings, - ["Failed to configure buffer pool {:?}", err] - ) - })?; - - let socket = Socket::try_new( - element.clone().upcast(), - buffer_pool, - UdpReader::new(socket), - ) - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::OpenRead, - ["Failed to prepare socket {:?}", err] - ) - })?; - - element.notify("used-socket"); - - self.src_pad_handler - .prepare(settings.caps, settings.retrieve_sender_address); - + *self.configured_caps.lock().unwrap() = None; self.task - .prepare( - UdpSrcTask::new(element, &self.src_pad, &self.src_pad_handler, socket), - context, - ) + .prepare(UdpSrcTask::new(element.clone()), context) .map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, @@ -653,12 +601,7 @@ impl UdpSrc { fn unprepare(&self, element: &super::UdpSrc) { gst::debug!(CAT, obj: element, "Unpreparing"); - - self.settings.lock().unwrap().used_socket = None; - element.notify("used-socket"); - self.task.unprepare().unwrap(); - gst::debug!(CAT, obj: element, "Unprepared"); } @@ -691,16 +634,14 @@ impl ObjectSubclass for UdpSrc { type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { - let src_pad_handler = UdpSrcPadHandler::default(); - Self { src_pad: PadSrc::new( gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")), - src_pad_handler.clone(), + UdpSrcPadHandler, ), - src_pad_handler, task: Task::default(), - settings: StdMutex::new(Settings::default()), + configured_caps: Default::default(), + settings: Default::default(), } } }