threadshare/tcpclientsrc: Port tcpclientsrc to new API

This commit is contained in:
Sebastian Dröge 2020-03-13 13:49:03 +02:00
parent 5f5f0fe866
commit ded3af31c1
2 changed files with 207 additions and 282 deletions

View file

@ -30,7 +30,7 @@ pub use tokio;
pub mod runtime;
pub mod socket;
//mod tcpclientsrc;
mod tcpclientsrc;
mod udpsink;
mod udpsrc;
@ -51,7 +51,7 @@ use gstreamer_sys as gst_ffi;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
udpsrc::register(plugin)?;
udpsink::register(plugin)?;
//tcpclientsrc::register(plugin)?;
tcpclientsrc::register(plugin)?;
//queue::register(plugin)?;
//proxy::register(plugin)?;
appsrc::register(plugin)?;

View file

@ -16,9 +16,8 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
use either::Either;
use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::lock::Mutex as FutMutex;
use futures::prelude::*;
use glib;
@ -38,15 +37,16 @@ use rand;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Mutex as StdMutex;
use std::sync::{self, Arc};
use std::u16;
use tokio::io::AsyncReadExt;
use crate::runtime::prelude::*;
use crate::runtime::{self, Context, JoinHandle, PadSrc, PadSrcRef};
use crate::runtime::{Context, PadSrc, PadSrcRef};
use super::socket::{Socket, SocketRead, SocketStream};
use super::socket::{Socket, SocketError, SocketRead, SocketState, SocketStream};
const DEFAULT_ADDRESS: Option<&str> = Some("127.0.0.1");
const DEFAULT_PORT: u32 = 5000;
@ -145,11 +145,11 @@ struct TcpClientReaderInner {
socket: tokio::net::TcpStream,
}
pub struct TcpClientReader(Arc<Mutex<TcpClientReaderInner>>);
pub struct TcpClientReader(Arc<FutMutex<TcpClientReaderInner>>);
impl TcpClientReader {
pub fn new(socket: tokio::net::TcpStream) -> Self {
TcpClientReader(Arc::new(Mutex::new(TcpClientReaderInner { socket })))
TcpClientReader(Arc::new(FutMutex::new(TcpClientReaderInner { socket })))
}
}
@ -194,41 +194,39 @@ impl Default for TcpClientSrcPadHandlerState {
#[derive(Debug, Default)]
struct TcpClientSrcPadHandlerInner {
state: sync::RwLock<TcpClientSrcPadHandlerState>,
socket_stream: Mutex<Option<SocketStream<TcpClientReader>>>,
flush_join_handle: sync::Mutex<Option<JoinHandle<Result<(), ()>>>>,
}
#[derive(Clone, Debug, Default)]
struct TcpClientSrcPadHandler(Arc<TcpClientSrcPadHandlerInner>);
impl TcpClientSrcPadHandler {
async fn start_task(&self, pad: PadSrcRef<'_>, element: &gst::Element) {
fn start_task(
&self,
pad: PadSrcRef<'_>,
element: &gst::Element,
socket_stream: SocketStream<TcpClientReader>,
) {
let this = self.clone();
let pad_weak = pad.downgrade();
let element = element.clone();
let socket_stream = Arc::new(FutMutex::new(socket_stream));
pad.start_task(move || {
let this = this.clone();
let pad_weak = pad_weak.clone();
let element = element.clone();
let socket_stream = socket_stream.clone();
async move {
let item = this
.0
.socket_stream
.lock()
.await
.as_mut()
.expect("Missing SocketStream")
.next()
.await;
let item = socket_stream.lock().await.next().await;
let pad = pad_weak.upgrade().expect("PadSrc no longer exists");
let buffer = match item {
Some(Ok((buffer, _))) => buffer,
Some(Err(err)) => {
gst_error!(CAT, obj: &element, "Got error {}", err);
gst_error!(CAT, obj: &element, "Got error {:?}", err);
match err {
Either::Left(gst::FlowError::CustomError) => (),
Either::Left(err) => {
SocketError::Gst(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
@ -236,7 +234,7 @@ impl TcpClientSrcPadHandler {
["streaming stopped, reason {}", err]
);
}
Either::Right(err) => {
SocketError::Io(err) => {
gst_element_error!(
element,
gst::StreamError::Failed,
@ -245,87 +243,97 @@ impl TcpClientSrcPadHandler {
);
}
}
return;
return glib::Continue(false);
}
None => {
gst_log!(CAT, obj: pad.gst_pad(), "SocketStream Stopped");
pad.pause_task().await;
return;
return glib::Continue(false);
}
};
this.push_buffer(pad, &element, buffer).await;
let res = this.push_buffer(&pad, &element, buffer).await;
match res {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
glib::Continue(true)
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
glib::Continue(false)
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
let eos = gst::Event::new_eos().build();
pad.push_event(eos).await;
glib::Continue(false)
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
glib::Continue(false)
}
}
}
})
.await;
});
}
async fn push_buffer(&self, pad: PadSrcRef<'_>, element: &gst::Element, buffer: gst::Buffer) {
{
let mut events = Vec::new();
{
// Only `read` the state in the hot path
if self.0.state.read().unwrap().need_initial_events {
// We will need to `write` and we also want to prevent
// any changes on the state while we are handling initial events
let mut state = self.0.state.write().unwrap();
assert!(state.need_initial_events);
async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &gst::Element) {
let mut events = Vec::new();
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
// Only `read` the state in the hot path
if self.0.state.read().unwrap().need_initial_events {
// We will need to `write` and we also want to prevent
// any changes on the state while we are handling initial events
let mut state = self.0.state.write().unwrap();
assert!(state.need_initial_events);
let stream_id =
format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
events.push(
gst::Event::new_stream_start(&stream_id)
.group_id(gst::GroupId::next())
.build(),
);
gst_debug!(CAT, obj: pad.gst_pad(), "Pushing initial events");
if let Some(ref caps) = state.caps {
events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new())
.build(),
);
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
events.push(
gst::Event::new_stream_start(&stream_id)
.group_id(gst::GroupId::next())
.build(),
);
state.need_initial_events = false;
}
if buffer.get_size() == 0 {
events.push(gst::Event::new_eos().build());
}
if let Some(ref caps) = state.caps {
events.push(gst::Event::new_caps(&caps).build());
state.configured_caps = Some(caps.clone());
}
events.push(
gst::Event::new_segment(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
);
for event in events {
pad.push_event(event).await;
}
state.need_initial_events = false;
}
match pad.push(buffer).await {
Ok(_) => {
gst_log!(CAT, obj: pad.gst_pad(), "Successfully pushed buffer");
}
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: pad.gst_pad(), "Flushing");
pad.pause_task().await;
}
Err(gst::FlowError::Eos) => {
gst_debug!(CAT, obj: pad.gst_pad(), "EOS");
pad.pause_task().await;
}
Err(err) => {
gst_error!(CAT, obj: pad.gst_pad(), "Got error {}", err);
gst_element_error!(
element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
for event in events {
pad.push_event(event).await;
}
}
async fn push_buffer(
&self,
pad: &PadSrcRef<'_>,
element: &gst::Element,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.push_prelude(pad, element).await;
if buffer.get_size() == 0 {
let event = gst::Event::new_eos().build();
pad.push_event(event).await;
return Ok(gst::FlowSuccess::Ok);
}
pad.push(buffer).await
}
}
impl PadSrcHandler for TcpClientSrcPadHandler {
@ -334,68 +342,24 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
fn src_event(
&self,
pad: &PadSrcRef,
_tcpclientsrc: &TcpClientSrc,
tcpclientsrc: &TcpClientSrc,
element: &gst::Element,
event: gst::Event,
) -> Either<bool, BoxFuture<'static, bool>> {
) -> bool {
use gst::EventView;
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
let mut flush_join_handle = self.0.flush_join_handle.lock().unwrap();
if flush_join_handle.is_none() {
let element = element.clone();
let pad_weak = pad.downgrade();
*flush_join_handle = Some(pad.spawn(async move {
let res = TcpClientSrc::from_instance(&element).pause(&element).await;
let pad = pad_weak.upgrade().unwrap();
if res.is_ok() {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart failed");
}
res
}));
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStart ignored: previous Flush in progress");
}
tcpclientsrc.pause(element).unwrap();
true
}
EventView::FlushStop(..) => {
let element = element.clone();
let inner_weak = Arc::downgrade(&self.0);
let pad_weak = pad.downgrade();
tcpclientsrc.flush_stop(element);
let fut = async move {
let mut ret = false;
let pad = pad_weak.upgrade().unwrap();
let inner_weak = inner_weak.upgrade().unwrap();
let flush_join_handle = inner_weak.flush_join_handle.lock().unwrap().take();
if let Some(flush_join_handle) = flush_join_handle {
if let Ok(Ok(())) = flush_join_handle.await {
ret = TcpClientSrc::from_instance(&element)
.start(&element)
.await
.is_ok();
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop complete");
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop aborted: FlushStart failed");
}
} else {
gst_debug!(CAT, obj: pad.gst_pad(), "FlushStop ignored: no Flush in progress");
}
ret
}
.boxed();
return Either::Right(fut);
true
}
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
@ -408,7 +372,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event);
}
Either::Left(ret)
ret
}
fn src_query(
@ -460,28 +424,11 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
}
}
struct State {
socket: Option<Socket<TcpClientReader>>,
}
impl Default for State {
fn default() -> State {
State { socket: None }
}
}
#[derive(Debug)]
struct PreparationSet {
join_handle: JoinHandle<Result<(), gst::ErrorMessage>>,
context: Context,
}
struct TcpClientSrc {
src_pad: PadSrc,
src_pad_handler: TcpClientSrcPadHandler,
state: Mutex<State>,
settings: Mutex<Settings>,
preparation_set: Mutex<Option<PreparationSet>>,
socket: StdMutex<Option<Socket<TcpClientReader>>>,
settings: StdMutex<Settings>,
}
lazy_static! {
@ -493,39 +440,19 @@ lazy_static! {
}
impl TcpClientSrc {
async fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let _state = self.state.lock().await;
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let mut socket_storage = self.socket.lock().unwrap();
let settings = self.settings.lock().unwrap().clone();
gst_debug!(CAT, obj: element, "Preparing");
let context = {
let settings = self.settings.lock().await;
self.src_pad_handler.0.state.write().unwrap().caps = settings.caps.clone();
let context =
Context::acquire(&settings.context, settings.context_wait).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to acquire Context: {}", err]
)
})?
};
// TcpStream needs to be instantiated in the thread of its I/O reactor
*self.preparation_set.lock().await = Some(PreparationSet {
join_handle: context.spawn(Self::prepare_socket(element.clone())),
context,
});
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
async fn prepare_socket(element: gst::Element) -> Result<(), gst::ErrorMessage> {
let this = Self::from_instance(&element);
let settings = this.settings.lock().await.clone();
gst_debug!(CAT, obj: &element, "Preparing Socket");
})?;
let addr: IpAddr = match settings.address {
None => {
@ -546,15 +473,6 @@ impl TcpClientSrc {
};
let port = settings.port;
let saddr = SocketAddr::new(addr, port as u16);
gst_debug!(CAT, obj: &element, "Connecting to {:?}", saddr);
let socket = tokio::net::TcpStream::connect(saddr).await.map_err(|err| {
gst_error_msg!(
gst::ResourceError::Settings,
["Failed to connect to {:?} {:?}", saddr, err]
)
})?;
let buffer_pool = gst::BufferPool::new();
let mut config = buffer_pool.get_config();
config.set_params(None, settings.chunk_size, 0, 0);
@ -565,49 +483,32 @@ impl TcpClientSrc {
)
})?;
let socket = Socket::new(
element.upcast_ref(),
TcpClientReader::new(socket),
buffer_pool,
);
let socket_stream = socket.prepare().await.map_err(|err| {
let saddr = SocketAddr::new(addr, port as u16);
let element_clone = element.clone();
let socket = Socket::new(element.upcast_ref(), buffer_pool, async move {
gst_debug!(CAT, obj: &element_clone, "Connecting to {:?}", saddr);
let socket = tokio::net::TcpStream::connect(saddr)
.await
.map_err(SocketError::Io)?;
Ok(TcpClientReader::new(socket))
})
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?;
*this.src_pad_handler.0.socket_stream.lock().await = Some(socket_stream);
this.state.lock().await.socket = Some(socket);
gst_debug!(CAT, obj: &element, "Socket Prepared");
Ok(())
}
async fn complete_preparation(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
let preparation_set = self.preparation_set.lock().await.take();
if preparation_set.is_none() {
gst_log!(CAT, obj: element, "Preparation already completed");
return Ok(());
{
let mut src_pad_handler_state = self.src_pad_handler.0.state.write().unwrap();
src_pad_handler_state.caps = settings.caps;
}
gst_debug!(CAT, obj: element, "Completing preparation");
let PreparationSet {
join_handle,
context,
} = preparation_set.unwrap();
join_handle
.await
.expect("The socket preparation has panicked")?;
*socket_storage = Some(socket);
drop(socket_storage);
self.src_pad
.prepare(context, &self.src_pad_handler)
.await
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
@ -615,68 +516,104 @@ impl TcpClientSrc {
)
})?;
gst_debug!(CAT, obj: element, "Preparation completed");
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
async fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().await;
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Unpreparing");
self.src_pad.stop_task().await;
*self.src_pad_handler.0.socket_stream.lock().await = None;
{
let socket = state.socket.take().unwrap();
socket.unprepare().await.unwrap();
if let Some(socket) = self.socket.lock().unwrap().take() {
drop(socket);
}
let _ = self.src_pad.unprepare().await;
let _ = self.src_pad.unprepare();
*self.src_pad_handler.0.state.write().unwrap() = Default::default();
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
fn stop(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Stopping");
// Now stop the task if it was still running, blocking
// until this has actually happened
self.src_pad.stop_task();
self.src_pad_handler
.0
.state
.write()
.unwrap()
.configured_caps = None;
.need_initial_events = true;
gst_debug!(CAT, obj: element, "Stopped");
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
}
async fn start(&self, element: &gst::Element) -> Result<(), ()> {
let state = self.state.lock().await;
gst_debug!(CAT, obj: element, "Starting");
fn start(&self, element: &gst::Element) -> Result<(), ()> {
let socket = self.socket.lock().unwrap();
if let Some(socket) = socket.as_ref() {
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return Err(());
}
if let Some(ref socket) = state.socket {
socket
.start(element.get_clock(), Some(element.get_base_time()))
.await;
gst_debug!(CAT, obj: element, "Starting");
self.start_unchecked(element, socket);
gst_debug!(CAT, obj: element, "Started");
Ok(())
} else {
Err(())
}
}
fn flush_stop(&self, element: &gst::Element) {
// Keep the lock on the `socket` until `flush_stop` is complete
// so as to prevent race conditions due to concurrent state transitions.
// Note that this won't deadlock as it doesn't lock the `SocketStream`
// in use within the `src_pad`'s `Task`.
let socket = self.socket.lock().unwrap();
let socket = socket.as_ref().unwrap();
if socket.state() == SocketState::Started {
gst_debug!(CAT, obj: element, "Already started");
return;
}
self.src_pad_handler
.start_task(self.src_pad.as_ref(), element)
.await;
gst_debug!(CAT, obj: element, "Stopping Flush");
gst_debug!(CAT, obj: element, "Started");
self.src_pad.stop_task();
self.start_unchecked(element, socket);
Ok(())
gst_debug!(CAT, obj: element, "Stopped Flush");
}
async fn pause(&self, element: &gst::Element) -> Result<(), ()> {
let pause_completion = {
let state = self.state.lock().await;
gst_debug!(CAT, obj: element, "Pausing");
fn start_unchecked(&self, element: &gst::Element, socket: &Socket<TcpClientReader>) {
let socket_stream = socket
.start(element.get_clock(), Some(element.get_base_time()))
.unwrap();
let pause_completion = self.src_pad.pause_task().await;
state.socket.as_ref().unwrap().pause().await;
self.src_pad_handler
.start_task(self.src_pad.as_ref(), element, socket_stream);
}
pause_completion
};
fn pause(&self, element: &gst::Element) -> Result<(), ()> {
let socket = self.socket.lock().unwrap();
gst_debug!(CAT, obj: element, "Pausing");
gst_debug!(CAT, obj: element, "Waiting for Task Pause to complete");
pause_completion.await;
if let Some(socket) = socket.as_ref() {
socket.pause();
}
self.src_pad.cancel_task();
gst_debug!(CAT, obj: element, "Paused");
@ -720,9 +657,8 @@ impl ObjectSubclass for TcpClientSrc {
Self {
src_pad,
src_pad_handler: TcpClientSrcPadHandler::default(),
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
preparation_set: Mutex::new(None),
socket: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
}
}
}
@ -733,7 +669,7 @@ impl ObjectImpl for TcpClientSrc {
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let mut settings = runtime::executor::block_on(self.settings.lock());
let mut settings = self.settings.lock().unwrap();
match *prop {
subclass::Property("address", ..) => {
settings.address = value.get().expect("type checked upstream");
@ -763,7 +699,7 @@ impl ObjectImpl for TcpClientSrc {
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
let settings = runtime::executor::block_on(self.settings.lock());
let settings = self.settings.lock().unwrap();
match *prop {
subclass::Property("address", ..) => Ok(settings.address.to_value()),
subclass::Property("port", ..) => Ok(settings.port.to_value()),
@ -795,24 +731,16 @@ impl ElementImpl for TcpClientSrc {
match transition {
gst::StateChange::NullToReady => {
runtime::executor::block_on(self.prepare(element)).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToPaused => {
runtime::executor::block_on(self.complete_preparation(element)).map_err(|err| {
self.prepare(element).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
runtime::executor::block_on(self.pause(element))
.map_err(|_| gst::StateChangeError)?;
self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
runtime::executor::block_on(self.unprepare(element))
.map_err(|_| gst::StateChangeError)?;
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
@ -821,19 +749,16 @@ impl ElementImpl for TcpClientSrc {
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::Success;
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
runtime::executor::block_on(self.start(element))
.map_err(|_| gst::StateChangeError)?;
self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.src_pad_handler
.0
.state
.write()
.unwrap()
.need_initial_events = true;
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}