diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs index 5ef839c0..97a7dae2 100644 --- a/generic/threadshare/examples/standalone/sink/imp.rs +++ b/generic/threadshare/examples/standalone/sink/imp.rs @@ -19,7 +19,7 @@ use gst::EventView; use once_cell::sync::Lazy; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, Task}; +use gstthreadshare::runtime::{Context, PadSink, Task}; use std::sync::Mutex; use std::task::Poll; @@ -77,7 +77,7 @@ impl PadSinkHandler for TestSinkPadHandler { fn sink_chain( self, - _pad: PadSinkWeak, + _pad: gst::Pad, elem: super::TestSink, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { @@ -95,7 +95,7 @@ impl PadSinkHandler for TestSinkPadHandler { fn sink_chain_list( self, - _pad: PadSinkWeak, + _pad: gst::Pad, elem: super::TestSink, list: gst::BufferList, ) -> BoxFuture<'static, Result> { @@ -115,7 +115,7 @@ impl PadSinkHandler for TestSinkPadHandler { fn sink_event_serialized( self, - _pad: PadSinkWeak, + _pad: gst::Pad, elem: super::TestSink, event: gst::Event, ) -> BoxFuture<'static, bool> { @@ -133,7 +133,7 @@ impl PadSinkHandler for TestSinkPadHandler { .boxed() } - fn sink_event(self, _pad: &PadSinkRef, imp: &TestSink, event: gst::Event) -> bool { + fn sink_event(self, _pad: &gst::Pad, imp: &TestSink, event: gst::Event) -> bool { if let EventView::FlushStart(..) = event.view() { return imp.task.flush_start().await_maybe_on_context().is_ok(); } diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index ff4a8b98..ceb79201 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -33,7 +33,7 @@ use std::time::Duration; use std::u32; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState}; +use crate::runtime::{Context, PadSrc, Task, TaskState}; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; @@ -82,8 +82,8 @@ struct AppSrcPadHandler; impl PadSrcHandler for AppSrcPadHandler { type ElementImpl = AppSrc; - fn src_event(self, pad: &PadSrcRef, imp: &AppSrc, event: gst::Event) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + fn src_event(self, pad: &gst::Pad, imp: &AppSrc, event: gst::Event) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", event); use gst::EventView; let ret = match event.view() { @@ -95,16 +95,16 @@ impl PadSrcHandler for AppSrcPadHandler { }; if ret { - gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event); + gst::log!(CAT, obj: pad, "Handled {:?}", event); } else { - gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); + gst::log!(CAT, obj: pad, "Didn't handle {:?}", event); } ret } - fn src_query(self, pad: &PadSrcRef, imp: &AppSrc, query: &mut gst::QueryRef) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + fn src_query(self, pad: &gst::Pad, imp: &AppSrc, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", query); use gst::QueryViewMut; let ret = match query.view_mut() { @@ -136,9 +136,9 @@ impl PadSrcHandler for AppSrcPadHandler { }; if ret { - gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query); + gst::log!(CAT, obj: pad, "Handled {:?}", query); } else { - gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); + gst::log!(CAT, obj: pad, "Didn't handle {:?}", query); } ret } diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs index f8618dbb..0455f227 100644 --- a/generic/threadshare/src/inputselector/imp.rs +++ b/generic/threadshare/src/inputselector/imp.rs @@ -33,7 +33,7 @@ use std::time::Duration; use std::u32; use crate::runtime::prelude::*; -use crate::runtime::{self, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef}; +use crate::runtime::{self, PadSink, PadSrc}; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; @@ -88,7 +88,7 @@ impl InputSelectorPadSinkHandler { async fn handle_item( &self, - pad: &PadSinkRef<'_>, + pad: &gst::Pad, elem: &super::InputSelector, mut buffer: gst::Buffer, ) -> Result { @@ -111,9 +111,9 @@ impl InputSelectorPadSinkHandler { } let is_active = { - if state.active_sinkpad.as_ref() == Some(pad.gst_pad()) { + if state.active_sinkpad.as_ref() == Some(pad) { if inner.send_sticky || state.switched_pad { - pad.gst_pad().sticky_events_foreach(|event| { + pad.sticky_events_foreach(|event| { use std::ops::ControlFlow; stickies.push(event.clone()); ControlFlow::Continue(gst::EventForeachAction::Keep) @@ -140,7 +140,7 @@ impl InputSelectorPadSinkHandler { } if is_active { - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); + gst::log!(CAT, obj: pad, "Forwarding {:?}", buffer); if switched_pad && !buffer.flags().contains(gst::BufferFlags::DISCONT) { let buffer = buffer.make_mut(); @@ -159,26 +159,21 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { fn sink_chain( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::InputSelector, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - self.handle_item(&pad, &elem, buffer).await - } - .boxed() + async move { self.handle_item(&pad, &elem, buffer).await }.boxed() } fn sink_chain_list( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::InputSelector, list: gst::BufferList, ) -> BoxFuture<'static, Result> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list); + gst::log!(CAT, obj: pad, "Handling buffer list {:?}", list); // TODO: Ideally we would keep the list intact and forward it in one go for buffer in list.iter_owned() { self.handle_item(&pad, &elem, buffer).await?; @@ -191,7 +186,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { fn sink_event_serialized( self, - _pad: PadSinkWeak, + _pad: gst::Pad, _elem: super::InputSelector, event: gst::Event, ) -> BoxFuture<'static, bool> { @@ -219,7 +214,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { .boxed() } - fn sink_event(self, _pad: &PadSinkRef, imp: &InputSelector, event: gst::Event) -> bool { + fn sink_event(self, _pad: &gst::Pad, imp: &InputSelector, event: gst::Event) -> bool { /* Drop all events for now */ if let gst::EventView::FlushStart(..) = event.view() { /* Unblock downstream */ @@ -234,15 +229,15 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { true } - fn sink_query(self, pad: &PadSinkRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); + fn sink_query(self, pad: &gst::Pad, imp: &InputSelector, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj: pad, "Handling query {:?}", query); if query.is_serialized() { // FIXME: How can we do this (drops ALLOCATION and DRAIN)? - gst::log!(CAT, obj: pad.gst_pad(), "Dropping serialized query {:?}", query); + gst::log!(CAT, obj: pad, "Dropping serialized query {:?}", query); false } else { - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query); + gst::log!(CAT, obj: pad, "Forwarding query {:?}", query); imp.src_pad.gst_pad().peer_query(query) } } @@ -254,8 +249,8 @@ struct InputSelectorPadSrcHandler; impl PadSrcHandler for InputSelectorPadSrcHandler { type ElementImpl = InputSelector; - fn src_query(self, pad: &PadSrcRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + fn src_query(self, pad: &gst::Pad, imp: &InputSelector, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", query); use gst::QueryViewMut; match query.view_mut() { diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index 5dbd855f..0b40eb02 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -36,7 +36,7 @@ use std::sync::Mutex as StdMutex; use std::time::Duration; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task}; +use crate::runtime::{self, Context, PadSink, PadSrc, Task}; use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; @@ -488,7 +488,7 @@ impl SinkHandler { fn enqueue_item( &self, - pad: &gst::Pad, + pad: gst::Pad, jb: &JitterBuffer, buffer: Option, ) -> Result { @@ -501,7 +501,7 @@ impl SinkHandler { // This is to avoid recursion with `store`, `reset` and `enqueue_item` while let Some(buf) = buffers.pop_front() { - if let Err(err) = self.store(&mut inner, pad, jb, buf) { + if let Err(err) = self.store(&mut inner, &pad, jb, buf) { match err { gst::FlowError::CustomError => { for gap_packet in self.reset(&mut inner, jb) { @@ -550,26 +550,25 @@ impl PadSinkHandler for SinkHandler { fn sink_chain( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::JitterBuffer, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); - self.enqueue_item(pad.gst_pad(), elem.imp(), Some(buffer)) + gst::debug!(CAT, obj: pad, "Handling {:?}", buffer); + self.enqueue_item(pad, elem.imp(), Some(buffer)) } .boxed() } - fn sink_event(self, pad: &PadSinkRef, jb: &JitterBuffer, event: gst::Event) -> bool { + fn sink_event(self, pad: &gst::Pad, jb: &JitterBuffer, event: gst::Event) -> bool { use gst::EventView; - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + gst::log!(CAT, obj: pad, "Handling {:?}", event); if let EventView::FlushStart(..) = event.view() { if let Err(err) = jb.task.flush_start().await_maybe_on_context() { - gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); + gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err); gst::element_imp_error!( jb, gst::StreamError::Failed, @@ -580,20 +579,18 @@ impl PadSinkHandler for SinkHandler { } } - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); + gst::log!(CAT, obj: pad, "Forwarding {:?}", event); jb.src_pad.gst_pad().push_event(event) } fn sink_event_serialized( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::JitterBuffer, event: gst::Event, ) -> BoxFuture<'static, bool> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + gst::log!(CAT, obj: pad, "Handling {:?}", event); let jb = elem.imp(); @@ -606,7 +603,7 @@ impl PadSinkHandler for SinkHandler { } EventView::FlushStop(..) => { if let Err(err) = jb.task.flush_stop().await_maybe_on_context() { - gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); + gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err); gst::element_error!( elem, gst::StreamError::Failed, @@ -629,7 +626,7 @@ impl PadSinkHandler for SinkHandler { if forward { // FIXME: These events should really be queued up and stay in order - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding serialized {:?}", event); + gst::log!(CAT, obj: pad, "Forwarding serialized {:?}", event); jb.src_pad.push_event(event).await } else { true @@ -870,15 +867,15 @@ impl SrcHandler { impl PadSrcHandler for SrcHandler { type ElementImpl = JitterBuffer; - fn src_event(self, pad: &PadSrcRef, jb: &JitterBuffer, event: gst::Event) -> bool { + fn src_event(self, pad: &gst::Pad, jb: &JitterBuffer, event: gst::Event) -> bool { use gst::EventView; - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + gst::log!(CAT, obj: pad, "Handling {:?}", event); match event.view() { EventView::FlushStart(..) => { if let Err(err) = jb.task.flush_start().await_maybe_on_context() { - gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); + gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err); gst::element_imp_error!( jb, gst::StreamError::Failed, @@ -890,7 +887,7 @@ impl PadSrcHandler for SrcHandler { } EventView::FlushStop(..) => { if let Err(err) = jb.task.flush_stop().await_maybe_on_context() { - gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); + gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err); gst::element_imp_error!( jb, gst::StreamError::Failed, @@ -903,14 +900,14 @@ impl PadSrcHandler for SrcHandler { _ => (), } - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); + gst::log!(CAT, obj: pad, "Forwarding {:?}", event); jb.sink_pad.gst_pad().push_event(event) } - fn src_query(self, pad: &PadSrcRef, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool { + fn src_query(self, pad: &gst::Pad, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); + gst::log!(CAT, obj: pad, "Forwarding {:?}", query); match query.view_mut() { QueryViewMut::Latency(q) => { diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index 7a94da0b..04acca05 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -34,9 +34,7 @@ use std::time::Duration; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{ - Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, PadSrcWeak, Task, -}; +use crate::runtime::{Context, PadSink, PadSinkWeak, PadSrc, PadSrcWeak, Task}; use crate::dataqueue::{DataQueue, DataQueueItem}; @@ -215,13 +213,12 @@ impl PadSinkHandler for ProxySinkPadHandler { fn sink_chain( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::ProxySink, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); + gst::log!(SINK_CAT, obj: pad, "Handling {:?}", buffer); let imp = elem.imp(); imp.enqueue_item(DataQueueItem::Buffer(buffer)).await } @@ -230,21 +227,20 @@ impl PadSinkHandler for ProxySinkPadHandler { fn sink_chain_list( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::ProxySink, list: gst::BufferList, ) -> BoxFuture<'static, Result> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list); + gst::log!(SINK_CAT, obj: pad, "Handling {:?}", list); let imp = elem.imp(); imp.enqueue_item(DataQueueItem::BufferList(list)).await } .boxed() } - fn sink_event(self, pad: &PadSinkRef, imp: &ProxySink, event: gst::Event) -> bool { - gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); + fn sink_event(self, pad: &gst::Pad, imp: &ProxySink, event: gst::Event) -> bool { + gst::debug!(SINK_CAT, obj: pad, "Handling non-serialized {:?}", event); let src_pad = { let proxy_ctx = imp.proxy_ctx.lock().unwrap(); @@ -262,23 +258,27 @@ impl PadSinkHandler for ProxySinkPadHandler { } if let Some(src_pad) = src_pad { - gst::log!(SINK_CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); + gst::log!(SINK_CAT, obj: pad, "Forwarding non-serialized {:?}", event); src_pad.push_event(event) } else { - gst::error!(SINK_CAT, obj: pad.gst_pad(), "No src pad to forward non-serialized {:?} to", event); + gst::error!( + SINK_CAT, + obj: pad, + "No src pad to forward non-serialized {:?} to", + event + ); true } } fn sink_event_serialized( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::ProxySink, event: gst::Event, ) -> BoxFuture<'static, bool> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); + gst::log!(SINK_CAT, obj: pad, "Handling serialized {:?}", event); let imp = elem.imp(); @@ -291,7 +291,7 @@ impl PadSinkHandler for ProxySinkPadHandler { _ => (), } - gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); + gst::log!(SINK_CAT, obj: pad, "Queuing serialized {:?}", event); imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok() } .boxed() @@ -666,8 +666,8 @@ struct ProxySrcPadHandler; impl PadSrcHandler for ProxySrcPadHandler { type ElementImpl = ProxySrc; - fn src_event(self, pad: &PadSrcRef, imp: &ProxySrc, event: gst::Event) -> bool { - gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + fn src_event(self, pad: &gst::Pad, imp: &ProxySrc, event: gst::Event) -> bool { + gst::log!(SRC_CAT, obj: pad, "Handling {:?}", event); let sink_pad = { let proxy_ctx = imp.proxy_ctx.lock().unwrap(); @@ -684,7 +684,7 @@ impl PadSrcHandler for ProxySrcPadHandler { match event.view() { EventView::FlushStart(..) => { if let Err(err) = imp.task.flush_start().await_maybe_on_context() { - gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); + gst::error!(SRC_CAT, obj: pad, "FlushStart failed {:?}", err); gst::element_imp_error!( imp, gst::StreamError::Failed, @@ -696,7 +696,7 @@ impl PadSrcHandler for ProxySrcPadHandler { } EventView::FlushStop(..) => { if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { - gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); + gst::error!(SRC_CAT, obj: pad, "FlushStop failed {:?}", err); gst::element_imp_error!( imp, gst::StreamError::Failed, @@ -710,16 +710,16 @@ impl PadSrcHandler for ProxySrcPadHandler { } if let Some(sink_pad) = sink_pad { - gst::log!(SRC_CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); + gst::log!(SRC_CAT, obj: pad, "Forwarding {:?}", event); sink_pad.push_event(event) } else { - gst::error!(SRC_CAT, obj: pad.gst_pad(), "No sink pad to forward {:?} to", event); + gst::error!(SRC_CAT, obj: pad, "No sink pad to forward {:?} to", event); false } } - fn src_query(self, pad: &PadSrcRef, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool { - gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query); + fn src_query(self, pad: &gst::Pad, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool { + gst::log!(SRC_CAT, obj: pad, "Handling {:?}", query); use gst::QueryViewMut; let ret = match query.view_mut() { @@ -733,7 +733,7 @@ impl PadSrcHandler for ProxySrcPadHandler { true } QueryViewMut::Caps(q) => { - let caps = if let Some(ref caps) = pad.gst_pad().current_caps() { + let caps = if let Some(ref caps) = pad.current_caps() { q.filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -751,9 +751,9 @@ impl PadSrcHandler for ProxySrcPadHandler { }; if ret { - gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", query); + gst::log!(SRC_CAT, obj: pad, "Handled {:?}", query); } else { - gst::log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); + gst::log!(SRC_CAT, obj: pad, "Didn't handle {:?}", query); } ret diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index 2ca02f9c..683066bc 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -33,7 +33,7 @@ use std::time::Duration; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task}; +use crate::runtime::{Context, PadSink, PadSrc, Task}; use crate::dataqueue::{DataQueue, DataQueueItem}; @@ -85,13 +85,12 @@ impl PadSinkHandler for QueuePadSinkHandler { fn sink_chain( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::Queue, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); + gst::log!(CAT, obj: pad, "Handling {:?}", buffer); let imp = elem.imp(); imp.enqueue_item(DataQueueItem::Buffer(buffer)).await } @@ -100,25 +99,24 @@ impl PadSinkHandler for QueuePadSinkHandler { fn sink_chain_list( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::Queue, list: gst::BufferList, ) -> BoxFuture<'static, Result> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list); + gst::log!(CAT, obj: pad, "Handling {:?}", list); let imp = elem.imp(); imp.enqueue_item(DataQueueItem::BufferList(list)).await } .boxed() } - fn sink_event(self, pad: &PadSinkRef, imp: &Queue, event: gst::Event) -> bool { - gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); + fn sink_event(self, pad: &gst::Pad, imp: &Queue, event: gst::Event) -> bool { + gst::debug!(CAT, obj: pad, "Handling non-serialized {:?}", event); if let gst::EventView::FlushStart(..) = event.view() { if let Err(err) = imp.task.flush_start().await_maybe_on_context() { - gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); + gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err); gst::element_imp_error!( imp, gst::StreamError::Failed, @@ -129,25 +127,24 @@ impl PadSinkHandler for QueuePadSinkHandler { } } - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); + gst::log!(CAT, obj: pad, "Forwarding non-serialized {:?}", event); imp.src_pad.gst_pad().push_event(event) } fn sink_event_serialized( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::Queue, event: gst::Event, ) -> BoxFuture<'static, bool> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); + gst::log!(CAT, obj: pad, "Handling serialized {:?}", event); let imp = elem.imp(); if let gst::EventView::FlushStop(..) = event.view() { if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { - gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); + gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err); gst::element_imp_error!( imp, gst::StreamError::Failed, @@ -158,21 +155,21 @@ impl PadSinkHandler for QueuePadSinkHandler { } } - gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); + gst::log!(CAT, obj: pad, "Queuing serialized {:?}", event); imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok() } .boxed() } - fn sink_query(self, pad: &PadSinkRef, imp: &Queue, query: &mut gst::QueryRef) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + fn sink_query(self, pad: &gst::Pad, imp: &Queue, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", query); if query.is_serialized() { // FIXME: How can we do this? - gst::log!(CAT, obj: pad.gst_pad(), "Dropping serialized {:?}", query); + gst::log!(CAT, obj: pad, "Dropping serialized {:?}", query); false } else { - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); + gst::log!(CAT, obj: pad, "Forwarding {:?}", query); imp.src_pad.gst_pad().peer_query(query) } } @@ -184,19 +181,19 @@ struct QueuePadSrcHandler; impl PadSrcHandler for QueuePadSrcHandler { type ElementImpl = Queue; - fn src_event(self, pad: &PadSrcRef, imp: &Queue, event: gst::Event) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + fn src_event(self, pad: &gst::Pad, imp: &Queue, event: gst::Event) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", event); use gst::EventView; match event.view() { EventView::FlushStart(..) => { if let Err(err) = imp.task.flush_start().await_maybe_on_context() { - gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); + gst::error!(CAT, obj: pad, "FlushStart failed {:?}", err); } } EventView::FlushStop(..) => { if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { - gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); + gst::error!(CAT, obj: pad, "FlushStop failed {:?}", err); gst::element_imp_error!( imp, gst::StreamError::Failed, @@ -209,12 +206,12 @@ impl PadSrcHandler for QueuePadSrcHandler { _ => (), } - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); + gst::log!(CAT, obj: pad, "Forwarding {:?}", event); imp.sink_pad.gst_pad().push_event(event) } - fn src_query(self, pad: &PadSrcRef, imp: &Queue, query: &mut gst::QueryRef) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + fn src_query(self, pad: &gst::Pad, imp: &Queue, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", query); if let gst::QueryViewMut::Scheduling(q) = query.view_mut() { let mut new_query = gst::query::Scheduling::new(); @@ -223,7 +220,7 @@ impl PadSrcHandler for QueuePadSrcHandler { return res; } - gst::log!(CAT, obj: pad.gst_pad(), "Upstream returned {:?}", new_query); + gst::log!(CAT, obj: pad, "Upstream returned {:?}", new_query); let (flags, min, max, align) = new_query.result(); q.set(flags, min, max, align); @@ -235,11 +232,11 @@ impl PadSrcHandler for QueuePadSrcHandler { .filter(|m| m != &gst::PadMode::Pull) .collect::>(), ); - gst::log!(CAT, obj: pad.gst_pad(), "Returning {:?}", q.query_mut()); + gst::log!(CAT, obj: pad, "Returning {:?}", q.query_mut()); return true; } - gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); + gst::log!(CAT, obj: pad, "Forwarding {:?}", query); imp.sink_pad.gst_pad().peer_query(query) } } diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs index 857f15a1..865dbf84 100644 --- a/generic/threadshare/src/runtime/pad.rs +++ b/generic/threadshare/src/runtime/pad.rs @@ -123,36 +123,28 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { fn src_activate( self, - pad: &PadSrcRef, + pad: &gst::Pad, _imp: &Self::ElementImpl, ) -> Result<(), gst::LoggableError> { - let gst_pad = pad.gst_pad(); - if gst_pad.is_active() { + if pad.is_active() { gst::debug!( RUNTIME_CAT, - obj: gst_pad, + obj: pad, "Already activated in {:?} mode ", - gst_pad.mode() + pad.mode() ); return Ok(()); } - gst_pad - .activate_mode(gst::PadMode::Push, true) - .map_err(|err| { - gst::error!( - RUNTIME_CAT, - obj: gst_pad, - "Error in PadSrc activate: {:?}", - err - ); - gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err) - }) + pad.activate_mode(gst::PadMode::Push, true).map_err(|err| { + gst::error!(RUNTIME_CAT, obj: pad, "Error in PadSrc activate: {:?}", err); + gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err) + }) } fn src_activatemode( self, - _pad: &PadSrcRef, + _pad: &gst::Pad, _imp: &Self::ElementImpl, _mode: gst::PadMode, _active: bool, @@ -160,8 +152,8 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { Ok(()) } - fn src_event(self, pad: &PadSrcRef, imp: &Self::ElementImpl, event: gst::Event) -> bool { - gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + fn src_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool { + gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event); let elem = imp.obj(); // FIXME with GAT on `Self::ElementImpl`, we should be able to @@ -170,12 +162,12 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. let element = unsafe { elem.unsafe_cast_ref::() }; - gst::Pad::event_default(pad.gst_pad(), Some(element), event) + gst::Pad::event_default(pad, Some(element), event) } fn src_event_full( self, - pad: &PadSrcRef, + pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event, ) -> Result { @@ -185,19 +177,14 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { event_to_event_full(self.src_event(pad, imp, event), event_type) } - fn src_query( - self, - pad: &PadSrcRef, - imp: &Self::ElementImpl, - query: &mut gst::QueryRef, - ) -> bool { - gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); + fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool { + gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query); if query.is_serialized() { // FIXME serialized queries should be handled with the dataflow // but we can't return a `Future` because we couldn't honor QueryRef's lifetime false } else { - gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); + gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query); let elem = imp.obj(); // FIXME with GAT on `Self::ElementImpl`, we should be able to @@ -206,7 +193,7 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. let element = unsafe { elem.unsafe_cast_ref::() }; - gst::Pad::query_default(pad.gst_pad(), Some(element), query) + gst::Pad::query_default(pad, Some(element), query) } } } @@ -230,11 +217,11 @@ impl PadSrcInner { } pub async fn push(&self, buffer: gst::Buffer) -> Result { - gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer); + gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", buffer); let success = self.gst_pad.push(buffer).map_err(|err| { gst::error!(RUNTIME_CAT, - obj: self.gst_pad(), + obj: self.gst_pad, "Failed to push Buffer to PadSrc: {:?}", err, ); @@ -248,12 +235,12 @@ impl PadSrcInner { } pub async fn push_list(&self, list: gst::BufferList) -> Result { - gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list); + gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", list); let success = self.gst_pad.push_list(list).map_err(|err| { gst::error!( RUNTIME_CAT, - obj: self.gst_pad(), + obj: self.gst_pad, "Failed to push BufferList to PadSrc: {:?}", err, ); @@ -269,7 +256,7 @@ impl PadSrcInner { pub async fn push_event(&self, event: gst::Event) -> bool { gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", event); - let was_handled = self.gst_pad().push_event(event); + let was_handled = self.gst_pad.push_event(event); gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Processing any pending sub tasks"); if Context::drain_sub_tasks().await.is_err() { @@ -326,26 +313,6 @@ impl<'a> PadSrcRef<'a> { pub fn downgrade(&self) -> PadSrcWeak { PadSrcWeak(Arc::downgrade(&self.strong)) } - - fn activate_mode_hook( - &self, - mode: gst::PadMode, - active: bool, - ) -> Result<(), gst::LoggableError> { - // Important: don't panic here as the hook is used without `catch_panic_pad_function` - // in the default `activatemode` handling - gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active); - - if mode == gst::PadMode::Pull { - gst::error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSrc"); - return Err(gst::loggable_error!( - RUNTIME_CAT, - "Pull mode not supported by PadSrc" - )); - } - - Ok(()) - } } impl<'a> Deref for PadSrcRef<'a> { @@ -384,19 +351,17 @@ impl PadSrc { } pub fn check_reconfigure(&self) -> bool { - self.0.gst_pad().check_reconfigure() + self.0.gst_pad.check_reconfigure() } fn init_pad_functions(&self, handler: H) { // FIXME: Do this better unsafe { let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&self.0); self.0 - .gst_pad() + .gst_pad .set_activate_function(move |gst_pad, parent| { let handler = handler_clone.clone(); - let inner_arc = inner_arc.clone(); H::ElementImpl::catch_panic_pad_function( parent, || { @@ -406,16 +371,15 @@ impl PadSrc { "Panic in PadSrc activate" )) }, - move |imp| H::src_activate(handler, &PadSrcRef::new(inner_arc), imp), + move |imp| H::src_activate(handler, gst_pad, imp), ) }); let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() + self.0 + .gst_pad .set_activatemode_function(move |gst_pad, parent, mode, active| { let handler = handler_clone.clone(); - let inner_arc = inner_arc.clone(); H::ElementImpl::catch_panic_pad_function( parent, || { @@ -426,9 +390,27 @@ impl PadSrc { )) }, move |imp| { - let this_ref = PadSrcRef::new(inner_arc); - this_ref.activate_mode_hook(mode, active)?; - H::src_activatemode(handler, &this_ref, imp, mode, active) + gst::log!( + RUNTIME_CAT, + obj: gst_pad, + "ActivateMode {:?}, {}", + mode, + active + ); + + if mode == gst::PadMode::Pull { + gst::error!( + RUNTIME_CAT, + obj: gst_pad, + "Pull mode not supported by PadSrc" + ); + return Err(gst::loggable_error!( + RUNTIME_CAT, + "Pull mode not supported by PadSrc" + )); + } + + H::src_activatemode(handler, gst_pad, imp, mode, active) }, ) }); @@ -436,38 +418,38 @@ impl PadSrc { // No need to `set_event_function` since `set_event_full_function` // overrides it and dispatches to `src_event` when necessary let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() - .set_event_full_function(move |_gst_pad, parent, event| { + self.0 + .gst_pad + .set_event_full_function(move |gst_pad, parent, event| { let handler = handler_clone.clone(); - let inner_arc = inner_arc.clone(); H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), - move |imp| { - H::src_event_full(handler, &PadSrcRef::new(inner_arc), imp, event) - }, + move |imp| H::src_event_full(handler, gst_pad, imp, event), ) }); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() - .set_query_function(move |_gst_pad, parent, query| { - let handler = handler.clone(); - let inner_arc = inner_arc.clone(); - H::ElementImpl::catch_panic_pad_function( - parent, - || false, - move |imp| { - if !query.is_serialized() { - H::src_query(handler, &PadSrcRef::new(inner_arc), imp, query) - } else { - gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported"); - false - } - }, - ) - }); + self.0 + .gst_pad + .set_query_function(move |gst_pad, parent, query| { + let handler = handler.clone(); + H::ElementImpl::catch_panic_pad_function( + parent, + || false, + move |imp| { + if !query.is_serialized() { + H::src_query(handler, gst_pad, imp, query) + } else { + gst::fixme!( + RUNTIME_CAT, + obj: gst_pad, + "Serialized Query not supported" + ); + false + } + }, + ) + }); } } } @@ -476,19 +458,24 @@ impl Drop for PadSrc { fn drop(&mut self) { // FIXME: Do this better unsafe { - self.gst_pad() + self.0 + .gst_pad .set_activate_function(move |_gst_pad, _parent| { Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists")) }); - self.gst_pad() + self.0 + .gst_pad .set_activatemode_function(move |_gst_pad, _parent, _mode, _active| { Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists")) }); - self.gst_pad() + self.0 + .gst_pad .set_event_function(move |_gst_pad, _parent, _event| false); - self.gst_pad() + self.0 + .gst_pad .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing)); - self.gst_pad() + self.0 + .gst_pad .set_query_function(move |_gst_pad, _parent, _query| false); } } @@ -514,36 +501,33 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_activate( self, - pad: &PadSinkRef, + pad: &gst::Pad, _imp: &Self::ElementImpl, ) -> Result<(), gst::LoggableError> { - let gst_pad = pad.gst_pad(); - if gst_pad.is_active() { + if pad.is_active() { gst::debug!( RUNTIME_CAT, - obj: gst_pad, + obj: pad, "Already activated in {:?} mode ", - gst_pad.mode() + pad.mode() ); return Ok(()); } - gst_pad - .activate_mode(gst::PadMode::Push, true) - .map_err(|err| { - gst::error!( - RUNTIME_CAT, - obj: gst_pad, - "Error in PadSink activate: {:?}", - err - ); - gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err) - }) + pad.activate_mode(gst::PadMode::Push, true).map_err(|err| { + gst::error!( + RUNTIME_CAT, + obj: pad, + "Error in PadSink activate: {:?}", + err + ); + gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err) + }) } fn sink_activatemode( self, - _pad: &PadSinkRef, + _pad: &gst::Pad, _imp: &Self::ElementImpl, _mode: gst::PadMode, _active: bool, @@ -553,7 +537,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_chain( self, - _pad: PadSinkWeak, + _pad: gst::Pad, _elem: ::Type, _buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { @@ -562,16 +546,16 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_chain_list( self, - _pad: PadSinkWeak, + _pad: gst::Pad, _elem: ::Type, _buffer_list: gst::BufferList, ) -> BoxFuture<'static, Result> { future::err(FlowError::NotSupported).boxed() } - fn sink_event(self, pad: &PadSinkRef, imp: &Self::ElementImpl, event: gst::Event) -> bool { + fn sink_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool { assert!(!event.is_serialized()); - gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event); let elem = imp.obj(); // FIXME with GAT on `Self::ElementImpl`, we should be able to @@ -580,12 +564,12 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. let element = unsafe { elem.unsafe_cast_ref::() }; - gst::Pad::event_default(pad.gst_pad(), Some(element), event) + gst::Pad::event_default(pad, Some(element), event) } fn sink_event_serialized( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: ::Type, event: gst::Event, ) -> BoxFuture<'static, bool> { @@ -597,17 +581,16 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { let element = unsafe { elem.unsafe_cast::() }; async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", event); - gst::Pad::event_default(pad.gst_pad(), Some(&element), event) + gst::Pad::event_default(&pad, Some(&element), event) } .boxed() } fn sink_event_full( self, - pad: &PadSinkRef, + pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event, ) -> Result { @@ -620,7 +603,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_event_full_serialized( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: ::Type, event: gst::Event, ) -> BoxFuture<'static, Result> { @@ -636,17 +619,17 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_query( self, - pad: &PadSinkRef, + pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef, ) -> bool { if query.is_serialized() { - gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Dropping {:?}", query); + gst::log!(RUNTIME_CAT, obj: pad, "Dropping {:?}", query); // FIXME serialized queries should be handled with the dataflow // but we can't return a `Future` because we couldn't honor QueryRef's lifetime false } else { - gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); + gst::log!(RUNTIME_CAT, obj: pad, "Handling {:?}", query); let elem = imp.obj(); // FIXME with GAT on `Self::ElementImpl`, we should be able to @@ -655,7 +638,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. let element = unsafe { elem.unsafe_cast_ref::() }; - gst::Pad::query_default(pad.gst_pad(), Some(element), query) + gst::Pad::query_default(pad, Some(element), query) } } } @@ -724,26 +707,6 @@ impl<'a> PadSinkRef<'a> { pub fn downgrade(&self) -> PadSinkWeak { PadSinkWeak(Arc::downgrade(&self.strong)) } - - fn activate_mode_hook( - &self, - mode: gst::PadMode, - active: bool, - ) -> Result<(), gst::LoggableError> { - // Important: don't panic here as the hook is used without `catch_panic_pad_function` - // in the default `activatemode` handling - gst::log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active); - - if mode == gst::PadMode::Pull { - gst::error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSink"); - return Err(gst::loggable_error!( - RUNTIME_CAT, - "Pull mode not supported by PadSink" - )); - } - - Ok(()) - } } impl<'a> Deref for PadSinkRef<'a> { @@ -794,12 +757,10 @@ impl PadSink { { unsafe { let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() + self.0 + .gst_pad .set_activate_function(move |gst_pad, parent| { let handler = handler_clone.clone(); - let inner_arc = inner_arc.clone(); - H::ElementImpl::catch_panic_pad_function( parent, || { @@ -809,16 +770,15 @@ impl PadSink { "Panic in PadSink activate" )) }, - move |imp| H::sink_activate(handler, &PadSinkRef::new(inner_arc), imp), + move |imp| H::sink_activate(handler, gst_pad, imp), ) }); let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() + self.0 + .gst_pad .set_activatemode_function(move |gst_pad, parent, mode, active| { let handler = handler_clone.clone(); - let inner_arc = inner_arc.clone(); H::ElementImpl::catch_panic_pad_function( parent, || { @@ -829,36 +789,53 @@ impl PadSink { )) }, move |imp| { - let this_ref = PadSinkRef::new(inner_arc); - this_ref.activate_mode_hook(mode, active)?; - H::sink_activatemode(handler, &this_ref, imp, mode, active) + gst::log!( + RUNTIME_CAT, + obj: gst_pad, + "ActivateMode {:?}, {}", + mode, + active + ); + + if mode == gst::PadMode::Pull { + gst::error!( + RUNTIME_CAT, + obj: gst_pad, + "Pull mode not supported by PadSink" + ); + return Err(gst::loggable_error!( + RUNTIME_CAT, + "Pull mode not supported by PadSink" + )); + } + + H::sink_activatemode(handler, gst_pad, imp, mode, active) }, ) }); let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() - .set_chain_function(move |_gst_pad, parent, buffer| { + self.0 + .gst_pad + .set_chain_function(move |gst_pad, parent, buffer| { let handler = handler_clone.clone(); - let inner_arc = inner_arc.clone(); H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), move |imp| { - let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let elem = imp.obj().clone(); + let gst_pad = gst_pad.clone(); if let Some((ctx, task_id)) = Context::current_task() { let delayed_fut = async move { - H::sink_chain(handler, this_weak, elem, buffer).await + H::sink_chain(handler, gst_pad, elem, buffer).await }; let _ = ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); Ok(gst::FlowSuccess::Ok) } else { - let chain_fut = H::sink_chain(handler, this_weak, elem, buffer); + let chain_fut = H::sink_chain(handler, gst_pad, elem, buffer); executor::block_on(chain_fut) } }, @@ -866,21 +843,20 @@ impl PadSink { }); let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() - .set_chain_list_function(move |_gst_pad, parent, list| { + self.0 + .gst_pad + .set_chain_list_function(move |gst_pad, parent, list| { let handler = handler_clone.clone(); - let inner_arc = inner_arc.clone(); H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), move |imp| { - let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let elem = imp.obj().clone(); + let gst_pad = gst_pad.clone(); if let Some((ctx, task_id)) = Context::current_task() { let delayed_fut = async move { - H::sink_chain_list(handler, this_weak, elem, list).await + H::sink_chain_list(handler, gst_pad, elem, list).await }; let _ = ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); @@ -888,7 +864,7 @@ impl PadSink { Ok(gst::FlowSuccess::Ok) } else { let chain_list_fut = - H::sink_chain_list(handler, this_weak, elem, list); + H::sink_chain_list(handler, gst_pad, elem, list); executor::block_on(chain_list_fut) } }, @@ -898,25 +874,22 @@ impl PadSink { // No need to `set_event_function` since `set_event_full_function` // overrides it and dispatches to `sink_event` when necessary let handler_clone = handler.clone(); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() - .set_event_full_function(move |_gst_pad, parent, event| { + self.0 + .gst_pad + .set_event_full_function(move |gst_pad, parent, event| { let handler = handler_clone.clone(); - let inner_arc = inner_arc.clone(); H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), move |imp| { if event.is_serialized() { - let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let elem = imp.obj().clone(); + let gst_pad = gst_pad.clone(); if let Some((ctx, task_id)) = Context::current_task() { let delayed_fut = async move { - H::sink_event_full_serialized( - handler, this_weak, elem, event, - ) - .await + H::sink_event_full_serialized(handler, gst_pad, elem, event) + .await }; let _ = ctx.add_sub_task( task_id, @@ -926,35 +899,38 @@ impl PadSink { Ok(gst::FlowSuccess::Ok) } else { let event_fut = H::sink_event_full_serialized( - handler, this_weak, elem, event, + handler, gst_pad, elem, event, ); executor::block_on(event_fut) } } else { - handler.sink_event_full(&PadSinkRef::new(inner_arc), imp, event) + handler.sink_event_full(gst_pad, imp, event) } }, ) }); - let inner_arc = Arc::clone(&self.0); - self.gst_pad() - .set_query_function(move |_gst_pad, parent, query| { - let handler = handler.clone(); - let inner_arc = inner_arc.clone(); - H::ElementImpl::catch_panic_pad_function( - parent, - || false, - move |imp| { - if !query.is_serialized() { - H::sink_query(handler, &PadSinkRef::new(inner_arc), imp, query) - } else { - gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported"); - false - } - }, - ) - }); + self.0 + .gst_pad + .set_query_function(move |gst_pad, parent, query| { + let handler = handler.clone(); + H::ElementImpl::catch_panic_pad_function( + parent, + || false, + move |imp| { + if !query.is_serialized() { + H::sink_query(handler, gst_pad, imp, query) + } else { + gst::fixme!( + RUNTIME_CAT, + obj: gst_pad, + "Serialized Query not supported" + ); + false + } + }, + ) + }); } } } @@ -963,29 +939,36 @@ impl Drop for PadSink { fn drop(&mut self) { // FIXME: Do this better unsafe { - self.gst_pad() + self.0 + .gst_pad .set_activate_function(move |_gst_pad, _parent| { Err(gst::loggable_error!( RUNTIME_CAT, "PadSink no longer exists" )) }); - self.gst_pad() + self.0 + .gst_pad .set_activatemode_function(move |_gst_pad, _parent, _mode, _active| { Err(gst::loggable_error!( RUNTIME_CAT, "PadSink no longer exists" )) }); - self.gst_pad() + self.0 + .gst_pad .set_chain_function(move |_gst_pad, _parent, _buffer| Err(FlowError::Flushing)); - self.gst_pad() + self.0 + .gst_pad .set_chain_list_function(move |_gst_pad, _parent, _list| Err(FlowError::Flushing)); - self.gst_pad() + self.0 + .gst_pad .set_event_function(move |_gst_pad, _parent, _event| false); - self.gst_pad() + self.0 + .gst_pad .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing)); - self.gst_pad() + self.0 + .gst_pad .set_query_function(move |_gst_pad, _parent, _query| false); } } diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index 96d105e3..71e702b5 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -36,7 +36,7 @@ use std::u32; use crate::runtime::prelude::*; use crate::runtime::task; -use crate::runtime::{Context, PadSrc, PadSrcRef, Task, TaskState}; +use crate::runtime::{Context, PadSrc, Task, TaskState}; use crate::runtime::Async; use crate::socket::{Socket, SocketError, SocketRead}; @@ -96,8 +96,8 @@ struct TcpClientSrcPadHandler; impl PadSrcHandler for TcpClientSrcPadHandler { type ElementImpl = TcpClientSrc; - fn src_event(self, pad: &PadSrcRef, imp: &TcpClientSrc, event: gst::Event) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + fn src_event(self, pad: &gst::Pad, imp: &TcpClientSrc, event: gst::Event) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", event); use gst::EventView; let ret = match event.view() { @@ -109,16 +109,16 @@ impl PadSrcHandler for TcpClientSrcPadHandler { }; if ret { - gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event); + gst::log!(CAT, obj: pad, "Handled {:?}", event); } else { - gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); + gst::log!(CAT, obj: pad, "Didn't handle {:?}", event); } ret } - fn src_query(self, pad: &PadSrcRef, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + fn src_query(self, pad: &gst::Pad, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", query); use gst::QueryViewMut; let ret = match query.view_mut() { @@ -150,9 +150,9 @@ impl PadSrcHandler for TcpClientSrcPadHandler { }; if ret { - gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query); + gst::log!(CAT, obj: pad, "Handled {:?}", query); } else { - gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); + gst::log!(CAT, obj: pad, "Didn't handle {:?}", query); } ret diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index 2b45f7ff..f9186b33 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -30,7 +30,7 @@ use gst::{element_error, error_msg}; use once_cell::sync::Lazy; use crate::runtime::prelude::*; -use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, PadSinkWeak, Task}; +use crate::runtime::{self, Async, Context, PadSink, Task}; use crate::socket::{wrap_socket, GioSocketWrapper}; use std::collections::BTreeSet; @@ -134,7 +134,7 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_chain( self, - _pad: PadSinkWeak, + _pad: gst::Pad, elem: super::UdpSink, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { @@ -152,7 +152,7 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_chain_list( self, - _pad: PadSinkWeak, + _pad: gst::Pad, elem: super::UdpSink, list: gst::BufferList, ) -> BoxFuture<'static, Result> { @@ -172,7 +172,7 @@ impl PadSinkHandler for UdpSinkPadHandler { fn sink_event_serialized( self, - _pad: PadSinkWeak, + _pad: gst::Pad, elem: super::UdpSink, event: gst::Event, ) -> BoxFuture<'static, bool> { @@ -190,7 +190,7 @@ impl PadSinkHandler for UdpSinkPadHandler { .boxed() } - fn sink_event(self, _pad: &PadSinkRef, imp: &UdpSink, event: gst::Event) -> bool { + fn sink_event(self, _pad: &gst::Pad, imp: &UdpSink, event: gst::Event) -> bool { if let EventView::FlushStart(..) = event.view() { return imp.task.flush_start().await_maybe_on_context().is_ok(); } diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index e9b6fcdc..cfd95de8 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -35,7 +35,7 @@ use std::time::Duration; use std::u16; use crate::runtime::prelude::*; -use crate::runtime::{Async, Context, PadSrc, PadSrcRef, Task}; +use crate::runtime::{Async, Context, PadSrc, Task}; use crate::socket::{wrap_socket, GioSocketWrapper, Socket, SocketError, SocketRead}; @@ -113,8 +113,8 @@ struct UdpSrcPadHandler; impl PadSrcHandler for UdpSrcPadHandler { type ElementImpl = UdpSrc; - fn src_event(self, pad: &PadSrcRef, imp: &UdpSrc, event: gst::Event) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + fn src_event(self, pad: &gst::Pad, imp: &UdpSrc, event: gst::Event) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", event); use gst::EventView; let ret = match event.view() { @@ -126,16 +126,16 @@ impl PadSrcHandler for UdpSrcPadHandler { }; if ret { - gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", event); + gst::log!(CAT, obj: pad, "Handled {:?}", event); } else { - gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); + gst::log!(CAT, obj: pad, "Didn't handle {:?}", event); } ret } - fn src_query(self, pad: &PadSrcRef, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool { - gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + fn src_query(self, pad: &gst::Pad, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj: pad, "Handling {:?}", query); use gst::QueryViewMut; let ret = match query.view_mut() { @@ -167,9 +167,9 @@ impl PadSrcHandler for UdpSrcPadHandler { }; if ret { - gst::log!(CAT, obj: pad.gst_pad(), "Handled {:?}", query); + gst::log!(CAT, obj: pad, "Handled {:?}", query); } else { - gst::log!(CAT, obj: pad.gst_pad(), "Didn't handle {:?}", query); + gst::log!(CAT, obj: pad, "Didn't handle {:?}", query); } ret diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index 78f32fb9..9c6257c7 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -36,9 +36,7 @@ use std::sync::Mutex; use std::time::Duration; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{ - Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task, TaskState, -}; +use gstthreadshare::runtime::{Context, PadSink, PadSrc, Task, TaskState}; const DEFAULT_CONTEXT: &str = ""; const THROTTLING_DURATION: Duration = Duration::from_millis(2); @@ -89,8 +87,8 @@ mod imp_src { impl PadSrcHandler for PadSrcTestHandler { type ElementImpl = ElementSrcTest; - fn src_event(self, pad: &PadSrcRef, imp: &ElementSrcTest, event: gst::Event) -> bool { - gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + fn src_event(self, pad: &gst::Pad, imp: &ElementSrcTest, event: gst::Event) -> bool { + gst::log!(SRC_CAT, obj: pad, "Handling {:?}", event); let ret = match event.view() { EventView::FlushStart(..) => { @@ -102,9 +100,9 @@ mod imp_src { }; if ret { - gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handled {:?}", event); + gst::log!(SRC_CAT, obj: pad, "Handled {:?}", event); } else { - gst::log!(SRC_CAT, obj: pad.gst_pad(), "Didn't handle {:?}", event); + gst::log!(SRC_CAT, obj: pad, "Didn't handle {:?}", event); } ret @@ -441,7 +439,7 @@ mod imp_sink { fn sink_chain( self, - _pad: PadSinkWeak, + _pad: gst::Pad, elem: super::ElementSinkTest, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { @@ -454,7 +452,7 @@ mod imp_sink { fn sink_chain_list( self, - _pad: PadSinkWeak, + _pad: gst::Pad, elem: super::ElementSinkTest, list: gst::BufferList, ) -> BoxFuture<'static, Result> { @@ -465,8 +463,8 @@ mod imp_sink { .boxed() } - fn sink_event(self, pad: &PadSinkRef, imp: &ElementSinkTest, event: gst::Event) -> bool { - gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); + fn sink_event(self, pad: &gst::Pad, imp: &ElementSinkTest, event: gst::Event) -> bool { + gst::debug!(SINK_CAT, obj: pad, "Handling non-serialized {:?}", event); match event.view() { EventView::FlushStart(..) => { @@ -479,13 +477,12 @@ mod imp_sink { fn sink_event_serialized( self, - pad: PadSinkWeak, + pad: gst::Pad, elem: super::ElementSinkTest, event: gst::Event, ) -> BoxFuture<'static, bool> { async move { - let pad = pad.upgrade().expect("PadSink no longer exists"); - gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); + gst::log!(SINK_CAT, obj: pad, "Handling serialized {:?}", event); let imp = elem.imp(); if let EventView::FlushStop(..) = event.view() {