diff --git a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs index 272ce066..f3026949 100644 --- a/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs +++ b/gst-plugin-threadshare/src/jitterbuffer/jitterbuffer.rs @@ -43,6 +43,7 @@ use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; use std::time::Duration; +use crate::get_current_running_time; use crate::runtime::prelude::*; use crate::runtime::{ self, Context, JoinHandle, PadContext, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, @@ -433,18 +434,6 @@ lazy_static! { } impl JitterBuffer { - fn get_current_running_time(&self, element: &gst::Element) -> gst::ClockTime { - if let Some(clock) = element.get_clock() { - if clock.get_time() > element.get_base_time() { - clock.get_time() - element.get_base_time() - } else { - gst::ClockTime(Some(0)) - } - } else { - gst::CLOCK_TIME_NONE - } - } - fn parse_caps( &self, state: &mut MutexGuard, @@ -645,7 +634,7 @@ impl JitterBuffer { } if dts == gst::CLOCK_TIME_NONE { - dts = self.get_current_running_time(element); + dts = get_current_running_time(element); pts = dts; estimated_dts = state.clock_rate != -1; @@ -947,7 +936,7 @@ impl JitterBuffer { ) }; - let now = self.get_current_running_time(element); + let now = get_current_running_time(element); gst_debug!( CAT, @@ -1018,7 +1007,7 @@ impl JitterBuffer { None => return, }; - let now = jb.get_current_running_time(&element); + let now = get_current_running_time(&element); gst_debug!( CAT, diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index 4add7f9d..c8171fdf 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -31,6 +31,7 @@ pub mod runtime; pub mod socket; mod tcpclientsrc; +mod udpsink; mod udpsrc; mod appsrc; @@ -44,10 +45,12 @@ use glib_sys as glib_ffi; use gst; use gst::gst_plugin_define; +use gst::prelude::*; use gstreamer_sys as gst_ffi; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { udpsrc::register(plugin)?; + udpsink::register(plugin)?; tcpclientsrc::register(plugin)?; queue::register(plugin)?; proxy::register(plugin)?; @@ -98,3 +101,15 @@ impl<'a> Drop for MutexGuard<'a> { } } } + +pub fn get_current_running_time(element: &gst::Element) -> gst::ClockTime { + if let Some(clock) = element.get_clock() { + if clock.get_time() > element.get_base_time() { + clock.get_time() - element.get_base_time() + } else { + 0.into() + } + } else { + gst::CLOCK_TIME_NONE + } +} diff --git a/gst-plugin-threadshare/src/socket.rs b/gst-plugin-threadshare/src/socket.rs index 4d978c5c..1ff3ba44 100644 --- a/gst-plugin-threadshare/src/socket.rs +++ b/gst-plugin-threadshare/src/socket.rs @@ -23,13 +23,24 @@ use futures::lock::Mutex; use gst; use gst::prelude::*; -use gst::{gst_debug, gst_error}; +use gst::{gst_debug, gst_error, gst_error_msg}; use lazy_static::lazy_static; use std::io; use std::sync::Arc; +use gio; +use gio::prelude::*; +use gio_sys as gio_ffi; +use gobject_sys as gobject_ffi; + +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; + +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; + lazy_static! { static ref SOCKET_CAT: gst::DebugCategory = gst::DebugCategory::new( "ts-socket", @@ -246,3 +257,169 @@ impl SocketStream { } } } + +// Send/Sync struct for passing around a gio::Socket +// and getting the raw fd from it +// +// gio::Socket is not Send/Sync as it's generally unsafe +// to access it from multiple threads. Getting the underlying raw +// fd is safe though, as is receiving/sending from two different threads +#[derive(Debug)] +pub struct GioSocketWrapper { + socket: *mut gio_ffi::GSocket, +} + +unsafe impl Send for GioSocketWrapper {} +unsafe impl Sync for GioSocketWrapper {} + +impl GioSocketWrapper { + pub fn new(socket: &gio::Socket) -> Self { + use glib::translate::*; + + Self { + socket: socket.to_glib_full(), + } + } + + pub fn as_socket(&self) -> gio::Socket { + unsafe { + use glib::translate::*; + + from_glib_none(self.socket) + } + } + + #[cfg(unix)] + pub fn set_tos(&self, qos_dscp: i32) -> Result<(), glib::error::Error> { + use libc::{IPPROTO_IP, IPPROTO_IPV6, IPV6_TCLASS, IP_TOS}; + + let tos = (qos_dscp & 0x3f) << 2; + + let socket = self.as_socket(); + + socket.set_option(IPPROTO_IP, IP_TOS, tos)?; + + if socket.get_family() == gio::SocketFamily::Ipv6 { + socket.set_option(IPPROTO_IPV6, IPV6_TCLASS, tos)?; + } + + Ok(()) + } + + #[cfg(not(unix))] + pub fn set_tos(&self, qos_dscp: i32) -> Result<(), Error> { + Ok(()) + } + + #[cfg(unix)] + pub fn get(&self) -> T { + unsafe { FromRawFd::from_raw_fd(libc::dup(gio_ffi::g_socket_get_fd(self.socket))) } + } + + #[cfg(windows)] + pub fn get(&self) -> T { + unsafe { + FromRawSocket::from_raw_socket( + dup_socket(gio_ffi::g_socket_get_fd(self.socket) as _) as _ + ) + } + } +} + +impl Clone for GioSocketWrapper { + fn clone(&self) -> Self { + Self { + socket: unsafe { gobject_ffi::g_object_ref(self.socket as *mut _) as *mut _ }, + } + } +} + +impl Drop for GioSocketWrapper { + fn drop(&mut self) { + unsafe { + gobject_ffi::g_object_unref(self.socket as *mut _); + } + } +} + +#[cfg(windows)] +unsafe fn dup_socket(socket: usize) -> usize { + use std::mem; + use winapi::shared::ws2def; + use winapi::um::processthreadsapi; + use winapi::um::winsock2; + + let mut proto_info = mem::MaybeUninit::uninit(); + let ret = winsock2::WSADuplicateSocketA( + socket, + processthreadsapi::GetCurrentProcessId(), + proto_info.as_mut_ptr(), + ); + assert_eq!(ret, 0); + let mut proto_info = proto_info.assume_init(); + + let socket = winsock2::WSASocketA( + ws2def::AF_INET, + ws2def::SOCK_DGRAM, + ws2def::IPPROTO_UDP as i32, + &mut proto_info, + 0, + 0, + ); + + assert_ne!(socket, winsock2::INVALID_SOCKET); + + socket +} + +pub fn wrap_socket(socket: &tokio::net::UdpSocket) -> Result { + #[cfg(unix)] + unsafe { + let fd = libc::dup(socket.as_raw_fd()); + + // This is unsafe because it allows us to share the fd between the socket and the + // GIO socket below, but safety of this is the job of the application + struct FdConverter(RawFd); + impl IntoRawFd for FdConverter { + fn into_raw_fd(self) -> RawFd { + self.0 + } + } + + let fd = FdConverter(fd); + + let gio_socket = gio::Socket::new_from_fd(fd).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to create wrapped GIO socket: {}", err] + ) + })?; + Ok(GioSocketWrapper::new(&gio_socket)) + } + #[cfg(windows)] + unsafe { + // FIXME: Needs https://github.com/tokio-rs/tokio/pull/806 + // and https://github.com/carllerche/mio/pull/859 + let fd = unreachable!(); //dup_socket(socket.as_raw_socket() as _) as _; + + // This is unsafe because it allows us to share the fd between the socket and the + // GIO socket below, but safety of this is the job of the application + struct SocketConverter(RawSocket); + impl IntoRawSocket for SocketConverter { + fn into_raw_socket(self) -> RawSocket { + self.0 + } + } + + let fd = SocketConverter(fd); + + let gio_socket = gio::Socket::new_from_socket(fd).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to create wrapped GIO socket: {}", err] + ) + })?; + + Ok(GioSocketWrapper::new(&gio_socket)) + } +} diff --git a/gst-plugin-threadshare/src/udpsink.rs b/gst-plugin-threadshare/src/udpsink.rs new file mode 100644 index 00000000..d51212d2 --- /dev/null +++ b/gst-plugin-threadshare/src/udpsink.rs @@ -0,0 +1,1337 @@ +// Copyright (C) 2019 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use either::Either; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::BoxFuture; +use futures::future::{abortable, AbortHandle, Aborted}; +use futures::lock::{Mutex, MutexGuard}; +use futures::prelude::*; + +use glib; +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; +use glib::{glib_object_impl, glib_object_subclass}; + +use gst; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::EventView; +use gst::{ + gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_log, gst_trace, + gst_warning, +}; + +use lazy_static::lazy_static; + +use crate::runtime::prelude::*; +use crate::runtime::{Context, PadSink, PadSinkRef}; +use crate::socket::{wrap_socket, GioSocketWrapper}; + +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::time::Duration; +use std::u16; +use std::u8; +use tokio::task::JoinHandle; + +const DEFAULT_HOST: Option<&str> = Some("127.0.0.1"); +const DEFAULT_PORT: u32 = 5000; +const DEFAULT_SYNC: bool = true; +const DEFAULT_BIND_ADDRESS: &str = "0.0.0.0"; +const DEFAULT_BIND_PORT: u32 = 0; +const DEFAULT_BIND_ADDRESS_V6: &str = "::"; +const DEFAULT_BIND_PORT_V6: u32 = 0; +const DEFAULT_SOCKET: Option = None; +const DEFAULT_USED_SOCKET: Option = None; +const DEFAULT_SOCKET_V6: Option = None; +const DEFAULT_USED_SOCKET_V6: Option = None; +const DEFAULT_AUTO_MULTICAST: bool = true; +const DEFAULT_LOOP: bool = true; +const DEFAULT_TTL: u32 = 64; +const DEFAULT_TTL_MC: u32 = 1; +const DEFAULT_QOS_DSCP: i32 = -1; +const DEFAULT_CLIENTS: &str = ""; +const DEFAULT_CONTEXT: &str = ""; +const DEFAULT_CONTEXT_WAIT: u32 = 0; + +#[derive(Debug, Clone)] +struct Settings { + host: Option, + port: u32, + sync: bool, + bind_address: String, + bind_port: u32, + bind_address_v6: String, + bind_port_v6: u32, + socket: Option, + used_socket: Option, + socket_v6: Option, + used_socket_v6: Option, + auto_multicast: bool, + multicast_loop: bool, + ttl: u32, + ttl_mc: u32, + qos_dscp: i32, + context: String, + context_wait: u32, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + host: DEFAULT_HOST.map(Into::into), + port: DEFAULT_PORT, + sync: DEFAULT_SYNC, + bind_address: DEFAULT_BIND_ADDRESS.into(), + bind_port: DEFAULT_BIND_PORT, + bind_address_v6: DEFAULT_BIND_ADDRESS_V6.into(), + bind_port_v6: DEFAULT_BIND_PORT_V6, + socket: DEFAULT_SOCKET, + used_socket: DEFAULT_USED_SOCKET, + socket_v6: DEFAULT_SOCKET_V6, + used_socket_v6: DEFAULT_USED_SOCKET_V6, + auto_multicast: DEFAULT_AUTO_MULTICAST, + multicast_loop: DEFAULT_LOOP, + ttl: DEFAULT_TTL, + ttl_mc: DEFAULT_TTL_MC, + qos_dscp: DEFAULT_QOS_DSCP, + context: DEFAULT_CONTEXT.into(), + context_wait: DEFAULT_CONTEXT_WAIT, + } + } +} + +#[derive(Debug)] +struct State { + segment: Option, + context: Option, + socket: Arc>>, + socket_v6: Arc>>, + clients: Vec, + abort_handle: Option, +} + +impl Default for State { + fn default() -> State { + State { + segment: None, + context: None, + socket: Arc::new(Mutex::new(None)), + socket_v6: Arc::new(Mutex::new(None)), + clients: vec![SocketAddr::new( + DEFAULT_HOST.unwrap().parse().unwrap(), + DEFAULT_PORT as u16, + )], + abort_handle: None, + } + } +} + +#[derive(Debug)] +struct PreparationSet { + join_handle: JoinHandle>, +} + +#[derive(Debug)] +struct UdpSink { + sink_pad: PadSink, + state: Mutex, + settings: Mutex, + preparation_set: Mutex>, +} + +lazy_static! { + static ref CAT: gst::DebugCategory = gst::DebugCategory::new( + "ts-udpsink", + gst::DebugColorFlags::empty(), + Some("Thread-sharing UDP sink"), + ); +} + +static PROPERTIES: [subclass::Property; 19] = [ + subclass::Property("host", |name| { + glib::ParamSpec::string( + name, + "Host", + "The host/IP/Multicast group to send the packets to", + DEFAULT_HOST, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("port", |name| { + glib::ParamSpec::uint( + name, + "Port", + "The port to send the packets to", + 0, + u16::MAX as u32, + DEFAULT_PORT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("sync", |name| { + glib::ParamSpec::boolean( + name, + "Sync", + "Sync on the clock", + DEFAULT_SYNC, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bind-address", |name| { + glib::ParamSpec::string( + name, + "Bind Address", + "Address to bind the socket to", + Some(DEFAULT_BIND_ADDRESS), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bind-port", |name| { + glib::ParamSpec::uint( + name, + "Bind Port", + "Port to bind the socket to", + 0, + u16::MAX as u32, + DEFAULT_BIND_PORT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bind-address-v6", |name| { + glib::ParamSpec::string( + name, + "Bind Address V6", + "Address to bind the V6 socket to", + Some(DEFAULT_BIND_ADDRESS), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("bind-port-v6", |name| { + glib::ParamSpec::uint( + name, + "Bind Port", + "Port to bind the V6 socket to", + 0, + u16::MAX as u32, + DEFAULT_BIND_PORT, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("socket", |name| { + glib::ParamSpec::object( + name, + "Socket", + "Socket to use for UDP transmission. (None == allocate)", + gio::Socket::static_type(), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("used-socket", |name| { + glib::ParamSpec::object( + name, + "Used Socket", + "Socket currently in use for UDP transmission. (None = no socket)", + gio::Socket::static_type(), + glib::ParamFlags::READABLE, + ) + }), + subclass::Property("socket-v6", |name| { + glib::ParamSpec::object( + name, + "Socket V6", + "IPV6 Socket to use for UDP transmission. (None == allocate)", + gio::Socket::static_type(), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("used-socket-v6", |name| { + glib::ParamSpec::object( + name, + "Used Socket V6", + "V6 Socket currently in use for UDP transmission. (None = no socket)", + gio::Socket::static_type(), + glib::ParamFlags::READABLE, + ) + }), + subclass::Property("auto-multicast", |name| { + glib::ParamSpec::boolean( + name, + "Auto multicast", + "Automatically join/leave the multicast groups, FALSE means user has to do it himself", + DEFAULT_AUTO_MULTICAST, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("loop", |name| { + glib::ParamSpec::boolean( + name, + "Loop", + "Set the multicast loop parameter.", + DEFAULT_LOOP, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("ttl", |name| { + glib::ParamSpec::uint( + name, + "Time To Live", + "Used for setting the unicast TTL parameter", + 0, + u8::MAX as u32, + DEFAULT_TTL, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("ttl-mc", |name| { + glib::ParamSpec::uint( + name, + "Time To Live Multicast", + "Used for setting the multicast TTL parameter", + 0, + u8::MAX as u32, + DEFAULT_TTL_MC, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("qos-dscp", |name| { + glib::ParamSpec::int( + name, + "QoS DSCP", + "Quality of Service, differentiated services code point (-1 default)", + -1, + 63, + DEFAULT_QOS_DSCP, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("clients", |name| { + glib::ParamSpec::string( + name, + "Clients", + "A comma separated list of host:port pairs with destinations", + Some(DEFAULT_CLIENTS), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context", |name| { + glib::ParamSpec::string( + name, + "Context", + "Context name to share threads with", + Some(DEFAULT_CONTEXT), + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("context-wait", |name| { + glib::ParamSpec::uint( + name, + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + 0, + 1000, + DEFAULT_CONTEXT_WAIT, + glib::ParamFlags::READWRITE, + ) + }), +]; + +#[derive(Clone, Debug)] +struct UdpSinkPadHandler; + +impl PadSinkHandler for UdpSinkPadHandler { + type ElementImpl = UdpSink; + + fn sink_chain( + &self, + _pad: PadSinkRef, + _udpsink: &UdpSink, + element: &gst::Element, + buffer: gst::Buffer, + ) -> BoxFuture<'static, Result> { + let element = element.clone(); + + async move { + let udpsink = UdpSink::from_instance(&element); + + udpsink.render(&element, &buffer).await + } + .boxed() + } + + fn sink_chain_list( + &self, + _pad: PadSinkRef, + _udpsink: &UdpSink, + element: &gst::Element, + list: gst::BufferList, + ) -> BoxFuture<'static, Result> { + let element = element.clone(); + + async move { + let udpsink = UdpSink::from_instance(&element); + + for buffer in list.iter() { + udpsink.render(&element, buffer).await?; + } + + Ok(gst::FlowSuccess::Ok) + } + .boxed() + } + + fn sink_event( + &self, + pad: PadSinkRef, + udpsink: &UdpSink, + element: &gst::Element, + event: gst::Event, + ) -> Either> { + if event.is_serialized() { + let pad_weak = pad.downgrade(); + let element = element.clone(); + Either::Right( + async move { + let udpsink = UdpSink::from_instance(&element); + let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + gst_log!(CAT, obj: pad.gst_pad(), "Handling event {:?}", event); + + match event.view() { + EventView::Eos(_) => { + let _ = element + .post_message(&gst::Message::new_eos().src(Some(&element)).build()); + } + EventView::Segment(e) => { + let mut state = udpsink.state.lock().await; + state.segment = Some(e.get_segment().clone()); + } + _ => (), + } + + gst_log!(CAT, obj: pad.gst_pad(), "Queuing event {:?}", event); + true + } + .boxed(), + ) + } else { + match event.view() { + EventView::FlushStart(..) => { + let _ = block_on(udpsink.stop(element)); + } + _ => (), + } + + gst_debug!(CAT, obj: pad.gst_pad(), "Fowarding non-serialized event {:?}", event); + + Either::Left(true) + } + } +} + +impl UdpSink { + /* Sends buffer to all clients, abortable */ + async fn render( + &self, + element: &gst::Element, + buffer: &gst::BufferRef, + ) -> Result { + let mut state = self.state.lock().await; + let (render_fut, abort_handle) = abortable(self.do_render(element, buffer)); + state.abort_handle = Some(abort_handle); + + /* Drop our state so we can be interrupted while awaiting */ + drop(state); + + match render_fut.await { + Ok(res) => res, + Err(Aborted) => { + gst_log!(CAT, obj: element, "Aborted render"); + Ok(gst::FlowSuccess::Ok) + } + } + } + + async fn do_render( + &self, + element: &gst::Element, + buffer: &gst::BufferRef, + ) -> Result { + let do_sync = self.settings.lock().await.sync; + + if do_sync { + let state = self.state.lock().await; + if let Some(segment) = &state.segment { + if let Some(segment) = segment.downcast_ref::() { + let rtime = segment.to_running_time(buffer.get_pts()); + drop(state); + self.sync(&element, rtime).await; + } + } + } + + gst_log!(CAT, obj: element, "Handling buffer {:?}", &buffer); + + let clients = self.state.lock().await.clients.clone(); + let data = buffer.map_readable().map_err(|_| { + gst_element_error!( + element, + gst::StreamError::Format, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + let (socket, socket_v6) = { + let state = self.state.lock().await; + (Arc::clone(&state.socket), Arc::clone(&state.socket_v6)) + }; + + for client in &clients { + let socket = match client.ip() { + IpAddr::V4(_) => &socket, + IpAddr::V6(_) => &socket_v6, + }; + + if let Some(socket) = socket.lock().await.as_mut() { + gst_log!(CAT, obj: element, "Sending to {:?}", &client); + socket.send_to(&data, client).await.map_err(|err| { + gst_element_error!( + element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); + gst::FlowError::Error + })?; + } + } + + gst_log!( + CAT, + obj: element, + "Sent buffer {:?} to all clients", + &buffer + ); + + Ok(gst::FlowSuccess::Ok) + } + + /* Wait until specified time */ + async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) { + let now = super::get_current_running_time(&element); + + if now < running_time { + let state = self.state.lock().await; + let context = state.context.as_ref().expect("No context was prepared"); + let (sender, receiver) = oneshot::channel(); + let delay = running_time - now; + let delay_for_fut = + context.delay_for(Duration::from_nanos(delay.nseconds().unwrap()), move || { + async { + let _ = sender.send(()); + } + }); + context.spawn(delay_for_fut); + drop(state); + let _ = receiver.await; + } + } + + async fn prepare_socket_family( + &self, + element: &gst::Element, + ipv6: bool, + ) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().await; + let mut settings = self.settings.lock().await; + + let socket = if let Some(ref wrapped_socket) = if ipv6 { + &settings.socket_v6 + } else { + &settings.socket + } { + use std::net::UdpSocket; + + let socket: UdpSocket; + + #[cfg(unix)] + { + socket = wrapped_socket.get() + } + #[cfg(windows)] + { + socket = wrapped_socket.get() + } + + let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup socket for tokio: {}", err] + ) + })?; + + if ipv6 { + settings.used_socket_v6 = Some(wrapped_socket.clone()); + } else { + settings.used_socket = Some(wrapped_socket.clone()); + } + + socket + } else { + let bind_addr = if ipv6 { + &settings.bind_address_v6 + } else { + &settings.bind_address + }; + + let bind_addr: IpAddr = bind_addr.parse().map_err(|err| { + gst_error_msg!( + gst::ResourceError::Settings, + ["Invalid address '{}' set: {}", bind_addr, err] + ) + })?; + + let bind_port = if ipv6 { + settings.bind_port_v6 + } else { + settings.bind_port + }; + + let saddr = SocketAddr::new(bind_addr, bind_port as u16); + gst_debug!(CAT, obj: element, "Binding to {:?}", saddr); + + let builder = if ipv6 { + net2::UdpBuilder::new_v6() + } else { + net2::UdpBuilder::new_v4() + } + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to create socket: {}", err] + ) + })?; + + let socket = builder.bind(&saddr).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to bind socket: {}", err] + ) + })?; + + let socket = tokio::net::UdpSocket::from_std(socket).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to setup socket for tokio: {}", err] + ) + })?; + + let wrapper = wrap_socket(&socket)?; + + if settings.qos_dscp != -1 { + wrapper.set_tos(settings.qos_dscp).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set QoS DSCP: {}", err] + ) + })?; + } + + if ipv6 { + settings.used_socket_v6 = Some(wrapper); + } else { + settings.used_socket = Some(wrapper); + } + + socket + }; + + if ipv6 { + state.socket_v6 = Arc::new(Mutex::new(Some(socket))); + } else { + state.socket = Arc::new(Mutex::new(Some(socket))); + } + + Ok(()) + } + + async fn configure_client( + &self, + client: &SocketAddr, + state: &MutexGuard<'_, State>, + ) -> Result<(), gst::ErrorMessage> { + let settings = self.settings.lock().await; + + if client.ip().is_multicast() { + match client.ip() { + IpAddr::V4(addr) => { + if let Some(socket) = state.socket.lock().await.as_mut() { + if settings.auto_multicast { + socket + .join_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0)) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group: {}", err] + ) + })?; + } + if settings.multicast_loop { + socket.set_multicast_loop_v4(true).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop: {}", err] + ) + })?; + } + socket + .set_multicast_ttl_v4(settings.ttl_mc) + .map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast ttl: {}", err] + ) + })?; + } + } + IpAddr::V6(addr) => { + if let Some(socket) = state.socket_v6.lock().await.as_mut() { + if settings.auto_multicast { + socket.join_multicast_v6(&addr, 0).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to join multicast group: {}", err] + ) + })?; + } + if settings.multicast_loop { + socket.set_multicast_loop_v6(true).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set multicast loop: {}", err] + ) + })?; + } + /* FIXME no API for set_multicast_ttl_v6 ? */ + } + } + } + } else { + match client.ip() { + IpAddr::V4(_) => { + if let Some(socket) = state.socket.lock().await.as_mut() { + socket.set_ttl(settings.ttl).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl: {}", err] + ) + })?; + } + } + IpAddr::V6(_) => { + if let Some(socket) = state.socket_v6.lock().await.as_mut() { + socket.set_ttl(settings.ttl).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to set unicast ttl: {}", err] + ) + })?; + } + } + } + } + + Ok(()) + } + + async fn prepare_sockets(element: gst::Element) -> Result<(), gst::ErrorMessage> { + let this = Self::from_instance(&element); + + this.prepare_socket_family(&element, false).await?; + this.prepare_socket_family(&element, true).await?; + + let state = this.state.lock().await; + + for client in &state.clients { + this.configure_client(&client, &state).await?; + } + + Ok(()) + } + + async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Preparing"); + + let mut state = self.state.lock().await; + + let context = { + let settings = self.settings.lock().await.clone(); + + Context::acquire(&settings.context, settings.context_wait).map_err(|err| { + gst_error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to acquire Context: {}", err] + ) + })? + }; + + *self.preparation_set.lock().await = Some(PreparationSet { + join_handle: context.spawn(Self::prepare_sockets(element.clone())), + }); + + state.context = Some(context); + + gst_debug!(CAT, obj: element, "Started preparing"); + + Ok(()) + } + + async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + gst_debug!(CAT, obj: element, "Completing preparation"); + + let PreparationSet { join_handle } = self + .preparation_set + .lock() + .await + .take() + .expect("preparation_set already taken"); + + join_handle + .await + .expect("The socket preparation has panicked")?; + + self.sink_pad.prepare(&UdpSinkPadHandler).await; + + gst_debug!(CAT, obj: element, "Preparation completed"); + + Ok(()) + } + + async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { + let mut state = self.state.lock().await; + gst_debug!(CAT, obj: element, "Unpreparing"); + + *state = State::default(); + + self.sink_pad.unprepare().await; + + gst_debug!(CAT, obj: element, "Unprepared"); + Ok(()) + } + + async fn stop(&self, element: &gst::Element) -> Result<(), ()> { + gst_debug!(CAT, obj: element, "Stopping"); + + let mut state = self.state.lock().await; + + if let Some(abort_handle) = state.abort_handle.take() { + abort_handle.abort(); + } + + gst_debug!(CAT, obj: element, "Stopped"); + + Ok(()) + } + + fn clear_clients( + &self, + element: &gst::Element, + state: &mut MutexGuard<'_, State>, + settings: &MutexGuard<'_, Settings>, + ) { + state.clients.clear(); + + if let Some(host) = &settings.host { + self.add_client(&element, state, &host, settings.port as u16); + } + } + + fn remove_client( + &self, + element: &gst::Element, + state: &mut MutexGuard<'_, State>, + host: &str, + port: u16, + ) { + let addr: IpAddr = match host.parse() { + Err(err) => { + gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err); + return; + } + Ok(addr) => addr, + }; + let addr = SocketAddr::new(addr, port); + + gst_info!(CAT, obj: element, "Removing client {:?}", addr); + + state.clients.retain(|addr2| addr != *addr2); + } + + fn add_client( + &self, + element: &gst::Element, + state: &mut MutexGuard<'_, State>, + host: &str, + port: u16, + ) { + let addr: IpAddr = match host.parse() { + Err(err) => { + gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err); + return; + } + Ok(addr) => addr, + }; + let addr = SocketAddr::new(addr, port); + + let _ = self.configure_client(&addr, &state); + + if !state.clients.contains(&addr) { + gst_info!(CAT, obj: element, "Adding client {:?}", addr); + state.clients.push(addr); + } else { + gst_warning!(CAT, obj: element, "Not adding client {:?} again", &addr); + } + } +} + +impl ObjectSubclass for UdpSink { + const NAME: &'static str = "RsTsUdpSink"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib_object_subclass!(); + + fn class_init(klass: &mut subclass::simple::ClassStruct) { + klass.set_metadata( + "Thread-sharing UDP sink", + "Sink/Network", + "Thread-sharing UDP sink", + "Mathieu ", + ); + + let caps = gst::Caps::new_any(); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + klass.add_signal_with_class_handler( + "add", + glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION, + &[String::static_type(), i32::static_type()], + glib::types::Type::Unit, + |_, args| { + let element = args[0] + .get::() + .expect("signal arg") + .expect("missing signal arg"); + let host = args[1] + .get::() + .expect("signal arg") + .expect("missing signal arg"); + let port = args[2] + .get::() + .expect("signal arg") + .expect("missing signal arg"); + + let udpsink = Self::from_instance(&element); + let mut state = block_on(udpsink.state.lock()); + + udpsink.add_client(&element, &mut state, &host, port as u16); + + None + }, + ); + + klass.add_signal_with_class_handler( + "remove", + glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION, + &[String::static_type(), i32::static_type()], + glib::types::Type::Unit, + |_, args| { + let element = args[0] + .get::() + .expect("signal arg") + .expect("missing signal arg"); + let host = args[1] + .get::() + .expect("signal arg") + .expect("missing signal arg"); + let port = args[2] + .get::() + .expect("signal arg") + .expect("missing signal arg"); + + let udpsink = Self::from_instance(&element); + + let settings = block_on(udpsink.settings.lock()); + let mut state = block_on(udpsink.state.lock()); + + if Some(&host) != settings.host.as_ref() || port != settings.port as i32 { + udpsink.remove_client(&element, &mut state, &host, port as u16); + } + + None + }, + ); + + klass.add_signal_with_class_handler( + "clear", + glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION, + &[], + glib::types::Type::Unit, + |_, args| { + let element = args[0] + .get::() + .expect("signal arg") + .expect("missing signal arg"); + + let udpsink = Self::from_instance(&element); + let mut state = block_on(udpsink.state.lock()); + let settings = block_on(udpsink.settings.lock()); + + udpsink.clear_clients(&element, &mut state, &settings); + + None + }, + ); + + klass.install_properties(&PROPERTIES); + } + + fn new_with_class(klass: &subclass::simple::ClassStruct) -> Self { + let templ = klass.get_pad_template("sink").unwrap(); + let sink_pad = PadSink::new_from_template(&templ, Some("sink")); + + Self { + sink_pad, + state: Mutex::new(State::default()), + settings: Mutex::new(Settings::default()), + preparation_set: Mutex::new(None), + } + } +} + +impl ObjectImpl for UdpSink { + glib_object_impl!(); + + fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + let element = obj.downcast_ref::().unwrap(); + + match *prop { + subclass::Property("host", ..) => { + let mut settings = block_on(self.settings.lock()); + let mut state = block_on(self.state.lock()); + if let Some(host) = &settings.host { + self.remove_client(&element, &mut state, &host, settings.port as u16); + } + + settings.host = value.get().expect("type checked upstream"); + + if let Some(host) = &settings.host { + self.add_client(&element, &mut state, &host, settings.port as u16); + } + } + subclass::Property("port", ..) => { + let mut settings = block_on(self.settings.lock()); + let mut state = block_on(self.state.lock()); + if let Some(host) = &settings.host { + self.remove_client(&element, &mut state, &host, settings.port as u16); + } + + settings.port = value.get_some().expect("type checked upstream"); + + if let Some(host) = &settings.host { + self.add_client(&element, &mut state, &host, settings.port as u16); + } + } + subclass::Property("sync", ..) => { + let mut settings = block_on(self.settings.lock()); + + settings.sync = value.get_some().expect("type checked upstream"); + } + subclass::Property("bind-address", ..) => { + let mut settings = block_on(self.settings.lock()); + settings.bind_address = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("bind-port", ..) => { + let mut settings = block_on(self.settings.lock()); + + settings.bind_port = value.get_some().expect("type checked upstream"); + } + subclass::Property("bind-address-v6", ..) => { + let mut settings = block_on(self.settings.lock()); + settings.bind_address_v6 = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("bind-port-v6", ..) => { + let mut settings = block_on(self.settings.lock()); + + settings.bind_port_v6 = value.get_some().expect("type checked upstream"); + } + subclass::Property("socket", ..) => { + let mut settings = block_on(self.settings.lock()); + settings.socket = value + .get::() + .expect("type checked upstream") + .map(|socket| GioSocketWrapper::new(&socket)); + } + subclass::Property("used-socket", ..) => { + unreachable!(); + } + subclass::Property("socket-v6", ..) => { + let mut settings = block_on(self.settings.lock()); + settings.socket_v6 = value + .get::() + .expect("type checked upstream") + .map(|socket| GioSocketWrapper::new(&socket)); + } + subclass::Property("used-socket-v6", ..) => { + unreachable!(); + } + subclass::Property("auto-multicast", ..) => { + let mut settings = block_on(self.settings.lock()); + + settings.auto_multicast = value.get_some().expect("type checked upstream"); + } + subclass::Property("loop", ..) => { + let mut settings = block_on(self.settings.lock()); + + settings.multicast_loop = value.get_some().expect("type checked upstream"); + } + subclass::Property("ttl", ..) => { + let mut settings = block_on(self.settings.lock()); + + settings.ttl = value.get_some().expect("type checked upstream"); + } + subclass::Property("ttl-mc", ..) => { + let mut settings = block_on(self.settings.lock()); + + settings.ttl_mc = value.get_some().expect("type checked upstream"); + } + subclass::Property("qos-dscp", ..) => { + let mut settings = block_on(self.settings.lock()); + + settings.qos_dscp = value.get_some().expect("type checked upstream"); + } + subclass::Property("clients", ..) => { + let clients: String = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + let mut state = block_on(self.state.lock()); + let clients = clients.split(','); + let settings = block_on(self.settings.lock()); + self.clear_clients(element, &mut state, &settings); + + for client in clients { + let split: Vec<&str> = client.rsplitn(2, ':').collect(); + + if split.len() == 2 { + match split[0].parse::() { + Ok(port) => self.add_client(element, &mut state, split[1], port), + Err(_) => (), + } + } + } + } + subclass::Property("context", ..) => { + let mut settings = block_on(self.settings.lock()); + settings.context = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| "".into()); + } + subclass::Property("context-wait", ..) => { + let mut settings = block_on(self.settings.lock()); + settings.context_wait = value.get_some().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("host", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.host.to_value()) + } + subclass::Property("port", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.port.to_value()) + } + subclass::Property("sync", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.sync.to_value()) + } + subclass::Property("bind-address", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.bind_address.to_value()) + } + subclass::Property("bind-port", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.bind_port.to_value()) + } + subclass::Property("bind-address-v6", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.bind_address_v6.to_value()) + } + subclass::Property("bind-port-v6", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.bind_port_v6.to_value()) + } + subclass::Property("socket", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings + .socket + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()) + } + subclass::Property("used-socket", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings + .used_socket + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()) + } + subclass::Property("socket-v6", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings + .socket_v6 + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()) + } + subclass::Property("used-socket-v6", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings + .used_socket_v6 + .as_ref() + .map(GioSocketWrapper::as_socket) + .to_value()) + } + subclass::Property("auto-multicast", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.sync.to_value()) + } + subclass::Property("loop", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.multicast_loop.to_value()) + } + subclass::Property("ttl", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.ttl.to_value()) + } + subclass::Property("ttl-mc", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.ttl_mc.to_value()) + } + subclass::Property("qos-dscp", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.qos_dscp.to_value()) + } + subclass::Property("clients", ..) => { + let state = block_on(self.state.lock()); + + let clients: Vec = + state.clients.iter().map(|addr| addr.to_string()).collect(); + Ok(clients.join(",").to_value()) + } + subclass::Property("context", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.context.to_value()) + } + subclass::Property("context-wait", ..) => { + let settings = block_on(self.settings.lock()); + Ok(settings.context_wait.to_value()) + } + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::().unwrap(); + element.add_pad(self.sink_pad.gst_pad()).unwrap(); + + super::set_element_flags(element, gst::ElementFlags::SINK); + } +} + +impl ElementImpl for UdpSink { + fn change_state( + &self, + element: &gst::Element, + transition: gst::StateChange, + ) -> Result { + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::NullToReady => { + block_on(self.prepare(element)).map_err(|err| { + element.post_error_message(&err); + gst::StateChangeError + })?; + } + gst::StateChange::ReadyToPaused => { + block_on(self.complete_preparation(element)).map_err(|err| { + element.post_error_message(&err); + gst::StateChangeError + })?; + } + gst::StateChange::PausedToReady => { + block_on(self.stop(element)).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::ReadyToNull => { + block_on(self.unprepare(element)).map_err(|_| gst::StateChangeError)?; + } + _ => (), + } + + self.parent_change_state(element, transition) + } +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "ts-udpsink", + gst::Rank::None, + UdpSink::get_type(), + ) +} diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index f64cea0d..3178563a 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -22,7 +22,6 @@ use futures::lock::Mutex; use futures::prelude::*; use gio; -use gio_sys as gio_ffi; use glib; use glib::prelude::*; @@ -30,8 +29,6 @@ use glib::subclass; use glib::subclass::prelude::*; use glib::{glib_object_impl, glib_object_subclass}; -use gobject_sys as gobject_ffi; - use gst; use gst::prelude::*; use gst::subclass::prelude::*; @@ -47,16 +44,10 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::{self, Arc}; use std::u16; -#[cfg(unix)] -use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; - -#[cfg(windows)] -use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; - use crate::runtime::prelude::*; use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef}; -use super::socket::{Socket, SocketRead, SocketStream}; +use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketRead, SocketStream}; const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); const DEFAULT_PORT: u32 = 5000; @@ -69,98 +60,6 @@ const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: u32 = 0; const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true; -// Send/Sync struct for passing around a gio::Socket -// and getting the raw fd from it -// -// gio::Socket is not Send/Sync as it's generally unsafe -// to access it from multiple threads. Getting the underlying raw -// fd is safe though, as is receiving/sending from two different threads -#[derive(Debug)] -struct GioSocketWrapper { - socket: *mut gio_ffi::GSocket, -} - -unsafe impl Send for GioSocketWrapper {} -unsafe impl Sync for GioSocketWrapper {} - -impl GioSocketWrapper { - fn new(socket: &gio::Socket) -> Self { - use glib::translate::*; - - Self { - socket: socket.to_glib_full(), - } - } - - fn as_socket(&self) -> gio::Socket { - unsafe { - use glib::translate::*; - - from_glib_none(self.socket) - } - } - - #[cfg(unix)] - fn get(&self) -> T { - unsafe { FromRawFd::from_raw_fd(libc::dup(gio_ffi::g_socket_get_fd(self.socket))) } - } - - #[cfg(windows)] - fn get(&self) -> T { - unsafe { - FromRawSocket::from_raw_socket( - dup_socket(gio_ffi::g_socket_get_fd(self.socket) as _) as _ - ) - } - } -} - -impl Clone for GioSocketWrapper { - fn clone(&self) -> Self { - Self { - socket: unsafe { gobject_ffi::g_object_ref(self.socket as *mut _) as *mut _ }, - } - } -} - -impl Drop for GioSocketWrapper { - fn drop(&mut self) { - unsafe { - gobject_ffi::g_object_unref(self.socket as *mut _); - } - } -} - -#[cfg(windows)] -unsafe fn dup_socket(socket: usize) -> usize { - use std::mem; - use winapi::shared::ws2def; - use winapi::um::processthreadsapi; - use winapi::um::winsock2; - - let mut proto_info = mem::MaybeUninit::uninit(); - let ret = winsock2::WSADuplicateSocketA( - socket, - processthreadsapi::GetCurrentProcessId(), - proto_info.as_mut_ptr(), - ); - assert_eq!(ret, 0); - let mut proto_info = proto_info.assume_init(); - - let socket = winsock2::WSASocketA( - ws2def::AF_INET, - ws2def::SOCK_DGRAM, - ws2def::IPPROTO_UDP as i32, - &mut proto_info, - 0, - 0, - ); - - assert_ne!(socket, winsock2::INVALID_SOCKET); - - socket -} - #[derive(Debug, Clone)] struct Settings { address: Option, @@ -811,57 +710,7 @@ impl UdpSrc { } } - // Store the socket as used-socket in the settings - #[cfg(unix)] - unsafe { - let fd = libc::dup(socket.as_raw_fd()); - - // This is unsafe because it allows us to share the fd between the socket and the - // GIO socket below, but safety of this is the job of the application - struct FdConverter(RawFd); - impl IntoRawFd for FdConverter { - fn into_raw_fd(self) -> RawFd { - self.0 - } - } - - let fd = FdConverter(fd); - - let gio_socket = gio::Socket::new_from_fd(fd).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to create wrapped GIO socket: {}", err] - ) - })?; - let wrapper = GioSocketWrapper::new(&gio_socket); - settings.used_socket = Some(wrapper); - } - #[cfg(windows)] - unsafe { - // FIXME: Needs https://github.com/tokio-rs/tokio/pull/806 - // and https://github.com/carllerche/mio/pull/859 - let fd = unreachable!(); //dup_socket(socket.as_raw_socket() as _) as _; - - // This is unsafe because it allows us to share the fd between the socket and the - // GIO socket below, but safety of this is the job of the application - struct SocketConverter(RawSocket); - impl IntoRawSocket for SocketConverter { - fn into_raw_socket(self) -> RawSocket { - self.0 - } - } - - let fd = SocketConverter(fd); - - let gio_socket = gio::Socket::new_from_socket(fd).map_err(|err| { - gst_error_msg!( - gst::ResourceError::OpenRead, - ["Failed to create wrapped GIO socket: {}", err] - ) - })?; - let wrapper = GioSocketWrapper::new(&gio_socket); - settings.used_socket = Some(wrapper); - } + settings.used_socket = Some(wrap_socket(&socket)?); socket }; diff --git a/gst-plugin-threadshare/tests/udpsink.rs b/gst-plugin-threadshare/tests/udpsink.rs new file mode 100644 index 00000000..420b34fe --- /dev/null +++ b/gst-plugin-threadshare/tests/udpsink.rs @@ -0,0 +1,160 @@ +// Copyright (C) 2019 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use std::thread; + +use glib; +use glib::prelude::*; + +use gst; +use gst_check; + +use gstthreadshare; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstthreadshare::plugin_register_static().expect("gstthreadshare udpsrc test"); + }); +} + +#[test] +fn test_client_management() { + init(); + + let h = gst_check::Harness::new("ts-udpsink"); + let udpsink = h.get_element().unwrap(); + + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + + assert_eq!(clients, "127.0.0.1:5000"); + + udpsink.emit("add", &[&"192.168.1.1", &57]).unwrap(); + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57"); + + /* Adding a client twice is not supported */ + udpsink.emit("add", &[&"192.168.1.1", &57]).unwrap(); + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57"); + + udpsink.emit("remove", &[&"192.168.1.1", &57]).unwrap(); + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + assert_eq!(clients, "127.0.0.1:5000"); + + /* Removing a non-existing client should not be a problem */ + udpsink.emit("remove", &[&"192.168.1.1", &57]).unwrap(); + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + assert_eq!(clients, "127.0.0.1:5000"); + + /* While the default host:address client is listed in clients, + * it can't be removed with the remove signal */ + udpsink.emit("remove", &[&"127.0.0.1", &5000]).unwrap(); + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + assert_eq!(clients, "127.0.0.1:5000"); + + /* It is however possible to remove the default client by setting + * host to None */ + let host: Option = None; + udpsink.set_property("host", &host).unwrap(); + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + assert_eq!(clients, ""); + + /* The client properties is writable too */ + udpsink + .set_property("clients", &"127.0.0.1:5000,192.168.1.1:57") + .unwrap(); + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + assert_eq!(clients, "127.0.0.1:5000,192.168.1.1:57"); + + udpsink.emit("clear", &[]).unwrap(); + let clients = udpsink + .get_property("clients") + .unwrap() + .get::() + .unwrap() + .unwrap(); + assert_eq!(clients, ""); +} + +#[test] +fn test_chain() { + init(); + + let mut h = gst_check::Harness::new("ts-udpsink"); + h.set_src_caps_str(&"foo/bar"); + + thread::spawn(move || { + use std::net; + use std::time; + + thread::sleep(time::Duration::from_millis(50)); + + let socket = net::UdpSocket::bind("127.0.0.1:5000").unwrap(); + let mut buf = [0; 5]; + let (amt, _) = socket.recv_from(&mut buf).unwrap(); + + assert!(amt == 4); + assert!(buf == [42, 43, 44, 45, 0]); + }); + + let buf = gst::Buffer::from_slice(&[42, 43, 44, 45]); + assert!(h.push(buf) == Ok(gst::FlowSuccess::Ok)); +}