threadshare: src elements: don't pause the task in downward state transitions

The threadshare async Task doesn't cancel a currently running iteration. If the
streaming thread is blocked, the iteration will not return blocking the state
change.

This commit skips the `pause` transition for the src elements, so a blocked
iteration can be interrupted transitioning to `Stop`.

Also make sure the stop transition is called chaining the state change up.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2380>
This commit is contained in:
François Laignel 2025-07-16 11:46:19 +02:00
parent 9612655f50
commit 716ac58a20
10 changed files with 44 additions and 164 deletions

View file

@ -293,15 +293,6 @@ impl TestSrc {
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
let is_main_elem = self.settings.lock().unwrap().is_main_elem;
debug_or_trace!(CAT, is_main_elem, imp = self, "Pausing");
self.task.pause().block_on()?;
debug_or_trace!(CAT, is_main_elem, imp = self, "Paused");
Ok(())
}
}
#[glib::object_subclass]
@ -457,8 +448,8 @@ impl ElementImpl for TestSrc {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -478,9 +469,6 @@ impl ElementImpl for TestSrc {
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -400,13 +400,6 @@ impl AppSrc {
gst::debug!(CAT, imp = self, "Started");
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Pausing");
self.task.pause().block_on()?;
gst::debug!(CAT, imp = self, "Paused");
Ok(())
}
}
#[glib::object_subclass]
@ -594,8 +587,8 @@ impl ElementImpl for AppSrc {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -615,9 +608,6 @@ impl ElementImpl for AppSrc {
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -507,14 +507,6 @@ impl AudioTestSrc {
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Pausing");
self.task.pause().block_on()?;
gst::debug!(CAT, imp = self, "Paused");
Ok(())
}
}
#[glib::object_subclass]
@ -687,8 +679,8 @@ impl ElementImpl for AudioTestSrc {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -699,19 +691,12 @@ impl ElementImpl for AudioTestSrc {
let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
self.start().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -618,32 +618,17 @@ impl InterSrc {
self.join_inter_ctx_blocking(target_inter_ctx, ts_ctx, dataqueue)?;
match self.obj().current_state() {
gst::State::Paused => {
self.srcpad
.gst_pad()
.push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build());
if self.obj().current_state() == gst::State::Playing {
self.srcpad
.gst_pad()
.push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build());
if let Err(err) = self.task.pause().block_on() {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to set new Task in Pause: {err}"]
));
}
if let Err(err) = self.task.start().block_on() {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to start new Task: {err}"]
));
}
gst::State::Playing => {
self.srcpad
.gst_pad()
.push_event(gst::event::FlushStop::builder(true).seqnum(seqnum).build());
if let Err(err) = self.task.start().block_on() {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to start new Task: {err}"]
));
}
}
_ => (),
}
Ok(())
@ -759,13 +744,6 @@ impl InterSrc {
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Pausing");
self.task.pause().await_maybe_on_context()?;
gst::debug!(CAT, imp = self, "Paused");
Ok(())
}
fn flush_start(&self) -> Result<(), gst::FlowError> {
gst::debug!(CAT, imp = self, "Flushing");
@ -1017,8 +995,8 @@ impl ElementImpl for InterSrc {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -1038,9 +1016,6 @@ impl ElementImpl for InterSrc {
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -1000,13 +1000,6 @@ impl ProxySrc {
gst::debug!(SRC_CAT, imp = self, "Started");
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(SRC_CAT, imp = self, "Pausing");
self.task.pause().block_on()?;
gst::debug!(SRC_CAT, imp = self, "Paused");
Ok(())
}
}
#[glib::object_subclass]
@ -1173,8 +1166,8 @@ impl ElementImpl for ProxySrc {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -1194,9 +1187,6 @@ impl ElementImpl for ProxySrc {
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -1108,14 +1108,6 @@ impl RTPDTMFSrc {
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Pausing");
self.task.pause().block_on()?;
gst::debug!(CAT, imp = self, "Paused");
Ok(())
}
}
#[glib::object_subclass]
@ -1346,8 +1338,8 @@ impl ElementImpl for RTPDTMFSrc {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -1359,7 +1351,6 @@ impl ElementImpl for RTPDTMFSrc {
match transition {
gst::StateChange::ReadyToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToPlaying => {
@ -1368,9 +1359,6 @@ impl ElementImpl for RTPDTMFSrc {
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -532,13 +532,6 @@ impl TcpClientSrc {
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Pausing");
self.task.pause().block_on()?;
gst::debug!(CAT, imp = self, "Paused");
Ok(())
}
fn state(&self) -> TaskState {
self.task.state()
}
@ -704,8 +697,8 @@ impl ElementImpl for TcpClientSrc {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -725,9 +718,6 @@ impl ElementImpl for TcpClientSrc {
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -852,13 +852,6 @@ impl UdpSrc {
Ok(())
}
fn pause(&self) -> Result<(), gst::ErrorMessage> {
gst::debug!(CAT, imp = self, "Pausing");
self.task.pause().block_on()?;
gst::debug!(CAT, imp = self, "Paused");
Ok(())
}
fn state(&self) -> TaskState {
self.task.state()
}
@ -1111,8 +1104,8 @@ impl ElementImpl for UdpSrc {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause().map_err(|_| gst::StateChangeError)?;
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -1132,9 +1125,6 @@ impl ElementImpl for UdpSrc {
gst::StateChange::PlayingToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}
gst::StateChange::PausedToReady => {
self.stop().map_err(|_| gst::StateChangeError)?;
}
_ => (),
}

View file

@ -204,18 +204,17 @@ fn one_to_one_up_first() {
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
let (eos_tx, mut eos_rx) = oneshot::channel::<()>();
let (n_buf_tx, mut n_buf_rx) = oneshot::channel::<()>();
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let mut samples = 0;
let mut eos_tx = Some(eos_tx);
let mut eos_tx = Some(n_buf_tx);
move |appsink| {
let _ = appsink.pull_sample().unwrap();
samples += 1;
if samples == 100 {
if samples == 10 {
eos_tx.take().unwrap().send(()).unwrap();
return Err(gst::FlowError::Eos);
}
Ok(gst::FlowSuccess::Ok)
@ -236,8 +235,8 @@ fn one_to_one_up_first() {
loop {
futures::select! {
_ = eos_rx => {
println!("inter::one_to_one_up_first got eos notif");
_ = n_buf_rx => {
println!("inter::one_to_one_up_first got n_buf notif");
break;
}
msg = bus_up_stream.next() => {
@ -325,9 +324,9 @@ fn one_to_many_up_first() {
pipe_down.set_start_time(gst::ClockTime::NONE);
let samples = Arc::new(AtomicU32::new(0));
let (mut eos_tx, eos_rx) = if num_buf.is_some() {
let (eos_tx, eos_rx) = oneshot::channel::<()>();
(Some(eos_tx), Some(eos_rx))
let (mut n_buf_tx, n_buf_rx) = if num_buf.is_some() {
let (n_buf_tx, n_buf_rx) = oneshot::channel::<()>();
(Some(n_buf_tx), Some(n_buf_rx))
} else {
(None, None)
};
@ -340,8 +339,7 @@ fn one_to_many_up_first() {
let cur = samples.fetch_add(1, Ordering::SeqCst);
if let Some(num_buf) = num_buf {
if cur + 1 == num_buf {
eos_tx.take().unwrap().send(()).unwrap();
return Err(gst::FlowError::Eos);
n_buf_tx.take().unwrap().send(()).unwrap();
}
}
Ok(gst::FlowSuccess::Ok)
@ -350,7 +348,7 @@ fn one_to_many_up_first() {
.build(),
);
(pipe_down, samples, eos_rx)
(pipe_down, samples, n_buf_rx)
}
let pipe_up = gst::Pipeline::with_name("upstream::one_to_many_up_first");
@ -375,12 +373,12 @@ fn one_to_many_up_first() {
// Starting upstream first
pipe_up.set_state(gst::State::Playing).unwrap();
let (pipe_down_1, samples_1, eos_rx_1) = build_pipe_down(1, 20);
let mut eos_rx_1 = eos_rx_1.unwrap();
let (pipe_down_1, samples_1, n_buf_rx_1) = build_pipe_down(1, 20);
let mut n_buf_rx_1 = n_buf_rx_1.unwrap();
pipe_down_1.set_state(gst::State::Playing).unwrap();
let (pipe_down_2, samples_2, eos_rx_2) = build_pipe_down(2, 20);
let eos_rx_2 = eos_rx_2.unwrap();
let (pipe_down_2, samples_2, n_buf_rx_2) = build_pipe_down(2, 20);
let n_buf_rx_2 = n_buf_rx_2.unwrap();
pipe_down_2.set_state(gst::State::Playing).unwrap();
futures::executor::block_on(async {
@ -392,8 +390,8 @@ fn one_to_many_up_first() {
loop {
futures::select! {
_ = eos_rx_1 => {
println!("inter::one_to_many_up_first got eos notif");
_ = n_buf_rx_1 => {
println!("inter::one_to_many_up_first got n_buf notif");
break;
}
msg_down_1 = bus_down_stream_1.next() => {
@ -435,7 +433,7 @@ fn one_to_many_up_first() {
assert_eq!(samples_1.load(Ordering::SeqCst), 20);
// Waiting for pipe_down_2 to handle its buffers too
futures::executor::block_on(eos_rx_2).unwrap();
futures::executor::block_on(n_buf_rx_2).unwrap();
pipe_down_2.set_state(gst::State::Null).unwrap();
assert_eq!(samples_2.load(Ordering::SeqCst), 20);
@ -561,12 +559,10 @@ fn changing_inter_ctx() {
let starting = starting.clone();
let mut cur_caps = None;
let mut samples = 0;
let mut start_count = 0;
move |appsink| {
if starting.fetch_and(false, Ordering::SeqCst) {
cur_caps = None;
samples = 0;
start_count += 1;
}
let sample = appsink.pull_sample().unwrap();
@ -581,9 +577,6 @@ fn changing_inter_ctx() {
if samples == 10 {
n_buffers_tx.try_send(()).unwrap();
if start_count == 2 {
return Err(gst::FlowError::Eos);
}
}
Ok(gst::FlowSuccess::Ok)

View file

@ -252,12 +252,6 @@ mod imp_src {
self.task.start().await_maybe_on_context().unwrap();
gst::debug!(SRC_CAT, imp = self, "Started");
}
fn pause(&self) {
gst::debug!(SRC_CAT, imp = self, "Pausing");
self.task.pause().block_on().unwrap();
gst::debug!(SRC_CAT, imp = self, "Paused");
}
}
#[glib::object_subclass]
@ -363,8 +357,8 @@ mod imp_src {
gst::StateChangeError
})?;
}
gst::StateChange::PlayingToPaused => {
self.pause();
gst::StateChange::PausedToReady => {
self.stop();
}
gst::StateChange::ReadyToNull => {
self.unprepare();
@ -375,9 +369,6 @@ mod imp_src {
let mut success = self.parent_change_state(transition)?;
match transition {
gst::StateChange::PausedToReady => {
self.stop();
}
gst::StateChange::PausedToPlaying => {
self.start();
}