diff --git a/gst-plugin-threadshare/src/udpsink.rs b/gst-plugin-threadshare/src/udpsink.rs index aeacf345..d237f32f 100644 --- a/gst-plugin-threadshare/src/udpsink.rs +++ b/gst-plugin-threadshare/src/udpsink.rs @@ -338,6 +338,7 @@ struct UdpSinkPadHandlerState { socket_v6: Arc>>, clients: Arc>, clients_to_configure: Vec, + clients_to_unconfigure: Vec, sender: Arc>>>, settings: Arc>, } @@ -359,6 +360,7 @@ impl UdpSinkPadHandler { DEFAULT_PORT as u16, )]), clients_to_configure: vec![], + clients_to_unconfigure: vec![], sender: Arc::new(Mutex::new(None)), settings, }))) @@ -454,12 +456,61 @@ impl UdpSinkPadHandler { Ok(()) } + fn unconfigure_client( + &self, + auto_multicast: bool, + socket: &mut Option, + socket_v6: &mut Option, + client: &SocketAddr, + ) -> Result<(), gst::ErrorMessage> { + if client.ip().is_multicast() { + match client.ip() { + IpAddr::V4(addr) => { + if let Some(socket) = socket.as_mut() { + if auto_multicast { + socket + .leave_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group: {}", err] + ) + })?; + } + } + } + IpAddr::V6(addr) => { + if let Some(socket) = socket_v6.as_mut() { + if auto_multicast { + socket.leave_multicast_v6(&addr, 0).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group: {}", err] + ) + })?; + } + } + } + } + } + + Ok(()) + } + async fn render( &self, element: &gst::Element, buffer: gst::Buffer, ) -> Result { - let (do_sync, rtime, clients, clients_to_configure, socket, socket_v6) = { + let ( + do_sync, + rtime, + clients, + clients_to_configure, + clients_to_unconfigure, + socket, + socket_v6, + ) = { let mut state = self.0.write().unwrap(); let do_sync = state.sync; let mut rtime: gst::ClockTime = 0.into(); @@ -474,12 +525,14 @@ impl UdpSinkPadHandler { } let clients_to_configure = mem::replace(&mut state.clients_to_configure, vec![]); + let clients_to_unconfigure = mem::replace(&mut state.clients_to_unconfigure, vec![]); ( do_sync, rtime, Arc::clone(&state.clients), clients_to_configure, + clients_to_unconfigure, Arc::clone(&state.socket), Arc::clone(&state.socket_v6), ) @@ -525,6 +578,35 @@ impl UdpSinkPadHandler { } } + if !clients_to_unconfigure.is_empty() { + let (auto_multicast, socket, socket_v6) = { + let state = self.0.read().unwrap(); + let settings = state.settings.lock().unwrap(); + + ( + settings.auto_multicast, + Arc::clone(&state.socket), + Arc::clone(&state.socket_v6), + ) + }; + + let mut socket = socket.lock().await; + let mut socket_v6 = socket_v6.lock().await; + + for client in &clients_to_unconfigure { + self.unconfigure_client(auto_multicast, &mut socket, &mut socket_v6, &client) + .map_err(|err| { + gst_element_error!( + element, + gst::StreamError::Failed, + ["Failed to unconfigure client {:?}: {}", client, err] + ); + + gst::FlowError::Error + })?; + } + } + if do_sync { self.sync(&element, rtime).await; } @@ -944,6 +1026,7 @@ impl UdpSink { clients.clear(); state.clients_to_configure = vec![]; + state.clients_to_unconfigure = vec![]; if let Some(host) = &settings.host { self.add_client(&element, state, &host, settings.port as u16); @@ -975,6 +1058,7 @@ impl UdpSink { let clients = Arc::make_mut(&mut state.clients); clients.retain(|addr2| addr != *addr2); + state.clients_to_unconfigure.push(addr); state.clients_to_configure.retain(|addr2| addr != *addr2); } @@ -1004,6 +1088,7 @@ impl UdpSink { let clients = Arc::make_mut(&mut state.clients); clients.push(addr); state.clients_to_configure.push(addr); + state.clients_to_unconfigure.retain(|addr2| addr != *addr2); } }