2019-12-23 15:23:38 +00:00
|
|
|
// Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com>
|
|
|
|
//
|
|
|
|
// This library is free software; you can redistribute it and/or
|
|
|
|
// modify it under the terms of the GNU Library General Public
|
|
|
|
// License as published by the Free Software Foundation; either
|
|
|
|
// version 2 of the License, or (at your option) any later version.
|
|
|
|
//
|
|
|
|
// This library is distributed in the hope that it will be useful,
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
|
|
// Library General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Library General Public
|
|
|
|
// License along with this library; if not, write to the
|
|
|
|
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
|
|
|
|
// Boston, MA 02110-1335, USA.
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
use futures::channel::mpsc;
|
2019-12-23 15:23:38 +00:00
|
|
|
use futures::future::BoxFuture;
|
2020-03-15 11:13:26 +00:00
|
|
|
use futures::lock::Mutex;
|
2019-12-23 15:23:38 +00:00
|
|
|
use futures::prelude::*;
|
|
|
|
|
2020-07-26 15:33:14 +00:00
|
|
|
use glib::glib_object_subclass;
|
2019-12-23 15:23:38 +00:00
|
|
|
use glib::prelude::*;
|
|
|
|
use glib::subclass;
|
|
|
|
use glib::subclass::prelude::*;
|
|
|
|
|
|
|
|
use gst::prelude::*;
|
|
|
|
use gst::subclass::prelude::*;
|
|
|
|
use gst::EventView;
|
|
|
|
use gst::{
|
|
|
|
gst_debug, gst_element_error, gst_error, gst_error_msg, gst_info, gst_log, gst_trace,
|
|
|
|
gst_warning,
|
|
|
|
};
|
|
|
|
|
|
|
|
use lazy_static::lazy_static;
|
|
|
|
|
|
|
|
use crate::runtime::prelude::*;
|
2020-03-26 18:07:25 +00:00
|
|
|
use crate::runtime::{self, Context, PadSink, PadSinkRef, Task};
|
2019-12-23 15:23:38 +00:00
|
|
|
use crate::socket::{wrap_socket, GioSocketWrapper};
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
use std::convert::TryInto;
|
2020-03-15 11:13:26 +00:00
|
|
|
use std::mem;
|
2019-12-23 15:23:38 +00:00
|
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
2020-03-26 18:07:25 +00:00
|
|
|
use std::string::ToString;
|
2020-03-15 11:13:26 +00:00
|
|
|
use std::sync::Mutex as StdMutex;
|
2020-03-26 18:07:25 +00:00
|
|
|
use std::sync::{Arc, RwLock};
|
2019-12-23 15:23:38 +00:00
|
|
|
use std::time::Duration;
|
|
|
|
use std::u16;
|
|
|
|
use std::u8;
|
|
|
|
|
|
|
|
const DEFAULT_HOST: Option<&str> = Some("127.0.0.1");
|
2020-04-27 09:22:26 +00:00
|
|
|
const DEFAULT_PORT: i32 = 5004;
|
2019-12-23 15:23:38 +00:00
|
|
|
const DEFAULT_SYNC: bool = true;
|
|
|
|
const DEFAULT_BIND_ADDRESS: &str = "0.0.0.0";
|
2020-04-27 09:22:26 +00:00
|
|
|
const DEFAULT_BIND_PORT: i32 = 0;
|
2019-12-23 15:23:38 +00:00
|
|
|
const DEFAULT_BIND_ADDRESS_V6: &str = "::";
|
2020-04-27 09:22:26 +00:00
|
|
|
const DEFAULT_BIND_PORT_V6: i32 = 0;
|
2019-12-23 15:23:38 +00:00
|
|
|
const DEFAULT_SOCKET: Option<GioSocketWrapper> = None;
|
|
|
|
const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
|
|
|
|
const DEFAULT_SOCKET_V6: Option<GioSocketWrapper> = None;
|
|
|
|
const DEFAULT_USED_SOCKET_V6: Option<GioSocketWrapper> = None;
|
|
|
|
const DEFAULT_AUTO_MULTICAST: bool = true;
|
|
|
|
const DEFAULT_LOOP: bool = true;
|
|
|
|
const DEFAULT_TTL: u32 = 64;
|
|
|
|
const DEFAULT_TTL_MC: u32 = 1;
|
|
|
|
const DEFAULT_QOS_DSCP: i32 = -1;
|
|
|
|
const DEFAULT_CLIENTS: &str = "";
|
|
|
|
const DEFAULT_CONTEXT: &str = "";
|
|
|
|
const DEFAULT_CONTEXT_WAIT: u32 = 0;
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
struct Settings {
|
|
|
|
sync: bool,
|
|
|
|
bind_address: String,
|
2020-04-27 09:22:26 +00:00
|
|
|
bind_port: i32,
|
2019-12-23 15:23:38 +00:00
|
|
|
bind_address_v6: String,
|
2020-04-27 09:22:26 +00:00
|
|
|
bind_port_v6: i32,
|
2019-12-23 15:23:38 +00:00
|
|
|
socket: Option<GioSocketWrapper>,
|
|
|
|
used_socket: Option<GioSocketWrapper>,
|
|
|
|
socket_v6: Option<GioSocketWrapper>,
|
|
|
|
used_socket_v6: Option<GioSocketWrapper>,
|
|
|
|
auto_multicast: bool,
|
|
|
|
multicast_loop: bool,
|
|
|
|
ttl: u32,
|
|
|
|
ttl_mc: u32,
|
|
|
|
qos_dscp: i32,
|
|
|
|
context: String,
|
|
|
|
context_wait: u32,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Settings {
|
|
|
|
fn default() -> Self {
|
|
|
|
Settings {
|
|
|
|
sync: DEFAULT_SYNC,
|
|
|
|
bind_address: DEFAULT_BIND_ADDRESS.into(),
|
|
|
|
bind_port: DEFAULT_BIND_PORT,
|
|
|
|
bind_address_v6: DEFAULT_BIND_ADDRESS_V6.into(),
|
|
|
|
bind_port_v6: DEFAULT_BIND_PORT_V6,
|
|
|
|
socket: DEFAULT_SOCKET,
|
|
|
|
used_socket: DEFAULT_USED_SOCKET,
|
|
|
|
socket_v6: DEFAULT_SOCKET_V6,
|
|
|
|
used_socket_v6: DEFAULT_USED_SOCKET_V6,
|
|
|
|
auto_multicast: DEFAULT_AUTO_MULTICAST,
|
|
|
|
multicast_loop: DEFAULT_LOOP,
|
|
|
|
ttl: DEFAULT_TTL,
|
|
|
|
ttl_mc: DEFAULT_TTL_MC,
|
|
|
|
qos_dscp: DEFAULT_QOS_DSCP,
|
|
|
|
context: DEFAULT_CONTEXT.into(),
|
|
|
|
context_wait: DEFAULT_CONTEXT_WAIT,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Debug category used by every log statement in this file; created lazily on first use.
lazy_static! {
    static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
        "ts-udpsink",
        gst::DebugColorFlags::empty(),
        Some("Thread-sharing UDP sink"),
    );
}
|
|
|
|
|
2020-08-11 08:45:17 +00:00
|
|
|
static PROPERTIES: [subclass::Property; 17] = [
|
2019-12-23 15:23:38 +00:00
|
|
|
subclass::Property("sync", |name| {
|
|
|
|
glib::ParamSpec::boolean(
|
|
|
|
name,
|
|
|
|
"Sync",
|
|
|
|
"Sync on the clock",
|
|
|
|
DEFAULT_SYNC,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("bind-address", |name| {
|
|
|
|
glib::ParamSpec::string(
|
|
|
|
name,
|
|
|
|
"Bind Address",
|
|
|
|
"Address to bind the socket to",
|
|
|
|
Some(DEFAULT_BIND_ADDRESS),
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("bind-port", |name| {
|
2020-04-27 09:22:26 +00:00
|
|
|
glib::ParamSpec::int(
|
2019-12-23 15:23:38 +00:00
|
|
|
name,
|
|
|
|
"Bind Port",
|
|
|
|
"Port to bind the socket to",
|
|
|
|
0,
|
2020-04-27 09:22:26 +00:00
|
|
|
u16::MAX as i32,
|
2019-12-23 15:23:38 +00:00
|
|
|
DEFAULT_BIND_PORT,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("bind-address-v6", |name| {
|
|
|
|
glib::ParamSpec::string(
|
|
|
|
name,
|
|
|
|
"Bind Address V6",
|
|
|
|
"Address to bind the V6 socket to",
|
2020-01-16 11:48:22 +00:00
|
|
|
Some(DEFAULT_BIND_ADDRESS_V6),
|
2019-12-23 15:23:38 +00:00
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("bind-port-v6", |name| {
|
2020-04-27 09:22:26 +00:00
|
|
|
glib::ParamSpec::int(
|
2019-12-23 15:23:38 +00:00
|
|
|
name,
|
|
|
|
"Bind Port",
|
|
|
|
"Port to bind the V6 socket to",
|
|
|
|
0,
|
2020-04-27 09:22:26 +00:00
|
|
|
u16::MAX as i32,
|
2020-01-16 11:48:22 +00:00
|
|
|
DEFAULT_BIND_PORT_V6,
|
2019-12-23 15:23:38 +00:00
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("socket", |name| {
|
|
|
|
glib::ParamSpec::object(
|
|
|
|
name,
|
|
|
|
"Socket",
|
|
|
|
"Socket to use for UDP transmission. (None == allocate)",
|
|
|
|
gio::Socket::static_type(),
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("used-socket", |name| {
|
|
|
|
glib::ParamSpec::object(
|
|
|
|
name,
|
|
|
|
"Used Socket",
|
|
|
|
"Socket currently in use for UDP transmission. (None = no socket)",
|
|
|
|
gio::Socket::static_type(),
|
|
|
|
glib::ParamFlags::READABLE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("socket-v6", |name| {
|
|
|
|
glib::ParamSpec::object(
|
|
|
|
name,
|
|
|
|
"Socket V6",
|
|
|
|
"IPV6 Socket to use for UDP transmission. (None == allocate)",
|
|
|
|
gio::Socket::static_type(),
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("used-socket-v6", |name| {
|
|
|
|
glib::ParamSpec::object(
|
|
|
|
name,
|
|
|
|
"Used Socket V6",
|
|
|
|
"V6 Socket currently in use for UDP transmission. (None = no socket)",
|
|
|
|
gio::Socket::static_type(),
|
|
|
|
glib::ParamFlags::READABLE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("auto-multicast", |name| {
|
|
|
|
glib::ParamSpec::boolean(
|
|
|
|
name,
|
|
|
|
"Auto multicast",
|
|
|
|
"Automatically join/leave the multicast groups, FALSE means user has to do it himself",
|
|
|
|
DEFAULT_AUTO_MULTICAST,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("loop", |name| {
|
|
|
|
glib::ParamSpec::boolean(
|
|
|
|
name,
|
|
|
|
"Loop",
|
|
|
|
"Set the multicast loop parameter.",
|
|
|
|
DEFAULT_LOOP,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("ttl", |name| {
|
|
|
|
glib::ParamSpec::uint(
|
|
|
|
name,
|
|
|
|
"Time To Live",
|
|
|
|
"Used for setting the unicast TTL parameter",
|
|
|
|
0,
|
|
|
|
u8::MAX as u32,
|
|
|
|
DEFAULT_TTL,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("ttl-mc", |name| {
|
|
|
|
glib::ParamSpec::uint(
|
|
|
|
name,
|
|
|
|
"Time To Live Multicast",
|
|
|
|
"Used for setting the multicast TTL parameter",
|
|
|
|
0,
|
|
|
|
u8::MAX as u32,
|
|
|
|
DEFAULT_TTL_MC,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("qos-dscp", |name| {
|
|
|
|
glib::ParamSpec::int(
|
|
|
|
name,
|
|
|
|
"QoS DSCP",
|
|
|
|
"Quality of Service, differentiated services code point (-1 default)",
|
|
|
|
-1,
|
|
|
|
63,
|
|
|
|
DEFAULT_QOS_DSCP,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("clients", |name| {
|
|
|
|
glib::ParamSpec::string(
|
|
|
|
name,
|
|
|
|
"Clients",
|
|
|
|
"A comma separated list of host:port pairs with destinations",
|
|
|
|
Some(DEFAULT_CLIENTS),
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("context", |name| {
|
|
|
|
glib::ParamSpec::string(
|
|
|
|
name,
|
|
|
|
"Context",
|
|
|
|
"Context name to share threads with",
|
|
|
|
Some(DEFAULT_CONTEXT),
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("context-wait", |name| {
|
|
|
|
glib::ParamSpec::uint(
|
|
|
|
name,
|
|
|
|
"Context Wait",
|
|
|
|
"Throttle poll loop to run at most once every this many ms",
|
|
|
|
0,
|
|
|
|
1000,
|
|
|
|
DEFAULT_CONTEXT_WAIT,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
];
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
#[derive(Debug)]
|
2020-03-26 18:07:25 +00:00
|
|
|
enum TaskItem {
|
|
|
|
Buffer(gst::Buffer),
|
|
|
|
Event(gst::Event),
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct UdpSinkPadHandlerInner {
|
2020-03-15 11:13:26 +00:00
|
|
|
sync: bool,
|
|
|
|
segment: Option<gst::Segment>,
|
|
|
|
latency: gst::ClockTime,
|
|
|
|
socket: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
|
|
|
|
socket_v6: Arc<Mutex<Option<tokio::net::UdpSocket>>>,
|
|
|
|
clients: Arc<Vec<SocketAddr>>,
|
|
|
|
clients_to_configure: Vec<SocketAddr>,
|
2020-03-16 10:33:18 +00:00
|
|
|
clients_to_unconfigure: Vec<SocketAddr>,
|
2020-03-15 11:13:26 +00:00
|
|
|
sender: Arc<Mutex<Option<mpsc::Sender<TaskItem>>>>,
|
|
|
|
settings: Arc<StdMutex<Settings>>,
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
impl UdpSinkPadHandlerInner {
|
|
|
|
fn new(settings: Arc<StdMutex<Settings>>) -> Self {
|
|
|
|
UdpSinkPadHandlerInner {
|
2020-03-15 11:13:26 +00:00
|
|
|
sync: DEFAULT_SYNC,
|
|
|
|
segment: None,
|
|
|
|
latency: gst::CLOCK_TIME_NONE,
|
|
|
|
socket: Arc::new(Mutex::new(None)),
|
|
|
|
socket_v6: Arc::new(Mutex::new(None)),
|
|
|
|
clients: Arc::new(vec![SocketAddr::new(
|
|
|
|
DEFAULT_HOST.unwrap().parse().unwrap(),
|
|
|
|
DEFAULT_PORT as u16,
|
|
|
|
)]),
|
|
|
|
clients_to_configure: vec![],
|
2020-03-16 10:33:18 +00:00
|
|
|
clients_to_unconfigure: vec![],
|
2020-03-15 11:13:26 +00:00
|
|
|
sender: Arc::new(Mutex::new(None)),
|
2020-03-15 11:40:45 +00:00
|
|
|
settings,
|
2020-03-26 18:07:25 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn clear_clients(
|
|
|
|
&mut self,
|
|
|
|
gst_pad: &gst::Pad,
|
|
|
|
clients_to_add: impl Iterator<Item = SocketAddr>,
|
|
|
|
) {
|
2020-04-02 16:42:00 +00:00
|
|
|
let old_clients = mem::replace(&mut *Arc::make_mut(&mut self.clients), vec![]);
|
2020-03-26 18:07:25 +00:00
|
|
|
|
|
|
|
self.clients_to_configure = vec![];
|
|
|
|
self.clients_to_unconfigure = vec![];
|
|
|
|
|
|
|
|
for addr in clients_to_add {
|
2020-04-02 16:42:00 +00:00
|
|
|
if !old_clients.contains(&addr) {
|
|
|
|
self.clients_to_unconfigure.push(addr);
|
|
|
|
}
|
2020-03-26 18:07:25 +00:00
|
|
|
self.add_client(gst_pad, addr);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn remove_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) {
|
|
|
|
if !self.clients.contains(&addr) {
|
|
|
|
gst_warning!(CAT, obj: gst_pad, "Not removing unknown client {:?}", &addr);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
gst_info!(CAT, obj: gst_pad, "Removing client {:?}", addr);
|
|
|
|
|
|
|
|
Arc::make_mut(&mut self.clients).retain(|addr2| addr != *addr2);
|
|
|
|
|
|
|
|
self.clients_to_unconfigure.push(addr);
|
|
|
|
self.clients_to_configure.retain(|addr2| addr != *addr2);
|
|
|
|
}
|
|
|
|
|
|
|
|
fn add_client(&mut self, gst_pad: &gst::Pad, addr: SocketAddr) {
|
|
|
|
if self.clients.contains(&addr) {
|
|
|
|
gst_warning!(CAT, obj: gst_pad, "Not adding client {:?} again", &addr);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
gst_info!(CAT, obj: gst_pad, "Adding client {:?}", addr);
|
|
|
|
|
|
|
|
Arc::make_mut(&mut self.clients).push(addr);
|
|
|
|
|
|
|
|
self.clients_to_configure.push(addr);
|
|
|
|
self.clients_to_unconfigure.retain(|addr2| addr != *addr2);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
enum SocketQualified {
|
|
|
|
Ipv4(tokio::net::UdpSocket),
|
|
|
|
Ipv6(tokio::net::UdpSocket),
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
struct UdpSinkPadHandler(Arc<RwLock<UdpSinkPadHandlerInner>>);
|
|
|
|
|
|
|
|
impl UdpSinkPadHandler {
|
|
|
|
fn new(settings: Arc<StdMutex<Settings>>) -> UdpSinkPadHandler {
|
|
|
|
Self(Arc::new(RwLock::new(UdpSinkPadHandlerInner::new(settings))))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Stores the latency that `render` adds to each buffer's running time.
fn set_latency(&self, latency: gst::ClockTime) {
    let mut inner = self.0.write().unwrap();
    inner.latency = latency;
}
|
|
|
|
|
|
|
|
fn prepare(&self) {
|
|
|
|
let mut inner = self.0.write().unwrap();
|
|
|
|
inner.clients_to_configure = inner.clients.to_vec();
|
|
|
|
}
|
|
|
|
|
|
|
|
fn prepare_socket(&self, socket: SocketQualified) {
|
|
|
|
let mut inner = self.0.write().unwrap();
|
|
|
|
|
|
|
|
match socket {
|
|
|
|
SocketQualified::Ipv4(socket) => inner.socket = Arc::new(Mutex::new(Some(socket))),
|
|
|
|
SocketQualified::Ipv6(socket) => inner.socket_v6 = Arc::new(Mutex::new(Some(socket))),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn unprepare(&self) {
|
|
|
|
let mut inner = self.0.write().unwrap();
|
|
|
|
*inner = UdpSinkPadHandlerInner::new(Arc::clone(&inner.settings))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Replaces the client list with `clients_to_add` (delegates to the inner state).
fn clear_clients(&self, gst_pad: &gst::Pad, clients_to_add: impl Iterator<Item = SocketAddr>) {
    let mut inner = self.0.write().unwrap();
    inner.clear_clients(gst_pad, clients_to_add);
}
|
|
|
|
|
|
|
|
/// Removes `addr` from the client list (delegates to the inner state).
fn remove_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) {
    let mut inner = self.0.write().unwrap();
    inner.remove_client(gst_pad, addr);
}
|
|
|
|
|
|
|
|
/// Adds `addr` to the client list (delegates to the inner state).
fn add_client(&self, gst_pad: &gst::Pad, addr: SocketAddr) {
    let mut inner = self.0.write().unwrap();
    inner.add_client(gst_pad, addr);
}
|
|
|
|
|
|
|
|
fn get_clients(&self) -> Vec<SocketAddr> {
|
|
|
|
(*self.0.read().unwrap().clients).clone()
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
2020-03-16 10:30:53 +00:00
|
|
|
fn configure_client(
|
|
|
|
&self,
|
2020-03-16 10:42:33 +00:00
|
|
|
settings: &Settings,
|
2020-03-16 10:30:53 +00:00
|
|
|
socket: &mut Option<tokio::net::UdpSocket>,
|
|
|
|
socket_v6: &mut Option<tokio::net::UdpSocket>,
|
|
|
|
client: &SocketAddr,
|
|
|
|
) -> Result<(), gst::ErrorMessage> {
|
2020-03-15 11:13:26 +00:00
|
|
|
if client.ip().is_multicast() {
|
|
|
|
match client.ip() {
|
|
|
|
IpAddr::V4(addr) => {
|
2020-03-16 10:30:53 +00:00
|
|
|
if let Some(socket) = socket.as_mut() {
|
2020-03-16 10:42:33 +00:00
|
|
|
if settings.auto_multicast {
|
2020-03-15 11:13:26 +00:00
|
|
|
socket
|
|
|
|
.join_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0))
|
|
|
|
.map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to join multicast group: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
2020-03-16 10:42:33 +00:00
|
|
|
if settings.multicast_loop {
|
2020-03-15 11:13:26 +00:00
|
|
|
socket.set_multicast_loop_v4(true).map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to set multicast loop: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
2020-03-16 10:42:33 +00:00
|
|
|
socket
|
|
|
|
.set_multicast_ttl_v4(settings.ttl_mc)
|
|
|
|
.map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to set multicast ttl: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
}
|
2020-03-15 11:13:26 +00:00
|
|
|
IpAddr::V6(addr) => {
|
2020-03-16 10:30:53 +00:00
|
|
|
if let Some(socket) = socket_v6.as_mut() {
|
2020-03-16 10:42:33 +00:00
|
|
|
if settings.auto_multicast {
|
2020-03-15 11:13:26 +00:00
|
|
|
socket.join_multicast_v6(&addr, 0).map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to join multicast group: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
}
|
2020-03-16 10:42:33 +00:00
|
|
|
if settings.multicast_loop {
|
2020-03-15 11:13:26 +00:00
|
|
|
socket.set_multicast_loop_v6(true).map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to set multicast loop: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
/* FIXME no API for set_multicast_ttl_v6 ? */
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
} else {
|
2020-03-15 11:13:26 +00:00
|
|
|
match client.ip() {
|
|
|
|
IpAddr::V4(_) => {
|
2020-03-16 10:30:53 +00:00
|
|
|
if let Some(socket) = socket.as_mut() {
|
2020-03-16 10:42:33 +00:00
|
|
|
socket.set_ttl(settings.ttl).map_err(|err| {
|
2020-03-15 11:13:26 +00:00
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to set unicast ttl: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
IpAddr::V6(_) => {
|
2020-03-16 10:30:53 +00:00
|
|
|
if let Some(socket) = socket_v6.as_mut() {
|
2020-03-16 10:42:33 +00:00
|
|
|
socket.set_ttl(settings.ttl).map_err(|err| {
|
2020-03-15 11:13:26 +00:00
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to set unicast ttl: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
Ok(())
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
2020-03-16 10:33:18 +00:00
|
|
|
fn unconfigure_client(
|
|
|
|
&self,
|
2020-03-16 10:42:33 +00:00
|
|
|
settings: &Settings,
|
2020-03-16 10:33:18 +00:00
|
|
|
socket: &mut Option<tokio::net::UdpSocket>,
|
|
|
|
socket_v6: &mut Option<tokio::net::UdpSocket>,
|
|
|
|
client: &SocketAddr,
|
|
|
|
) -> Result<(), gst::ErrorMessage> {
|
|
|
|
if client.ip().is_multicast() {
|
|
|
|
match client.ip() {
|
|
|
|
IpAddr::V4(addr) => {
|
|
|
|
if let Some(socket) = socket.as_mut() {
|
2020-03-16 10:42:33 +00:00
|
|
|
if settings.auto_multicast {
|
2020-03-16 10:33:18 +00:00
|
|
|
socket
|
|
|
|
.leave_multicast_v4(addr, Ipv4Addr::new(0, 0, 0, 0))
|
|
|
|
.map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to join multicast group: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
IpAddr::V6(addr) => {
|
|
|
|
if let Some(socket) = socket_v6.as_mut() {
|
2020-03-16 10:42:33 +00:00
|
|
|
if settings.auto_multicast {
|
2020-03-16 10:33:18 +00:00
|
|
|
socket.leave_multicast_v6(&addr, 0).map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to join multicast group: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
async fn render(
|
2019-12-23 15:23:38 +00:00
|
|
|
&self,
|
|
|
|
element: &gst::Element,
|
2020-03-15 11:13:26 +00:00
|
|
|
buffer: gst::Buffer,
|
2019-12-23 15:23:38 +00:00
|
|
|
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
2020-03-16 10:33:18 +00:00
|
|
|
let (
|
|
|
|
do_sync,
|
|
|
|
rtime,
|
|
|
|
clients,
|
|
|
|
clients_to_configure,
|
|
|
|
clients_to_unconfigure,
|
|
|
|
socket,
|
|
|
|
socket_v6,
|
2020-03-16 10:42:33 +00:00
|
|
|
settings,
|
2020-03-16 10:33:18 +00:00
|
|
|
) = {
|
2020-03-26 18:07:25 +00:00
|
|
|
let mut inner = self.0.write().unwrap();
|
|
|
|
let do_sync = inner.sync;
|
2020-03-15 11:13:26 +00:00
|
|
|
let mut rtime: gst::ClockTime = 0.into();
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
if let Some(segment) = &inner.segment {
|
2019-12-23 15:23:38 +00:00
|
|
|
if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
|
2020-03-15 11:13:26 +00:00
|
|
|
rtime = segment.to_running_time(buffer.get_pts());
|
2020-03-26 18:07:25 +00:00
|
|
|
if inner.latency.is_some() {
|
|
|
|
rtime += inner.latency;
|
2020-03-15 11:13:26 +00:00
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
}
|
2020-03-15 11:13:26 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
let clients_to_configure = mem::replace(&mut inner.clients_to_configure, vec![]);
|
|
|
|
let clients_to_unconfigure = mem::replace(&mut inner.clients_to_unconfigure, vec![]);
|
2020-03-15 11:13:26 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
let settings = inner.settings.lock().unwrap().clone();
|
2020-03-16 10:42:33 +00:00
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
(
|
|
|
|
do_sync,
|
|
|
|
rtime,
|
2020-03-26 18:07:25 +00:00
|
|
|
Arc::clone(&inner.clients),
|
2020-03-15 11:13:26 +00:00
|
|
|
clients_to_configure,
|
2020-03-16 10:33:18 +00:00
|
|
|
clients_to_unconfigure,
|
2020-03-26 18:07:25 +00:00
|
|
|
Arc::clone(&inner.socket),
|
|
|
|
Arc::clone(&inner.socket_v6),
|
2020-03-16 10:42:33 +00:00
|
|
|
settings,
|
2020-03-15 11:13:26 +00:00
|
|
|
)
|
|
|
|
};
|
|
|
|
|
2020-03-16 10:42:33 +00:00
|
|
|
let mut socket = socket.lock().await;
|
|
|
|
let mut socket_v6 = socket_v6.lock().await;
|
2020-03-16 10:30:53 +00:00
|
|
|
|
2020-03-16 10:42:33 +00:00
|
|
|
if !clients_to_configure.is_empty() {
|
2020-03-16 10:30:53 +00:00
|
|
|
for client in &clients_to_configure {
|
2020-03-16 10:42:33 +00:00
|
|
|
self.configure_client(&settings, &mut socket, &mut socket_v6, &client)
|
|
|
|
.map_err(|err| {
|
|
|
|
gst_element_error!(
|
|
|
|
element,
|
|
|
|
gst::StreamError::Failed,
|
|
|
|
["Failed to configure client {:?}: {}", client, err]
|
|
|
|
);
|
2020-03-16 10:30:53 +00:00
|
|
|
|
2020-03-16 10:42:33 +00:00
|
|
|
gst::FlowError::Error
|
|
|
|
})?;
|
2020-03-16 10:30:53 +00:00
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
2020-03-16 10:33:18 +00:00
|
|
|
if !clients_to_unconfigure.is_empty() {
|
|
|
|
for client in &clients_to_unconfigure {
|
2020-03-16 10:42:33 +00:00
|
|
|
self.unconfigure_client(&settings, &mut socket, &mut socket_v6, &client)
|
2020-03-16 10:33:18 +00:00
|
|
|
.map_err(|err| {
|
|
|
|
gst_element_error!(
|
|
|
|
element,
|
|
|
|
gst::StreamError::Failed,
|
|
|
|
["Failed to unconfigure client {:?}: {}", client, err]
|
|
|
|
);
|
|
|
|
|
|
|
|
gst::FlowError::Error
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
if do_sync {
|
|
|
|
self.sync(&element, rtime).await;
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
let data = buffer.map_readable().map_err(|_| {
|
|
|
|
gst_element_error!(
|
|
|
|
element,
|
|
|
|
gst::StreamError::Format,
|
|
|
|
["Failed to map buffer readable"]
|
|
|
|
);
|
|
|
|
|
|
|
|
gst::FlowError::Error
|
|
|
|
})?;
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
for client in clients.iter() {
|
2019-12-23 15:23:38 +00:00
|
|
|
let socket = match client.ip() {
|
2020-03-16 10:42:33 +00:00
|
|
|
IpAddr::V4(_) => &mut socket,
|
|
|
|
IpAddr::V6(_) => &mut socket_v6,
|
2019-12-23 15:23:38 +00:00
|
|
|
};
|
|
|
|
|
2020-03-16 10:42:33 +00:00
|
|
|
if let Some(socket) = socket.as_mut() {
|
2019-12-23 15:23:38 +00:00
|
|
|
gst_log!(CAT, obj: element, "Sending to {:?}", &client);
|
|
|
|
socket.send_to(&data, client).await.map_err(|err| {
|
|
|
|
gst_element_error!(
|
|
|
|
element,
|
|
|
|
gst::StreamError::Failed,
|
|
|
|
("I/O error"),
|
|
|
|
["streaming stopped, I/O error {}", err]
|
|
|
|
);
|
|
|
|
gst::FlowError::Error
|
|
|
|
})?;
|
2020-01-15 21:31:20 +00:00
|
|
|
} else {
|
|
|
|
gst_element_error!(
|
|
|
|
element,
|
|
|
|
gst::StreamError::Failed,
|
|
|
|
("I/O error"),
|
|
|
|
["No socket available for sending to {}", client]
|
|
|
|
);
|
|
|
|
return Err(gst::FlowError::Error);
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
gst_log!(
|
|
|
|
CAT,
|
|
|
|
obj: element,
|
|
|
|
"Sent buffer {:?} to all clients",
|
|
|
|
&buffer
|
|
|
|
);
|
|
|
|
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Wait until specified time */
|
|
|
|
async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) {
|
2020-04-13 15:06:14 +00:00
|
|
|
let now = element.get_current_running_time();
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
if now < running_time {
|
|
|
|
let delay = running_time - now;
|
2020-02-19 19:38:38 +00:00
|
|
|
runtime::time::delay_for(Duration::from_nanos(delay.nseconds().unwrap())).await;
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
async fn handle_event(&self, element: &gst::Element, event: gst::Event) {
|
|
|
|
match event.view() {
|
|
|
|
EventView::Eos(_) => {
|
2020-06-30 20:57:22 +00:00
|
|
|
let _ = element.post_message(gst::message::Eos::builder().src(element).build());
|
2020-03-15 11:13:26 +00:00
|
|
|
}
|
|
|
|
EventView::Segment(e) => {
|
2020-03-26 18:07:25 +00:00
|
|
|
self.0.write().unwrap().segment = Some(e.get_segment().clone());
|
2020-03-15 11:13:26 +00:00
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl PadSinkHandler for UdpSinkPadHandler {
|
|
|
|
type ElementImpl = UdpSink;
|
|
|
|
|
|
|
|
fn sink_chain(
|
|
|
|
&self,
|
|
|
|
_pad: &PadSinkRef,
|
|
|
|
_udpsink: &UdpSink,
|
2020-04-06 11:01:16 +00:00
|
|
|
element: &gst::Element,
|
2020-03-15 11:13:26 +00:00
|
|
|
buffer: gst::Buffer,
|
|
|
|
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
|
|
|
|
let sender = Arc::clone(&self.0.read().unwrap().sender);
|
2020-04-06 11:01:16 +00:00
|
|
|
let element = element.clone();
|
2020-03-15 11:13:26 +00:00
|
|
|
|
|
|
|
async move {
|
|
|
|
if let Some(sender) = sender.lock().await.as_mut() {
|
2020-04-27 09:22:26 +00:00
|
|
|
if sender.send(TaskItem::Buffer(buffer)).await.is_err() {
|
2020-04-06 11:01:16 +00:00
|
|
|
gst_debug!(CAT, obj: &element, "Flushing");
|
|
|
|
return Err(gst::FlowError::Flushing);
|
|
|
|
}
|
2020-03-15 11:13:26 +00:00
|
|
|
}
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
}
|
|
|
|
.boxed()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn sink_chain_list(
|
|
|
|
&self,
|
|
|
|
_pad: &PadSinkRef,
|
|
|
|
_udpsink: &UdpSink,
|
2020-04-06 11:01:16 +00:00
|
|
|
element: &gst::Element,
|
2020-03-15 11:13:26 +00:00
|
|
|
list: gst::BufferList,
|
|
|
|
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
|
|
|
|
let sender = Arc::clone(&self.0.read().unwrap().sender);
|
2020-04-06 11:01:16 +00:00
|
|
|
let element = element.clone();
|
2020-03-15 11:13:26 +00:00
|
|
|
|
|
|
|
async move {
|
|
|
|
if let Some(sender) = sender.lock().await.as_mut() {
|
|
|
|
for buffer in list.iter_owned() {
|
2020-04-27 09:22:26 +00:00
|
|
|
if sender.send(TaskItem::Buffer(buffer)).await.is_err() {
|
2020-04-06 11:01:16 +00:00
|
|
|
gst_debug!(CAT, obj: &element, "Flushing");
|
|
|
|
return Err(gst::FlowError::Flushing);
|
|
|
|
}
|
2020-03-15 11:13:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
}
|
|
|
|
.boxed()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn sink_event_serialized(
|
|
|
|
&self,
|
|
|
|
_pad: &PadSinkRef,
|
|
|
|
_udpsink: &UdpSink,
|
|
|
|
element: &gst::Element,
|
|
|
|
event: gst::Event,
|
|
|
|
) -> BoxFuture<'static, bool> {
|
|
|
|
let sender = Arc::clone(&self.0.read().unwrap().sender);
|
|
|
|
let element = element.clone();
|
|
|
|
|
|
|
|
async move {
|
|
|
|
if let EventView::FlushStop(_) = event.view() {
|
2020-03-26 18:07:25 +00:00
|
|
|
let udpsink = UdpSink::from_instance(&element);
|
2020-05-15 17:38:54 +00:00
|
|
|
return udpsink.task.flush_stop().is_ok();
|
2020-03-15 11:40:45 +00:00
|
|
|
} else if let Some(sender) = sender.lock().await.as_mut() {
|
2020-04-27 09:22:26 +00:00
|
|
|
if sender.send(TaskItem::Event(event)).await.is_err() {
|
2020-04-06 11:01:16 +00:00
|
|
|
gst_debug!(CAT, obj: &element, "Flushing");
|
|
|
|
}
|
2020-03-15 11:13:26 +00:00
|
|
|
}
|
2020-03-26 18:07:25 +00:00
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
true
|
|
|
|
}
|
|
|
|
.boxed()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn sink_event(
|
2019-12-23 15:23:38 +00:00
|
|
|
&self,
|
2020-03-15 11:13:26 +00:00
|
|
|
_pad: &PadSinkRef,
|
2020-03-26 18:07:25 +00:00
|
|
|
udpsink: &UdpSink,
|
2020-04-20 19:35:06 +00:00
|
|
|
_element: &gst::Element,
|
2020-03-15 11:13:26 +00:00
|
|
|
event: gst::Event,
|
|
|
|
) -> bool {
|
2020-03-26 18:07:25 +00:00
|
|
|
if let EventView::FlushStart(..) = event.view() {
|
2020-05-15 17:38:54 +00:00
|
|
|
return udpsink.task.flush_start().is_ok();
|
2020-03-15 11:13:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-20 19:35:06 +00:00
|
|
|
#[derive(Debug)]
|
|
|
|
struct UdpSinkTask {
|
|
|
|
element: gst::Element,
|
|
|
|
sink_pad_handler: UdpSinkPadHandler,
|
|
|
|
receiver: Option<mpsc::Receiver<TaskItem>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl UdpSinkTask {
|
|
|
|
fn new(element: &gst::Element, sink_pad_handler: &UdpSinkPadHandler) -> Self {
|
|
|
|
UdpSinkTask {
|
|
|
|
element: element.clone(),
|
|
|
|
sink_pad_handler: sink_pad_handler.clone(),
|
|
|
|
receiver: None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl TaskImpl for UdpSinkTask {
|
2020-05-15 17:38:54 +00:00
|
|
|
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
2020-04-20 19:35:06 +00:00
|
|
|
async move {
|
|
|
|
gst_log!(CAT, obj: &self.element, "Starting task");
|
|
|
|
|
|
|
|
let (sender, receiver) = mpsc::channel(0);
|
|
|
|
|
|
|
|
let mut sink_pad_handler = self.sink_pad_handler.0.write().unwrap();
|
|
|
|
sink_pad_handler.sender = Arc::new(Mutex::new(Some(sender)));
|
|
|
|
|
|
|
|
self.receiver = Some(receiver);
|
|
|
|
|
|
|
|
gst_log!(CAT, obj: &self.element, "Task started");
|
2020-05-15 17:38:54 +00:00
|
|
|
Ok(())
|
2020-04-20 19:35:06 +00:00
|
|
|
}
|
|
|
|
.boxed()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
|
|
|
|
async move {
|
|
|
|
match self.receiver.as_mut().unwrap().next().await {
|
|
|
|
Some(TaskItem::Buffer(buffer)) => {
|
|
|
|
match self.sink_pad_handler.render(&self.element, buffer).await {
|
|
|
|
Err(err) => {
|
|
|
|
gst_element_error!(
|
|
|
|
&self.element,
|
|
|
|
gst::StreamError::Failed,
|
|
|
|
["Failed to render item, stopping task: {}", err]
|
|
|
|
);
|
|
|
|
|
|
|
|
Err(gst::FlowError::Error)
|
|
|
|
}
|
|
|
|
_ => Ok(()),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Some(TaskItem::Event(event)) => {
|
|
|
|
self.sink_pad_handler
|
|
|
|
.handle_event(&self.element, event)
|
|
|
|
.await;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
None => Err(gst::FlowError::Flushing),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
.boxed()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
/// Address family selector used when preparing a socket.
///
/// A plain tag without payload, hence `Copy` and comparable: it can be
/// matched or passed on repeatedly without being moved.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SocketFamily {
    /// Select the IPv4 socket and bind settings.
    Ipv4,
    /// Select the IPv6 socket and bind settings.
    Ipv6,
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct UdpSink {
|
|
|
|
sink_pad: PadSink,
|
|
|
|
sink_pad_handler: UdpSinkPadHandler,
|
|
|
|
task: Task,
|
|
|
|
settings: Arc<StdMutex<Settings>>,
|
|
|
|
}
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
impl UdpSink {
|
2020-03-26 18:07:25 +00:00
|
|
|
fn prepare_socket(
|
2020-03-15 11:13:26 +00:00
|
|
|
&self,
|
2020-03-26 18:07:25 +00:00
|
|
|
family: SocketFamily,
|
2020-03-15 11:13:26 +00:00
|
|
|
context: &Context,
|
2019-12-23 15:23:38 +00:00
|
|
|
element: &gst::Element,
|
|
|
|
) -> Result<(), gst::ErrorMessage> {
|
2020-03-15 11:13:26 +00:00
|
|
|
let mut settings = self.settings.lock().unwrap();
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
let wrapped_socket = match family {
|
|
|
|
SocketFamily::Ipv4 => &settings.socket,
|
|
|
|
SocketFamily::Ipv6 => &settings.socket_v6,
|
|
|
|
};
|
|
|
|
|
|
|
|
let socket_qualified: SocketQualified;
|
|
|
|
|
|
|
|
if let Some(ref wrapped_socket) = wrapped_socket {
|
2019-12-23 15:23:38 +00:00
|
|
|
use std::net::UdpSocket;
|
|
|
|
|
|
|
|
let socket: UdpSocket;
|
|
|
|
|
|
|
|
#[cfg(unix)]
|
|
|
|
{
|
|
|
|
socket = wrapped_socket.get()
|
|
|
|
}
|
|
|
|
#[cfg(windows)]
|
|
|
|
{
|
|
|
|
socket = wrapped_socket.get()
|
|
|
|
}
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
let socket = context.enter(|| {
|
|
|
|
tokio::net::UdpSocket::from_std(socket).map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to setup socket for tokio: {}", err]
|
|
|
|
)
|
|
|
|
})
|
2019-12-23 15:23:38 +00:00
|
|
|
})?;
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
match family {
|
|
|
|
SocketFamily::Ipv4 => {
|
|
|
|
settings.used_socket = Some(wrapped_socket.clone());
|
|
|
|
socket_qualified = SocketQualified::Ipv4(socket);
|
|
|
|
}
|
|
|
|
SocketFamily::Ipv6 => {
|
|
|
|
settings.used_socket_v6 = Some(wrapped_socket.clone());
|
|
|
|
socket_qualified = SocketQualified::Ipv6(socket);
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
} else {
|
2020-03-26 18:07:25 +00:00
|
|
|
let bind_addr = match family {
|
|
|
|
SocketFamily::Ipv4 => &settings.bind_address,
|
|
|
|
SocketFamily::Ipv6 => &settings.bind_address_v6,
|
2019-12-23 15:23:38 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
let bind_addr: IpAddr = bind_addr.parse().map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::Settings,
|
|
|
|
["Invalid address '{}' set: {}", bind_addr, err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
let bind_port = match family {
|
|
|
|
SocketFamily::Ipv4 => settings.bind_port,
|
|
|
|
SocketFamily::Ipv6 => settings.bind_port_v6,
|
2019-12-23 15:23:38 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
let saddr = SocketAddr::new(bind_addr, bind_port as u16);
|
|
|
|
gst_debug!(CAT, obj: element, "Binding to {:?}", saddr);
|
|
|
|
|
2020-05-29 10:07:14 +00:00
|
|
|
let socket = match family {
|
|
|
|
SocketFamily::Ipv4 => socket2::Socket::new(
|
|
|
|
socket2::Domain::ipv4(),
|
|
|
|
socket2::Type::dgram(),
|
|
|
|
Some(socket2::Protocol::udp()),
|
|
|
|
),
|
|
|
|
SocketFamily::Ipv6 => socket2::Socket::new(
|
|
|
|
socket2::Domain::ipv6(),
|
|
|
|
socket2::Type::dgram(),
|
|
|
|
Some(socket2::Protocol::udp()),
|
|
|
|
),
|
2020-01-15 21:31:20 +00:00
|
|
|
};
|
|
|
|
|
2020-05-29 10:07:14 +00:00
|
|
|
let socket = match socket {
|
|
|
|
Ok(socket) => socket,
|
2020-01-15 21:31:20 +00:00
|
|
|
Err(err) => {
|
|
|
|
gst_warning!(
|
|
|
|
CAT,
|
|
|
|
obj: element,
|
2020-05-29 10:07:14 +00:00
|
|
|
"Failed to create {} socket: {}",
|
2020-03-26 18:07:25 +00:00
|
|
|
match family {
|
|
|
|
SocketFamily::Ipv4 => "IPv4",
|
|
|
|
SocketFamily::Ipv6 => "IPv6",
|
|
|
|
},
|
2020-01-15 21:31:20 +00:00
|
|
|
err
|
|
|
|
);
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
};
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-05-29 10:07:14 +00:00
|
|
|
socket.bind(&saddr.into()).map_err(|err| {
|
2019-12-23 15:23:38 +00:00
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to bind socket: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
let socket = context.enter(|| {
|
2020-05-29 10:07:14 +00:00
|
|
|
tokio::net::UdpSocket::from_std(socket.into()).map_err(|err| {
|
2020-03-15 11:13:26 +00:00
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to setup socket for tokio: {}", err]
|
|
|
|
)
|
|
|
|
})
|
2019-12-23 15:23:38 +00:00
|
|
|
})?;
|
|
|
|
|
|
|
|
let wrapper = wrap_socket(&socket)?;
|
|
|
|
|
|
|
|
if settings.qos_dscp != -1 {
|
|
|
|
wrapper.set_tos(settings.qos_dscp).map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to set QoS DSCP: {}", err]
|
|
|
|
)
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
match family {
|
|
|
|
SocketFamily::Ipv4 => {
|
|
|
|
settings.used_socket = Some(wrapper);
|
|
|
|
socket_qualified = SocketQualified::Ipv4(socket)
|
|
|
|
}
|
|
|
|
SocketFamily::Ipv6 => {
|
|
|
|
settings.used_socket_v6 = Some(wrapper);
|
|
|
|
socket_qualified = SocketQualified::Ipv6(socket)
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
self.sink_pad_handler.prepare_socket(socket_qualified);
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
|
2019-12-23 15:23:38 +00:00
|
|
|
gst_debug!(CAT, obj: element, "Preparing");
|
|
|
|
|
|
|
|
let context = {
|
2020-03-15 11:13:26 +00:00
|
|
|
let settings = self.settings.lock().unwrap();
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to acquire Context: {}", err]
|
|
|
|
)
|
|
|
|
})?
|
|
|
|
};
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
self.sink_pad_handler.prepare();
|
|
|
|
self.prepare_socket(SocketFamily::Ipv4, &context, element)?;
|
|
|
|
self.prepare_socket(SocketFamily::Ipv6, &context, element)?;
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-04-20 19:35:06 +00:00
|
|
|
self.task
|
|
|
|
.prepare(UdpSinkTask::new(&element, &self.sink_pad_handler), context)
|
|
|
|
.map_err(|err| {
|
|
|
|
gst_error_msg!(
|
|
|
|
gst::ResourceError::OpenRead,
|
|
|
|
["Error preparing Task: {:?}", err]
|
|
|
|
)
|
|
|
|
})?;
|
2020-03-15 11:13:26 +00:00
|
|
|
|
2019-12-23 15:23:38 +00:00
|
|
|
gst_debug!(CAT, obj: element, "Started preparing");
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-04-20 19:35:06 +00:00
|
|
|
fn unprepare(&self, element: &gst::Element) {
|
2020-03-15 11:13:26 +00:00
|
|
|
gst_debug!(CAT, obj: element, "Unpreparing");
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
self.task.unprepare().unwrap();
|
2020-03-19 18:34:51 +00:00
|
|
|
self.sink_pad_handler.unprepare();
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
gst_debug!(CAT, obj: element, "Unprepared");
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
2020-05-15 17:38:54 +00:00
|
|
|
fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
|
2019-12-23 15:23:38 +00:00
|
|
|
gst_debug!(CAT, obj: element, "Stopping");
|
2020-05-15 17:38:54 +00:00
|
|
|
self.task.stop()?;
|
2019-12-23 15:23:38 +00:00
|
|
|
gst_debug!(CAT, obj: element, "Stopped");
|
2020-05-15 17:38:54 +00:00
|
|
|
Ok(())
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
2020-05-15 17:38:54 +00:00
|
|
|
fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
|
2020-03-26 18:07:25 +00:00
|
|
|
gst_debug!(CAT, obj: element, "Starting");
|
2020-05-15 17:38:54 +00:00
|
|
|
self.task.start()?;
|
2020-04-20 19:35:06 +00:00
|
|
|
gst_debug!(CAT, obj: element, "Started");
|
2020-05-15 17:38:54 +00:00
|
|
|
Ok(())
|
2020-03-26 18:07:25 +00:00
|
|
|
}
|
|
|
|
}
|
2020-03-16 10:30:53 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
impl UdpSink {
|
|
|
|
fn clear_clients(&self, clients_to_add: impl Iterator<Item = SocketAddr>) {
|
|
|
|
self.sink_pad_handler
|
|
|
|
.clear_clients(&self.sink_pad.gst_pad(), clients_to_add);
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
fn remove_client(&self, addr: SocketAddr) {
|
|
|
|
self.sink_pad_handler
|
|
|
|
.remove_client(&self.sink_pad.gst_pad(), addr);
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
fn add_client(&self, addr: SocketAddr) {
|
|
|
|
self.sink_pad_handler
|
|
|
|
.add_client(&self.sink_pad.gst_pad(), addr);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-27 09:22:26 +00:00
|
|
|
fn try_into_socket_addr(element: &gst::Element, host: &str, port: i32) -> Result<SocketAddr, ()> {
|
2020-03-26 18:07:25 +00:00
|
|
|
let addr: IpAddr = match host.parse() {
|
|
|
|
Err(err) => {
|
|
|
|
gst_error!(CAT, obj: element, "Failed to parse host {}: {}", host, err);
|
|
|
|
return Err(());
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
2020-03-26 18:07:25 +00:00
|
|
|
Ok(addr) => addr,
|
|
|
|
};
|
2020-03-16 10:30:53 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
let port: u16 = match port.try_into() {
|
|
|
|
Err(err) => {
|
|
|
|
gst_error!(CAT, obj: element, "Invalid port {}: {}", port, err);
|
|
|
|
return Err(());
|
|
|
|
}
|
|
|
|
Ok(port) => port,
|
|
|
|
};
|
2020-03-16 10:30:53 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
Ok(SocketAddr::new(addr, port))
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl ObjectSubclass for UdpSink {
|
|
|
|
const NAME: &'static str = "RsTsUdpSink";
|
|
|
|
type ParentType = gst::Element;
|
|
|
|
type Instance = gst::subclass::ElementInstanceStruct<Self>;
|
|
|
|
type Class = subclass::simple::ClassStruct<Self>;
|
|
|
|
|
|
|
|
glib_object_subclass!();
|
|
|
|
|
|
|
|
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
|
|
|
|
klass.set_metadata(
|
|
|
|
"Thread-sharing UDP sink",
|
|
|
|
"Sink/Network",
|
|
|
|
"Thread-sharing UDP sink",
|
|
|
|
"Mathieu <mathieu@centricular.com>",
|
|
|
|
);
|
|
|
|
|
|
|
|
let caps = gst::Caps::new_any();
|
|
|
|
|
|
|
|
let sink_pad_template = gst::PadTemplate::new(
|
|
|
|
"sink",
|
|
|
|
gst::PadDirection::Sink,
|
|
|
|
gst::PadPresence::Always,
|
|
|
|
&caps,
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
klass.add_pad_template(sink_pad_template);
|
|
|
|
klass.add_signal_with_class_handler(
|
|
|
|
"add",
|
|
|
|
glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION,
|
|
|
|
&[String::static_type(), i32::static_type()],
|
|
|
|
glib::types::Type::Unit,
|
|
|
|
|_, args| {
|
|
|
|
let element = args[0]
|
|
|
|
.get::<gst::Element>()
|
|
|
|
.expect("signal arg")
|
|
|
|
.expect("missing signal arg");
|
|
|
|
let host = args[1]
|
|
|
|
.get::<String>()
|
|
|
|
.expect("signal arg")
|
|
|
|
.expect("missing signal arg");
|
|
|
|
let port = args[2]
|
|
|
|
.get::<i32>()
|
|
|
|
.expect("signal arg")
|
2020-04-27 09:22:26 +00:00
|
|
|
.expect("missing signal arg");
|
2019-12-23 15:23:38 +00:00
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
if let Ok(addr) = try_into_socket_addr(&element, &host, port) {
|
|
|
|
let udpsink = Self::from_instance(&element);
|
|
|
|
udpsink.add_client(addr);
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
None
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
klass.add_signal_with_class_handler(
|
|
|
|
"remove",
|
|
|
|
glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION,
|
|
|
|
&[String::static_type(), i32::static_type()],
|
|
|
|
glib::types::Type::Unit,
|
|
|
|
|_, args| {
|
|
|
|
let element = args[0]
|
|
|
|
.get::<gst::Element>()
|
|
|
|
.expect("signal arg")
|
|
|
|
.expect("missing signal arg");
|
|
|
|
let host = args[1]
|
|
|
|
.get::<String>()
|
|
|
|
.expect("signal arg")
|
|
|
|
.expect("missing signal arg");
|
|
|
|
let port = args[2]
|
|
|
|
.get::<i32>()
|
|
|
|
.expect("signal arg")
|
2020-04-27 09:22:26 +00:00
|
|
|
.expect("missing signal arg");
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
let udpsink = Self::from_instance(&element);
|
|
|
|
|
2020-08-11 08:45:17 +00:00
|
|
|
if let Ok(addr) = try_into_socket_addr(&element, &host, port) {
|
|
|
|
udpsink.remove_client(addr);
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
None
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
klass.add_signal_with_class_handler(
|
|
|
|
"clear",
|
|
|
|
glib::SignalFlags::RUN_LAST | glib::SignalFlags::ACTION,
|
|
|
|
&[],
|
|
|
|
glib::types::Type::Unit,
|
|
|
|
|_, args| {
|
|
|
|
let element = args[0]
|
|
|
|
.get::<gst::Element>()
|
|
|
|
.expect("signal arg")
|
|
|
|
.expect("missing signal arg");
|
|
|
|
|
|
|
|
let udpsink = Self::from_instance(&element);
|
2020-08-11 08:45:17 +00:00
|
|
|
udpsink.clear_clients(std::iter::empty());
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
None
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
klass.install_properties(&PROPERTIES);
|
|
|
|
}
|
|
|
|
|
2020-06-16 08:56:48 +00:00
|
|
|
fn with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
|
2020-03-15 11:13:26 +00:00
|
|
|
let settings = Arc::new(StdMutex::new(Settings::default()));
|
2020-04-29 13:03:43 +00:00
|
|
|
let sink_pad_handler = UdpSinkPadHandler::new(Arc::clone(&settings));
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
Self {
|
2020-04-29 13:03:43 +00:00
|
|
|
sink_pad: PadSink::new(
|
2020-06-11 10:45:15 +00:00
|
|
|
gst::Pad::from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
|
2020-04-29 13:03:43 +00:00
|
|
|
sink_pad_handler.clone(),
|
|
|
|
),
|
|
|
|
sink_pad_handler,
|
2020-03-26 18:07:25 +00:00
|
|
|
task: Task::default(),
|
2020-03-15 11:40:45 +00:00
|
|
|
settings,
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ObjectImpl for UdpSink {
|
|
|
|
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
|
|
|
|
let prop = &PROPERTIES[id];
|
|
|
|
let element = obj.downcast_ref::<gst::Element>().unwrap();
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
let mut settings = self.settings.lock().unwrap();
|
2019-12-23 15:23:38 +00:00
|
|
|
match *prop {
|
|
|
|
subclass::Property("sync", ..) => {
|
|
|
|
settings.sync = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
subclass::Property("bind-address", ..) => {
|
|
|
|
settings.bind_address = value
|
|
|
|
.get()
|
|
|
|
.expect("type checked upstream")
|
|
|
|
.unwrap_or_else(|| "".into());
|
|
|
|
}
|
|
|
|
subclass::Property("bind-port", ..) => {
|
|
|
|
settings.bind_port = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
subclass::Property("bind-address-v6", ..) => {
|
|
|
|
settings.bind_address_v6 = value
|
|
|
|
.get()
|
|
|
|
.expect("type checked upstream")
|
|
|
|
.unwrap_or_else(|| "".into());
|
|
|
|
}
|
|
|
|
subclass::Property("bind-port-v6", ..) => {
|
|
|
|
settings.bind_port_v6 = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
subclass::Property("socket", ..) => {
|
|
|
|
settings.socket = value
|
|
|
|
.get::<gio::Socket>()
|
|
|
|
.expect("type checked upstream")
|
|
|
|
.map(|socket| GioSocketWrapper::new(&socket));
|
|
|
|
}
|
|
|
|
subclass::Property("used-socket", ..) => {
|
|
|
|
unreachable!();
|
|
|
|
}
|
|
|
|
subclass::Property("socket-v6", ..) => {
|
|
|
|
settings.socket_v6 = value
|
|
|
|
.get::<gio::Socket>()
|
|
|
|
.expect("type checked upstream")
|
|
|
|
.map(|socket| GioSocketWrapper::new(&socket));
|
|
|
|
}
|
|
|
|
subclass::Property("used-socket-v6", ..) => {
|
|
|
|
unreachable!();
|
|
|
|
}
|
|
|
|
subclass::Property("auto-multicast", ..) => {
|
|
|
|
settings.auto_multicast = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
subclass::Property("loop", ..) => {
|
|
|
|
settings.multicast_loop = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
subclass::Property("ttl", ..) => {
|
|
|
|
settings.ttl = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
subclass::Property("ttl-mc", ..) => {
|
|
|
|
settings.ttl_mc = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
subclass::Property("qos-dscp", ..) => {
|
|
|
|
settings.qos_dscp = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
subclass::Property("clients", ..) => {
|
|
|
|
let clients: String = value
|
|
|
|
.get()
|
|
|
|
.expect("type checked upstream")
|
|
|
|
.unwrap_or_else(|| "".into());
|
2020-03-26 18:07:25 +00:00
|
|
|
|
2020-08-11 08:45:17 +00:00
|
|
|
let clients_iter = clients.split(',').filter_map(|client| {
|
2020-03-26 18:07:25 +00:00
|
|
|
let rsplit: Vec<&str> = client.rsplitn(2, ':').collect();
|
|
|
|
|
|
|
|
if rsplit.len() == 2 {
|
|
|
|
rsplit[0]
|
2020-04-27 09:22:26 +00:00
|
|
|
.parse::<i32>()
|
2020-03-26 18:07:25 +00:00
|
|
|
.map_err(|err| {
|
|
|
|
gst_error!(
|
|
|
|
CAT,
|
|
|
|
obj: element,
|
|
|
|
"Invalid port {}: {}",
|
|
|
|
rsplit[0],
|
|
|
|
err
|
|
|
|
);
|
|
|
|
})
|
|
|
|
.and_then(|port| try_into_socket_addr(&element, rsplit[1], port))
|
|
|
|
.ok()
|
|
|
|
} else {
|
|
|
|
None
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
2020-08-11 08:45:17 +00:00
|
|
|
});
|
2020-07-30 13:26:13 +00:00
|
|
|
drop(settings);
|
2020-03-26 18:07:25 +00:00
|
|
|
|
|
|
|
self.clear_clients(clients_iter);
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
subclass::Property("context", ..) => {
|
|
|
|
settings.context = value
|
|
|
|
.get()
|
|
|
|
.expect("type checked upstream")
|
|
|
|
.unwrap_or_else(|| "".into());
|
|
|
|
}
|
|
|
|
subclass::Property("context-wait", ..) => {
|
|
|
|
settings.context_wait = value.get_some().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
|
|
|
|
let prop = &PROPERTIES[id];
|
|
|
|
|
2020-03-15 11:13:26 +00:00
|
|
|
let settings = self.settings.lock().unwrap();
|
2019-12-23 15:23:38 +00:00
|
|
|
match *prop {
|
2020-03-15 11:13:26 +00:00
|
|
|
subclass::Property("sync", ..) => Ok(settings.sync.to_value()),
|
|
|
|
subclass::Property("bind-address", ..) => Ok(settings.bind_address.to_value()),
|
|
|
|
subclass::Property("bind-port", ..) => Ok(settings.bind_port.to_value()),
|
|
|
|
subclass::Property("bind-address-v6", ..) => Ok(settings.bind_address_v6.to_value()),
|
|
|
|
subclass::Property("bind-port-v6", ..) => Ok(settings.bind_port_v6.to_value()),
|
|
|
|
subclass::Property("socket", ..) => Ok(settings
|
|
|
|
.socket
|
|
|
|
.as_ref()
|
|
|
|
.map(GioSocketWrapper::as_socket)
|
|
|
|
.to_value()),
|
|
|
|
subclass::Property("used-socket", ..) => Ok(settings
|
|
|
|
.used_socket
|
|
|
|
.as_ref()
|
|
|
|
.map(GioSocketWrapper::as_socket)
|
|
|
|
.to_value()),
|
|
|
|
subclass::Property("socket-v6", ..) => Ok(settings
|
|
|
|
.socket_v6
|
|
|
|
.as_ref()
|
|
|
|
.map(GioSocketWrapper::as_socket)
|
|
|
|
.to_value()),
|
|
|
|
subclass::Property("used-socket-v6", ..) => Ok(settings
|
|
|
|
.used_socket_v6
|
|
|
|
.as_ref()
|
|
|
|
.map(GioSocketWrapper::as_socket)
|
|
|
|
.to_value()),
|
|
|
|
subclass::Property("auto-multicast", ..) => Ok(settings.sync.to_value()),
|
|
|
|
subclass::Property("loop", ..) => Ok(settings.multicast_loop.to_value()),
|
|
|
|
subclass::Property("ttl", ..) => Ok(settings.ttl.to_value()),
|
|
|
|
subclass::Property("ttl-mc", ..) => Ok(settings.ttl_mc.to_value()),
|
|
|
|
subclass::Property("qos-dscp", ..) => Ok(settings.qos_dscp.to_value()),
|
2019-12-23 15:23:38 +00:00
|
|
|
subclass::Property("clients", ..) => {
|
2020-08-11 09:20:56 +00:00
|
|
|
drop(settings);
|
|
|
|
|
2020-03-26 18:07:25 +00:00
|
|
|
let clients: Vec<String> = self
|
|
|
|
.sink_pad_handler
|
|
|
|
.get_clients()
|
|
|
|
.iter()
|
|
|
|
.map(ToString::to_string)
|
|
|
|
.collect();
|
2019-12-23 15:23:38 +00:00
|
|
|
|
|
|
|
Ok(clients.join(",").to_value())
|
|
|
|
}
|
2020-03-15 11:13:26 +00:00
|
|
|
subclass::Property("context", ..) => Ok(settings.context.to_value()),
|
|
|
|
subclass::Property("context-wait", ..) => Ok(settings.context_wait.to_value()),
|
2019-12-23 15:23:38 +00:00
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn constructed(&self, obj: &glib::Object) {
|
|
|
|
self.parent_constructed(obj);
|
|
|
|
|
|
|
|
let element = obj.downcast_ref::<gst::Element>().unwrap();
|
|
|
|
element.add_pad(self.sink_pad.gst_pad()).unwrap();
|
|
|
|
|
|
|
|
super::set_element_flags(element, gst::ElementFlags::SINK);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ElementImpl for UdpSink {
|
|
|
|
fn change_state(
|
|
|
|
&self,
|
|
|
|
element: &gst::Element,
|
|
|
|
transition: gst::StateChange,
|
|
|
|
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
|
|
|
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
|
|
|
|
|
|
|
|
match transition {
|
|
|
|
gst::StateChange::NullToReady => {
|
2020-03-15 11:13:26 +00:00
|
|
|
self.prepare(element).map_err(|err| {
|
2020-06-30 20:57:22 +00:00
|
|
|
element.post_error_message(err);
|
2019-12-23 15:23:38 +00:00
|
|
|
gst::StateChangeError
|
|
|
|
})?;
|
|
|
|
}
|
|
|
|
gst::StateChange::ReadyToPaused => {
|
2020-05-15 17:38:54 +00:00
|
|
|
self.start(element).map_err(|_| gst::StateChangeError)?;
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
gst::StateChange::PausedToReady => {
|
2020-05-15 17:38:54 +00:00
|
|
|
self.stop(element).map_err(|_| gst::StateChangeError)?;
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
gst::StateChange::ReadyToNull => {
|
2020-04-20 19:35:06 +00:00
|
|
|
self.unprepare(element);
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
}
|
|
|
|
|
|
|
|
self.parent_change_state(element, transition)
|
|
|
|
}
|
2020-02-19 19:38:50 +00:00
|
|
|
|
|
|
|
fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool {
|
|
|
|
match event.view() {
|
|
|
|
EventView::Latency(ev) => {
|
2020-03-26 18:07:25 +00:00
|
|
|
self.sink_pad_handler.set_latency(ev.get_latency());
|
2020-02-19 19:38:50 +00:00
|
|
|
self.sink_pad.gst_pad().push_event(event)
|
|
|
|
}
|
|
|
|
EventView::Step(..) => false,
|
|
|
|
_ => self.sink_pad.gst_pad().push_event(event),
|
|
|
|
}
|
|
|
|
}
|
2019-12-23 15:23:38 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
|
|
|
gst::Element::register(
|
|
|
|
Some(plugin),
|
|
|
|
"ts-udpsink",
|
|
|
|
gst::Rank::None,
|
|
|
|
UdpSink::get_type(),
|
|
|
|
)
|
|
|
|
}
|