ts: better use of imp & elem args in Pad{Sink,Src}Handlers

This is a follow-up to commit 7ee4afac.

This commit cleans up the `Pad{Sink,Src}Handler` by

- Keeping arguments which are strictly necessary.
- Passing arguments by value for the trait functions which return
  a `Future`. The arguments which were previously passed by reference
  were `clone`d internally and then `clone`d again in most
  implementations.

There are unfortunate differences in trait function signatures
between those which return a `Future` and the sync functions. This
is due to the requirement for the arguments to be moved to the
resulting `Future`, whereas sync functions can rely on references.
One particular notable difference is the use of the `imp` in sync
functions instead of the `elem` in functions returning a `Future`.
Because the `imp` is not guaranteed to implement `Clone`, we can't
move it to the resulting `Future`, so the `elem` is used.
This commit is contained in:
François Laignel 2022-10-12 12:35:20 +02:00
parent bc5b51687d
commit 2bffdec691
11 changed files with 370 additions and 705 deletions

View file

@ -19,7 +19,7 @@ use gst::EventView;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, Task}; use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, Task};
use std::sync::Mutex; use std::sync::Mutex;
use std::task::Poll; use std::task::Poll;
@ -76,18 +76,15 @@ impl PadSinkHandler for TestSinkPadHandler {
type ElementImpl = TestSink; type ElementImpl = TestSink;
fn sink_chain( fn sink_chain(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
test_sink: &TestSink, elem: super::TestSink,
element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = test_sink.clone_item_sender(); let sender = elem.imp().clone_item_sender();
let element = element.clone().downcast::<super::TestSink>().unwrap();
async move { async move {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() { if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing"); gst::debug!(CAT, obj: &elem, "Flushing");
return Err(gst::FlowError::Flushing); return Err(gst::FlowError::Flushing);
} }
@ -97,19 +94,16 @@ impl PadSinkHandler for TestSinkPadHandler {
} }
fn sink_chain_list( fn sink_chain_list(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
test_sink: &TestSink, elem: super::TestSink,
element: &gst::Element,
list: gst::BufferList, list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = test_sink.clone_item_sender(); let sender = elem.imp().clone_item_sender();
let element = element.clone().downcast::<super::TestSink>().unwrap();
async move { async move {
for buffer in list.iter_owned() { for buffer in list.iter_owned() {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() { if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing"); gst::debug!(CAT, obj: &elem, "Flushing");
return Err(gst::FlowError::Flushing); return Err(gst::FlowError::Flushing);
} }
} }
@ -120,21 +114,18 @@ impl PadSinkHandler for TestSinkPadHandler {
} }
fn sink_event_serialized( fn sink_event_serialized(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
test_sink: &TestSink, elem: super::TestSink,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
let sender = test_sink.clone_item_sender(); let sender = elem.imp().clone_item_sender();
let element = element.clone().downcast::<super::TestSink>().unwrap();
async move { async move {
if let EventView::FlushStop(_) = event.view() { if let EventView::FlushStop(_) = event.view() {
let test_sink = element.imp(); let imp = elem.imp();
return test_sink.task.flush_stop().await_maybe_on_context().is_ok(); return imp.task.flush_stop().await_maybe_on_context().is_ok();
} else if sender.send_async(StreamItem::Event(event)).await.is_err() { } else if sender.send_async(StreamItem::Event(event)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing"); gst::debug!(CAT, obj: &elem, "Flushing");
} }
true true
@ -142,19 +133,9 @@ impl PadSinkHandler for TestSinkPadHandler {
.boxed() .boxed()
} }
fn sink_event( fn sink_event(&self, _pad: &PadSinkRef, imp: &TestSink, event: gst::Event) -> bool {
&self,
_pad: &PadSinkRef,
test_sink: &TestSink,
_element: &gst::Element,
event: gst::Event,
) -> bool {
if let EventView::FlushStart(..) = event.view() { if let EventView::FlushStart(..) = event.view() {
return test_sink return imp.task.flush_start().await_maybe_on_context().is_ok();
.task
.flush_start()
.await_maybe_on_context()
.is_ok();
} }
true true

View file

@ -82,20 +82,13 @@ struct AppSrcPadHandler;
impl PadSrcHandler for AppSrcPadHandler { impl PadSrcHandler for AppSrcPadHandler {
type ElementImpl = AppSrc; type ElementImpl = AppSrc;
fn src_event( fn src_event(&self, pad: &PadSrcRef, imp: &AppSrc, event: gst::Event) -> bool {
&self,
pad: &PadSrcRef,
appsrc: &AppSrc,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => appsrc.task.flush_start().await_maybe_on_context().is_ok(), EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(),
EventView::FlushStop(..) => appsrc.task.flush_stop().await_maybe_on_context().is_ok(), EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(),
EventView::Reconfigure(..) => true, EventView::Reconfigure(..) => true,
EventView::Latency(..) => true, EventView::Latency(..) => true,
_ => false, _ => false,
@ -110,16 +103,10 @@ impl PadSrcHandler for AppSrcPadHandler {
ret ret
} }
fn src_query( fn src_query(&self, pad: &PadSrcRef, imp: &AppSrc, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSrcRef,
appsrc: &AppSrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() { let ret = match query.view_mut() {
QueryViewMut::Latency(q) => { QueryViewMut::Latency(q) => {
q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
@ -131,7 +118,7 @@ impl PadSrcHandler for AppSrcPadHandler {
true true
} }
QueryViewMut::Caps(q) => { QueryViewMut::Caps(q) => {
let caps = if let Some(caps) = appsrc.configured_caps.lock().unwrap().as_ref() { let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() {
q.filter() q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone()) .unwrap_or_else(|| caps.clone())
@ -328,8 +315,9 @@ impl AppSrc {
let do_timestamp = self.settings.lock().unwrap().do_timestamp; let do_timestamp = self.settings.lock().unwrap().do_timestamp;
if do_timestamp { if do_timestamp {
if let Some(clock) = self.instance().clock() { let elem = self.instance();
let base_time = self.instance().base_time(); if let Some(clock) = elem.clock() {
let base_time = elem.base_time();
let now = clock.time(); let now = clock.time();
let buffer = buffer.make_mut(); let buffer = buffer.make_mut();
@ -499,11 +487,10 @@ impl ObjectImpl for AppSrc {
.return_type::<bool>() .return_type::<bool>()
.action() .action()
.class_handler(|_, args| { .class_handler(|_, args| {
let element = args[0].get::<super::AppSrc>().expect("signal arg"); let elem = args[0].get::<super::AppSrc>().expect("signal arg");
let buffer = args[1].get::<gst::Buffer>().expect("signal arg"); let buffer = args[1].get::<gst::Buffer>().expect("signal arg");
let appsrc = element.imp();
Some(appsrc.push_buffer(buffer).to_value()) Some(elem.imp().push_buffer(buffer).to_value())
}) })
.build(), .build(),
/** /**
@ -516,10 +503,9 @@ impl ObjectImpl for AppSrc {
.return_type::<bool>() .return_type::<bool>()
.action() .action()
.class_handler(|_, args| { .class_handler(|_, args| {
let element = args[0].get::<super::AppSrc>().expect("signal arg"); let elem = args[0].get::<super::AppSrc>().expect("signal arg");
let appsrc = element.imp();
Some(appsrc.end_of_stream().to_value()) Some(elem.imp().end_of_stream().to_value())
}) })
.build(), .build(),
] ]

View file

@ -33,7 +33,7 @@ use std::time::Duration;
use std::u32; use std::u32;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{self, PadSink, PadSinkRef, PadSrc, PadSrcRef}; use crate::runtime::{self, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef};
const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT: &str = "";
const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO; const DEFAULT_CONTEXT_WAIT: Duration = Duration::ZERO;
@ -75,14 +75,10 @@ struct InputSelectorPadSinkHandler(Arc<Mutex<InputSelectorPadSinkHandlerInner>>)
impl InputSelectorPadSinkHandler { impl InputSelectorPadSinkHandler {
/* Wait until specified time */ /* Wait until specified time */
async fn sync( async fn sync(&self, elem: &super::InputSelector, running_time: Option<gst::ClockTime>) {
&self, let now = elem.current_running_time();
element: &super::InputSelector,
running_time: impl Into<Option<gst::ClockTime>>,
) {
let now = element.current_running_time();
match running_time.into().opt_checked_sub(now) { match running_time.opt_checked_sub(now) {
Ok(Some(delay)) => { Ok(Some(delay)) => {
runtime::timer::delay_for(delay.into()).await; runtime::timer::delay_for(delay.into()).await;
} }
@ -92,11 +88,11 @@ impl InputSelectorPadSinkHandler {
async fn handle_item( async fn handle_item(
&self, &self,
element: &super::InputSelector,
pad: &PadSinkRef<'_>, pad: &PadSinkRef<'_>,
elem: &super::InputSelector,
mut buffer: gst::Buffer, mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let inputselector = element.imp(); let inputselector = elem.imp();
let (stickies, is_active, sync_future, switched_pad) = { let (stickies, is_active, sync_future, switched_pad) = {
let mut state = inputselector.state.lock().unwrap(); let mut state = inputselector.state.lock().unwrap();
@ -108,7 +104,7 @@ impl InputSelectorPadSinkHandler {
if let Some(segment) = &inner.segment { if let Some(segment) = &inner.segment {
if let Some(segment) = segment.downcast_ref::<gst::format::Time>() { if let Some(segment) = segment.downcast_ref::<gst::format::Time>() {
let rtime = segment.to_running_time(buffer.pts()); let rtime = segment.to_running_time(buffer.pts());
let (sync_fut, abort_handle) = abortable(self.sync(element, rtime)); let (sync_fut, abort_handle) = abortable(self.sync(elem, rtime));
inner.abort_handle = Some(abort_handle); inner.abort_handle = Some(abort_handle);
sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing)); sync_future = Some(sync_fut.map_err(|_| gst::FlowError::Flushing));
} }
@ -162,38 +158,30 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
type ElementImpl = InputSelector; type ElementImpl = InputSelector;
fn sink_chain( fn sink_chain(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_inputselector: &InputSelector, elem: super::InputSelector,
element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let this = self.clone();
let element = element.clone().downcast::<super::InputSelector>().unwrap();
let pad_weak = pad.downgrade();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
this.handle_item(&element, &pad, buffer).await self.handle_item(&pad, &elem, buffer).await
} }
.boxed() .boxed()
} }
fn sink_chain_list( fn sink_chain_list(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_inputselector: &InputSelector, elem: super::InputSelector,
element: &gst::Element,
list: gst::BufferList, list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let this = self.clone();
let element = element.clone().downcast::<super::InputSelector>().unwrap();
let pad_weak = pad.downgrade();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list); gst::log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list);
// TODO: Ideally we would keep the list intact and forward it in one go // TODO: Ideally we would keep the list intact and forward it in one go
for buffer in list.iter_owned() { for buffer in list.iter_owned() {
this.handle_item(&element, &pad, buffer).await?; self.handle_item(&pad, &elem, buffer).await?;
} }
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
@ -202,16 +190,13 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
} }
fn sink_event_serialized( fn sink_event_serialized(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
_inputselector: &InputSelector, _elem: super::InputSelector,
_element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
let this = self.clone();
async move { async move {
let mut inner = this.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
// Remember the segment for later use // Remember the segment for later use
if let gst::EventView::Segment(e) = event.view() { if let gst::EventView::Segment(e) = event.view() {
@ -234,17 +219,11 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
.boxed() .boxed()
} }
fn sink_event( fn sink_event(&self, _pad: &PadSinkRef, imp: &InputSelector, event: gst::Event) -> bool {
&self,
_pad: &PadSinkRef,
inputselector: &InputSelector,
_element: &gst::Element,
event: gst::Event,
) -> bool {
/* Drop all events for now */ /* Drop all events for now */
if let gst::EventView::FlushStart(..) = event.view() { if let gst::EventView::FlushStart(..) = event.view() {
/* Unblock downstream */ /* Unblock downstream */
inputselector.src_pad.gst_pad().push_event(event.clone()); imp.src_pad.gst_pad().push_event(event.clone());
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
@ -255,13 +234,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
true true
} }
fn sink_query( fn sink_query(&self, pad: &PadSinkRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSinkRef,
inputselector: &InputSelector,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Handling query {:?}", query);
if query.is_serialized() { if query.is_serialized() {
@ -270,7 +243,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
false false
} else { } else {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Forwarding query {:?}", query);
inputselector.src_pad.gst_pad().peer_query(query) imp.src_pad.gst_pad().peer_query(query)
} }
} }
} }
@ -281,24 +254,17 @@ struct InputSelectorPadSrcHandler;
impl PadSrcHandler for InputSelectorPadSrcHandler { impl PadSrcHandler for InputSelectorPadSrcHandler {
type ElementImpl = InputSelector; type ElementImpl = InputSelector;
fn src_query( fn src_query(&self, pad: &PadSrcRef, imp: &InputSelector, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSrcRef,
inputselector: &InputSelector,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
use gst::QueryViewMut;
match query.view_mut() { match query.view_mut() {
QueryViewMut::Latency(q) => { QueryViewMut::Latency(q) => {
let mut ret = true; let mut ret = true;
let mut min_latency = gst::ClockTime::ZERO; let mut min_latency = gst::ClockTime::ZERO;
let mut max_latency = gst::ClockTime::NONE; let mut max_latency = gst::ClockTime::NONE;
let pads = { let pads = {
let pads = inputselector.pads.lock().unwrap(); let pads = imp.pads.lock().unwrap();
pads.sink_pads pads.sink_pads
.iter() .iter()
.map(|p| p.0.clone()) .map(|p| p.0.clone())
@ -325,7 +291,7 @@ impl PadSrcHandler for InputSelectorPadSrcHandler {
} }
_ => { _ => {
let sinkpad = { let sinkpad = {
let state = inputselector.state.lock().unwrap(); let state = imp.state.lock().unwrap();
state.active_sinkpad.clone() state.active_sinkpad.clone()
}; };

View file

@ -36,7 +36,7 @@ use std::sync::Mutex as StdMutex;
use std::time::Duration; use std::time::Duration;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; use crate::runtime::{self, Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task};
use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx}; use super::jitterbuffer::{RTPJitterBuffer, RTPJitterBufferItem, RTPPacketRateCtx};
@ -143,14 +143,10 @@ impl SinkHandler {
} }
// For resetting if seqnum discontinuities // For resetting if seqnum discontinuities
fn reset( fn reset(&self, inner: &mut SinkHandlerInner, jb: &JitterBuffer) -> BTreeSet<GapPacket> {
&self, gst::info!(CAT, imp: jb, "Resetting");
inner: &mut SinkHandlerInner,
state: &mut State,
element: &super::JitterBuffer,
) -> BTreeSet<GapPacket> {
gst::info!(CAT, obj: element, "Resetting");
let mut state = jb.state.lock().unwrap();
state.jbuf.flush(); state.jbuf.flush();
state.jbuf.reset_skew(); state.jbuf.reset_skew();
state.discont = true; state.discont = true;
@ -174,23 +170,23 @@ impl SinkHandler {
&self, &self,
inner: &mut SinkHandlerInner, inner: &mut SinkHandlerInner,
state: &mut State, state: &mut State,
element: &super::JitterBuffer, jb: &JitterBuffer,
caps: &gst::Caps, caps: &gst::Caps,
pt: u8, pt: u8,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let s = caps.structure(0).ok_or(gst::FlowError::Error)?; let s = caps.structure(0).ok_or(gst::FlowError::Error)?;
gst::debug!(CAT, obj: element, "Parsing {:?}", caps); gst::debug!(CAT, imp: jb, "Parsing {:?}", caps);
let payload = s.get::<i32>("payload").map_err(|err| { let payload = s.get::<i32>("payload").map_err(|err| {
gst::debug!(CAT, obj: element, "Caps 'payload': {}", err); gst::debug!(CAT, imp: jb, "Caps 'payload': {}", err);
gst::FlowError::Error gst::FlowError::Error
})?; })?;
if pt != 0 && payload as u8 != pt { if pt != 0 && payload as u8 != pt {
gst::debug!( gst::debug!(
CAT, CAT,
obj: element, imp: jb,
"Caps 'payload' ({}) doesn't match payload type ({})", "Caps 'payload' ({}) doesn't match payload type ({})",
payload, payload,
pt pt
@ -200,12 +196,12 @@ impl SinkHandler {
inner.last_pt = Some(pt); inner.last_pt = Some(pt);
let clock_rate = s.get::<i32>("clock-rate").map_err(|err| { let clock_rate = s.get::<i32>("clock-rate").map_err(|err| {
gst::debug!(CAT, obj: element, "Caps 'clock-rate': {}", err); gst::debug!(CAT, imp: jb, "Caps 'clock-rate': {}", err);
gst::FlowError::Error gst::FlowError::Error
})?; })?;
if clock_rate <= 0 { if clock_rate <= 0 {
gst::debug!(CAT, obj: element, "Caps 'clock-rate' <= 0"); gst::debug!(CAT, imp: jb, "Caps 'clock-rate' <= 0");
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
state.clock_rate = Some(clock_rate as u32); state.clock_rate = Some(clock_rate as u32);
@ -253,7 +249,7 @@ impl SinkHandler {
fn handle_big_gap_buffer( fn handle_big_gap_buffer(
&self, &self,
inner: &mut SinkHandlerInner, inner: &mut SinkHandlerInner,
element: &super::JitterBuffer, jb: &JitterBuffer,
buffer: gst::Buffer, buffer: gst::Buffer,
pt: u8, pt: u8,
) -> bool { ) -> bool {
@ -262,7 +258,7 @@ impl SinkHandler {
gst::debug!( gst::debug!(
CAT, CAT,
obj: element, imp: jb,
"Handling big gap, gap packets length: {}", "Handling big gap, gap packets length: {}",
gap_packets_length gap_packets_length
); );
@ -276,7 +272,7 @@ impl SinkHandler {
for gap_packet in inner.gap_packets.iter() { for gap_packet in inner.gap_packets.iter() {
gst::log!( gst::log!(
CAT, CAT,
obj: element, imp: jb,
"Looking at gap packet with seq {}", "Looking at gap packet with seq {}",
gap_packet.seq, gap_packet.seq,
); );
@ -296,7 +292,7 @@ impl SinkHandler {
} }
} }
gst::debug!(CAT, obj: element, "all consecutive: {}", all_consecutive); gst::debug!(CAT, imp: jb, "all consecutive: {}", all_consecutive);
if all_consecutive && gap_packets_length > 3 { if all_consecutive && gap_packets_length > 3 {
reset = true; reset = true;
@ -312,10 +308,9 @@ impl SinkHandler {
&self, &self,
inner: &mut SinkHandlerInner, inner: &mut SinkHandlerInner,
pad: &gst::Pad, pad: &gst::Pad,
element: &super::JitterBuffer, jb: &JitterBuffer,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let jb = element.imp();
let mut state = jb.state.lock().unwrap(); let mut state = jb.state.lock().unwrap();
let (max_misorder_time, max_dropout_time) = { let (max_misorder_time, max_dropout_time) = {
@ -339,13 +334,15 @@ impl SinkHandler {
gst::log!( gst::log!(
CAT, CAT,
obj: element, imp: jb,
"Storing buffer, seq: {}, rtptime: {}, pt: {}", "Storing buffer, seq: {}, rtptime: {}, pt: {}",
seq, seq,
rtptime, rtptime,
pt pt
); );
let element = jb.instance();
if dts.is_none() { if dts.is_none() {
dts = pts; dts = pts;
} else if pts.is_none() { } else if pts.is_none() {
@ -374,7 +371,7 @@ impl SinkHandler {
if let Some(caps) = pad.current_caps() { if let Some(caps) = pad.current_caps() {
/* Ignore errors at this point, as we want to emit request-pt-map */ /* Ignore errors at this point, as we want to emit request-pt-map */
let _ = self.parse_caps(inner, &mut state, element, &caps, pt); let _ = self.parse_caps(inner, &mut state, jb, &caps, pt);
} }
} }
@ -388,7 +385,7 @@ impl SinkHandler {
gst::FlowError::Error gst::FlowError::Error
})?; })?;
let mut state = jb.state.lock().unwrap(); let mut state = jb.state.lock().unwrap();
self.parse_caps(inner, &mut state, element, &caps, pt)?; self.parse_caps(inner, &mut state, jb, &caps, pt)?;
state state
} else { } else {
state state
@ -407,7 +404,7 @@ impl SinkHandler {
if pts.is_none() { if pts.is_none() {
gst::debug!( gst::debug!(
CAT, CAT,
obj: element, imp: jb,
"cannot calculate a valid pts for #{}, discard", "cannot calculate a valid pts for #{}, discard",
seq seq
); );
@ -420,7 +417,7 @@ impl SinkHandler {
self.calculate_packet_spacing(inner, &mut state, rtptime, pts); self.calculate_packet_spacing(inner, &mut state, rtptime, pts);
} else { } else {
if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) { if (gap != -1 && gap < -(max_misorder as i32)) || (gap >= max_dropout as i32) {
let reset = self.handle_big_gap_buffer(inner, element, buffer, pt); let reset = self.handle_big_gap_buffer(inner, jb, buffer, pt);
if reset { if reset {
// Handle reset in `enqueue_item` to avoid recursion // Handle reset in `enqueue_item` to avoid recursion
return Err(gst::FlowError::CustomError); return Err(gst::FlowError::CustomError);
@ -440,7 +437,7 @@ impl SinkHandler {
if gap <= 0 { if gap <= 0 {
state.stats.num_late += 1; state.stats.num_late += 1;
gst::debug!(CAT, obj: element, "Dropping late {}", seq); gst::debug!(CAT, imp: jb, "Dropping late {}", seq);
return Ok(gst::FlowSuccess::Ok); return Ok(gst::FlowSuccess::Ok);
} }
} }
@ -492,7 +489,7 @@ impl SinkHandler {
fn enqueue_item( fn enqueue_item(
&self, &self,
pad: &gst::Pad, pad: &gst::Pad,
element: &super::JitterBuffer, jb: &JitterBuffer,
buffer: Option<gst::Buffer>, buffer: Option<gst::Buffer>,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
@ -504,12 +501,10 @@ impl SinkHandler {
// This is to avoid recursion with `store`, `reset` and `enqueue_item` // This is to avoid recursion with `store`, `reset` and `enqueue_item`
while let Some(buf) = buffers.pop_front() { while let Some(buf) = buffers.pop_front() {
if let Err(err) = self.store(&mut inner, pad, element, buf) { if let Err(err) = self.store(&mut inner, pad, jb, buf) {
match err { match err {
gst::FlowError::CustomError => { gst::FlowError::CustomError => {
let jb = element.imp(); for gap_packet in self.reset(&mut inner, jb) {
let mut state = jb.state.lock().unwrap();
for gap_packet in self.reset(&mut inner, &mut state, element) {
buffers.push_back(gap_packet.buffer); buffers.push_back(gap_packet.buffer);
} }
} }
@ -518,7 +513,6 @@ impl SinkHandler {
} }
} }
let jb = element.imp();
let mut state = jb.state.lock().unwrap(); let mut state = jb.state.lock().unwrap();
let (latency, context_wait) = { let (latency, context_wait) = {
@ -529,7 +523,7 @@ impl SinkHandler {
// Reschedule if needed // Reschedule if needed
let (_, next_wakeup) = let (_, next_wakeup) =
jb.src_pad_handler jb.src_pad_handler
.next_wakeup(element, &state, latency, context_wait); .next_wakeup(&jb.instance(), &state, latency, context_wait);
if let Some((next_wakeup, _)) = next_wakeup { if let Some((next_wakeup, _)) = next_wakeup {
if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle { if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle {
if previous_next_wakeup.is_none() if previous_next_wakeup.is_none()
@ -555,32 +549,20 @@ impl PadSinkHandler for SinkHandler {
type ElementImpl = JitterBuffer; type ElementImpl = JitterBuffer;
fn sink_chain( fn sink_chain(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_jb: &JitterBuffer, elem: super::JitterBuffer,
element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
let element = element.clone().downcast::<super::JitterBuffer>().unwrap();
let this = self.clone();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); gst::debug!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
this.enqueue_item(pad.gst_pad(), &element, Some(buffer)) self.enqueue_item(pad.gst_pad(), elem.imp(), Some(buffer))
} }
.boxed() .boxed()
} }
fn sink_event( fn sink_event(&self, pad: &PadSinkRef, jb: &JitterBuffer, event: gst::Event) -> bool {
&self,
pad: &PadSinkRef,
jb: &JitterBuffer,
element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView; use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
@ -588,8 +570,8 @@ impl PadSinkHandler for SinkHandler {
if let EventView::FlushStart(..) = event.view() { if let EventView::FlushStart(..) = event.view() {
if let Err(err) = jb.task.flush_start().await_maybe_on_context() { if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::element_error!( gst::element_imp_error!(
element, jb,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStart failed {:?}", err] ["FlushStart failed {:?}", err]
@ -603,25 +585,20 @@ impl PadSinkHandler for SinkHandler {
} }
fn sink_event_serialized( fn sink_event_serialized(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_jb: &JitterBuffer, elem: super::JitterBuffer,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
use gst::EventView;
let pad_weak = pad.downgrade();
let element = element.clone().downcast::<super::JitterBuffer>().unwrap();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let jb = element.imp(); let jb = elem.imp();
let mut forward = true; let mut forward = true;
use gst::EventView;
match event.view() { match event.view() {
EventView::Segment(e) => { EventView::Segment(e) => {
let mut state = jb.state.lock().unwrap(); let mut state = jb.state.lock().unwrap();
@ -631,7 +608,7 @@ impl PadSinkHandler for SinkHandler {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() { if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::element_error!( gst::element_error!(
element, elem,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStop failed {:?}", err] ["FlushStop failed {:?}", err]
@ -893,13 +870,7 @@ impl SrcHandler {
impl PadSrcHandler for SrcHandler { impl PadSrcHandler for SrcHandler {
type ElementImpl = JitterBuffer; type ElementImpl = JitterBuffer;
fn src_event( fn src_event(&self, pad: &PadSrcRef, jb: &JitterBuffer, event: gst::Event) -> bool {
&self,
pad: &PadSrcRef,
jb: &JitterBuffer,
element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView; use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
@ -908,8 +879,8 @@ impl PadSrcHandler for SrcHandler {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
if let Err(err) = jb.task.flush_start().await_maybe_on_context() { if let Err(err) = jb.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::element_error!( gst::element_imp_error!(
element, jb,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStart failed {:?}", err] ["FlushStart failed {:?}", err]
@ -920,8 +891,8 @@ impl PadSrcHandler for SrcHandler {
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() { if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::element_error!( gst::element_imp_error!(
element, jb,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStop failed {:?}", err] ["FlushStop failed {:?}", err]
@ -936,13 +907,7 @@ impl PadSrcHandler for SrcHandler {
jb.sink_pad.gst_pad().push_event(event) jb.sink_pad.gst_pad().push_event(event)
} }
fn src_query( fn src_query(&self, pad: &PadSrcRef, jb: &JitterBuffer, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSrcRef,
jb: &JitterBuffer,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryViewMut; use gst::QueryViewMut;
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);

View file

@ -214,57 +214,40 @@ impl PadSinkHandler for ProxySinkPadHandler {
type ElementImpl = ProxySink; type ElementImpl = ProxySink;
fn sink_chain( fn sink_chain(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_proxysink: &ProxySink, elem: super::ProxySink,
element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
let element = element.clone().downcast::<super::ProxySink>().unwrap();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
let proxysink = element.imp(); let imp = elem.imp();
proxysink.enqueue_item(DataQueueItem::Buffer(buffer)).await imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
} }
.boxed() .boxed()
} }
fn sink_chain_list( fn sink_chain_list(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_proxysink: &ProxySink, elem: super::ProxySink,
element: &gst::Element,
list: gst::BufferList, list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
let element = element.clone().downcast::<super::ProxySink>().unwrap();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list); gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling {:?}", list);
let proxysink = element.imp(); let imp = elem.imp();
proxysink imp.enqueue_item(DataQueueItem::BufferList(list)).await
.enqueue_item(DataQueueItem::BufferList(list))
.await
} }
.boxed() .boxed()
} }
fn sink_event( fn sink_event(&self, pad: &PadSinkRef, imp: &ProxySink, event: gst::Event) -> bool {
&self,
pad: &PadSinkRef,
proxysink: &ProxySink,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
let src_pad = { let src_pad = {
let proxy_ctx = proxysink.proxy_ctx.lock().unwrap(); let proxy_ctx = imp.proxy_ctx.lock().unwrap();
PROXY_SRC_PADS PROXY_SRC_PADS
.lock() .lock()
@ -274,8 +257,8 @@ impl PadSinkHandler for ProxySinkPadHandler {
.map(|src_pad| src_pad.gst_pad().clone()) .map(|src_pad| src_pad.gst_pad().clone())
}; };
if let EventView::FlushStart(..) = event.view() { if let gst::EventView::FlushStart(..) = event.view() {
proxysink.stop(); imp.stop();
} }
if let Some(src_pad) = src_pad { if let Some(src_pad) = src_pad {
@ -288,36 +271,28 @@ impl PadSinkHandler for ProxySinkPadHandler {
} }
fn sink_event_serialized( fn sink_event_serialized(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_proxysink: &ProxySink, elem: super::ProxySink,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
use gst::EventView;
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
let pad_weak = pad.downgrade();
let element = element.clone().downcast::<super::ProxySink>().unwrap();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
let proxysink = element.imp(); gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
let imp = elem.imp();
use gst::EventView;
match event.view() { match event.view() {
EventView::Eos(..) => { EventView::Eos(..) => {
let _ = let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
element.post_message(gst::message::Eos::builder().src(&element).build());
} }
EventView::FlushStop(..) => proxysink.start(), EventView::FlushStop(..) => imp.start(),
_ => (), _ => (),
} }
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); gst::log!(SINK_CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
proxysink imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
.enqueue_item(DataQueueItem::Event(event))
.await
.is_ok()
} }
.boxed() .boxed()
} }
@ -691,19 +666,11 @@ struct ProxySrcPadHandler;
impl PadSrcHandler for ProxySrcPadHandler { impl PadSrcHandler for ProxySrcPadHandler {
type ElementImpl = ProxySrc; type ElementImpl = ProxySrc;
fn src_event( fn src_event(&self, pad: &PadSrcRef, imp: &ProxySrc, event: gst::Event) -> bool {
&self,
pad: &PadSrcRef,
proxysrc: &ProxySrc,
element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let sink_pad = { let sink_pad = {
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap(); let proxy_ctx = imp.proxy_ctx.lock().unwrap();
PROXY_SINK_PADS PROXY_SINK_PADS
.lock() .lock()
@ -713,12 +680,13 @@ impl PadSrcHandler for ProxySrcPadHandler {
.map(|sink_pad| sink_pad.gst_pad().clone()) .map(|sink_pad| sink_pad.gst_pad().clone())
}; };
use gst::EventView;
match event.view() { match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
if let Err(err) = proxysrc.task.flush_start().await_maybe_on_context() { if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::element_error!( gst::element_imp_error!(
element, imp,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStart failed {:?}", err] ["FlushStart failed {:?}", err]
@ -727,10 +695,10 @@ impl PadSrcHandler for ProxySrcPadHandler {
} }
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
if let Err(err) = proxysrc.task.flush_stop().await_maybe_on_context() { if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::element_error!( gst::element_imp_error!(
element, imp,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStop failed {:?}", err] ["FlushStop failed {:?}", err]
@ -750,16 +718,10 @@ impl PadSrcHandler for ProxySrcPadHandler {
} }
} }
fn src_query( fn src_query(&self, pad: &PadSrcRef, _proxysrc: &ProxySrc, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSrcRef,
_proxysrc: &ProxySrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryViewMut;
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() { let ret = match query.view_mut() {
QueryViewMut::Latency(q) => { QueryViewMut::Latency(q) => {
q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);

View file

@ -33,7 +33,7 @@ use std::time::Duration;
use std::{u32, u64}; use std::{u32, u64};
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task}; use crate::runtime::{Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task};
use crate::dataqueue::{DataQueue, DataQueueItem}; use crate::dataqueue::{DataQueue, DataQueueItem};
@ -84,57 +84,43 @@ impl PadSinkHandler for QueuePadSinkHandler {
type ElementImpl = Queue; type ElementImpl = Queue;
fn sink_chain( fn sink_chain(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_queue: &Queue, elem: super::Queue,
element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
let element = element.clone().downcast::<super::Queue>().unwrap();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer);
let queue = element.imp(); let imp = elem.imp();
queue.enqueue_item(DataQueueItem::Buffer(buffer)).await imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
} }
.boxed() .boxed()
} }
fn sink_chain_list( fn sink_chain_list(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_queue: &Queue, elem: super::Queue,
element: &gst::Element,
list: gst::BufferList, list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let pad_weak = pad.downgrade();
let element = element.clone().downcast::<super::Queue>().unwrap();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list);
let queue = element.imp(); let imp = elem.imp();
queue.enqueue_item(DataQueueItem::BufferList(list)).await imp.enqueue_item(DataQueueItem::BufferList(list)).await
} }
.boxed() .boxed()
} }
fn sink_event( fn sink_event(&self, pad: &PadSinkRef, imp: &Queue, event: gst::Event) -> bool {
&self,
pad: &PadSinkRef,
queue: &Queue,
element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); gst::debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
if let EventView::FlushStart(..) = event.view() { if let gst::EventView::FlushStart(..) = event.view() {
if let Err(err) = queue.task.flush_start().await_maybe_on_context() { if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
gst::element_error!( gst::element_imp_error!(
element, imp,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStart failed {:?}", err] ["FlushStart failed {:?}", err]
@ -144,31 +130,26 @@ impl PadSinkHandler for QueuePadSinkHandler {
} }
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
queue.src_pad.gst_pad().push_event(event) imp.src_pad.gst_pad().push_event(event)
} }
fn sink_event_serialized( fn sink_event_serialized(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_queue: &Queue, elem: super::Queue,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
let pad_weak = pad.downgrade();
let element = element.clone().downcast::<super::Queue>().unwrap();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
let queue = element.imp(); gst::log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
if let EventView::FlushStop(..) = event.view() { let imp = elem.imp();
if let Err(err) = queue.task.flush_stop().await_maybe_on_context() {
if let gst::EventView::FlushStop(..) = event.view() {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::element_error!( gst::element_imp_error!(
element, imp,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStop failed {:?}", err] ["FlushStop failed {:?}", err]
@ -178,21 +159,12 @@ impl PadSinkHandler for QueuePadSinkHandler {
} }
gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
queue imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
.enqueue_item(DataQueueItem::Event(event))
.await
.is_ok()
} }
.boxed() .boxed()
} }
fn sink_query( fn sink_query(&self, pad: &PadSinkRef, imp: &Queue, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSinkRef,
queue: &Queue,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
if query.is_serialized() { if query.is_serialized() {
@ -201,7 +173,7 @@ impl PadSinkHandler for QueuePadSinkHandler {
false false
} else { } else {
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
queue.src_pad.gst_pad().peer_query(query) imp.src_pad.gst_pad().peer_query(query)
} }
} }
} }
@ -212,28 +184,21 @@ struct QueuePadSrcHandler;
impl PadSrcHandler for QueuePadSrcHandler { impl PadSrcHandler for QueuePadSrcHandler {
type ElementImpl = Queue; type ElementImpl = Queue;
fn src_event( fn src_event(&self, pad: &PadSrcRef, imp: &Queue, event: gst::Event) -> bool {
&self,
pad: &PadSrcRef,
queue: &Queue,
element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
use gst::EventView;
match event.view() { match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
if let Err(err) = queue.task.flush_start().await_maybe_on_context() { if let Err(err) = imp.task.flush_start().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); gst::error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
} }
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
if let Err(err) = queue.task.flush_stop().await_maybe_on_context() { if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); gst::error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
gst::element_error!( gst::element_imp_error!(
element, imp,
gst::StreamError::Failed, gst::StreamError::Failed,
("Internal data stream error"), ("Internal data stream error"),
["FlushStop failed {:?}", err] ["FlushStop failed {:?}", err]
@ -245,23 +210,15 @@ impl PadSrcHandler for QueuePadSrcHandler {
} }
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
queue.sink_pad.gst_pad().push_event(event) imp.sink_pad.gst_pad().push_event(event)
} }
fn src_query( fn src_query(&self, pad: &PadSrcRef, imp: &Queue, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSrcRef,
queue: &Queue,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
if let QueryViewMut::Scheduling(q) = query.view_mut() { if let gst::QueryViewMut::Scheduling(q) = query.view_mut() {
let mut new_query = gst::query::Scheduling::new(); let mut new_query = gst::query::Scheduling::new();
let res = queue.sink_pad.gst_pad().peer_query(&mut new_query); let res = imp.sink_pad.gst_pad().peer_query(&mut new_query);
if !res { if !res {
return res; return res;
} }
@ -283,7 +240,7 @@ impl PadSrcHandler for QueuePadSrcHandler {
} }
gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query);
queue.sink_pad.gst_pad().peer_query(query) imp.sink_pad.gst_pad().peer_query(query)
} }
} }

View file

@ -118,13 +118,13 @@ fn event_to_event_full_serialized(
/// [`PadSrc`]: struct.PadSrc.html /// [`PadSrc`]: struct.PadSrc.html
/// [`pad` module]: index.html /// [`pad` module]: index.html
pub trait PadSrcHandler: Clone + Send + Sync + 'static { pub trait PadSrcHandler: Clone + Send + Sync + 'static {
// FIXME we should use a GAT here: ObjectSubclass<Type: IsA<gst::Element> + Send>
type ElementImpl: ElementImpl + ObjectSubclass; type ElementImpl: ElementImpl + ObjectSubclass;
fn src_activate( fn src_activate(
&self, &self,
pad: &PadSrcRef, pad: &PadSrcRef,
_imp: &Self::ElementImpl, _imp: &Self::ElementImpl,
_element: &gst::Element,
) -> Result<(), gst::LoggableError> { ) -> Result<(), gst::LoggableError> {
let gst_pad = pad.gst_pad(); let gst_pad = pad.gst_pad();
if gst_pad.is_active() { if gst_pad.is_active() {
@ -154,21 +154,22 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
&self, &self,
_pad: &PadSrcRef, _pad: &PadSrcRef,
_imp: &Self::ElementImpl, _imp: &Self::ElementImpl,
_element: &gst::Element,
_mode: gst::PadMode, _mode: gst::PadMode,
_active: bool, _active: bool,
) -> Result<(), gst::LoggableError> { ) -> Result<(), gst::LoggableError> {
Ok(()) Ok(())
} }
fn src_event( fn src_event(&self, pad: &PadSrcRef, imp: &Self::ElementImpl, event: gst::Event) -> bool {
&self,
pad: &PadSrcRef,
_imp: &Self::ElementImpl,
element: &gst::Element,
event: gst::Event,
) -> bool {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let elem = imp.instance();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
// use `.upcast::<gst::Element>()`
//
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
pad.gst_pad().event_default(Some(element), event) pad.gst_pad().event_default(Some(element), event)
} }
@ -176,20 +177,18 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
&self, &self,
pad: &PadSrcRef, pad: &PadSrcRef,
imp: &Self::ElementImpl, imp: &Self::ElementImpl,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> Result<FlowSuccess, FlowError> { ) -> Result<FlowSuccess, FlowError> {
// default is to dispatch to `src_event` // default is to dispatch to `src_event`
// (as implemented in `gst_pad_send_event_unchecked`) // (as implemented in `gst_pad_send_event_unchecked`)
let event_type = event.type_(); let event_type = event.type_();
event_to_event_full(self.src_event(pad, imp, element, event), event_type) event_to_event_full(self.src_event(pad, imp, event), event_type)
} }
fn src_query( fn src_query(
&self, &self,
pad: &PadSrcRef, pad: &PadSrcRef,
_imp: &Self::ElementImpl, imp: &Self::ElementImpl,
element: &gst::Element,
query: &mut gst::QueryRef, query: &mut gst::QueryRef,
) -> bool { ) -> bool {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
@ -198,6 +197,15 @@ pub trait PadSrcHandler: Clone + Send + Sync + 'static {
// but we can't return a `Future` because we couldn't honor QueryRef's lifetime // but we can't return a `Future` because we couldn't honor QueryRef's lifetime
false false
} else { } else {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let elem = imp.instance();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
// use `.upcast::<gst::Element>()`
//
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
pad.gst_pad().query_default(Some(element), query) pad.gst_pad().query_default(Some(element), query)
} }
} }
@ -398,15 +406,7 @@ impl PadSrc {
"Panic in PadSrc activate" "Panic in PadSrc activate"
)) ))
}, },
move |imp| { move |imp| handler.src_activate(&PadSrcRef::new(inner_arc), imp),
let this_ref = PadSrcRef::new(inner_arc);
let element = imp.instance();
handler.src_activate(
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
)
},
) )
}); });
@ -427,15 +427,8 @@ impl PadSrc {
}, },
move |imp| { move |imp| {
let this_ref = PadSrcRef::new(inner_arc); let this_ref = PadSrcRef::new(inner_arc);
let element = imp.instance();
this_ref.activate_mode_hook(mode, active)?; this_ref.activate_mode_hook(mode, active)?;
handler.src_activatemode( handler.src_activatemode(&this_ref, imp, mode, active)
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
mode,
active,
)
}, },
) )
}); });
@ -451,16 +444,7 @@ impl PadSrc {
H::ElementImpl::catch_panic_pad_function( H::ElementImpl::catch_panic_pad_function(
parent, parent,
|| Err(FlowError::Error), || Err(FlowError::Error),
move |imp| { move |imp| handler.src_event_full(&PadSrcRef::new(inner_arc), imp, event),
let this_ref = PadSrcRef::new(inner_arc);
let element = imp.instance();
handler.src_event_full(
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
event,
)
},
) )
}); });
@ -473,12 +457,10 @@ impl PadSrc {
parent, parent,
|| false, || false,
move |imp| { move |imp| {
let this_ref = PadSrcRef::new(inner_arc);
let element = imp.instance();
if !query.is_serialized() { if !query.is_serialized() {
handler.src_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query) handler.src_query(&PadSrcRef::new(inner_arc), imp, query)
} else { } else {
gst::fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported"); gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported");
false false
} }
}, },
@ -525,15 +507,13 @@ impl Deref for PadSrc {
/// [`PadSink`]: struct.PadSink.html /// [`PadSink`]: struct.PadSink.html
/// [`pad` module]: index.html /// [`pad` module]: index.html
pub trait PadSinkHandler: Clone + Send + Sync + 'static { pub trait PadSinkHandler: Clone + Send + Sync + 'static {
// FIXME we should use a GAT here: ObjectSubclass<Type: IsA<gst::Element> + Send>
type ElementImpl: ElementImpl + ObjectSubclass; type ElementImpl: ElementImpl + ObjectSubclass;
// FIXME: Once associated type bounds are stable we should use ObjectSubclass::Type below
// instead of &gst::Element
fn sink_activate( fn sink_activate(
&self, &self,
pad: &PadSinkRef, pad: &PadSinkRef,
_imp: &Self::ElementImpl, _imp: &Self::ElementImpl,
_element: &gst::Element,
) -> Result<(), gst::LoggableError> { ) -> Result<(), gst::LoggableError> {
let gst_pad = pad.gst_pad(); let gst_pad = pad.gst_pad();
if gst_pad.is_active() { if gst_pad.is_active() {
@ -563,7 +543,6 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
&self, &self,
_pad: &PadSinkRef, _pad: &PadSinkRef,
_imp: &Self::ElementImpl, _imp: &Self::ElementImpl,
_element: &gst::Element,
_mode: gst::PadMode, _mode: gst::PadMode,
_active: bool, _active: bool,
) -> Result<(), gst::LoggableError> { ) -> Result<(), gst::LoggableError> {
@ -571,50 +550,52 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
} }
fn sink_chain( fn sink_chain(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
_imp: &Self::ElementImpl, _elem: <Self::ElementImpl as ObjectSubclass>::Type,
_element: &gst::Element,
_buffer: gst::Buffer, _buffer: gst::Buffer,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> { ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
future::err(FlowError::NotSupported).boxed() future::err(FlowError::NotSupported).boxed()
} }
fn sink_chain_list( fn sink_chain_list(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
_imp: &Self::ElementImpl, _elem: <Self::ElementImpl as ObjectSubclass>::Type,
_element: &gst::Element,
_buffer_list: gst::BufferList, _buffer_list: gst::BufferList,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> { ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
future::err(FlowError::NotSupported).boxed() future::err(FlowError::NotSupported).boxed()
} }
fn sink_event( fn sink_event(&self, pad: &PadSinkRef, imp: &Self::ElementImpl, event: gst::Event) -> bool {
&self,
pad: &PadSinkRef,
_imp: &Self::ElementImpl,
element: &gst::Element,
event: gst::Event,
) -> bool {
assert!(!event.is_serialized()); assert!(!event.is_serialized());
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let elem = imp.instance();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
// use `.upcast::<gst::Element>()`
//
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
pad.gst_pad().event_default(Some(element), event) pad.gst_pad().event_default(Some(element), event)
} }
fn sink_event_serialized( fn sink_event_serialized(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_imp: &Self::ElementImpl, elem: <Self::ElementImpl as ObjectSubclass>::Type,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
assert!(event.is_serialized()); assert!(event.is_serialized());
let pad_weak = pad.downgrade(); // FIXME with GAT on `Self::ElementImpl`, we should be able to
let element = element.clone(); // use `.upcast::<gst::Element>()`
//
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast::<gst::Element>() };
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
pad.gst_pad().event_default(Some(&element), event) pad.gst_pad().event_default(Some(&element), event)
@ -626,21 +607,19 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
&self, &self,
pad: &PadSinkRef, pad: &PadSinkRef,
imp: &Self::ElementImpl, imp: &Self::ElementImpl,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> Result<FlowSuccess, FlowError> { ) -> Result<FlowSuccess, FlowError> {
assert!(!event.is_serialized()); assert!(!event.is_serialized());
// default is to dispatch to `sink_event` // default is to dispatch to `sink_event`
// (as implemented in `gst_pad_send_event_unchecked`) // (as implemented in `gst_pad_send_event_unchecked`)
let event_type = event.type_(); let event_type = event.type_();
event_to_event_full(self.sink_event(pad, imp, element, event), event_type) event_to_event_full(self.sink_event(pad, imp, event), event_type)
} }
fn sink_event_full_serialized( fn sink_event_full_serialized(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
imp: &Self::ElementImpl, elem: <Self::ElementImpl as ObjectSubclass>::Type,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> { ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
assert!(event.is_serialized()); assert!(event.is_serialized());
@ -648,7 +627,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
// (as implemented in `gst_pad_send_event_unchecked`) // (as implemented in `gst_pad_send_event_unchecked`)
let event_type = event.type_(); let event_type = event.type_();
event_to_event_full_serialized( event_to_event_full_serialized(
self.sink_event_serialized(pad, imp, element, event), Self::sink_event_serialized(self, pad, elem, event),
event_type, event_type,
) )
} }
@ -656,8 +635,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
fn sink_query( fn sink_query(
&self, &self,
pad: &PadSinkRef, pad: &PadSinkRef,
_imp: &Self::ElementImpl, imp: &Self::ElementImpl,
element: &gst::Element,
query: &mut gst::QueryRef, query: &mut gst::QueryRef,
) -> bool { ) -> bool {
if query.is_serialized() { if query.is_serialized() {
@ -667,6 +645,14 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
false false
} else { } else {
gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
let elem = imp.instance();
// FIXME with GAT on `Self::ElementImpl`, we should be able to
// use `.upcast::<gst::Element>()`
//
// Safety: `Self::ElementImpl` is bound to `gst::subclass::ElementImpl`.
let element = unsafe { elem.unsafe_cast_ref::<gst::Element>() };
pad.gst_pad().query_default(Some(element), query) pad.gst_pad().query_default(Some(element), query)
} }
} }
@ -778,13 +764,6 @@ impl<'a> Deref for PadSinkRef<'a> {
pub struct PadSink(Arc<PadSinkInner>); pub struct PadSink(Arc<PadSinkInner>);
impl PadSink { impl PadSink {
pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self {
let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
this.init_pad_functions(handler);
this
}
pub fn downgrade(&self) -> PadSinkWeak { pub fn downgrade(&self) -> PadSinkWeak {
PadSinkWeak(Arc::downgrade(&self.0)) PadSinkWeak(Arc::downgrade(&self.0))
} }
@ -792,9 +771,25 @@ impl PadSink {
pub fn as_ref(&self) -> PadSinkRef<'_> { pub fn as_ref(&self) -> PadSinkRef<'_> {
PadSinkRef::new(Arc::clone(&self.0)) PadSinkRef::new(Arc::clone(&self.0))
} }
}
fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) { impl PadSink {
// FIXME: Do this better pub fn new<H>(gst_pad: gst::Pad, handler: H) -> Self
where
H: PadSinkHandler,
<H::ElementImpl as ObjectSubclass>::Type: IsA<gst::Element> + Send,
{
let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
this.init_pad_functions(handler);
this
}
fn init_pad_functions<H>(&self, handler: H)
where
H: PadSinkHandler,
<H::ElementImpl as ObjectSubclass>::Type: IsA<gst::Element> + Send,
{
unsafe { unsafe {
let handler_clone = handler.clone(); let handler_clone = handler.clone();
let inner_arc = Arc::clone(&self.0); let inner_arc = Arc::clone(&self.0);
@ -802,6 +797,7 @@ impl PadSink {
.set_activate_function(move |gst_pad, parent| { .set_activate_function(move |gst_pad, parent| {
let handler = handler_clone.clone(); let handler = handler_clone.clone();
let inner_arc = inner_arc.clone(); let inner_arc = inner_arc.clone();
H::ElementImpl::catch_panic_pad_function( H::ElementImpl::catch_panic_pad_function(
parent, parent,
|| { || {
@ -811,15 +807,7 @@ impl PadSink {
"Panic in PadSink activate" "Panic in PadSink activate"
)) ))
}, },
move |imp| { move |imp| handler.sink_activate(&PadSinkRef::new(inner_arc), imp),
let this_ref = PadSinkRef::new(inner_arc);
let element = imp.instance();
handler.sink_activate(
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
)
},
) )
}); });
@ -840,16 +828,8 @@ impl PadSink {
}, },
move |imp| { move |imp| {
let this_ref = PadSinkRef::new(inner_arc); let this_ref = PadSinkRef::new(inner_arc);
let element = imp.instance();
this_ref.activate_mode_hook(mode, active)?; this_ref.activate_mode_hook(mode, active)?;
handler.sink_activatemode(&this_ref, imp, mode, active)
handler.sink_activatemode(
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
mode,
active,
)
}, },
) )
}); });
@ -864,32 +844,19 @@ impl PadSink {
parent, parent,
|| Err(FlowError::Error), || Err(FlowError::Error),
move |imp| { move |imp| {
let element = imp.instance(); let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let elem = imp.instance().clone();
if let Some((ctx, task_id)) = Context::current_task() { if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
let element =
element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move { let delayed_fut = async move {
let imp = <H::ElementImpl as ObjectSubclassExt>::from_instance( H::sink_chain(handler, this_weak, elem, buffer).await
element.unsafe_cast_ref(),
);
let this_ref =
this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
handler.sink_chain(&this_ref, imp, &element, buffer).await
}; };
let _ = let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} else { } else {
let this_ref = PadSinkRef::new(inner_arc); let chain_fut = H::sink_chain(handler, this_weak, elem, buffer);
let chain_fut = handler.sink_chain(
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
buffer,
);
executor::block_on(chain_fut) executor::block_on(chain_fut)
} }
}, },
@ -906,34 +873,20 @@ impl PadSink {
parent, parent,
|| Err(FlowError::Error), || Err(FlowError::Error),
move |imp| { move |imp| {
let element = imp.instance(); let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let elem = imp.instance().clone();
if let Some((ctx, task_id)) = Context::current_task() { if let Some((ctx, task_id)) = Context::current_task() {
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let handler = handler.clone();
let element =
element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move { let delayed_fut = async move {
let imp = <H::ElementImpl as ObjectSubclassExt>::from_instance( H::sink_chain_list(handler, this_weak, elem, list).await
element.unsafe_cast_ref(),
);
let this_ref =
this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
handler
.sink_chain_list(&this_ref, imp, &element, list)
.await
}; };
let _ = let _ =
ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop))); ctx.add_sub_task(task_id, delayed_fut.map(|res| res.map(drop)));
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} else { } else {
let this_ref = PadSinkRef::new(inner_arc); let chain_list_fut =
let chain_list_fut = handler.sink_chain_list( H::sink_chain_list(handler, this_weak, elem, list);
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
list,
);
executor::block_on(chain_list_fut) executor::block_on(chain_list_fut)
} }
}, },
@ -952,26 +905,16 @@ impl PadSink {
parent, parent,
|| Err(FlowError::Error), || Err(FlowError::Error),
move |imp| { move |imp| {
let element = imp.instance();
if event.is_serialized() { if event.is_serialized() {
if let Some((ctx, task_id)) = Context::current_task() { let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc)); let elem = imp.instance().clone();
let handler = handler.clone();
let element =
element.clone().dynamic_cast::<gst::Element>().unwrap();
let delayed_fut = async move {
let imp =
<H::ElementImpl as ObjectSubclassExt>::from_instance(
element.unsafe_cast_ref(),
);
let this_ref =
this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
handler if let Some((ctx, task_id)) = Context::current_task() {
.sink_event_full_serialized( let delayed_fut = async move {
&this_ref, imp, &element, event, H::sink_event_full_serialized(
) handler, this_weak, elem, event,
.await )
.await
}; };
let _ = ctx.add_sub_task( let _ = ctx.add_sub_task(
task_id, task_id,
@ -980,23 +923,13 @@ impl PadSink {
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} else { } else {
let this_ref = PadSinkRef::new(inner_arc); let event_fut = H::sink_event_full_serialized(
let event_fut = handler.sink_event_full_serialized( handler, this_weak, elem, event,
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
event,
); );
executor::block_on(event_fut) executor::block_on(event_fut)
} }
} else { } else {
let this_ref = PadSinkRef::new(inner_arc); handler.sink_event_full(&PadSinkRef::new(inner_arc), imp, event)
handler.sink_event_full(
&this_ref,
imp,
element.dynamic_cast_ref::<gst::Element>().unwrap(),
event,
)
} }
}, },
) )
@ -1011,12 +944,10 @@ impl PadSink {
parent, parent,
|| false, || false,
move |imp| { move |imp| {
let this_ref = PadSinkRef::new(inner_arc);
let element = imp.instance();
if !query.is_serialized() { if !query.is_serialized() {
handler.sink_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query) handler.sink_query(&PadSinkRef::new(inner_arc), imp, query)
} else { } else {
gst::fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported"); gst::fixme!(RUNTIME_CAT, obj: inner_arc.gst_pad(), "Serialized Query not supported");
false false
} }
}, },

View file

@ -96,28 +96,13 @@ struct TcpClientSrcPadHandler;
impl PadSrcHandler for TcpClientSrcPadHandler { impl PadSrcHandler for TcpClientSrcPadHandler {
type ElementImpl = TcpClientSrc; type ElementImpl = TcpClientSrc;
fn src_event( fn src_event(&self, pad: &PadSrcRef, imp: &TcpClientSrc, event: gst::Event) -> bool {
&self,
pad: &PadSrcRef,
tcpclientsrc: &TcpClientSrc,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => tcpclientsrc EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(),
.task EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(),
.flush_start()
.await_maybe_on_context()
.is_ok(),
EventView::FlushStop(..) => tcpclientsrc
.task
.flush_stop()
.await_maybe_on_context()
.is_ok(),
EventView::Reconfigure(..) => true, EventView::Reconfigure(..) => true,
EventView::Latency(..) => true, EventView::Latency(..) => true,
_ => false, _ => false,
@ -132,16 +117,10 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
ret ret
} }
fn src_query( fn src_query(&self, pad: &PadSrcRef, imp: &TcpClientSrc, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSrcRef,
tcpclientsrc: &TcpClientSrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() { let ret = match query.view_mut() {
QueryViewMut::Latency(q) => { QueryViewMut::Latency(q) => {
q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE); q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE);
@ -153,8 +132,7 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
true true
} }
QueryViewMut::Caps(q) => { QueryViewMut::Caps(q) => {
let caps = if let Some(caps) = tcpclientsrc.configured_caps.lock().unwrap().as_ref() let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() {
{
q.filter() q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone()) .unwrap_or_else(|| caps.clone())

View file

@ -30,7 +30,7 @@ use gst::{element_error, error_msg};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use crate::runtime::prelude::*; use crate::runtime::prelude::*;
use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, Task}; use crate::runtime::{self, Async, Context, PadSink, PadSinkRef, PadSinkWeak, Task};
use crate::socket::{wrap_socket, GioSocketWrapper}; use crate::socket::{wrap_socket, GioSocketWrapper};
use std::collections::BTreeSet; use std::collections::BTreeSet;
@ -133,18 +133,15 @@ impl PadSinkHandler for UdpSinkPadHandler {
type ElementImpl = UdpSink; type ElementImpl = UdpSink;
fn sink_chain( fn sink_chain(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
udpsink: &UdpSink, elem: super::UdpSink,
element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = udpsink.clone_item_sender(); let sender = elem.imp().clone_item_sender();
let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move { async move {
if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing"); gst::debug!(CAT, obj: &elem, "Flushing");
return Err(gst::FlowError::Flushing); return Err(gst::FlowError::Flushing);
} }
@ -154,19 +151,16 @@ impl PadSinkHandler for UdpSinkPadHandler {
} }
fn sink_chain_list( fn sink_chain_list(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
udpsink: &UdpSink, elem: super::UdpSink,
element: &gst::Element,
list: gst::BufferList, list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let sender = udpsink.clone_item_sender(); let sender = elem.imp().clone_item_sender();
let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move { async move {
for buffer in list.iter_owned() { for buffer in list.iter_owned() {
if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() { if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing"); gst::debug!(CAT, obj: &elem, "Flushing");
return Err(gst::FlowError::Flushing); return Err(gst::FlowError::Flushing);
} }
} }
@ -177,21 +171,18 @@ impl PadSinkHandler for UdpSinkPadHandler {
} }
fn sink_event_serialized( fn sink_event_serialized(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
udpsink: &UdpSink, elem: super::UdpSink,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
let sender = udpsink.clone_item_sender(); let sender = elem.imp().clone_item_sender();
let element = element.clone().downcast::<super::UdpSink>().unwrap();
async move { async move {
if let EventView::FlushStop(_) = event.view() { if let EventView::FlushStop(_) = event.view() {
let udpsink = element.imp(); let imp = elem.imp();
return udpsink.task.flush_stop().await_maybe_on_context().is_ok(); return imp.task.flush_stop().await_maybe_on_context().is_ok();
} else if sender.send_async(TaskItem::Event(event)).await.is_err() { } else if sender.send_async(TaskItem::Event(event)).await.is_err() {
gst::debug!(CAT, obj: &element, "Flushing"); gst::debug!(CAT, obj: &elem, "Flushing");
} }
true true
@ -199,15 +190,9 @@ impl PadSinkHandler for UdpSinkPadHandler {
.boxed() .boxed()
} }
fn sink_event( fn sink_event(&self, _pad: &PadSinkRef, imp: &UdpSink, event: gst::Event) -> bool {
&self,
_pad: &PadSinkRef,
udpsink: &UdpSink,
_element: &gst::Element,
event: gst::Event,
) -> bool {
if let EventView::FlushStart(..) = event.view() { if let EventView::FlushStart(..) = event.view() {
return udpsink.task.flush_start().await_maybe_on_context().is_ok(); return imp.task.flush_start().await_maybe_on_context().is_ok();
} }
true true

View file

@ -113,20 +113,13 @@ struct UdpSrcPadHandler;
impl PadSrcHandler for UdpSrcPadHandler { impl PadSrcHandler for UdpSrcPadHandler {
type ElementImpl = UdpSrc; type ElementImpl = UdpSrc;
fn src_event( fn src_event(&self, pad: &PadSrcRef, imp: &UdpSrc, event: gst::Event) -> bool {
&self,
pad: &PadSrcRef,
udpsrc: &UdpSrc,
_element: &gst::Element,
event: gst::Event,
) -> bool {
use gst::EventView;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
use gst::EventView;
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => udpsrc.task.flush_start().await_maybe_on_context().is_ok(), EventView::FlushStart(..) => imp.task.flush_start().await_maybe_on_context().is_ok(),
EventView::FlushStop(..) => udpsrc.task.flush_stop().await_maybe_on_context().is_ok(), EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(),
EventView::Reconfigure(..) => true, EventView::Reconfigure(..) => true,
EventView::Latency(..) => true, EventView::Latency(..) => true,
_ => false, _ => false,
@ -141,17 +134,10 @@ impl PadSrcHandler for UdpSrcPadHandler {
ret ret
} }
fn src_query( fn src_query(&self, pad: &PadSrcRef, imp: &UdpSrc, query: &mut gst::QueryRef) -> bool {
&self,
pad: &PadSrcRef,
udpsrc: &UdpSrc,
_element: &gst::Element,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryViewMut;
gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); gst::log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query);
use gst::QueryViewMut;
let ret = match query.view_mut() { let ret = match query.view_mut() {
QueryViewMut::Latency(q) => { QueryViewMut::Latency(q) => {
q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE); q.set(true, gst::ClockTime::ZERO, gst::ClockTime::NONE);
@ -163,7 +149,7 @@ impl PadSrcHandler for UdpSrcPadHandler {
true true
} }
QueryViewMut::Caps(q) => { QueryViewMut::Caps(q) => {
let caps = if let Some(caps) = udpsrc.configured_caps.lock().unwrap().as_ref() { let caps = if let Some(caps) = imp.configured_caps.lock().unwrap().as_ref() {
q.filter() q.filter()
.map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First)) .map(|f| f.intersect_with_mode(caps, gst::CapsIntersectMode::First))
.unwrap_or_else(|| caps.clone()) .unwrap_or_else(|| caps.clone())

View file

@ -36,7 +36,9 @@ use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use gstthreadshare::runtime::prelude::*; use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task, TaskState}; use gstthreadshare::runtime::{
Context, PadSink, PadSinkRef, PadSinkWeak, PadSrc, PadSrcRef, Task, TaskState,
};
const DEFAULT_CONTEXT: &str = ""; const DEFAULT_CONTEXT: &str = "";
const THROTTLING_DURATION: Duration = Duration::from_millis(2); const THROTTLING_DURATION: Duration = Duration::from_millis(2);
@ -87,27 +89,15 @@ mod imp_src {
impl PadSrcHandler for PadSrcTestHandler { impl PadSrcHandler for PadSrcTestHandler {
type ElementImpl = ElementSrcTest; type ElementImpl = ElementSrcTest;
fn src_event( fn src_event(&self, pad: &PadSrcRef, imp: &ElementSrcTest, event: gst::Event) -> bool {
&self,
pad: &PadSrcRef,
elem_src_test: &ElementSrcTest,
_element: &gst::Element,
event: gst::Event,
) -> bool {
gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event); gst::log!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() { let ret = match event.view() {
EventView::FlushStart(..) => elem_src_test EventView::FlushStart(..) => {
.task imp.task.flush_start().await_maybe_on_context().is_ok()
.flush_start() }
.await_maybe_on_context()
.is_ok(),
EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true, EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
EventView::FlushStop(..) => elem_src_test EventView::FlushStop(..) => imp.task.flush_stop().await_maybe_on_context().is_ok(),
.task
.flush_stop()
.await_maybe_on_context()
.is_ok(),
_ => false, _ => false,
}; };
@ -337,6 +327,7 @@ mod imp_src {
let obj = self.instance(); let obj = self.instance();
obj.add_pad(self.src_pad.gst_pad()).unwrap(); obj.add_pad(self.src_pad.gst_pad()).unwrap();
obj.set_element_flags(gst::ElementFlags::SOURCE);
} }
} }
@ -449,53 +440,37 @@ mod imp_sink {
type ElementImpl = ElementSinkTest; type ElementImpl = ElementSinkTest;
fn sink_chain( fn sink_chain(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
_elem_sink_test: &ElementSinkTest, elem: super::ElementSinkTest,
element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let element = element
.clone()
.downcast::<super::ElementSinkTest>()
.unwrap();
async move { async move {
let elem_sink_test = element.imp(); let imp = elem.imp();
elem_sink_test.forward_item(Item::Buffer(buffer)).await imp.forward_item(Item::Buffer(buffer)).await
} }
.boxed() .boxed()
} }
fn sink_chain_list( fn sink_chain_list(
&self, self,
_pad: &PadSinkRef, _pad: PadSinkWeak,
_elem_sink_test: &ElementSinkTest, elem: super::ElementSinkTest,
element: &gst::Element,
list: gst::BufferList, list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let element = element
.clone()
.downcast::<super::ElementSinkTest>()
.unwrap();
async move { async move {
let elem_sink_test = element.imp(); let imp = elem.imp();
elem_sink_test.forward_item(Item::BufferList(list)).await imp.forward_item(Item::BufferList(list)).await
} }
.boxed() .boxed()
} }
fn sink_event( fn sink_event(&self, pad: &PadSinkRef, imp: &ElementSinkTest, event: gst::Event) -> bool {
&self,
pad: &PadSinkRef,
elem_sink_test: &ElementSinkTest,
_element: &gst::Element,
event: gst::Event,
) -> bool {
gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); gst::debug!(SINK_CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
match event.view() { match event.view() {
EventView::FlushStart(..) => { EventView::FlushStart(..) => {
elem_sink_test.stop(); imp.stop();
true true
} }
_ => false, _ => false,
@ -503,29 +478,21 @@ mod imp_sink {
} }
fn sink_event_serialized( fn sink_event_serialized(
&self, self,
pad: &PadSinkRef, pad: PadSinkWeak,
_elem_sink_test: &ElementSinkTest, elem: super::ElementSinkTest,
element: &gst::Element,
event: gst::Event, event: gst::Event,
) -> BoxFuture<'static, bool> { ) -> BoxFuture<'static, bool> {
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
let element = element
.clone()
.downcast::<super::ElementSinkTest>()
.unwrap();
async move { async move {
let elem_sink_test = element.imp(); let pad = pad.upgrade().expect("PadSink no longer exists");
gst::log!(SINK_CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event);
let imp = elem.imp();
if let EventView::FlushStop(..) = event.view() { if let EventView::FlushStop(..) = event.view() {
elem_sink_test.start(); imp.start();
} }
elem_sink_test imp.forward_item(Item::Event(event)).await.is_ok()
.forward_item(Item::Event(event))
.await
.is_ok()
} }
.boxed() .boxed()
} }
@ -652,6 +619,7 @@ mod imp_sink {
let obj = self.instance(); let obj = self.instance();
obj.add_pad(self.sink_pad.gst_pad()).unwrap(); obj.add_pad(self.sink_pad.gst_pad()).unwrap();
obj.set_element_flags(gst::ElementFlags::SINK);
} }
} }