threadshare: build Pad{Src,Sink} with handlers

Handlers for `Pad{Src,Sink}` are assigned when `prepare` is called
which prevents them from handling pre-prepare queries.

See https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/247
This commit is contained in:
François Laignel 2020-04-29 15:03:43 +02:00
parent c414f78248
commit 26634f591a
10 changed files with 112 additions and 249 deletions

View file

@ -421,7 +421,6 @@ impl AppSrc {
)
})?;
self.src_pad_handler.prepare(settings.caps.clone());
self.src_pad.prepare(&self.src_pad_handler);
gst_debug!(CAT, obj: element, "Prepared");
@ -435,7 +434,6 @@ impl AppSrc {
*self.receiver.lock().unwrap() = None;
self.task.unprepare().unwrap();
self.src_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
@ -665,12 +663,14 @@ impl ObjectSubclass for AppSrc {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let src_pad_handler = AppSrcPadHandler::default();
Self {
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
src_pad_handler: AppSrcPadHandler::default(),
src_pad: PadSrc::new(
gst::Pad::new_from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
src_pad_handler.clone(),
),
src_pad_handler,
task: Task::default(),
state: StdMutex::new(AppSrcState::RejectBuffers),
sender: StdMutex::new(None),

View file

@ -105,16 +105,10 @@ impl Default for InputSelectorPadSinkHandlerInner {
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
struct InputSelectorPadSinkHandler(Arc<Mutex<InputSelectorPadSinkHandlerInner>>);
impl InputSelectorPadSinkHandler {
fn new() -> Self {
InputSelectorPadSinkHandler(Arc::new(Mutex::new(
InputSelectorPadSinkHandlerInner::default(),
)))
}
/* Wait until specified time */
async fn sync(&self, element: &gst::Element, running_time: gst::ClockTime) {
let now = element.get_current_running_time();
@ -312,8 +306,6 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
#[derive(Clone, Debug)]
struct InputSelectorPadSrcHandler;
impl InputSelectorPadSrcHandler {}
impl PadSrcHandler for InputSelectorPadSrcHandler {
type ElementImpl = InputSelector;
@ -426,34 +418,10 @@ lazy_static! {
}
impl InputSelector {
fn prepare(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Preparing");
self.src_pad.prepare(&InputSelectorPadSrcHandler);
let pads = self.pads.lock().unwrap();
for pad in pads.sink_pads.values() {
pad.prepare(&InputSelectorPadSinkHandler::new());
}
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
}
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
let mut state = self.state.lock().unwrap();
gst_debug!(CAT, obj: element, "Unpreparing");
self.src_pad.unprepare();
let pads = self.pads.lock().unwrap();
for pad in pads.sink_pads.values() {
pad.unprepare();
}
*state = State::default();
gst_debug!(CAT, obj: element, "Unprepared");
Ok(())
@ -501,10 +469,10 @@ impl ObjectSubclass for InputSelector {
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
Self {
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
src_pad: PadSrc::new(
gst::Pad::new_from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
InputSelectorPadSrcHandler,
),
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
pads: Mutex::new(Pads::default()),
@ -599,17 +567,8 @@ impl ElementImpl for InputSelector {
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
self.prepare(element).map_err(|err| {
element.post_error_message(&err);
gst::StateChangeError
})?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
if let gst::StateChange::ReadyToNull = transition {
self.unprepare(element).map_err(|_| gst::StateChangeError)?;
}
let mut success = self.parent_change_state(element, transition)?;
@ -641,11 +600,9 @@ impl ElementImpl for InputSelector {
pads.pad_serial += 1;
sink_pad.set_active(true).unwrap();
element.add_pad(&sink_pad).unwrap();
let sink_pad = PadSink::new(sink_pad);
let sink_pad = PadSink::new(sink_pad, InputSelectorPadSinkHandler::default());
let ret = sink_pad.gst_pad().clone();
sink_pad.prepare(&InputSelectorPadSinkHandler::new());
if state.active_sinkpad.is_none() {
state.active_sinkpad = Some(ret.clone());
state.switched_pad = true;
@ -663,7 +620,7 @@ impl ElementImpl for InputSelector {
fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
let mut pads = self.pads.lock().unwrap();
let sink_pad = pads.sink_pads.remove(pad).unwrap();
sink_pad.unprepare();
drop(sink_pad);
element.remove_pad(pad).unwrap();
drop(pads);

View file

@ -210,14 +210,10 @@ impl Default for SinkHandlerInner {
}
}
#[derive(Clone)]
#[derive(Clone, Default)]
struct SinkHandler(Arc<StdMutex<SinkHandlerInner>>);
impl SinkHandler {
fn new() -> Self {
SinkHandler(Arc::new(StdMutex::new(SinkHandlerInner::default())))
}
fn clear(&self) {
let mut inner = self.0.lock().unwrap();
*inner = SinkHandlerInner::default();
@ -728,14 +724,10 @@ impl PadSinkHandler for SinkHandler {
}
}
#[derive(Clone)]
#[derive(Clone, Default)]
struct SrcHandler;
impl SrcHandler {
fn new() -> Self {
SrcHandler
}
fn clear(&self) {}
fn generate_lost_events(
@ -1154,9 +1146,6 @@ impl JitterBuffer {
)
})?;
self.src_pad.prepare(&self.src_pad_handler);
self.sink_pad.prepare(&self.sink_pad_handler);
gst_info!(CAT, obj: element, "Prepared");
Ok(())
@ -1164,11 +1153,7 @@ impl JitterBuffer {
fn unprepare(&self, element: &gst::Element) {
gst_debug!(CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
self.sink_pad.unprepare();
self.src_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
}
@ -1418,17 +1403,20 @@ impl ObjectSubclass for JitterBuffer {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let sink_pad_handler = SinkHandler::default();
let src_pad_handler = SrcHandler::default();
Self {
sink_pad: PadSink::new(gst::Pad::new_from_template(
&klass.get_pad_template("sink").unwrap(),
Some("sink"),
)),
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
sink_pad_handler: SinkHandler::new(),
src_pad_handler: SrcHandler::new(),
sink_pad: PadSink::new(
gst::Pad::new_from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
sink_pad_handler.clone(),
),
src_pad: PadSrc::new(
gst::Pad::new_from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
src_pad_handler.clone(),
),
sink_pad_handler,
src_pad_handler,
task: Task::default(),
state: StdMutex::new(State::default()),
settings: StdMutex::new(Settings::default()),

View file

@ -619,8 +619,6 @@ impl ProxySink {
*self.proxy_ctx.lock().unwrap() = Some(proxy_ctx);
self.sink_pad.prepare(&ProxySinkPadHandler);
gst_debug!(SINK_CAT, obj: element, "Prepared");
Ok(())
@ -628,10 +626,7 @@ impl ProxySink {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(SINK_CAT, obj: element, "Unpreparing");
self.sink_pad.unprepare();
*self.proxy_ctx.lock().unwrap() = None;
gst_debug!(SINK_CAT, obj: element, "Unprepared");
Ok(())
@ -703,10 +698,10 @@ impl ObjectSubclass for ProxySink {
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
Self {
sink_pad: PadSink::new(gst::Pad::new_from_template(
&klass.get_pad_template("sink").unwrap(),
Some("sink"),
)),
sink_pad: PadSink::new(
gst::Pad::new_from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
ProxySinkPadHandler,
),
proxy_ctx: StdMutex::new(None),
settings: StdMutex::new(SettingsSink::default()),
}
@ -978,8 +973,6 @@ impl ProxySrc {
*self.dataqueue.lock().unwrap() = Some(dataqueue);
self.src_pad.prepare(&ProxySrcPadHandler);
self.task.prepare(ts_ctx).map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
@ -1002,7 +995,6 @@ impl ProxySrc {
}
self.task.unprepare().unwrap();
self.src_pad.unprepare();
*self.dataqueue.lock().unwrap() = None;
*self.proxy_ctx.lock().unwrap() = None;
@ -1195,10 +1187,10 @@ impl ObjectSubclass for ProxySrc {
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
Self {
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
src_pad: PadSrc::new(
gst::Pad::new_from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
ProxySrcPadHandler,
),
task: Task::default(),
proxy_ctx: StdMutex::new(None),
dataqueue: StdMutex::new(None),

View file

@ -568,9 +568,6 @@ impl Queue {
)
})?;
self.src_pad.prepare(&QueuePadSrcHandler);
self.sink_pad.prepare(&QueuePadSinkHandler);
gst_debug!(CAT, obj: element, "Prepared");
Ok(())
@ -579,9 +576,7 @@ impl Queue {
fn unprepare(&self, element: &gst::Element) -> Result<(), ()> {
gst_debug!(CAT, obj: element, "Unpreparing");
self.sink_pad.unprepare();
self.task.unprepare().unwrap();
self.src_pad.unprepare();
*self.dataqueue.lock().unwrap() = None;
*self.pending_queue.lock().unwrap() = None;
@ -756,14 +751,14 @@ impl ObjectSubclass for Queue {
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
Self {
sink_pad: PadSink::new(gst::Pad::new_from_template(
&klass.get_pad_template("sink").unwrap(),
Some("sink"),
)),
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
sink_pad: PadSink::new(
gst::Pad::new_from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
QueuePadSinkHandler,
),
src_pad: PadSrc::new(
gst::Pad::new_from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
QueuePadSrcHandler,
),
task: Task::default(),
dataqueue: StdMutex::new(None),
pending_queue: StdMutex::new(None),

View file

@ -420,9 +420,9 @@ impl PadSrcStrong {
pub struct PadSrc(PadSrcStrong);
impl PadSrc {
pub fn new(gst_pad: gst::Pad) -> Self {
pub fn new(gst_pad: gst::Pad, handler: impl PadSrcHandler) -> Self {
let this = PadSrc(PadSrcStrong::new(gst_pad));
this.set_default_activatemode_function();
this.init_pad_functions(handler);
this
}
@ -443,38 +443,7 @@ impl PadSrc {
self.gst_pad().check_reconfigure()
}
fn set_default_activatemode_function(&self) {
let this_weak = self.downgrade();
self.gst_pad()
.set_activatemode_function(move |gst_pad, _parent, mode, active| {
// Important: don't panic here as we operate without `catch_panic_pad_function`
// because we may not know which element the PadSrc is associated to yet
this_weak
.upgrade()
.ok_or_else(|| {
gst_error!(RUNTIME_CAT, obj: gst_pad, "PadSrc no longer exists");
gst_loggable_error!(RUNTIME_CAT, "PadSrc no longer exists")
})?
.activate_mode_hook(mode, active)
});
}
///// Spawns `future` using current [`PadContext`].
/////
///// # Panics
/////
///// This function panics if the `PadSrc` is not prepared.
/////
///// [`PadContext`]: ../struct.PadContext.html
//pub fn spawn<Fut>(&self, future: Fut) -> JoinHandle<Fut::Output>
//where
// Fut: Future + Send + 'static,
// Fut::Output: Send + 'static,
//{
// self.0.spawn(future)
//}
fn init_pad_functions<H: PadSrcHandler>(&self, handler: &H) {
fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
let handler_clone = handler.clone();
let this_weak = self.downgrade();
self.gst_pad()
@ -535,11 +504,10 @@ impl PadSrc {
)
});
let handler_clone = handler.clone();
let this_weak = self.downgrade();
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler_clone.clone();
let handler = handler.clone();
let this_weak = this_weak.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
@ -557,29 +525,6 @@ impl PadSrc {
});
}
pub fn prepare<H: PadSrcHandler>(&self, handler: &H) {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing");
self.init_pad_functions(handler);
}
/// Releases the resources held by this `PadSrc`.
pub fn unprepare(&self) {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Unpreparing");
self.gst_pad()
.set_activate_function(move |_gst_pad, _parent| {
Err(gst_loggable_error!(RUNTIME_CAT, "PadSrc unprepared"))
});
self.set_default_activatemode_function();
self.gst_pad()
.set_event_function(move |_gst_pad, _parent, _event| false);
self.gst_pad()
.set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
self.gst_pad()
.set_query_function(move |_gst_pad, _parent, _query| false);
}
pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
self.0.push(buffer).await
}
@ -593,6 +538,25 @@ impl PadSrc {
}
}
impl Drop for PadSrc {
fn drop(&mut self) {
self.gst_pad()
.set_activate_function(move |_gst_pad, _parent| {
Err(gst_loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
});
self.gst_pad()
.set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
Err(gst_loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
});
self.gst_pad()
.set_event_function(move |_gst_pad, _parent, _event| false);
self.gst_pad()
.set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
self.gst_pad()
.set_query_function(move |_gst_pad, _parent, _query| false);
}
}
/// A trait to define `handler`s for [`PadSink`] callbacks.
///
/// *See the [`pad` module] documentation for a description of the model.*
@ -885,9 +849,9 @@ impl PadSinkStrong {
pub struct PadSink(PadSinkStrong);
impl PadSink {
pub fn new(gst_pad: gst::Pad) -> Self {
pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self {
let this = PadSink(PadSinkStrong::new(gst_pad));
this.set_default_activatemode_function();
this.init_pad_functions(handler);
this
}
@ -904,23 +868,7 @@ impl PadSink {
self.0.downgrade()
}
fn set_default_activatemode_function(&self) {
let this_weak = self.downgrade();
self.gst_pad()
.set_activatemode_function(move |gst_pad, _parent, mode, active| {
// Important: don't panic here as we operate without `catch_panic_pad_function`
// because we may not know which element the PadSrc is associated to yet
this_weak
.upgrade()
.ok_or_else(|| {
gst_error!(RUNTIME_CAT, obj: gst_pad, "PadSink no longer exists");
gst_loggable_error!(RUNTIME_CAT, "PadSink no longer exists")
})?
.activate_mode_hook(mode, active)
});
}
fn init_pad_functions<H: PadSinkHandler>(&self, handler: &H) {
fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) {
let handler_clone = handler.clone();
let this_weak = self.downgrade();
self.gst_pad()
@ -1079,11 +1027,10 @@ impl PadSink {
)
});
let handler_clone = handler.clone();
let this_weak = self.downgrade();
self.gst_pad()
.set_query_function(move |_gst_pad, parent, query| {
let handler = handler_clone.clone();
let handler = handler.clone();
let this_weak = this_weak.clone();
H::ElementImpl::catch_panic_pad_function(
parent,
@ -1100,21 +1047,18 @@ impl PadSink {
)
});
}
}
pub fn prepare<H: PadSinkHandler>(&self, handler: &H) {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Preparing");
self.init_pad_functions(handler);
}
/// Releases the resources held by this `PadSink`.
pub fn unprepare(&self) {
gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Unpreparing");
impl Drop for PadSink {
fn drop(&mut self) {
self.gst_pad()
.set_activate_function(move |_gst_pad, _parent| {
Err(gst_loggable_error!(RUNTIME_CAT, "PadSink unprepared"))
Err(gst_loggable_error!(RUNTIME_CAT, "PadSink no longer exists"))
});
self.gst_pad()
.set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
Err(gst_loggable_error!(RUNTIME_CAT, "PadSink no longer exists"))
});
self.set_default_activatemode_function();
self.gst_pad()
.set_chain_function(move |_gst_pad, _parent, _buffer| Err(FlowError::Flushing));
self.gst_pad()

View file

@ -439,7 +439,6 @@ impl TcpClientSrc {
)
})?;
self.src_pad_handler.prepare(settings.caps);
self.src_pad.prepare(&self.src_pad_handler);
gst_debug!(CAT, obj: element, "Prepared");
@ -450,9 +449,7 @@ impl TcpClientSrc {
gst_debug!(CAT, obj: element, "Unpreparing");
*self.socket.lock().unwrap() = None;
self.task.unprepare().unwrap();
self.src_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
@ -639,11 +636,14 @@ impl ObjectSubclass for TcpClientSrc {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("src").unwrap();
let src_pad_handler = TcpClientSrcPadHandler::default();
Self {
src_pad: PadSrc::new(gst::Pad::new_from_template(&templ, Some("src"))),
src_pad_handler: TcpClientSrcPadHandler::default(),
src_pad: PadSrc::new(
gst::Pad::new_from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
src_pad_handler.clone(),
),
src_pad_handler,
task: Task::default(),
socket: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),

View file

@ -1029,8 +1029,6 @@ impl UdpSink {
)
})?;
self.sink_pad.prepare(&self.sink_pad_handler);
gst_debug!(CAT, obj: element, "Started preparing");
Ok(())
@ -1041,7 +1039,6 @@ impl UdpSink {
self.task.unprepare().unwrap();
self.sink_pad_handler.unprepare();
self.sink_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
@ -1258,13 +1255,14 @@ impl ObjectSubclass for UdpSink {
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let settings = Arc::new(StdMutex::new(Settings::default()));
let sink_pad_handler = UdpSinkPadHandler::new(Arc::clone(&settings));
Self {
sink_pad: PadSink::new(gst::Pad::new_from_template(
&klass.get_pad_template("sink").unwrap(),
Some("sink"),
)),
sink_pad_handler: UdpSinkPadHandler::new(Arc::clone(&settings)),
sink_pad: PadSink::new(
gst::Pad::new_from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
sink_pad_handler.clone(),
),
sink_pad_handler,
task: Task::default(),
settings,
}

View file

@ -607,7 +607,6 @@ impl UdpSrc {
})?;
self.src_pad_handler
.prepare(settings.caps, settings.retrieve_sender_address);
self.src_pad.prepare(&self.src_pad_handler);
gst_debug!(CAT, obj: element, "Prepared");
@ -622,7 +621,6 @@ impl UdpSrc {
element.notify("used-socket");
self.task.unprepare().unwrap();
self.src_pad.unprepare();
gst_debug!(CAT, obj: element, "Unprepared");
@ -841,12 +839,14 @@ impl ObjectSubclass for UdpSrc {
}
fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
let src_pad_handler = UdpSrcPadHandler::default();
Self {
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
src_pad_handler: UdpSrcPadHandler::default(),
src_pad: PadSrc::new(
gst::Pad::new_from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
src_pad_handler.clone(),
),
src_pad_handler,
task: Task::default(),
socket: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),

View file

@ -183,7 +183,6 @@ impl ElementSrcTest {
["Error preparing Task: {:?}", err]
)
})?;
self.src_pad.prepare(&PadSrcTestHandler);
let (sender, receiver) = mpsc::channel(1);
*self.sender.lock().unwrap() = Some(sender);
@ -198,7 +197,6 @@ impl ElementSrcTest {
gst_debug!(SRC_CAT, obj: element, "Unpreparing");
self.task.unprepare().unwrap();
self.src_pad.unprepare();
*self.sender.lock().unwrap() = None;
*self.receiver.lock().unwrap() = None;
@ -369,10 +367,10 @@ impl ObjectSubclass for ElementSrcTest {
fn new_with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self {
ElementSrcTest {
src_pad: PadSrc::new(gst::Pad::new_from_template(
&klass.get_pad_template("src").unwrap(),
Some("src"),
)),
src_pad: PadSrc::new(
gst::Pad::new_from_template(&klass.get_pad_template("src").unwrap(), Some("src")),
PadSrcTestHandler,
),
task: Task::default(),
state: StdMutex::new(ElementSrcTestState::RejectItems),
sender: StdMutex::new(None),
@ -684,11 +682,11 @@ impl ObjectSubclass for ElementSinkTest {
}
fn new_with_class(klass: &glib::subclass::simple::ClassStruct<Self>) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let gst_pad = gst::Pad::new_from_template(&templ, Some("sink"));
ElementSinkTest {
sink_pad: PadSink::new(gst_pad),
sink_pad: PadSink::new(
gst::Pad::new_from_template(&klass.get_pad_template("sink").unwrap(), Some("sink")),
PadSinkTestHandler,
),
flushing: AtomicBool::new(true),
sender: FutMutex::new(None),
}
@ -730,22 +728,13 @@ impl ElementImpl for ElementSinkTest {
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_log!(SINK_CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::NullToReady => {
self.sink_pad.prepare(&PadSinkTestHandler::default());
}
gst::StateChange::PausedToReady => {
self.stop(element);
}
gst::StateChange::ReadyToNull => {
self.sink_pad.unprepare();
}
_ => (),
if let gst::StateChange::PausedToReady = transition {
self.stop(element);
}
let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
if let gst::StateChange::ReadyToPaused = transition {
self.start(element);
}