threadshare/inputselector: Implement a correct chain_list() function

Instead of directly forwarding the list, handle each buffer separately
for now. Previously we would directly forward the lists from any pad,
including inactive ones, downstream.
This commit is contained in:
Sebastian Dröge 2020-02-27 16:00:37 +02:00
parent 0b240b829e
commit 6c4108671f

View file

@ -148,7 +148,7 @@ impl InputSelectorPadSinkHandler {
async fn handle_item( async fn handle_item(
&self, &self,
pad: PadSinkRef<'_>, pad: &PadSinkRef<'_>,
element: &gst::Element, element: &gst::Element,
buffer: gst::Buffer, buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
@ -220,7 +220,7 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
let pad_weak = pad.downgrade(); let pad_weak = pad.downgrade();
async move { async move {
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad_weak.upgrade().expect("PadSink no longer exists");
this.handle_item(pad, &element, buffer).await this.handle_item(&pad, &element, buffer).await
} }
.boxed() .boxed()
} }
@ -232,13 +232,18 @@ impl PadSinkHandler for InputSelectorPadSinkHandler {
element: &gst::Element, element: &gst::Element,
list: gst::BufferList, list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
let this = self.clone();
let element = element.clone(); let element = element.clone();
let pad_weak = pad.downgrade(); let pad_weak = pad.downgrade();
async move { async move {
let inputselector = InputSelector::from_instance(&element);
let pad = pad_weak.upgrade().expect("PadSink no longer exists"); let pad = pad_weak.upgrade().expect("PadSink no longer exists");
gst_log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list); gst_log!(CAT, obj: pad.gst_pad(), "Handling buffer list {:?}", list);
inputselector.src_pad.push_list(list).await // TODO: Ideally we would keep the list intact and forward it in one go
for buffer in list.iter_owned() {
this.handle_item(&pad, &element, buffer).await?;
}
Ok(gst::FlowSuccess::Ok)
} }
.boxed() .boxed()
} }