diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 3587ea27..c6f3fcd0 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -1,5 +1,5 @@ // Copyright (C) 2018 Sebastian Dröge -// Copyright (C) 2019-2020 François Laignel +// Copyright (C) 2019-2022 François Laignel // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public @@ -20,7 +20,6 @@ use futures::channel::mpsc; use futures::future::BoxFuture; -use futures::lock::Mutex as FutMutex; use futures::prelude::*; use gst::glib; @@ -29,13 +28,12 @@ use gst::subclass::prelude::*; use once_cell::sync::Lazy; -use std::sync::Arc; -use std::sync::Mutex as StdMutex; +use std::sync::Mutex; use std::time::Duration; use std::u32; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState}; +use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState}; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; @@ -78,108 +76,8 @@ enum StreamItem { Event(gst::Event), } -#[derive(Debug)] -struct AppSrcPadHandlerState { - need_initial_events: bool, - need_segment: bool, - caps: Option, -} - -impl Default for AppSrcPadHandlerState { - fn default() -> Self { - AppSrcPadHandlerState { - need_initial_events: true, - need_segment: true, - caps: None, - } - } -} - -#[derive(Debug, Default)] -struct AppSrcPadHandlerInner { - state: FutMutex, - configured_caps: StdMutex>, -} - -#[derive(Clone, Debug, Default)] -struct AppSrcPadHandler(Arc); - -impl AppSrcPadHandler { - fn prepare(&self, caps: Option) { - self.0 - .state - .try_lock() - .expect("State locked elsewhere") - .caps = caps; - } - - async fn reset_state(&self) { - *self.0.state.lock().await = Default::default(); - } - - async fn set_need_segment(&self) { - self.0.state.lock().await.need_segment = true; - } - - async fn push_prelude(&self, pad: &PadSrcRef<'_>, _element: &super::AppSrc) { - let mut state = self.0.state.lock().await; - if state.need_initial_events { - gst::debug!(CAT, obj: pad.gst_pad(), "Pushing initial events"); - - let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); - let stream_start_evt = gst::event::StreamStart::builder(&stream_id) - .group_id(gst::GroupId::next()) - .build(); - pad.push_event(stream_start_evt).await; - - if let Some(ref caps) = state.caps { - pad.push_event(gst::event::Caps::new(caps)).await; - *self.0.configured_caps.lock().unwrap() = Some(caps.clone()); - } - - state.need_initial_events = false; - } - - if state.need_segment { - let segment_evt = - gst::event::Segment::new(&gst::FormattedSegment::::new()); - pad.push_event(segment_evt).await; - - state.need_segment = false; - } - } - - async fn push_item( - &self, - pad: &PadSrcRef<'_>, - element: &super::AppSrc, - item: StreamItem, - ) -> Result { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", item); - - self.push_prelude(pad, element).await; - - match item { - StreamItem::Buffer(buffer) => { - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); - pad.push(buffer).await - } - StreamItem::Event(event) => { - match event.view() { - gst::EventView::Eos(_) => { - // Let the caller push the event - Err(gst::FlowError::Eos) - } - _ => { - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); - pad.push_event(event).await; - Ok(gst::FlowSuccess::Ok) - } - } - } - } - } -} +#[derive(Clone, Debug)] +struct AppSrcPadHandler; impl PadSrcHandler for AppSrcPadHandler { type ElementImpl = AppSrc; @@ -215,7 +113,7 @@ impl PadSrcHandler for AppSrcPadHandler { fn src_query( &self, pad: &PadSrcRef, - _appsrc: &AppSrc, + appsrc: &AppSrc, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool { @@ -233,7 +131,7 @@ impl PadSrcHandler for AppSrcPadHandler { true } QueryViewMut::Caps(q) => { - let caps = if let Some(caps) = self.0.configured_caps.lock().unwrap().as_ref() { + let caps = if let Some(caps) = appsrc.configured_caps.lock().unwrap().as_ref() { q.filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -262,23 +160,18 @@ impl PadSrcHandler for AppSrcPadHandler { #[derive(Debug)] struct AppSrcTask { element: super::AppSrc, - src_pad: PadSrcWeak, - src_pad_handler: AppSrcPadHandler, receiver: mpsc::Receiver, + need_initial_events: bool, + need_segment: bool, } impl AppSrcTask { - fn new( - element: &super::AppSrc, - src_pad: &PadSrc, - src_pad_handler: &AppSrcPadHandler, - receiver: mpsc::Receiver, - ) -> Self { + fn new(element: super::AppSrc, receiver: mpsc::Receiver) -> Self { AppSrcTask { - element: element.clone(), - src_pad: src_pad.downgrade(), - src_pad_handler: src_pad_handler.clone(), + element, receiver, + need_initial_events: true, + need_segment: true, } } } @@ -288,37 +181,85 @@ impl AppSrcTask { // Purge the channel while let Ok(Some(_item)) = self.receiver.try_next() {} } + + async fn push_item(&mut self, item: StreamItem) -> Result { + gst::log!(CAT, obj: &self.element, "Handling {:?}", item); + let appsrc = self.element.imp(); + + if self.need_initial_events { + gst::debug!(CAT, obj: &self.element, "Pushing initial events"); + + let stream_id = format!("{:08x}{:08x}", rand::random::(), rand::random::()); + let stream_start_evt = gst::event::StreamStart::builder(&stream_id) + .group_id(gst::GroupId::next()) + .build(); + appsrc.src_pad.push_event(stream_start_evt).await; + + let caps = appsrc.settings.lock().unwrap().caps.clone(); + if let Some(caps) = caps { + appsrc + .src_pad + .push_event(gst::event::Caps::new(&caps)) + .await; + *appsrc.configured_caps.lock().unwrap() = Some(caps.clone()); + } + + self.need_initial_events = false; + } + + if self.need_segment { + let segment_evt = + gst::event::Segment::new(&gst::FormattedSegment::::new()); + appsrc.src_pad.push_event(segment_evt).await; + + self.need_segment = false; + } + + match item { + StreamItem::Buffer(buffer) => { + gst::log!(CAT, obj: &self.element, "Forwarding {:?}", buffer); + appsrc.src_pad.push(buffer).await + } + StreamItem::Event(event) => { + match event.view() { + gst::EventView::Eos(_) => { + // Let the caller push the event + Err(gst::FlowError::Eos) + } + _ => { + gst::log!(CAT, obj: &self.element, "Forwarding {:?}", event); + appsrc.src_pad.push_event(event).await; + Ok(gst::FlowSuccess::Ok) + } + } + } + } + } } impl TaskImpl for AppSrcTask { fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { - let item = match self.receiver.next().await { - Some(item) => item, - None => { - gst::error!(CAT, obj: &self.element, "SrcPad channel aborted"); - gst::element_error!( - &self.element, - gst::StreamError::Failed, - ("Internal data stream error"), - ["streaming stopped, reason: channel aborted"] - ); - return Err(gst::FlowError::Flushing); - } - }; + let item = self.receiver.next().await.ok_or_else(|| { + gst::error!(CAT, obj: &self.element, "SrcPad channel aborted"); + gst::element_error!( + &self.element, + gst::StreamError::Failed, + ("Internal data stream error"), + ["streaming stopped, reason: channel aborted"] + ); + gst::FlowError::Flushing + })?; - let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); - let res = self - .src_pad_handler - .push_item(&pad, &self.element, item) - .await; + let res = self.push_item(item).await; match res { Ok(_) => { gst::log!(CAT, obj: &self.element, "Successfully pushed item"); } Err(gst::FlowError::Eos) => { gst::debug!(CAT, obj: &self.element, "EOS"); - pad.push_event(gst::event::Eos::new()).await; + let appsrc = self.element.imp(); + appsrc.src_pad.push_event(gst::event::Eos::new()).await; } Err(gst::FlowError::Flushing) => { gst::debug!(CAT, obj: &self.element, "Flushing"); @@ -344,7 +285,8 @@ impl TaskImpl for AppSrcTask { gst::log!(CAT, obj: &self.element, "Stopping task"); self.flush(); - self.src_pad_handler.reset_state().await; + self.need_initial_events = true; + self.need_segment = true; gst::log!(CAT, obj: &self.element, "Task stopped"); Ok(()) @@ -357,7 +299,7 @@ impl TaskImpl for AppSrcTask { gst::log!(CAT, obj: &self.element, "Starting task flush"); self.flush(); - self.src_pad_handler.set_need_segment().await; + self.need_segment = true; gst::log!(CAT, obj: &self.element, "Task flush started"); Ok(()) @@ -369,10 +311,10 @@ impl TaskImpl for AppSrcTask { #[derive(Debug)] pub struct AppSrc { src_pad: PadSrc, - src_pad_handler: AppSrcPadHandler, task: Task, - sender: StdMutex>>, - settings: StdMutex, + sender: Mutex>>, + configured_caps: Mutex>, + settings: Mutex, } impl AppSrc { @@ -429,9 +371,9 @@ impl AppSrc { } fn prepare(&self, element: &super::AppSrc) -> Result<(), gst::ErrorMessage> { - let settings = self.settings.lock().unwrap(); gst::debug!(CAT, obj: element, "Preparing"); + let settings = self.settings.lock().unwrap(); let context = Context::acquire(&settings.context, settings.context_wait).map_err(|err| { gst::error_msg!( @@ -439,24 +381,21 @@ impl AppSrc { ["Failed to acquire Context: {}", err] ) })?; - let max_buffers = settings.max_buffers.try_into().map_err(|err| { gst::error_msg!( gst::ResourceError::Settings, ["Invalid max-buffers: {}, {}", settings.max_buffers, err] ) })?; + drop(settings); + + *self.configured_caps.lock().unwrap() = None; let (sender, receiver) = mpsc::channel(max_buffers); *self.sender.lock().unwrap() = Some(sender); - self.src_pad_handler.prepare(settings.caps.clone()); - self.task - .prepare( - AppSrcTask::new(element, &self.src_pad, &self.src_pad_handler, receiver), - context, - ) + .prepare(AppSrcTask::new(element.clone(), receiver), context) .map_err(|err| { gst::error_msg!( gst::ResourceError::OpenRead, @@ -507,17 +446,15 @@ impl ObjectSubclass for AppSrc { type ParentType = gst::Element; fn with_class(klass: &Self::Class) -> Self { - let src_pad_handler = AppSrcPadHandler::default(); - Self { src_pad: PadSrc::new( gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")), - src_pad_handler.clone(), + AppSrcPadHandler, ), - src_pad_handler, task: Task::default(), - sender: StdMutex::new(None), - settings: StdMutex::new(Settings::default()), + sender: Default::default(), + configured_caps: Default::default(), + settings: Default::default(), } } }