ts-udpsrc: Implement socket and used-socket properties like in udpsrc

This commit is contained in:
Sebastian Dröge 2018-11-13 14:13:23 +02:00
parent 4fb18382c2
commit ab08cbd412
3 changed files with 302 additions and 94 deletions

View file

@ -6,8 +6,11 @@ license = "LGPL-2.1+"
[dependencies]
glib-sys = { git = "https://github.com/gtk-rs/sys" }
gobject-sys = { git = "https://github.com/gtk-rs/sys" }
gio-sys = { git = "https://github.com/gtk-rs/sys" }
gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" }
glib = { git = "https://github.com/gtk-rs/glib" }
gio = { git = "https://github.com/gtk-rs/gio" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gobject-subclass = { git = "https://github.com/gtk-rs/gobject-subclass" }
gst-plugin = { path = "../gst-plugin" }

View file

@ -17,9 +17,12 @@
#![crate_type = "cdylib"]
extern crate gio_sys as gio_ffi;
extern crate glib_sys as glib_ffi;
extern crate gobject_sys as gobject_ffi;
extern crate gstreamer_sys as gst_ffi;
extern crate gio;
extern crate glib;
extern crate gobject_subclass;
#[macro_use]

View file

@ -20,6 +20,11 @@ use glib::prelude::*;
use gst;
use gst::prelude::*;
use gio;
use gio_ffi;
use gobject_ffi;
use gobject_subclass::object::*;
use gst_plugin::element::*;
@ -38,6 +43,12 @@ use rand;
use net2;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawFd, RawSocket};
use iocontext::*;
use socket::*;
@ -46,9 +57,69 @@ const DEFAULT_PORT: u32 = 5000;
const DEFAULT_REUSE: bool = true;
const DEFAULT_CAPS: Option<gst::Caps> = None;
const DEFAULT_MTU: u32 = 1500;
const DEFAULT_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_CONTEXT: &'static str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
// Send/Sync struct for passing around a gio::Socket
// and getting the raw fd from it
//
// gio::Socket is not Send/Sync as it's generally unsafe
// to access it from multiple threads. Getting the underlying raw
// fd is safe though, as is receiving/sending from two different threads
#[derive(Debug)]
struct GioSocketWrapper {
socket: *mut gio_ffi::GSocket,
}
unsafe impl Send for GioSocketWrapper {}
unsafe impl Sync for GioSocketWrapper {}
impl GioSocketWrapper {
fn new(socket: &gio::Socket) -> Self {
use glib::translate::*;
Self {
socket: socket.to_glib_full(),
}
}
fn as_socket(&self) -> gio::Socket {
unsafe {
use glib::translate::*;
from_glib_none(self.socket)
}
}
#[cfg(unix)]
fn get<T: FromRawFd>(&self) -> T {
unsafe { FromRawFd::from_raw_fd(gio_ffi::g_socket_get_fd(self.socket)) }
}
#[cfg(windows)]
fn get<T: FromRawSocket>(&self) -> T {
unsafe { FromRawSocket::from_raw_socket(ffi::g_socket_get_fd(self.socket) as _) }
}
}
impl Clone for GioSocketWrapper {
fn clone(&self) -> Self {
Self {
socket: unsafe { gobject_ffi::g_object_ref(self.socket as *mut _) as *mut _ },
}
}
}
impl Drop for GioSocketWrapper {
fn drop(&mut self) {
unsafe {
gobject_ffi::g_object_unref(self.socket as *mut _);
}
}
}
#[derive(Debug, Clone)]
struct Settings {
address: Option<String>,
@ -56,6 +127,8 @@ struct Settings {
reuse: bool,
caps: Option<gst::Caps>,
mtu: u32,
socket: Option<GioSocketWrapper>,
used_socket: Option<GioSocketWrapper>,
context: String,
context_wait: u32,
}
@ -68,13 +141,15 @@ impl Default for Settings {
reuse: DEFAULT_REUSE,
caps: DEFAULT_CAPS,
mtu: DEFAULT_MTU,
socket: DEFAULT_SOCKET,
used_socket: DEFAULT_USED_SOCKET,
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
}
}
}
static PROPERTIES: [Property; 7] = [
static PROPERTIES: [Property; 9] = [
Property::String(
"address",
"Address",
@ -112,6 +187,20 @@ static PROPERTIES: [Property; 7] = [
DEFAULT_MTU,
PropertyMutability::ReadWrite,
),
Property::Object(
"socket",
"Socket",
"Socket to use for UDP reception. (None == allocate)",
gio::Socket::static_type,
PropertyMutability::ReadWrite,
),
Property::Object(
"used-socket",
"Used Socket",
"Socket currently in use for UDP reception. (None = no socket)",
gio::Socket::static_type,
PropertyMutability::Readable,
),
Property::String(
"context",
"Context",
@ -438,120 +527,203 @@ impl UdpSrc {
)
})?;
let addr: IpAddr = match settings.address {
None => {
return Err(gst_error_msg!(
gst::ResourceError::Settings,
["No address set"]
))
let socket = if let Some(ref wrapped_socket) = settings.socket {
use std::net::UdpSocket;
let mut socket: UdpSocket;
#[cfg(unix)]
{
socket = wrapped_socket.get()
}
Some(ref addr) => match addr.parse() {
Err(err) => {
#[cfg(windows)]
{
socket = wrapped_socket.get()
}
let socket =
net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to setup socket for tokio: {}", err]
)
})?;
self.settings.lock().unwrap().used_socket = Some(wrapped_socket.clone());
socket
} else {
let addr: IpAddr = match settings.address {
None => {
return Err(gst_error_msg!(
gst::ResourceError::Settings,
["Invalid address '{}' set: {}", addr, err]
["No address set"]
))
}
Ok(addr) => addr,
},
};
let port = settings.port;
Some(ref addr) => match addr.parse() {
Err(err) => {
return Err(gst_error_msg!(
gst::ResourceError::Settings,
["Invalid address '{}' set: {}", addr, err]
))
}
Ok(addr) => addr,
},
};
let port = settings.port;
// TODO: TTL, multicast loopback, etc
let saddr = if addr.is_multicast() {
// TODO: Use ::unspecified() constructor once stable
let bind_addr = if addr.is_ipv4() {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))
// TODO: TTL, multicast loopback, etc
let saddr = if addr.is_multicast() {
// TODO: Use ::unspecified() constructor once stable
let bind_addr = if addr.is_ipv4() {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))
} else {
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0))
};
let saddr = SocketAddr::new(bind_addr, port as u16);
gst_debug!(
self.cat,
obj: element,
"Binding to {:?} for multicast group {:?}",
saddr,
addr
);
saddr
} else {
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0))
let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(self.cat, obj: element, "Binding to {:?}", saddr);
saddr
};
let saddr = SocketAddr::new(bind_addr, port as u16);
gst_debug!(
self.cat,
obj: element,
"Binding to {:?} for multicast group {:?}",
saddr,
addr
);
saddr
} else {
let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(self.cat, obj: element, "Binding to {:?}", saddr);
saddr
};
let builder = if addr.is_ipv4() {
net2::UdpBuilder::new_v4()
} else {
net2::UdpBuilder::new_v6()
}
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create socket: {}", err]
)
})?;
builder.reuse_address(settings.reuse).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to set reuse_address: {}", err]
)
})?;
#[cfg(unix)]
{
use net2::unix::UnixUdpBuilderExt;
builder.reuse_port(settings.reuse).map_err(|err| {
let builder = if addr.is_ipv4() {
net2::UdpBuilder::new_v4()
} else {
net2::UdpBuilder::new_v6()
}
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to set reuse_port: {}", err]
)
})?;
}
let socket = builder.bind(&saddr).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to bind socket: {}", err]
)
})?;
let socket =
net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to setup socket for tokio: {}", err]
["Failed to create socket: {}", err]
)
})?;
if addr.is_multicast() {
// TODO: Multicast interface configuration, going to be tricky
match addr {
IpAddr::V4(addr) => {
socket
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
builder.reuse_address(settings.reuse).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to set reuse_address: {}", err]
)
})?;
#[cfg(unix)]
{
use net2::unix::UnixUdpBuilderExt;
builder.reuse_port(settings.reuse).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to set reuse_port: {}", err]
)
})?;
}
let socket = builder.bind(&saddr).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to bind socket: {}", err]
)
})?;
let socket =
net::UdpSocket::from_std(socket, io_context.reactor_handle()).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to setup socket for tokio: {}", err]
)
})?;
if addr.is_multicast() {
// TODO: Multicast interface configuration, going to be tricky
match addr {
IpAddr::V4(addr) => {
socket
.join_multicast_v4(&addr, &Ipv4Addr::new(0, 0, 0, 0))
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
IpAddr::V6(addr) => {
socket.join_multicast_v6(&addr, 0).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
IpAddr::V6(addr) => {
socket.join_multicast_v6(&addr, 0).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to join multicast group: {}", err]
)
})?;
}
}
}
}
// Store the socket as used-socket in the settings
#[cfg(unix)]
{
let fd = socket.as_raw_fd();
// This is technically unsafe because it allows
// us to share the fd between the socket and the
// GIO socket below, but safety of this is the
// job of the application
struct FdConverter(RawFd);
impl IntoRawFd for FdConverter {
fn into_raw_fd(self) -> RawFd {
self.0
}
}
let fd = FdConverter(fd);
let gio_socket = gio::Socket::new_from_fd(fd).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create wrapped GIO socket: {}", err]
)
})?;;
let wrapper = GioSocketWrapper::new(&gio_socket);
self.settings.lock().unwrap().used_socket = Some(wrapper);
}
#[cfg(windows)]
{
let fd = socket.as_raw_socket();
// This is technically unsafe because it allows
// us to share the fd between the socket and the
// GIO socket below, but safety of this is the
// job of the application
struct SocketConverter(RawSocket);
impl IntoRawSocket for SocketConverter {
fn into_raw_socket(self) -> RawSocket {
self.0
}
}
let fd = SocketConverter(fd);
let gio_socket = gio::Socket::new_from_socket(fd).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to create wrapped GIO socket: {}", err]
)
})?;;
let wrapper = GioSocketWrapper::new(&gio_socket);
self.settings.lock().unwrap().used_socket = Some(wrapper);
}
socket
};
let buffer_pool = gst::BufferPool::new();
let mut config = buffer_pool.get_config();
@ -615,6 +787,9 @@ impl UdpSrc {
state.pending_future_id = Some(pending_future_id);
gst_debug!(self.cat, obj: element, "Prepared");
drop(state);
element.notify("used-socket");
Ok(())
}
@ -622,6 +797,8 @@ impl UdpSrc {
fn unprepare(&self, element: &Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing");
self.settings.lock().unwrap().used_socket = None;
// FIXME: The IO Context has to be alive longer than the queue,
// otherwise the queue can't finish any remaining work
let (mut socket, io_context) = {
@ -701,6 +878,15 @@ impl ObjectImpl<Element> for UdpSrc {
let mut settings = self.settings.lock().unwrap();
settings.mtu = value.get().unwrap();
}
Property::Object("socket", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.socket = value
.get::<gio::Socket>()
.map(|socket| GioSocketWrapper::new(&socket));
}
Property::Object("used-socket", ..) => {
unreachable!();
}
Property::String("context", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.context = value.get().unwrap_or_else(|| "".into());
@ -737,6 +923,22 @@ impl ObjectImpl<Element> for UdpSrc {
let mut settings = self.settings.lock().unwrap();
Ok(settings.mtu.to_value())
}
Property::Object("socket", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings
.socket
.as_ref()
.map(GioSocketWrapper::as_socket)
.to_value())
}
Property::Object("used-socket", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings
.used_socket
.as_ref()
.map(GioSocketWrapper::as_socket)
.to_value())
}
Property::String("context", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context.to_value())