From 885d3de7bbca3abe76eae38619b97523959ece35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Thu, 23 Jun 2022 18:50:11 +0200 Subject: [PATCH] ts/udpsink: reduce sync primitives in async hot path The way the runtime::Task is implemented, UdpSinkTask is available as a mutable ref in all the TaskImpl functions, which offers the opportunity to avoid using Mutexes. Main higlights: - Removed the back and forth calls between UdpSinkPadHandler and UdpSinkTask. - Udp sockets are now part of UdpSinkTask, which is also in charge of preparing them instead of leaving this to UdpSink. This removed the need for Context::enter since TaskImpl::prepare already operates under the target Context. - In order for the clients list to be visible from the UdpSink, this list was maintained by UdpSinkPadHandler which was also in charge of (un)configuring the Udp sockets. The sockets are now part of UdpSinkTask, which is also in charge of the (un)configuration. Add/remove/replace requests are passed as commands to the UdpSinkTask via a channel. - The clients list visible to the UdpSink is now part of the Settings (it is also a read/write property). Since the actual socket (un)configuration is asynchronously handled by the Task, the clients list is updated by the add/remove/replace signals and set_property("clients", ..). Should a problem occur during the async (un)configuration, and only in this case, the UdpSinkTask would update the clients lists in Settings accordingly so that it stays consistent with the internal state. - The function clear_clients was renamed as replace_with_clients. - clients is now based on a BTreeSet instead of a Vec. All the managing functions perform some sort of lookup prior to updating the collection. It also ease implementation. - Removed the UdpSinkPadHandler RwLock. Using flume channels, we are able to clone the Receiver so it can be stored in UdpSink and reused when preparing the UdpSinkTask. --- generic/threadshare/Cargo.toml | 1 + generic/threadshare/src/udpsink/imp.rs | 1131 +++++++++++------------- 2 files changed, 533 insertions(+), 599 deletions(-) diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index bc2229d2..0d3f1692 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -11,6 +11,7 @@ rust-version = "1.57" [dependencies] async-task = "4.0.3" concurrent-queue = "1.2.2" +flume = "0.10.13" futures = { version = "0.3.17", features = ["thread-pool"] } libc = "0.2" gio = { git = "https://github.com/gtk-rs/gtk-rs-core" } diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 1dd8935b..9eb80c84 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -17,9 +17,7 @@ // // SPDX-License-Identifier: LGPL-2.1-or-later -use futures::channel::mpsc; use futures::future::BoxFuture; -use futures::lock::Mutex; use futures::prelude::*; use gst::glib; @@ -34,10 +32,9 @@ use crate::runtime::prelude::*; use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, Task}; use crate::socket::{wrap_socket, GioSocketWrapper}; -use std::mem; +use std::collections::BTreeSet; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; -use std::sync::Mutex as StdMutex; -use std::sync::{Arc, RwLock}; +use std::sync::Mutex; use std::time::Duration; use std::u16; use std::u8; @@ -80,6 +77,7 @@ struct Settings { qos_dscp: i32, context: String, context_wait: Duration, + clients: BTreeSet, } impl Default for Settings { @@ -101,6 +99,10 @@ impl Default for Settings { qos_dscp: DEFAULT_QOS_DSCP, context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, + clients: BTreeSet::from([SocketAddr::new( + DEFAULT_HOST.unwrap().parse().unwrap(), + DEFAULT_PORT as u16, + )]), } } } @@ -119,437 +121,8 @@ enum TaskItem { Event(gst::Event), } -#[derive(Debug)] -struct UdpSinkPadHandlerInner { - sync: bool, - segment: Option, - latency: Option, - socket: Arc>>>, - socket_v6: Arc>>>, - #[allow(clippy::rc_buffer)] - clients: Arc>, - clients_to_configure: Vec, - clients_to_unconfigure: Vec, - sender: Arc>>>, - settings: Arc>, -} - -impl UdpSinkPadHandlerInner { - fn new(settings: Arc>) -> Self { - UdpSinkPadHandlerInner { - sync: DEFAULT_SYNC, - segment: None, - latency: None, - socket: Arc::new(Mutex::new(None)), - socket_v6: Arc::new(Mutex::new(None)), - clients: Arc::new(vec![SocketAddr::new( - DEFAULT_HOST.unwrap().parse().unwrap(), - DEFAULT_PORT as u16, - )]), - clients_to_configure: vec![], - clients_to_unconfigure: vec![], - sender: Arc::new(Mutex::new(None)), - settings, - } - } - - fn clear_clients( - &mut self, - gst_pad: &gst::Pad, - clients_to_add: impl Iterator, - ) { - let old_clients = mem::take(&mut *Arc::make_mut(&mut self.clients)); - - self.clients_to_configure = vec![]; - self.clients_to_unconfigure = vec![]; - - for addr in clients_to_add { - if !old_clients.contains(&addr) { - self.clients_to_unconfigure.push(addr); - } - self.add_client(gst_pad, addr); - } - } - - fn remove_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) { - if !self.clients.contains(&addr) { - gst::warning!(CAT, obj: gst_pad, "Not removing unknown client {:?}", &addr); - return; - } - - gst::info!(CAT, obj: gst_pad, "Removing client {:?}", addr); - - Arc::make_mut(&mut self.clients).retain(|addr2| addr != *addr2); - - self.clients_to_unconfigure.push(addr); - self.clients_to_configure.retain(|addr2| addr != *addr2); - } - - fn add_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) { - if self.clients.contains(&addr) { - gst::warning!(CAT, obj: gst_pad, "Not adding client {:?} again", &addr); - return; - } - - gst::info!(CAT, obj: gst_pad, "Adding client {:?}", addr); - - Arc::make_mut(&mut self.clients).push(addr); - - self.clients_to_configure.push(addr); - self.clients_to_unconfigure.retain(|addr2| addr != *addr2); - } -} - -#[derive(Debug)] -enum SocketQualified { - Ipv4(Async), - Ipv6(Async), -} - #[derive(Clone, Debug)] -struct UdpSinkPadHandler(Arc>); - -impl UdpSinkPadHandler { - fn new(settings: Arc>) -> UdpSinkPadHandler { - Self(Arc::new(RwLock::new(UdpSinkPadHandlerInner::new(settings)))) - } - - fn set_latency(&self, latency: gst::ClockTime) { - self.0.write().unwrap().latency = Some(latency); - } - - fn prepare(&self) { - let mut inner = self.0.write().unwrap(); - inner.clients_to_configure = inner.clients.to_vec(); - } - - fn prepare_socket(&self, socket: SocketQualified) { - let mut inner = self.0.write().unwrap(); - - match socket { - SocketQualified::Ipv4(socket) => inner.socket = Arc::new(Mutex::new(Some(socket))), - SocketQualified::Ipv6(socket) => inner.socket_v6 = Arc::new(Mutex::new(Some(socket))), - } - } - - fn unprepare(&self) { - let mut inner = self.0.write().unwrap(); - *inner = UdpSinkPadHandlerInner::new(Arc::clone(&inner.settings)) - } - - fn clear_clients(&self, gst_pad: &gst::Pad, clients_to_add: impl Iterator) { - self.0 - .write() - .unwrap() - .clear_clients(gst_pad, clients_to_add); - } - - fn remove_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) { - self.0.write().unwrap().remove_client(gst_pad, addr); - } - - fn add_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) { - self.0.write().unwrap().add_client(gst_pad, addr); - } - - fn clients(&self) -> Vec { - (*self.0.read().unwrap().clients).clone() - } - - fn configure_client( - &self, - settings: &Settings, - socket: &mut Option>, - socket_v6: &mut Option>, - client: &SocketAddr, - ) -> Result<(), gst::ErrorMessage> { - if client.ip().is_multicast() { - match client.ip() { - IpAddr::V4(addr) => { - if let Some(socket) = socket.as_mut() { - if settings.auto_multicast { - socket - .as_ref() - .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group: {}", err] - ) - })?; - } - if settings.multicast_loop { - socket.as_ref().set_multicast_loop_v4(true).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast loop: {}", err] - ) - })?; - } - socket - .as_ref() - .set_multicast_ttl_v4(settings.ttl_mc) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast ttl: {}", err] - ) - })?; - } - } - IpAddr::V6(addr) => { - if let Some(socket) = socket_v6.as_mut() { - if settings.auto_multicast { - socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group: {}", err] - ) - })?; - } - if settings.multicast_loop { - socket.as_ref().set_multicast_loop_v6(true).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast loop: {}", err] - ) - })?; - } - /* FIXME no API for set_multicast_ttl_v6 ? */ - } - } - } - } else { - match client.ip() { - IpAddr::V4(_) => { - if let Some(socket) = socket.as_mut() { - socket.as_ref().set_ttl(settings.ttl).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set unicast ttl: {}", err] - ) - })?; - } - } - IpAddr::V6(_) => { - if let Some(socket) = socket_v6.as_mut() { - socket.as_ref().set_ttl(settings.ttl).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set unicast ttl: {}", err] - ) - })?; - } - } - } - } - - Ok(()) - } - - fn unconfigure_client( - &self, - settings: &Settings, - socket: &mut Option>, - socket_v6: &mut Option>, - client: &SocketAddr, - ) -> Result<(), gst::ErrorMessage> { - if client.ip().is_multicast() { - match client.ip() { - IpAddr::V4(addr) => { - if let Some(socket) = socket.as_mut() { - if settings.auto_multicast { - socket - .as_ref() - .leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group: {}", err] - ) - })?; - } - } - } - IpAddr::V6(addr) => { - if let Some(socket) = socket_v6.as_mut() { - if settings.auto_multicast { - socket - .as_ref() - .leave_multicast_v6(&addr, 0) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group: {}", err] - ) - })?; - } - } - } - } - } - - Ok(()) - } - - async fn render( - &self, - element: &super::UdpSink, - buffer: gst::Buffer, - ) -> Result { - let ( - do_sync, - rtime, - clients, - clients_to_configure, - clients_to_unconfigure, - socket, - socket_v6, - settings, - ) = { - let mut inner = self.0.write().unwrap(); - let do_sync = inner.sync; - let mut rtime = gst::ClockTime::NONE; - - if let Some(segment) = &inner.segment { - rtime = segment - .downcast_ref::() - .and_then(|segment| { - segment.to_running_time(buffer.pts()).opt_add(inner.latency) - }); - } - - let clients_to_configure = mem::take(&mut inner.clients_to_configure); - let clients_to_unconfigure = mem::take(&mut inner.clients_to_unconfigure); - - let settings = inner.settings.lock().unwrap().clone(); - - ( - do_sync, - rtime, - Arc::clone(&inner.clients), - clients_to_configure, - clients_to_unconfigure, - Arc::clone(&inner.socket), - Arc::clone(&inner.socket_v6), - settings, - ) - }; - - let mut socket = socket.lock().await; - let mut socket_v6 = socket_v6.lock().await; - - if !clients_to_configure.is_empty() { - for client in &clients_to_configure { - self.configure_client(&settings, &mut socket, &mut socket_v6, client) - .map_err(|err| { - element_error!( - element, - gst::StreamError::Failed, - ["Failed to configure client {:?}: {}", client, err] - ); - - gst::FlowError::Error - })?; - } - } - - if !clients_to_unconfigure.is_empty() { - for client in &clients_to_unconfigure { - self.unconfigure_client(&settings, &mut socket, &mut socket_v6, client) - .map_err(|err| { - element_error!( - element, - gst::StreamError::Failed, - ["Failed to unconfigure client {:?}: {}", client, err] - ); - - gst::FlowError::Error - })?; - } - } - - if do_sync { - self.sync(element, rtime).await; - } - - let data = buffer.map_readable().map_err(|_| { - element_error!( - element, - gst::StreamError::Format, - ["Failed to map buffer readable"] - ); - - gst::FlowError::Error - })?; - - for client in clients.iter() { - let socket = match client.ip() { - IpAddr::V4(_) => &mut socket, - IpAddr::V6(_) => &mut socket_v6, - }; - - if let Some(socket) = socket.as_mut() { - gst::log!(CAT, obj: element, "Sending to {:?}", &client); - socket.send_to(&data, *client).await.map_err(|err| { - element_error!( - element, - gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); - gst::FlowError::Error - })?; - } else { - element_error!( - element, - gst::StreamError::Failed, - ("I/O error"), - ["No socket available for sending to {}", client] - ); - return Err(gst::FlowError::Error); - } - } - - gst::log!( - CAT, - obj: element, - "Sent buffer {:?} to all clients", - &buffer - ); - - Ok(gst::FlowSuccess::Ok) - } - - /* Wait until specified time */ - async fn sync( - &self, - element: &super::UdpSink, - running_time: impl Into>, - ) { - let now = element.current_running_time(); - - match running_time.into().opt_checked_sub(now) { - Ok(Some(delay)) => runtime::time::delay_for(delay.into()).await, - _ => runtime::executor::yield_now().await, - } - } - - async fn handle_event(&self, element: &super::UdpSink, event: gst::Event) { - match event.view() { - EventView::Eos(_) => { - let _ = element.post_message(gst::message::Eos::builder().src(element).build()); - } - EventView::Segment(e) => { - self.0.write().unwrap().segment = Some(e.segment().clone()); - } - EventView::SinkMessage(e) => { - let _ = element.post_message(e.message()); - } - _ => (), - } - } -} +struct UdpSinkPadHandler; impl PadSinkHandler for UdpSinkPadHandler { type ElementImpl = UdpSink; @@ -557,20 +130,19 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_chain( &self, _pad: &PadSinkRef, - _udpsink: &UdpSink, + udpsink: &UdpSink, element: &gst::Element, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let sender = Arc::clone(&self.0.read().unwrap().sender); + let sender = udpsink.item_sender.clone(); let element = element.clone().downcast::().unwrap(); async move { - if let Some(sender) = sender.lock().await.as_mut() { - if sender.send(TaskItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); - return Err(gst::FlowError::Flushing); - } + if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { + gst::debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); } + Ok(gst::FlowSuccess::Ok) } .boxed() @@ -579,20 +151,18 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_chain_list( &self, _pad: &PadSinkRef, - _udpsink: &UdpSink, + udpsink: &UdpSink, element: &gst::Element, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let sender = Arc::clone(&self.0.read().unwrap().sender); + let sender = udpsink.item_sender.clone(); let element = element.clone().downcast::().unwrap(); async move { - if let Some(sender) = sender.lock().await.as_mut() { - for buffer in list.iter_owned() { - if sender.send(TaskItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); - return Err(gst::FlowError::Flushing); - } + for buffer in list.iter_owned() { + if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { + gst::debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); } } @@ -604,21 +174,19 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_event_serialized( &self, _pad: &PadSinkRef, - _udpsink: &UdpSink, + udpsink: &UdpSink, element: &gst::Element, event: gst::Event, ) -> BoxFuture<'static, bool> { - let sender = Arc::clone(&self.0.read().unwrap().sender); + let sender = udpsink.item_sender.clone(); let element = element.clone().downcast::().unwrap(); async move { if let EventView::FlushStop(_) = event.view() { let udpsink = element.imp(); return udpsink.task.flush_stop().is_ok(); - } else if let Some(sender) = sender.lock().await.as_mut() { - if sender.send(TaskItem::Event(event)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); - } + } else if sender.send_async(TaskItem::Event(event)).await.is_err() { + gst::debug!(CAT, obj: &element, "Flushing"); } true @@ -641,123 +209,90 @@ impl PadSinkHandler for UdpSinkPadHandler { } } +#[derive(Debug)] +enum Command { + AddClient(SocketAddr), + RemoveClient(SocketAddr), + ReplaceWithClients(BTreeSet), + SetLatency(Option), + SetSync(bool), +} + #[derive(Debug)] struct UdpSinkTask { element: super::UdpSink, - sink_pad_handler: UdpSinkPadHandler, - receiver: Option>, + item_receiver: flume::Receiver, + cmd_receiver: flume::Receiver, + clients: BTreeSet, + socket: Option>, + socket_v6: Option>, + sync: bool, + latency: Option, + segment: Option, } impl UdpSinkTask { - fn new(element: &super::UdpSink, sink_pad_handler: &UdpSinkPadHandler) -> Self { + fn new( + element: &super::UdpSink, + item_receiver: flume::Receiver, + cmd_receiver: flume::Receiver, + ) -> Self { UdpSinkTask { element: element.clone(), - sink_pad_handler: sink_pad_handler.clone(), - receiver: None, + item_receiver, + cmd_receiver, + clients: Default::default(), + socket: None, + socket_v6: None, + sync: false, + latency: None, + segment: None, + } + } + + fn process_command(&mut self, cmd: Command) { + use Command::*; + match cmd { + AddClient(client) => self.add_client(client), + RemoveClient(client) => self.remove_client(&client), + ReplaceWithClients(clients) => self.replace_with_clients(clients), + SetSync(sync) => self.sync = sync, + SetLatency(latency) => self.latency = latency, } } } -impl TaskImpl for UdpSinkTask { - fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - gst::log!(CAT, obj: &self.element, "Starting task"); - - let (sender, receiver) = mpsc::channel(0); - - let mut sink_pad_handler = self.sink_pad_handler.0.write().unwrap(); - sink_pad_handler.sender = Arc::new(Mutex::new(Some(sender))); - - self.receiver = Some(receiver); - - gst::log!(CAT, obj: &self.element, "Task started"); - Ok(()) - } - .boxed() - } - - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { - async move { - match self.receiver.as_mut().unwrap().next().await { - Some(TaskItem::Buffer(buffer)) => { - match self.sink_pad_handler.render(&self.element, buffer).await { - Err(err) => { - element_error!( - &self.element, - gst::StreamError::Failed, - ["Failed to render item, stopping task: {}", err] - ); - - Err(gst::FlowError::Error) - } - _ => Ok(()), - } - } - Some(TaskItem::Event(event)) => { - self.sink_pad_handler - .handle_event(&self.element, event) - .await; - Ok(()) - } - None => Err(gst::FlowError::Flushing), - } - } - .boxed() - } -} - -#[derive(Debug)] -enum SocketFamily { - Ipv4, - Ipv6, -} - -#[derive(Debug)] -pub struct UdpSink { - sink_pad: PadSink, - sink_pad_handler: UdpSinkPadHandler, - task: Task, - settings: Arc>, -} - -impl UdpSink { +/// Socket configuration. +impl UdpSinkTask { fn prepare_socket( &self, + settings: &mut Settings, family: SocketFamily, - context: &Context, - element: &super::UdpSink, - ) -> Result<(), gst::ErrorMessage> { - let mut settings = self.settings.lock().unwrap(); - + ) -> Result>, gst::ErrorMessage> { let wrapped_socket = match family { SocketFamily::Ipv4 => &settings.socket, SocketFamily::Ipv6 => &settings.socket_v6, }; - let socket_qualified: SocketQualified; - if let Some(ref wrapped_socket) = wrapped_socket { let socket: UdpSocket = wrapped_socket.get(); - - let socket = context.enter(|| { - Async::::try_from(socket).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to setup Async socket: {}", err] - ) - }) + let socket = Async::::try_from(socket).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup Async socket: {}", err] + ) })?; match family { SocketFamily::Ipv4 => { settings.used_socket = Some(wrapped_socket.clone()); - socket_qualified = SocketQualified::Ipv4(socket); } SocketFamily::Ipv6 => { settings.used_socket_v6 = Some(wrapped_socket.clone()); - socket_qualified = SocketQualified::Ipv6(socket); } } + + Ok(Some(socket)) } else { let bind_addr = match family { SocketFamily::Ipv4 => &settings.bind_address, @@ -777,7 +312,7 @@ impl UdpSink { }; let saddr = SocketAddr::new(bind_addr, bind_port as u16); - gst::debug!(CAT, obj: element, "Binding to {:?}", saddr); + gst::debug!(CAT, obj: &self.element, "Binding to {:?}", saddr); let socket = match family { SocketFamily::Ipv4 => socket2::Socket::new( @@ -797,7 +332,7 @@ impl UdpSink { Err(err) => { gst::warning!( CAT, - obj: element, + obj: &self.element, "Failed to create {} socket: {}", match family { SocketFamily::Ipv4 => "IPv4", @@ -805,7 +340,7 @@ impl UdpSink { }, err ); - return Ok(()); + return Ok(None); } }; @@ -816,13 +351,11 @@ impl UdpSink { ) })?; - let socket = context.enter(|| { - Async::::try_from(socket).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to setup Async socket: {}", err] - ) - }) + let socket = Async::::try_from(socket).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup Async socket: {}", err] + ) })?; let wrapper = wrap_socket(&socket)?; @@ -839,20 +372,413 @@ impl UdpSink { match family { SocketFamily::Ipv4 => { settings.used_socket = Some(wrapper); - socket_qualified = SocketQualified::Ipv4(socket) } SocketFamily::Ipv6 => { settings.used_socket_v6 = Some(wrapper); - socket_qualified = SocketQualified::Ipv6(socket) + } + } + + Ok(Some(socket)) + } + } + + fn add_client(&mut self, addr: SocketAddr) { + if self.clients.contains(&addr) { + gst::warning!(CAT, obj: &self.element, "Not adding client {:?} again", &addr); + return; + } + + let udpsink = self.element.imp(); + let mut settings = udpsink.settings.lock().unwrap(); + match self.configure_client(&settings, &addr) { + Ok(()) => { + gst::info!(CAT, obj: &self.element, "Added client {:?}", addr); + self.clients.insert(addr); + } + Err(err) => { + gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err); + settings.clients = self.clients.clone(); + self.element.post_error_message(err); + } + } + } + + fn remove_client(&mut self, addr: &SocketAddr) { + if self.clients.take(addr).is_none() { + gst::warning!(CAT, obj: &self.element, "Not removing unknown client {:?}", &addr); + return; + } + + let udpsink = self.element.imp(); + let mut settings = udpsink.settings.lock().unwrap(); + match self.unconfigure_client(&settings, addr) { + Ok(()) => { + gst::info!(CAT, obj: &self.element, "Removed client {:?}", addr); + } + Err(err) => { + gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err); + settings.clients = self.clients.clone(); + self.element.post_error_message(err); + } + } + } + + fn replace_with_clients(&mut self, mut clients_to_add: BTreeSet) { + if clients_to_add.is_empty() { + gst::info!(CAT, obj: &self.element, "Clearing clients"); + } else { + gst::info!(CAT, obj: &self.element, "Replacing clients"); + } + + let old_clients = std::mem::take(&mut self.clients); + + let mut res = Ok(()); + let udpsink = self.element.imp(); + let mut settings = udpsink.settings.lock().unwrap(); + + for addr in old_clients.iter() { + if clients_to_add.take(addr).is_some() { + // client is already configured + self.clients.insert(*addr); + } else if let Err(err) = self.unconfigure_client(&settings, addr) { + gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err); + res = Err(err); + } + } + + for addr in clients_to_add.into_iter() { + if let Err(err) = self.configure_client(&settings, &addr) { + gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err); + res = Err(err); + } else { + self.clients.insert(addr); + } + } + + // FIXME: which error handling: + // - If at least one client could be configured, should we keep going? (current) + // - or, should we consider the preparation failed when the first client + // configuration fails? (previously) + if let Err(err) = res { + settings.clients = self.clients.clone(); + self.element.post_error_message(err); + } + } + + fn configure_client( + &self, + settings: &Settings, + client: &SocketAddr, + ) -> Result<(), gst::ErrorMessage> { + if client.ip().is_multicast() { + match client.ip() { + IpAddr::V4(addr) => { + if let Some(socket) = self.socket.as_ref() { + if settings.auto_multicast { + socket + .as_ref() + .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + [ + "Failed to join multicast group for {:?}: {}", + client, + err + ] + ) + })?; + } + if settings.multicast_loop { + socket.as_ref().set_multicast_loop_v4(true).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop for {:?}: {}", client, err] + ) + })?; + } + + socket + .as_ref() + .set_multicast_ttl_v4(settings.ttl_mc) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast ttl for {:?}: {}", client, err] + ) + })?; + } + } + IpAddr::V6(addr) => { + if let Some(socket) = self.socket_v6.as_ref() { + if settings.auto_multicast { + socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group for {:?}: {}", client, err] + ) + })?; + } + if settings.multicast_loop { + socket.as_ref().set_multicast_loop_v6(true).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop for {:?}: {}", client, err] + ) + })?; + } + /* FIXME no API for set_multicast_ttl_v6 ? */ + } + } + } + } else { + match client.ip() { + IpAddr::V4(_) => { + if let Some(socket) = self.socket.as_ref() { + socket.as_ref().set_ttl(settings.ttl).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl for {:?}: {}", client, err] + ) + })?; + } + } + IpAddr::V6(_) => { + if let Some(socket) = self.socket_v6.as_ref() { + socket.as_ref().set_ttl(settings.ttl).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl for {:?}: {}", client, err] + ) + })?; + } } } } - self.sink_pad_handler.prepare_socket(socket_qualified); + Ok(()) + } + + fn unconfigure_client( + &self, + settings: &Settings, + client: &SocketAddr, + ) -> Result<(), gst::ErrorMessage> { + if client.ip().is_multicast() { + match client.ip() { + IpAddr::V4(addr) => { + if let Some(socket) = self.socket.as_ref() { + if settings.auto_multicast { + socket + .as_ref() + .leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + [ + "Failed to leave multicast group for {:?}: {}", + client, + err + ] + ) + })?; + } + } + } + IpAddr::V6(addr) => { + if let Some(socket) = self.socket_v6.as_ref() { + if settings.auto_multicast { + socket + .as_ref() + .leave_multicast_v6(&addr, 0) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + [ + "Failed to leave multicast group for {:?}: {}", + client, + err + ] + ) + })?; + } + } + } + } + } + + Ok(()) + } +} + +/// Buffer handling. +impl UdpSinkTask { + async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> { + if self.sync { + let rtime = self.segment.as_ref().and_then(|segment| { + segment + .downcast_ref::() + .and_then(|segment| segment.to_running_time(buffer.pts()).opt_add(self.latency)) + }); + if let Some(rtime) = rtime { + self.sync(rtime).await; + } + } + + let data = buffer.map_readable().map_err(|_| { + element_error!( + self.element, + gst::StreamError::Format, + ["Failed to map buffer readable"] + ); + gst::FlowError::Error + })?; + + for client in self.clients.iter() { + let socket = match client.ip() { + IpAddr::V4(_) => &mut self.socket, + IpAddr::V6(_) => &mut self.socket_v6, + }; + + if let Some(socket) = socket.as_mut() { + gst::log!(CAT, obj: &self.element, "Sending to {:?}", &client); + socket.send_to(&data, *client).await.map_err(|err| { + element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + gst::FlowError::Error + })?; + } else { + element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["No socket available for sending to {}", client] + ); + return Err(gst::FlowError::Error); + } + } + + gst::log!( + CAT, + obj: &self.element, + "Sent buffer {:?} to all clients", + &buffer + ); Ok(()) } + /// Waits until specified time. + async fn sync(&self, running_time: gst::ClockTime) { + let now = self.element.current_running_time(); + + match running_time.opt_checked_sub(now) { + Ok(Some(delay)) => { + runtime::time::delay_for(delay.into()).await; + } + _ => runtime::executor::yield_now().await, + } + } +} + +impl TaskImpl for UdpSinkTask { + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + gst::info!(CAT, obj: &self.element, "Preparing Task"); + assert!(self.clients.is_empty()); + { + let udpsink = self.element.imp(); + let mut settings = udpsink.settings.lock().unwrap(); + self.sync = settings.sync; + self.socket = self.prepare_socket(&mut settings, SocketFamily::Ipv4)?; + self.socket_v6 = self.prepare_socket(&mut settings, SocketFamily::Ipv6)?; + } + + while let Ok(cmd) = self.cmd_receiver.try_recv() { + self.process_command(cmd); + } + + Ok(()) + } + .boxed() + } + + fn unprepare(&mut self) -> BoxFuture<'_, ()> { + async move { + gst::info!(CAT, obj: &self.element, "Unpreparing Task"); + + let udpsink = self.element.imp(); + let settings = udpsink.settings.lock().unwrap(); + for addr in self.clients.iter() { + let _ = self.unconfigure_client(&settings, addr); + } + } + .boxed() + } + + fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + let item = futures::select_biased! { + cmd = self.cmd_receiver.recv_async() => { + self.process_command(cmd.unwrap()); + return Ok(()); + } + item = self.item_receiver.recv_async() => item, + }; + + match item.map_err(|_| gst::FlowError::Flushing)? { + TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| { + element_error!( + &self.element, + gst::StreamError::Failed, + ["Failed to render item, stopping task: {}", err] + ); + gst::FlowError::Error + })?, + TaskItem::Event(event) => match event.view() { + EventView::Eos(_) => { + let _ = self + .element + .post_message(gst::message::Eos::builder().src(&self.element).build()); + } + EventView::Segment(e) => { + self.segment = Some(e.segment().clone()); + } + EventView::SinkMessage(e) => { + let _ = self.element.post_message(e.message()); + } + _ => (), + }, + } + + Ok(()) + } + .boxed() + } +} + +#[derive(Debug)] +enum SocketFamily { + Ipv4, + Ipv6, +} + +#[derive(Debug)] +pub struct UdpSink { + sink_pad: PadSink, + task: Task, + item_sender: flume::Sender, + item_receiver: flume::Receiver, + cmd_sender: flume::Sender, + cmd_receiver: flume::Receiver, + settings: Mutex, +} + +impl UdpSink { fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Preparing"); @@ -867,30 +793,26 @@ impl UdpSink { })? }; - self.sink_pad_handler.prepare(); - self.prepare_socket(SocketFamily::Ipv4, &context, element)?; - self.prepare_socket(SocketFamily::Ipv6, &context, element)?; + let task_impl = UdpSinkTask::new( + element, + self.item_receiver.clone(), + self.cmd_receiver.clone(), + ); + self.task.prepare(task_impl, context).map_err(|err| { + error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; - self.task - .prepare(UdpSinkTask::new(element, &self.sink_pad_handler), context) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; - - gst::debug!(CAT, obj: element, "Started preparing"); + gst::debug!(CAT, obj: element, "Started preparation"); Ok(()) } fn unprepare(&self, element: &super::UdpSink) { gst::debug!(CAT, obj: element, "Unpreparing"); - self.task.unprepare().unwrap(); - self.sink_pad_handler.unprepare(); - gst::debug!(CAT, obj: element, "Unprepared"); } @@ -910,19 +832,26 @@ impl UdpSink { } impl UdpSink { - fn clear_clients(&self, clients_to_add: impl Iterator) { - self.sink_pad_handler - .clear_clients(self.sink_pad.gst_pad(), clients_to_add); + fn add_client(&self, settings: &mut Settings, client: SocketAddr) { + settings.clients.insert(client); + self.cmd_sender.send(Command::AddClient(client)).unwrap(); } - fn remove_client(&self, addr: SocketAddr) { - self.sink_pad_handler - .remove_client(self.sink_pad.gst_pad(), addr); + fn remove_client(&self, settings: &mut Settings, client: SocketAddr) { + settings.clients.remove(&client); + self.cmd_sender.send(Command::RemoveClient(client)).unwrap(); } - fn add_client(&self, addr: SocketAddr) { - self.sink_pad_handler - .add_client(self.sink_pad.gst_pad(), addr); + fn replace_with_clients( + &self, + settings: &mut Settings, + clients: impl IntoIterator, + ) { + let clients = BTreeSet::::from_iter(clients); + settings.clients = clients.clone(); + self.cmd_sender + .send(Command::ReplaceWithClients(clients)) + .unwrap(); } } @@ -953,17 +882,21 @@ impl ObjectSubclass for UdpSink { type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { - let settings = Arc::new(StdMutex::new(Settings::default())); - let sink_pad_handler = UdpSinkPadHandler::new(Arc::clone(&settings)); + // Enable backpressure for items + let (item_sender, item_receiver) = flume::bounded(0); + let (cmd_sender, cmd_receiver) = flume::unbounded(); Self { sink_pad: PadSink::new( gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), - sink_pad_handler.clone(), + UdpSinkPadHandler, ), - sink_pad_handler, task: Task::default(), - settings, + item_sender, + item_receiver, + cmd_sender, + cmd_receiver, + settings: Default::default(), } } } @@ -1125,7 +1058,8 @@ impl ObjectImpl for UdpSink { if let Ok(addr) = try_into_socket_addr(&element, &host, port) { let udpsink = element.imp(); - udpsink.add_client(addr); + let mut settings = udpsink.settings.lock().unwrap(); + udpsink.add_client(&mut settings, addr); } None @@ -1144,7 +1078,8 @@ impl ObjectImpl for UdpSink { if let Ok(addr) = try_into_socket_addr(&element, &host, port) { let udpsink = element.imp(); - udpsink.remove_client(addr); + let mut settings = udpsink.settings.lock().unwrap(); + udpsink.remove_client(&mut settings, addr); } None @@ -1156,7 +1091,8 @@ impl ObjectImpl for UdpSink { let element = args[0].get::().expect("signal arg"); let udpsink = element.imp(); - udpsink.clear_clients(std::iter::empty()); + let mut settings = udpsink.settings.lock().unwrap(); + udpsink.replace_with_clients(&mut settings, BTreeSet::new()); None }) @@ -1177,7 +1113,9 @@ impl ObjectImpl for UdpSink { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "sync" => { - settings.sync = value.get().expect("type checked upstream"); + let sync = value.get().expect("type checked upstream"); + settings.sync = sync; + self.cmd_sender.send(Command::SetSync(sync)).unwrap(); } "bind-address" => { settings.bind_address = value @@ -1236,7 +1174,7 @@ impl ObjectImpl for UdpSink { .expect("type checked upstream") .unwrap_or_else(|| "".into()); - let clients_iter = clients.split(',').filter_map(|client| { + let clients = clients.split(',').filter_map(|client| { let rsplit: Vec<&str> = client.rsplitn(2, ':').collect(); if rsplit.len() == 2 { @@ -1251,9 +1189,8 @@ impl ObjectImpl for UdpSink { None } }); - drop(settings); - self.clear_clients(clients_iter); + self.replace_with_clients(&mut settings, clients); } "context" => { settings.context = value @@ -1304,14 +1241,9 @@ impl ObjectImpl for UdpSink { "ttl-mc" => settings.ttl_mc.to_value(), "qos-dscp" => settings.qos_dscp.to_value(), "clients" => { + let clients = settings.clients.clone(); drop(settings); - - let clients: Vec = self - .sink_pad_handler - .clients() - .iter() - .map(ToString::to_string) - .collect(); + let clients: Vec = clients.iter().map(ToString::to_string).collect(); clients.join(",").to_value() } @@ -1396,7 +1328,8 @@ impl ElementImpl for UdpSink { fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool { match event.view() { EventView::Latency(ev) => { - self.sink_pad_handler.set_latency(ev.latency()); + let latency = Some(ev.latency()); + self.cmd_sender.send(Command::SetLatency(latency)).unwrap(); self.sink_pad.gst_pad().push_event(event) } EventView::Step(..) => false,