ts-udpsrc: Add retrieve-sender-address property

Fixes #41
This commit is contained in:
Abdul Rehman 2019-03-13 21:07:53 +05:00
parent cdbd5c3c91
commit 2f3139dea2
5 changed files with 86 additions and 17 deletions

View file

@ -15,6 +15,8 @@ gio = { git = "https://github.com/gtk-rs/gio" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] }
gstreamer-app = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-check = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-net = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
tokio = "0.1"
tokio-reactor = "0.1"
tokio-executor = "0.1"

View file

@ -29,6 +29,7 @@ extern crate gio;
extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
extern crate gstreamer_net as gst_net;
extern crate futures;
extern crate tokio;

View file

@ -51,7 +51,10 @@ enum SocketState {
pub trait SocketRead: Send {
const DO_TIMESTAMP: bool;
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error>;
fn poll_read(
&mut self,
buf: &mut [u8],
) -> Poll<(usize, Option<std::net::SocketAddr>), io::Error>;
}
struct SocketInner<T: SocketRead + 'static> {
@ -81,7 +84,7 @@ impl<T: SocketRead + 'static> Socket<T> {
pub fn schedule<U, F, G>(&self, io_context: &IOContext, func: F, err_func: G) -> Result<(), ()>
where
F: Fn(gst::Buffer) -> U + Send + 'static,
F: Fn((gst::Buffer, Option<std::net::SocketAddr>)) -> U + Send + 'static,
U: IntoFuture<Item = (), Error = gst::FlowError> + 'static,
<U as IntoFuture>::Future: Send + 'static,
G: FnOnce(Either<gst::FlowError, io::Error>) + Send + 'static,
@ -112,7 +115,9 @@ impl<T: SocketRead + 'static> Socket<T> {
let element_clone = inner.element.clone();
io_context.spawn(
stream
.for_each(move |buffer| func(buffer).into_future().map_err(Either::Left))
.for_each(move |(buffer, saddr)| {
func((buffer, saddr)).into_future().map_err(Either::Left)
})
.then(move |res| {
gst_debug!(
SOCKET_CAT,
@ -227,7 +232,7 @@ struct SocketStream<T: SocketRead + 'static>(
);
impl<T: SocketRead + 'static> Stream for SocketStream<T> {
type Item = gst::Buffer;
type Item = (gst::Buffer, Option<std::net::SocketAddr>);
type Error = Either<gst::FlowError, io::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -244,7 +249,7 @@ impl<T: SocketRead + 'static> Stream for SocketStream<T> {
assert_eq!(inner.state, SocketState::Running);
gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data");
let (len, time) = {
let (len, saddr, time) = {
let mut buffer = match self.1 {
Some(ref mut buffer) => buffer,
None => match inner.buffer_pool.acquire_buffer(None) {
@ -269,7 +274,7 @@ impl<T: SocketRead + 'static> Stream for SocketStream<T> {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read error {:?}", err);
return Err(Either::Right(err));
}
Ok(Async::Ready(len)) => {
Ok(Async::Ready((len, saddr))) => {
let dts = if T::DO_TIMESTAMP {
let time = inner.clock.as_ref().unwrap().get_time();
let running_time = time - inner.base_time.unwrap();
@ -279,7 +284,7 @@ impl<T: SocketRead + 'static> Stream for SocketStream<T> {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read {} bytes", len);
gst::CLOCK_TIME_NONE
};
(len, dts)
(len, saddr, dts)
}
}
};
@ -293,6 +298,6 @@ impl<T: SocketRead + 'static> Stream for SocketStream<T> {
buffer.set_dts(time);
}
Ok(Async::Ready(Some(buffer)))
Ok(Async::Ready(Some((buffer, saddr))))
}
}

View file

@ -151,7 +151,10 @@ impl TcpClientReader {
impl SocketRead for TcpClientReader {
const DO_TIMESTAMP: bool = false;
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
fn poll_read(
&mut self,
buf: &mut [u8],
) -> Poll<(usize, Option<std::net::SocketAddr>), io::Error> {
let socket = match self.socket {
Some(ref mut socket) => socket,
None => match self.connect_future.poll() {
@ -165,8 +168,15 @@ impl SocketRead for TcpClientReader {
_ => return Ok(Async::NotReady),
},
};
socket.poll_read(buf)
match socket.poll_read(buf) {
Ok(Async::Ready(result)) => {
return Ok(Async::Ready((result, None)));
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(result) => return Err(result),
};
}
}
@ -458,7 +468,7 @@ impl TcpClientSrc {
socket
.schedule(
&io_context,
move |buffer| {
move |(buffer, _)| {
let tcpclientsrc = Self::from_instance(&element_clone);
tcpclientsrc.push_buffer(&element_clone, buffer)
},

View file

@ -22,6 +22,7 @@ use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_net::*;
use gio;
@ -34,7 +35,7 @@ use std::u16;
use futures;
use futures::future;
use futures::{Future, Poll};
use futures::{Async, Future, Poll};
use tokio::net;
use either::Either;
@ -61,6 +62,7 @@ const DEFAULT_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_USED_SOCKET: Option<GioSocketWrapper> = None;
const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: u32 = 0;
const DEFAULT_RETRIEVE_SENDER_ADDRESS: bool = true;
// Send/Sync struct for passing around a gio::Socket
// and getting the raw fd from it
@ -166,6 +168,7 @@ struct Settings {
used_socket: Option<GioSocketWrapper>,
context: String,
context_wait: u32,
retrieve_sender_address: bool,
}
impl Default for Settings {
@ -180,11 +183,12 @@ impl Default for Settings {
used_socket: DEFAULT_USED_SOCKET,
context: DEFAULT_CONTEXT.into(),
context_wait: DEFAULT_CONTEXT_WAIT,
retrieve_sender_address: DEFAULT_RETRIEVE_SENDER_ADDRESS,
}
}
}
static PROPERTIES: [subclass::Property; 9] = [
static PROPERTIES: [subclass::Property; 10] = [
subclass::Property("address", |name| {
glib::ParamSpec::string(
name,
@ -272,6 +276,15 @@ static PROPERTIES: [subclass::Property; 9] = [
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("retrieve-sender-address", |name| {
glib::ParamSpec::boolean(
name,
"Retrieve sender address",
"Whether to retrieve the sender address and add it to buffers as meta. Disabling this might result in minor performance improvements in certain scenarios",
DEFAULT_REUSE,
glib::ParamFlags::READWRITE,
)
}),
];
pub struct UdpReader {
@ -287,8 +300,19 @@ impl UdpReader {
impl SocketRead for UdpReader {
const DO_TIMESTAMP: bool = true;
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
self.socket.poll_recv(buf)
fn poll_read(
&mut self,
buf: &mut [u8],
) -> Poll<(usize, Option<std::net::SocketAddr>), io::Error> {
match self.socket.poll_recv_from(buf) {
Ok(Async::Ready(result)) => {
return Ok(Async::Ready((result.0, Some(result.1))));
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(result) => return Err(result),
};
}
}
@ -743,11 +767,30 @@ impl UdpSrc {
let element_clone = element.clone();
let element_clone2 = element.clone();
let retrieve_sender_address = self.settings.lock().unwrap().retrieve_sender_address;
socket
.schedule(
&io_context,
move |buffer| {
move |(mut buffer, saddr)| {
let udpsrc = Self::from_instance(&element_clone);
if let Some(saddr) = saddr {
if retrieve_sender_address {
let inet_addr = match saddr.ip() {
IpAddr::V4(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V4(&ip.octets()),
),
IpAddr::V6(ip) => gio::InetAddress::new_from_bytes(
gio::InetAddressBytes::V6(&ip.octets()),
),
};
let inet_socket_addr =
&gio::InetSocketAddress::new(&inet_addr, saddr.port());
NetAddressMeta::add(buffer.get_mut().unwrap(), inet_socket_addr);
}
}
udpsrc.push_buffer(&element_clone, buffer)
},
move |err| {
@ -978,6 +1021,10 @@ impl ObjectImpl for UdpSrc {
let mut settings = self.settings.lock().unwrap();
settings.context_wait = value.get().unwrap();
}
subclass::Property("retrieve-sender-address", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.retrieve_sender_address = value.get().unwrap();
}
_ => unimplemented!(),
}
}
@ -1030,6 +1077,10 @@ impl ObjectImpl for UdpSrc {
let mut settings = self.settings.lock().unwrap();
Ok(settings.context_wait.to_value())
}
subclass::Property("retrieve-sender-address", ..) => {
let mut settings = self.settings.lock().unwrap();
Ok(settings.retrieve_sender_address.to_value())
}
_ => unimplemented!(),
}
}