diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs
index cfb0763f..77053235 100644
--- a/generic/threadshare/src/udpsink/imp.rs
+++ b/generic/threadshare/src/udpsink/imp.rs
@@ -19,7 +19,6 @@
 use futures::future::BoxFuture;
 use futures::prelude::*;
-use futures::stream::Peekable;
 
 use gst::glib;
 use gst::prelude::*;
@@ -29,15 +28,14 @@ use gst::{element_error, error_msg};
 
 use once_cell::sync::Lazy;
 
+use crate::runtime::executor::block_on_or_add_sub_task;
 use crate::runtime::prelude::*;
-use crate::runtime::{self, Async, Context, PadSink, Task};
+use crate::runtime::{self, Async, Context, PadSink};
 use crate::socket::{wrap_socket, GioSocketWrapper};
 
 use std::collections::BTreeSet;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
-use std::pin::Pin;
-use std::sync::Mutex;
-use std::task::Poll;
+use std::sync::{Arc, Mutex};
 use std::time::Duration;
 use std::u16;
 use std::u8;
@@ -62,6 +60,25 @@ const DEFAULT_CLIENTS: &str = "";
 const DEFAULT_CONTEXT: &str = "";
 const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
 
+#[derive(Debug, Clone, Copy)]
+struct SocketConf {
+    auto_multicast: bool,
+    multicast_loop: bool,
+    ttl: u32,
+    ttl_mc: u32,
+}
+
+impl Default for SocketConf {
+    fn default() -> Self {
+        SocketConf {
+            auto_multicast: DEFAULT_AUTO_MULTICAST,
+            multicast_loop: DEFAULT_LOOP,
+            ttl: DEFAULT_TTL,
+            ttl_mc: DEFAULT_TTL_MC,
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 struct Settings {
     sync: bool,
@@ -73,15 +90,10 @@ struct Settings {
     used_socket: Option<GioSocketWrapper>,
     socket_v6: Option<GioSocketWrapper>,
     used_socket_v6: Option<GioSocketWrapper>,
-    auto_multicast: bool,
-    multicast_loop: bool,
-    ttl: u32,
-    ttl_mc: u32,
+    socket_conf: SocketConf,
     qos_dscp: i32,
     context: String,
     context_wait: Duration,
-    clients: BTreeSet<SocketAddr>,
-    latency: Option<gst::ClockTime>,
 }
 
 impl Default for Settings {
@@ -96,18 +108,10 @@ impl Default for Settings {
             used_socket: DEFAULT_USED_SOCKET,
             socket_v6: DEFAULT_SOCKET_V6,
             used_socket_v6: DEFAULT_USED_SOCKET_V6,
-            auto_multicast: DEFAULT_AUTO_MULTICAST,
-            multicast_loop: DEFAULT_LOOP,
-            ttl: DEFAULT_TTL,
-            ttl_mc: DEFAULT_TTL_MC,
+            socket_conf: SocketConf::default(),
            qos_dscp: DEFAULT_QOS_DSCP,
             context: DEFAULT_CONTEXT.into(),
             context_wait: DEFAULT_CONTEXT_WAIT,
-            clients: BTreeSet::from([SocketAddr::new(
-                DEFAULT_HOST.unwrap().parse().unwrap(),
-                DEFAULT_PORT as u16,
-            )]),
-            latency: None,
         }
     }
 }
@@ -120,14 +124,166 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
     )
 });
 
-#[derive(Debug)]
-enum TaskItem {
-    Buffer(gst::Buffer),
-    Event(gst::Event),
-}
+#[derive(Clone, Debug, Default)]
+struct UdpSinkPadHandler(Arc<futures::lock::Mutex<UdpSinkPadHandlerInner>>);
 
-#[derive(Clone, Debug)]
-struct UdpSinkPadHandler;
+impl UdpSinkPadHandler {
+    fn prepare(
+        &self,
+        _imp: &UdpSink,
+        socket: Option<Async<UdpSocket>>,
+        socket_v6: Option<Async<UdpSocket>>,
+        settings: &Settings,
+    ) -> Result<(), gst::ErrorMessage> {
+        futures::executor::block_on(async move {
+            let mut inner = self.0.lock().await;
+
+            inner.sync = settings.sync;
+            inner.socket_conf = settings.socket_conf;
+            inner.socket = socket;
+            inner.socket_v6 = socket_v6;
+
+            for addr in inner.clients.iter() {
+                inner.configure_client(addr)?;
+            }
+
+            Ok(())
+        })
+    }
+
+    fn unprepare(&self) {
+        futures::executor::block_on(async move {
+            let mut inner = self.0.lock().await;
+
+            for addr in inner.clients.iter() {
+                let _ = inner.unconfigure_client(addr);
+            }
+
+            inner.socket = None;
+            inner.socket_v6 = None;
+        })
+    }
+
+    fn start(&self) {
+        futures::executor::block_on(async move {
+            self.0.lock().await.is_flushing = false;
+        })
+    }
+
+    fn stop(&self) {
+        futures::executor::block_on(async move {
+            self.0.lock().await.is_flushing = true;
+        })
+    }
+
+    fn set_sync(&self, sync: bool) {
+        futures::executor::block_on(async move {
+            self.0.lock().await.sync = sync;
+        })
+    }
+
+    fn set_latency(&self, latency: Option<gst::ClockTime>) {
+        futures::executor::block_on(async move {
+            self.0.lock().await.latency = latency;
+        })
+    }
+
+    fn set_socket_conf(&self, socket_conf: SocketConf) {
+        futures::executor::block_on(async move {
+            self.0.lock().await.socket_conf = socket_conf;
+        })
+    }
+
+    fn clients(&self) -> BTreeSet<SocketAddr> {
+        futures::executor::block_on(async move { self.0.lock().await.clients.clone() })
+    }
+
+    fn add_client(&self, imp: &UdpSink, addr: SocketAddr) {
+        futures::executor::block_on(async move {
+            let mut inner = self.0.lock().await;
+            if inner.clients.contains(&addr) {
+                gst::warning!(CAT, imp: imp, "Not adding client {addr:?} again");
+                return;
+            }
+
+            match inner.configure_client(&addr) {
+                Ok(()) => {
+                    gst::info!(CAT, imp: imp, "Added client {addr:?}");
+                    inner.clients.insert(addr);
+                }
+                Err(err) => {
+                    gst::error!(CAT, imp: imp, "Failed to add client {addr:?}: {err}");
+                    imp.obj().post_error_message(err);
+                }
+            }
+        })
+    }
+
+    fn remove_client(&self, imp: &UdpSink, addr: SocketAddr) {
+        futures::executor::block_on(async move {
+            let mut inner = self.0.lock().await;
+            if inner.clients.take(&addr).is_none() {
+                gst::warning!(CAT, imp: imp, "Not removing unknown client {addr:?}");
+                return;
+            }
+
+            match inner.unconfigure_client(&addr) {
+                Ok(()) => {
+                    gst::info!(CAT, imp: imp, "Removed client {addr:?}");
+                }
+                Err(err) => {
+                    gst::error!(CAT, imp: imp, "Failed to remove client {addr:?}: {err}");
+                    imp.obj().post_error_message(err);
+                }
+            }
+        })
+    }
+
+    fn replace_clients(&self, imp: &UdpSink, mut new_clients: BTreeSet<SocketAddr>) {
+        futures::executor::block_on(async move {
+            let mut inner = self.0.lock().await;
+            if new_clients.is_empty() {
+                gst::info!(CAT, imp: imp, "Clearing clients");
+            } else {
+                gst::info!(CAT, imp: imp, "Replacing clients");
+            }
+
+            let old_clients = std::mem::take(&mut inner.clients);
+
+            let mut res = Ok(());
+
+            for addr in old_clients.iter() {
+                if new_clients.take(addr).is_some() {
+                    // client is already configured
+                    inner.clients.insert(*addr);
+                } else if let Err(err) = inner.unconfigure_client(addr) {
+                    gst::error!(CAT, imp: imp, "Failed to remove client {addr:?}: {err}");
+                    res = Err(err);
+                } else {
+                    gst::info!(CAT, imp: imp, "Removed client {addr:?}");
+                }
+            }
+
+            for addr in new_clients.into_iter() {
+                if let Err(err) = inner.configure_client(&addr) {
+                    gst::error!(CAT, imp: imp, "Failed to add client {addr:?}: {err}");
+                    res = Err(err);
+                } else {
+                    gst::info!(CAT, imp: imp, "Added client {addr:?}");
+                    inner.clients.insert(addr);
+                }
+            }
+
+            // FIXME: which error handling:
+            // - If at least one client could be configured, should we keep going? (current)
+            // - or, should we consider the preparation failed when the first client
+            //   configuration fails? (previously)
+            if let Err(err) = res {
+                imp.obj().post_error_message(err);
+            }
+        })
+    }
+}
 
 impl PadSinkHandler for UdpSinkPadHandler {
     type ElementImpl = UdpSink;
@@ -138,16 +294,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
         elem: super::UdpSink,
         buffer: gst::Buffer,
     ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
-        let sender = elem.imp().clone_item_sender();
-        async move {
-            if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
-                gst::debug!(CAT, obj: elem, "Flushing");
-                return Err(gst::FlowError::Flushing);
-            }
-
-            Ok(gst::FlowSuccess::Ok)
-        }
-        .boxed()
+        async move { self.0.lock().await.handle_buffer(&elem, buffer).await }.boxed()
     }
 
     fn sink_chain_list(
@@ -156,13 +303,10 @@ impl PadSinkHandler for UdpSinkPadHandler {
         elem: super::UdpSink,
         list: gst::BufferList,
     ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
-        let sender = elem.imp().clone_item_sender();
         async move {
+            let mut inner = self.0.lock().await;
             for buffer in list.iter_owned() {
-                if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
-                    gst::debug!(CAT, obj: elem, "Flushing");
-                    return Err(gst::FlowError::Flushing);
-                }
+                inner.handle_buffer(&elem, buffer).await?;
             }
 
             Ok(gst::FlowSuccess::Ok)
@@ -176,13 +320,23 @@ impl PadSinkHandler for UdpSinkPadHandler {
         elem: super::UdpSink,
         event: gst::Event,
     ) -> BoxFuture<'static, bool> {
-        let sender = elem.imp().clone_item_sender();
         async move {
-            if let EventView::FlushStop(_) = event.view() {
-                let imp = elem.imp();
-                return imp.task.flush_stop().await_maybe_on_context().is_ok();
-            } else if sender.send_async(TaskItem::Event(event)).await.is_err() {
-                gst::debug!(CAT, obj: elem, "Flushing");
+            gst::debug!(CAT, obj: elem, "Handling {event:?}");
+
+            match event.view() {
+                EventView::Eos(_) => {
+                    let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
+                }
+                EventView::Segment(e) => {
+                    self.0.lock().await.segment = Some(e.segment().clone());
+                }
+                EventView::FlushStop(_) => {
+                    self.0.lock().await.is_flushing = false;
+                }
+                EventView::SinkMessage(e) => {
+                    let _ = elem.post_message(e.message());
+                }
+                _ => (),
             }
 
             true
@@ -191,8 +345,12 @@ impl PadSinkHandler for UdpSinkPadHandler {
     }
 
     fn sink_event(self, _pad: &gst::Pad, imp: &UdpSink, event: gst::Event) -> bool {
+        gst::debug!(CAT, imp: imp, "Handling {event:?}");
+
         if let EventView::FlushStart(..) = event.view() {
-            return imp.task.flush_start().await_maybe_on_context().is_ok();
+            block_on_or_add_sub_task(async move {
+                self.0.lock().await.is_flushing = true;
+            });
         }
 
         true
@@ -200,66 +358,300 @@
 }
 
 #[derive(Debug)]
-enum Command {
-    AddClient(SocketAddr),
-    RemoveClient(SocketAddr),
-    ReplaceWithClients(BTreeSet<SocketAddr>),
-    SetLatency(Option<gst::ClockTime>),
-    SetSync(bool),
-}
-
-struct UdpSinkTask {
-    element: super::UdpSink,
-    item_receiver: Peekable<flume::r#async::RecvStream<'static, TaskItem>>,
-    cmd_receiver: flume::Receiver<Command>,
-    clients: BTreeSet<SocketAddr>,
-    socket: Option<Async<UdpSocket>>,
-    socket_v6: Option<Async<UdpSocket>>,
+struct UdpSinkPadHandlerInner {
+    is_flushing: bool,
     sync: bool,
     latency: Option<gst::ClockTime>,
+    socket: Option<Async<UdpSocket>>,
+    socket_v6: Option<Async<UdpSocket>>,
+    clients: BTreeSet<SocketAddr>,
+    socket_conf: SocketConf,
     segment: Option<gst::Segment>,
 }
 
-impl UdpSinkTask {
-    fn new(
-        element: &super::UdpSink,
-        item_receiver: flume::Receiver<TaskItem>,
-        cmd_receiver: flume::Receiver<Command>,
-    ) -> Self {
-        UdpSinkTask {
-            element: element.clone(),
-            item_receiver: item_receiver.into_stream().peekable(),
-            cmd_receiver,
-            clients: Default::default(),
+impl Default for UdpSinkPadHandlerInner {
+    fn default() -> Self {
+        Self {
+            is_flushing: true,
+            sync: DEFAULT_SYNC,
+            latency: None,
             socket: None,
             socket_v6: None,
-            sync: false,
-            latency: None,
+            clients: BTreeSet::from([SocketAddr::new(
+                DEFAULT_HOST.unwrap().parse().unwrap(),
+                DEFAULT_PORT as u16,
+            )]),
+            socket_conf: Default::default(),
             segment: None,
         }
     }
-
-    async fn flush(&mut self) {
-        // Purge the channel
-        while let Poll::Ready(Some(_item)) = futures::poll!(self.item_receiver.next()) {}
-    }
-
-    fn process_command(&mut self, cmd: Command) {
-        use Command::*;
-        match cmd {
-            AddClient(client) => self.add_client(client),
-            RemoveClient(client) => self.remove_client(&client),
-            ReplaceWithClients(clients) => self.replace_with_clients(clients),
-            SetSync(sync) => self.sync = sync,
-            SetLatency(latency) => self.latency = latency,
-        }
-    }
 }
 
 /// Socket configuration.
-impl UdpSinkTask {
+impl UdpSinkPadHandlerInner {
+    fn configure_client(&self, client: &SocketAddr) -> Result<(), gst::ErrorMessage> {
+        if client.ip().is_multicast() {
+            match client.ip() {
+                IpAddr::V4(addr) => {
+                    if let Some(socket) = self.socket.as_ref() {
+                        if self.socket_conf.auto_multicast {
+                            socket
+                                .as_ref()
+                                .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
+                                .map_err(|err| {
+                                    error_msg!(
+                                        gst::ResourceError::OpenWrite,
+                                        [
+                                            "Failed to join multicast group for {:?}: {}",
+                                            client,
+                                            err
+                                        ]
+                                    )
+                                })?;
+                        }
+                        if self.socket_conf.multicast_loop {
+                            socket.as_ref().set_multicast_loop_v4(true).map_err(|err| {
+                                error_msg!(
+                                    gst::ResourceError::OpenWrite,
+                                    ["Failed to set multicast loop for {:?}: {}", client, err]
+                                )
+                            })?;
+                        }
+
+                        socket
+                            .as_ref()
+                            .set_multicast_ttl_v4(self.socket_conf.ttl_mc)
+                            .map_err(|err| {
+                                error_msg!(
+                                    gst::ResourceError::OpenWrite,
+                                    ["Failed to set multicast ttl for {:?}: {}", client, err]
+                                )
+                            })?;
+                    }
+                }
+                IpAddr::V6(addr) => {
+                    if let Some(socket) = self.socket_v6.as_ref() {
+                        if self.socket_conf.auto_multicast {
+                            socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
+                                error_msg!(
+                                    gst::ResourceError::OpenWrite,
+                                    ["Failed to join multicast group for {:?}: {}", client, err]
+                                )
+                            })?;
+                        }
+                        if self.socket_conf.multicast_loop {
+                            socket.as_ref().set_multicast_loop_v6(true).map_err(|err| {
+                                error_msg!(
+                                    gst::ResourceError::OpenWrite,
+                                    ["Failed to set multicast loop for {:?}: {}", client, err]
+                                )
+                            })?;
+                        }
+                        /* FIXME no API for set_multicast_ttl_v6 ? */
+                    }
+                }
+            }
+        } else {
+            match client.ip() {
+                IpAddr::V4(_) => {
+                    if let Some(socket) = self.socket.as_ref() {
+                        socket
+                            .as_ref()
+                            .set_ttl(self.socket_conf.ttl)
+                            .map_err(|err| {
+                                error_msg!(
+                                    gst::ResourceError::OpenWrite,
+                                    ["Failed to set unicast ttl for {:?}: {}", client, err]
+                                )
+                            })?;
+                    }
+                }
+                IpAddr::V6(_) => {
+                    if let Some(socket) = self.socket_v6.as_ref() {
+                        socket
+                            .as_ref()
+                            .set_ttl(self.socket_conf.ttl)
+                            .map_err(|err| {
+                                error_msg!(
+                                    gst::ResourceError::OpenWrite,
+                                    ["Failed to set unicast ttl for {:?}: {}", client, err]
+                                )
+                            })?;
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn unconfigure_client(&self, client: &SocketAddr) -> Result<(), gst::ErrorMessage> {
+        if client.ip().is_multicast() {
+            match client.ip() {
+                IpAddr::V4(addr) => {
+                    if let Some(socket) = self.socket.as_ref() {
+                        if self.socket_conf.auto_multicast {
+                            socket
+                                .as_ref()
+                                .leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
+                                .map_err(|err| {
+                                    error_msg!(
+                                        gst::ResourceError::OpenWrite,
+                                        [
+                                            "Failed to leave multicast group for {:?}: {}",
+                                            client,
+                                            err
+                                        ]
+                                    )
+                                })?;
+                        }
+                    }
+                }
+                IpAddr::V6(addr) => {
+                    if let Some(socket) = self.socket_v6.as_ref() {
+                        if self.socket_conf.auto_multicast {
+                            socket
+                                .as_ref()
+                                .leave_multicast_v6(&addr, 0)
+                                .map_err(|err| {
+                                    error_msg!(
+                                        gst::ResourceError::OpenWrite,
+                                        [
+                                            "Failed to leave multicast group for {:?}: {}",
+                                            client,
+                                            err
+                                        ]
+                                    )
+                                })?;
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Buffer handling.
+impl UdpSinkPadHandlerInner {
+    async fn render(
+        &mut self,
+        elem: &super::UdpSink,
+        buffer: gst::Buffer,
+    ) -> Result<gst::FlowSuccess, gst::FlowError> {
+        let data = buffer.map_readable().map_err(|_| {
+            gst::element_error!(
+                elem,
+                gst::StreamError::Format,
+                ["Failed to map buffer readable"]
+            );
+            gst::FlowError::Error
+        })?;
+
+        for client in self.clients.iter() {
+            let socket = match client.ip() {
+                IpAddr::V4(_) => &mut self.socket,
+                IpAddr::V6(_) => &mut self.socket_v6,
+            };
+
+            if let Some(socket) = socket.as_mut() {
+                gst::log!(CAT, obj: elem, "Sending to {client:?}");
+                socket.send_to(&data, *client).await.map_err(|err| {
+                    gst::element_error!(
+                        elem,
+                        gst::StreamError::Failed,
+                        ("I/O error"),
+                        ["streaming stopped, I/O error {}", err]
+                    );
+                    gst::FlowError::Error
+                })?;
+            } else {
+                gst::element_error!(
+                    elem,
+                    gst::StreamError::Failed,
+                    ("I/O error"),
+                    ["No socket available for sending to {}", client]
+                );
+                return Err(gst::FlowError::Error);
+            }
+        }
+
+        gst::log!(CAT, obj: elem, "Sent buffer {buffer:?} to all clients");
+
+        Ok(gst::FlowSuccess::Ok)
+    }
+
+    /// Waits until specified time.
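+    ///
+    /// Used by [`Self::handle_buffer`] when the `sync` property is enabled:
+    /// sending is delayed until the buffer running time (which, at that
+    /// point, already includes the configured latency) is reached.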
+    async fn sync(&self, elem: &super::UdpSink, running_time: gst::ClockTime) {
+        let now = elem.current_running_time();
+
+        if let Ok(Some(delay)) = running_time.opt_checked_sub(now) {
+            gst::trace!(CAT, obj: elem, "sync: waiting {delay}");
+            runtime::timer::delay_for(delay.into()).await;
+        }
+    }
+
+    async fn handle_buffer(
+        &mut self,
+        elem: &super::UdpSink,
+        buffer: gst::Buffer,
+    ) -> Result<gst::FlowSuccess, gst::FlowError> {
+        if self.is_flushing {
+            gst::info!(CAT, obj: elem, "Discarding {buffer:?} (flushing)");
+
+            return Err(gst::FlowError::Flushing);
+        }
+
+        if self.sync {
+            let rtime = self.segment.as_ref().and_then(|segment| {
+                segment
+                    .downcast_ref::<gst::format::Time>()
+                    .and_then(|segment| segment.to_running_time(buffer.pts()).opt_add(self.latency))
+            });
+
+            if let Some(rtime) = rtime {
+                self.sync(elem, rtime).await;
+
+                if self.is_flushing {
+                    gst::info!(CAT, obj: elem, "Discarding {buffer:?} (flushing)");
+
+                    return Err(gst::FlowError::Flushing);
+                }
+            }
+        }
+
+        gst::debug!(CAT, obj: elem, "Handling {buffer:?}");
+
+        self.render(elem, buffer).await.map_err(|err| {
+            element_error!(
+                elem,
+                gst::StreamError::Failed,
+                ["Failed to render item, stopping task: {}", err]
+            );
+            gst::FlowError::Error
+        })
+    }
+}
+
+#[derive(Debug)]
+enum SocketFamily {
+    Ipv4,
+    Ipv6,
+}
+
+#[derive(Debug)]
+pub struct UdpSink {
+    sink_pad: PadSink,
+    sink_pad_handler: UdpSinkPadHandler,
+    settings: Mutex<Settings>,
+    ts_ctx: Mutex<Option<Context>>,
+}
+
+impl UdpSink {
     fn prepare_socket(
         &self,
+        ts_ctx: &Context,
         settings: &mut Settings,
         family: SocketFamily,
     ) -> Result<Option<Async<UdpSocket>>, gst::ErrorMessage> {
@@ -270,11 +662,13 @@
 
         if let Some(ref wrapped_socket) = wrapped_socket {
             let socket: UdpSocket = wrapped_socket.get();
-            let socket = Async::<UdpSocket>::try_from(socket).map_err(|err| {
-                error_msg!(
-                    gst::ResourceError::OpenWrite,
-                    ["Failed to setup Async socket: {}", err]
-                )
+            let socket = ts_ctx.enter(|| {
+                Async::<UdpSocket>::try_from(socket).map_err(|err| {
+                    error_msg!(
+                        gst::ResourceError::OpenWrite,
+                        ["Failed to setup Async socket: {}", err]
+                    )
+                })
             })?;
 
             match family {
@@ -306,7 +700,7 @@
         };
 
         let saddr = SocketAddr::new(bind_addr, bind_port as u16);
-        gst::debug!(CAT, obj: self.element, "Binding to {:?}", saddr);
+        gst::debug!(CAT, imp: self, "Binding to {:?}", saddr);
 
         let socket = match family {
             SocketFamily::Ipv4 => socket2::Socket::new(
@@ -326,7 +720,7 @@
             Err(err) => {
                 gst::warning!(
                     CAT,
-                    obj: self.element,
+                    imp: self,
                     "Failed to create {} socket: {}",
                     match family {
                         SocketFamily::Ipv4 => "IPv4",
@@ -345,11 +739,13 @@
             )
         })?;
 
-        let socket = Async::<UdpSocket>::try_from(socket).map_err(|err| {
-            error_msg!(
-                gst::ResourceError::OpenWrite,
-                ["Failed to setup Async socket: {}", err]
-            )
+        let socket = ts_ctx.enter(|| {
+            Async::<UdpSocket>::try_from(socket).map_err(|err| {
+                error_msg!(
+                    gst::ResourceError::OpenWrite,
+                    ["Failed to setup Async socket: {}", err]
+                )
+            })
         })?;
 
         let wrapper = wrap_socket(&socket)?;
@@ -376,475 +772,24 @@
         }
     }
 
-    fn add_client(&mut self, addr: SocketAddr) {
-        if self.clients.contains(&addr) {
-            gst::warning!(CAT, obj: self.element, "Not adding client {:?} again", &addr);
-            return;
-        }
-
-        let udpsink = self.element.imp();
-        let mut settings = udpsink.settings.lock().unwrap();
-        match self.configure_client(&settings, &addr) {
-            Ok(()) => {
-                gst::info!(CAT, obj: self.element, "Added client {:?}", addr);
-                self.clients.insert(addr);
-            }
-            Err(err) => {
-                gst::error!(CAT, obj: self.element, "Failed to add client {:?}: {}", addr, err);
-                settings.clients = self.clients.clone();
-                self.element.post_error_message(err);
-            }
-        }
-    }
-
-    fn remove_client(&mut self, addr: &SocketAddr) {
-        if self.clients.take(addr).is_none() {
-            gst::warning!(CAT, obj: self.element, "Not removing unknown client {:?}", &addr);
-            return;
-        }
-
-        let udpsink = self.element.imp();
-        let mut settings = udpsink.settings.lock().unwrap();
-        match self.unconfigure_client(&settings, addr) {
-            Ok(()) => {
-                gst::info!(CAT, obj: self.element, "Removed client {:?}", addr);
-            }
-            Err(err) => {
-                gst::error!(CAT, obj: self.element, "Failed to remove client {:?}: {}", addr, err);
-                settings.clients = self.clients.clone();
-                self.element.post_error_message(err);
-            }
-        }
-    }
-
-    fn replace_with_clients(&mut self, mut clients_to_add: BTreeSet<SocketAddr>) {
-        if clients_to_add.is_empty() {
-            gst::info!(CAT, obj: self.element, "Clearing clients");
-        } else {
-            gst::info!(CAT, obj: self.element, "Replacing clients");
-        }
-
-        let old_clients = std::mem::take(&mut self.clients);
-
-        let mut res = Ok(());
-        let udpsink = self.element.imp();
-        let mut settings = udpsink.settings.lock().unwrap();
-
-        for addr in old_clients.iter() {
-            if clients_to_add.take(addr).is_some() {
-                // client is already configured
-                self.clients.insert(*addr);
-            } else if let Err(err) = self.unconfigure_client(&settings, addr) {
-                gst::error!(CAT, obj: self.element, "Failed to remove client {:?}: {}", addr, err);
-                res = Err(err);
-            } else {
-                gst::info!(CAT, obj: self.element, "Removed client {:?}", addr);
-            }
-        }
-
-        for addr in clients_to_add.into_iter() {
-            if let Err(err) = self.configure_client(&settings, &addr) {
-                gst::error!(CAT, obj: self.element, "Failed to add client {:?}: {}", addr, err);
-                res = Err(err);
-            } else {
-                gst::info!(CAT, obj: self.element, "Added client {:?}", addr);
-                self.clients.insert(addr);
-            }
-        }
-
-        // FIXME: which error handling:
-        // - If at least one client could be configured, should we keep going? (current)
-        // - or, should we consider the preparation failed when the first client
-        //   configuration fails? (previously)
-        if let Err(err) = res {
-            settings.clients = self.clients.clone();
-            self.element.post_error_message(err);
-        }
-    }
-
-    fn configure_client(
-        &self,
-        settings: &Settings,
-        client: &SocketAddr,
-    ) -> Result<(), gst::ErrorMessage> {
-        if client.ip().is_multicast() {
-            match client.ip() {
-                IpAddr::V4(addr) => {
-                    if let Some(socket) = self.socket.as_ref() {
-                        if settings.auto_multicast {
-                            socket
-                                .as_ref()
-                                .join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
-                                .map_err(|err| {
-                                    error_msg!(
-                                        gst::ResourceError::OpenWrite,
-                                        [
-                                            "Failed to join multicast group for {:?}: {}",
-                                            client,
-                                            err
-                                        ]
-                                    )
-                                })?;
-                        }
-                        if settings.multicast_loop {
-                            socket.as_ref().set_multicast_loop_v4(true).map_err(|err| {
-                                error_msg!(
-                                    gst::ResourceError::OpenWrite,
-                                    ["Failed to set multicast loop for {:?}: {}", client, err]
-                                )
-                            })?;
-                        }
-
-                        socket
-                            .as_ref()
-                            .set_multicast_ttl_v4(settings.ttl_mc)
-                            .map_err(|err| {
-                                error_msg!(
-                                    gst::ResourceError::OpenWrite,
-                                    ["Failed to set multicast ttl for {:?}: {}", client, err]
-                                )
-                            })?;
-                    }
-                }
-                IpAddr::V6(addr) => {
-                    if let Some(socket) = self.socket_v6.as_ref() {
-                        if settings.auto_multicast {
-                            socket.as_ref().join_multicast_v6(&addr, 0).map_err(|err| {
-                                error_msg!(
-                                    gst::ResourceError::OpenWrite,
-                                    ["Failed to join multicast group for {:?}: {}", client, err]
-                                )
-                            })?;
-                        }
-                        if settings.multicast_loop {
-                            socket.as_ref().set_multicast_loop_v6(true).map_err(|err| {
-                                error_msg!(
-                                    gst::ResourceError::OpenWrite,
-                                    ["Failed to set multicast loop for {:?}: {}", client, err]
-                                )
-                            })?;
-                        }
-                        /* FIXME no API for set_multicast_ttl_v6 ? */
-                    }
-                }
-            }
-        } else {
-            match client.ip() {
-                IpAddr::V4(_) => {
-                    if let Some(socket) = self.socket.as_ref() {
-                        socket.as_ref().set_ttl(settings.ttl).map_err(|err| {
-                            error_msg!(
-                                gst::ResourceError::OpenWrite,
-                                ["Failed to set unicast ttl for {:?}: {}", client, err]
-                            )
-                        })?;
-                    }
-                }
-                IpAddr::V6(_) => {
-                    if let Some(socket) = self.socket_v6.as_ref() {
-                        socket.as_ref().set_ttl(settings.ttl).map_err(|err| {
-                            error_msg!(
-                                gst::ResourceError::OpenWrite,
-                                ["Failed to set unicast ttl for {:?}: {}", client, err]
-                            )
-                        })?;
-                    }
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    fn unconfigure_client(
-        &self,
-        settings: &Settings,
-        client: &SocketAddr,
-    ) -> Result<(), gst::ErrorMessage> {
-        if client.ip().is_multicast() {
-            match client.ip() {
-                IpAddr::V4(addr) => {
-                    if let Some(socket) = self.socket.as_ref() {
-                        if settings.auto_multicast {
-                            socket
-                                .as_ref()
-                                .leave_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
-                                .map_err(|err| {
-                                    error_msg!(
-                                        gst::ResourceError::OpenWrite,
-                                        [
-                                            "Failed to leave multicast group for {:?}: {}",
-                                            client,
-                                            err
-                                        ]
-                                    )
-                                })?;
-                        }
-                    }
-                }
-                IpAddr::V6(addr) => {
-                    if let Some(socket) = self.socket_v6.as_ref() {
-                        if settings.auto_multicast {
-                            socket
-                                .as_ref()
-                                .leave_multicast_v6(&addr, 0)
-                                .map_err(|err| {
-                                    error_msg!(
-                                        gst::ResourceError::OpenWrite,
-                                        [
-                                            "Failed to leave multicast group for {:?}: {}",
-                                            client,
-                                            err
-                                        ]
-                                    )
-                                })?;
-                        }
-                    }
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
-
-/// Buffer handling.
-impl UdpSinkTask {
-    async fn render(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
-        let data = buffer.map_readable().map_err(|_| {
-            element_error!(
-                self.element,
-                gst::StreamError::Format,
-                ["Failed to map buffer readable"]
-            );
-            gst::FlowError::Error
-        })?;
-
-        for client in self.clients.iter() {
-            let socket = match client.ip() {
-                IpAddr::V4(_) => &mut self.socket,
-                IpAddr::V6(_) => &mut self.socket_v6,
-            };
-
-            if let Some(socket) = socket.as_mut() {
-                gst::log!(CAT, obj: self.element, "Sending to {:?}", &client);
-                socket.send_to(&data, *client).await.map_err(|err| {
-                    element_error!(
-                        self.element,
-                        gst::StreamError::Failed,
-                        ("I/O error"),
-                        ["streaming stopped, I/O error {}", err]
-                    );
-                    gst::FlowError::Error
-                })?;
-            } else {
-                element_error!(
-                    self.element,
-                    gst::StreamError::Failed,
-                    ("I/O error"),
-                    ["No socket available for sending to {}", client]
-                );
-                return Err(gst::FlowError::Error);
-            }
-        }
-
-        gst::log!(
-            CAT,
-            obj: self.element,
-            "Sent buffer {:?} to all clients",
-            &buffer
-        );
-
-        Ok(())
-    }
-
-    /// Waits until specified time.
-    async fn sync(&self, running_time: gst::ClockTime) {
-        let now = self.element.current_running_time();
-
-        if let Ok(Some(delay)) = running_time.opt_checked_sub(now) {
-            gst::trace!(CAT, obj: self.element, "sync: waiting {}", delay);
-            runtime::timer::delay_for(delay.into()).await;
-        }
-    }
-}
-
-impl TaskImpl for UdpSinkTask {
-    type Item = TaskItem;
-
-    fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
-        async move {
-            gst::info!(CAT, obj: self.element, "Preparing Task");
-            assert!(self.clients.is_empty());
-            let clients = {
-                let udpsink = self.element.imp();
-                let mut settings = udpsink.settings.lock().unwrap();
-                self.sync = settings.sync;
-                self.socket = self.prepare_socket(&mut settings, SocketFamily::Ipv4)?;
-                self.socket_v6 = self.prepare_socket(&mut settings, SocketFamily::Ipv6)?;
-                self.latency = settings.latency;
-                settings.clients.clone()
-            };
-
-            self.replace_with_clients(clients);
-
-            Ok(())
-        }
-        .boxed()
-    }
-
-    fn unprepare(&mut self) -> BoxFuture<'_, ()> {
-        async move {
-            gst::info!(CAT, obj: self.element, "Unpreparing Task");
-
-            let udpsink = self.element.imp();
-            let settings = udpsink.settings.lock().unwrap();
-            for addr in self.clients.iter() {
-                let _ = self.unconfigure_client(&settings, addr);
-            }
-        }
-        .boxed()
-    }
-
-    fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
-        async move {
-            loop {
-                gst::info!(CAT, obj: self.element, "Awaiting next item or command");
-                futures::select_biased! {
-                    cmd = self.cmd_receiver.recv_async() => {
-                        self.process_command(cmd.unwrap());
-                    }
-                    item_opt = Pin::new(&mut self.item_receiver).peek() => {
-                        // Check the peeked item in case we need to sync.
-                        // The item will still be available in the channel
-                        // in case this is cancelled by a state transition.
-                        match item_opt {
-                            Some(TaskItem::Buffer(buffer)) => {
-                                if self.sync {
-                                    let rtime = self.segment.as_ref().and_then(|segment| {
-                                        segment
-                                            .downcast_ref::<gst::format::Time>()
-                                            .and_then(|segment| {
-                                                segment.to_running_time(buffer.pts()).opt_add(self.latency)
-                                            })
-                                    });
-                                    if let Some(rtime) = rtime {
-                                        // This can be cancelled by a state transition.
-                                        self.sync(rtime).await;
-                                    }
-                                }
-                            }
-                            Some(_) => (),
-                            None => {
-                                panic!("Internal channel sender dropped while Task is Started");
-                            }
-                        }
-
-                        // An item was peeked above, we can now pop it without losing it.
-                        return Ok(self.item_receiver.next().await.unwrap());
-                    }
-                }
-            }
-        }
-        .boxed()
-    }
-
-    fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
-        async move {
-            gst::info!(CAT, obj: self.element, "Handling {:?}", item);
-
-            match item {
-                TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| {
-                    element_error!(
-                        &self.element,
-                        gst::StreamError::Failed,
-                        ["Failed to render item, stopping task: {}", err]
-                    );
-                    gst::FlowError::Error
-                })?,
-                TaskItem::Event(event) => match event.view() {
-                    EventView::Eos(_) => {
-                        let _ = self
-                            .element
-                            .post_message(gst::message::Eos::builder().src(&self.element).build());
-                    }
-                    EventView::Segment(e) => {
-                        self.segment = Some(e.segment().clone());
-                    }
-                    EventView::SinkMessage(e) => {
-                        let _ = self.element.post_message(e.message());
-                    }
-                    _ => (),
-                },
-            }
-
-            Ok(())
-        }
-        .boxed()
-    }
-
-    fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
-        async {
-            gst::info!(CAT, obj: self.element, "Stopping Task");
-            self.flush().await;
-            Ok(())
-        }
-        .boxed()
-    }
-
-    fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
-        async {
-            gst::info!(CAT, obj: self.element, "Starting Task Flush");
-            self.flush().await;
-            Ok(())
-        }
-        .boxed()
-    }
-}
-
-#[derive(Debug)]
-enum SocketFamily {
-    Ipv4,
-    Ipv6,
-}
-
-#[derive(Debug)]
-pub struct UdpSink {
-    sink_pad: PadSink,
-    task: Task,
-    item_sender: Mutex<Option<flume::Sender<TaskItem>>>,
-    cmd_sender: Mutex<Option<flume::Sender<Command>>>,
-    settings: Mutex<Settings>,
-}
-
-impl UdpSink {
-    #[track_caller]
-    fn clone_item_sender(&self) -> flume::Sender<TaskItem> {
-        self.item_sender.lock().unwrap().as_ref().unwrap().clone()
-    }
-
     fn prepare(&self) -> Result<(), gst::ErrorMessage> {
         gst::debug!(CAT, imp: self, "Preparing");
 
-        let context = {
-            let settings = self.settings.lock().unwrap();
+        let mut settings = self.settings.lock().unwrap();
 
-            Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
-                error_msg!(
-                    gst::ResourceError::OpenWrite,
-                    ["Failed to acquire Context: {}", err]
-                )
-            })?
-        };
+        let ts_ctx = Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
+            error_msg!(
+                gst::ResourceError::OpenWrite,
+                ["Failed to acquire Context: {}", err]
+            )
+        })?;
 
-        // Enable backpressure for items
-        let (item_sender, item_receiver) = flume::bounded(0);
-        let (cmd_sender, cmd_receiver) = flume::unbounded();
-        let task_impl = UdpSinkTask::new(&self.obj(), item_receiver, cmd_receiver);
-        self.task.prepare(task_impl, context).block_on()?;
+        let socket = self.prepare_socket(&ts_ctx, &mut settings, SocketFamily::Ipv4)?;
+        let socket_v6 = self.prepare_socket(&ts_ctx, &mut settings, SocketFamily::Ipv6)?;
 
-        *self.item_sender.lock().unwrap() = Some(item_sender);
-        *self.cmd_sender.lock().unwrap() = Some(cmd_sender);
+        self.sink_pad_handler
+            .prepare(self, socket, socket_v6, &settings)?;
+        *self.ts_ctx.lock().unwrap() = Some(ts_ctx);
 
         gst::debug!(CAT, imp: self, "Started preparation");
 
@@ -853,73 +798,44 @@ impl UdpSink {
     fn unprepare(&self) {
         gst::debug!(CAT, imp: self, "Unpreparing");
-        self.task.unprepare().block_on().unwrap();
+        self.sink_pad_handler.unprepare();
+        *self.ts_ctx.lock().unwrap() = None;
         gst::debug!(CAT, imp: self, "Unprepared");
     }
 
     fn stop(&self) -> Result<(), gst::ErrorMessage> {
         gst::debug!(CAT, imp: self, "Stopping");
-        self.task.stop().block_on()?;
+        self.sink_pad_handler.stop();
         gst::debug!(CAT, imp: self, "Stopped");
         Ok(())
     }
 
     fn start(&self) -> Result<(), gst::ErrorMessage> {
         gst::debug!(CAT, imp: self, "Starting");
-        self.task.start().block_on()?;
+        self.sink_pad_handler.start();
         gst::debug!(CAT, imp: self, "Started");
         Ok(())
    }
 
-    fn add_client(&self, settings: &mut Settings, client: SocketAddr) {
-        settings.clients.insert(client);
-        if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
-            cmd_sender.send(Command::AddClient(client)).unwrap();
-        }
+    fn try_into_socket_addr(&self, host: &str, port: i32) -> Result<SocketAddr, ()> {
+        let addr: IpAddr = match host.parse() {
+            Err(err) => {
+                gst::error!(CAT, imp: self, "Failed to parse host {}: {}", host, err);
+                return Err(());
+            }
+            Ok(addr) => addr,
+        };
+
+        let port: u16 = match port.try_into() {
+            Err(err) => {
+                gst::error!(CAT, imp: self, "Invalid port {}: {}", port, err);
+                return Err(());
+            }
+            Ok(port) => port,
+        };
+
+        Ok(SocketAddr::new(addr, port))
     }
-
-    fn remove_client(&self, settings: &mut Settings, client: SocketAddr) {
-        settings.clients.remove(&client);
-        if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
-            cmd_sender.send(Command::RemoveClient(client)).unwrap();
-        }
-    }
-
-    fn replace_with_clients(
-        &self,
-        settings: &mut Settings,
-        clients: impl IntoIterator<Item = SocketAddr>,
-    ) {
-        let clients = BTreeSet::<SocketAddr>::from_iter(clients);
-        if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
-            settings.clients = clients.clone();
-            cmd_sender
-                .send(Command::ReplaceWithClients(clients))
-                .unwrap();
-        } else {
-            settings.clients = clients;
-        }
-    }
-}
-
-fn try_into_socket_addr(imp: &UdpSink, host: &str, port: i32) -> Result<SocketAddr, ()> {
-    let addr: IpAddr = match host.parse() {
-        Err(err) => {
-            gst::error!(CAT, imp: imp, "Failed to parse host {}: {}", host, err);
-            return Err(());
-        }
-        Ok(addr) => addr,
-    };
-
-    let port: u16 = match port.try_into() {
-        Err(err) => {
-            gst::error!(CAT, imp: imp, "Invalid port {}: {}", port, err);
-            return Err(());
-        }
-        Ok(port) => port,
-    };
-
-    Ok(SocketAddr::new(addr, port))
 }
 
 #[glib::object_subclass]
@@ -929,15 +845,15 @@ impl ObjectSubclass for UdpSink {
     type ParentType = gst::Element;
 
     fn with_class(klass: &Self::Class) -> Self {
+        let sink_pad_handler = UdpSinkPadHandler::default();
         Self {
             sink_pad: PadSink::new(
                 gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
-                UdpSinkPadHandler,
+                sink_pad_handler.clone(),
             ),
-            task: Task::default(),
-            item_sender: Default::default(),
-            cmd_sender: Default::default(),
+            sink_pad_handler,
             settings: Default::default(),
+            ts_ctx: Default::default(),
         }
     }
 }
@@ -1051,14 +967,13 @@
                     .param_types([String::static_type(), i32::static_type()])
                     .action()
                     .class_handler(|_, args| {
-                        let element = args[0].get::<super::UdpSink>().expect("signal arg");
+                        let elem = args[0].get::<super::UdpSink>().expect("signal arg");
                         let host = args[1].get::<String>().expect("signal arg");
                        let port = args[2].get::<i32>().expect("signal arg");
-                        let udpsink = element.imp();
+                        let imp = elem.imp();
 
-                        if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) {
-                            let mut settings = udpsink.settings.lock().unwrap();
-                            udpsink.add_client(&mut settings, addr);
+                        if let Ok(addr) = imp.try_into_socket_addr(&host, port) {
+                            imp.sink_pad_handler.add_client(imp, addr);
                        }
 
                         None
@@ -1068,14 +983,13 @@
                     .param_types([String::static_type(), i32::static_type()])
                     .action()
                     .class_handler(|_, args| {
-                        let element = args[0].get::<super::UdpSink>().expect("signal arg");
+                        let elem = args[0].get::<super::UdpSink>().expect("signal arg");
                        let host = args[1].get::<String>().expect("signal arg");
                         let port = args[2].get::<i32>().expect("signal arg");
-                        let udpsink = element.imp();
+                        let imp = elem.imp();
 
-                        if let Ok(addr) = try_into_socket_addr(udpsink, &host, port) {
-                            let mut settings = udpsink.settings.lock().unwrap();
-                            udpsink.remove_client(&mut settings, addr);
+                        if let Ok(addr) = imp.try_into_socket_addr(&host, port) {
+                            imp.sink_pad_handler.remove_client(imp, addr);
                         }
 
                         None
@@ -1084,11 +998,10 @@
                 glib::subclass::Signal::builder("clear")
                     .action()
                     .class_handler(|_, args| {
-                        let element = args[0].get::<super::UdpSink>().expect("signal arg");
+                        let elem = args[0].get::<super::UdpSink>().expect("signal arg");
 
-                        let udpsink = element.imp();
-                        let mut settings = udpsink.settings.lock().unwrap();
-                        udpsink.replace_with_clients(&mut settings, BTreeSet::new());
+                        let imp = elem.imp();
+                        imp.sink_pad_handler.replace_clients(imp, BTreeSet::new());
 
                         None
                     })
@@ -1105,9 +1018,7 @@
             "sync" => {
                 let sync = value.get().expect("type checked upstream");
                 settings.sync = sync;
-                if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
-                    cmd_sender.send(Command::SetSync(sync)).unwrap();
-                }
+                self.sink_pad_handler.set_sync(sync);
            }
             "bind-address" => {
                 settings.bind_address = value
@@ -1146,16 +1057,20 @@
                 unreachable!();
             }
             "auto-multicast" => {
-                settings.auto_multicast = value.get().expect("type checked upstream");
+                settings.socket_conf.auto_multicast = value.get().expect("type checked upstream");
+                self.sink_pad_handler.set_socket_conf(settings.socket_conf);
            }
             "loop" => {
-                settings.multicast_loop = value.get().expect("type checked upstream");
+                settings.socket_conf.multicast_loop = value.get().expect("type checked upstream");
+                self.sink_pad_handler.set_socket_conf(settings.socket_conf);
             }
             "ttl" => {
-                settings.ttl = value.get().expect("type checked upstream");
+                settings.socket_conf.ttl = value.get().expect("type checked upstream");
+                self.sink_pad_handler.set_socket_conf(settings.socket_conf);
             }
             "ttl-mc" => {
-                settings.ttl_mc = value.get().expect("type checked upstream");
+                settings.socket_conf.ttl_mc = value.get().expect("type checked upstream");
+                self.sink_pad_handler.set_socket_conf(settings.socket_conf);
             }
             "qos-dscp" => {
                 settings.qos_dscp = value.get().expect("type checked upstream");
@@ -1167,22 +1082,33 @@
                     .unwrap_or_else(|| "".into());
 
                 let clients = clients.split(',').filter_map(|client| {
-                    let rsplit: Vec<&str> = client.rsplitn(2, ':').collect();
-
-                    if rsplit.len() == 2 {
-                        rsplit[0]
-                            .parse::<i32>()
-                            .map_err(|err| {
-                                gst::error!(CAT, imp: self, "Invalid port {}: {}", rsplit[0], err);
-                            })
-                            .and_then(|port| try_into_socket_addr(self, rsplit[1], port))
-                            .ok()
+                    // Split on the last ':' so that the port is separated
+                    // from the host, keeping IPv6 clients such as "::1:5004"
+                    // working as they did with the previous rsplitn code.
+                    let mut split = client.rsplitn(2, ':');
+                    if let Some((port, addr)) = split.next().zip(split.next()) {
+                        match port.parse::<i32>() {
+                            Ok(port) => match self.try_into_socket_addr(addr, port) {
+                                Ok(socket_addr) => Some(socket_addr),
+                                Err(()) => {
+                                    gst::error!(
+                                        CAT,
+                                        imp: self,
+                                        "Invalid socket address {addr}:{port}"
+                                    );
+                                    None
+                                }
+                            },
+                            Err(err) => {
+                                gst::error!(CAT, imp: self, "Invalid port {err}");
+                                None
+                            }
+                        }
                     } else {
+                        gst::error!(CAT, imp: self, "Invalid client {client}");
                         None
                     }
                 });
 
-                self.replace_with_clients(&mut settings, clients);
+                let clients = BTreeSet::from_iter(clients);
+                self.sink_pad_handler.replace_clients(self, clients);
             }
             "context" => {
                 settings.context = value
@@ -1227,14 +1153,13 @@
                 .as_ref()
                 .map(GioSocketWrapper::as_socket)
                .to_value(),
-            "auto-multicast" => settings.sync.to_value(),
-            "loop" => settings.multicast_loop.to_value(),
-            "ttl" => settings.ttl.to_value(),
-            "ttl-mc" => settings.ttl_mc.to_value(),
+            "auto-multicast" => settings.socket_conf.auto_multicast.to_value(),
+            "loop" => settings.socket_conf.multicast_loop.to_value(),
+            "ttl" => settings.socket_conf.ttl.to_value(),
+            "ttl-mc" => settings.socket_conf.ttl_mc.to_value(),
             "qos-dscp" => settings.qos_dscp.to_value(),
             "clients" => {
-                let clients = settings.clients.clone();
-                drop(settings);
+                let clients = self.sink_pad_handler.clients();
 
                 let clients: Vec<String> = clients.iter().map(ToString::to_string).collect();
                 clients.join(",").to_value()
@@ -1320,10 +1245,7 @@
         match event.view() {
             EventView::Latency(ev) => {
                 let latency = Some(ev.latency());
-                if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
-                    cmd_sender.send(Command::SetLatency(latency)).unwrap();
-                }
-                self.settings.lock().unwrap().latency = latency;
+                self.sink_pad_handler.set_latency(latency);
 
                 self.sink_pad.gst_pad().push_event(event)
             }
             EventView::Step(..) => false,
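---

Reviewer note, not part of the patch: a minimal sketch of how the reworked
client management can be driven from application code. The "clear" signal
name and the "clients" property are taken from the patch above; the element
name ("ts-udpsink") and the "add"/"remove" signal names are assumptions
based on this plugin's usual multiudpsink-style registration.

    use gst::prelude::*;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        gst::init()?;

        // Initial clients can be set through the "clients" property, which
        // now ends up in UdpSinkPadHandler::replace_clients().
        let sink = gst::ElementFactory::make("ts-udpsink")
            .property("clients", "127.0.0.1:5004")
            .build()?;

        // Clients can also be managed dynamically through the action
        // signals, which call into UdpSinkPadHandler directly instead of
        // sending Commands to a Task as before.
        sink.emit_by_name::<()>("add", &[&"127.0.0.1", &5006i32]);
        sink.emit_by_name::<()>("remove", &[&"127.0.0.1", &5004i32]);
        sink.emit_by_name::<()>("clear", &[]);

        Ok(())
    }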