threadshare/udpsink: Properly remove clients

Leave any multicast groups they might have joined.
This commit is contained in:
Sebastian Dröge 2020-03-16 12:33:18 +02:00
parent ac574cd112
commit 85cbbf5240

View file

@ -338,6 +338,7 @@ struct UdpSinkPadHandlerState {
socket_v6: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
clients: Arc<Vec<SocketAddr>>,
clients_to_configure: Vec<SocketAddr>,
clients_to_unconfigure: Vec<SocketAddr>,
sender: Arc<Mutex<Option<mpsc::Sender<TaskItem>>>>,
settings: Arc<StdMutex<Settings>>,
}
@ -359,6 +360,7 @@ impl UdpSinkPadHandler {
DEFAULT_PORT as u16,
)]),
clients_to_configure: vec![],
clients_to_unconfigure: vec![],
sender: Arc::new(Mutex::new(None)),
settings,
})))
@ -454,12 +456,61 @@ impl UdpSinkPadHandler {
Ok(())
}
fn unconfigure_client(
&self,
auto_multicast: bool,
socket: &mut Option<tokio::net::UdpSocket>,
socket_v6: &mut Option<tokio::net::UdpSocket>,
client: &SocketAddr,
) -> Result<(), gst::ErrorMessage> {
if client.ip().is_multicast() {
match client.ip() {
IpAddr::V4(addr) => {
if let Some(socket) = socket.as_mut() {
if auto_multicast {
socket
.leave_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to join multicast group: {}", err]
)
})?;
}
}
}
IpAddr::V6(addr) => {
if let Some(socket) = socket_v6.as_mut() {
if auto_multicast {
socket.leave_multicast_v6(&addr, 0).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to join multicast group: {}", err]
)
})?;
}
}
}
}
}
Ok(())
}
async fn render(
&self,
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let (do_sync, rtime, clients, clients_to_configure, socket, socket_v6) = {
let (
do_sync,
rtime,
clients,
clients_to_configure,
clients_to_unconfigure,
socket,
socket_v6,
) = {
let mut state = self.0.write().unwrap();
let do_sync = state.sync;
let mut rtime: gst::ClockTime = 0.into();
@ -474,12 +525,14 @@ impl UdpSinkPadHandler {
}
let clients_to_configure = mem::replace(&mut state.clients_to_configure, vec![]);
let clients_to_unconfigure = mem::replace(&mut state.clients_to_unconfigure, vec![]);
(
do_sync,
rtime,
Arc::clone(&state.clients),
clients_to_configure,
clients_to_unconfigure,
Arc::clone(&state.socket),
Arc::clone(&state.socket_v6),
)
@ -525,6 +578,35 @@ impl UdpSinkPadHandler {
}
}
if !clients_to_unconfigure.is_empty() {
let (auto_multicast, socket, socket_v6) = {
let state = self.0.read().unwrap();
let settings = state.settings.lock().unwrap();
(
settings.auto_multicast,
Arc::clone(&state.socket),
Arc::clone(&state.socket_v6),
)
};
let mut socket = socket.lock().await;
let mut socket_v6 = socket_v6.lock().await;
for client in &clients_to_unconfigure {
self.unconfigure_client(auto_multicast, &mut socket, &mut socket_v6, &client)
.map_err(|err| {
gst_element_error!(
element,
gst::StreamError::Failed,
["Failed to unconfigure client {:?}: {}", client, err]
);
gst::FlowError::Error
})?;
}
}
if do_sync {
self.sync(&element, rtime).await;
}
@ -944,6 +1026,7 @@ impl UdpSink {
clients.clear();
state.clients_to_configure = vec![];
state.clients_to_unconfigure = vec![];
if let Some(host) = &settings.host {
self.add_client(&element, state, &host, settings.port as u16);
@ -975,6 +1058,7 @@ impl UdpSink {
let clients = Arc::make_mut(&mut state.clients);
clients.retain(|addr2| addr != *addr2);
state.clients_to_unconfigure.push(addr);
state.clients_to_configure.retain(|addr2| addr != *addr2);
}
@ -1004,6 +1088,7 @@ impl UdpSink {
let clients = Arc::make_mut(&mut state.clients);
clients.push(addr);
state.clients_to_configure.push(addr);
state.clients_to_unconfigure.retain(|addr2| addr != *addr2);
}
}