diff --git a/generic/threadshare/examples/standalone/src/imp.rs b/generic/threadshare/examples/standalone/src/imp.rs index 91ba9aef6..df64b603b 100644 --- a/generic/threadshare/examples/standalone/src/imp.rs +++ b/generic/threadshare/examples/standalone/src/imp.rs @@ -293,15 +293,6 @@ impl TestSrc { Ok(()) } - - fn pause(&self) -> Result<(), gst::ErrorMessage> { - let is_main_elem = self.settings.lock().unwrap().is_main_elem; - debug_or_trace!(CAT, is_main_elem, imp = self, "Pausing"); - self.task.pause().block_on()?; - debug_or_trace!(CAT, is_main_elem, imp = self, "Paused"); - - Ok(()) - } } #[glib::object_subclass] @@ -457,8 +448,8 @@ impl ElementImpl for TestSrc { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -478,9 +469,6 @@ impl ElementImpl for TestSrc { gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } - gst::StateChange::PausedToReady => { - self.stop().map_err(|_| gst::StateChangeError)?; - } _ => (), } diff --git a/generic/threadshare/src/appsrc/imp.rs b/generic/threadshare/src/appsrc/imp.rs index 6050afcc5..52ba570f7 100644 --- a/generic/threadshare/src/appsrc/imp.rs +++ b/generic/threadshare/src/appsrc/imp.rs @@ -400,13 +400,6 @@ impl AppSrc { gst::debug!(CAT, imp = self, "Started"); Ok(()) } - - fn pause(&self) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, imp = self, "Pausing"); - self.task.pause().block_on()?; - gst::debug!(CAT, imp = self, "Paused"); - Ok(()) - } } #[glib::object_subclass] @@ -594,8 +587,8 @@ impl ElementImpl for AppSrc { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -615,9 +608,6 @@ impl ElementImpl for AppSrc { gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } - gst::StateChange::PausedToReady => { - self.stop().map_err(|_| gst::StateChangeError)?; - } _ => (), } diff --git a/generic/threadshare/src/audiotestsrc/imp.rs b/generic/threadshare/src/audiotestsrc/imp.rs index e6c9641c5..7b2de0dfe 100644 --- a/generic/threadshare/src/audiotestsrc/imp.rs +++ b/generic/threadshare/src/audiotestsrc/imp.rs @@ -507,14 +507,6 @@ impl AudioTestSrc { Ok(()) } - - fn pause(&self) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, imp = self, "Pausing"); - self.task.pause().block_on()?; - gst::debug!(CAT, imp = self, "Paused"); - - Ok(()) - } } #[glib::object_subclass] @@ -687,8 +679,8 @@ impl ElementImpl for AudioTestSrc { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -699,19 +691,12 @@ impl ElementImpl for AudioTestSrc { let mut success = self.parent_change_state(transition)?; match transition { - gst::StateChange::ReadyToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; - success = gst::StateChangeSuccess::NoPreroll; - } gst::StateChange::PausedToPlaying => { self.start().map_err(|_| gst::StateChangeError)?; } gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } - gst::StateChange::PausedToReady => { - self.stop().map_err(|_| gst::StateChangeError)?; - } _ => (), } diff --git a/generic/threadshare/src/inter/src/imp.rs b/generic/threadshare/src/inter/src/imp.rs index 120d3afed..29c26c0cd 100644 --- a/generic/threadshare/src/inter/src/imp.rs +++ b/generic/threadshare/src/inter/src/imp.rs @@ -618,32 +618,17 @@ impl InterSrc { self.join_inter_ctx_blocking(target_inter_ctx, ts_ctx, dataqueue)?; - match self.obj().current_state() { - gst::State::Paused => { - self.srcpad - .gst_pad() - .push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build()); + if self.obj().current_state() == gst::State::Playing { + self.srcpad + .gst_pad() + .push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build()); - if let Err(err) = self.task.pause().block_on() { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to set new Task in Pause: {err}"] - )); - } + if let Err(err) = self.task.start().block_on() { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to start new Task: {err}"] + )); } - gst::State::Playing => { - self.srcpad - .gst_pad() - .push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build()); - - if let Err(err) = self.task.start().block_on() { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to start new Task: {err}"] - )); - } - } - _ => (), } Ok(()) @@ -759,13 +744,6 @@ impl InterSrc { Ok(()) } - fn pause(&self) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, imp = self, "Pausing"); - self.task.pause().await_maybe_on_context()?; - gst::debug!(CAT, imp = self, "Paused"); - Ok(()) - } - fn flush_start(&self) -> Result<(), gst::FlowError> { gst::debug!(CAT, imp = self, "Flushing"); @@ -1017,8 +995,8 @@ impl ElementImpl for InterSrc { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -1038,9 +1016,6 @@ impl ElementImpl for InterSrc { gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } - gst::StateChange::PausedToReady => { - self.stop().map_err(|_| gst::StateChangeError)?; - } _ => (), } diff --git a/generic/threadshare/src/proxy/imp.rs b/generic/threadshare/src/proxy/imp.rs index ae41a5bd3..9f8d1ff69 100644 --- a/generic/threadshare/src/proxy/imp.rs +++ b/generic/threadshare/src/proxy/imp.rs @@ -1000,13 +1000,6 @@ impl ProxySrc { gst::debug!(SRC_CAT, imp = self, "Started"); Ok(()) } - - fn pause(&self) -> Result<(), gst::ErrorMessage> { - gst::debug!(SRC_CAT, imp = self, "Pausing"); - self.task.pause().block_on()?; - gst::debug!(SRC_CAT, imp = self, "Paused"); - Ok(()) - } } #[glib::object_subclass] @@ -1173,8 +1166,8 @@ impl ElementImpl for ProxySrc { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -1194,9 +1187,6 @@ impl ElementImpl for ProxySrc { gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } - gst::StateChange::PausedToReady => { - self.stop().map_err(|_| gst::StateChangeError)?; - } _ => (), } diff --git a/generic/threadshare/src/rtpdtmfsrc/imp.rs b/generic/threadshare/src/rtpdtmfsrc/imp.rs index 51626efa1..638d76464 100644 --- a/generic/threadshare/src/rtpdtmfsrc/imp.rs +++ b/generic/threadshare/src/rtpdtmfsrc/imp.rs @@ -1108,14 +1108,6 @@ impl RTPDTMFSrc { Ok(()) } - - fn pause(&self) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, imp = self, "Pausing"); - self.task.pause().block_on()?; - gst::debug!(CAT, imp = self, "Paused"); - - Ok(()) - } } #[glib::object_subclass] @@ -1346,8 +1338,8 @@ impl ElementImpl for RTPDTMFSrc { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -1359,7 +1351,6 @@ impl ElementImpl for RTPDTMFSrc { match transition { gst::StateChange::ReadyToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; success = gst::StateChangeSuccess::NoPreroll; } gst::StateChange::PausedToPlaying => { @@ -1368,9 +1359,6 @@ impl ElementImpl for RTPDTMFSrc { gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } - gst::StateChange::PausedToReady => { - self.stop().map_err(|_| gst::StateChangeError)?; - } _ => (), } diff --git a/generic/threadshare/src/tcpclientsrc/imp.rs b/generic/threadshare/src/tcpclientsrc/imp.rs index 7e925fb8a..513a639c1 100644 --- a/generic/threadshare/src/tcpclientsrc/imp.rs +++ b/generic/threadshare/src/tcpclientsrc/imp.rs @@ -532,13 +532,6 @@ impl TcpClientSrc { Ok(()) } - fn pause(&self) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, imp = self, "Pausing"); - self.task.pause().block_on()?; - gst::debug!(CAT, imp = self, "Paused"); - Ok(()) - } - fn state(&self) -> TaskState { self.task.state() } @@ -704,8 +697,8 @@ impl ElementImpl for TcpClientSrc { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -725,9 +718,6 @@ impl ElementImpl for TcpClientSrc { gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } - gst::StateChange::PausedToReady => { - self.stop().map_err(|_| gst::StateChangeError)?; - } _ => (), } diff --git a/generic/threadshare/src/udpsrc/imp.rs b/generic/threadshare/src/udpsrc/imp.rs index d72af290c..33c277c7e 100644 --- a/generic/threadshare/src/udpsrc/imp.rs +++ b/generic/threadshare/src/udpsrc/imp.rs @@ -852,13 +852,6 @@ impl UdpSrc { Ok(()) } - fn pause(&self) -> Result<(), gst::ErrorMessage> { - gst::debug!(CAT, imp = self, "Pausing"); - self.task.pause().block_on()?; - gst::debug!(CAT, imp = self, "Paused"); - Ok(()) - } - fn state(&self) -> TaskState { self.task.state() } @@ -1111,8 +1104,8 @@ impl ElementImpl for UdpSrc { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause().map_err(|_| gst::StateChangeError)?; + gst::StateChange::PausedToReady => { + self.stop().map_err(|_| gst::StateChangeError)?; } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -1132,9 +1125,6 @@ impl ElementImpl for UdpSrc { gst::StateChange::PlayingToPaused => { success = gst::StateChangeSuccess::NoPreroll; } - gst::StateChange::PausedToReady => { - self.stop().map_err(|_| gst::StateChangeError)?; - } _ => (), } diff --git a/generic/threadshare/tests/inter.rs b/generic/threadshare/tests/inter.rs index 1326aabc8..76f5faae0 100644 --- a/generic/threadshare/tests/inter.rs +++ b/generic/threadshare/tests/inter.rs @@ -204,18 +204,17 @@ fn one_to_one_up_first() { pipe_down.set_base_time(gst::ClockTime::ZERO); pipe_down.set_start_time(gst::ClockTime::NONE); - let (eos_tx, mut eos_rx) = oneshot::channel::<()>(); + let (n_buf_tx, mut n_buf_rx) = oneshot::channel::<()>(); appsink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample({ let mut samples = 0; - let mut eos_tx = Some(eos_tx); + let mut eos_tx = Some(n_buf_tx); move |appsink| { let _ = appsink.pull_sample().unwrap(); samples += 1; - if samples == 100 { + if samples == 10 { eos_tx.take().unwrap().send(()).unwrap(); - return Err(gst::FlowError::Eos); } Ok(gst::FlowSuccess::Ok) @@ -236,8 +235,8 @@ fn one_to_one_up_first() { loop { futures::select! { - _ = eos_rx => { - println!("inter::one_to_one_up_first got eos notif"); + _ = n_buf_rx => { + println!("inter::one_to_one_up_first got n_buf notif"); break; } msg = bus_up_stream.next() => { @@ -325,9 +324,9 @@ fn one_to_many_up_first() { pipe_down.set_start_time(gst::ClockTime::NONE); let samples = Arc::new(AtomicU32::new(0)); - let (mut eos_tx, eos_rx) = if num_buf.is_some() { - let (eos_tx, eos_rx) = oneshot::channel::<()>(); - (Some(eos_tx), Some(eos_rx)) + let (mut n_buf_tx, n_buf_rx) = if num_buf.is_some() { + let (n_buf_tx, n_buf_rx) = oneshot::channel::<()>(); + (Some(n_buf_tx), Some(n_buf_rx)) } else { (None, None) }; @@ -340,8 +339,7 @@ fn one_to_many_up_first() { let cur = samples.fetch_add(1, Ordering::SeqCst); if let Some(num_buf) = num_buf { if cur + 1 == num_buf { - eos_tx.take().unwrap().send(()).unwrap(); - return Err(gst::FlowError::Eos); + n_buf_tx.take().unwrap().send(()).unwrap(); } } Ok(gst::FlowSuccess::Ok) @@ -350,7 +348,7 @@ fn one_to_many_up_first() { .build(), ); - (pipe_down, samples, eos_rx) + (pipe_down, samples, n_buf_rx) } let pipe_up = gst::Pipeline::with_name("upstream::one_to_many_up_first"); @@ -375,12 +373,12 @@ fn one_to_many_up_first() { // Starting upstream first pipe_up.set_state(gst::State::Playing).unwrap(); - let (pipe_down_1, samples_1, eos_rx_1) = build_pipe_down(1, 20); - let mut eos_rx_1 = eos_rx_1.unwrap(); + let (pipe_down_1, samples_1, n_buf_rx_1) = build_pipe_down(1, 20); + let mut n_buf_rx_1 = n_buf_rx_1.unwrap(); pipe_down_1.set_state(gst::State::Playing).unwrap(); - let (pipe_down_2, samples_2, eos_rx_2) = build_pipe_down(2, 20); - let eos_rx_2 = eos_rx_2.unwrap(); + let (pipe_down_2, samples_2, n_buf_rx_2) = build_pipe_down(2, 20); + let n_buf_rx_2 = n_buf_rx_2.unwrap(); pipe_down_2.set_state(gst::State::Playing).unwrap(); futures::executor::block_on(async { @@ -392,8 +390,8 @@ fn one_to_many_up_first() { loop { futures::select! { - _ = eos_rx_1 => { - println!("inter::one_to_many_up_first got eos notif"); + _ = n_buf_rx_1 => { + println!("inter::one_to_many_up_first got n_buf notif"); break; } msg_down_1 = bus_down_stream_1.next() => { @@ -435,7 +433,7 @@ fn one_to_many_up_first() { assert_eq!(samples_1.load(Ordering::SeqCst), 20); // Waiting for pipe_down_2 to handle its buffers too - futures::executor::block_on(eos_rx_2).unwrap(); + futures::executor::block_on(n_buf_rx_2).unwrap(); pipe_down_2.set_state(gst::State::Null).unwrap(); assert_eq!(samples_2.load(Ordering::SeqCst), 20); @@ -561,12 +559,10 @@ fn changing_inter_ctx() { let starting = starting.clone(); let mut cur_caps = None; let mut samples = 0; - let mut start_count = 0; move |appsink| { if starting.fetch_and(false, Ordering::SeqCst) { cur_caps = None; samples = 0; - start_count += 1; } let sample = appsink.pull_sample().unwrap(); @@ -581,9 +577,6 @@ fn changing_inter_ctx() { if samples == 10 { n_buffers_tx.try_send(()).unwrap(); - if start_count == 2 { - return Err(gst::FlowError::Eos); - } } Ok(gst::FlowSuccess::Ok) diff --git a/generic/threadshare/tests/pad.rs b/generic/threadshare/tests/pad.rs index f98fd9626..95d327f41 100644 --- a/generic/threadshare/tests/pad.rs +++ b/generic/threadshare/tests/pad.rs @@ -252,12 +252,6 @@ mod imp_src { self.task.start().await_maybe_on_context().unwrap(); gst::debug!(SRC_CAT, imp = self, "Started"); } - - fn pause(&self) { - gst::debug!(SRC_CAT, imp = self, "Pausing"); - self.task.pause().block_on().unwrap(); - gst::debug!(SRC_CAT, imp = self, "Paused"); - } } #[glib::object_subclass] @@ -363,8 +357,8 @@ mod imp_src { gst::StateChangeError })?; } - gst::StateChange::PlayingToPaused => { - self.pause(); + gst::StateChange::PausedToReady => { + self.stop(); } gst::StateChange::ReadyToNull => { self.unprepare(); @@ -375,9 +369,6 @@ mod imp_src { let mut success = self.parent_change_state(transition)?; match transition { - gst::StateChange::PausedToReady => { - self.stop(); - } gst::StateChange::PausedToPlaying => { self.start(); }