threadshare/TaskImpl: allow transition hooks to fail...

... and add error handlers for iterate and transitions hooks with
default implementation.
This commit is contained in:
François Laignel 2020-05-15 19:38:54 +02:00
parent 5c9bbc6818
commit f0793587f6
9 changed files with 1335 additions and 717 deletions

View file

@ -252,14 +252,8 @@ impl PadSrcHandler for AppSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
appsrc.task.flush_start();
true
}
EventView::FlushStop(..) => {
appsrc.task.flush_stop();
true
}
EventView::FlushStart(..) => appsrc.task.flush_start().is_ok(),
EventView::FlushStop(..) => appsrc.task.flush_stop().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@ -355,9 +349,7 @@ impl AppSrcTask {
impl TaskImpl for AppSrcTask {
fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
let item = self.receiver.next().await;
let item = match item {
let item = match self.receiver.next().await {
Some(item) => item,
None => {
gst_error!(CAT, obj: &self.element, "SrcPad channel aborted");
@ -404,7 +396,7 @@ impl TaskImpl for AppSrcTask {
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
@ -412,11 +404,12 @@ impl TaskImpl for AppSrcTask {
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
Ok(())
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, ()> {
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task flush");
@ -424,6 +417,7 @@ impl TaskImpl for AppSrcTask {
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Task flush started");
Ok(())
}
.boxed()
}
@ -544,22 +538,25 @@ impl AppSrc {
gst_debug!(CAT, obj: element, "Unprepared");
}
fn stop(&self, element: &gst::Element) {
fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) {
fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start();
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn pause(&self, element: &gst::Element) {
fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
self.task.pause();
self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
Ok(())
}
}
@ -714,7 +711,7 @@ impl ElementImpl for AppSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element);
self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@ -729,13 +726,13 @@ impl ElementImpl for AppSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start(element);
self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop(element);
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -662,7 +662,9 @@ impl PadSinkHandler for SinkHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
if let EventView::FlushStart(..) = event.view() {
jb.task.flush_start();
if let Err(err) = jb.task.flush_start() {
gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
}
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event);
@ -699,7 +701,10 @@ impl PadSinkHandler for SinkHandler {
.unwrap();
}
EventView::FlushStop(..) => {
jb.task.flush_stop();
if let Err(err) = jb.task.flush_stop() {
// FIXME we should probably return false if that one fails
gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
}
}
EventView::Eos(..) => {
let mut state = jb.state.lock().unwrap();
@ -975,10 +980,15 @@ impl PadSrcHandler for SrcHandler {
match event.view() {
EventView::FlushStart(..) => {
jb.task.flush_start();
if let Err(err) = jb.task.flush_start() {
gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
}
}
EventView::FlushStop(..) => {
jb.task.flush_stop();
if let Err(err) = jb.task.flush_stop() {
// FIXME we should probably return false if that one fails
gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
}
}
_ => (),
}
@ -1125,7 +1135,7 @@ impl JitterBufferTask {
}
impl TaskImpl for JitterBufferTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
@ -1136,6 +1146,7 @@ impl TaskImpl for JitterBufferTask {
*jb.state.lock().unwrap() = State::default();
gst_log!(CAT, obj: &self.element, "Task started");
Ok(())
}
.boxed()
}
@ -1279,7 +1290,7 @@ impl TaskImpl for JitterBufferTask {
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
@ -1296,6 +1307,7 @@ impl TaskImpl for JitterBufferTask {
*jb_state = State::default();
gst_log!(CAT, obj: &self.element, "Task stopped");
Ok(())
}
.boxed()
}
@ -1359,16 +1371,18 @@ impl JitterBuffer {
gst_debug!(CAT, obj: element, "Unprepared");
}
fn start(&self, element: &gst::Element) {
fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start();
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst::Element) {
fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
}
@ -1571,7 +1585,7 @@ impl ElementImpl for JitterBuffer {
})?;
}
gst::StateChange::PausedToReady => {
self.stop(element);
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@ -1583,7 +1597,7 @@ impl ElementImpl for JitterBuffer {
match transition {
gst::StateChange::ReadyToPaused => {
self.start(element);
self.start(element).map_err(|_| gst::StateChangeError)?;
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PlayingToPaused => {

View file

@ -825,8 +825,17 @@ impl PadSrcHandler for ProxySrcPadHandler {
};
match event.view() {
EventView::FlushStart(..) => proxysrc.task.flush_start(),
EventView::FlushStop(..) => proxysrc.task.flush_stop(),
EventView::FlushStart(..) => {
if let Err(err) = proxysrc.task.flush_start() {
gst_error!(SRC_CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
}
}
EventView::FlushStop(..) => {
if let Err(err) = proxysrc.task.flush_stop() {
// FIXME we should probably return false if that one fails
gst_error!(SRC_CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
}
}
_ => (),
}
@ -905,7 +914,7 @@ impl ProxySrcTask {
}
impl TaskImpl for ProxySrcTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task");
@ -922,6 +931,7 @@ impl TaskImpl for ProxySrcTask {
self.dataqueue.start();
gst_log!(SRC_CAT, obj: &self.element, "Task started");
Ok(())
}
.boxed()
}
@ -979,7 +989,7 @@ impl TaskImpl for ProxySrcTask {
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
@ -997,11 +1007,12 @@ impl TaskImpl for ProxySrcTask {
}
gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
Ok(())
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, ()> {
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
@ -1014,6 +1025,7 @@ impl TaskImpl for ProxySrcTask {
shared_ctx.last_res = Err(gst::FlowError::Flushing);
gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
Ok(())
}
.boxed()
}
@ -1120,22 +1132,25 @@ impl ProxySrc {
gst_debug!(SRC_CAT, obj: element, "Unprepared");
}
fn stop(&self, element: &gst::Element) {
fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Stopping");
self.task.stop();
self.task.stop()?;
gst_debug!(SRC_CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) {
fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Starting");
self.task.start();
self.task.start()?;
gst_debug!(SRC_CAT, obj: element, "Started");
Ok(())
}
fn pause(&self, element: &gst::Element) {
fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(SRC_CAT, obj: element, "Pausing");
self.task.pause();
self.task.pause()?;
gst_debug!(SRC_CAT, obj: element, "Paused");
Ok(())
}
}
@ -1264,7 +1279,7 @@ impl ElementImpl for ProxySrc {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element);
self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@ -1279,13 +1294,13 @@ impl ElementImpl for ProxySrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start(element);
self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop(element);
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -193,7 +193,9 @@ impl PadSinkHandler for QueuePadSinkHandler {
gst_debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event);
if let EventView::FlushStart(..) = event.view() {
queue.task.flush_start();
if let Err(err) = queue.task.flush_start() {
gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
}
}
gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event);
@ -218,7 +220,10 @@ impl PadSinkHandler for QueuePadSinkHandler {
let queue = Queue::from_instance(&element);
if let EventView::FlushStop(..) = event.view() {
queue.task.flush_stop();
if let Err(err) = queue.task.flush_stop() {
// FIXME we should probably return false if that one fails
gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
}
}
gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event);
@ -296,8 +301,17 @@ impl PadSrcHandler for QueuePadSrcHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
match event.view() {
EventView::FlushStart(..) => queue.task.flush_start(),
EventView::FlushStop(..) => queue.task.flush_stop(),
EventView::FlushStart(..) => {
if let Err(err) = queue.task.flush_start() {
gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err);
}
}
EventView::FlushStop(..) => {
if let Err(err) = queue.task.flush_stop() {
// FIXME we should probably return false if that one fails
gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err);
}
}
_ => (),
}
@ -362,7 +376,7 @@ impl QueueTask {
}
impl TaskImpl for QueueTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
@ -374,6 +388,7 @@ impl TaskImpl for QueueTask {
*last_res = Ok(gst::FlowSuccess::Ok);
gst_log!(CAT, obj: &self.element, "Task started");
Ok(())
}
.boxed()
}
@ -425,7 +440,7 @@ impl TaskImpl for QueueTask {
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
@ -442,11 +457,12 @@ impl TaskImpl for QueueTask {
*last_res = Err(gst::FlowError::Flushing);
gst_log!(CAT, obj: &self.element, "Task stopped");
Ok(())
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, ()> {
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task flush");
@ -462,6 +478,7 @@ impl TaskImpl for QueueTask {
*last_res = Err(gst::FlowError::Flushing);
gst_log!(CAT, obj: &self.element, "Task flush started");
Ok(())
}
.boxed()
}
@ -699,16 +716,18 @@ impl Queue {
gst_debug!(CAT, obj: element, "Unprepared");
}
fn stop(&self, element: &gst::Element) {
fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) {
fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start();
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
}
@ -839,7 +858,7 @@ impl ElementImpl for Queue {
})?;
}
gst::StateChange::PausedToReady => {
self.stop(element);
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@ -850,7 +869,7 @@ impl ElementImpl for Queue {
let success = self.parent_change_state(element, transition)?;
if transition == gst::StateChange::ReadyToPaused {
self.start(element);
self.start(element).map_err(|_| gst::StateChangeError)?;
}
Ok(success)

File diff suppressed because it is too large Load diff

View file

@ -41,7 +41,8 @@ use std::u32;
use tokio::io::AsyncReadExt;
use crate::runtime::prelude::*;
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task};
use crate::runtime::task::Transition;
use crate::runtime::{Context, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState};
use super::socket::{Socket, SocketError, SocketRead};
@ -264,16 +265,8 @@ impl PadSrcHandler for TcpClientSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
tcpclientsrc.task.flush_start();
true
}
EventView::FlushStop(..) => {
tcpclientsrc.task.flush_stop();
true
}
EventView::FlushStart(..) => tcpclientsrc.task.flush_start().is_ok(),
EventView::FlushStop(..) => tcpclientsrc.task.flush_stop().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@ -393,16 +386,27 @@ impl TaskImpl for TcpClientSrcTask {
);
gst_log!(CAT, obj: &self.element, "Task prepared");
Ok(())
}
.boxed()
}
fn handle_prepare_error(&mut self, err: gst::ErrorMessage) -> BoxFuture<'_, ()> {
fn handle_hook_error(
&mut self,
transition: Transition,
state: TaskState,
err: gst::ErrorMessage,
) -> BoxFuture<'_, Transition> {
async move {
gst_error!(CAT, "Task preparation failed: {:?}", err);
self.element.post_error_message(&err);
match transition {
Transition::Prepare => {
gst_error!(CAT, "Task preparation failed: {:?}", err);
self.element.post_error_message(&err);
Transition::Error
}
other => unreachable!("Hook error {:?} in state {:?}", other, state),
}
}
.boxed()
}
@ -474,20 +478,22 @@ impl TaskImpl for TcpClientSrcTask {
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task flush");
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Task flush stopped");
Ok(())
}
.boxed()
}
@ -584,22 +590,25 @@ impl TcpClientSrc {
gst_debug!(CAT, obj: element, "Unprepared");
}
fn stop(&self, element: &gst::Element) {
fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) {
fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start();
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn pause(&self, element: &gst::Element) {
fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
self.task.pause();
self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
Ok(())
}
}
@ -721,7 +730,7 @@ impl ElementImpl for TcpClientSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element);
self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@ -736,13 +745,13 @@ impl ElementImpl for TcpClientSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start(element);
self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop(element);
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -827,7 +827,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
async move {
if let EventView::FlushStop(_) = event.view() {
let udpsink = UdpSink::from_instance(&element);
udpsink.task.flush_stop();
return udpsink.task.flush_stop().is_ok();
} else if let Some(sender) = sender.lock().await.as_mut() {
if sender.send(TaskItem::Event(event)).await.is_err() {
gst_debug!(CAT, obj: &element, "Flushing");
@ -847,7 +847,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
event: gst::Event,
) -> bool {
if let EventView::FlushStart(..) = event.view() {
udpsink.task.flush_start();
return udpsink.task.flush_start().is_ok();
}
true
@ -872,7 +872,7 @@ impl UdpSinkTask {
}
impl TaskImpl for UdpSinkTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
@ -884,6 +884,7 @@ impl TaskImpl for UdpSinkTask {
self.receiver = Some(receiver);
gst_log!(CAT, obj: &self.element, "Task started");
Ok(())
}
.boxed()
}
@ -1109,16 +1110,18 @@ impl UdpSink {
gst_debug!(CAT, obj: element, "Unprepared");
}
fn stop(&self, element: &gst::Element) {
fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) {
fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start();
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
}
@ -1515,10 +1518,10 @@ impl ElementImpl for UdpSink {
})?;
}
gst::StateChange::ReadyToPaused => {
self.start(element);
self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
self.stop(element);
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);

View file

@ -312,16 +312,8 @@ impl PadSrcHandler for UdpSrcPadHandler {
gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event);
let ret = match event.view() {
EventView::FlushStart(..) => {
udpsrc.task.flush_start();
true
}
EventView::FlushStop(..) => {
udpsrc.task.flush_stop();
true
}
EventView::FlushStart(..) => udpsrc.task.flush_start().is_ok(),
EventView::FlushStop(..) => udpsrc.task.flush_stop().is_ok(),
EventView::Reconfigure(..) => true,
EventView::Latency(..) => true,
_ => false,
@ -409,12 +401,13 @@ impl UdpSrcTask {
}
impl TaskImpl for UdpSrcTask {
fn start(&mut self) -> BoxFuture<'_, ()> {
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Starting task");
self.socket
.set_clock(self.element.get_clock(), Some(self.element.get_base_time()));
gst_log!(CAT, obj: &self.element, "Task started");
Ok(())
}
.boxed()
}
@ -504,20 +497,22 @@ impl TaskImpl for UdpSrcTask {
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task");
self.src_pad_handler.reset_state().await;
gst_log!(CAT, obj: &self.element, "Task stopped");
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, ()> {
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(CAT, obj: &self.element, "Stopping task flush");
self.src_pad_handler.set_need_segment().await;
gst_log!(CAT, obj: &self.element, "Stopped task flush");
Ok(())
}
.boxed()
}
@ -750,22 +745,25 @@ impl UdpSrc {
gst_debug!(CAT, obj: element, "Unprepared");
}
fn stop(&self, element: &gst::Element) {
fn stop(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Stopping");
self.task.stop();
self.task.stop()?;
gst_debug!(CAT, obj: element, "Stopped");
Ok(())
}
fn start(&self, element: &gst::Element) {
fn start(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Starting");
self.task.start();
self.task.start()?;
gst_debug!(CAT, obj: element, "Started");
Ok(())
}
fn pause(&self, element: &gst::Element) {
fn pause(&self, element: &gst::Element) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Pausing");
self.task.pause();
self.task.pause()?;
gst_debug!(CAT, obj: element, "Paused");
Ok(())
}
}
@ -932,7 +930,7 @@ impl ElementImpl for UdpSrc {
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause(element);
self.pause(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare(element);
@ -947,13 +945,13 @@ impl ElementImpl for UdpSrc {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start(element);
self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop(element);
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -115,12 +115,12 @@ impl PadSrcHandler for PadSrcTestHandler {
let ret = match event.view() {
EventView::FlushStart(..) => {
elem_src_test.task.flush_start();
elem_src_test.task.flush_start().unwrap();
true
}
EventView::Qos(..) | EventView::Reconfigure(..) | EventView::Latency(..) => true,
EventView::FlushStop(..) => {
elem_src_test.task.flush_stop();
elem_src_test.task.flush_stop().unwrap();
true
}
_ => false,
@ -188,20 +188,22 @@ impl TaskImpl for ElementSrcTestTask {
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, ()> {
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Stopping task");
self.flush();
gst_log!(SRC_CAT, obj: &self.element, "Task stopped");
Ok(())
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, ()> {
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst_log!(SRC_CAT, obj: &self.element, "Starting task flush");
self.flush();
gst_log!(SRC_CAT, obj: &self.element, "Task flush started");
Ok(())
}
.boxed()
}
@ -253,7 +255,7 @@ impl ElementSrcTest {
)
.map_err(|err| {
gst_error_msg!(
gst::ResourceError::OpenRead,
gst::ResourceError::Failed,
["Error preparing Task: {:?}", err]
)
})?;
@ -274,19 +276,19 @@ impl ElementSrcTest {
fn stop(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Stopping");
self.task.stop();
self.task.stop().unwrap();
gst_debug!(SRC_CAT, obj: element, "Stopped");
}
fn start(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Starting");
self.task.start();
self.task.start().unwrap();
gst_debug!(SRC_CAT, obj: element, "Started");
}
fn pause(&self, element: &gst::Element) {
gst_debug!(SRC_CAT, obj: element, "Pausing");
self.task.pause();
self.task.pause().unwrap();
gst_debug!(SRC_CAT, obj: element, "Paused");
}
}
@ -405,10 +407,10 @@ impl ElementImpl for ElementSrcTest {
fn send_event(&self, _element: &gst::Element, event: gst::Event) -> bool {
match event.view() {
EventView::FlushStart(..) => {
self.task.flush_start();
self.task.flush_start().unwrap();
}
EventView::FlushStop(..) => {
self.task.flush_stop();
self.task.flush_stop().unwrap();
}
_ => (),
}