ts/udpsink: fix default clients not leading to socket configuration

During MR !793, the socket configuration mechanism was changed to
use commands passed to the Task via a channel. This worked properly
for user changes via settings and signals, however the default
clients setting was not used.

A simple solution could have been to send a command at initialization
to add the default clients, but it was considered a better solution
to just wait for the Task preparation to configure the sockets based
on the value of settings.clients at that time, thus avoiding
unnecessary successive removals and additions of clients which could
have happened before preparation.

Of course, users can still add or remove clients as before, before
and after Task preparation.

See also https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/793
This commit is contained in:
François Laignel 2022-08-06 11:51:43 +02:00 committed by Sebastian Dröge
parent f646cabb3d
commit 374671cb6f

View file

@ -78,6 +78,7 @@ struct Settings {
context: String, context: String,
context_wait: Duration, context_wait: Duration,
clients: BTreeSet<SocketAddr>, clients: BTreeSet<SocketAddr>,
latency: Option<gst::ClockTime>,
} }
impl Default for Settings { impl Default for Settings {
@ -103,6 +104,7 @@ impl Default for Settings {
DEFAULT_HOST.unwrap().parse().unwrap(), DEFAULT_HOST.unwrap().parse().unwrap(),
DEFAULT_PORT as u16, DEFAULT_PORT as u16,
)]), )]),
latency: None,
} }
} }
} }
@ -134,7 +136,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
element: &gst::Element, element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = udpsink.item_sender.clone(); let sender = udpsink.clone_item_sender();
let element = element.clone().downcast::<super::UdpSink>().unwrap(); let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move { async move {
@ -155,7 +157,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
element: &gst::Element, element: &gst::Element,
list: gst::BufferList, list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = udpsink.item_sender.clone(); let sender = udpsink.clone_item_sender();
let element = element.clone().downcast::<super::UdpSink>().unwrap(); let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move { async move {
@ -178,7 +180,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
element: &gst::Element, element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
let sender = udpsink.item_sender.clone(); let sender = udpsink.clone_item_sender();
let element = element.clone().downcast::<super::UdpSink>().unwrap(); let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move { async move {
@ -443,6 +445,8 @@ impl UdpSinkTask {
} else if let Err(err) = self.unconfigure_client(&settings, addr) { } else if let Err(err) = self.unconfigure_client(&settings, addr) {
gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err); gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err);
res = Err(err); res = Err(err);
} else {
gst::info!(CAT, obj: &self.element, "Removed client {:?}", addr);
} }
} }
@ -451,6 +455,7 @@ impl UdpSinkTask {
gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err); gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err);
res = Err(err); res = Err(err);
} else { } else {
gst::info!(CAT, obj: &self.element, "Added client {:?}", addr);
self.clients.insert(addr); self.clients.insert(addr);
} }
} }
@ -690,17 +695,17 @@ impl TaskImpl for UdpSinkTask {
async move { async move {
gst::info!(CAT, obj: &self.element, "Preparing Task"); gst::info!(CAT, obj: &self.element, "Preparing Task");
assert!(self.clients.is_empty()); assert!(self.clients.is_empty());
{ let clients = {
let udpsink = self.element.imp(); let udpsink = self.element.imp();
let mut settings = udpsink.settings.lock().unwrap(); let mut settings = udpsink.settings.lock().unwrap();
self.sync = settings.sync; self.sync = settings.sync;
self.socket = self.prepare_socket(&mut settings, SocketFamily::Ipv4)?; self.socket = self.prepare_socket(&mut settings, SocketFamily::Ipv4)?;
self.socket_v6 = self.prepare_socket(&mut settings, SocketFamily::Ipv6)?; self.socket_v6 = self.prepare_socket(&mut settings, SocketFamily::Ipv6)?;
} self.latency = settings.latency;
settings.clients.clone()
};
while let Ok(cmd) = self.cmd_receiver.try_recv() { self.replace_with_clients(clients);
self.process_command(cmd);
}
Ok(()) Ok(())
} }
@ -771,14 +776,17 @@ enum SocketFamily {
pub struct UdpSink { pub struct UdpSink {
sink_pad: PadSink, sink_pad: PadSink,
task: Task, task: Task,
item_sender: flume::Sender<TaskItem>, item_sender: Mutex<Option<flume::Sender<TaskItem>>>,
item_receiver: flume::Receiver<TaskItem>, cmd_sender: Mutex<Option<flume::Sender<Command>>>,
cmd_sender: flume::Sender<Command>,
cmd_receiver: flume::Receiver<Command>,
settings: Mutex<Settings>, settings: Mutex<Settings>,
} }
impl UdpSink { impl UdpSink {
#[track_caller]
fn clone_item_sender(&self) -> flume::Sender<TaskItem> {
self.item_sender.lock().unwrap().as_ref().unwrap().clone()
}
fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &super::UdpSink) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, obj: element, "Preparing"); gst::debug!(CAT, obj: element, "Preparing");
@ -793,11 +801,10 @@ impl UdpSink {
})? })?
}; };
let task_impl = UdpSinkTask::new( // Enable backpressure for items
element, let (item_sender, item_receiver) = flume::bounded(0);
self.item_receiver.clone(), let (cmd_sender, cmd_receiver) = flume::unbounded();
self.cmd_receiver.clone(), let task_impl = UdpSinkTask::new(element, item_receiver, cmd_receiver);
);
self.task.prepare(task_impl, context).map_err(|err| { self.task.prepare(task_impl, context).map_err(|err| {
error_msg!( error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
@ -805,6 +812,9 @@ impl UdpSink {
) )
})?; })?;
*self.item_sender.lock().unwrap() = Some(item_sender);
*self.cmd_sender.lock().unwrap() = Some(cmd_sender);
gst::debug!(CAT, obj: element, "Started preparation"); gst::debug!(CAT, obj: element, "Started preparation");
Ok(()) Ok(())
@ -834,12 +844,16 @@ impl UdpSink {
impl UdpSink { impl UdpSink {
fn add_client(&self, settings: &mut Settings, client: SocketAddr) { fn add_client(&self, settings: &mut Settings, client: SocketAddr) {
settings.clients.insert(client); settings.clients.insert(client);
self.cmd_sender.send(Command::AddClient(client)).unwrap(); if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
cmd_sender.send(Command::AddClient(client)).unwrap();
}
} }
fn remove_client(&self, settings: &mut Settings, client: SocketAddr) { fn remove_client(&self, settings: &mut Settings, client: SocketAddr) {
settings.clients.remove(&client); settings.clients.remove(&client);
self.cmd_sender.send(Command::RemoveClient(client)).unwrap(); if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
cmd_sender.send(Command::RemoveClient(client)).unwrap();
}
} }
fn replace_with_clients( fn replace_with_clients(
@ -848,10 +862,14 @@ impl UdpSink {
clients: impl IntoIterator<Item = SocketAddr>, clients: impl IntoIterator<Item = SocketAddr>,
) { ) {
let clients = BTreeSet::<SocketAddr>::from_iter(clients); let clients = BTreeSet::<SocketAddr>::from_iter(clients);
if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
settings.clients = clients.clone(); settings.clients = clients.clone();
self.cmd_sender cmd_sender
.send(Command::ReplaceWithClients(clients)) .send(Command::ReplaceWithClients(clients))
.unwrap(); .unwrap();
} else {
settings.clients = clients;
}
} }
} }
@ -882,20 +900,14 @@ impl ObjectSubclass for UdpSink {
type ParentType = gst::Element; type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self { fn with_class(klass: &Self::Class) -> Self {
// Enable backpressure for items
let (item_sender, item_receiver) = flume::bounded(0);
let (cmd_sender, cmd_receiver) = flume::unbounded();
Self { Self {
sink_pad: PadSink::new( sink_pad: PadSink::new(
gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")),
UdpSinkPadHandler, UdpSinkPadHandler,
), ),
task: Task::default(), task: Task::default(),
item_sender, item_sender: Default::default(),
item_receiver, cmd_sender: Default::default(),
cmd_sender,
cmd_receiver,
settings: Default::default(), settings: Default::default(),
} }
} }
@ -1109,7 +1121,9 @@ impl ObjectImpl for UdpSink {
"sync" => { "sync" => {
let sync = value.get().expect("type checked upstream"); let sync = value.get().expect("type checked upstream");
settings.sync = sync; settings.sync = sync;
self.cmd_sender.send(Command::SetSync(sync)).unwrap(); if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
cmd_sender.send(Command::SetSync(sync)).unwrap();
}
} }
"bind-address" => { "bind-address" => {
settings.bind_address = value settings.bind_address = value
@ -1323,7 +1337,10 @@ impl ElementImpl for UdpSink {
match event.view() { match event.view() {
EventView::Latency(ev) => { EventView::Latency(ev) => {
let latency = Some(ev.latency()); let latency = Some(ev.latency());
self.cmd_sender.send(Command::SetLatency(latency)).unwrap(); if let Some(cmd_sender) = self.cmd_sender.lock().unwrap().as_mut() {
cmd_sender.send(Command::SetLatency(latency)).unwrap();
}
self.settings.lock().unwrap().latency = latency;
self.sink_pad.gst_pad().push_event(event) self.sink_pad.gst_pad().push_event(event)
} }
EventView::Step(..) => false, EventView::Step(..) => false,