diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index bc2229d2..0d3f1692 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -11,6 +11,7 @@ rust-version = "1.57" [dependencies] async-task = "4.0.3" concurrent-queue = "1.2.2" +flume = "0.10.13" futures = { version = "0.3.17", features = ["thread-pool"] } libc = "0.2" gio = { git = "https://github.com/gtk-rs/gtk-rs-core" } diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 1dd8935b..9eb80c84 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -17,9 +17,7 @@ // // SPDX-License-Identifier: LGPL-2.1-or-later -use futures::channel::mpsc; use futures::future::BoxFuture; -use futures::lock::Mutex; use futures::prelude::*; use gst::glib; @@ -34,10 +32,9 @@ use crate::runtime::prelude::*; use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, Task}; use crate::socket::{wrap_socket, GioSocketWrapper}; -use std::mem; +use std::collections::BTreeSet; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; -use std::sync::Mutex as StdMutex; -use std::sync::{Arc, RwLock}; +use std::sync::Mutex; use std::time::Duration; use std::u16; use std::u8; @@ -80,6 +77,7 @@ struct Settings { qos_dscp: i32, context: String, context_wait: Duration, + clients: BTreeSet, } impl Default for Settings { @@ -101,6 +99,10 @@ impl Default for Settings { qos_dscp: DEFAULT_QOS_DSCP, context: DEFAULT_CONTEXT.into(), context_wait: DEFAULT_CONTEXT_WAIT, + clients: BTreeSet::from([SocketAddr::new( + DEFAULT_HOST.unwrap().parse().unwrap(), + DEFAULT_PORT as u16, + )]), } } } @@ -119,437 +121,8 @@ enum TaskItem { Event(gst::Event), } -#[derive(Debug)] -struct UdpSinkPadHandlerInner { - sync: bool, - segment: Option, - latency: Option, - socket: Arc>>>, - socket_v6: Arc>>>, - #[allow(clippy::rc_buffer)] - clients: Arc>, - clients_to_configure: Vec, - clients_to_unconfigure: Vec, - sender: Arc>>>, - settings: Arc>, -} - -impl UdpSinkPadHandlerInner { - fn new(settings: Arc>) -> Self { - UdpSinkPadHandlerInner { - sync: DEFAULT_SYNC, - segment: None, - latency: None, - socket: Arc::new(Mutex::new(None)), - socket_v6: Arc::new(Mutex::new(None)), - clients: Arc::new(vec![SocketAddr::new( - DEFAULT_HOST.unwrap().parse().unwrap(), - DEFAULT_PORT as u16, - )]), - clients_to_configure: vec![], - clients_to_unconfigure: vec![], - sender: Arc::new(Mutex::new(None)), - settings, - } - } - - fn clear_clients( - &mut self, - gst_pad: &gst::Pad, - clients_to_add: impl Iterator, - ) { - let old_clients = mem::take(&mut *Arc::make_mut(&mut self.clients)); - - self.clients_to_configure = vec![]; - self.clients_to_unconfigure = vec![]; - - for addr in clients_to_add { - if !old_clients.contains(&addr) { - self.clients_to_unconfigure.push(addr); - } - self.add_client(gst_pad, addr); - } - } - - fn remove_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) { - if !self.clients.contains(&addr) { - gst::warning!(CAT, obj: gst_pad, "Not removing unknown client {:?}", &addr); - return; - } - - gst::info!(CAT, obj: gst_pad, "Removing client {:?}", addr); - - Arc::make_mut(&mut self.clients).retain(|addr2| addr != *addr2); - - self.clients_to_unconfigure.push(addr); - self.clients_to_configure.retain(|addr2| addr != *addr2); - } - - fn add_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) { - if self.clients.contains(&addr) { - gst::warning!(CAT, obj: gst_pad, "Not adding client {:?} again", &addr); - return; - } - - gst::info!(CAT, obj: gst_pad, "Adding client {:?}", addr); - - Arc::make_mut(&mut self.clients).push(addr); - - self.clients_to_configure.push(addr); - self.clients_to_unconfigure.retain(|addr2| addr != *addr2); - } -} - -#[derive(Debug)] -enum SocketQualified { - Ipv4(Async), - Ipv6(Async), -} - #[derive(Clone, Debug)] -struct UdpSinkPadHandler(Arc>); - -impl UdpSinkPadHandler { - fn new(settings: Arc>) -> UdpSinkPadHandler { - Self(Arc::new(RwLock::new(UdpSinkPadHandlerInner::new(settings)))) - } - - fn set_latency(&self, latency: gst::ClockTime) { - self.0.write().unwrap().latency = Some(latency); - } - - fn prepare(&self) { - let mut inner = self.0.write().unwrap(); - inner.clients_to_configure = inner.clients.to_vec(); - } - - fn prepare_socket(&self, socket: SocketQualified) { - let mut inner = self.0.write().unwrap(); - - match socket { - SocketQualified::Ipv4(socket) => inner.socket = Arc::new(Mutex::new(Some(socket))), - SocketQualified::Ipv6(socket) => inner.socket_v6 = Arc::new(Mutex::new(Some(socket))), - } - } - - fn unprepare(&self) { - let mut inner = self.0.write().unwrap(); - *inner = UdpSinkPadHandlerInner::new(Arc::clone(&inner.settings)) - } - - fn clear_clients(&self, gst_pad: &gst::Pad, clients_to_add: impl Iterator) { - self.0 - .write() - .unwrap() - .clear_clients(gst_pad, clients_to_add); - } - - fn remove_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) { - self.0.write().unwrap().remove_client(gst_pad, addr); - } - - fn add_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) { - self.0.write().unwrap().add_client(gst_pad, addr); - } - - fn clients(&self) -> Vec { - (*self.0.read().unwrap().clients).clone() - } - - fn configure_client( - &self, - settings: &Settings, - socket: &mut Option>, - socket_v6: &mut Option>, - client: &SocketAddr, - ) -> Result<(), gst::ErrorMessage> { - if client.ip().is_multicast() { - match client.ip() { - IpAddr::V4(addr) => { - if let Some(socket) = socket.as_mut() { - if settings.auto_multicast { - socket - .as_ref() - .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group: {}", err] - ) - })?; - } - if settings.multicast_loop { - socket.as_ref().set_multicast_loop_v4(true).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast loop: {}", err] - ) - })?; - } - socket - .as_ref() - .set_multicast_ttl_v4(settings.ttl_mc) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast ttl: {}", err] - ) - })?; - } - } - IpAddr::V6(addr) => { - if let Some(socket) = socket_v6.as_mut() { - if settings.auto_multicast { - socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group: {}", err] - ) - })?; - } - if settings.multicast_loop { - socket.as_ref().set_multicast_loop_v6(true).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set multicast loop: {}", err] - ) - })?; - } - /* FIXME no API for set_multicast_ttl_v6 ? */ - } - } - } - } else { - match client.ip() { - IpAddr::V4(_) => { - if let Some(socket) = socket.as_mut() { - socket.as_ref().set_ttl(settings.ttl).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set unicast ttl: {}", err] - ) - })?; - } - } - IpAddr::V6(_) => { - if let Some(socket) = socket_v6.as_mut() { - socket.as_ref().set_ttl(settings.ttl).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to set unicast ttl: {}", err] - ) - })?; - } - } - } - } - - Ok(()) - } - - fn unconfigure_client( - &self, - settings: &Settings, - socket: &mut Option>, - socket_v6: &mut Option>, - client: &SocketAddr, - ) -> Result<(), gst::ErrorMessage> { - if client.ip().is_multicast() { - match client.ip() { - IpAddr::V4(addr) => { - if let Some(socket) = socket.as_mut() { - if settings.auto_multicast { - socket - .as_ref() - .leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group: {}", err] - ) - })?; - } - } - } - IpAddr::V6(addr) => { - if let Some(socket) = socket_v6.as_mut() { - if settings.auto_multicast { - socket - .as_ref() - .leave_multicast_v6(&addr, 0) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to join multicast group: {}", err] - ) - })?; - } - } - } - } - } - - Ok(()) - } - - async fn render( - &self, - element: &super::UdpSink, - buffer: gst::Buffer, - ) -> Result { - let ( - do_sync, - rtime, - clients, - clients_to_configure, - clients_to_unconfigure, - socket, - socket_v6, - settings, - ) = { - let mut inner = self.0.write().unwrap(); - let do_sync = inner.sync; - let mut rtime = gst::ClockTime::NONE; - - if let Some(segment) = &inner.segment { - rtime = segment - .downcast_ref::() - .and_then(|segment| { - segment.to_running_time(buffer.pts()).opt_add(inner.latency) - }); - } - - let clients_to_configure = mem::take(&mut inner.clients_to_configure); - let clients_to_unconfigure = mem::take(&mut inner.clients_to_unconfigure); - - let settings = inner.settings.lock().unwrap().clone(); - - ( - do_sync, - rtime, - Arc::clone(&inner.clients), - clients_to_configure, - clients_to_unconfigure, - Arc::clone(&inner.socket), - Arc::clone(&inner.socket_v6), - settings, - ) - }; - - let mut socket = socket.lock().await; - let mut socket_v6 = socket_v6.lock().await; - - if !clients_to_configure.is_empty() { - for client in &clients_to_configure { - self.configure_client(&settings, &mut socket, &mut socket_v6, client) - .map_err(|err| { - element_error!( - element, - gst::StreamError::Failed, - ["Failed to configure client {:?}: {}", client, err] - ); - - gst::FlowError::Error - })?; - } - } - - if !clients_to_unconfigure.is_empty() { - for client in &clients_to_unconfigure { - self.unconfigure_client(&settings, &mut socket, &mut socket_v6, client) - .map_err(|err| { - element_error!( - element, - gst::StreamError::Failed, - ["Failed to unconfigure client {:?}: {}", client, err] - ); - - gst::FlowError::Error - })?; - } - } - - if do_sync { - self.sync(element, rtime).await; - } - - let data = buffer.map_readable().map_err(|_| { - element_error!( - element, - gst::StreamError::Format, - ["Failed to map buffer readable"] - ); - - gst::FlowError::Error - })?; - - for client in clients.iter() { - let socket = match client.ip() { - IpAddr::V4(_) => &mut socket, - IpAddr::V6(_) => &mut socket_v6, - }; - - if let Some(socket) = socket.as_mut() { - gst::log!(CAT, obj: element, "Sending to {:?}", &client); - socket.send_to(&data, *client).await.map_err(|err| { - element_error!( - element, - gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); - gst::FlowError::Error - })?; - } else { - element_error!( - element, - gst::StreamError::Failed, - ("I/O error"), - ["No socket available for sending to {}", client] - ); - return Err(gst::FlowError::Error); - } - } - - gst::log!( - CAT, - obj: element, - "Sent buffer {:?} to all clients", - &buffer - ); - - Ok(gst::FlowSuccess::Ok) - } - - /* Wait until specified time */ - async fn sync( - &self, - element: &super::UdpSink, - running_time: impl Into>, - ) { - let now = element.current_running_time(); - - match running_time.into().opt_checked_sub(now) { - Ok(Some(delay)) => runtime::time::delay_for(delay.into()).await, - _ => runtime::executor::yield_now().await, - } - } - - async fn handle_event(&self, element: &super::UdpSink, event: gst::Event) { - match event.view() { - EventView::Eos(_) => { - let _ = element.post_message(gst::message::Eos::builder().src(element).build()); - } - EventView::Segment(e) => { - self.0.write().unwrap().segment = Some(e.segment().clone()); - } - EventView::SinkMessage(e) => { - let _ = element.post_message(e.message()); - } - _ => (), - } - } -} +struct UdpSinkPadHandler; impl PadSinkHandler for UdpSinkPadHandler { type ElementImpl = UdpSink; @@ -557,20 +130,19 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_chain( &self, _pad: &PadSinkRef, - _udpsink: &UdpSink, + udpsink: &UdpSink, element: &gst::Element, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let sender = Arc::clone(&self.0.read().unwrap().sender); + let sender = udpsink.item_sender.clone(); let element = element.clone().downcast::().unwrap(); async move { - if let Some(sender) = sender.lock().await.as_mut() { - if sender.send(TaskItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); - return Err(gst::FlowError::Flushing); - } + if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { + gst::debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); } + Ok(gst::FlowSuccess::Ok) } .boxed() @@ -579,20 +151,18 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_chain_list( &self, _pad: &PadSinkRef, - _udpsink: &UdpSink, + udpsink: &UdpSink, element: &gst::Element, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let sender = Arc::clone(&self.0.read().unwrap().sender); + let sender = udpsink.item_sender.clone(); let element = element.clone().downcast::().unwrap(); async move { - if let Some(sender) = sender.lock().await.as_mut() { - for buffer in list.iter_owned() { - if sender.send(TaskItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); - return Err(gst::FlowError::Flushing); - } + for buffer in list.iter_owned() { + if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { + gst::debug!(CAT, obj: &element, "Flushing"); + return Err(gst::FlowError::Flushing); } } @@ -604,21 +174,19 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_event_serialized( &self, _pad: &PadSinkRef, - _udpsink: &UdpSink, + udpsink: &UdpSink, element: &gst::Element, event: gst::Event, ) -> BoxFuture<'static, bool> { - let sender = Arc::clone(&self.0.read().unwrap().sender); + let sender = udpsink.item_sender.clone(); let element = element.clone().downcast::().unwrap(); async move { if let EventView::FlushStop(_) = event.view() { let udpsink = element.imp(); return udpsink.task.flush_stop().is_ok(); - } else if let Some(sender) = sender.lock().await.as_mut() { - if sender.send(TaskItem::Event(event)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); - } + } else if sender.send_async(TaskItem::Event(event)).await.is_err() { + gst::debug!(CAT, obj: &element, "Flushing"); } true @@ -641,123 +209,90 @@ impl PadSinkHandler for UdpSinkPadHandler { } } +#[derive(Debug)] +enum Command { + AddClient(SocketAddr), + RemoveClient(SocketAddr), + ReplaceWithClients(BTreeSet), + SetLatency(Option), + SetSync(bool), +} + #[derive(Debug)] struct UdpSinkTask { element: super::UdpSink, - sink_pad_handler: UdpSinkPadHandler, - receiver: Option>, + item_receiver: flume::Receiver, + cmd_receiver: flume::Receiver, + clients: BTreeSet, + socket: Option>, + socket_v6: Option>, + sync: bool, + latency: Option, + segment: Option, } impl UdpSinkTask { - fn new(element: &super::UdpSink, sink_pad_handler: &UdpSinkPadHandler) -> Self { + fn new( + element: &super::UdpSink, + item_receiver: flume::Receiver, + cmd_receiver: flume::Receiver, + ) -> Self { UdpSinkTask { element: element.clone(), - sink_pad_handler: sink_pad_handler.clone(), - receiver: None, + item_receiver, + cmd_receiver, + clients: Default::default(), + socket: None, + socket_v6: None, + sync: false, + latency: None, + segment: None, + } + } + + fn process_command(&mut self, cmd: Command) { + use Command::*; + match cmd { + AddClient(client) => self.add_client(client), + RemoveClient(client) => self.remove_client(&client), + ReplaceWithClients(clients) => self.replace_with_clients(clients), + SetSync(sync) => self.sync = sync, + SetLatency(latency) => self.latency = latency, } } } -impl TaskImpl for UdpSinkTask { - fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { - async move { - gst::log!(CAT, obj: &self.element, "Starting task"); - - let (sender, receiver) = mpsc::channel(0); - - let mut sink_pad_handler = self.sink_pad_handler.0.write().unwrap(); - sink_pad_handler.sender = Arc::new(Mutex::new(Some(sender))); - - self.receiver = Some(receiver); - - gst::log!(CAT, obj: &self.element, "Task started"); - Ok(()) - } - .boxed() - } - - fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { - async move { - match self.receiver.as_mut().unwrap().next().await { - Some(TaskItem::Buffer(buffer)) => { - match self.sink_pad_handler.render(&self.element, buffer).await { - Err(err) => { - element_error!( - &self.element, - gst::StreamError::Failed, - ["Failed to render item, stopping task: {}", err] - ); - - Err(gst::FlowError::Error) - } - _ => Ok(()), - } - } - Some(TaskItem::Event(event)) => { - self.sink_pad_handler - .handle_event(&self.element, event) - .await; - Ok(()) - } - None => Err(gst::FlowError::Flushing), - } - } - .boxed() - } -} - -#[derive(Debug)] -enum SocketFamily { - Ipv4, - Ipv6, -} - -#[derive(Debug)] -pub struct UdpSink { - sink_pad: PadSink, - sink_pad_handler: UdpSinkPadHandler, - task: Task, - settings: Arc>, -} - -impl UdpSink { +/// Socket configuration. +impl UdpSinkTask { fn prepare_socket( &self, + settings: &mut Settings, family: SocketFamily, - context: &Context, - element: &super::UdpSink, - ) -> Result<(), gst::ErrorMessage> { - let mut settings = self.settings.lock().unwrap(); - + ) -> Result>, gst::ErrorMessage> { let wrapped_socket = match family { SocketFamily::Ipv4 => &settings.socket, SocketFamily::Ipv6 => &settings.socket_v6, }; - let socket_qualified: SocketQualified; - if let Some(ref wrapped_socket) = wrapped_socket { let socket: UdpSocket = wrapped_socket.get(); - - let socket = context.enter(|| { - Async::::try_from(socket).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to setup Async socket: {}", err] - ) - }) + let socket = Async::::try_from(socket).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup Async socket: {}", err] + ) })?; match family { SocketFamily::Ipv4 => { settings.used_socket = Some(wrapped_socket.clone()); - socket_qualified = SocketQualified::Ipv4(socket); } SocketFamily::Ipv6 => { settings.used_socket_v6 = Some(wrapped_socket.clone()); - socket_qualified = SocketQualified::Ipv6(socket); } } + + Ok(Some(socket)) } else { let bind_addr = match family { SocketFamily::Ipv4 => &settings.bind_address, @@ -777,7 +312,7 @@ impl UdpSink { }; let saddr = SocketAddr::new(bind_addr, bind_port as u16); - gst::debug!(CAT, obj: element, "Binding to {:?}", saddr); + gst::debug!(CAT, obj: &self.element, "Binding to {:?}", saddr); let socket = match family { SocketFamily::Ipv4 => socket2::Socket::new( @@ -797,7 +332,7 @@ impl UdpSink { Err(err) => { gst::warning!( CAT, - obj: element, + obj: &self.element, "Failed to create {} socket: {}", match family { SocketFamily::Ipv4 => "IPv4", @@ -805,7 +340,7 @@ impl UdpSink { }, err ); - return Ok(()); + return Ok(None); } }; @@ -816,13 +351,11 @@ impl UdpSink { ) })?; - let socket = context.enter(|| { - Async::::try_from(socket).map_err(|err| { - error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to setup Async socket: {}", err] - ) - }) + let socket = Async::::try_from(socket).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup Async socket: {}", err] + ) })?; let wrapper = wrap_socket(&socket)?; @@ -839,20 +372,413 @@ impl UdpSink { match family { SocketFamily::Ipv4 => { settings.used_socket = Some(wrapper); - socket_qualified = SocketQualified::Ipv4(socket) } SocketFamily::Ipv6 => { settings.used_socket_v6 = Some(wrapper); - socket_qualified = SocketQualified::Ipv6(socket) + } + } + + Ok(Some(socket)) + } + } + + fn add_client(&mut self, addr: SocketAddr) { + if self.clients.contains(&addr) { + gst::warning!(CAT, obj: &self.element, "Not adding client {:?} again", &addr); + return; + } + + let udpsink = self.element.imp(); + let mut settings = udpsink.settings.lock().unwrap(); + match self.configure_client(&settings, &addr) { + Ok(()) => { + gst::info!(CAT, obj: &self.element, "Added client {:?}", addr); + self.clients.insert(addr); + } + Err(err) => { + gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err); + settings.clients = self.clients.clone(); + self.element.post_error_message(err); + } + } + } + + fn remove_client(&mut self, addr: &SocketAddr) { + if self.clients.take(addr).is_none() { + gst::warning!(CAT, obj: &self.element, "Not removing unknown client {:?}", &addr); + return; + } + + let udpsink = self.element.imp(); + let mut settings = udpsink.settings.lock().unwrap(); + match self.unconfigure_client(&settings, addr) { + Ok(()) => { + gst::info!(CAT, obj: &self.element, "Removed client {:?}", addr); + } + Err(err) => { + gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err); + settings.clients = self.clients.clone(); + self.element.post_error_message(err); + } + } + } + + fn replace_with_clients(&mut self, mut clients_to_add: BTreeSet) { + if clients_to_add.is_empty() { + gst::info!(CAT, obj: &self.element, "Clearing clients"); + } else { + gst::info!(CAT, obj: &self.element, "Replacing clients"); + } + + let old_clients = std::mem::take(&mut self.clients); + + let mut res = Ok(()); + let udpsink = self.element.imp(); + let mut settings = udpsink.settings.lock().unwrap(); + + for addr in old_clients.iter() { + if clients_to_add.take(addr).is_some() { + // client is already configured + self.clients.insert(*addr); + } else if let Err(err) = self.unconfigure_client(&settings, addr) { + gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err); + res = Err(err); + } + } + + for addr in clients_to_add.into_iter() { + if let Err(err) = self.configure_client(&settings, &addr) { + gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err); + res = Err(err); + } else { + self.clients.insert(addr); + } + } + + // FIXME: which error handling: + // - If at least one client could be configured, should we keep going? (current) + // - or, should we consider the preparation failed when the first client + // configuration fails? (previously) + if let Err(err) = res { + settings.clients = self.clients.clone(); + self.element.post_error_message(err); + } + } + + fn configure_client( + &self, + settings: &Settings, + client: &SocketAddr, + ) -> Result<(), gst::ErrorMessage> { + if client.ip().is_multicast() { + match client.ip() { + IpAddr::V4(addr) => { + if let Some(socket) = self.socket.as_ref() { + if settings.auto_multicast { + socket + .as_ref() + .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + [ + "Failed to join multicast group for {:?}: {}", + client, + err + ] + ) + })?; + } + if settings.multicast_loop { + socket.as_ref().set_multicast_loop_v4(true).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop for {:?}: {}", client, err] + ) + })?; + } + + socket + .as_ref() + .set_multicast_ttl_v4(settings.ttl_mc) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast ttl for {:?}: {}", client, err] + ) + })?; + } + } + IpAddr::V6(addr) => { + if let Some(socket) = self.socket_v6.as_ref() { + if settings.auto_multicast { + socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group for {:?}: {}", client, err] + ) + })?; + } + if settings.multicast_loop { + socket.as_ref().set_multicast_loop_v6(true).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop for {:?}: {}", client, err] + ) + })?; + } + /* FIXME no API for set_multicast_ttl_v6 ? */ + } + } + } + } else { + match client.ip() { + IpAddr::V4(_) => { + if let Some(socket) = self.socket.as_ref() { + socket.as_ref().set_ttl(settings.ttl).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl for {:?}: {}", client, err] + ) + })?; + } + } + IpAddr::V6(_) => { + if let Some(socket) = self.socket_v6.as_ref() { + socket.as_ref().set_ttl(settings.ttl).map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl for {:?}: {}", client, err] + ) + })?; + } } } } - self.sink_pad_handler.prepare_socket(socket_qualified); + Ok(()) + } + + fn unconfigure_client( + &self, + settings: &Settings, + client: &SocketAddr, + ) -> Result<(), gst::ErrorMessage> { + if client.ip().is_multicast() { + match client.ip() { + IpAddr::V4(addr) => { + if let Some(socket) = self.socket.as_ref() { + if settings.auto_multicast { + socket + .as_ref() + .leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + [ + "Failed to leave multicast group for {:?}: {}", + client, + err + ] + ) + })?; + } + } + } + IpAddr::V6(addr) => { + if let Some(socket) = self.socket_v6.as_ref() { + if settings.auto_multicast { + socket + .as_ref() + .leave_multicast_v6(&addr, 0) + .map_err(|err| { + error_msg!( + gst::ResourceError::OpenWrite, + [ + "Failed to leave multicast group for {:?}: {}", + client, + err + ] + ) + })?; + } + } + } + } + } + + Ok(()) + } +} + +/// Buffer handling. +impl UdpSinkTask { + async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> { + if self.sync { + let rtime = self.segment.as_ref().and_then(|segment| { + segment + .downcast_ref::() + .and_then(|segment| segment.to_running_time(buffer.pts()).opt_add(self.latency)) + }); + if let Some(rtime) = rtime { + self.sync(rtime).await; + } + } + + let data = buffer.map_readable().map_err(|_| { + element_error!( + self.element, + gst::StreamError::Format, + ["Failed to map buffer readable"] + ); + gst::FlowError::Error + })?; + + for client in self.clients.iter() { + let socket = match client.ip() { + IpAddr::V4(_) => &mut self.socket, + IpAddr::V6(_) => &mut self.socket_v6, + }; + + if let Some(socket) = socket.as_mut() { + gst::log!(CAT, obj: &self.element, "Sending to {:?}", &client); + socket.send_to(&data, *client).await.map_err(|err| { + element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + gst::FlowError::Error + })?; + } else { + element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["No socket available for sending to {}", client] + ); + return Err(gst::FlowError::Error); + } + } + + gst::log!( + CAT, + obj: &self.element, + "Sent buffer {:?} to all clients", + &buffer + ); Ok(()) } + /// Waits until specified time. + async fn sync(&self, running_time: gst::ClockTime) { + let now = self.element.current_running_time(); + + match running_time.opt_checked_sub(now) { + Ok(Some(delay)) => { + runtime::time::delay_for(delay.into()).await; + } + _ => runtime::executor::yield_now().await, + } + } +} + +impl TaskImpl for UdpSinkTask { + fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { + async move { + gst::info!(CAT, obj: &self.element, "Preparing Task"); + assert!(self.clients.is_empty()); + { + let udpsink = self.element.imp(); + let mut settings = udpsink.settings.lock().unwrap(); + self.sync = settings.sync; + self.socket = self.prepare_socket(&mut settings, SocketFamily::Ipv4)?; + self.socket_v6 = self.prepare_socket(&mut settings, SocketFamily::Ipv6)?; + } + + while let Ok(cmd) = self.cmd_receiver.try_recv() { + self.process_command(cmd); + } + + Ok(()) + } + .boxed() + } + + fn unprepare(&mut self) -> BoxFuture<'_, ()> { + async move { + gst::info!(CAT, obj: &self.element, "Unpreparing Task"); + + let udpsink = self.element.imp(); + let settings = udpsink.settings.lock().unwrap(); + for addr in self.clients.iter() { + let _ = self.unconfigure_client(&settings, addr); + } + } + .boxed() + } + + fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { + async move { + let item = futures::select_biased! { + cmd = self.cmd_receiver.recv_async() => { + self.process_command(cmd.unwrap()); + return Ok(()); + } + item = self.item_receiver.recv_async() => item, + }; + + match item.map_err(|_| gst::FlowError::Flushing)? { + TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| { + element_error!( + &self.element, + gst::StreamError::Failed, + ["Failed to render item, stopping task: {}", err] + ); + gst::FlowError::Error + })?, + TaskItem::Event(event) => match event.view() { + EventView::Eos(_) => { + let _ = self + .element + .post_message(gst::message::Eos::builder().src(&self.element).build()); + } + EventView::Segment(e) => { + self.segment = Some(e.segment().clone()); + } + EventView::SinkMessage(e) => { + let _ = self.element.post_message(e.message()); + } + _ => (), + }, + } + + Ok(()) + } + .boxed() + } +} + +#[derive(Debug)] +enum SocketFamily { + Ipv4, + Ipv6, +} + +#[derive(Debug)] +pub struct UdpSink { + sink_pad: PadSink, + task: Task, + item_sender: flume::Sender, + item_receiver: flume::Receiver, + cmd_sender: flume::Sender, + cmd_receiver: flume::Receiver, + settings: Mutex, +} + +impl UdpSink { fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Preparing"); @@ -867,30 +793,26 @@ impl UdpSink { })? }; - self.sink_pad_handler.prepare(); - self.prepare_socket(SocketFamily::Ipv4, &context, element)?; - self.prepare_socket(SocketFamily::Ipv6, &context, element)?; + let task_impl = UdpSinkTask::new( + element, + self.item_receiver.clone(), + self.cmd_receiver.clone(), + ); + self.task.prepare(task_impl, context).map_err(|err| { + error_msg!( + gst::ResourceError::OpenRead, + ["Error preparing Task: {:?}", err] + ) + })?; - self.task - .prepare(UdpSinkTask::new(element, &self.sink_pad_handler), context) - .map_err(|err| { - error_msg!( - gst::ResourceError::OpenRead, - ["Error preparing Task: {:?}", err] - ) - })?; - - gst::debug!(CAT, obj: element, "Started preparing"); + gst::debug!(CAT, obj: element, "Started preparation"); Ok(()) } fn unprepare(&self, element: &super::UdpSink) { gst::debug!(CAT, obj: element, "Unpreparing"); - self.task.unprepare().unwrap(); - self.sink_pad_handler.unprepare(); - gst::debug!(CAT, obj: element, "Unprepared"); } @@ -910,19 +832,26 @@ impl UdpSink { } impl UdpSink { - fn clear_clients(&self, clients_to_add: impl Iterator) { - self.sink_pad_handler - .clear_clients(self.sink_pad.gst_pad(), clients_to_add); + fn add_client(&self, settings: &mut Settings, client: SocketAddr) { + settings.clients.insert(client); + self.cmd_sender.send(Command::AddClient(client)).unwrap(); } - fn remove_client(&self, addr: SocketAddr) { - self.sink_pad_handler - .remove_client(self.sink_pad.gst_pad(), addr); + fn remove_client(&self, settings: &mut Settings, client: SocketAddr) { + settings.clients.remove(&client); + self.cmd_sender.send(Command::RemoveClient(client)).unwrap(); } - fn add_client(&self, addr: SocketAddr) { - self.sink_pad_handler - .add_client(self.sink_pad.gst_pad(), addr); + fn replace_with_clients( + &self, + settings: &mut Settings, + clients: impl IntoIterator, + ) { + let clients = BTreeSet::::from_iter(clients); + settings.clients = clients.clone(); + self.cmd_sender + .send(Command::ReplaceWithClients(clients)) + .unwrap(); } } @@ -953,17 +882,21 @@ impl ObjectSubclass for UdpSink { type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { - let settings = Arc::new(StdMutex::new(Settings::default())); - let sink_pad_handler = UdpSinkPadHandler::new(Arc::clone(&settings)); + // Enable backpressure for items + let (item_sender, item_receiver) = flume::bounded(0); + let (cmd_sender, cmd_receiver) = flume::unbounded(); Self { sink_pad: PadSink::new( gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), - sink_pad_handler.clone(), + UdpSinkPadHandler, ), - sink_pad_handler, task: Task::default(), - settings, + item_sender, + item_receiver, + cmd_sender, + cmd_receiver, + settings: Default::default(), } } } @@ -1125,7 +1058,8 @@ impl ObjectImpl for UdpSink { if let Ok(addr) = try_into_socket_addr(&element, &host, port) { let udpsink = element.imp(); - udpsink.add_client(addr); + let mut settings = udpsink.settings.lock().unwrap(); + udpsink.add_client(&mut settings, addr); } None @@ -1144,7 +1078,8 @@ impl ObjectImpl for UdpSink { if let Ok(addr) = try_into_socket_addr(&element, &host, port) { let udpsink = element.imp(); - udpsink.remove_client(addr); + let mut settings = udpsink.settings.lock().unwrap(); + udpsink.remove_client(&mut settings, addr); } None @@ -1156,7 +1091,8 @@ impl ObjectImpl for UdpSink { let element = args[0].get::().expect("signal arg"); let udpsink = element.imp(); - udpsink.clear_clients(std::iter::empty()); + let mut settings = udpsink.settings.lock().unwrap(); + udpsink.replace_with_clients(&mut settings, BTreeSet::new()); None }) @@ -1177,7 +1113,9 @@ impl ObjectImpl for UdpSink { let mut settings = self.settings.lock().unwrap(); match pspec.name() { "sync" => { - settings.sync = value.get().expect("type checked upstream"); + let sync = value.get().expect("type checked upstream"); + settings.sync = sync; + self.cmd_sender.send(Command::SetSync(sync)).unwrap(); } "bind-address" => { settings.bind_address = value @@ -1236,7 +1174,7 @@ impl ObjectImpl for UdpSink { .expect("type checked upstream") .unwrap_or_else(|| "".into()); - let clients_iter = clients.split(',').filter_map(|client| { + let clients = clients.split(',').filter_map(|client| { let rsplit: Vec<&str> = client.rsplitn(2, ':').collect(); if rsplit.len() == 2 { @@ -1251,9 +1189,8 @@ impl ObjectImpl for UdpSink { None } }); - drop(settings); - self.clear_clients(clients_iter); + self.replace_with_clients(&mut settings, clients); } "context" => { settings.context = value @@ -1304,14 +1241,9 @@ impl ObjectImpl for UdpSink { "ttl-mc" => settings.ttl_mc.to_value(), "qos-dscp" => settings.qos_dscp.to_value(), "clients" => { + let clients = settings.clients.clone(); drop(settings); - - let clients: Vec = self - .sink_pad_handler - .clients() - .iter() - .map(ToString::to_string) - .collect(); + let clients: Vec = clients.iter().map(ToString::to_string).collect(); clients.join(",").to_value() } @@ -1396,7 +1328,8 @@ impl ElementImpl for UdpSink { fn send_event(&self, _element: &Self::Type, event: gst::Event) -> bool { match event.view() { EventView::Latency(ev) => { - self.sink_pad_handler.set_latency(ev.latency()); + let latency = Some(ev.latency()); + self.cmd_sender.send(Command::SetLatency(latency)).unwrap(); self.sink_pad.gst_pad().push_event(event) } EventView::Step(..) => false,