From 2bffdec691c441c9effefa4f27f72d64681c0bf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Wed, 12 Oct 2022 12:35:20 +0200 Subject: [PATCH] ts: better use of `imp` & `elem` args in `Pad{Sink,Src}Handler`s This is a follow-up to commit 7ee4afac. This commit cleans up the `Pad{Sink,Src}Handler` by - Keeping arguments which are strictly necessary. - Passing arguments by value for the trait functions which return a `Future`. The arguments which were previously passed by reference were `clone`d internally and then `clone`d again in most implementations. There are unfortunate differences in trait function signatures between those which return a `Future` and the sync functions. This is due to the requirement for the arguments to be moved to the resulting `Future`, whereas sync functions can rely on references. One particular notable difference is the use of the `imp` in sync functions instead of the `elem` in functions returning a `Future`. Because the `imp` is not guaranteed to implement `Clone`, we can't move it to the resulting `Future`, so the `elem` is used. --- .../examples/standalone/sink/imp.rs | 59 ++-- generic/threadshare/src/appsrc/imp.rs | 44 +-- generic/threadshare/src/inputselector/imp.rs | 92 ++---- generic/threadshare/src/jitterbuffer/imp.rs | 131 +++----- generic/threadshare/src/proxy/imp.rs | 116 +++---- generic/threadshare/src/queue/imp.rs | 131 +++----- generic/threadshare/src/runtime/pad.rs | 289 +++++++----------- generic/threadshare/src/tcpclientsrc/imp.rs | 38 +-- generic/threadshare/src/udpsink/imp.rs | 55 ++-- generic/threadshare/src/udpsrc/imp.rs | 28 +- generic/threadshare/tests/pad.rs | 92 ++---- 11 files changed, 370 insertions(+), 705 deletions(-) diff --git a/generic/threadshare/examples/standalone/sink/imp.rs b/generic/threadshare/examples/standalone/sink/imp.rs index 62ea73b6..7991dd2a 100644 --- a/generic/threadshare/examples/standalone/sink/imp.rs +++ b/generic/threadshare/examples/standalone/sink/imp.rs @@ -19,7 +19,7 @@ use gst::EventView; use once_cell::sync::Lazy; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, Task}; +use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, Task}; use std::sync::Mutex; use std::task::Poll; @@ -76,18 +76,15 @@ impl PadSinkHandler for TestSinkPadHandler { type ElementImpl = TestSink; fn sink_chain( - &self, - _pad: &PadSinkRef, - test_sink: &TestSink, - element: &gst::Element, + self, + _pad: PadSinkWeak, + elem: super::TestSink, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let sender = test_sink.clone_item_sender(); - let element = element.clone().downcast::().unwrap(); - + let sender = elem.imp().clone_item_sender(); async move { if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); + gst::debug!(CAT, obj: &elem, "Flushing"); return Err(gst::FlowError::Flushing); } @@ -97,19 +94,16 @@ impl PadSinkHandler for TestSinkPadHandler { } fn sink_chain_list( - &self, - _pad: &PadSinkRef, - test_sink: &TestSink, - element: &gst::Element, + self, + _pad: PadSinkWeak, + elem: super::TestSink, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let sender = test_sink.clone_item_sender(); - let element = element.clone().downcast::().unwrap(); - + let sender = elem.imp().clone_item_sender(); async move { for buffer in list.iter_owned() { if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); + gst::debug!(CAT, obj: &elem, "Flushing"); return Err(gst::FlowError::Flushing); } } @@ -120,21 +114,18 @@ impl PadSinkHandler for TestSinkPadHandler { } fn sink_event_serialized( - &self, - _pad: &PadSinkRef, - test_sink: &TestSink, - element: &gst::Element, + self, + _pad: PadSinkWeak, + elem: super::TestSink, event: gst::Event, ) -> BoxFuture<'static, bool> { - let sender = test_sink.clone_item_sender(); - let element = element.clone().downcast::().unwrap(); - + let sender = elem.imp().clone_item_sender(); async move { if let EventView::FlushStop(_) = event.view() { - let test_sink = element.imp(); - return test_sink.task.flush_stop().await_maybe_on_context().is_ok(); + let imp = elem.imp(); + return imp.task.flush_stop().await_maybe_on_context().is_ok(); } else if sender.send_async(StreamItem::Event(event)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); + gst::debug!(CAT, obj: &elem, "Flushing"); } true @@ -142,19 +133,9 @@ impl PadSinkHandler for TestSinkPadHandler { .boxed() } - fn sink_event( - &self, - _pad: &PadSinkRef, - test_sink: &TestSink, - _element: &gst::Element, - event: gst::Event, - ) -> bool { + fn sink_event(&self, _pad: &PadSinkRef, imp: &TestSink, event: gst::Event) -> bool { if let EventView::FlushStart(..) = event.view() { - return test_sink - .task - .flush_start() - .await_maybe_on_context() - .is_ok(); + return imp.task.flush_start().await_maybe_on_context().is_ok(); } true diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 8eee681a..b5636e08 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -82,20 +82,13 @@ struct AppSrcPadHandler; impl PadSrcHandler for AppSrcPadHandler { type ElementImpl = AppSrc; - fn src_event( - &self, - pad: &PadSrcRef, - appsrc: &AppSrc, - _element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn src_event(&self, pad: &PadSrcRef, imp: &AppSrc, event: gst::Event) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + use gst::EventView; let ret = match event.view() { - EventView::FlushStart(..) => appsrc.task.flush_start().await_maybe_on_context().is_ok(), - EventView::FlushStop(..) => appsrc.task.flush_stop().await_maybe_on_context().is_ok(), + EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(), + EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(), EventView::Reconfigure(..) => true, EventView::Latency(..) => true, _ => false, @@ -110,16 +103,10 @@ impl PadSrcHandler for AppSrcPadHandler { ret } - fn src_query( - &self, - pad: &PadSrcRef, - appsrc: &AppSrc, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - + fn src_query(&self, pad: &PadSrcRef, imp: &AppSrc, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + + use gst::QueryViewMut; let ret = match query.view_mut() { QueryViewMut::Latency(q) => { q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); @@ -131,7 +118,7 @@ impl PadSrcHandler for AppSrcPadHandler { true } QueryViewMut::Caps(q) => { - let caps = if let Some(caps) = appsrc.configured_caps.lock().unwrap().as_ref() { + let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() { q.filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) @@ -328,8 +315,9 @@ impl AppSrc { let do_timestamp = self.settings.lock().unwrap().do_timestamp; if do_timestamp { - if let Some(clock) = self.instance().clock() { - let base_time = self.instance().base_time(); + let elem = self.instance(); + if let Some(clock) = elem.clock() { + let base_time = elem.base_time(); let now = clock.time(); let buffer = buffer.make_mut(); @@ -499,11 +487,10 @@ impl ObjectImpl for AppSrc { .return_type::() .action() .class_handler(|_, args| { - let element = args[0].get::().expect("signal arg"); + let elem = args[0].get::().expect("signal arg"); let buffer = args[1].get::().expect("signal arg"); - let appsrc = element.imp(); - Some(appsrc.push_buffer(buffer).to_value()) + Some(elem.imp().push_buffer(buffer).to_value()) }) .build(), /** @@ -516,10 +503,9 @@ impl ObjectImpl for AppSrc { .return_type::() .action() .class_handler(|_, args| { - let element = args[0].get::().expect("signal arg"); - let appsrc = element.imp(); + let elem = args[0].get::().expect("signal arg"); - Some(appsrc.end_of_stream().to_value()) + Some(elem.imp().end_of_stream().to_value()) }) .build(), ] diff --git a/generic/threadshare/src/inputselector/imp.rs b/generic/threadshare/src/inputselector/imp.rs index 4175f4cf..d29320b3 100644 --- a/generic/threadshare/src/inputselector/imp.rs +++ b/generic/threadshare/src/inputselector/imp.rs @@ -33,7 +33,7 @@ use std::time::Duration; use std::u32; use crate::runtime::prelude::*; -use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef}; +use crate::runtime::{self, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef}; const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; @@ -75,14 +75,10 @@ struct InputSelectorPadSinkHandler(Arc>) impl InputSelectorPadSinkHandler { /* Wait until specified time */ - async fn sync( - &self, - element: &super::InputSelector, - running_time: impl Into>, - ) { - let now = element.current_running_time(); + async fn sync(&self, elem: &super::InputSelector, running_time: Option) { + let now = elem.current_running_time(); - match running_time.into().opt_checked_sub(now) { + match running_time.opt_checked_sub(now) { Ok(Some(delay)) => { runtime::timer::delay_for(delay.into()).await; } @@ -92,11 +88,11 @@ impl InputSelectorPadSinkHandler { async fn handle_item( &self, - element: &super::InputSelector, pad: &PadSinkRef<'_>, + elem: &super::InputSelector, mut buffer: gst::Buffer, ) -> Result { - let inputselector = element.imp(); + let inputselector = elem.imp(); let (stickies, is_active, sync_future, switched_pad) = { let mut state = inputselector.state.lock().unwrap(); @@ -108,7 +104,7 @@ impl InputSelectorPadSinkHandler { if let Some(segment) = &inner.segment { if let Some(segment) = segment.downcast_ref::() { let rtime = segment.to_running_time(buffer.pts()); - let (sync_fut, abort_handle) = abortable(self.sync(element, rtime)); + let (sync_fut, abort_handle) = abortable(self.sync(elem, rtime)); inner.abort_handle = Some(abort_handle); sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing)); } @@ -162,38 +158,30 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { type ElementImpl = InputSelector; fn sink_chain( - &self, - pad: &PadSinkRef, - _inputselector: &InputSelector, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::InputSelector, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let this = self.clone(); - let element = element.clone().downcast::().unwrap(); - let pad_weak = pad.downgrade(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - this.handle_item(&element, &pad, buffer).await + let pad = pad.upgrade().expect("PadSink no longer exists"); + self.handle_item(&pad, &elem, buffer).await } .boxed() } fn sink_chain_list( - &self, - pad: &PadSinkRef, - _inputselector: &InputSelector, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::InputSelector, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let this = self.clone(); - let element = element.clone().downcast::().unwrap(); - let pad_weak = pad.downgrade(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list); // TODO: Ideally we would keep the list intact and forward it in one go for buffer in list.iter_owned() { - this.handle_item(&element, &pad, buffer).await?; + self.handle_item(&pad, &elem, buffer).await?; } Ok(gst::FlowSuccess::Ok) @@ -202,16 +190,13 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { } fn sink_event_serialized( - &self, - _pad: &PadSinkRef, - _inputselector: &InputSelector, - _element: &gst::Element, + self, + _pad: PadSinkWeak, + _elem: super::InputSelector, event: gst::Event, ) -> BoxFuture<'static, bool> { - let this = self.clone(); - async move { - let mut inner = this.0.lock().unwrap(); + let mut inner = self.0.lock().unwrap(); // Remember the segment for later use if let gst::EventView::Segment(e) = event.view() { @@ -234,17 +219,11 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { .boxed() } - fn sink_event( - &self, - _pad: &PadSinkRef, - inputselector: &InputSelector, - _element: &gst::Element, - event: gst::Event, - ) -> bool { + fn sink_event(&self, _pad: &PadSinkRef, imp: &InputSelector, event: gst::Event) -> bool { /* Drop all events for now */ if let gst::EventView::FlushStart(..) = event.view() { /* Unblock downstream */ - inputselector.src_pad.gst_pad().push_event(event.clone()); + imp.src_pad.gst_pad().push_event(event.clone()); let mut inner = self.0.lock().unwrap(); @@ -255,13 +234,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { true } - fn sink_query( - &self, - pad: &PadSinkRef, - inputselector: &InputSelector, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { + fn sink_query(&self, pad: &PadSinkRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); if query.is_serialized() { @@ -270,7 +243,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler { false } else { gst::log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query); - inputselector.src_pad.gst_pad().peer_query(query) + imp.src_pad.gst_pad().peer_query(query) } } } @@ -281,24 +254,17 @@ struct InputSelectorPadSrcHandler; impl PadSrcHandler for InputSelectorPadSrcHandler { type ElementImpl = InputSelector; - fn src_query( - &self, - pad: &PadSrcRef, - inputselector: &InputSelector, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - + fn src_query(&self, pad: &PadSrcRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + use gst::QueryViewMut; match query.view_mut() { QueryViewMut::Latency(q) => { let mut ret = true; let mut min_latency = gst::ClockTime::ZERO; let mut max_latency = gst::ClockTime::NONE; let pads = { - let pads = inputselector.pads.lock().unwrap(); + let pads = imp.pads.lock().unwrap(); pads.sink_pads .iter() .map(|p| p.0.clone()) @@ -325,7 +291,7 @@ impl PadSrcHandler for InputSelectorPadSrcHandler { } _ => { let sinkpad = { - let state = inputselector.state.lock().unwrap(); + let state = imp.state.lock().unwrap(); state.active_sinkpad.clone() }; diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index f2ec87fa..44cd903b 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -36,7 +36,7 @@ use std::sync::Mutex as StdMutex; use std::time::Duration; use crate::runtime::prelude::*; -use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; +use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task}; use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; @@ -143,14 +143,10 @@ impl SinkHandler { } // For resetting if seqnum discontinuities - fn reset( - &self, - inner: &mut SinkHandlerInner, - state: &mut State, - element: &super::JitterBuffer, - ) -> BTreeSet { - gst::info!(CAT, obj: element, "Resetting"); + fn reset(&self, inner: &mut SinkHandlerInner, jb: &JitterBuffer) -> BTreeSet { + gst::info!(CAT, imp: jb, "Resetting"); + let mut state = jb.state.lock().unwrap(); state.jbuf.flush(); state.jbuf.reset_skew(); state.discont = true; @@ -174,23 +170,23 @@ impl SinkHandler { &self, inner: &mut SinkHandlerInner, state: &mut State, - element: &super::JitterBuffer, + jb: &JitterBuffer, caps: &gst::Caps, pt: u8, ) -> Result { let s = caps.structure(0).ok_or(gst::FlowError::Error)?; - gst::debug!(CAT, obj: element, "Parsing {:?}", caps); + gst::debug!(CAT, imp: jb, "Parsing {:?}", caps); let payload = s.get::("payload").map_err(|err| { - gst::debug!(CAT, obj: element, "Caps 'payload': {}", err); + gst::debug!(CAT, imp: jb, "Caps 'payload': {}", err); gst::FlowError::Error })?; if pt != 0 && payload as u8 != pt { gst::debug!( CAT, - obj: element, + imp: jb, "Caps 'payload' ({}) doesn't match payload type ({})", payload, pt @@ -200,12 +196,12 @@ impl SinkHandler { inner.last_pt = Some(pt); let clock_rate = s.get::("clock-rate").map_err(|err| { - gst::debug!(CAT, obj: element, "Caps 'clock-rate': {}", err); + gst::debug!(CAT, imp: jb, "Caps 'clock-rate': {}", err); gst::FlowError::Error })?; if clock_rate <= 0 { - gst::debug!(CAT, obj: element, "Caps 'clock-rate' <= 0"); + gst::debug!(CAT, imp: jb, "Caps 'clock-rate' <= 0"); return Err(gst::FlowError::Error); } state.clock_rate = Some(clock_rate as u32); @@ -253,7 +249,7 @@ impl SinkHandler { fn handle_big_gap_buffer( &self, inner: &mut SinkHandlerInner, - element: &super::JitterBuffer, + jb: &JitterBuffer, buffer: gst::Buffer, pt: u8, ) -> bool { @@ -262,7 +258,7 @@ impl SinkHandler { gst::debug!( CAT, - obj: element, + imp: jb, "Handling big gap, gap packets length: {}", gap_packets_length ); @@ -276,7 +272,7 @@ impl SinkHandler { for gap_packet in inner.gap_packets.iter() { gst::log!( CAT, - obj: element, + imp: jb, "Looking at gap packet with seq {}", gap_packet.seq, ); @@ -296,7 +292,7 @@ impl SinkHandler { } } - gst::debug!(CAT, obj: element, "all consecutive: {}", all_consecutive); + gst::debug!(CAT, imp: jb, "all consecutive: {}", all_consecutive); if all_consecutive && gap_packets_length > 3 { reset = true; @@ -312,10 +308,9 @@ impl SinkHandler { &self, inner: &mut SinkHandlerInner, pad: &gst::Pad, - element: &super::JitterBuffer, + jb: &JitterBuffer, buffer: gst::Buffer, ) -> Result { - let jb = element.imp(); let mut state = jb.state.lock().unwrap(); let (max_misorder_time, max_dropout_time) = { @@ -339,13 +334,15 @@ impl SinkHandler { gst::log!( CAT, - obj: element, + imp: jb, "Storing buffer, seq: {}, rtptime: {}, pt: {}", seq, rtptime, pt ); + let element = jb.instance(); + if dts.is_none() { dts = pts; } else if pts.is_none() { @@ -374,7 +371,7 @@ impl SinkHandler { if let Some(caps) = pad.current_caps() { /* Ignore errors at this point, as we want to emit request-pt-map */ - let _ = self.parse_caps(inner, &mut state, element, &caps, pt); + let _ = self.parse_caps(inner, &mut state, jb, &caps, pt); } } @@ -388,7 +385,7 @@ impl SinkHandler { gst::FlowError::Error })?; let mut state = jb.state.lock().unwrap(); - self.parse_caps(inner, &mut state, element, &caps, pt)?; + self.parse_caps(inner, &mut state, jb, &caps, pt)?; state } else { state @@ -407,7 +404,7 @@ impl SinkHandler { if pts.is_none() { gst::debug!( CAT, - obj: element, + imp: jb, "cannot calculate a valid pts for #{}, discard", seq ); @@ -420,7 +417,7 @@ impl SinkHandler { self.calculate_packet_spacing(inner, &mut state, rtptime, pts); } else { if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) { - let reset = self.handle_big_gap_buffer(inner, element, buffer, pt); + let reset = self.handle_big_gap_buffer(inner, jb, buffer, pt); if reset { // Handle reset in `enqueue_item` to avoid recursion return Err(gst::FlowError::CustomError); @@ -440,7 +437,7 @@ impl SinkHandler { if gap <= 0 { state.stats.num_late += 1; - gst::debug!(CAT, obj: element, "Dropping late {}", seq); + gst::debug!(CAT, imp: jb, "Dropping late {}", seq); return Ok(gst::FlowSuccess::Ok); } } @@ -492,7 +489,7 @@ impl SinkHandler { fn enqueue_item( &self, pad: &gst::Pad, - element: &super::JitterBuffer, + jb: &JitterBuffer, buffer: Option, ) -> Result { let mut inner = self.0.lock().unwrap(); @@ -504,12 +501,10 @@ impl SinkHandler { // This is to avoid recursion with `store`, `reset` and `enqueue_item` while let Some(buf) = buffers.pop_front() { - if let Err(err) = self.store(&mut inner, pad, element, buf) { + if let Err(err) = self.store(&mut inner, pad, jb, buf) { match err { gst::FlowError::CustomError => { - let jb = element.imp(); - let mut state = jb.state.lock().unwrap(); - for gap_packet in self.reset(&mut inner, &mut state, element) { + for gap_packet in self.reset(&mut inner, jb) { buffers.push_back(gap_packet.buffer); } } @@ -518,7 +513,6 @@ impl SinkHandler { } } - let jb = element.imp(); let mut state = jb.state.lock().unwrap(); let (latency, context_wait) = { @@ -529,7 +523,7 @@ impl SinkHandler { // Reschedule if needed let (_, next_wakeup) = jb.src_pad_handler - .next_wakeup(element, &state, latency, context_wait); + .next_wakeup(&jb.instance(), &state, latency, context_wait); if let Some((next_wakeup, _)) = next_wakeup { if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle { if previous_next_wakeup.is_none() @@ -555,32 +549,20 @@ impl PadSinkHandler for SinkHandler { type ElementImpl = JitterBuffer; fn sink_chain( - &self, - pad: &PadSinkRef, - _jb: &JitterBuffer, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::JitterBuffer, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::().unwrap(); - let this = self.clone(); - async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); - this.enqueue_item(pad.gst_pad(), &element, Some(buffer)) + self.enqueue_item(pad.gst_pad(), elem.imp(), Some(buffer)) } .boxed() } - fn sink_event( - &self, - pad: &PadSinkRef, - jb: &JitterBuffer, - element: &gst::Element, - event: gst::Event, - ) -> bool { + fn sink_event(&self, pad: &PadSinkRef, jb: &JitterBuffer, event: gst::Event) -> bool { use gst::EventView; gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); @@ -588,8 +570,8 @@ impl PadSinkHandler for SinkHandler { if let EventView::FlushStart(..) = event.view() { if let Err(err) = jb.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + jb, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStart failed {:?}", err] @@ -603,25 +585,20 @@ impl PadSinkHandler for SinkHandler { } fn sink_event_serialized( - &self, - pad: &PadSinkRef, - _jb: &JitterBuffer, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::JitterBuffer, event: gst::Event, ) -> BoxFuture<'static, bool> { - use gst::EventView; - - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::().unwrap(); - async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); - let jb = element.imp(); + let jb = elem.imp(); let mut forward = true; + use gst::EventView; match event.view() { EventView::Segment(e) => { let mut state = jb.state.lock().unwrap(); @@ -631,7 +608,7 @@ impl PadSinkHandler for SinkHandler { if let Err(err) = jb.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::element_error!( - element, + elem, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStop failed {:?}", err] @@ -893,13 +870,7 @@ impl SrcHandler { impl PadSrcHandler for SrcHandler { type ElementImpl = JitterBuffer; - fn src_event( - &self, - pad: &PadSrcRef, - jb: &JitterBuffer, - element: &gst::Element, - event: gst::Event, - ) -> bool { + fn src_event(&self, pad: &PadSrcRef, jb: &JitterBuffer, event: gst::Event) -> bool { use gst::EventView; gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); @@ -908,8 +879,8 @@ impl PadSrcHandler for SrcHandler { EventView::FlushStart(..) => { if let Err(err) = jb.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + jb, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStart failed {:?}", err] @@ -920,8 +891,8 @@ impl PadSrcHandler for SrcHandler { EventView::FlushStop(..) => { if let Err(err) = jb.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + jb, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStop failed {:?}", err] @@ -936,13 +907,7 @@ impl PadSrcHandler for SrcHandler { jb.sink_pad.gst_pad().push_event(event) } - fn src_query( - &self, - pad: &PadSrcRef, - jb: &JitterBuffer, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { + fn src_query(&self, pad: &PadSrcRef, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index e7ce6b30..9cad8078 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -214,57 +214,40 @@ impl PadSinkHandler for ProxySinkPadHandler { type ElementImpl = ProxySink; fn sink_chain( - &self, - pad: &PadSinkRef, - _proxysink: &ProxySink, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::ProxySink, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::().unwrap(); - async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); - let proxysink = element.imp(); - proxysink.enqueue_item(DataQueueItem::Buffer(buffer)).await + let imp = elem.imp(); + imp.enqueue_item(DataQueueItem::Buffer(buffer)).await } .boxed() } fn sink_chain_list( - &self, - pad: &PadSinkRef, - _proxysink: &ProxySink, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::ProxySink, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::().unwrap(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list); - let proxysink = element.imp(); - proxysink - .enqueue_item(DataQueueItem::BufferList(list)) - .await + let imp = elem.imp(); + imp.enqueue_item(DataQueueItem::BufferList(list)).await } .boxed() } - fn sink_event( - &self, - pad: &PadSinkRef, - proxysink: &ProxySink, - _element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn sink_event(&self, pad: &PadSinkRef, imp: &ProxySink, event: gst::Event) -> bool { gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); let src_pad = { - let proxy_ctx = proxysink.proxy_ctx.lock().unwrap(); + let proxy_ctx = imp.proxy_ctx.lock().unwrap(); PROXY_SRC_PADS .lock() @@ -274,8 +257,8 @@ impl PadSinkHandler for ProxySinkPadHandler { .map(|src_pad| src_pad.gst_pad().clone()) }; - if let EventView::FlushStart(..) = event.view() { - proxysink.stop(); + if let gst::EventView::FlushStart(..) = event.view() { + imp.stop(); } if let Some(src_pad) = src_pad { @@ -288,36 +271,28 @@ impl PadSinkHandler for ProxySinkPadHandler { } fn sink_event_serialized( - &self, - pad: &PadSinkRef, - _proxysink: &ProxySink, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::ProxySink, event: gst::Event, ) -> BoxFuture<'static, bool> { - use gst::EventView; - - gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); - - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::().unwrap(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - let proxysink = element.imp(); + let pad = pad.upgrade().expect("PadSink no longer exists"); + gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); + let imp = elem.imp(); + + use gst::EventView; match event.view() { EventView::Eos(..) => { - let _ = - element.post_message(gst::message::Eos::builder().src(&element).build()); + let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build()); } - EventView::FlushStop(..) => proxysink.start(), + EventView::FlushStop(..) => imp.start(), _ => (), } gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); - proxysink - .enqueue_item(DataQueueItem::Event(event)) - .await - .is_ok() + imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok() } .boxed() } @@ -691,19 +666,11 @@ struct ProxySrcPadHandler; impl PadSrcHandler for ProxySrcPadHandler { type ElementImpl = ProxySrc; - fn src_event( - &self, - pad: &PadSrcRef, - proxysrc: &ProxySrc, - element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn src_event(&self, pad: &PadSrcRef, imp: &ProxySrc, event: gst::Event) -> bool { gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); let sink_pad = { - let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); + let proxy_ctx = imp.proxy_ctx.lock().unwrap(); PROXY_SINK_PADS .lock() @@ -713,12 +680,13 @@ impl PadSrcHandler for ProxySrcPadHandler { .map(|sink_pad| sink_pad.gst_pad().clone()) }; + use gst::EventView; match event.view() { EventView::FlushStart(..) => { - if let Err(err) = proxysrc.task.flush_start().await_maybe_on_context() { + if let Err(err) = imp.task.flush_start().await_maybe_on_context() { gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStart failed {:?}", err] @@ -727,10 +695,10 @@ impl PadSrcHandler for ProxySrcPadHandler { } } EventView::FlushStop(..) => { - if let Err(err) = proxysrc.task.flush_stop().await_maybe_on_context() { + if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStop failed {:?}", err] @@ -750,16 +718,10 @@ impl PadSrcHandler for ProxySrcPadHandler { } } - fn src_query( - &self, - pad: &PadSrcRef, - _proxysrc: &ProxySrc, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - + fn src_query(&self, pad: &PadSrcRef, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool { gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query); + + use gst::QueryViewMut; let ret = match query.view_mut() { QueryViewMut::Latency(q) => { q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); diff --git a/generic/threadshare/src/queue/imp.rs b/generic/threadshare/src/queue/imp.rs index b3d1df62..32f6f472 100644 --- a/generic/threadshare/src/queue/imp.rs +++ b/generic/threadshare/src/queue/imp.rs @@ -33,7 +33,7 @@ use std::time::Duration; use std::{u32, u64}; use crate::runtime::prelude::*; -use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; +use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task}; use crate::dataqueue::{DataQueue, DataQueueItem}; @@ -84,57 +84,43 @@ impl PadSinkHandler for QueuePadSinkHandler { type ElementImpl = Queue; fn sink_chain( - &self, - pad: &PadSinkRef, - _queue: &Queue, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::Queue, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::().unwrap(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); - let queue = element.imp(); - queue.enqueue_item(DataQueueItem::Buffer(buffer)).await + let imp = elem.imp(); + imp.enqueue_item(DataQueueItem::Buffer(buffer)).await } .boxed() } fn sink_chain_list( - &self, - pad: &PadSinkRef, - _queue: &Queue, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::Queue, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::().unwrap(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list); - let queue = element.imp(); - queue.enqueue_item(DataQueueItem::BufferList(list)).await + let imp = elem.imp(); + imp.enqueue_item(DataQueueItem::BufferList(list)).await } .boxed() } - fn sink_event( - &self, - pad: &PadSinkRef, - queue: &Queue, - element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn sink_event(&self, pad: &PadSinkRef, imp: &Queue, event: gst::Event) -> bool { gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); - if let EventView::FlushStart(..) = event.view() { - if let Err(err) = queue.task.flush_start().await_maybe_on_context() { + if let gst::EventView::FlushStart(..) = event.view() { + if let Err(err) = imp.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStart failed {:?}", err] @@ -144,31 +130,26 @@ impl PadSinkHandler for QueuePadSinkHandler { } gst::log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); - queue.src_pad.gst_pad().push_event(event) + imp.src_pad.gst_pad().push_event(event) } fn sink_event_serialized( - &self, - pad: &PadSinkRef, - _queue: &Queue, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::Queue, event: gst::Event, ) -> BoxFuture<'static, bool> { - use gst::EventView; - - gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); - - let pad_weak = pad.downgrade(); - let element = element.clone().downcast::().unwrap(); async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); - let queue = element.imp(); + let pad = pad.upgrade().expect("PadSink no longer exists"); + gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); - if let EventView::FlushStop(..) = event.view() { - if let Err(err) = queue.task.flush_stop().await_maybe_on_context() { + let imp = elem.imp(); + + if let gst::EventView::FlushStop(..) = event.view() { + if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStop failed {:?}", err] @@ -178,21 +159,12 @@ impl PadSinkHandler for QueuePadSinkHandler { } gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); - queue - .enqueue_item(DataQueueItem::Event(event)) - .await - .is_ok() + imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok() } .boxed() } - fn sink_query( - &self, - pad: &PadSinkRef, - queue: &Queue, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { + fn sink_query(&self, pad: &PadSinkRef, imp: &Queue, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); if query.is_serialized() { @@ -201,7 +173,7 @@ impl PadSinkHandler for QueuePadSinkHandler { false } else { gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); - queue.src_pad.gst_pad().peer_query(query) + imp.src_pad.gst_pad().peer_query(query) } } } @@ -212,28 +184,21 @@ struct QueuePadSrcHandler; impl PadSrcHandler for QueuePadSrcHandler { type ElementImpl = Queue; - fn src_event( - &self, - pad: &PadSrcRef, - queue: &Queue, - element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn src_event(&self, pad: &PadSrcRef, imp: &Queue, event: gst::Event) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + use gst::EventView; match event.view() { EventView::FlushStart(..) => { - if let Err(err) = queue.task.flush_start().await_maybe_on_context() { + if let Err(err) = imp.task.flush_start().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); } } EventView::FlushStop(..) => { - if let Err(err) = queue.task.flush_stop().await_maybe_on_context() { + if let Err(err) = imp.task.flush_stop().await_maybe_on_context() { gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); - gst::element_error!( - element, + gst::element_imp_error!( + imp, gst::StreamError::Failed, ("Internal data stream error"), ["FlushStop failed {:?}", err] @@ -245,23 +210,15 @@ impl PadSrcHandler for QueuePadSrcHandler { } gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); - queue.sink_pad.gst_pad().push_event(event) + imp.sink_pad.gst_pad().push_event(event) } - fn src_query( - &self, - pad: &PadSrcRef, - queue: &Queue, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - + fn src_query(&self, pad: &PadSrcRef, imp: &Queue, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); - if let QueryViewMut::Scheduling(q) = query.view_mut() { + if let gst::QueryViewMut::Scheduling(q) = query.view_mut() { let mut new_query = gst::query::Scheduling::new(); - let res = queue.sink_pad.gst_pad().peer_query(&mut new_query); + let res = imp.sink_pad.gst_pad().peer_query(&mut new_query); if !res { return res; } @@ -283,7 +240,7 @@ impl PadSrcHandler for QueuePadSrcHandler { } gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); - queue.sink_pad.gst_pad().peer_query(query) + imp.sink_pad.gst_pad().peer_query(query) } } diff --git a/generic/threadshare/src/runtime/pad.rs b/generic/threadshare/src/runtime/pad.rs index 7b1b1496..28afb780 100644 --- a/generic/threadshare/src/runtime/pad.rs +++ b/generic/threadshare/src/runtime/pad.rs @@ -118,13 +118,13 @@ fn event_to_event_full_serialized( /// [`PadSrc`]: struct.PadSrc.html /// [`pad` module]: index.html pub trait PadSrcHandler: Clone + Send + Sync + 'static { + // FIXME we should use a GAT here: ObjectSubclass + Send> type ElementImpl: ElementImpl + ObjectSubclass; fn src_activate( &self, pad: &PadSrcRef, _imp: &Self::ElementImpl, - _element: &gst::Element, ) -> Result<(), gst::LoggableError> { let gst_pad = pad.gst_pad(); if gst_pad.is_active() { @@ -154,21 +154,22 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { &self, _pad: &PadSrcRef, _imp: &Self::ElementImpl, - _element: &gst::Element, _mode: gst::PadMode, _active: bool, ) -> Result<(), gst::LoggableError> { Ok(()) } - fn src_event( - &self, - pad: &PadSrcRef, - _imp: &Self::ElementImpl, - element: &gst::Element, - event: gst::Event, - ) -> bool { + fn src_event(&self, pad: &PadSrcRef, imp: &Self::ElementImpl, event: gst::Event) -> bool { gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + let elem = imp.instance(); + // FIXME with GAT on `Self::ElementImpl`, we should be able to + // use `.upcast::()` + // + // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. + let element = unsafe { elem.unsafe_cast_ref::() }; + pad.gst_pad().event_default(Some(element), event) } @@ -176,20 +177,18 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { &self, pad: &PadSrcRef, imp: &Self::ElementImpl, - element: &gst::Element, event: gst::Event, ) -> Result { // default is to dispatch to `src_event` // (as implemented in `gst_pad_send_event_unchecked`) let event_type = event.type_(); - event_to_event_full(self.src_event(pad, imp, element, event), event_type) + event_to_event_full(self.src_event(pad, imp, event), event_type) } fn src_query( &self, pad: &PadSrcRef, - _imp: &Self::ElementImpl, - element: &gst::Element, + imp: &Self::ElementImpl, query: &mut gst::QueryRef, ) -> bool { gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); @@ -198,6 +197,15 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static { // but we can't return a `Future` because we couldn't honor QueryRef's lifetime false } else { + gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); + + let elem = imp.instance(); + // FIXME with GAT on `Self::ElementImpl`, we should be able to + // use `.upcast::()` + // + // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. + let element = unsafe { elem.unsafe_cast_ref::() }; + pad.gst_pad().query_default(Some(element), query) } } @@ -398,15 +406,7 @@ impl PadSrc { "Panic in PadSrc activate" )) }, - move |imp| { - let this_ref = PadSrcRef::new(inner_arc); - let element = imp.instance(); - handler.src_activate( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - ) - }, + move |imp| handler.src_activate(&PadSrcRef::new(inner_arc), imp), ) }); @@ -427,15 +427,8 @@ impl PadSrc { }, move |imp| { let this_ref = PadSrcRef::new(inner_arc); - let element = imp.instance(); this_ref.activate_mode_hook(mode, active)?; - handler.src_activatemode( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - mode, - active, - ) + handler.src_activatemode(&this_ref, imp, mode, active) }, ) }); @@ -451,16 +444,7 @@ impl PadSrc { H::ElementImpl::catch_panic_pad_function( parent, || Err(FlowError::Error), - move |imp| { - let this_ref = PadSrcRef::new(inner_arc); - let element = imp.instance(); - handler.src_event_full( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - event, - ) - }, + move |imp| handler.src_event_full(&PadSrcRef::new(inner_arc), imp, event), ) }); @@ -473,12 +457,10 @@ impl PadSrc { parent, || false, move |imp| { - let this_ref = PadSrcRef::new(inner_arc); - let element = imp.instance(); if !query.is_serialized() { - handler.src_query(&this_ref, imp, element.dynamic_cast_ref::().unwrap(), query) + handler.src_query(&PadSrcRef::new(inner_arc), imp, query) } else { - gst::fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported"); + gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported"); false } }, @@ -525,15 +507,13 @@ impl Deref for PadSrc { /// [`PadSink`]: struct.PadSink.html /// [`pad` module]: index.html pub trait PadSinkHandler: Clone + Send + Sync + 'static { + // FIXME we should use a GAT here: ObjectSubclass + Send> type ElementImpl: ElementImpl + ObjectSubclass; - // FIXME: Once associated type bounds are stable we should use ObjectSubclass::Type below - // instead of &gst::Element fn sink_activate( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, - _element: &gst::Element, ) -> Result<(), gst::LoggableError> { let gst_pad = pad.gst_pad(); if gst_pad.is_active() { @@ -563,7 +543,6 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { &self, _pad: &PadSinkRef, _imp: &Self::ElementImpl, - _element: &gst::Element, _mode: gst::PadMode, _active: bool, ) -> Result<(), gst::LoggableError> { @@ -571,50 +550,52 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { } fn sink_chain( - &self, - _pad: &PadSinkRef, - _imp: &Self::ElementImpl, - _element: &gst::Element, + self, + _pad: PadSinkWeak, + _elem: ::Type, _buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { future::err(FlowError::NotSupported).boxed() } fn sink_chain_list( - &self, - _pad: &PadSinkRef, - _imp: &Self::ElementImpl, - _element: &gst::Element, + self, + _pad: PadSinkWeak, + _elem: ::Type, _buffer_list: gst::BufferList, ) -> BoxFuture<'static, Result> { future::err(FlowError::NotSupported).boxed() } - fn sink_event( - &self, - pad: &PadSinkRef, - _imp: &Self::ElementImpl, - element: &gst::Element, - event: gst::Event, - ) -> bool { + fn sink_event(&self, pad: &PadSinkRef, imp: &Self::ElementImpl, event: gst::Event) -> bool { assert!(!event.is_serialized()); gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); + + let elem = imp.instance(); + // FIXME with GAT on `Self::ElementImpl`, we should be able to + // use `.upcast::()` + // + // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. + let element = unsafe { elem.unsafe_cast_ref::() }; + pad.gst_pad().event_default(Some(element), event) } fn sink_event_serialized( - &self, - pad: &PadSinkRef, - _imp: &Self::ElementImpl, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: ::Type, event: gst::Event, ) -> BoxFuture<'static, bool> { assert!(event.is_serialized()); - let pad_weak = pad.downgrade(); - let element = element.clone(); + // FIXME with GAT on `Self::ElementImpl`, we should be able to + // use `.upcast::()` + // + // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. + let element = unsafe { elem.unsafe_cast::() }; async move { - let pad = pad_weak.upgrade().expect("PadSink no longer exists"); + let pad = pad.upgrade().expect("PadSink no longer exists"); gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); pad.gst_pad().event_default(Some(&element), event) @@ -626,21 +607,19 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { &self, pad: &PadSinkRef, imp: &Self::ElementImpl, - element: &gst::Element, event: gst::Event, ) -> Result { assert!(!event.is_serialized()); // default is to dispatch to `sink_event` // (as implemented in `gst_pad_send_event_unchecked`) let event_type = event.type_(); - event_to_event_full(self.sink_event(pad, imp, element, event), event_type) + event_to_event_full(self.sink_event(pad, imp, event), event_type) } fn sink_event_full_serialized( - &self, - pad: &PadSinkRef, - imp: &Self::ElementImpl, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: ::Type, event: gst::Event, ) -> BoxFuture<'static, Result> { assert!(event.is_serialized()); @@ -648,7 +627,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { // (as implemented in `gst_pad_send_event_unchecked`) let event_type = event.type_(); event_to_event_full_serialized( - self.sink_event_serialized(pad, imp, element, event), + Self::sink_event_serialized(self, pad, elem, event), event_type, ) } @@ -656,8 +635,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { fn sink_query( &self, pad: &PadSinkRef, - _imp: &Self::ElementImpl, - element: &gst::Element, + imp: &Self::ElementImpl, query: &mut gst::QueryRef, ) -> bool { if query.is_serialized() { @@ -667,6 +645,14 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static { false } else { gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); + + let elem = imp.instance(); + // FIXME with GAT on `Self::ElementImpl`, we should be able to + // use `.upcast::()` + // + // Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`. + let element = unsafe { elem.unsafe_cast_ref::() }; + pad.gst_pad().query_default(Some(element), query) } } @@ -778,13 +764,6 @@ impl<'a> Deref for PadSinkRef<'a> { pub struct PadSink(Arc); impl PadSink { - pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self { - let this = PadSink(Arc::new(PadSinkInner::new(gst_pad))); - this.init_pad_functions(handler); - - this - } - pub fn downgrade(&self) -> PadSinkWeak { PadSinkWeak(Arc::downgrade(&self.0)) } @@ -792,9 +771,25 @@ impl PadSink { pub fn as_ref(&self) -> PadSinkRef<'_> { PadSinkRef::new(Arc::clone(&self.0)) } +} - fn init_pad_functions(&self, handler: H) { - // FIXME: Do this better +impl PadSink { + pub fn new(gst_pad: gst::Pad, handler: H) -> Self + where + H: PadSinkHandler, + ::Type: IsA + Send, + { + let this = PadSink(Arc::new(PadSinkInner::new(gst_pad))); + this.init_pad_functions(handler); + + this + } + + fn init_pad_functions(&self, handler: H) + where + H: PadSinkHandler, + ::Type: IsA + Send, + { unsafe { let handler_clone = handler.clone(); let inner_arc = Arc::clone(&self.0); @@ -802,6 +797,7 @@ impl PadSink { .set_activate_function(move |gst_pad, parent| { let handler = handler_clone.clone(); let inner_arc = inner_arc.clone(); + H::ElementImpl::catch_panic_pad_function( parent, || { @@ -811,15 +807,7 @@ impl PadSink { "Panic in PadSink activate" )) }, - move |imp| { - let this_ref = PadSinkRef::new(inner_arc); - let element = imp.instance(); - handler.sink_activate( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - ) - }, + move |imp| handler.sink_activate(&PadSinkRef::new(inner_arc), imp), ) }); @@ -840,16 +828,8 @@ impl PadSink { }, move |imp| { let this_ref = PadSinkRef::new(inner_arc); - let element = imp.instance(); this_ref.activate_mode_hook(mode, active)?; - - handler.sink_activatemode( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - mode, - active, - ) + handler.sink_activatemode(&this_ref, imp, mode, active) }, ) }); @@ -864,32 +844,19 @@ impl PadSink { parent, || Err(FlowError::Error), move |imp| { - let element = imp.instance(); + let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); + let elem = imp.instance().clone(); + if let Some((ctx, task_id)) = Context::current_task() { - let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); - let handler = handler.clone(); - let element = - element.clone().dynamic_cast::().unwrap(); let delayed_fut = async move { - let imp = ::from_instance( - element.unsafe_cast_ref(), - ); - let this_ref = - this_weak.upgrade().ok_or(gst::FlowError::Flushing)?; - handler.sink_chain(&this_ref, imp, &element, buffer).await + H::sink_chain(handler, this_weak, elem, buffer).await }; let _ = ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); Ok(gst::FlowSuccess::Ok) } else { - let this_ref = PadSinkRef::new(inner_arc); - let chain_fut = handler.sink_chain( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - buffer, - ); + let chain_fut = H::sink_chain(handler, this_weak, elem, buffer); executor::block_on(chain_fut) } }, @@ -906,34 +873,20 @@ impl PadSink { parent, || Err(FlowError::Error), move |imp| { - let element = imp.instance(); + let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); + let elem = imp.instance().clone(); + if let Some((ctx, task_id)) = Context::current_task() { - let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); - let handler = handler.clone(); - let element = - element.clone().dynamic_cast::().unwrap(); let delayed_fut = async move { - let imp = ::from_instance( - element.unsafe_cast_ref(), - ); - let this_ref = - this_weak.upgrade().ok_or(gst::FlowError::Flushing)?; - handler - .sink_chain_list(&this_ref, imp, &element, list) - .await + H::sink_chain_list(handler, this_weak, elem, list).await }; let _ = ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); Ok(gst::FlowSuccess::Ok) } else { - let this_ref = PadSinkRef::new(inner_arc); - let chain_list_fut = handler.sink_chain_list( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - list, - ); + let chain_list_fut = + H::sink_chain_list(handler, this_weak, elem, list); executor::block_on(chain_list_fut) } }, @@ -952,26 +905,16 @@ impl PadSink { parent, || Err(FlowError::Error), move |imp| { - let element = imp.instance(); if event.is_serialized() { - if let Some((ctx, task_id)) = Context::current_task() { - let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); - let handler = handler.clone(); - let element = - element.clone().dynamic_cast::().unwrap(); - let delayed_fut = async move { - let imp = - ::from_instance( - element.unsafe_cast_ref(), - ); - let this_ref = - this_weak.upgrade().ok_or(gst::FlowError::Flushing)?; + let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); + let elem = imp.instance().clone(); - handler - .sink_event_full_serialized( - &this_ref, imp, &element, event, - ) - .await + if let Some((ctx, task_id)) = Context::current_task() { + let delayed_fut = async move { + H::sink_event_full_serialized( + handler, this_weak, elem, event, + ) + .await }; let _ = ctx.add_sub_task( task_id, @@ -980,23 +923,13 @@ impl PadSink { Ok(gst::FlowSuccess::Ok) } else { - let this_ref = PadSinkRef::new(inner_arc); - let event_fut = handler.sink_event_full_serialized( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - event, + let event_fut = H::sink_event_full_serialized( + handler, this_weak, elem, event, ); executor::block_on(event_fut) } } else { - let this_ref = PadSinkRef::new(inner_arc); - handler.sink_event_full( - &this_ref, - imp, - element.dynamic_cast_ref::().unwrap(), - event, - ) + handler.sink_event_full(&PadSinkRef::new(inner_arc), imp, event) } }, ) @@ -1011,12 +944,10 @@ impl PadSink { parent, || false, move |imp| { - let this_ref = PadSinkRef::new(inner_arc); - let element = imp.instance(); if !query.is_serialized() { - handler.sink_query(&this_ref, imp, element.dynamic_cast_ref::().unwrap(), query) + handler.sink_query(&PadSinkRef::new(inner_arc), imp, query) } else { - gst::fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported"); + gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported"); false } }, diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index 41cdaced..f616407d 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -96,28 +96,13 @@ struct TcpClientSrcPadHandler; impl PadSrcHandler for TcpClientSrcPadHandler { type ElementImpl = TcpClientSrc; - fn src_event( - &self, - pad: &PadSrcRef, - tcpclientsrc: &TcpClientSrc, - _element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn src_event(&self, pad: &PadSrcRef, imp: &TcpClientSrc, event: gst::Event) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + use gst::EventView; let ret = match event.view() { - EventView::FlushStart(..) => tcpclientsrc - .task - .flush_start() - .await_maybe_on_context() - .is_ok(), - EventView::FlushStop(..) => tcpclientsrc - .task - .flush_stop() - .await_maybe_on_context() - .is_ok(), + EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(), + EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(), EventView::Reconfigure(..) => true, EventView::Latency(..) => true, _ => false, @@ -132,16 +117,10 @@ impl PadSrcHandler for TcpClientSrcPadHandler { ret } - fn src_query( - &self, - pad: &PadSrcRef, - tcpclientsrc: &TcpClientSrc, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - + fn src_query(&self, pad: &PadSrcRef, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + + use gst::QueryViewMut; let ret = match query.view_mut() { QueryViewMut::Latency(q) => { q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE); @@ -153,8 +132,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler { true } QueryViewMut::Caps(q) => { - let caps = if let Some(caps) = tcpclientsrc.configured_caps.lock().unwrap().as_ref() - { + let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() { q.filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) diff --git a/generic/threadshare/src/udpsink/imp.rs b/generic/threadshare/src/udpsink/imp.rs index b99e432c..efb0da6e 100644 --- a/generic/threadshare/src/udpsink/imp.rs +++ b/generic/threadshare/src/udpsink/imp.rs @@ -30,7 +30,7 @@ use gst::{element_error, error_msg}; use once_cell::sync::Lazy; use crate::runtime::prelude::*; -use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, Task}; +use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, PadSinkWeak, Task}; use crate::socket::{wrap_socket, GioSocketWrapper}; use std::collections::BTreeSet; @@ -133,18 +133,15 @@ impl PadSinkHandler for UdpSinkPadHandler { type ElementImpl = UdpSink; fn sink_chain( - &self, - _pad: &PadSinkRef, - udpsink: &UdpSink, - element: &gst::Element, + self, + _pad: PadSinkWeak, + elem: super::UdpSink, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let sender = udpsink.clone_item_sender(); - let element = element.clone().downcast::().unwrap(); - + let sender = elem.imp().clone_item_sender(); async move { if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); + gst::debug!(CAT, obj: &elem, "Flushing"); return Err(gst::FlowError::Flushing); } @@ -154,19 +151,16 @@ impl PadSinkHandler for UdpSinkPadHandler { } fn sink_chain_list( - &self, - _pad: &PadSinkRef, - udpsink: &UdpSink, - element: &gst::Element, + self, + _pad: PadSinkWeak, + elem: super::UdpSink, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let sender = udpsink.clone_item_sender(); - let element = element.clone().downcast::().unwrap(); - + let sender = elem.imp().clone_item_sender(); async move { for buffer in list.iter_owned() { if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); + gst::debug!(CAT, obj: &elem, "Flushing"); return Err(gst::FlowError::Flushing); } } @@ -177,21 +171,18 @@ impl PadSinkHandler for UdpSinkPadHandler { } fn sink_event_serialized( - &self, - _pad: &PadSinkRef, - udpsink: &UdpSink, - element: &gst::Element, + self, + _pad: PadSinkWeak, + elem: super::UdpSink, event: gst::Event, ) -> BoxFuture<'static, bool> { - let sender = udpsink.clone_item_sender(); - let element = element.clone().downcast::().unwrap(); - + let sender = elem.imp().clone_item_sender(); async move { if let EventView::FlushStop(_) = event.view() { - let udpsink = element.imp(); - return udpsink.task.flush_stop().await_maybe_on_context().is_ok(); + let imp = elem.imp(); + return imp.task.flush_stop().await_maybe_on_context().is_ok(); } else if sender.send_async(TaskItem::Event(event)).await.is_err() { - gst::debug!(CAT, obj: &element, "Flushing"); + gst::debug!(CAT, obj: &elem, "Flushing"); } true @@ -199,15 +190,9 @@ impl PadSinkHandler for UdpSinkPadHandler { .boxed() } - fn sink_event( - &self, - _pad: &PadSinkRef, - udpsink: &UdpSink, - _element: &gst::Element, - event: gst::Event, - ) -> bool { + fn sink_event(&self, _pad: &PadSinkRef, imp: &UdpSink, event: gst::Event) -> bool { if let EventView::FlushStart(..) = event.view() { - return udpsink.task.flush_start().await_maybe_on_context().is_ok(); + return imp.task.flush_start().await_maybe_on_context().is_ok(); } true diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index 5c7b79bf..f6105d0b 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -113,20 +113,13 @@ struct UdpSrcPadHandler; impl PadSrcHandler for UdpSrcPadHandler { type ElementImpl = UdpSrc; - fn src_event( - &self, - pad: &PadSrcRef, - udpsrc: &UdpSrc, - _element: &gst::Element, - event: gst::Event, - ) -> bool { - use gst::EventView; - + fn src_event(&self, pad: &PadSrcRef, imp: &UdpSrc, event: gst::Event) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); + use gst::EventView; let ret = match event.view() { - EventView::FlushStart(..) => udpsrc.task.flush_start().await_maybe_on_context().is_ok(), - EventView::FlushStop(..) => udpsrc.task.flush_stop().await_maybe_on_context().is_ok(), + EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(), + EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(), EventView::Reconfigure(..) => true, EventView::Latency(..) => true, _ => false, @@ -141,17 +134,10 @@ impl PadSrcHandler for UdpSrcPadHandler { ret } - fn src_query( - &self, - pad: &PadSrcRef, - udpsrc: &UdpSrc, - _element: &gst::Element, - query: &mut gst::QueryRef, - ) -> bool { - use gst::QueryViewMut; - + fn src_query(&self, pad: &PadSrcRef, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool { gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); + use gst::QueryViewMut; let ret = match query.view_mut() { QueryViewMut::Latency(q) => { q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); @@ -163,7 +149,7 @@ impl PadSrcHandler for UdpSrcPadHandler { true } QueryViewMut::Caps(q) => { - let caps = if let Some(caps) = udpsrc.configured_caps.lock().unwrap().as_ref() { + let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() { q.filter() .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .unwrap_or_else(|| caps.clone()) diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index 312d2820..e1ca62f3 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -36,7 +36,9 @@ use std::sync::Mutex; use std::time::Duration; use gstthreadshare::runtime::prelude::*; -use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task, TaskState}; +use gstthreadshare::runtime::{ + Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task, TaskState, +}; const DEFAULT_CONTEXT: &str = ""; const THROTTLING_DURATION: Duration = Duration::from_millis(2); @@ -87,27 +89,15 @@ mod imp_src { impl PadSrcHandler for PadSrcTestHandler { type ElementImpl = ElementSrcTest; - fn src_event( - &self, - pad: &PadSrcRef, - elem_src_test: &ElementSrcTest, - _element: &gst::Element, - event: gst::Event, - ) -> bool { + fn src_event(&self, pad: &PadSrcRef, imp: &ElementSrcTest, event: gst::Event) -> bool { gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); let ret = match event.view() { - EventView::FlushStart(..) => elem_src_test - .task - .flush_start() - .await_maybe_on_context() - .is_ok(), + EventView::FlushStart(..) => { + imp.task.flush_start().await_maybe_on_context().is_ok() + } EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true, - EventView::FlushStop(..) => elem_src_test - .task - .flush_stop() - .await_maybe_on_context() - .is_ok(), + EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(), _ => false, }; @@ -337,6 +327,7 @@ mod imp_src { let obj = self.instance(); obj.add_pad(self.src_pad.gst_pad()).unwrap(); + obj.set_element_flags(gst::ElementFlags::SOURCE); } } @@ -449,53 +440,37 @@ mod imp_sink { type ElementImpl = ElementSinkTest; fn sink_chain( - &self, - _pad: &PadSinkRef, - _elem_sink_test: &ElementSinkTest, - element: &gst::Element, + self, + _pad: PadSinkWeak, + elem: super::ElementSinkTest, buffer: gst::Buffer, ) -> BoxFuture<'static, Result> { - let element = element - .clone() - .downcast::() - .unwrap(); async move { - let elem_sink_test = element.imp(); - elem_sink_test.forward_item(Item::Buffer(buffer)).await + let imp = elem.imp(); + imp.forward_item(Item::Buffer(buffer)).await } .boxed() } fn sink_chain_list( - &self, - _pad: &PadSinkRef, - _elem_sink_test: &ElementSinkTest, - element: &gst::Element, + self, + _pad: PadSinkWeak, + elem: super::ElementSinkTest, list: gst::BufferList, ) -> BoxFuture<'static, Result> { - let element = element - .clone() - .downcast::() - .unwrap(); async move { - let elem_sink_test = element.imp(); - elem_sink_test.forward_item(Item::BufferList(list)).await + let imp = elem.imp(); + imp.forward_item(Item::BufferList(list)).await } .boxed() } - fn sink_event( - &self, - pad: &PadSinkRef, - elem_sink_test: &ElementSinkTest, - _element: &gst::Element, - event: gst::Event, - ) -> bool { + fn sink_event(&self, pad: &PadSinkRef, imp: &ElementSinkTest, event: gst::Event) -> bool { gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); match event.view() { EventView::FlushStart(..) => { - elem_sink_test.stop(); + imp.stop(); true } _ => false, @@ -503,29 +478,21 @@ mod imp_sink { } fn sink_event_serialized( - &self, - pad: &PadSinkRef, - _elem_sink_test: &ElementSinkTest, - element: &gst::Element, + self, + pad: PadSinkWeak, + elem: super::ElementSinkTest, event: gst::Event, ) -> BoxFuture<'static, bool> { - gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); - - let element = element - .clone() - .downcast::() - .unwrap(); async move { - let elem_sink_test = element.imp(); + let pad = pad.upgrade().expect("PadSink no longer exists"); + gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); + let imp = elem.imp(); if let EventView::FlushStop(..) = event.view() { - elem_sink_test.start(); + imp.start(); } - elem_sink_test - .forward_item(Item::Event(event)) - .await - .is_ok() + imp.forward_item(Item::Event(event)).await.is_ok() } .boxed() } @@ -652,6 +619,7 @@ mod imp_sink { let obj = self.instance(); obj.add_pad(self.sink_pad.gst_pad()).unwrap(); + obj.set_element_flags(gst::ElementFlags::SINK); } }