threadshare/udpsrc: Port udpsrc to new API

This commit is contained in:
Sebastian Dröge 2020-03-06 15:51:05 +02:00
parent 4c584fd162
commit f5eb91ebe2
4 changed files with 314 additions and 273 deletions

View file

@ -29,10 +29,10 @@ pub use tokio;
#[macro_use] #[macro_use]
pub mod runtime; pub mod runtime;
//pub mod socket; pub mod socket;
//mod tcpclientsrc; //mod tcpclientsrc;
//mod udpsink; //mod udpsink;
//mod udpsrc; mod udpsrc;
// //
//mod appsrc; //mod appsrc;
//pub mod dataqueue; //pub mod dataqueue;
@ -49,7 +49,7 @@ use gst::prelude::*;
use gstreamer_sys as gst_ffi; use gstreamer_sys as gst_ffi;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
//udpsrc::register(plugin)?; udpsrc::register(plugin)?;
//udpsink::register(plugin)?; //udpsink::register(plugin)?;
//tcpclientsrc::register(plugin)?; //tcpclientsrc::register(plugin)?;
//queue::register(plugin)?; //queue::register(plugin)?;

View file

@ -16,10 +16,8 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use either::Either;
use futures::future::{abortable, AbortHandle, Aborted, BoxFuture}; use futures::future::{abortable, AbortHandle, Aborted, BoxFuture};
use futures::lock::Mutex; use futures::prelude::*;
use gst; use gst;
use gst::prelude::*; use gst::prelude::*;
@ -28,7 +26,7 @@ use gst::{gst_debug, gst_error, gst_error_msg};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::io; use std::io;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use gio; use gio;
use gio::prelude::*; use gio::prelude::*;
@ -49,7 +47,6 @@ lazy_static! {
); );
} }
#[derive(Debug)]
pub struct Socket<T: SocketRead + 'static>(Arc<Mutex<SocketInner<T>>>); pub struct Socket<T: SocketRead + 'static>(Arc<Mutex<SocketInner<T>>>);
pub trait SocketRead: Send + Unpin { pub trait SocketRead: Send + Unpin {
@ -61,44 +58,51 @@ pub trait SocketRead: Send + Unpin {
) -> BoxFuture<'buf, io::Result<(usize, Option<std::net::SocketAddr>)>>; ) -> BoxFuture<'buf, io::Result<(usize, Option<std::net::SocketAddr>)>>;
} }
#[derive(PartialEq, Eq, Debug)] #[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum SocketState { pub enum SocketState {
Paused, Paused,
Prepared, Prepared,
Started, Started,
Unprepared, Unprepared,
} }
#[derive(Debug)]
struct SocketInner<T: SocketRead + 'static> { struct SocketInner<T: SocketRead + 'static> {
state: SocketState, state: SocketState,
element: gst::Element, element: gst::Element,
reader: T,
buffer_pool: gst::BufferPool, buffer_pool: gst::BufferPool,
clock: Option<gst::Clock>, clock: Option<gst::Clock>,
base_time: Option<gst::ClockTime>, base_time: Option<gst::ClockTime>,
create_read_handle: Option<AbortHandle>,
create_reader_fut: Option<BoxFuture<'static, Result<T, SocketError>>>,
read_handle: Option<AbortHandle>, read_handle: Option<AbortHandle>,
reader: Option<T>,
} }
impl<T: SocketRead + 'static> Socket<T> { impl<T: SocketRead + 'static> Socket<T> {
pub fn new(element: &gst::Element, reader: T, buffer_pool: gst::BufferPool) -> Self { pub fn new<F>(
Socket(Arc::new(Mutex::new(SocketInner::<T> { element: &gst::Element,
buffer_pool: gst::BufferPool,
create_reader_fut: F,
) -> Result<Self, ()>
where
F: Future<Output = Result<T, SocketError>> + Send + 'static,
{
let socket = Socket(Arc::new(Mutex::new(SocketInner::<T> {
state: SocketState::Unprepared, state: SocketState::Unprepared,
element: element.clone(), element: element.clone(),
reader,
buffer_pool, buffer_pool,
clock: None, clock: None,
base_time: None, base_time: None,
create_read_handle: None,
create_reader_fut: Some(create_reader_fut.boxed()),
read_handle: None, read_handle: None,
}))) reader: None,
} })));
pub async fn prepare(&self) -> Result<SocketStream<T>, ()> { let mut inner = socket.0.lock().unwrap();
// Null->Ready
let mut inner = self.0.lock().await;
if inner.state != SocketState::Unprepared { if inner.state != SocketState::Unprepared {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already prepared"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already prepared");
return Ok(SocketStream::<T>::new(self)); return Err(());
} }
gst_debug!(SOCKET_CAT, obj: &inner.element, "Preparing socket"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Preparing socket");
@ -106,28 +110,39 @@ impl<T: SocketRead + 'static> Socket<T> {
gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to prepare socket: {}", err); gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to prepare socket: {}", err);
})?; })?;
inner.state = SocketState::Prepared; inner.state = SocketState::Prepared;
drop(inner);
Ok(SocketStream::<T>::new(self)) Ok(socket)
} }
pub async fn start(&self, clock: Option<gst::Clock>, base_time: Option<gst::ClockTime>) { pub fn state(&self) -> SocketState {
self.0.lock().unwrap().state
}
pub fn start(
&self,
clock: Option<gst::Clock>,
base_time: Option<gst::ClockTime>,
) -> Result<SocketStream<T>, ()> {
// Paused->Playing // Paused->Playing
let mut inner = self.0.lock().await; let mut inner = self.0.lock().unwrap();
assert_ne!(SocketState::Unprepared, inner.state); assert_ne!(SocketState::Unprepared, inner.state);
if inner.state == SocketState::Started { if inner.state == SocketState::Started {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already started"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already started");
return; return Err(());
} }
gst_debug!(SOCKET_CAT, obj: &inner.element, "Starting socket"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Starting socket");
inner.clock = clock; inner.clock = clock;
inner.base_time = base_time; inner.base_time = base_time;
inner.state = SocketState::Started; inner.state = SocketState::Started;
Ok(SocketStream::<T>::new(self))
} }
pub async fn pause(&self) { pub fn pause(&self) {
// Playing->Paused // Playing->Paused
let mut inner = self.0.lock().await; let mut inner = self.0.lock().unwrap();
assert_ne!(SocketState::Unprepared, inner.state); assert_ne!(SocketState::Unprepared, inner.state);
if inner.state != SocketState::Started { if inner.state != SocketState::Started {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket not started"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket not started");
@ -142,22 +157,26 @@ impl<T: SocketRead + 'static> Socket<T> {
read_handle.abort(); read_handle.abort();
} }
} }
}
pub async fn unprepare(&self) -> Result<(), ()> { impl<T: SocketRead> Drop for Socket<T> {
fn drop(&mut self) {
// Ready->Null // Ready->Null
let mut inner = self.0.lock().await; let mut inner = self.0.lock().unwrap();
assert_ne!(SocketState::Started, inner.state); assert_ne!(SocketState::Started, inner.state);
if inner.state == SocketState::Unprepared { if inner.state == SocketState::Unprepared {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already unprepared"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already unprepared");
return Ok(()); return;
} }
inner.buffer_pool.set_active(false).map_err(|err| { if let Some(create_read_handle_handle) = inner.create_read_handle.take() {
gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to unprepare socket: {}", err); create_read_handle_handle.abort();
})?; }
inner.state = SocketState::Unprepared;
Ok(()) if let Err(err) = inner.buffer_pool.set_active(false) {
gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to unprepare socket: {}", err);
}
inner.state = SocketState::Unprepared;
} }
} }
@ -167,10 +186,14 @@ impl<T: SocketRead + Unpin + 'static> Clone for Socket<T> {
} }
} }
pub type SocketStreamItem = pub type SocketStreamItem = Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError>;
Result<(gst::Buffer, Option<std::net::SocketAddr>), Either<gst::FlowError, io::Error>>;
#[derive(Debug)] #[derive(Debug)]
pub enum SocketError {
Gst(gst::FlowError),
Io(io::Error),
}
pub struct SocketStream<T: SocketRead + 'static> { pub struct SocketStream<T: SocketRead + 'static> {
socket: Socket<T>, socket: Socket<T>,
mapped_buffer: Option<gst::MappedBuffer<gst::buffer::Writable>>, mapped_buffer: Option<gst::MappedBuffer<gst::buffer::Writable>>,
@ -184,18 +207,57 @@ impl<T: SocketRead + 'static> SocketStream<T> {
} }
} }
// Implementing `next` as an `async fn` instead of a `Stream` because of the `async` `Mutex`
// See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/merge_requests/204#note_322774
#[allow(clippy::should_implement_trait)] #[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Option<SocketStreamItem> { pub async fn next(&mut self) -> Option<SocketStreamItem> {
// First create if needed
let (create_reader_fut, element) = {
let mut inner = self.socket.0.lock().unwrap();
if let Some(create_reader_fut) = inner.create_reader_fut.take() {
let (create_reader_fut, abort_handle) = abortable(create_reader_fut);
inner.create_read_handle = Some(abort_handle);
(Some(create_reader_fut), inner.element.clone())
} else {
(None, inner.element.clone())
}
};
if let Some(create_reader_fut) = create_reader_fut {
match create_reader_fut.await {
Ok(Ok(read)) => {
let mut inner = self.socket.0.lock().unwrap();
inner.create_read_handle = None;
inner.reader = Some(read);
}
Ok(Err(err)) => {
gst_debug!(SOCKET_CAT, obj: &element, "Create reader error {:?}", err);
return Some(Err(err));
}
Err(Aborted) => {
gst_debug!(SOCKET_CAT, obj: &element, "Create reader Aborted");
return None;
}
}
}
// take the mapped_buffer before locking the socket so as to please the mighty borrow checker // take the mapped_buffer before locking the socket so as to please the mighty borrow checker
let read_fut = { let (read_fut, clock, base_time) = {
let mut inner = self.socket.0.lock().await; let mut inner = self.socket.0.lock().unwrap();
if inner.state != SocketState::Started { if inner.state != SocketState::Started {
gst_debug!(SOCKET_CAT, obj: &inner.element, "DataQueue is not Started"); gst_debug!(SOCKET_CAT, obj: &inner.element, "DataQueue is not Started");
return None; return None;
} }
let reader = match inner.reader {
None => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Have no reader");
return None;
}
Some(ref reader) => reader,
};
gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data");
if self.mapped_buffer.is_none() { if self.mapped_buffer.is_none() {
match inner.buffer_pool.acquire_buffer(None) { match inner.buffer_pool.acquire_buffer(None) {
@ -204,32 +266,34 @@ impl<T: SocketRead + 'static> SocketStream<T> {
} }
Err(err) => { Err(err) => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err); gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err);
return Some(Err(Either::Left(err))); return Some(Err(SocketError::Gst(err)));
} }
} }
} }
let (read_fut, abort_handle) = abortable( let (read_fut, abort_handle) =
inner abortable(reader.read(self.mapped_buffer.as_mut().unwrap().as_mut_slice()));
.reader
.read(self.mapped_buffer.as_mut().unwrap().as_mut_slice()),
);
inner.read_handle = Some(abort_handle); inner.read_handle = Some(abort_handle);
read_fut (read_fut, inner.clock.clone(), inner.base_time)
}; };
match read_fut.await { match read_fut.await {
Ok(Ok((len, saddr))) => { Ok(Ok((len, saddr))) => {
let inner = self.socket.0.lock().await;
let dts = if T::DO_TIMESTAMP { let dts = if T::DO_TIMESTAMP {
let time = inner.clock.as_ref().unwrap().get_time(); let time = clock.as_ref().unwrap().get_time();
let running_time = time - inner.base_time.unwrap(); let running_time = time - base_time.unwrap();
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read {} bytes at {} (clock {})", len, running_time, time); gst_debug!(
SOCKET_CAT,
obj: &element,
"Read {} bytes at {} (clock {})",
len,
running_time,
time
);
running_time running_time
} else { } else {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read {} bytes", len); gst_debug!(SOCKET_CAT, obj: &element, "Read {} bytes", len);
gst::CLOCK_TIME_NONE gst::CLOCK_TIME_NONE
}; };
@ -245,12 +309,12 @@ impl<T: SocketRead + 'static> SocketStream<T> {
Some(Ok((buffer, saddr))) Some(Ok((buffer, saddr)))
} }
Ok(Err(err)) => { Ok(Err(err)) => {
gst_debug!(SOCKET_CAT, obj: &self.socket.0.lock().await.element, "Read error {:?}", err); gst_debug!(SOCKET_CAT, obj: &element, "Read error {:?}", err);
Some(Err(Either::Right(err))) Some(Err(SocketError::Io(err)))
} }
Err(Aborted) => { Err(Aborted) => {
gst_debug!(SOCKET_CAT, obj: &self.socket.0.lock().await.element, "Read Aborted"); gst_debug!(SOCKET_CAT, obj: &element, "Read Aborted");
None None
} }

View file

@ -15,10 +15,8 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use either::Either;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::lock::Mutex; use futures::lock::Mutex as FutMutex;
use futures::prelude::*; use futures::prelude::*;
use gio; use gio;
@ -41,13 +39,16 @@ use rand;
use std::io; use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Mutex as StdMutex;
use std::sync::{self, Arc}; use std::sync::{self, Arc};
use std::u16; use std::u16;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef}; use crate::runtime::{Context, PadSrc, PadSrcRef};
use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketRead, SocketStream}; use super::socket::{
wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead, SocketState, SocketStream,
};
const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: u32 = 5000; const DEFAULT_PORT: u32 = 5000;
@ -196,11 +197,11 @@ struct UdpReaderInner {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct UdpReader(Arc<Mutex<UdpReaderInner>>); pub struct UdpReader(Arc<FutMutex<UdpReaderInner>>);
impl UdpReader { impl UdpReader {
fn new(socket: tokio::net::UdpSocket) -> Self { fn new(socket: tokio::net::UdpSocket) -> Self {
UdpReader(Arc::new(Mutex::new(UdpReaderInner { socket }))) UdpReader(Arc::new(FutMutex::new(UdpReaderInner { socket })))
} }
} }
@ -247,41 +248,39 @@ impl Default for UdpSrcPadHandlerState {
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct UdpSrcPadHandlerInner { struct UdpSrcPadHandlerInner {
state: sync::RwLock<UdpSrcPadHandlerState>, state: sync::RwLock<UdpSrcPadHandlerState>,
socket_stream: Mutex<Option<SocketStream<UdpReader>>>,
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
} }
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
struct UdpSrcPadHandler(Arc<UdpSrcPadHandlerInner>); struct UdpSrcPadHandler(Arc<UdpSrcPadHandlerInner>);
impl UdpSrcPadHandler { impl UdpSrcPadHandler {
async fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { fn start_task(
&self,
pad: PadSrcRef<'_>,
element: &gst::Element,
socket_stream: SocketStream<UdpReader>,
) {
let this = self.clone(); let this = self.clone();
let pad_weak = pad.downgrade(); let pad_weak = pad.downgrade();
let element = element.clone(); let element = element.clone();
let socket_stream = Arc::new(FutMutex::new(socket_stream));
pad.start_task(move || { pad.start_task(move || {
let this = this.clone(); let this = this.clone();
let pad_weak = pad_weak.clone(); let pad_weak = pad_weak.clone();
let element = element.clone(); let element = element.clone();
let socket_stream = socket_stream.clone();
async move { async move {
let item = this let item = socket_stream.lock().await.next().await;
.0
.socket_stream
.lock()
.await
.as_mut()
.expect("Missing SocketStream")
.next()
.await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let (mut buffer, saddr) = match item { let (mut buffer, saddr) = match item {
Some(Ok((buffer, saddr))) => (buffer, saddr), Some(Ok((buffer, saddr))) => (buffer, saddr),
Some(Err(err)) => { Some(Err(err)) => {
gst_error!(CAT, obj: &element, "Got error {}", err); gst_error!(CAT, obj: &element, "Got error {:?}", err);
match err { match err {
Either::Left(gst::FlowError::CustomError) => (), SocketError::Gst(err) => {
Either::Left(err) => {
gst_element_error!( gst_element_error!(
element, element,
gst::StreamError::Failed, gst::StreamError::Failed,
@ -289,7 +288,7 @@ impl UdpSrcPadHandler {
["streaming stopped, reason {}", err] ["streaming stopped, reason {}", err]
); );
} }
Either::Right(err) => { SocketError::Io(err) => {
gst_element_error!( gst_element_error!(
element, element,
gst::StreamError::Failed, gst::StreamError::Failed,
@ -298,12 +297,11 @@ impl UdpSrcPadHandler {
); );
} }
} }
return; return glib::Continue(false);
} }
None => { None => {
gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped"); gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped");
pad.pause_task().await; return glib::Continue(false);
return;
} }
}; };
@ -323,74 +321,83 @@ impl UdpSrcPadHandler {
} }
} }
this.push_buffer(pad, &element, buffer).await; let res = this.push_buffer(&pad, &element, buffer).await;
}
})
.await;
}
async fn push_buffer(&self, pad: PadSrcRef<'_>, element: &gst::Element, buffer: gst::Buffer) { match res {
{ Ok(_) => {
let mut events = Vec::new(); gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
{ glib::Continue(true)
// Only `read` the state in the hot path }
if self.0.state.read().unwrap().need_initial_events { Err(gst::FlowError::Flushing) => {
// We will need to `write` and we also want to prevent gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
// any changes on the state while we are handling initial events glib::Continue(false)
let mut state = self.0.state.write().unwrap(); }
assert!(state.need_initial_events); Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
let stream_id = glib::Continue(false)
format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>()); }
events.push( Err(err) => {
gst::Event::new_stream_start(&stream_id) gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
.group_id(gst::GroupId::next()) gst_element_error!(
.build(), element,
); gst::StreamError::Failed,
("Internal data stream error"),
if let Some(ref caps) = state.caps { ["streaming stopped, reason {}", err]
events.push(gst::Event::new_caps(&caps).build()); );
state.configured_caps = Some(caps.clone()); glib::Continue(false)
} }
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
state.need_initial_events = false;
} }
} }
});
}
for event in events { async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
pad.push_event(event).await; let mut events = Vec::new();
// Only `read` the state in the hot path
if self.0.state.read().unwrap().need_initial_events {
// We will need to `write` and we also want to prevent
// any changes on the state while we are handling initial events
let mut state = self.0.state.write().unwrap();
assert!(state.need_initial_events);
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
events.push(
gst::Event::new_stream_start(&stream_id)
.group_id(gst::GroupId::next())
.build(),
);
if let Some(ref caps) = state.caps {
events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
} }
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
);
state.need_initial_events = false;
} }
match pad.push(buffer).await { for event in events {
Ok(_) => { pad.push_event(event).await;
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
pad.pause_task().await;
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
pad.pause_task().await;
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
} }
} }
async fn push_buffer(
&self,
pad: &PadSrcRef<'_>,
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.push_prelude(pad, element).await;
pad.push(buffer).await
}
} }
impl PadSrcHandler for UdpSrcPadHandler { impl PadSrcHandler for UdpSrcPadHandler {
@ -399,68 +406,24 @@ impl PadSrcHandler for UdpSrcPadHandler {
fn src_event( fn src_event(
&self, &self,
pad: &PadSrcRef, pad: &PadSrcRef,
_udpsrc: &UdpSrc, udpsrc: &UdpSrc,
element: &gst::Element, element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> { ) -> bool {
use gst::EventView; use gst::EventView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); udpsrc.pause(element).unwrap();
if flush_join_handle.is_none() {
let element = element.clone();
let pad_weak = pad.downgrade();
*flush_join_handle = Some(pad.spawn(async move {
let res = UdpSrc::from_instance(&element).pause(&element).await;
let pad = pad_weak.upgrade().unwrap();
if res.is_ok() {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed");
}
res
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
true true
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
let element = element.clone(); udpsrc.flush_stop(element);
let inner_weak = Arc::downgrade(&self.0);
let pad_weak = pad.downgrade();
let fut = async move { true
let mut ret = false;
let pad = pad_weak.upgrade().unwrap();
let inner_weak = inner_weak.upgrade().unwrap();
let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
if let Ok(Ok(())) = flush_join_handle.await {
ret = UdpSrc::from_instance(&element)
.start(&element)
.await
.is_ok();
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
ret
}
.boxed();
return Either::Right(fut);
} }
EventView::Reconfigure(..) => true, EventView::Reconfigure(..) => true,
EventView::Latency(..) => true, EventView::Latency(..) => true,
@ -473,7 +436,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
} }
Either::Left(ret) ret
} }
fn src_query( fn src_query(
@ -526,23 +489,11 @@ impl PadSrcHandler for UdpSrcPadHandler {
} }
} }
#[derive(Debug)]
struct State {
socket: Option<Socket<UdpReader>>,
}
impl Default for State {
fn default() -> State {
State { socket: None }
}
}
#[derive(Debug)]
struct UdpSrc { struct UdpSrc {
src_pad: PadSrc, src_pad: PadSrc,
src_pad_handler: UdpSrcPadHandler, src_pad_handler: UdpSrcPadHandler,
state: Mutex<State>, socket: StdMutex<Option<Socket<UdpReader>>>,
settings: Mutex<Settings>, settings: StdMutex<Settings>,
} }
lazy_static! { lazy_static! {
@ -554,9 +505,9 @@ lazy_static! {
} }
impl UdpSrc { impl UdpSrc {
async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().await; let mut socket_storage = self.socket.lock().unwrap();
let mut settings = self.settings.lock().await.clone(); let mut settings = self.settings.lock().unwrap().clone();
gst_debug!(CAT, obj: element, "Preparing"); gst_debug!(CAT, obj: element, "Preparing");
@ -725,8 +676,10 @@ impl UdpSrc {
) )
})?; })?;
let socket = Socket::new(element.upcast_ref(), UdpReader::new(socket), buffer_pool); let socket = Socket::new(element.upcast_ref(), buffer_pool, async move {
let socket_stream = socket.prepare().await.map_err(|err| { Ok(UdpReader::new(socket))
})
.map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err] ["Failed to prepare socket {:?}", err]
@ -736,20 +689,16 @@ impl UdpSrc {
{ {
let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap();
src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address; src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address;
src_pad_handler_state.caps = settings.caps.clone(); src_pad_handler_state.caps = settings.caps;
} }
drop(settings); *socket_storage = Some(socket);
drop(socket_storage);
*self.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream);
state.socket = Some(socket);
element.notify("used-socket"); element.notify("used-socket");
self.src_pad self.src_pad
.prepare(context, &self.src_pad_handler) .prepare(context, &self.src_pad_handler)
.await
.map_err(|err| { .map_err(|err| {
gst_error_msg!( gst_error_msg!(
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
@ -762,65 +711,102 @@ impl UdpSrc {
Ok(()) Ok(())
} }
async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().await;
gst_debug!(CAT, obj: element, "Unpreparing"); gst_debug!(CAT, obj: element, "Unpreparing");
self.settings.lock().await.used_socket = None; self.settings.lock().unwrap().used_socket = None;
element.notify("used-socket");
self.src_pad.stop_task().await; if let Some(socket) = self.socket.lock().unwrap().take() {
drop(socket);
*self.src_pad_handler.0.socket_stream.lock().await = None;
{
let socket = state.socket.take().unwrap();
socket.unprepare().await.unwrap();
} }
let _ = self.src_pad.unprepare().await; let _ = self.src_pad.unprepare();
*self.src_pad_handler.0.state.write().unwrap() = Default::default();
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Stopping");
// Now stop the task if it was still running, blocking
// until this has actually happened
self.src_pad.stop_task();
self.src_pad_handler self.src_pad_handler
.0 .0
.state .state
.write() .write()
.unwrap() .unwrap()
.configured_caps = None; .need_initial_events = true;
gst_debug!(CAT, obj: element, "Stopped");
gst_debug!(CAT, obj: element, "Unprepared");
Ok(()) Ok(())
} }
async fn start(&self, element: &gst::Element) -> Result<(), ()> { fn start(&self, element: &gst::Element) -> Result<(), ()> {
let state = self.state.lock().await; let socket = self.socket.lock().unwrap();
gst_debug!(CAT, obj: element, "Starting"); if let Some(socket) = socket.as_ref() {
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Err(());
}
if let Some(ref socket) = state.socket { gst_debug!(CAT, obj: element, "Starting");
socket
.start(element.get_clock(), Some(element.get_base_time())) self.start_unchecked(element, socket);
.await;
gst_debug!(CAT, obj: element, "Started");
Ok(())
} else {
Err(())
}
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `socket` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
// Note that this won't deadlock as it doesn't lock the `SocketStream`
// in use within the `src_pad`'s `Task`.
let socket = self.socket.lock().unwrap();
let socket = socket.as_ref().unwrap();
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
} }
self.src_pad_handler gst_debug!(CAT, obj: element, "Stopping Flush");
.start_task(self.src_pad.as_ref(), element)
.await;
gst_debug!(CAT, obj: element, "Started"); self.src_pad.stop_task();
self.start_unchecked(element, socket);
Ok(()) gst_debug!(CAT, obj: element, "Stopped Flush");
} }
async fn pause(&self, element: &gst::Element) -> Result<(), ()> { fn start_unchecked(&self, element: &gst::Element, socket: &Socket<UdpReader>) {
let pause_completion = { let socket_stream = socket
let state = self.state.lock().await; .start(element.get_clock(), Some(element.get_base_time()))
gst_debug!(CAT, obj: element, "Pausing"); .unwrap();
let pause_completion = self.src_pad.pause_task().await; self.src_pad_handler
state.socket.as_ref().unwrap().pause().await; .start_task(self.src_pad.as_ref(), element, socket_stream);
}
pause_completion fn pause(&self, element: &gst::Element) -> Result<(), ()> {
}; let socket = self.socket.lock().unwrap();
gst_debug!(CAT, obj: element, "Pausing");
gst_debug!(CAT, obj: element, "Waiting for Task Pause to complete"); if let Some(socket) = socket.as_ref() {
pause_completion.await; socket.pause();
}
self.src_pad.cancel_task();
gst_debug!(CAT, obj: element, "Paused"); gst_debug!(CAT, obj: element, "Paused");
@ -880,8 +866,8 @@ impl ObjectSubclass for UdpSrc {
Self { Self {
src_pad, src_pad,
src_pad_handler: UdpSrcPadHandler::default(), src_pad_handler: UdpSrcPadHandler::default(),
state: Mutex::new(State::default()), socket: StdMutex::new(None),
settings: Mutex::new(Settings::default()), settings: StdMutex::new(Settings::default()),
} }
} }
} }
@ -892,7 +878,7 @@ impl ObjectImpl for UdpSrc {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id]; let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock()); let mut settings = self.settings.lock().unwrap();
match *prop { match *prop {
subclass::Property("address", ..) => { subclass::Property("address", ..) => {
settings.address = value.get().expect("type checked upstream"); settings.address = value.get().expect("type checked upstream");
@ -937,7 +923,7 @@ impl ObjectImpl for UdpSrc {
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id]; let prop = &PROPERTIES[id];
let settings = runtime::executor::block_on(self.settings.lock()); let settings = self.settings.lock().unwrap();
match *prop { match *prop {
subclass::Property("address", ..) => Ok(settings.address.to_value()), subclass::Property("address", ..) => Ok(settings.address.to_value()),
subclass::Property("port", ..) => Ok(settings.port.to_value()), subclass::Property("port", ..) => Ok(settings.port.to_value()),
@ -982,18 +968,16 @@ impl ElementImpl for UdpSrc {
match transition { match transition {
gst::StateChange::NullToReady => { gst::StateChange::NullToReady => {
runtime::executor::block_on(self.prepare(element)).map_err(|err| { self.prepare(element).map_err(|err| {
element.post_error_message(&err); element.post_error_message(&err);
gst::StateChangeError gst::StateChangeError
})?; })?;
} }
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
runtime::executor::block_on(self.pause(element)) self.pause(element).map_err(|_| gst::StateChangeError)?;
.map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::ReadyToNull => { gst::StateChange::ReadyToNull => {
runtime::executor::block_on(self.unprepare(element)) self.unprepare(element).map_err(|_| gst::StateChangeError)?;
.map_err(|_| gst::StateChangeError)?;
} }
_ => (), _ => (),
} }
@ -1005,19 +989,13 @@ impl ElementImpl for UdpSrc {
success = gst::StateChangeSuccess::NoPreroll; success = gst::StateChangeSuccess::NoPreroll;
} }
gst::StateChange::PausedToPlaying => { gst::StateChange::PausedToPlaying => {
runtime::executor::block_on(self.start(element)) self.start(element).map_err(|_| gst::StateChangeError)?;
.map_err(|_| gst::StateChangeError)?;
} }
gst::StateChange::PlayingToPaused => { gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll; success = gst::StateChangeSuccess::NoPreroll;
} }
gst::StateChange::PausedToReady => { gst::StateChange::PausedToReady => {
self.src_pad_handler self.stop(element).map_err(|_| gst::StateChangeError)?;
.0
.state
.write()
.unwrap()
.need_initial_events = true;
} }
_ => (), _ => (),
} }

View file

@ -82,18 +82,17 @@ fn test_push() {
use gst::EventView; use gst::EventView;
let event = h.pull_event().unwrap(); let event = h.pull_event().unwrap();
// The StickyEvent for the TaskContext is pushed first
match event.view() { match event.view() {
EventView::StreamStart(..) => { EventView::StreamStart(..) => {
assert_eq!(n_events, 1); assert_eq!(n_events, 0);
} }
EventView::Caps(ev) => { EventView::Caps(ev) => {
assert_eq!(n_events, 2); assert_eq!(n_events, 1);
let event_caps = ev.get_caps(); let event_caps = ev.get_caps();
assert_eq!(caps.as_ref(), event_caps); assert_eq!(caps.as_ref(), event_caps);
} }
EventView::Segment(..) => { EventView::Segment(..) => {
assert_eq!(n_events, 3); assert_eq!(n_events, 2);
break; break;
} }
_ => (), _ => (),