From f5eb91ebe2f181bb281f988ab08613ac1fb35cd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 6 Mar 2020 15:51:05 +0200 Subject: [PATCH] threadshare/udpsrc: Port udpsrc to new API --- gst-plugin-threadshare/src/lib.rs | 6 +- gst-plugin-threadshare/src/socket.rs | 172 +++++++---- gst-plugin-threadshare/src/udpsrc.rs | 402 ++++++++++++------------- gst-plugin-threadshare/tests/udpsrc.rs | 7 +- 4 files changed, 314 insertions(+), 273 deletions(-) diff --git a/gst-plugin-threadshare/src/lib.rs b/gst-plugin-threadshare/src/lib.rs index 396205cf..07eac078 100644 --- a/gst-plugin-threadshare/src/lib.rs +++ b/gst-plugin-threadshare/src/lib.rs @@ -29,10 +29,10 @@ pub use tokio; #[macro_use] pub mod runtime; -//pub mod socket; +pub mod socket; //mod tcpclientsrc; //mod udpsink; -//mod udpsrc; +mod udpsrc; // //mod appsrc; //pub mod dataqueue; @@ -49,7 +49,7 @@ use gst::prelude::*; use gstreamer_sys as gst_ffi; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - //udpsrc::register(plugin)?; + udpsrc::register(plugin)?; //udpsink::register(plugin)?; //tcpclientsrc::register(plugin)?; //queue::register(plugin)?; diff --git a/gst-plugin-threadshare/src/socket.rs b/gst-plugin-threadshare/src/socket.rs index 1ff3ba44..54bb80b8 100644 --- a/gst-plugin-threadshare/src/socket.rs +++ b/gst-plugin-threadshare/src/socket.rs @@ -16,10 +16,8 @@ // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. -use either::Either; - use futures::future::{abortable, AbortHandle, Aborted, BoxFuture}; -use futures::lock::Mutex; +use futures::prelude::*; use gst; use gst::prelude::*; @@ -28,7 +26,7 @@ use gst::{gst_debug, gst_error, gst_error_msg}; use lazy_static::lazy_static; use std::io; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use gio; use gio::prelude::*; @@ -49,7 +47,6 @@ lazy_static! { ); } -#[derive(Debug)] pub struct Socket(Arc>>); pub trait SocketRead: Send + Unpin { @@ -61,44 +58,51 @@ pub trait SocketRead: Send + Unpin { ) -> BoxFuture<'buf, io::Result<(usize, Option)>>; } -#[derive(PartialEq, Eq, Debug)] -enum SocketState { +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum SocketState { Paused, Prepared, Started, Unprepared, } -#[derive(Debug)] struct SocketInner { state: SocketState, element: gst::Element, - reader: T, buffer_pool: gst::BufferPool, clock: Option, base_time: Option, + create_read_handle: Option, + create_reader_fut: Option>>, read_handle: Option, + reader: Option, } impl Socket { - pub fn new(element: &gst::Element, reader: T, buffer_pool: gst::BufferPool) -> Self { - Socket(Arc::new(Mutex::new(SocketInner:: { + pub fn new( + element: &gst::Element, + buffer_pool: gst::BufferPool, + create_reader_fut: F, + ) -> Result + where + F: Future> + Send + 'static, + { + let socket = Socket(Arc::new(Mutex::new(SocketInner:: { state: SocketState::Unprepared, element: element.clone(), - reader, buffer_pool, clock: None, base_time: None, + create_read_handle: None, + create_reader_fut: Some(create_reader_fut.boxed()), read_handle: None, - }))) - } + reader: None, + }))); - pub async fn prepare(&self) -> Result, ()> { - // Null->Ready - let mut inner = self.0.lock().await; + let mut inner = socket.0.lock().unwrap(); if inner.state != SocketState::Unprepared { gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already prepared"); - return Ok(SocketStream::::new(self)); + return Err(()); } gst_debug!(SOCKET_CAT, obj: &inner.element, "Preparing socket"); @@ -106,28 +110,39 @@ impl Socket { gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to prepare socket: {}", err); })?; inner.state = SocketState::Prepared; + drop(inner); - Ok(SocketStream::::new(self)) + Ok(socket) } - pub async fn start(&self, clock: Option, base_time: Option) { + pub fn state(&self) -> SocketState { + self.0.lock().unwrap().state + } + + pub fn start( + &self, + clock: Option, + base_time: Option, + ) -> Result, ()> { // Paused->Playing - let mut inner = self.0.lock().await; + let mut inner = self.0.lock().unwrap(); assert_ne!(SocketState::Unprepared, inner.state); if inner.state == SocketState::Started { gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already started"); - return; + return Err(()); } gst_debug!(SOCKET_CAT, obj: &inner.element, "Starting socket"); inner.clock = clock; inner.base_time = base_time; inner.state = SocketState::Started; + + Ok(SocketStream::::new(self)) } - pub async fn pause(&self) { + pub fn pause(&self) { // Playing->Paused - let mut inner = self.0.lock().await; + let mut inner = self.0.lock().unwrap(); assert_ne!(SocketState::Unprepared, inner.state); if inner.state != SocketState::Started { gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket not started"); @@ -142,22 +157,26 @@ impl Socket { read_handle.abort(); } } +} - pub async fn unprepare(&self) -> Result<(), ()> { +impl Drop for Socket { + fn drop(&mut self) { // Ready->Null - let mut inner = self.0.lock().await; + let mut inner = self.0.lock().unwrap(); assert_ne!(SocketState::Started, inner.state); if inner.state == SocketState::Unprepared { gst_debug!(SOCKET_CAT, obj: &inner.element, "Socket already unprepared"); - return Ok(()); + return; } - inner.buffer_pool.set_active(false).map_err(|err| { - gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to unprepare socket: {}", err); - })?; - inner.state = SocketState::Unprepared; + if let Some(create_read_handle_handle) = inner.create_read_handle.take() { + create_read_handle_handle.abort(); + } - Ok(()) + if let Err(err) = inner.buffer_pool.set_active(false) { + gst_error!(SOCKET_CAT, obj: &inner.element, "Failed to unprepare socket: {}", err); + } + inner.state = SocketState::Unprepared; } } @@ -167,10 +186,14 @@ impl Clone for Socket { } } -pub type SocketStreamItem = - Result<(gst::Buffer, Option), Either>; +pub type SocketStreamItem = Result<(gst::Buffer, Option), SocketError>; #[derive(Debug)] +pub enum SocketError { + Gst(gst::FlowError), + Io(io::Error), +} + pub struct SocketStream { socket: Socket, mapped_buffer: Option>, @@ -184,18 +207,57 @@ impl SocketStream { } } - // Implementing `next` as an `async fn` instead of a `Stream` because of the `async` `Mutex` - // See https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/merge_requests/204#note_322774 #[allow(clippy::should_implement_trait)] pub async fn next(&mut self) -> Option { + // First create if needed + let (create_reader_fut, element) = { + let mut inner = self.socket.0.lock().unwrap(); + + if let Some(create_reader_fut) = inner.create_reader_fut.take() { + let (create_reader_fut, abort_handle) = abortable(create_reader_fut); + inner.create_read_handle = Some(abort_handle); + (Some(create_reader_fut), inner.element.clone()) + } else { + (None, inner.element.clone()) + } + }; + + if let Some(create_reader_fut) = create_reader_fut { + match create_reader_fut.await { + Ok(Ok(read)) => { + let mut inner = self.socket.0.lock().unwrap(); + inner.create_read_handle = None; + inner.reader = Some(read); + } + Ok(Err(err)) => { + gst_debug!(SOCKET_CAT, obj: &element, "Create reader error {:?}", err); + + return Some(Err(err)); + } + Err(Aborted) => { + gst_debug!(SOCKET_CAT, obj: &element, "Create reader Aborted"); + + return None; + } + } + } + // take the mapped_buffer before locking the socket so as to please the mighty borrow checker - let read_fut = { - let mut inner = self.socket.0.lock().await; + let (read_fut, clock, base_time) = { + let mut inner = self.socket.0.lock().unwrap(); if inner.state != SocketState::Started { gst_debug!(SOCKET_CAT, obj: &inner.element, "DataQueue is not Started"); return None; } + let reader = match inner.reader { + None => { + gst_debug!(SOCKET_CAT, obj: &inner.element, "Have no reader"); + return None; + } + Some(ref reader) => reader, + }; + gst_debug!(SOCKET_CAT, obj: &inner.element, "Trying to read data"); if self.mapped_buffer.is_none() { match inner.buffer_pool.acquire_buffer(None) { @@ -204,32 +266,34 @@ impl SocketStream { } Err(err) => { gst_debug!(SOCKET_CAT, obj: &inner.element, "Failed to acquire buffer {:?}", err); - return Some(Err(Either::Left(err))); + return Some(Err(SocketError::Gst(err))); } } } - let (read_fut, abort_handle) = abortable( - inner - .reader - .read(self.mapped_buffer.as_mut().unwrap().as_mut_slice()), - ); + let (read_fut, abort_handle) = + abortable(reader.read(self.mapped_buffer.as_mut().unwrap().as_mut_slice())); inner.read_handle = Some(abort_handle); - read_fut + (read_fut, inner.clock.clone(), inner.base_time) }; match read_fut.await { Ok(Ok((len, saddr))) => { - let inner = self.socket.0.lock().await; - let dts = if T::DO_TIMESTAMP { - let time = inner.clock.as_ref().unwrap().get_time(); - let running_time = time - inner.base_time.unwrap(); - gst_debug!(SOCKET_CAT, obj: &inner.element, "Read {} bytes at {} (clock {})", len, running_time, time); + let time = clock.as_ref().unwrap().get_time(); + let running_time = time - base_time.unwrap(); + gst_debug!( + SOCKET_CAT, + obj: &element, + "Read {} bytes at {} (clock {})", + len, + running_time, + time + ); running_time } else { - gst_debug!(SOCKET_CAT, obj: &inner.element, "Read {} bytes", len); + gst_debug!(SOCKET_CAT, obj: &element, "Read {} bytes", len); gst::CLOCK_TIME_NONE }; @@ -245,12 +309,12 @@ impl SocketStream { Some(Ok((buffer, saddr))) } Ok(Err(err)) => { - gst_debug!(SOCKET_CAT, obj: &self.socket.0.lock().await.element, "Read error {:?}", err); + gst_debug!(SOCKET_CAT, obj: &element, "Read error {:?}", err); - Some(Err(Either::Right(err))) + Some(Err(SocketError::Io(err))) } Err(Aborted) => { - gst_debug!(SOCKET_CAT, obj: &self.socket.0.lock().await.element, "Read Aborted"); + gst_debug!(SOCKET_CAT, obj: &element, "Read Aborted"); None } diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index 3178563a..a241deae 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -15,10 +15,8 @@ // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. -use either::Either; - use futures::future::BoxFuture; -use futures::lock::Mutex; +use futures::lock::Mutex as FutMutex; use futures::prelude::*; use gio; @@ -41,13 +39,16 @@ use rand; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::Mutex as StdMutex; use std::sync::{self, Arc}; use std::u16; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef}; +use crate::runtime::{Context, PadSrc, PadSrcRef}; -use super::socket::{wrap_socket, GioSocketWrapper, Socket, SocketRead, SocketStream}; +use super::socket::{ + wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead, SocketState, SocketStream, +}; const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1"); const DEFAULT_PORT: u32 = 5000; @@ -196,11 +197,11 @@ struct UdpReaderInner { } #[derive(Debug)] -pub struct UdpReader(Arc>); +pub struct UdpReader(Arc>); impl UdpReader { fn new(socket: tokio::net::UdpSocket) -> Self { - UdpReader(Arc::new(Mutex::new(UdpReaderInner { socket }))) + UdpReader(Arc::new(FutMutex::new(UdpReaderInner { socket }))) } } @@ -247,41 +248,39 @@ impl Default for UdpSrcPadHandlerState { #[derive(Debug, Default)] struct UdpSrcPadHandlerInner { state: sync::RwLock, - socket_stream: Mutex>>, - flush_join_handle: sync::Mutex>>>, } #[derive(Clone, Debug, Default)] struct UdpSrcPadHandler(Arc); impl UdpSrcPadHandler { - async fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) { + fn start_task( + &self, + pad: PadSrcRef<'_>, + element: &gst::Element, + socket_stream: SocketStream, + ) { let this = self.clone(); let pad_weak = pad.downgrade(); let element = element.clone(); + let socket_stream = Arc::new(FutMutex::new(socket_stream)); + pad.start_task(move || { let this = this.clone(); let pad_weak = pad_weak.clone(); let element = element.clone(); + let socket_stream = socket_stream.clone(); + async move { - let item = this - .0 - .socket_stream - .lock() - .await - .as_mut() - .expect("Missing SocketStream") - .next() - .await; + let item = socket_stream.lock().await.next().await; let pad = pad_weak.upgrade().expect("PadSrc no longer exists"); let (mut buffer, saddr) = match item { Some(Ok((buffer, saddr))) => (buffer, saddr), Some(Err(err)) => { - gst_error!(CAT, obj: &element, "Got error {}", err); + gst_error!(CAT, obj: &element, "Got error {:?}", err); match err { - Either::Left(gst::FlowError::CustomError) => (), - Either::Left(err) => { + SocketError::Gst(err) => { gst_element_error!( element, gst::StreamError::Failed, @@ -289,7 +288,7 @@ impl UdpSrcPadHandler { ["streaming stopped, reason {}", err] ); } - Either::Right(err) => { + SocketError::Io(err) => { gst_element_error!( element, gst::StreamError::Failed, @@ -298,12 +297,11 @@ impl UdpSrcPadHandler { ); } } - return; + return glib::Continue(false); } None => { gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped"); - pad.pause_task().await; - return; + return glib::Continue(false); } }; @@ -323,74 +321,83 @@ impl UdpSrcPadHandler { } } - this.push_buffer(pad, &element, buffer).await; - } - }) - .await; - } + let res = this.push_buffer(&pad, &element, buffer).await; - async fn push_buffer(&self, pad: PadSrcRef<'_>, element: &gst::Element, buffer: gst::Buffer) { - { - let mut events = Vec::new(); - { - // Only `read` the state in the hot path - if self.0.state.read().unwrap().need_initial_events { - // We will need to `write` and we also want to prevent - // any changes on the state while we are handling initial events - let mut state = self.0.state.write().unwrap(); - assert!(state.need_initial_events); - - gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); - - let stream_id = - format!("{:08x}{:08x}", rand::random::(), rand::random::()); - events.push( - gst::Event::new_stream_start(&stream_id) - .group_id(gst::GroupId::next()) - .build(), - ); - - if let Some(ref caps) = state.caps { - events.push(gst::Event::new_caps(&caps).build()); - state.configured_caps = Some(caps.clone()); + match res { + Ok(_) => { + gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer"); + glib::Continue(true) + } + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); + glib::Continue(false) + } + Err(gst::FlowError::Eos) => { + gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); + let eos = gst::Event::new_eos().build(); + pad.push_event(eos).await; + glib::Continue(false) + } + Err(err) => { + gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); + gst_element_error!( + element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + glib::Continue(false) } - events.push( - gst::Event::new_segment(&gst::FormattedSegment::::new()) - .build(), - ); - - state.need_initial_events = false; } } + }); + } - for event in events { - pad.push_event(event).await; + async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) { + let mut events = Vec::new(); + + // Only `read` the state in the hot path + if self.0.state.read().unwrap().need_initial_events { + // We will need to `write` and we also want to prevent + // any changes on the state while we are handling initial events + let mut state = self.0.state.write().unwrap(); + assert!(state.need_initial_events); + + gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); + + let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); + events.push( + gst::Event::new_stream_start(&stream_id) + .group_id(gst::GroupId::next()) + .build(), + ); + + if let Some(ref caps) = state.caps { + events.push(gst::Event::new_caps(&caps).build()); + state.configured_caps = Some(caps.clone()); } + events.push( + gst::Event::new_segment(&gst::FormattedSegment::::new()).build(), + ); + + state.need_initial_events = false; } - match pad.push(buffer).await { - Ok(_) => { - gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer"); - } - Err(gst::FlowError::Flushing) => { - gst_debug!(CAT, obj: pad.gst_pad(), "Flushing"); - pad.pause_task().await; - } - Err(gst::FlowError::Eos) => { - gst_debug!(CAT, obj: pad.gst_pad(), "EOS"); - pad.pause_task().await; - } - Err(err) => { - gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err); - gst_element_error!( - element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } + for event in events { + pad.push_event(event).await; } } + + async fn push_buffer( + &self, + pad: &PadSrcRef<'_>, + element: &gst::Element, + buffer: gst::Buffer, + ) -> Result { + self.push_prelude(pad, element).await; + + pad.push(buffer).await + } } impl PadSrcHandler for UdpSrcPadHandler { @@ -399,68 +406,24 @@ impl PadSrcHandler for UdpSrcPadHandler { fn src_event( &self, pad: &PadSrcRef, - _udpsrc: &UdpSrc, + udpsrc: &UdpSrc, element: &gst::Element, event: gst::Event, - ) -> Either> { + ) -> bool { use gst::EventView; gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { EventView::FlushStart(..) => { - let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap(); - if flush_join_handle.is_none() { - let element = element.clone(); - let pad_weak = pad.downgrade(); - - *flush_join_handle = Some(pad.spawn(async move { - let res = UdpSrc::from_instance(&element).pause(&element).await; - let pad = pad_weak.upgrade().unwrap(); - if res.is_ok() { - gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete"); - } else { - gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed"); - } - - res - })); - } else { - gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress"); - } + udpsrc.pause(element).unwrap(); true } EventView::FlushStop(..) => { - let element = element.clone(); - let inner_weak = Arc::downgrade(&self.0); - let pad_weak = pad.downgrade(); + udpsrc.flush_stop(element); - let fut = async move { - let mut ret = false; - - let pad = pad_weak.upgrade().unwrap(); - let inner_weak = inner_weak.upgrade().unwrap(); - let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take(); - if let Some(flush_join_handle) = flush_join_handle { - if let Ok(Ok(())) = flush_join_handle.await { - ret = UdpSrc::from_instance(&element) - .start(&element) - .await - .is_ok(); - gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete"); - } else { - gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed"); - } - } else { - gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress"); - } - - ret - } - .boxed(); - - return Either::Right(fut); + true } EventView::Reconfigure(..) => true, EventView::Latency(..) => true, @@ -473,7 +436,7 @@ impl PadSrcHandler for UdpSrcPadHandler { gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); } - Either::Left(ret) + ret } fn src_query( @@ -526,23 +489,11 @@ impl PadSrcHandler for UdpSrcPadHandler { } } -#[derive(Debug)] -struct State { - socket: Option>, -} - -impl Default for State { - fn default() -> State { - State { socket: None } - } -} - -#[derive(Debug)] struct UdpSrc { src_pad: PadSrc, src_pad_handler: UdpSrcPadHandler, - state: Mutex, - settings: Mutex, + socket: StdMutex>>, + settings: StdMutex, } lazy_static! { @@ -554,9 +505,9 @@ lazy_static! { } impl UdpSrc { - async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { - let mut state = self.state.lock().await; - let mut settings = self.settings.lock().await.clone(); + fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> { + let mut socket_storage = self.socket.lock().unwrap(); + let mut settings = self.settings.lock().unwrap().clone(); gst_debug!(CAT, obj: element, "Preparing"); @@ -725,8 +676,10 @@ impl UdpSrc { ) })?; - let socket = Socket::new(element.upcast_ref(), UdpReader::new(socket), buffer_pool); - let socket_stream = socket.prepare().await.map_err(|err| { + let socket = Socket::new(element.upcast_ref(), buffer_pool, async move { + Ok(UdpReader::new(socket)) + }) + .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, ["Failed to prepare socket {:?}", err] @@ -736,20 +689,16 @@ impl UdpSrc { { let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap(); src_pad_handler_state.retrieve_sender_address = settings.retrieve_sender_address; - src_pad_handler_state.caps = settings.caps.clone(); + src_pad_handler_state.caps = settings.caps; } - drop(settings); - - *self.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream); - - state.socket = Some(socket); + *socket_storage = Some(socket); + drop(socket_storage); element.notify("used-socket"); self.src_pad .prepare(context, &self.src_pad_handler) - .await .map_err(|err| { gst_error_msg!( gst::ResourceError::OpenRead, @@ -762,65 +711,102 @@ impl UdpSrc { Ok(()) } - async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { - let mut state = self.state.lock().await; + fn unprepare(&self, element: &gst::Element) -> Result<(), ()> { gst_debug!(CAT, obj: element, "Unpreparing"); - self.settings.lock().await.used_socket = None; + self.settings.lock().unwrap().used_socket = None; + element.notify("used-socket"); - self.src_pad.stop_task().await; - - *self.src_pad_handler.0.socket_stream.lock().await = None; - - { - let socket = state.socket.take().unwrap(); - socket.unprepare().await.unwrap(); + if let Some(socket) = self.socket.lock().unwrap().take() { + drop(socket); } - let _ = self.src_pad.unprepare().await; + let _ = self.src_pad.unprepare(); + + *self.src_pad_handler.0.state.write().unwrap() = Default::default(); + + gst_debug!(CAT, obj: element, "Unprepared"); + + Ok(()) + } + + fn stop(&self, element: &gst::Element) -> Result<(), ()> { + gst_debug!(CAT, obj: element, "Stopping"); + + // Now stop the task if it was still running, blocking + // until this has actually happened + self.src_pad.stop_task(); + self.src_pad_handler .0 .state .write() .unwrap() - .configured_caps = None; + .need_initial_events = true; + + gst_debug!(CAT, obj: element, "Stopped"); - gst_debug!(CAT, obj: element, "Unprepared"); Ok(()) } - async fn start(&self, element: &gst::Element) -> Result<(), ()> { - let state = self.state.lock().await; - gst_debug!(CAT, obj: element, "Starting"); + fn start(&self, element: &gst::Element) -> Result<(), ()> { + let socket = self.socket.lock().unwrap(); + if let Some(socket) = socket.as_ref() { + if socket.state() == SocketState::Started { + gst_debug!(CAT, obj: element, "Already started"); + return Err(()); + } - if let Some(ref socket) = state.socket { - socket - .start(element.get_clock(), Some(element.get_base_time())) - .await; + gst_debug!(CAT, obj: element, "Starting"); + + self.start_unchecked(element, socket); + + gst_debug!(CAT, obj: element, "Started"); + + Ok(()) + } else { + Err(()) + } + } + + fn flush_stop(&self, element: &gst::Element) { + // Keep the lock on the `socket` until `flush_stop` is complete + // so as to prevent race conditions due to concurrent state transitions. + // Note that this won't deadlock as it doesn't lock the `SocketStream` + // in use within the `src_pad`'s `Task`. + let socket = self.socket.lock().unwrap(); + let socket = socket.as_ref().unwrap(); + if socket.state() == SocketState::Started { + gst_debug!(CAT, obj: element, "Already started"); + return; } - self.src_pad_handler - .start_task(self.src_pad.as_ref(), element) - .await; + gst_debug!(CAT, obj: element, "Stopping Flush"); - gst_debug!(CAT, obj: element, "Started"); + self.src_pad.stop_task(); + self.start_unchecked(element, socket); - Ok(()) + gst_debug!(CAT, obj: element, "Stopped Flush"); } - async fn pause(&self, element: &gst::Element) -> Result<(), ()> { - let pause_completion = { - let state = self.state.lock().await; - gst_debug!(CAT, obj: element, "Pausing"); + fn start_unchecked(&self, element: &gst::Element, socket: &Socket) { + let socket_stream = socket + .start(element.get_clock(), Some(element.get_base_time())) + .unwrap(); - let pause_completion = self.src_pad.pause_task().await; - state.socket.as_ref().unwrap().pause().await; + self.src_pad_handler + .start_task(self.src_pad.as_ref(), element, socket_stream); + } - pause_completion - }; + fn pause(&self, element: &gst::Element) -> Result<(), ()> { + let socket = self.socket.lock().unwrap(); + gst_debug!(CAT, obj: element, "Pausing"); - gst_debug!(CAT, obj: element, "Waiting for Task Pause to complete"); - pause_completion.await; + if let Some(socket) = socket.as_ref() { + socket.pause(); + } + + self.src_pad.cancel_task(); gst_debug!(CAT, obj: element, "Paused"); @@ -880,8 +866,8 @@ impl ObjectSubclass for UdpSrc { Self { src_pad, src_pad_handler: UdpSrcPadHandler::default(), - state: Mutex::new(State::default()), - settings: Mutex::new(Settings::default()), + socket: StdMutex::new(None), + settings: StdMutex::new(Settings::default()), } } } @@ -892,7 +878,7 @@ impl ObjectImpl for UdpSrc { fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; - let mut settings = runtime::executor::block_on(self.settings.lock()); + let mut settings = self.settings.lock().unwrap(); match *prop { subclass::Property("address", ..) => { settings.address = value.get().expect("type checked upstream"); @@ -937,7 +923,7 @@ impl ObjectImpl for UdpSrc { fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { let prop = &PROPERTIES[id]; - let settings = runtime::executor::block_on(self.settings.lock()); + let settings = self.settings.lock().unwrap(); match *prop { subclass::Property("address", ..) => Ok(settings.address.to_value()), subclass::Property("port", ..) => Ok(settings.port.to_value()), @@ -982,18 +968,16 @@ impl ElementImpl for UdpSrc { match transition { gst::StateChange::NullToReady => { - runtime::executor::block_on(self.prepare(element)).map_err(|err| { + self.prepare(element).map_err(|err| { element.post_error_message(&err); gst::StateChangeError })?; } gst::StateChange::PlayingToPaused => { - runtime::executor::block_on(self.pause(element)) - .map_err(|_| gst::StateChangeError)?; + self.pause(element).map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { - runtime::executor::block_on(self.unprepare(element)) - .map_err(|_| gst::StateChangeError)?; + self.unprepare(element).map_err(|_| gst::StateChangeError)?; } _ => (), } @@ -1005,19 +989,13 @@ impl ElementImpl for UdpSrc { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { - runtime::executor::block_on(self.start(element)) - .map_err(|_| gst::StateChangeError)?; + self.start(element).map_err(|_| gst::StateChangeError)?; } gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToReady => { - self.src_pad_handler - .0 - .state - .write() - .unwrap() - .need_initial_events = true; + self.stop(element).map_err(|_| gst::StateChangeError)?; } _ => (), } diff --git a/gst-plugin-threadshare/tests/udpsrc.rs b/gst-plugin-threadshare/tests/udpsrc.rs index fca3dd91..eadfac74 100644 --- a/gst-plugin-threadshare/tests/udpsrc.rs +++ b/gst-plugin-threadshare/tests/udpsrc.rs @@ -82,18 +82,17 @@ fn test_push() { use gst::EventView; let event = h.pull_event().unwrap(); - // The StickyEvent for the TaskContext is pushed first match event.view() { EventView::StreamStart(..) => { - assert_eq!(n_events, 1); + assert_eq!(n_events, 0); } EventView::Caps(ev) => { - assert_eq!(n_events, 2); + assert_eq!(n_events, 1); let event_caps = ev.get_caps(); assert_eq!(caps.as_ref(), event_caps); } EventView::Segment(..) => { - assert_eq!(n_events, 3); + assert_eq!(n_events, 2); break; } _ => (),