ts: use async fn in traits when possible

This is stable since Rust 1.75.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2278>
This commit is contained in:
François Laignel 2025-06-05 17:00:30 +02:00 committed by GStreamer Marge Bot
parent 9b677234dd
commit d0ae6b87b4
17 changed files with 1938 additions and 2306 deletions

View file

@ -6,9 +6,6 @@
//
// SPDX-License-Identifier: MPL-2.0
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -96,61 +93,55 @@ struct AsyncPadSinkHandler(Arc<futures::lock::Mutex<PadSinkHandlerInner>>);
impl PadSinkHandler for AsyncPadSinkHandler {
type ElementImpl = AsyncMutexSink;
fn sink_chain(
async fn sink_chain(
self,
_pad: gst::Pad,
elem: super::AsyncMutexSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
if self.0.lock().await.handle_buffer(&elem, buffer).is_err() {
return Err(gst::FlowError::Flushing);
}
Ok(gst::FlowSuccess::Ok)
) -> Result<gst::FlowSuccess, gst::FlowError> {
if self.0.lock().await.handle_buffer(&elem, buffer).is_err() {
return Err(gst::FlowError::Flushing);
}
.boxed()
Ok(gst::FlowSuccess::Ok)
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
_pad: gst::Pad,
elem: super::AsyncMutexSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
match event.view() {
EventView::Eos(_) => {
{
let mut inner = self.0.lock().await;
debug_or_trace!(CAT, inner.is_main_elem, obj = elem, "EOS");
inner.is_flushing = true;
}
) -> bool {
match event.view() {
EventView::Eos(_) => {
{
let mut inner = self.0.lock().await;
debug_or_trace!(CAT, inner.is_main_elem, obj = elem, "EOS");
inner.is_flushing = true;
}
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ = elem
.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
EventView::FlushStop(_) => {
self.0.lock().await.is_flushing = false;
}
EventView::Segment(evt) => {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.0.lock().await.segment_start = time_seg.start();
}
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ =
elem.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
true
EventView::FlushStop(_) => {
self.0.lock().await.is_flushing = false;
}
EventView::Segment(evt) => {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.0.lock().await.segment_start = time_seg.start();
}
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
}
.boxed()
true
}
fn sink_event(self, _pad: &gst::Pad, _imp: &AsyncMutexSink, event: gst::Event) -> bool {

View file

@ -6,9 +6,6 @@
//
// SPDX-License-Identifier: MPL-2.0
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -95,61 +92,55 @@ struct SyncPadSinkHandler(Arc<Mutex<PadSinkHandlerInner>>);
impl PadSinkHandler for SyncPadSinkHandler {
type ElementImpl = DirectSink;
fn sink_chain(
async fn sink_chain(
self,
_pad: gst::Pad,
elem: super::DirectSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
if self.0.lock().unwrap().handle_buffer(&elem, buffer).is_err() {
return Err(gst::FlowError::Flushing);
}
Ok(gst::FlowSuccess::Ok)
) -> Result<gst::FlowSuccess, gst::FlowError> {
if self.0.lock().unwrap().handle_buffer(&elem, buffer).is_err() {
return Err(gst::FlowError::Flushing);
}
.boxed()
Ok(gst::FlowSuccess::Ok)
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
_pad: gst::Pad,
elem: super::DirectSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
match event.view() {
EventView::Eos(_) => {
{
let mut inner = self.0.lock().unwrap();
debug_or_trace!(CAT, inner.is_main_elem, obj = elem, "EOS");
inner.is_flushing = true;
}
) -> bool {
match event.view() {
EventView::Eos(_) => {
{
let mut inner = self.0.lock().unwrap();
debug_or_trace!(CAT, inner.is_main_elem, obj = elem, "EOS");
inner.is_flushing = true;
}
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ = elem
.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
EventView::FlushStop(_) => {
self.0.lock().unwrap().is_flushing = false;
}
EventView::Segment(evt) => {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.0.lock().unwrap().segment_start = time_seg.start();
}
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ =
elem.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
true
EventView::FlushStop(_) => {
self.0.lock().unwrap().is_flushing = false;
}
EventView::Segment(evt) => {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.0.lock().unwrap().segment_start = time_seg.start();
}
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
}
.boxed()
true
}
fn sink_event(self, _pad: &gst::Pad, _imp: &DirectSink, event: gst::Event) -> bool {

View file

@ -6,9 +6,6 @@
//
// SPDX-License-Identifier: MPL-2.0
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::error_msg;
use gst::glib;
use gst::prelude::*;
@ -37,59 +34,53 @@ struct TaskPadSinkHandler;
impl PadSinkHandler for TaskPadSinkHandler {
type ElementImpl = TaskSink;
fn sink_chain(
async fn sink_chain(
self,
_pad: gst::Pad,
elem: super::TaskSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
) -> Result<gst::FlowSuccess, gst::FlowError> {
let sender = elem.imp().clone_item_sender();
async move {
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
return Err(gst::FlowError::Flushing);
}
Ok(gst::FlowSuccess::Ok)
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
return Err(gst::FlowError::Flushing);
}
.boxed()
Ok(gst::FlowSuccess::Ok)
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
_pad: gst::Pad,
elem: super::TaskSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
) -> bool {
let sender = elem.imp().clone_item_sender();
async move {
match event.view() {
EventView::Segment(_) => {
let _ = sender.send_async(StreamItem::Event(event)).await;
}
EventView::Eos(_) => {
let is_main_elem = elem.imp().settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, obj = elem, "EOS");
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ = elem
.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
EventView::FlushStop(_) => {
let imp = elem.imp();
return imp.task.flush_stop().await_maybe_on_context().is_ok();
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
match event.view() {
EventView::Segment(_) => {
let _ = sender.send_async(StreamItem::Event(event)).await;
}
EventView::Eos(_) => {
let is_main_elem = elem.imp().settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, obj = elem, "EOS");
true
// When each element sends its own EOS message,
// it takes ages for the pipeline to process all of them.
// Let's just post an error message and let main shuts down
// after all streams have posted this message.
let _ =
elem.post_message(gst::message::Error::new(gst::LibraryError::Shutdown, "EOS"));
}
EventView::FlushStop(_) => {
let imp = elem.imp();
return imp.task.flush_stop().await_maybe_on_context().is_ok();
}
EventView::SinkMessage(evt) => {
let _ = elem.post_message(evt.message());
}
_ => (),
}
.boxed()
true
}
fn sink_event(self, _pad: &gst::Pad, imp: &TaskSink, event: gst::Event) -> bool {
@ -136,92 +127,80 @@ impl TaskSinkTask {
impl TaskImpl for TaskSinkTask {
type Item = StreamItem;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Preparing Task");
future::ok(()).boxed()
Ok(())
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Starting Task");
self.last_ts = None;
if let Some(stats) = self.stats.as_mut() {
stats.start();
}
Ok(())
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Starting Task");
self.last_ts = None;
if let Some(stats) = self.stats.as_mut() {
stats.start();
}
.boxed()
Ok(())
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Stopping Task");
self.flush();
Ok(())
}
.boxed()
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Stopping Task");
self.flush();
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<StreamItem, gst::FlowError>> {
self.item_receiver
.recv_async()
.map(|opt_item| Ok(opt_item.unwrap()))
.boxed()
async fn try_next(&mut self) -> Result<StreamItem, gst::FlowError> {
Ok(self.item_receiver.recv_async().await.unwrap())
}
fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Received {item:?}");
async fn handle_item(&mut self, item: StreamItem) -> Result<(), gst::FlowError> {
debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Received {item:?}");
match item {
StreamItem::Buffer(buffer) => {
let ts = buffer
.dts_or_pts()
.expect("Buffer without ts")
// FIXME do proper segment to running time
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
match item {
StreamItem::Buffer(buffer) => {
let ts = buffer
.dts_or_pts()
.expect("Buffer without ts")
// FIXME do proper segment to running time
.checked_sub(self.segment_start.expect("Buffer without Time Segment"))
.expect("dts before Segment start");
if let Some(last_ts) = self.last_ts {
let cur_ts = self.elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - ts).into();
let interval: Duration = (ts - last_ts).into();
if let Some(last_ts) = self.last_ts {
let cur_ts = self.elem.current_running_time().unwrap();
let latency: Duration = (cur_ts - ts).into();
let interval: Duration = (ts - last_ts).into();
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(latency, interval);
}
debug_or_trace!(
CAT,
self.is_main_elem,
obj = self.elem,
"o latency {latency:.2?}",
);
debug_or_trace!(
CAT,
self.is_main_elem,
obj = self.elem,
"o interval {interval:.2?}",
);
if let Some(stats) = self.stats.as_mut() {
stats.add_buffer(latency, interval);
}
self.last_ts = Some(ts);
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Buffer processed");
debug_or_trace!(
CAT,
self.is_main_elem,
obj = self.elem,
"o latency {latency:.2?}",
);
debug_or_trace!(
CAT,
self.is_main_elem,
obj = self.elem,
"o interval {interval:.2?}",
);
}
StreamItem::Event(evt) => {
if let EventView::Segment(evt) = evt.view() {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.segment_start = time_seg.start();
}
self.last_ts = Some(ts);
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Buffer processed");
}
StreamItem::Event(evt) => {
if let EventView::Segment(evt) = evt.view() {
if let Some(time_seg) = evt.segment().downcast_ref::<gst::ClockTime>() {
self.segment_start = time_seg.start();
}
}
}
Ok(())
}
.boxed()
Ok(())
}
}

View file

@ -6,7 +6,6 @@
//
// SPDX-License-Identifier: MPL-2.0
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
@ -98,7 +97,7 @@ impl SrcTask {
impl TaskImpl for SrcTask {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
let imp = self.elem.imp();
let settings = imp.settings.lock().unwrap();
self.is_main_elem = settings.is_main_elem;
@ -108,148 +107,135 @@ impl TaskImpl for SrcTask {
self.push_period = settings.push_period;
self.num_buffers = settings.num_buffers;
future::ok(()).boxed()
Ok(())
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Starting Task");
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Starting Task");
if self.need_initial_events {
let imp = self.elem.imp();
if self.need_initial_events {
let imp = self.elem.imp();
debug_or_trace!(
CAT,
self.is_main_elem,
obj = self.elem,
"Pushing initial events"
);
let stream_id =
format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
imp.src_pad.push_event(stream_start_evt).await;
imp.src_pad
.push_event(gst::event::Caps::new(
&gst::Caps::builder("foo/bar").build(),
))
.await;
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
imp.src_pad.push_event(segment_evt).await;
self.need_initial_events = false;
}
self.timer = Some(
timer::interval_delayed_by(
// Delay first buffer push so as to let others start.
Duration::from_secs(2),
self.push_period.into(),
)
.expect("push period must be greater than 0"),
debug_or_trace!(
CAT,
self.is_main_elem,
obj = self.elem,
"Pushing initial events"
);
self.buffer_count = 0;
self.buffer_pool.set_active(true).unwrap();
Ok(())
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
imp.src_pad.push_event(stream_start_evt).await;
imp.src_pad
.push_event(gst::event::Caps::new(
&gst::Caps::builder("foo/bar").build(),
))
.await;
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
imp.src_pad.push_event(segment_evt).await;
self.need_initial_events = false;
}
.boxed()
self.timer = Some(
timer::interval_delayed_by(
// Delay first buffer push so as to let others start.
Duration::from_secs(2),
self.push_period.into(),
)
.expect("push period must be greater than 0"),
);
self.buffer_count = 0;
self.buffer_pool.set_active(true).unwrap();
Ok(())
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Stopping Task");
self.buffer_pool.set_active(false).unwrap();
self.timer = None;
self.need_initial_events = true;
future::ok(()).boxed()
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Awaiting timer");
self.timer.as_mut().unwrap().next().await;
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Timer ticked");
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Awaiting timer");
self.timer.as_mut().unwrap().next().await;
log_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Timer ticked");
Ok(())
Ok(())
}
async fn handle_item(&mut self, _: ()) -> Result<(), gst::FlowError> {
let buffer = self
.buffer_pool
.acquire_buffer(None)
.map(|mut buffer| {
{
let buffer = buffer.get_mut().unwrap();
let rtime = self.elem.current_running_time().unwrap();
buffer.set_pts(rtime);
}
buffer
})
.inspect_err(|&err| {
gst::error!(CAT, obj = self.elem, "Failed to acquire buffer {err}");
})?;
debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Forwarding buffer");
self.elem.imp().src_pad.push(buffer).await?;
log_or_trace!(
CAT,
self.is_main_elem,
obj = self.elem,
"Successfully pushed buffer"
);
self.buffer_count += 1;
if self.num_buffers.opt_eq(self.buffer_count) == Some(true) {
return Err(gst::FlowError::Eos);
}
.boxed()
Ok(())
}
fn handle_item(&mut self, _: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let buffer = self
.buffer_pool
.acquire_buffer(None)
.map(|mut buffer| {
{
let buffer = buffer.get_mut().unwrap();
let rtime = self.elem.current_running_time().unwrap();
buffer.set_pts(rtime);
}
buffer
})
.inspect_err(|&err| {
gst::error!(CAT, obj = self.elem, "Failed to acquire buffer {err}");
})?;
async fn handle_loop_error(&mut self, err: gst::FlowError) -> task::Trigger {
match err {
gst::FlowError::Eos => {
debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Pushing EOS");
debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Forwarding buffer");
self.elem.imp().src_pad.push(buffer).await?;
log_or_trace!(
CAT,
self.is_main_elem,
obj = self.elem,
"Successfully pushed buffer"
);
let imp = self.elem.imp();
if !imp.src_pad.push_event(gst::event::Eos::new()).await {
gst::error!(CAT, imp = imp, "Error pushing EOS");
}
self.buffer_count += 1;
if self.num_buffers.opt_eq(self.buffer_count) == Some(true) {
return Err(gst::FlowError::Eos);
task::Trigger::Stop
}
gst::FlowError::Flushing => {
debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Flushing");
Ok(())
}
.boxed()
}
task::Trigger::FlushStart
}
err => {
gst::error!(CAT, obj = self.elem, "Got error {err}");
gst::element_error!(
&self.elem,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> {
async move {
match err {
gst::FlowError::Eos => {
debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Pushing EOS");
let imp = self.elem.imp();
if !imp.src_pad.push_event(gst::event::Eos::new()).await {
gst::error!(CAT, imp = imp, "Error pushing EOS");
}
task::Trigger::Stop
}
gst::FlowError::Flushing => {
debug_or_trace!(CAT, self.is_main_elem, obj = self.elem, "Flushing");
task::Trigger::FlushStart
}
err => {
gst::error!(CAT, obj = self.elem, "Got error {err}");
gst::element_error!(
&self.elem,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
task::Trigger::Error
}
task::Trigger::Error
}
}
.boxed()
}
}

View file

@ -19,7 +19,6 @@
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::channel::mpsc;
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
@ -226,72 +225,60 @@ impl AppSrcTask {
impl TaskImpl for AppSrcTask {
type Item = StreamItem;
fn try_next(&mut self) -> BoxFuture<'_, Result<StreamItem, gst::FlowError>> {
async move {
self.receiver
.next()
.await
.ok_or_else(|| panic!("Internal channel sender dropped while Task is Started"))
}
.boxed()
async fn try_next(&mut self) -> Result<StreamItem, gst::FlowError> {
self.receiver
.next()
.await
.ok_or_else(|| panic!("Internal channel sender dropped while Task is Started"))
}
fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let res = self.push_item(item).await;
match res {
Ok(_) => {
gst::log!(CAT, obj = self.element, "Successfully pushed item");
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj = self.element, "EOS");
let appsrc = self.element.imp();
appsrc.src_pad.push_event(gst::event::Eos::new()).await;
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj = self.element, "Flushing");
}
Err(err) => {
gst::error!(CAT, obj = self.element, "Got error {}", err);
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
async fn handle_item(&mut self, item: StreamItem) -> Result<(), gst::FlowError> {
let res = self.push_item(item).await;
match res {
Ok(_) => {
gst::log!(CAT, obj = self.element, "Successfully pushed item");
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj = self.element, "EOS");
let appsrc = self.element.imp();
appsrc.src_pad.push_event(gst::event::Eos::new()).await;
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj = self.element, "Flushing");
}
Err(err) => {
gst::error!(CAT, obj = self.element, "Got error {}", err);
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
}
res.map(drop)
}
.boxed()
res.map(drop)
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Stopping task");
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Stopping task");
self.flush();
self.need_initial_events = true;
self.need_segment = true;
self.flush();
self.need_initial_events = true;
self.need_segment = true;
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
}
.boxed()
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Starting task flush");
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Starting task flush");
self.flush();
self.need_segment = true;
self.flush();
self.need_segment = true;
gst::log!(CAT, obj = self.element, "Task flush started");
Ok(())
}
.boxed()
gst::log!(CAT, obj = self.element, "Task flush started");
Ok(())
}
}

View file

@ -6,9 +6,6 @@
//
// SPDX-License-Identifier: MPL-2.0
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -240,7 +237,7 @@ impl AudioTestSrcTask {
impl TaskImpl for AudioTestSrcTask {
type Item = gst::Buffer;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Preparing Task");
let imp = self.elem.imp();
@ -255,84 +252,80 @@ impl TaskImpl for AudioTestSrcTask {
self.is_main_elem = settings.is_main_elem;
}
future::ok(()).boxed()
Ok(())
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.elem, "Starting Task");
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Starting Task");
if self.need_initial_events {
gst::debug!(CAT, obj = self.elem, "Pushing initial events");
if self.need_initial_events {
gst::debug!(CAT, obj = self.elem, "Pushing initial events");
let stream_id =
format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
self.elem.imp().src_pad.push_event(stream_start_evt).await;
}
if self.negotiate().await?.has_changed() {
let bytes_per_buffer = (self.rate as u64)
* self.buffer_duration.mseconds()
* self.channels as u64
* size_of::<i16>() as u64
/ 1_000;
let mut pool_config = self.buffer_pool.config();
pool_config
.as_mut()
.set_params(Some(&self.caps), bytes_per_buffer as u32, 2, 6);
self.buffer_pool.set_config(pool_config).unwrap();
}
assert!(!self.caps.is_empty());
self.buffer_pool.set_active(true).unwrap();
if self.need_initial_events {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
self.elem.imp().src_pad.push_event(segment_evt).await;
self.need_initial_events = false;
}
self.buffer_count = 0;
#[cfg(feature = "tuning")]
if self.is_main_elem {
self.parked_duration_init = None;
}
Ok(())
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
.group_id(gst::GroupId::next())
.build();
self.elem.imp().src_pad.push_event(stream_start_evt).await;
}
.boxed()
if self.negotiate().await?.has_changed() {
let bytes_per_buffer = (self.rate as u64)
* self.buffer_duration.mseconds()
* self.channels as u64
* size_of::<i16>() as u64
/ 1_000;
let mut pool_config = self.buffer_pool.config();
pool_config
.as_mut()
.set_params(Some(&self.caps), bytes_per_buffer as u32, 2, 6);
self.buffer_pool.set_config(pool_config).unwrap();
}
assert!(!self.caps.is_empty());
self.buffer_pool.set_active(true).unwrap();
if self.need_initial_events {
let segment_evt =
gst::event::Segment::new(&gst::FormattedSegment::<gst::format::Time>::new());
self.elem.imp().src_pad.push_event(segment_evt).await;
self.need_initial_events = false;
}
self.buffer_count = 0;
#[cfg(feature = "tuning")]
if self.is_main_elem {
self.parked_duration_init = None;
}
Ok(())
}
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Pausing Task");
self.buffer_pool.set_active(false).unwrap();
future::ok(()).boxed()
Ok(())
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.elem, "Stopping Task");
self.need_initial_events = true;
self.accumulator = 0.0;
self.last_buffer_end = None;
future::ok(()).boxed()
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async fn try_next(&mut self) -> Result<gst::Buffer, gst::FlowError> {
let mut buffer = match self.buffer_pool.acquire_buffer(None) {
Ok(buffer) => buffer,
Err(err) => {
gst::error!(CAT, obj = self.elem, "Failed to acquire buffer {}", err);
return future::err(err).boxed();
return Err(err);
}
};
@ -375,101 +368,91 @@ impl TaskImpl for AudioTestSrcTask {
self.last_buffer_end = start.opt_add(self.buffer_duration);
async move {
if self.is_live {
if let Some(delay) = self
.last_buffer_end
.unwrap()
.checked_sub(self.elem.current_running_time().unwrap())
{
// Wait for all samples to fit in last time slice
timer::delay_for_at_least(delay.into()).await;
}
} else {
// Let the scheduler share time with other tasks
runtime::executor::yield_now().await;
if self.is_live {
if let Some(delay) = self
.last_buffer_end
.unwrap()
.checked_sub(self.elem.current_running_time().unwrap())
{
// Wait for all samples to fit in last time slice
timer::delay_for_at_least(delay.into()).await;
}
Ok(buffer)
} else {
// Let the scheduler share time with other tasks
runtime::executor::yield_now().await;
}
.boxed()
Ok(buffer)
}
fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let imp = self.elem.imp();
async fn handle_item(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
let imp = self.elem.imp();
gst::debug!(CAT, imp = imp, "Pushing {buffer:?}");
imp.src_pad.push(buffer).await?;
gst::log!(CAT, imp = imp, "Successfully pushed buffer");
gst::debug!(CAT, imp = imp, "Pushing {buffer:?}");
imp.src_pad.push(buffer).await?;
gst::log!(CAT, imp = imp, "Successfully pushed buffer");
self.buffer_count += 1;
self.buffer_count += 1;
#[cfg(feature = "tuning")]
if self.is_main_elem {
if let Some(parked_duration_init) = self.parked_duration_init {
if self.buffer_count % LOG_BUFFER_INTERVAL == 0 {
let parked_duration =
runtime::Context::current().unwrap().parked_duration()
- parked_duration_init;
#[cfg(feature = "tuning")]
if self.is_main_elem {
if let Some(parked_duration_init) = self.parked_duration_init {
if self.buffer_count % LOG_BUFFER_INTERVAL == 0 {
let parked_duration = runtime::Context::current().unwrap().parked_duration()
- parked_duration_init;
gst::info!(
CAT,
"Parked: {:5.2?}%",
parked_duration.as_nanos() as f32 * 100.0
/ self.log_start.elapsed().as_nanos() as f32,
);
}
} else if self.buffer_count == RAMPUP_BUFFER_COUNT {
self.parked_duration_init =
Some(runtime::Context::current().unwrap().parked_duration());
self.log_start = Instant::now();
gst::info!(CAT, "Ramp up complete");
}
}
if self.num_buffers.opt_eq(self.buffer_count) == Some(true) {
return Err(gst::FlowError::Eos);
}
Ok(())
}
.boxed()
}
fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> {
async move {
match err {
gst::FlowError::Flushing => {
gst::debug!(CAT, obj = self.elem, "Flushing");
task::Trigger::FlushStart
}
gst::FlowError::Eos => {
gst::debug!(CAT, obj = self.elem, "EOS");
self.elem
.imp()
.src_pad
.push_event(gst::event::Eos::new())
.await;
task::Trigger::Stop
}
err => {
gst::error!(CAT, obj = self.elem, "Got error {err}");
gst::element_error!(
&self.elem,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
gst::info!(
CAT,
"Parked: {:5.2?}%",
parked_duration.as_nanos() as f32 * 100.0
/ self.log_start.elapsed().as_nanos() as f32,
);
task::Trigger::Error
}
} else if self.buffer_count == RAMPUP_BUFFER_COUNT {
self.parked_duration_init =
Some(runtime::Context::current().unwrap().parked_duration());
self.log_start = Instant::now();
gst::info!(CAT, "Ramp up complete");
}
}
if self.num_buffers.opt_eq(self.buffer_count) == Some(true) {
return Err(gst::FlowError::Eos);
}
Ok(())
}
async fn handle_loop_error(&mut self, err: gst::FlowError) -> task::Trigger {
match err {
gst::FlowError::Flushing => {
gst::debug!(CAT, obj = self.elem, "Flushing");
task::Trigger::FlushStart
}
gst::FlowError::Eos => {
gst::debug!(CAT, obj = self.elem, "EOS");
self.elem
.imp()
.src_pad
.push_event(gst::event::Eos::new())
.await;
task::Trigger::Stop
}
err => {
gst::error!(CAT, obj = self.elem, "Got error {err}");
gst::element_error!(
&self.elem,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
task::Trigger::Error
}
}
.boxed()
}
}

View file

@ -17,7 +17,6 @@
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::future::BoxFuture;
use futures::future::{abortable, AbortHandle};
use futures::prelude::*;
@ -156,61 +155,55 @@ impl InputSelectorPadSinkHandler {
impl PadSinkHandler for InputSelectorPadSinkHandler {
type ElementImpl = InputSelector;
fn sink_chain(
async fn sink_chain(
self,
pad: gst::Pad,
elem: super::InputSelector,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move { self.handle_item(&pad, &elem, buffer).await }.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.handle_item(&pad, &elem, buffer).await
}
fn sink_chain_list(
async fn sink_chain_list(
self,
pad: gst::Pad,
elem: super::InputSelector,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
gst::log!(CAT, obj = pad, "Handling buffer list {:?}", list);
// TODO: Ideally we would keep the list intact and forward it in one go
for buffer in list.iter_owned() {
self.handle_item(&pad, &elem, buffer).await?;
}
Ok(gst::FlowSuccess::Ok)
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj = pad, "Handling buffer list {:?}", list);
// TODO: Ideally we would keep the list intact and forward it in one go
for buffer in list.iter_owned() {
self.handle_item(&pad, &elem, buffer).await?;
}
.boxed()
Ok(gst::FlowSuccess::Ok)
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
_pad: gst::Pad,
_elem: super::InputSelector,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
let mut inner = self.0.lock().unwrap();
) -> bool {
let mut inner = self.0.lock().unwrap();
// Remember the segment for later use
if let gst::EventView::Segment(e) = event.view() {
inner.segment = Some(e.segment().clone());
}
// We sent sticky events together with the next buffer once it becomes
// the active pad.
//
// TODO: Other serialized events for the active pad can also be forwarded
// here, and sticky events could be forwarded directly. Needs forwarding of
// all other sticky events first!
if event.is_sticky() {
inner.send_sticky = true;
true
} else {
true
}
// Remember the segment for later use
if let gst::EventView::Segment(e) = event.view() {
inner.segment = Some(e.segment().clone());
}
// We sent sticky events together with the next buffer once it becomes
// the active pad.
//
// TODO: Other serialized events for the active pad can also be forwarded
// here, and sticky events could be forwarded directly. Needs forwarding of
// all other sticky events first!
if event.is_sticky() {
inner.send_sticky = true;
true
} else {
true
}
.boxed()
}
fn sink_event(self, _pad: &gst::Pad, imp: &InputSelector, event: gst::Event) -> bool {

View file

@ -17,7 +17,6 @@
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::future::BoxFuture;
use futures::future::{abortable, AbortHandle, Aborted};
use futures::prelude::*;
@ -546,17 +545,14 @@ impl SinkHandler {
impl PadSinkHandler for SinkHandler {
type ElementImpl = JitterBuffer;
fn sink_chain(
async fn sink_chain(
self,
pad: gst::Pad,
elem: super::JitterBuffer,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
gst::debug!(CAT, obj = pad, "Handling {:?}", buffer);
self.enqueue_item(pad, elem.imp(), Some(buffer))
}
.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::debug!(CAT, obj = pad, "Handling {:?}", buffer);
self.enqueue_item(pad, elem.imp(), Some(buffer))
}
fn sink_event(self, pad: &gst::Pad, jb: &JitterBuffer, event: gst::Event) -> bool {
@ -581,56 +577,53 @@ impl PadSinkHandler for SinkHandler {
jb.src_pad.gst_pad().push_event(event)
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
pad: gst::Pad,
elem: super::JitterBuffer,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
gst::log!(CAT, obj = pad, "Handling {:?}", event);
) -> bool {
gst::log!(CAT, obj = pad, "Handling {:?}", event);
let jb = elem.imp();
let jb = elem.imp();
let mut forward = true;
use gst::EventView;
match event.view() {
EventView::Segment(e) => {
let mut state = jb.state.lock().unwrap();
state.segment = e.segment().clone().downcast::<gst::format::Time>().unwrap();
}
EventView::FlushStop(..) => {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj = pad, "FlushStop failed {:?}", err);
gst::element_error!(
elem,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
);
return false;
}
}
EventView::Eos(..) => {
let mut state = jb.state.lock().unwrap();
state.eos = true;
if let Some((_, abort_handle)) = state.wait_handle.take() {
abort_handle.abort();
}
forward = false;
}
_ => (),
};
if forward {
// FIXME: These events should really be queued up and stay in order
gst::log!(CAT, obj = pad, "Forwarding serialized {:?}", event);
jb.src_pad.push_event(event).await
} else {
true
let mut forward = true;
use gst::EventView;
match event.view() {
EventView::Segment(e) => {
let mut state = jb.state.lock().unwrap();
state.segment = e.segment().clone().downcast::<gst::format::Time>().unwrap();
}
EventView::FlushStop(..) => {
if let Err(err) = jb.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj = pad, "FlushStop failed {:?}", err);
gst::element_error!(
elem,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
);
return false;
}
}
EventView::Eos(..) => {
let mut state = jb.state.lock().unwrap();
state.eos = true;
if let Some((_, abort_handle)) = state.wait_handle.take() {
abort_handle.abort();
}
forward = false;
}
_ => (),
};
if forward {
// FIXME: These events should really be queued up and stay in order
gst::log!(CAT, obj = pad, "Forwarding serialized {:?}", event);
jb.src_pad.push_event(event).await
} else {
true
}
.boxed()
}
}
@ -1105,25 +1098,22 @@ impl JitterBufferTask {
impl TaskImpl for JitterBufferTask {
type Item = ();
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Starting task");
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Starting task");
self.src_pad_handler.clear();
self.sink_pad_handler.clear();
self.src_pad_handler.clear();
self.sink_pad_handler.clear();
let jb = self.element.imp();
let jb = self.element.imp();
let latency = jb.settings.lock().unwrap().latency;
let state = State::default();
let latency = jb.settings.lock().unwrap().latency;
let state = State::default();
state.jbuf.set_delay(latency);
*jb.state.lock().unwrap() = state;
state.jbuf.set_delay(latency);
*jb.state.lock().unwrap() = state;
gst::log!(CAT, obj = self.element, "Task started");
Ok(())
}
.boxed()
gst::log!(CAT, obj = self.element, "Task started");
Ok(())
}
// FIXME this function was migrated to the try_next / handle_item model
@ -1136,67 +1126,233 @@ impl TaskImpl for JitterBufferTask {
// If latency can change during processing, a command based mechanism
// could be implemented. See the command implementation for ts-udpsink as
// an example.
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let jb = self.element.imp();
let (latency, context_wait, do_lost, max_dropout_time) = {
let settings = jb.settings.lock().unwrap();
(
settings.latency,
settings.context_wait,
settings.do_lost,
gst::ClockTime::from_mseconds(settings.max_dropout_time as u64),
)
};
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
let jb = self.element.imp();
let (latency, context_wait, do_lost, max_dropout_time) = {
let settings = jb.settings.lock().unwrap();
(
settings.latency,
settings.context_wait,
settings.do_lost,
gst::ClockTime::from_mseconds(settings.max_dropout_time as u64),
)
};
loop {
let delay_fut = {
let mut state = jb.state.lock().unwrap();
let (_, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
loop {
let delay_fut = {
let mut state = jb.state.lock().unwrap();
let (_, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
let (delay_fut, abort_handle) = match next_wakeup {
Some((_, delay)) if delay.is_zero() => (None, None),
_ => {
let (delay_fut, abort_handle) = abortable(async move {
match next_wakeup {
Some((_, delay)) => {
runtime::timer::delay_for_at_least(delay).await;
}
None => {
future::pending::<()>().await;
}
};
});
let (delay_fut, abort_handle) = match next_wakeup {
Some((_, delay)) if delay.is_zero() => (None, None),
_ => {
let (delay_fut, abort_handle) = abortable(async move {
match next_wakeup {
Some((_, delay)) => {
runtime::timer::delay_for_at_least(delay).await;
}
None => {
future::pending::<()>().await;
}
};
});
let next_wakeup = next_wakeup.and_then(|w| w.0);
(Some(delay_fut), Some((next_wakeup, abort_handle)))
}
};
state.wait_handle = abort_handle;
delay_fut
let next_wakeup = next_wakeup.and_then(|w| w.0);
(Some(delay_fut), Some((next_wakeup, abort_handle)))
}
};
// Got aborted, reschedule if needed
if let Some(delay_fut) = delay_fut {
gst::debug!(CAT, obj = self.element, "Waiting");
if let Err(Aborted) = delay_fut.await {
gst::debug!(CAT, obj = self.element, "Waiting aborted");
state.wait_handle = abort_handle;
delay_fut
};
// Got aborted, reschedule if needed
if let Some(delay_fut) = delay_fut {
gst::debug!(CAT, obj = self.element, "Waiting");
if let Err(Aborted) = delay_fut.await {
gst::debug!(CAT, obj = self.element, "Waiting aborted");
return Ok(());
}
}
let (head_pts, head_seq, lost_events) = {
let mut state = jb.state.lock().unwrap();
//
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
gst::debug!(
CAT,
obj = self.element,
"Woke up at {}, earliest_pts {}",
now.display(),
state.earliest_pts.display()
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup.opt_gt(now).unwrap_or(false) {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
} else {
return Ok(());
}
let (head_pts, head_seq) = state.jbuf.peek();
let mut events = vec![];
// We may have woken up in order to push lost events on time
// (see next_packet_wakeup())
if do_lost && state.equidistant > 3 && !state.packet_spacing.is_zero() {
loop {
// Make sure we don't push longer than max_dropout_time
// consecutive PacketLost events
let dropout_time = state
.last_popped_pts
.opt_saturating_sub(state.last_popped_buffer_pts);
if dropout_time.opt_gt(max_dropout_time).unwrap_or(false) {
break;
}
if let Some((lost_seq, lost_pts)) =
state.last_popped_seqnum.and_then(|last| {
if let Some(last_popped_pts) = state.last_popped_pts {
let next = last.wrapping_add(1);
if (last_popped_pts + latency - context_wait / 2)
.opt_lt(now)
.unwrap_or(false)
{
if let Some(earliest) = state.earliest_seqnum {
if next != earliest {
Some((next, last_popped_pts + state.packet_spacing))
} else {
None
}
} else {
Some((next, last_popped_pts + state.packet_spacing))
}
} else {
None
}
} else {
None
}
})
{
if (lost_pts + latency).opt_lt(now).unwrap_or(false) {
/* We woke up to push the next lost event exactly on time, yet
* clearly we are now too late to do so. This may have happened
* because of a seqnum jump on the input side or some other
* condition, but in any case we want to let the regular
* generate_lost_events method take over, with its lost events
* aggregation logic.
*/
break;
}
if lost_pts.opt_gt(state.earliest_pts).unwrap_or(false) {
/* Don't let our logic carry us too far in the future */
break;
}
let s = gst::Structure::builder("GstRTPPacketLost")
.field("seqnum", lost_seq as u32)
.field("timestamp", lost_pts)
.field("duration", state.packet_spacing)
.field("retry", 0)
.build();
events.push(gst::event::CustomDownstream::new(s));
state.stats.num_lost += 1;
state.last_popped_pts = Some(lost_pts);
state.last_popped_seqnum = Some(lost_seq);
} else {
break;
}
}
}
let (head_pts, head_seq, lost_events) = {
let mut state = jb.state.lock().unwrap();
//
(head_pts, head_seq, events)
};
{
// Push any lost events we may have woken up to push on schedule
for event in lost_events {
gst::debug!(
CAT,
obj = jb.src_pad.gst_pad(),
"Pushing lost event {:?}",
event
);
let _ = jb.src_pad.push_event(event).await;
}
let state = jb.state.lock().unwrap();
//
// Now recheck earliest PTS as we have just retaken the lock and may
// have advanced last_popped_* fields
let (now, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
gst::debug!(
CAT,
obj = &self.element,
"Woke up at {}, earliest_pts {}",
now.display(),
state.earliest_pts.display()
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup.opt_gt(now).unwrap_or(false) {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
} else {
return Ok(());
}
}
let res = self.src_pad_handler.pop_and_push(&self.element).await;
{
let mut state = jb.state.lock().unwrap();
state.last_res = res;
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
if state.jbuf.num_packets() > 0 {
let (earliest_pts, earliest_seqnum) = state.jbuf.find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum;
} else {
state.earliest_pts = None;
state.earliest_seqnum = None;
}
}
if res.is_ok() {
// Return and reschedule if the next packet would be in the future
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
@ -1206,139 +1362,8 @@ impl TaskImpl for JitterBufferTask {
context_wait,
max_dropout_time,
);
gst::debug!(
CAT,
obj = self.element,
"Woke up at {}, earliest_pts {}",
now.display(),
state.earliest_pts.display()
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup.opt_gt(now).unwrap_or(false) {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
} else {
return Ok(());
}
let (head_pts, head_seq) = state.jbuf.peek();
let mut events = vec![];
// We may have woken up in order to push lost events on time
// (see next_packet_wakeup())
if do_lost && state.equidistant > 3 && !state.packet_spacing.is_zero() {
loop {
// Make sure we don't push longer than max_dropout_time
// consecutive PacketLost events
let dropout_time = state
.last_popped_pts
.opt_saturating_sub(state.last_popped_buffer_pts);
if dropout_time.opt_gt(max_dropout_time).unwrap_or(false) {
break;
}
if let Some((lost_seq, lost_pts)) =
state.last_popped_seqnum.and_then(|last| {
if let Some(last_popped_pts) = state.last_popped_pts {
let next = last.wrapping_add(1);
if (last_popped_pts + latency - context_wait / 2)
.opt_lt(now)
.unwrap_or(false)
{
if let Some(earliest) = state.earliest_seqnum {
if next != earliest {
Some((
next,
last_popped_pts + state.packet_spacing,
))
} else {
None
}
} else {
Some((next, last_popped_pts + state.packet_spacing))
}
} else {
None
}
} else {
None
}
})
{
if (lost_pts + latency).opt_lt(now).unwrap_or(false) {
/* We woke up to push the next lost event exactly on time, yet
* clearly we are now too late to do so. This may have happened
* because of a seqnum jump on the input side or some other
* condition, but in any case we want to let the regular
* generate_lost_events method take over, with its lost events
* aggregation logic.
*/
break;
}
if lost_pts.opt_gt(state.earliest_pts).unwrap_or(false) {
/* Don't let our logic carry us too far in the future */
break;
}
let s = gst::Structure::builder("GstRTPPacketLost")
.field("seqnum", lost_seq as u32)
.field("timestamp", lost_pts)
.field("duration", state.packet_spacing)
.field("retry", 0)
.build();
events.push(gst::event::CustomDownstream::new(s));
state.stats.num_lost += 1;
state.last_popped_pts = Some(lost_pts);
state.last_popped_seqnum = Some(lost_seq);
} else {
break;
}
}
}
(head_pts, head_seq, events)
};
{
// Push any lost events we may have woken up to push on schedule
for event in lost_events {
gst::debug!(
CAT,
obj = jb.src_pad.gst_pad(),
"Pushing lost event {:?}",
event
);
let _ = jb.src_pad.push_event(event).await;
}
let state = jb.state.lock().unwrap();
//
// Now recheck earliest PTS as we have just retaken the lock and may
// have advanced last_popped_* fields
let (now, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
gst::debug!(
CAT,
obj = &self.element,
"Woke up at {}, earliest_pts {}",
now.display(),
state.earliest_pts.display()
);
if let Some((next_wakeup, _)) = next_wakeup {
if next_wakeup.opt_gt(now).unwrap_or(false) {
if let Some((Some(next_wakeup), _)) = next_wakeup {
if now.is_some_and(|now| next_wakeup > now) {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
@ -1346,90 +1371,46 @@ impl TaskImpl for JitterBufferTask {
return Ok(());
}
}
let res = self.src_pad_handler.pop_and_push(&self.element).await;
{
let mut state = jb.state.lock().unwrap();
state.last_res = res;
if head_pts == state.earliest_pts && head_seq == state.earliest_seqnum {
if state.jbuf.num_packets() > 0 {
let (earliest_pts, earliest_seqnum) = state.jbuf.find_earliest();
state.earliest_pts = earliest_pts;
state.earliest_seqnum = earliest_seqnum;
} else {
state.earliest_pts = None;
state.earliest_seqnum = None;
}
}
if res.is_ok() {
// Return and reschedule if the next packet would be in the future
// Check earliest PTS as we have just taken the lock
let (now, next_wakeup) = self.src_pad_handler.next_wakeup(
&self.element,
&state,
do_lost,
latency,
context_wait,
max_dropout_time,
);
if let Some((Some(next_wakeup), _)) = next_wakeup {
if now.is_some_and(|now| next_wakeup > now) {
// Reschedule and wait a bit longer in the next iteration
return Ok(());
}
} else {
return Ok(());
}
}
}
if let Err(err) = res {
match err {
gst::FlowError::Eos => {
gst::debug!(CAT, obj = self.element, "Pushing EOS event");
let _ = jb.src_pad.push_event(gst::event::Eos::new()).await;
}
gst::FlowError::Flushing => {
gst::debug!(CAT, obj = self.element, "Flushing")
}
err => gst::error!(CAT, obj = self.element, "Error {}", err),
}
return Err(err);
}
}
}
.boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::ok(()).boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Stopping task");
let jb = self.element.imp();
let mut jb_state = jb.state.lock().unwrap();
if let Some((_, abort_handle)) = jb_state.wait_handle.take() {
abort_handle.abort();
}
self.src_pad_handler.clear();
self.sink_pad_handler.clear();
if let Err(err) = res {
match err {
gst::FlowError::Eos => {
gst::debug!(CAT, obj = self.element, "Pushing EOS event");
let _ = jb.src_pad.push_event(gst::event::Eos::new()).await;
}
gst::FlowError::Flushing => {
gst::debug!(CAT, obj = self.element, "Flushing")
}
err => gst::error!(CAT, obj = self.element, "Error {}", err),
}
*jb_state = State::default();
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
return Err(err);
}
}
.boxed()
}
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
Ok(())
}
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Stopping task");
let jb = self.element.imp();
let mut jb_state = jb.state.lock().unwrap();
if let Some((_, abort_handle)) = jb_state.wait_handle.take() {
abort_handle.abort();
}
self.src_pad_handler.clear();
self.sink_pad_handler.clear();
*jb_state = State::default();
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
}
}

View file

@ -18,8 +18,6 @@
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::channel::oneshot;
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
@ -210,32 +208,26 @@ struct ProxySinkPadHandler;
impl PadSinkHandler for ProxySinkPadHandler {
type ElementImpl = ProxySink;
fn sink_chain(
async fn sink_chain(
self,
pad: gst::Pad,
elem: super::ProxySink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
gst::log!(SINK_CAT, obj = pad, "Handling {:?}", buffer);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(SINK_CAT, obj = pad, "Handling {:?}", buffer);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
fn sink_chain_list(
async fn sink_chain_list(
self,
pad: gst::Pad,
elem: super::ProxySink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
gst::log!(SINK_CAT, obj = pad, "Handling {:?}", list);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(SINK_CAT, obj = pad, "Handling {:?}", list);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
fn sink_event(self, pad: &gst::Pad, imp: &ProxySink, event: gst::Event) -> bool {
@ -270,30 +262,27 @@ impl PadSinkHandler for ProxySinkPadHandler {
}
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
pad: gst::Pad,
elem: super::ProxySink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
gst::log!(SINK_CAT, obj = pad, "Handling serialized {:?}", event);
) -> bool {
gst::log!(SINK_CAT, obj = pad, "Handling serialized {:?}", event);
let imp = elem.imp();
let imp = elem.imp();
use gst::EventView;
match event.view() {
EventView::Eos(..) => {
let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
}
EventView::FlushStop(..) => imp.start(),
_ => (),
use gst::EventView;
match event.view() {
EventView::Eos(..) => {
let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
}
gst::log!(SINK_CAT, obj = pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
EventView::FlushStop(..) => imp.start(),
_ => (),
}
.boxed()
gst::log!(SINK_CAT, obj = pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
}
@ -802,119 +791,104 @@ impl ProxySrcTask {
impl TaskImpl for ProxySrcTask {
type Item = DataQueueItem;
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(SRC_CAT, obj = self.element, "Starting task");
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Starting task");
let proxysrc = self.element.imp();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
let proxysrc = self.element.imp();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
if let Some(pending_queue) = shared_ctx.pending_queue.as_mut() {
pending_queue.notify_more_queue_space();
}
self.dataqueue.start();
gst::log!(SRC_CAT, obj = self.element, "Task started");
Ok(())
}
async fn try_next(&mut self) -> Result<DataQueueItem, gst::FlowError> {
self.dataqueue
.next()
.await
.ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
}
async fn handle_item(&mut self, item: DataQueueItem) -> Result<(), gst::FlowError> {
let res = self.push_item(item).await;
let proxysrc = self.element.imp();
match res {
Ok(()) => {
gst::log!(SRC_CAT, obj = self.element, "Successfully pushed item");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
}
self.dataqueue.start();
gst::log!(SRC_CAT, obj = self.element, "Task started");
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<DataQueueItem, gst::FlowError>> {
async move {
self.dataqueue
.next()
.await
.ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
}
.boxed()
}
fn handle_item(&mut self, item: DataQueueItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let res = self.push_item(item).await;
let proxysrc = self.element.imp();
match res {
Ok(()) => {
gst::log!(SRC_CAT, obj = self.element, "Successfully pushed item");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
}
Err(gst::FlowError::Flushing) => {
gst::debug!(SRC_CAT, obj = self.element, "Flushing");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
}
Err(gst::FlowError::Eos) => {
gst::debug!(SRC_CAT, obj = self.element, "EOS");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Eos);
}
Err(err) => {
gst::error!(SRC_CAT, obj = self.element, "Got error {}", err);
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(err);
}
Err(gst::FlowError::Flushing) => {
gst::debug!(SRC_CAT, obj = self.element, "Flushing");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
}
res
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(SRC_CAT, obj = self.element, "Stopping task");
let proxysrc = self.element.imp();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
self.dataqueue.clear();
self.dataqueue.stop();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
if let Some(mut pending_queue) = shared_ctx.pending_queue.take() {
pending_queue.notify_more_queue_space();
Err(gst::FlowError::Eos) => {
gst::debug!(SRC_CAT, obj = self.element, "EOS");
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(gst::FlowError::Eos);
}
Err(err) => {
gst::error!(SRC_CAT, obj = self.element, "Got error {}", err);
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
shared_ctx.last_res = Err(err);
}
gst::log!(SRC_CAT, obj = self.element, "Task stopped");
Ok(())
}
.boxed()
res
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(SRC_CAT, obj = self.element, "Starting task flush");
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Stopping task");
let proxysrc = self.element.imp();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
let proxysrc = self.element.imp();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
self.dataqueue.clear();
self.dataqueue.clear();
self.dataqueue.stop();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
shared_ctx.last_res = Err(gst::FlowError::Flushing);
gst::log!(SRC_CAT, obj = self.element, "Task flush started");
Ok(())
if let Some(mut pending_queue) = shared_ctx.pending_queue.take() {
pending_queue.notify_more_queue_space();
}
.boxed()
gst::log!(SRC_CAT, obj = self.element, "Task stopped");
Ok(())
}
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Starting task flush");
let proxysrc = self.element.imp();
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
self.dataqueue.clear();
shared_ctx.last_res = Err(gst::FlowError::Flushing);
gst::log!(SRC_CAT, obj = self.element, "Task flush started");
Ok(())
}
}

View file

@ -18,8 +18,6 @@
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::channel::oneshot;
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
@ -82,32 +80,26 @@ struct QueuePadSinkHandler;
impl PadSinkHandler for QueuePadSinkHandler {
type ElementImpl = Queue;
fn sink_chain(
async fn sink_chain(
self,
pad: gst::Pad,
elem: super::Queue,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
gst::log!(CAT, obj = pad, "Handling {:?}", buffer);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj = pad, "Handling {:?}", buffer);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::Buffer(buffer)).await
}
fn sink_chain_list(
async fn sink_chain_list(
self,
pad: gst::Pad,
elem: super::Queue,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
gst::log!(CAT, obj = pad, "Handling {:?}", list);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::log!(CAT, obj = pad, "Handling {:?}", list);
let imp = elem.imp();
imp.enqueue_item(DataQueueItem::BufferList(list)).await
}
fn sink_event(self, pad: &gst::Pad, imp: &Queue, event: gst::Event) -> bool {
@ -130,34 +122,31 @@ impl PadSinkHandler for QueuePadSinkHandler {
imp.src_pad.gst_pad().push_event(event)
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
pad: gst::Pad,
elem: super::Queue,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
gst::log!(CAT, obj = pad, "Handling serialized {:?}", event);
) -> bool {
gst::log!(CAT, obj = pad, "Handling serialized {:?}", event);
let imp = elem.imp();
let imp = elem.imp();
if let gst::EventView::FlushStop(..) = event.view() {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj = pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
);
return false;
}
if let gst::EventView::FlushStop(..) = event.view() {
if let Err(err) = imp.task.flush_stop().await_maybe_on_context() {
gst::error!(CAT, obj = pad, "FlushStop failed {:?}", err);
gst::element_imp_error!(
imp,
gst::StreamError::Failed,
("Internal data stream error"),
["FlushStop failed {:?}", err]
);
return false;
}
gst::log!(CAT, obj = pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
.boxed()
gst::log!(CAT, obj = pad, "Queuing serialized {:?}", event);
imp.enqueue_item(DataQueueItem::Event(event)).await.is_ok()
}
fn sink_query(self, pad: &gst::Pad, imp: &Queue, query: &mut gst::QueryRef) -> bool {
@ -276,109 +265,94 @@ impl QueueTask {
impl TaskImpl for QueueTask {
type Item = DataQueueItem;
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Starting task");
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Starting task");
let queue = self.element.imp();
let mut last_res = queue.last_res.lock().unwrap();
let queue = self.element.imp();
let mut last_res = queue.last_res.lock().unwrap();
self.dataqueue.start();
self.dataqueue.start();
*last_res = Ok(gst::FlowSuccess::Ok);
*last_res = Ok(gst::FlowSuccess::Ok);
gst::log!(CAT, obj = self.element, "Task started");
Ok(())
}
.boxed()
gst::log!(CAT, obj = self.element, "Task started");
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<DataQueueItem, gst::FlowError>> {
async move {
self.dataqueue
.next()
.await
.ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
}
.boxed()
async fn try_next(&mut self) -> Result<DataQueueItem, gst::FlowError> {
self.dataqueue
.next()
.await
.ok_or_else(|| panic!("DataQueue stopped while Task is Started"))
}
fn handle_item(&mut self, item: DataQueueItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async {
let res = self.push_item(item).await;
let queue = self.element.imp();
match res {
Ok(()) => {
gst::log!(CAT, obj = self.element, "Successfully pushed item");
*queue.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj = self.element, "Flushing");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Flushing);
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj = self.element, "EOS");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos);
queue.src_pad.push_event(gst::event::Eos::new()).await;
}
Err(err) => {
gst::error!(CAT, obj = self.element, "Got error {}", err);
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
*queue.last_res.lock().unwrap() = Err(err);
}
async fn handle_item(&mut self, item: DataQueueItem) -> Result<(), gst::FlowError> {
let res = self.push_item(item).await;
let queue = self.element.imp();
match res {
Ok(()) => {
gst::log!(CAT, obj = self.element, "Successfully pushed item");
*queue.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
}
Err(gst::FlowError::Flushing) => {
gst::debug!(CAT, obj = self.element, "Flushing");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Flushing);
}
Err(gst::FlowError::Eos) => {
gst::debug!(CAT, obj = self.element, "EOS");
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos);
queue.src_pad.push_event(gst::event::Eos::new()).await;
}
Err(err) => {
gst::error!(CAT, obj = self.element, "Got error {}", err);
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
*queue.last_res.lock().unwrap() = Err(err);
}
res
}
.boxed()
res
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Stopping task");
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Stopping task");
let queue = self.element.imp();
let mut last_res = queue.last_res.lock().unwrap();
let queue = self.element.imp();
let mut last_res = queue.last_res.lock().unwrap();
self.dataqueue.stop();
self.dataqueue.clear();
self.dataqueue.stop();
self.dataqueue.clear();
if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
*last_res = Err(gst::FlowError::Flushing);
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
.boxed()
*last_res = Err(gst::FlowError::Flushing);
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Starting task flush");
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Starting task flush");
let queue = self.element.imp();
let mut last_res = queue.last_res.lock().unwrap();
let queue = self.element.imp();
let mut last_res = queue.last_res.lock().unwrap();
self.dataqueue.clear();
self.dataqueue.clear();
if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
*last_res = Err(gst::FlowError::Flushing);
gst::log!(CAT, obj = self.element, "Task flush started");
Ok(())
if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() {
pending_queue.notify_more_queue_space();
}
.boxed()
*last_res = Err(gst::FlowError::Flushing);
gst::log!(CAT, obj = self.element, "Task flush started");
Ok(())
}
}

View file

@ -69,7 +69,6 @@
//! [`Context`]: ../executor/struct.Context.html
use futures::future;
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::prelude::*;
@ -104,11 +103,10 @@ fn event_to_event_full(ret: bool, event_type: gst::EventType) -> Result<FlowSucc
#[inline]
fn event_to_event_full_serialized(
ret: BoxFuture<'static, bool>,
ret: impl Future<Output = bool> + Send,
event_type: gst::EventType,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
) -> impl Future<Output = Result<FlowSuccess, FlowError>> + Send {
ret.map(move |ret| event_ret_to_event_full_res(ret, event_type))
.boxed()
}
/// A trait to define `handler`s for [`PadSrc`] callbacks.
@ -538,8 +536,8 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
_pad: gst::Pad,
_elem: <Self::ElementImpl as ObjectSubclass>::Type,
_buffer: gst::Buffer,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
future::err(FlowError::NotSupported).boxed()
) -> impl Future<Output = Result<FlowSuccess, FlowError>> + Send {
future::err(FlowError::NotSupported)
}
fn sink_chain_list(
@ -547,8 +545,8 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
_pad: gst::Pad,
_elem: <Self::ElementImpl as ObjectSubclass>::Type,
_buffer_list: gst::BufferList,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
future::err(FlowError::NotSupported).boxed()
) -> impl Future<Output = Result<FlowSuccess, FlowError>> + Send {
future::err(FlowError::NotSupported)
}
fn sink_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool {
@ -562,14 +560,13 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
pad: gst::Pad,
elem: <Self::ElementImpl as ObjectSubclass>::Type,
event: gst::Event,
) -> BoxFuture<'static, bool> {
) -> impl Future<Output = bool> + Send {
assert!(event.is_serialized());
async move {
gst::log!(RUNTIME_CAT, obj = pad, "Handling {:?}", event);
gst::Pad::event_default(&pad, Some(&elem), event)
}
.boxed()
}
fn sink_event_full(
@ -590,7 +587,7 @@ pub trait PadSinkHandler: Clone + Send + Sync + 'static {
pad: gst::Pad,
elem: <Self::ElementImpl as ObjectSubclass>::Type,
event: gst::Event,
) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
) -> impl Future<Output = Result<FlowSuccess, FlowError>> + Send {
assert!(event.is_serialized());
// default is to dispatch to `sink_event`
// (as implemented in `gst_pad_send_event_unchecked`)

View file

@ -22,12 +22,11 @@
use futures::channel::mpsc as async_mpsc;
use futures::channel::oneshot;
use futures::future::{self, BoxFuture};
use futures::prelude::*;
use std::fmt;
use std::ops::Deref;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::Poll;
@ -319,16 +318,16 @@ impl fmt::Debug for TransitionStatus {
pub trait TaskImpl: Send + 'static {
type Item: Send + 'static;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
future::ok(())
}
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
future::ready(()).boxed()
fn unprepare(&mut self) -> impl Future<Output = ()> + Send {
future::ready(())
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
fn start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
future::ok(())
}
/// Tries to retrieve the next item to process.
@ -343,7 +342,7 @@ pub trait TaskImpl: Send + 'static {
/// with said `Item`.
///
/// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
fn try_next(&mut self) -> BoxFuture<'_, Result<Self::Item, gst::FlowError>>;
fn try_next(&mut self) -> impl Future<Output = Result<Self::Item, gst::FlowError>> + Send;
/// Does whatever needs to be done with the `item`.
///
@ -355,22 +354,25 @@ pub trait TaskImpl: Send + 'static {
/// to completion even if a state transition is requested.
///
/// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
fn handle_item(&mut self, _item: Self::Item) -> BoxFuture<'_, Result<(), gst::FlowError>>;
fn handle_item(
&mut self,
_item: Self::Item,
) -> impl Future<Output = Result<(), gst::FlowError>> + Send;
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
fn pause(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
future::ok(())
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
fn flush_start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
future::ready(Ok(()))
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
fn flush_stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
future::ready(Ok(()))
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
fn stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
future::ready(Ok(()))
}
/// Handles an error occurring during the execution of the `Task` loop.
@ -387,7 +389,7 @@ pub trait TaskImpl: Send + 'static {
/// - `FlowError::Flushing` -> `Trigger::FlushStart`.
/// - `FlowError::Eos` -> `Trigger::Stop`.
/// - Other `FlowError` -> `Trigger::Error`.
fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Trigger> {
fn handle_loop_error(&mut self, err: gst::FlowError) -> impl Future<Output = Trigger> + Send {
async move {
match err {
gst::FlowError::Flushing => {
@ -407,7 +409,6 @@ pub trait TaskImpl: Send + 'static {
}
}
}
.boxed()
}
/// Handles an error occurring during the execution of a transition action.
@ -425,7 +426,7 @@ pub trait TaskImpl: Send + 'static {
trigger: Trigger,
state: TaskState,
err: gst::ErrorMessage,
) -> BoxFuture<'_, Trigger> {
) -> impl Future<Output = Trigger> + Send {
async move {
gst::error!(
RUNTIME_CAT,
@ -437,7 +438,6 @@ pub trait TaskImpl: Send + 'static {
Trigger::Error
}
.boxed()
}
}
@ -663,11 +663,7 @@ impl Task {
inner.state = TaskState::Preparing;
gst::log!(RUNTIME_CAT, "Spawning task state machine");
inner.state_machine_handle = Some(StateMachine::spawn(
self.0.clone(),
Box::new(task_impl),
context,
));
inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context));
let ack_rx = match inner.trigger(Trigger::Prepare) {
Ok(ack_rx) => ack_rx,
@ -809,8 +805,8 @@ impl Task {
}
}
struct StateMachine<Item: Send + 'static> {
task_impl: Box<dyn TaskImpl<Item = Item>>,
struct StateMachine<Task: TaskImpl> {
task_impl: Task,
triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
pending_triggering_evt: Option<TriggeringEvent>,
}
@ -845,12 +841,10 @@ macro_rules! exec_action {
}};
}
impl<Item: Send + 'static> StateMachine<Item> {
// Use dynamic dispatch for TaskImpl as it reduces memory usage compared to monomorphization
// without inducing any significant performance penalties.
impl<Task: TaskImpl> StateMachine<Task> {
fn spawn(
task_inner: Arc<Mutex<TaskInner>>,
task_impl: Box<dyn TaskImpl<Item = Item>>,
task_impl: Task,
context: Context,
) -> StateMachineHandle {
let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
@ -1127,7 +1121,7 @@ impl<Item: Send + 'static> StateMachine<Item> {
// `try_next`. Since we need to get a new `BoxFuture` at
// each iteration, we can guarantee that the future is
// always valid for use in `select_biased`.
let mut try_next_fut = self.task_impl.try_next().fuse();
let mut try_next_fut = pin!(self.task_impl.try_next().fuse());
futures::select_biased! {
triggering_evt = self.triggering_evt_rx.next() => {
let triggering_evt = triggering_evt.expect("broken state machine channel");
@ -1156,8 +1150,8 @@ impl<Item: Send + 'static> StateMachine<Item> {
mod tests {
use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::prelude::*;
use std::future::pending;
use std::time::Duration;
use super::{
@ -1195,73 +1189,52 @@ mod tests {
impl TaskImpl for TaskTest {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: prepared");
self.prepared_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "nominal: prepared");
self.prepared_sender.send(()).await.unwrap();
Ok(())
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "nominal: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
self.try_next_ready_sender.send(()).await.unwrap();
gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
self.try_next_receiver.next().await.unwrap();
Ok(())
}
.boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
self.try_next_ready_sender.send(()).await.unwrap();
gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
self.try_next_receiver.next().await.unwrap();
Ok(())
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
self.handle_item_ready_sender.send(()).await.unwrap();
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
self.handle_item_ready_sender.send(()).await.unwrap();
gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
self.handle_item_sender.send(()).await.unwrap();
gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");
gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
self.handle_item_sender.send(()).await.unwrap();
gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");
Ok(())
}
.boxed()
Ok(())
}
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: paused");
self.paused_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "nominal: paused");
self.paused_sender.send(()).await.unwrap();
Ok(())
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: stopped");
self.stopped_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "nominal: stopped");
self.stopped_sender.send(()).await.unwrap();
Ok(())
}
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: unprepared");
self.unprepared_sender.send(()).await.unwrap();
}
.boxed()
async fn unprepare(&mut self) {
gst::debug!(RUNTIME_CAT, "nominal: unprepared");
self.unprepared_sender.send(()).await.unwrap();
}
}
@ -1488,45 +1461,39 @@ mod tests {
impl TaskImpl for TaskPrepareTest {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
Err(gst::error_msg!(
gst::ResourceError::Failed,
["prepare_error: intentional error"]
))
}
.boxed()
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
Err(gst::error_msg!(
gst::ResourceError::Failed,
["prepare_error: intentional error"]
))
}
fn handle_action_error(
async fn handle_action_error(
&mut self,
trigger: Trigger,
state: TaskState,
err: gst::ErrorMessage,
) -> BoxFuture<'_, Trigger> {
async move {
gst::debug!(
RUNTIME_CAT,
"prepare_error: handling prepare error {:?}",
err
);
match (trigger, state) {
(Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => unreachable!("{:?}", other),
) -> Trigger {
gst::debug!(
RUNTIME_CAT,
"prepare_error: handling prepare error {:?}",
err
);
match (trigger, state) {
(Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
Trigger::Error
other => unreachable!("{:?}", other),
}
.boxed()
Trigger::Error
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
unreachable!("prepare_error: try_next");
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("prepare_error: handle_item");
}
}
@ -1590,41 +1557,35 @@ mod tests {
impl TaskImpl for TaskPrepareTest {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"prepare_start_ok: preparation awaiting trigger"
);
self.prepare_receiver.next().await.unwrap();
gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
Ok(())
}
.boxed()
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(
RUNTIME_CAT,
"prepare_start_ok: preparation awaiting trigger"
);
self.prepare_receiver.next().await.unwrap();
gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
Ok(())
}
fn handle_action_error(
async fn handle_action_error(
&mut self,
_trigger: Trigger,
_state: TaskState,
_err: gst::ErrorMessage,
) -> BoxFuture<'_, Trigger> {
) -> Trigger {
unreachable!("prepare_start_ok: handle_prepare_error");
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
Ok(())
}
.boxed()
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("prepare_start_ok: handle_item");
}
}
@ -1721,55 +1682,49 @@ mod tests {
impl TaskImpl for TaskPrepareTest {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"prepare_start_error: preparation awaiting trigger"
);
self.prepare_receiver.next().await.unwrap();
gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(
RUNTIME_CAT,
"prepare_start_error: preparation awaiting trigger"
);
self.prepare_receiver.next().await.unwrap();
gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");
Err(gst::error_msg!(
gst::ResourceError::Failed,
["prepare_start_error: intentional error"]
))
}
.boxed()
Err(gst::error_msg!(
gst::ResourceError::Failed,
["prepare_start_error: intentional error"]
))
}
fn handle_action_error(
async fn handle_action_error(
&mut self,
trigger: Trigger,
state: TaskState,
err: gst::ErrorMessage,
) -> BoxFuture<'_, Trigger> {
async move {
gst::debug!(
RUNTIME_CAT,
"prepare_start_error: handling prepare error {:?}",
err
);
match (trigger, state) {
(Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => panic!("action error for {other:?}"),
) -> Trigger {
gst::debug!(
RUNTIME_CAT,
"prepare_start_error: handling prepare error {:?}",
err
);
match (trigger, state) {
(Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
Trigger::Error
other => panic!("action error for {other:?}"),
}
.boxed()
Trigger::Error
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
unreachable!("prepare_start_error: start");
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
unreachable!("prepare_start_error: try_next");
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("prepare_start_error: handle_item");
}
}
@ -1862,23 +1817,14 @@ mod tests {
impl TaskImpl for TaskTest {
type Item = gst::FlowError;
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::FlowError, gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
Ok(self.try_next_receiver.next().await.unwrap())
}
.boxed()
async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> {
gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
Ok(self.try_next_receiver.next().await.unwrap())
}
fn handle_item(
&mut self,
item: gst::FlowError,
) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
Err(item)
}
.boxed()
async fn handle_item(&mut self, item: gst::FlowError) -> Result<(), gst::FlowError> {
gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
Err(item)
}
}
@ -1945,30 +1891,24 @@ mod tests {
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("flush_regular_sync: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2032,36 +1972,30 @@ mod tests {
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("flush_regular_different_context: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"flush_regular_different_context: started flushing"
);
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(
RUNTIME_CAT,
"flush_regular_different_context: started flushing"
);
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"flush_regular_different_context: stopped flushing"
);
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(
RUNTIME_CAT,
"flush_regular_different_context: stopped flushing"
);
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2151,30 +2085,24 @@ mod tests {
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("flush_regular_same_context: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2255,33 +2183,27 @@ mod tests {
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::ok(()).boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
Ok(())
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
match self.task.flush_start() {
Pending {
trigger: FlushStart,
origin: Started,
..
} => (),
other => panic!("{other:?}"),
}
Ok(())
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
match self.task.flush_start() {
Pending {
trigger: FlushStart,
origin: Started,
..
} => (),
other => panic!("{other:?}"),
}
.boxed()
Ok(())
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2332,38 +2254,32 @@ mod tests {
impl TaskImpl for TaskStartTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::ok(()).boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
Ok(())
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
match self.task.pause() {
Pending {
trigger: Pause,
origin: Started,
..
} => (),
other => panic!("{other:?}"),
}
Ok(())
gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
match self.task.pause() {
Pending {
trigger: Pause,
origin: Started,
..
} => (),
other => panic!("{other:?}"),
}
.boxed()
Ok(())
}
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
self.pause_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
self.pause_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2403,41 +2319,35 @@ mod tests {
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("trigger_from_action: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"trigger_from_action: flush_start triggering flush_stop"
);
match self.task.flush_stop() {
Pending {
trigger: FlushStop,
origin: Started,
..
} => (),
other => panic!("{other:?}"),
}
Ok(())
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(
RUNTIME_CAT,
"trigger_from_action: flush_start triggering flush_stop"
);
match self.task.flush_stop() {
Pending {
trigger: FlushStop,
origin: Started,
..
} => (),
other => panic!("{other:?}"),
}
.boxed()
Ok(())
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2481,39 +2391,30 @@ mod tests {
impl TaskImpl for TaskFlushTest {
type Item = ();
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("pause_flush_start: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2597,39 +2498,30 @@ mod tests {
impl TaskImpl for TaskFlushTest {
type Item = ();
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("pause_flushing_start: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2703,30 +2595,24 @@ mod tests {
impl TaskImpl for TaskStartTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
pending().await
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
unreachable!("flush_concurrent_start: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
}
@ -2826,34 +2712,25 @@ mod tests {
impl TaskImpl for TaskTimerTest {
type Item = ();
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
gst::debug!(RUNTIME_CAT, "start_timer: started");
Ok(())
}
.boxed()
async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
gst::debug!(RUNTIME_CAT, "start_timer: started");
Ok(())
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
self.timer.take().unwrap().await;
Ok(())
}
.boxed()
async fn try_next(&mut self) -> Result<(), gst::FlowError> {
gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
self.timer.take().unwrap().await;
Ok(())
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() {
timer_elapsed_sender.send(()).unwrap();
}
Err(gst::FlowError::Eos)
async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() {
timer_elapsed_sender.send(()).unwrap();
}
.boxed()
Err(gst::FlowError::Eos)
}
}

View file

@ -18,8 +18,6 @@
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::future::BoxFuture;
use gst::glib;
use gst::prelude::*;
@ -27,6 +25,7 @@ use std::sync::LazyLock;
use std::error;
use std::fmt;
use std::future::Future;
use std::io;
use std::net::UdpSocket;
@ -52,7 +51,7 @@ pub trait SocketRead: Send + Unpin {
fn read<'buf>(
&'buf mut self,
buffer: &'buf mut [u8],
) -> BoxFuture<'buf, io::Result<(usize, Option<std::net::SocketAddr>)>>;
) -> impl Future<Output = io::Result<(usize, Option<std::net::SocketAddr>)>> + Send;
}
pub struct Socket<T: SocketRead> {

View file

@ -18,7 +18,6 @@
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
@ -87,11 +86,11 @@ impl TcpClientReader {
impl SocketRead for TcpClientReader {
const DO_TIMESTAMP: bool = false;
fn read<'buf>(
async fn read<'buf>(
&'buf mut self,
buffer: &'buf mut [u8],
) -> BoxFuture<'buf, io::Result<(usize, Option<std::net::SocketAddr>)>> {
async move { self.0.read(buffer).await.map(|read_size| (read_size, None)) }.boxed()
) -> io::Result<(usize, Option<std::net::SocketAddr>)> {
Ok((self.0.read(buffer).await?, None))
}
}
@ -270,178 +269,161 @@ impl TcpClientSrcTask {
impl TaskImpl for TcpClientSrcTask {
type Item = gst::Buffer;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(
CAT,
obj = self.element,
"Preparing task connecting to {:?}",
self.saddr
);
async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(
CAT,
obj = self.element,
"Preparing task connecting to {:?}",
self.saddr
);
let socket = Async::<TcpStream>::connect(self.saddr)
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to connect to {:?}: {:?}", self.saddr, err]
)
})?;
self.socket = Some(
Socket::try_new(
self.element.clone().upcast(),
self.buffer_pool.take().unwrap(),
TcpClientReader::new(socket),
let socket = Async::<TcpStream>::connect(self.saddr)
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to connect to {:?}: {:?}", self.saddr, err]
)
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?,
);
})?;
gst::log!(CAT, obj = self.element, "Task prepared");
Ok(())
}
.boxed()
self.socket = Some(
Socket::try_new(
self.element.clone().upcast(),
self.buffer_pool.take().unwrap(),
TcpClientReader::new(socket),
)
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::OpenRead,
["Failed to prepare socket {:?}", err]
)
})?,
);
gst::log!(CAT, obj = self.element, "Task prepared");
Ok(())
}
fn handle_action_error(
async fn handle_action_error(
&mut self,
trigger: task::Trigger,
state: TaskState,
err: gst::ErrorMessage,
) -> BoxFuture<'_, task::Trigger> {
async move {
match trigger {
task::Trigger::Prepare => {
gst::error!(CAT, "Task preparation failed: {:?}", err);
self.element.post_error_message(err);
) -> task::Trigger {
match trigger {
task::Trigger::Prepare => {
gst::error!(CAT, "Task preparation failed: {:?}", err);
self.element.post_error_message(err);
task::Trigger::Error
}
other => unreachable!("Action error for {:?} in state {:?}", other, state),
task::Trigger::Error
}
other => unreachable!("Action error for {:?} in state {:?}", other, state),
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
async move {
let event_fut = self.event_receiver.next().fuse();
let socket_fut = self.socket.as_mut().unwrap().try_next().fuse();
async fn try_next(&mut self) -> Result<gst::Buffer, gst::FlowError> {
let event_fut = self.event_receiver.next().fuse();
let socket_fut = self.socket.as_mut().unwrap().try_next().fuse();
pin_mut!(event_fut);
pin_mut!(socket_fut);
pin_mut!(event_fut);
pin_mut!(socket_fut);
futures::select! {
event_res = event_fut => match event_res {
Some(event) => {
gst::debug!(CAT, obj = self.element, "Handling element level event {event:?}");
futures::select! {
event_res = event_fut => match event_res {
Some(event) => {
gst::debug!(CAT, obj = self.element, "Handling element level event {event:?}");
match event.view() {
gst::EventView::Eos(_) => Err(gst::FlowError::Eos),
ev => {
gst::error!(CAT, obj = self.element, "Unexpected event {ev:?} on channel");
Err(gst::FlowError::Error)
}
match event.view() {
gst::EventView::Eos(_) => Err(gst::FlowError::Eos),
ev => {
gst::error!(CAT, obj = self.element, "Unexpected event {ev:?} on channel");
Err(gst::FlowError::Error)
}
}
None => {
gst::error!(CAT, obj = self.element, "Unexpected return on event channel");
Err(gst::FlowError::Error)
}
},
socket_res = socket_fut => match socket_res {
Ok((buffer, _saddr)) => Ok(buffer),
Err(err) => {
gst::error!(CAT, obj = self.element, "Got error {err:#}");
}
None => {
gst::error!(CAT, obj = self.element, "Unexpected return on event channel");
Err(gst::FlowError::Error)
}
},
socket_res = socket_fut => match socket_res {
Ok((buffer, _saddr)) => Ok(buffer),
Err(err) => {
gst::error!(CAT, obj = self.element, "Got error {err:#}");
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {err}"]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {err}"]
);
}
match err {
SocketError::Gst(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {err}"]
);
}
SocketError::Io(err) => {
gst::element_error!(
self.element,
gst::StreamError::Failed,
("I/O error"),
["streaming stopped, I/O error {err}"]
);
}
Err(gst::FlowError::Error)
}
},
Err(gst::FlowError::Error)
}
},
}
}
async fn handle_item(&mut self, buffer: gst::Buffer) -> Result<(), gst::FlowError> {
let _ = self.push_buffer(buffer).await?;
Ok(())
}
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Stopping task");
self.need_initial_events = true;
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
}
async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(CAT, obj = self.element, "Stopping task flush");
self.need_initial_events = true;
gst::log!(CAT, obj = self.element, "Task flush stopped");
Ok(())
}
async fn handle_loop_error(&mut self, err: gst::FlowError) -> task::Trigger {
match err {
gst::FlowError::Flushing => {
gst::debug!(CAT, obj = self.element, "Flushing");
task::Trigger::FlushStart
}
gst::FlowError::Eos => {
gst::debug!(CAT, obj = self.element, "EOS");
self.element
.imp()
.src_pad
.push_event(gst::event::Eos::new())
.await;
task::Trigger::Stop
}
err => {
gst::error!(CAT, obj = self.element, "Got error {err}");
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
task::Trigger::Error
}
}
.boxed()
}
fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
self.push_buffer(buffer).map_ok(drop).boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Stopping task");
self.need_initial_events = true;
gst::log!(CAT, obj = self.element, "Task stopped");
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(CAT, obj = self.element, "Stopping task flush");
self.need_initial_events = true;
gst::log!(CAT, obj = self.element, "Task flush stopped");
Ok(())
}
.boxed()
}
fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, task::Trigger> {
async move {
match err {
gst::FlowError::Flushing => {
gst::debug!(CAT, obj = self.element, "Flushing");
task::Trigger::FlushStart
}
gst::FlowError::Eos => {
gst::debug!(CAT, obj = self.element, "EOS");
self.element
.imp()
.src_pad
.push_event(gst::event::Eos::new())
.await;
task::Trigger::Stop
}
err => {
gst::error!(CAT, obj = self.element, "Got error {err}");
gst::element_error!(
&self.element,
gst::StreamError::Failed,
("Internal data stream error"),
["streaming stopped, reason {}", err]
);
task::Trigger::Error
}
}
}
.boxed()
}
}

View file

@ -17,9 +17,6 @@
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -350,60 +347,54 @@ impl UdpSinkPadHandler {
impl PadSinkHandler for UdpSinkPadHandler {
type ElementImpl = UdpSink;
fn sink_chain(
async fn sink_chain(
self,
_pad: gst::Pad,
elem: super::UdpSink,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move { self.0.lock().await.handle_buffer(&elem, buffer).await }.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
self.0.lock().await.handle_buffer(&elem, buffer).await
}
fn sink_chain_list(
async fn sink_chain_list(
self,
_pad: gst::Pad,
elem: super::UdpSink,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let mut inner = self.0.lock().await;
for buffer in list.iter_owned() {
inner.handle_buffer(&elem, buffer).await?;
}
Ok(gst::FlowSuccess::Ok)
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut inner = self.0.lock().await;
for buffer in list.iter_owned() {
inner.handle_buffer(&elem, buffer).await?;
}
.boxed()
Ok(gst::FlowSuccess::Ok)
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
_pad: gst::Pad,
elem: super::UdpSink,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
gst::debug!(CAT, obj = elem, "Handling {event:?}");
) -> bool {
gst::debug!(CAT, obj = elem, "Handling {event:?}");
match event.view() {
EventView::Eos(_) => {
let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
}
EventView::Segment(e) => {
self.0.lock().await.segment = Some(e.segment().clone());
}
EventView::FlushStop(_) => {
self.0.lock().await.is_flushing = false;
}
EventView::SinkMessage(e) => {
let _ = elem.post_message(e.message());
}
_ => (),
match event.view() {
EventView::Eos(_) => {
let _ = elem.post_message(gst::message::Eos::builder().src(&elem).build());
}
true
EventView::Segment(e) => {
self.0.lock().await.segment = Some(e.segment().clone());
}
EventView::FlushStop(_) => {
self.0.lock().await.is_flushing = false;
}
EventView::SinkMessage(e) => {
let _ = elem.post_message(e.message());
}
_ => (),
}
.boxed()
true
}
fn sink_event(self, _pad: &gst::Pad, imp: &UdpSink, event: gst::Event) -> bool {

File diff suppressed because it is too large Load diff

View file

@ -20,7 +20,6 @@
#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)]
use futures::channel::mpsc;
use futures::future::BoxFuture;
use futures::prelude::*;
use gst::glib;
@ -145,50 +144,38 @@ mod imp_src {
impl TaskImpl for ElementSrcTestTask {
type Item = Item;
fn try_next(&mut self) -> BoxFuture<'_, Result<Item, gst::FlowError>> {
async move {
self.receiver.next().await.ok_or_else(|| {
gst::log!(SRC_CAT, obj = self.element, "SrcPad channel aborted");
gst::FlowError::Eos
})
}
.boxed()
async fn try_next(&mut self) -> Result<Item, gst::FlowError> {
self.receiver.next().await.ok_or_else(|| {
gst::log!(SRC_CAT, obj = self.element, "SrcPad channel aborted");
gst::FlowError::Eos
})
}
fn handle_item(&mut self, item: Item) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let res = self.push_item(item).await.map(drop);
match res {
Ok(_) => gst::log!(SRC_CAT, obj = self.element, "Successfully pushed item"),
Err(gst::FlowError::Flushing) => {
gst::debug!(SRC_CAT, obj = self.element, "Flushing")
}
Err(err) => panic!("Got error {err}"),
async fn handle_item(&mut self, item: Item) -> Result<(), gst::FlowError> {
let res = self.push_item(item).await.map(drop);
match res {
Ok(_) => gst::log!(SRC_CAT, obj = self.element, "Successfully pushed item"),
Err(gst::FlowError::Flushing) => {
gst::debug!(SRC_CAT, obj = self.element, "Flushing")
}
res
Err(err) => panic!("Got error {err}"),
}
.boxed()
res
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(SRC_CAT, obj = self.element, "Stopping task");
self.flush();
gst::log!(SRC_CAT, obj = self.element, "Task stopped");
Ok(())
}
.boxed()
async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Stopping task");
self.flush();
gst::log!(SRC_CAT, obj = self.element, "Task stopped");
Ok(())
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::log!(SRC_CAT, obj = self.element, "Starting task flush");
self.flush();
gst::log!(SRC_CAT, obj = self.element, "Task flush started");
Ok(())
}
.boxed()
async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
gst::log!(SRC_CAT, obj = self.element, "Starting task flush");
self.flush();
gst::log!(SRC_CAT, obj = self.element, "Task flush started");
Ok(())
}
}
@ -438,30 +425,24 @@ mod imp_sink {
impl PadSinkHandler for PadSinkTestHandler {
type ElementImpl = ElementSinkTest;
fn sink_chain(
async fn sink_chain(
self,
_pad: gst::Pad,
elem: super::ElementSinkTest,
buffer: gst::Buffer,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let imp = elem.imp();
imp.forward_item(Item::Buffer(buffer)).await
}
.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
let imp = elem.imp();
imp.forward_item(Item::Buffer(buffer)).await
}
fn sink_chain_list(
async fn sink_chain_list(
self,
_pad: gst::Pad,
elem: super::ElementSinkTest,
list: gst::BufferList,
) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> {
async move {
let imp = elem.imp();
imp.forward_item(Item::BufferList(list)).await
}
.boxed()
) -> Result<gst::FlowSuccess, gst::FlowError> {
let imp = elem.imp();
imp.forward_item(Item::BufferList(list)).await
}
fn sink_event(self, pad: &gst::Pad, imp: &ElementSinkTest, event: gst::Event) -> bool {
@ -476,23 +457,20 @@ mod imp_sink {
}
}
fn sink_event_serialized(
async fn sink_event_serialized(
self,
pad: gst::Pad,
elem: super::ElementSinkTest,
event: gst::Event,
) -> BoxFuture<'static, bool> {
async move {
gst::log!(SINK_CAT, obj = pad, "Handling serialized {:?}", event);
) -> bool {
gst::log!(SINK_CAT, obj = pad, "Handling serialized {:?}", event);
let imp = elem.imp();
if let EventView::FlushStop(..) = event.view() {
imp.start();
}
imp.forward_item(Item::Event(event)).await.is_ok()
let imp = elem.imp();
if let EventView::FlushStop(..) = event.view() {
imp.start();
}
.boxed()
imp.forward_item(Item::Event(event)).await.is_ok()
}
}