threadshare/udpsink: Some more refactoring

Let's take locks less often.
This commit is contained in:
Sebastian Dröge 2020-03-16 12:42:33 +02:00
parent 85cbbf5240
commit d2ad227a2f

View file

@ -368,10 +368,7 @@ impl UdpSinkPadHandler {
fn configure_client(
&self,
auto_multicast: bool,
multicast_loop: bool,
ttl_mc: u32,
ttl: u32,
settings: &Settings,
socket: &mut Option<tokio::net::UdpSocket>,
socket_v6: &mut Option<tokio::net::UdpSocket>,
client: &SocketAddr,
@ -380,7 +377,7 @@ impl UdpSinkPadHandler {
match client.ip() {
IpAddr::V4(addr) => {
if let Some(socket) = socket.as_mut() {
if auto_multicast {
if settings.auto_multicast {
socket
.join_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
@ -390,7 +387,7 @@ impl UdpSinkPadHandler {
)
})?;
}
if multicast_loop {
if settings.multicast_loop {
socket.set_multicast_loop_v4(true).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
@ -398,17 +395,19 @@ impl UdpSinkPadHandler {
)
})?;
}
socket.set_multicast_ttl_v4(ttl_mc).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to set multicast ttl: {}", err]
)
})?;
socket
.set_multicast_ttl_v4(settings.ttl_mc)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to set multicast ttl: {}", err]
)
})?;
}
}
IpAddr::V6(addr) => {
if let Some(socket) = socket_v6.as_mut() {
if auto_multicast {
if settings.auto_multicast {
socket.join_multicast_v6(&addr, 0).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
@ -416,7 +415,7 @@ impl UdpSinkPadHandler {
)
})?;
}
if multicast_loop {
if settings.multicast_loop {
socket.set_multicast_loop_v6(true).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
@ -432,7 +431,7 @@ impl UdpSinkPadHandler {
match client.ip() {
IpAddr::V4(_) => {
if let Some(socket) = socket.as_mut() {
socket.set_ttl(ttl).map_err(|err| {
socket.set_ttl(settings.ttl).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to set unicast ttl: {}", err]
@ -442,7 +441,7 @@ impl UdpSinkPadHandler {
}
IpAddr::V6(_) => {
if let Some(socket) = socket_v6.as_mut() {
socket.set_ttl(ttl).map_err(|err| {
socket.set_ttl(settings.ttl).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Failed to set unicast ttl: {}", err]
@ -458,7 +457,7 @@ impl UdpSinkPadHandler {
fn unconfigure_client(
&self,
auto_multicast: bool,
settings: &Settings,
socket: &mut Option<tokio::net::UdpSocket>,
socket_v6: &mut Option<tokio::net::UdpSocket>,
client: &SocketAddr,
@ -467,7 +466,7 @@ impl UdpSinkPadHandler {
match client.ip() {
IpAddr::V4(addr) => {
if let Some(socket) = socket.as_mut() {
if auto_multicast {
if settings.auto_multicast {
socket
.leave_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
@ -481,7 +480,7 @@ impl UdpSinkPadHandler {
}
IpAddr::V6(addr) => {
if let Some(socket) = socket_v6.as_mut() {
if auto_multicast {
if settings.auto_multicast {
socket.leave_multicast_v6(&addr, 0).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
@ -510,6 +509,7 @@ impl UdpSinkPadHandler {
clients_to_unconfigure,
socket,
socket_v6,
settings,
) = {
let mut state = self.0.write().unwrap();
let do_sync = state.sync;
@ -527,6 +527,8 @@ impl UdpSinkPadHandler {
let clients_to_configure = mem::replace(&mut state.clients_to_configure, vec![]);
let clients_to_unconfigure = mem::replace(&mut state.clients_to_unconfigure, vec![]);
let settings = state.settings.lock().unwrap().clone();
(
do_sync,
rtime,
@ -535,66 +537,31 @@ impl UdpSinkPadHandler {
clients_to_unconfigure,
Arc::clone(&state.socket),
Arc::clone(&state.socket_v6),
settings,
)
};
let mut socket = socket.lock().await;
let mut socket_v6 = socket_v6.lock().await;
if !clients_to_configure.is_empty() {
let (auto_multicast, multicast_loop, ttl_mc, ttl, socket, socket_v6) = {
let state = self.0.read().unwrap();
let settings = state.settings.lock().unwrap();
(
settings.auto_multicast,
settings.multicast_loop,
settings.ttl_mc,
settings.ttl,
Arc::clone(&state.socket),
Arc::clone(&state.socket_v6),
)
};
let mut socket = socket.lock().await;
let mut socket_v6 = socket_v6.lock().await;
for client in &clients_to_configure {
self.configure_client(
auto_multicast,
multicast_loop,
ttl_mc,
ttl,
&mut socket,
&mut socket_v6,
&client,
)
.map_err(|err| {
gst_element_error!(
element,
gst::StreamError::Failed,
["Failed to configure client {:?}: {}", client, err]
);
self.configure_client(&settings, &mut socket, &mut socket_v6, &client)
.map_err(|err| {
gst_element_error!(
element,
gst::StreamError::Failed,
["Failed to configure client {:?}: {}", client, err]
);
gst::FlowError::Error
})?;
gst::FlowError::Error
})?;
}
}
if !clients_to_unconfigure.is_empty() {
let (auto_multicast, socket, socket_v6) = {
let state = self.0.read().unwrap();
let settings = state.settings.lock().unwrap();
(
settings.auto_multicast,
Arc::clone(&state.socket),
Arc::clone(&state.socket_v6),
)
};
let mut socket = socket.lock().await;
let mut socket_v6 = socket_v6.lock().await;
for client in &clients_to_unconfigure {
self.unconfigure_client(auto_multicast, &mut socket, &mut socket_v6, &client)
self.unconfigure_client(&settings, &mut socket, &mut socket_v6, &client)
.map_err(|err| {
gst_element_error!(
element,
@ -623,11 +590,11 @@ impl UdpSinkPadHandler {
for client in clients.iter() {
let socket = match client.ip() {
IpAddr::V4(_) => &socket,
IpAddr::V6(_) => &socket_v6,
IpAddr::V4(_) => &mut socket,
IpAddr::V6(_) => &mut socket_v6,
};
if let Some(socket) = socket.lock().await.as_mut() {
if let Some(socket) = socket.as_mut() {
gst_log!(CAT, obj: element, "Sending to {:?}", &client);
socket.send_to(&data, client).await.map_err(|err| {
gst_element_error!(