diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index 5b03ad09..69fe7b2b 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -19,7 +19,6 @@ // SPDX-License-Identifier: LGPL-2.1-or-later use futures::future::BoxFuture; -use futures::lock::Mutex as FutMutex; use futures::prelude::*; use gst::glib; @@ -30,15 +29,14 @@ use once_cell::sync::Lazy; use std::io; use std::net::{IpAddr, SocketAddr, TcpStream}; -use std::sync::Arc; -use std::sync::Mutex as StdMutex; +use std::sync::Mutex; use std::time::Duration; use std::u16; use std::u32; use crate::runtime::prelude::*; use crate::runtime::task; -use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState}; +use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState}; use crate::runtime::Async; use crate::socket::{Socket, SocketError, SocketRead}; @@ -92,95 +90,8 @@ impl SocketRead for TcpClientReader { } } -#[derive(Debug)] -struct TcpClientSrcPadHandlerState { - need_initial_events: bool, - need_segment: bool, - caps: Option, -} - -impl Default for TcpClientSrcPadHandlerState { - fn default() -> Self { - TcpClientSrcPadHandlerState { - need_initial_events: true, - need_segment: true, - caps: None, - } - } -} - -#[derive(Debug, Default)] -struct TcpClientSrcPadHandlerInner { - state: FutMutex, - configured_caps: StdMutex>, -} - -#[derive(Clone, Debug, Default)] -struct TcpClientSrcPadHandler(Arc); - -impl TcpClientSrcPadHandler { - fn prepare(&self, caps: Option) { - self.0 - .state - .try_lock() - .expect("State locked elsewhere") - .caps = caps; - } - - async fn reset_state(&self) { - *self.0.configured_caps.lock().unwrap() = None; - } - - async fn set_need_segment(&self) { - self.0.state.lock().await.need_segment = true; - } - - async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::TcpClientSrc) { - let mut state = self.0.state.lock().await; - if state.need_initial_events { - gst::debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); - - let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - let stream_start_evt = gst::event::StreamStart::builder(&stream_id) - .group_id(gst::GroupId::next()) - .build(); - pad.push_event(stream_start_evt).await; - - if let Some(ref caps) = state.caps { - pad.push_event(gst::event::Caps::new(caps)).await; - *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); - } - - state.need_initial_events = false; - } - - if state.need_segment { - let segment_evt = - gst::event::Segment::new(&gst::FormattedSegment::::new()); - pad.push_event(segment_evt).await; - - state.need_segment = false; - } - } - - async fn push_buffer( - &self, - pad: &PadSrcRef<'_>, - element: &super::TcpClientSrc, - buffer: gst::Buffer, - ) -> Result { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); - - self.push_prelude(pad, element).await; - - if buffer.size() == 0 { - pad.push_event(gst::event::Eos::new()).await; - return Ok(gst::FlowSuccess::Ok); - } - - pad.push(buffer).await - } -} +#[derive(Clone, Debug)] +struct TcpClientSrcPadHandler; impl PadSrcHandler for TcpClientSrcPadHandler { type ElementImpl = TcpClientSrc; @@ -216,7 +127,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { fn src_query( &self, pad: &PadSrcRef, - _tcpclientsrc: &TcpClientSrc, + tcpclientsrc: &TcpClientSrc, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { @@ -234,7 +145,8 @@ impl PadSrcHandler for TcpClientSrcPadHandler { true } QueryViewMut::Caps(q) => { - let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { + let caps = if let Some(caps) = tcpclientsrc.configured_caps.lock().unwrap().as_ref() + { q.filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -263,30 +175,98 @@ impl PadSrcHandler for TcpClientSrcPadHandler { struct TcpClientSrcTask { element: super::TcpClientSrc, - src_pad: PadSrcWeak, - src_pad_handler: TcpClientSrcPadHandler, saddr: SocketAddr, buffer_pool: Option, socket: Option>, + need_initial_events: bool, + need_segment: bool, } impl TcpClientSrcTask { - fn new( - element: &super::TcpClientSrc, - src_pad: &PadSrc, - src_pad_handler: &TcpClientSrcPadHandler, - saddr: SocketAddr, - buffer_pool: gst::BufferPool, - ) -> Self { + fn new(element: super::TcpClientSrc, saddr: SocketAddr, buffer_pool: gst::BufferPool) -> Self { TcpClientSrcTask { - element: element.clone(), - src_pad: src_pad.downgrade(), - src_pad_handler: src_pad_handler.clone(), + element, saddr, buffer_pool: Some(buffer_pool), socket: None, + need_initial_events: true, + need_segment: true, } } + + async fn push_buffer( + &mut self, + buffer: gst::Buffer, + ) -> Result { + gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer); + + let tcpclientsrc = self.element.imp(); + + if self.need_initial_events { + gst::debug!(CAT, obj: &self.element, "Pushing initial events"); + + let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); + let stream_start_evt = gst::event::StreamStart::builder(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + tcpclientsrc.src_pad.push_event(stream_start_evt).await; + + let caps = tcpclientsrc.settings.lock().unwrap().caps.clone(); + if let Some(caps) = caps { + tcpclientsrc + .src_pad + .push_event(gst::event::Caps::new(&caps)) + .await; + *tcpclientsrc.configured_caps.lock().unwrap() = Some(caps); + } + + self.need_initial_events = false; + } + + if self.need_segment { + let segment_evt = + gst::event::Segment::new(&gst::FormattedSegment::::new()); + tcpclientsrc.src_pad.push_event(segment_evt).await; + + self.need_segment = false; + } + + if buffer.size() == 0 { + tcpclientsrc + .src_pad + .push_event(gst::event::Eos::new()) + .await; + return Ok(gst::FlowSuccess::Ok); + } + + let res = tcpclientsrc.src_pad.push(buffer).await; + match res { + Ok(_) => { + gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"); + } + Err(gst::FlowError::Flushing) => { + gst::debug!(CAT, obj: &self.element, "Flushing"); + } + Err(gst::FlowError::Eos) => { + gst::debug!(CAT, obj: &self.element, "EOS"); + tcpclientsrc + .src_pad + .push_event(gst::event::Eos::new()) + .await; + } + Err(err) => { + gst::error!(CAT, obj: &self.element, "Got error {}", err); + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + } + + res + } } impl TaskImpl for TcpClientSrcTask { @@ -345,66 +325,35 @@ impl TaskImpl for TcpClientSrcTask { fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - let item = self.socket.as_mut().unwrap().next().await; + let item = self.socket.as_mut().unwrap().next().await.ok_or_else(|| { + gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); + gst::FlowError::Flushing + })?; - let buffer = match item { - Some(Ok((buffer, _))) => buffer, - Some(Err(err)) => { - gst::error!(CAT, obj: &self.element, "Got error {:?}", err); - match err { - SocketError::Gst(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - SocketError::Io(err) => { - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("I/O error"), - ["streaming stopped, I/O error {}", err] - ); - } + let (buffer, _) = item.map_err(|err| { + gst::error!(CAT, obj: &self.element, "Got error {:?}", err); + match err { + SocketError::Gst(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason {}", err] + ); + } + SocketError::Io(err) => { + gst::element_error!( + self.element, + gst::StreamError::Failed, + ("I/O error"), + ["streaming stopped, I/O error {}", err] + ); } - return Err(gst::FlowError::Error); } - None => { - gst::log!(CAT, obj: &self.element, "SocketStream Stopped"); - return Err(gst::FlowError::Flushing); - } - }; + gst::FlowError::Error + })?; - let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); - let res = self - .src_pad_handler - .push_buffer(&pad, &self.element, buffer) - .await; - match res { - Ok(_) => { - gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"); - } - Err(gst::FlowError::Flushing) => { - gst::debug!(CAT, obj: &self.element, "Flushing"); - } - Err(gst::FlowError::Eos) => { - gst::debug!(CAT, obj: &self.element, "EOS"); - pad.push_event(gst::event::Eos::new()).await; - } - Err(err) => { - gst::error!(CAT, obj: &self.element, "Got error {}", err); - gst::element_error!( - self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason {}", err] - ); - } - } - - res.map(drop) + self.push_buffer(buffer).await.map(drop) } .boxed() } @@ -412,7 +361,7 @@ impl TaskImpl for TcpClientSrcTask { fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task"); - self.src_pad_handler.reset_state().await; + self.need_initial_events = true; gst::log!(CAT, obj: &self.element, "Task stopped"); Ok(()) } @@ -422,7 +371,7 @@ impl TaskImpl for TcpClientSrcTask { fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { async move { gst::log!(CAT, obj: &self.element, "Stopping task flush"); - self.src_pad_handler.set_need_segment().await; + self.need_initial_events = true; gst::log!(CAT, obj: &self.element, "Task flush stopped"); Ok(()) } @@ -432,9 +381,9 @@ impl TaskImpl for TcpClientSrcTask { pub struct TcpClientSrc { src_pad: PadSrc, - src_pad_handler: TcpClientSrcPadHandler, task: Task, - settings: StdMutex, + configured_caps: Mutex>, + settings: Mutex, } static CAT: Lazy = Lazy::new(|| { @@ -447,9 +396,8 @@ static CAT: Lazy = Lazy::new(|| { impl TcpClientSrc { fn prepare(&self, element: &super::TcpClientSrc) -> Result<(), gst::ErrorMessage> { - let settings = self.settings.lock().unwrap().clone(); - gst::debug!(CAT, obj: element, "Preparing"); + let settings = self.settings.lock().unwrap().clone(); let context = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { @@ -459,6 +407,8 @@ impl TcpClientSrc { ) })?; + *self.configured_caps.lock().unwrap() = None; + let host: IpAddr = match settings.host { None => { return Err(gst::error_msg!( @@ -490,17 +440,9 @@ impl TcpClientSrc { let saddr = SocketAddr::new(host, port as u16); - self.src_pad_handler.prepare(settings.caps); - self.task .prepare( - TcpClientSrcTask::new( - element, - &self.src_pad, - &self.src_pad_handler, - saddr, - buffer_pool, - ), + TcpClientSrcTask::new(element.clone(), saddr, buffer_pool), context, ) .map_err(|err| { @@ -550,16 +492,14 @@ impl ObjectSubclass for TcpClientSrc { type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { - let src_pad_handler = TcpClientSrcPadHandler::default(); - Self { src_pad: PadSrc::new( gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")), - src_pad_handler.clone(), + TcpClientSrcPadHandler, ), - src_pad_handler, task: Task::default(), - settings: StdMutex::new(Settings::default()), + configured_caps: Default::default(), + settings: Default::default(), } } }