From 374671cb6fb9c15daf601522f8f1a63fe135c0dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Sat, 6 Aug 2022 11:51:43 +0200 Subject: [PATCH] ts/udpsink: fix default clients not leading to socket configuration During MR !793, the socket configuration mechanism was changed to use commands passed to the Task via a channel. This worked properly for user changes via settings and signals, however the default clients setting was not used. A simple solution could have been to send a command at initialization to add the default clients, but it was considered a better solution to just wait for the Task preparation to configure the sockets based on the value of settings.clients at that time, thus avoiding unnecessary successive removals and additions of clients which could have happened before preparation. Of course, users can still add or remove clients as before, before and after Task preparation. See also https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/793 --- generic/threadshare/src/udpsink/imp.rs | 83 ++++++++++++++++---------- 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index ab356da6..9a09ba80 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -78,6 +78,7 @@ struct Settings { context: String, context_wait: Duration, clients: BTreeSet, + latency: Option, } impl Default for Settings { @@ -103,6 +104,7 @@ impl Default for Settings { DEFAULT_HOST.unwrap().parse().unwrap(), DEFAULT_PORT as u16, )]), + latency: None, } } } @@ -134,7 +136,7 @@ impl PadSinkHandler for UdpSinkPadHandler { element: &gst::Element, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let sender = udpsink.item_sender.clone(); + let sender = udpsink.clone_item_sender(); let element = element.clone().downcast::().unwrap(); async move { @@ -155,7 +157,7 @@ impl PadSinkHandler for UdpSinkPadHandler { element: &gst::Element, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let sender = udpsink.item_sender.clone(); + let sender = udpsink.clone_item_sender(); let element = element.clone().downcast::().unwrap(); async move { @@ -178,7 +180,7 @@ impl PadSinkHandler for UdpSinkPadHandler { element: &gst::Element, event: gst::Event, ) -> BoxFuture<'static, bool> { - let sender = udpsink.item_sender.clone(); + let sender = udpsink.clone_item_sender(); let element = element.clone().downcast::().unwrap(); async move { @@ -443,6 +445,8 @@ impl UdpSinkTask { } else if let Err(err) = self.unconfigure_client(&settings, addr) { gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err); res = Err(err); + } else { + gst::info!(CAT, obj: &self.element, "Removed client {:?}", addr); } } @@ -451,6 +455,7 @@ impl UdpSinkTask { gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err); res = Err(err); } else { + gst::info!(CAT, obj: &self.element, "Added client {:?}", addr); self.clients.insert(addr); } } @@ -690,17 +695,17 @@ impl TaskImpl for UdpSinkTask { async move { gst::info!(CAT, obj: &self.element, "Preparing Task"); assert!(self.clients.is_empty()); - { + let clients = { let udpsink = self.element.imp(); let mut settings = udpsink.settings.lock().unwrap(); self.sync = settings.sync; self.socket = self.prepare_socket(&mut settings, SocketFamily::Ipv4)?; self.socket_v6 = self.prepare_socket(&mut settings, SocketFamily::Ipv6)?; - } + self.latency = settings.latency; + settings.clients.clone() + }; - while let Ok(cmd) = self.cmd_receiver.try_recv() { - self.process_command(cmd); - } + self.replace_with_clients(clients); Ok(()) } @@ -771,14 +776,17 @@ enum SocketFamily { pub struct UdpSink { sink_pad: PadSink, task: Task, - item_sender: flume::Sender, - item_receiver: flume::Receiver, - cmd_sender: flume::Sender, - cmd_receiver: flume::Receiver, + item_sender: Mutex>>, + cmd_sender: Mutex>>, settings: Mutex, } impl UdpSink { + #[track_caller] + fn clone_item_sender(&self) -> flume::Sender { + self.item_sender.lock().unwrap().as_ref().unwrap().clone() + } + fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { gst::debug!(CAT, obj: element, "Preparing"); @@ -793,11 +801,10 @@ impl UdpSink { })? }; - let task_impl = UdpSinkTask::new( - element, - self.item_receiver.clone(), - self.cmd_receiver.clone(), - ); + // Enable backpressure for items + let (item_sender, item_receiver) = flume::bounded(0); + let (cmd_sender, cmd_receiver) = flume::unbounded(); + let task_impl = UdpSinkTask::new(element, item_receiver, cmd_receiver); self.task.prepare(task_impl, context).map_err(|err| { error_msg!( gst::ResourceError::OpenRead, @@ -805,6 +812,9 @@ impl UdpSink { ) })?; + *self.item_sender.lock().unwrap() = Some(item_sender); + *self.cmd_sender.lock().unwrap() = Some(cmd_sender); + gst::debug!(CAT, obj: element, "Started preparation"); Ok(()) @@ -834,12 +844,16 @@ impl UdpSink { impl UdpSink { fn add_client(&self, settings: &mut Settings, client: SocketAddr) { settings.clients.insert(client); - self.cmd_sender.send(Command::AddClient(client)).unwrap(); + if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { + cmd_sender.send(Command::AddClient(client)).unwrap(); + } } fn remove_client(&self, settings: &mut Settings, client: SocketAddr) { settings.clients.remove(&client); - self.cmd_sender.send(Command::RemoveClient(client)).unwrap(); + if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { + cmd_sender.send(Command::RemoveClient(client)).unwrap(); + } } fn replace_with_clients( @@ -848,10 +862,14 @@ impl UdpSink { clients: impl IntoIterator, ) { let clients = BTreeSet::::from_iter(clients); - settings.clients = clients.clone(); - self.cmd_sender - .send(Command::ReplaceWithClients(clients)) - .unwrap(); + if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { + settings.clients = clients.clone(); + cmd_sender + .send(Command::ReplaceWithClients(clients)) + .unwrap(); + } else { + settings.clients = clients; + } } } @@ -882,20 +900,14 @@ impl ObjectSubclass for UdpSink { type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { - // Enable backpressure for items - let (item_sender, item_receiver) = flume::bounded(0); - let (cmd_sender, cmd_receiver) = flume::unbounded(); - Self { sink_pad: PadSink::new( gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), UdpSinkPadHandler, ), task: Task::default(), - item_sender, - item_receiver, - cmd_sender, - cmd_receiver, + item_sender: Default::default(), + cmd_sender: Default::default(), settings: Default::default(), } } @@ -1109,7 +1121,9 @@ impl ObjectImpl for UdpSink { "sync" => { let sync = value.get().expect("type checked upstream"); settings.sync = sync; - self.cmd_sender.send(Command::SetSync(sync)).unwrap(); + if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { + cmd_sender.send(Command::SetSync(sync)).unwrap(); + } } "bind-address" => { settings.bind_address = value @@ -1323,7 +1337,10 @@ impl ElementImpl for UdpSink { match event.view() { EventView::Latency(ev) => { let latency = Some(ev.latency()); - self.cmd_sender.send(Command::SetLatency(latency)).unwrap(); + if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() { + cmd_sender.send(Command::SetLatency(latency)).unwrap(); + } + self.settings.lock().unwrap().latency = latency; self.sink_pad.gst_pad().push_event(event) } EventView::Step(..) => false,