uridecodebin: stop using crossbeam_channel

I give up on crossbeam_channel. For some reasons some receivers are not
always unblocked and I was not able to reproduce using simpler test
cases.
Use with mpsc channels instead which are more reliable.
This commit is contained in:
Guillaume Desmottes 2022-04-07 14:19:56 +02:00
parent 14a62ec7ce
commit 2eb4a82093
3 changed files with 68 additions and 61 deletions

1
Cargo.lock generated
View file

@ -1781,7 +1781,6 @@ name = "gst-plugin-uriplaylistbin"
version = "0.8.3" version = "0.8.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"crossbeam-channel",
"gst-plugin-version-helper", "gst-plugin-version-helper",
"gstreamer", "gstreamer",
"gstreamer-app", "gstreamer-app",

View file

@ -11,7 +11,6 @@ description = "Playlist Plugin"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.18", version = "0.18", features = ["v1_14"] } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.18", version = "0.18", features = ["v1_14"] }
once_cell = "1.0" once_cell = "1.0"
anyhow = "1" anyhow = "1"
crossbeam-channel = "0.5"
[dev-dependencies] [dev-dependencies]
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.18", version = "0.18", features = ["v1_14"]} gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.18", version = "0.18", features = ["v1_14"]}

View file

@ -7,7 +7,7 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use std::sync::Arc; use std::sync::Arc;
use std::sync::{Mutex, MutexGuard}; use std::sync::{mpsc, Mutex, MutexGuard};
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
@ -175,7 +175,7 @@ impl State {
fn unblock_item(&mut self, element: &super::UriPlaylistBin) { fn unblock_item(&mut self, element: &super::UriPlaylistBin) {
if let Some(blocked) = self.blocked.take() { if let Some(blocked) = self.blocked.take() {
let (messages, sender) = blocked.set_streaming(self.streams_topology.n_streams()); let (messages, channels) = blocked.set_streaming(self.streams_topology.n_streams());
gst_log!( gst_log!(
CAT, CAT,
@ -188,7 +188,8 @@ impl State {
for msg in messages { for msg in messages {
let _ = element.post_message(msg); let _ = element.post_message(msg);
} }
let _ = sender.send(true);
channels.send(true);
self.streaming.push(blocked); self.streaming.push(blocked);
} }
@ -218,9 +219,8 @@ enum ItemState {
/// number of streamsynchronizer src pads which are not eos yet /// number of streamsynchronizer src pads which are not eos yet
waiting_eos: u32, waiting_eos: u32,
stream_collection_msg: gst::Message, stream_collection_msg: gst::Message,
// channel used to block pads flow until streamsynchronizer is eos // channels used to block pads flow until streamsynchronizer is eos
sender: crossbeam_channel::Sender<bool>, channels: Channels,
receiver: crossbeam_channel::Receiver<bool>,
}, },
/// Waiting that pads of all the streams have been created on decodebin. /// Waiting that pads of all the streams have been created on decodebin.
/// Required to ensure that streams are plugged to concat in the playlist order. /// Required to ensure that streams are plugged to concat in the playlist order.
@ -230,9 +230,8 @@ enum ItemState {
stream_collection_msg: gst::Message, stream_collection_msg: gst::Message,
/// concat sink pads which have been requested to handle this item /// concat sink pads which have been requested to handle this item
concat_sink_pads: Vec<(gst::Element, gst::Pad)>, concat_sink_pads: Vec<(gst::Element, gst::Pad)>,
// channel used to block pad flow in the Blocked state // channels used to block pad flow in the Blocked state
sender: crossbeam_channel::Sender<bool>, channels: Channels,
receiver: crossbeam_channel::Receiver<bool>,
}, },
/// Pads have been linked to `concat` elements but are blocked until the next item is linked to `concat` as well. /// Pads have been linked to `concat` elements but are blocked until the next item is linked to `concat` as well.
/// This is required to ensure gap-less transition between items. /// This is required to ensure gap-less transition between items.
@ -241,7 +240,7 @@ enum ItemState {
stream_collection_msg: gst::Message, stream_collection_msg: gst::Message,
stream_selected_msg: Option<gst::Message>, stream_selected_msg: Option<gst::Message>,
concat_sink_pads: Vec<(gst::Element, gst::Pad)>, concat_sink_pads: Vec<(gst::Element, gst::Pad)>,
sender: crossbeam_channel::Sender<bool>, channels: Channels,
}, },
/// Buffers are flowing /// Buffers are flowing
Streaming { Streaming {
@ -333,15 +332,17 @@ impl Item {
} }
} }
fn receiver(&self) -> crossbeam_channel::Receiver<bool> { fn receiver(&self) -> mpsc::Receiver<bool> {
let inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
match &inner.state { let channels = match &mut inner.state {
ItemState::WaitingForPads { receiver, .. } => receiver.clone(), ItemState::WaitingForPads { channels, .. } => channels,
ItemState::WaitingForStreamsynchronizerEos { receiver, .. } => receiver.clone(), ItemState::WaitingForStreamsynchronizerEos { channels, .. } => channels,
// receiver is no longer supposed to be accessed once in the `Blocked` state // receiver is no longer supposed to be accessed once in the `Blocked` state
_ => panic!("invalid state: {:?}", inner.state), _ => panic!("invalid state: {:?}", inner.state),
} };
channels.get_receiver()
} }
fn add_blocked_pad(&self, pad: gst::Pad) { fn add_blocked_pad(&self, pad: gst::Pad) {
@ -443,8 +444,6 @@ impl Item {
ItemState::WaitingForStreamCollection { .. } ItemState::WaitingForStreamCollection { .. }
)); ));
let (sender, receiver) = crossbeam_channel::unbounded::<bool>();
match &inner.state { match &inner.state {
ItemState::WaitingForStreamCollection { uridecodebin } => { ItemState::WaitingForStreamCollection { uridecodebin } => {
inner.state = ItemState::WaitingForPads { inner.state = ItemState::WaitingForPads {
@ -452,8 +451,7 @@ impl Item {
n_pads_pendings: n_streams, n_pads_pendings: n_streams,
stream_collection_msg: msg.copy(), stream_collection_msg: msg.copy(),
concat_sink_pads: vec![], concat_sink_pads: vec![],
sender, channels: Channels::default(),
receiver,
}; };
} }
_ => panic!("invalid state: {:?}", inner.state), _ => panic!("invalid state: {:?}", inner.state),
@ -465,7 +463,6 @@ impl Item {
// having to wait until streamsynchronizer is flushed to internally reorganize the element. // having to wait until streamsynchronizer is flushed to internally reorganize the element.
fn set_waiting_for_ss_eos(&self, waiting_eos: u32, msg: gst::message::StreamCollection) { fn set_waiting_for_ss_eos(&self, waiting_eos: u32, msg: gst::message::StreamCollection) {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
let (sender, receiver) = crossbeam_channel::unbounded::<bool>();
match &inner.state { match &inner.state {
ItemState::WaitingForStreamCollection { uridecodebin } => { ItemState::WaitingForStreamCollection { uridecodebin } => {
@ -475,8 +472,7 @@ impl Item {
waiting_eos, waiting_eos,
// FIXME: save deep copy once https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/363 is fixed // FIXME: save deep copy once https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/363 is fixed
stream_collection_msg: msg.copy(), stream_collection_msg: msg.copy(),
sender, channels: Channels::default(),
receiver,
}; };
} }
_ => panic!("invalid state: {:?}", inner.state), _ => panic!("invalid state: {:?}", inner.state),
@ -485,13 +481,7 @@ impl Item {
// from the WaitingForStreamsynchronizerEos state, called when the streamsynchronizer has been flushed // from the WaitingForStreamsynchronizerEos state, called when the streamsynchronizer has been flushed
// and the item can now be processed. // and the item can now be processed.
fn done_waiting_for_ss_eos( fn done_waiting_for_ss_eos(&self) -> (StreamsTopology, Vec<gst::Pad>, Channels) {
&self,
) -> (
StreamsTopology,
Vec<gst::Pad>,
crossbeam_channel::Sender<bool>,
) {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
match &inner.state { match &inner.state {
@ -500,7 +490,7 @@ impl Item {
decodebin_pads, decodebin_pads,
waiting_eos, waiting_eos,
stream_collection_msg, stream_collection_msg,
sender, channels,
.. ..
} => { } => {
assert_eq!(*waiting_eos, 0); assert_eq!(*waiting_eos, 0);
@ -512,20 +502,17 @@ impl Item {
_ => unreachable!(), _ => unreachable!(),
}; };
let pending_pads = decodebin_pads.clone(); let pending_pads = decodebin_pads.clone();
let sender = sender.clone(); let channels = channels.clone();
let (new_sender, new_receiver) = crossbeam_channel::unbounded::<bool>();
inner.state = ItemState::WaitingForPads { inner.state = ItemState::WaitingForPads {
uridecodebin: uridecodebin.clone(), uridecodebin: uridecodebin.clone(),
n_pads_pendings: topology.n_streams(), n_pads_pendings: topology.n_streams(),
stream_collection_msg: stream_collection_msg.copy(), stream_collection_msg: stream_collection_msg.copy(),
concat_sink_pads: vec![], concat_sink_pads: vec![],
sender: new_sender, channels: Channels::default(),
receiver: new_receiver,
}; };
(topology, pending_pads, sender) (topology, pending_pads, channels)
} }
_ => panic!("invalid state: {:?}", inner.state), _ => panic!("invalid state: {:?}", inner.state),
} }
@ -538,14 +525,14 @@ impl Item {
match &mut inner.state { match &mut inner.state {
ItemState::WaitingForPads { ItemState::WaitingForPads {
uridecodebin, uridecodebin,
sender, channels,
stream_collection_msg, stream_collection_msg,
concat_sink_pads, concat_sink_pads,
.. ..
} => { } => {
inner.state = ItemState::Blocked { inner.state = ItemState::Blocked {
uridecodebin: uridecodebin.clone(), uridecodebin: uridecodebin.clone(),
sender: sender.clone(), channels: channels.clone(),
concat_sink_pads: concat_sink_pads.clone(), concat_sink_pads: concat_sink_pads.clone(),
stream_collection_msg: stream_collection_msg.copy(), stream_collection_msg: stream_collection_msg.copy(),
stream_selected_msg: None, stream_selected_msg: None,
@ -557,16 +544,13 @@ impl Item {
// from the Blocked state, called when the item streaming threads can be unblocked. // from the Blocked state, called when the item streaming threads can be unblocked.
// Return the queued messages from this item and the sender to unblock their pads // Return the queued messages from this item and the sender to unblock their pads
fn set_streaming( fn set_streaming(&self, n_streams: u32) -> (Vec<gst::Message>, Channels) {
&self,
n_streams: u32,
) -> (Vec<gst::Message>, crossbeam_channel::Sender<bool>) {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
match &mut inner.state { match &mut inner.state {
ItemState::Blocked { ItemState::Blocked {
uridecodebin, uridecodebin,
sender, channels,
stream_collection_msg, stream_collection_msg,
stream_selected_msg, stream_selected_msg,
concat_sink_pads, concat_sink_pads,
@ -576,7 +560,7 @@ impl Item {
if let Some(msg) = stream_selected_msg { if let Some(msg) = stream_selected_msg {
messages.push(msg.copy()); messages.push(msg.copy());
} }
let sender = sender.clone(); let channels = channels.clone();
inner.state = ItemState::Streaming { inner.state = ItemState::Streaming {
uridecodebin: uridecodebin.clone(), uridecodebin: uridecodebin.clone(),
@ -584,7 +568,7 @@ impl Item {
concat_sink_pads: concat_sink_pads.clone(), concat_sink_pads: concat_sink_pads.clone(),
}; };
(messages, sender) (messages, channels)
} }
_ => panic!("invalid state: {:?}", inner.state), _ => panic!("invalid state: {:?}", inner.state),
} }
@ -609,13 +593,13 @@ impl Item {
} }
} }
fn sender(&self) -> crossbeam_channel::Sender<bool> { fn channels(&self) -> Channels {
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();
match &inner.state { match &inner.state {
ItemState::WaitingForStreamsynchronizerEos { sender, .. } => sender.clone(), ItemState::WaitingForStreamsynchronizerEos { channels, .. } => channels.clone(),
ItemState::WaitingForPads { sender, .. } => sender.clone(), ItemState::WaitingForPads { channels, .. } => channels.clone(),
ItemState::Blocked { sender, .. } => sender.clone(), ItemState::Blocked { channels, .. } => channels.clone(),
_ => panic!("invalid state: {:?}", inner.state), _ => panic!("invalid state: {:?}", inner.state),
} }
} }
@ -955,13 +939,13 @@ impl ElementImpl for UriPlaylistBin {
// As a result we have to explicitly unblock all receivers as dropping the sender // As a result we have to explicitly unblock all receivers as dropping the sender
// is not enough. // is not enough.
if let Some(item) = state.waiting_for_ss_eos.take() { if let Some(item) = state.waiting_for_ss_eos.take() {
let _ = item.sender().send(false); item.channels().send(false);
} }
if let Some(item) = state.waiting_for_pads.take() { if let Some(item) = state.waiting_for_pads.take() {
let _ = item.sender().send(false); item.channels().send(false);
} }
if let Some(item) = state.blocked.take() { if let Some(item) = state.blocked.take() {
let _ = item.sender().send(false); item.channels().send(false);
} }
} }
@ -1086,7 +1070,7 @@ impl UriPlaylistBin {
if let Some(item) = state.waiting_for_ss_eos.as_ref() { if let Some(item) = state.waiting_for_ss_eos.as_ref() {
// block pad until streamsynchronizer is eos // block pad until streamsynchronizer is eos
let element_weak = element.downgrade(); let element_weak = element.downgrade();
let receiver = item.receiver(); let receiver = Mutex::new(item.receiver());
gst_debug!( gst_debug!(
CAT, CAT,
@ -1102,6 +1086,7 @@ impl UriPlaylistBin {
}; };
let parent = pad.parent().unwrap(); let parent = pad.parent().unwrap();
let receiver = receiver.lock().unwrap();
let _ = receiver.recv(); let _ = receiver.recv();
gst_log!( gst_log!(
@ -1409,7 +1394,7 @@ impl UriPlaylistBin {
item.add_concat_sink_pad(&concat, &sink_pad); item.add_concat_sink_pad(&concat, &sink_pad);
// block pad until next item is reaching the `Blocked` state // block pad until next item is reaching the `Blocked` state
let receiver = item.receiver(); let receiver = Mutex::new(item.receiver());
let element_weak = element.downgrade(); let element_weak = element.downgrade();
let item_clone = item.clone(); let item_clone = item.clone();
@ -1431,6 +1416,8 @@ impl UriPlaylistBin {
pad.name() pad.name()
); );
let receiver = receiver.lock().unwrap();
if let Ok(false) = receiver.recv() { if let Ok(false) = receiver.recv() {
// we are shutting down so remove the probe. // we are shutting down so remove the probe.
// Don't handle Err(_) here as if the item has multiple pads, the sender may be dropped in unblock_item() // Don't handle Err(_) here as if the item has multiple pads, the sender may be dropped in unblock_item()
@ -1534,7 +1521,7 @@ impl UriPlaylistBin {
/// and so the elements can reorganize itself to handle a pending changes in /// and so the elements can reorganize itself to handle a pending changes in
/// streams topology. /// streams topology.
fn handle_topology_change(&self, element: &super::UriPlaylistBin) { fn handle_topology_change(&self, element: &super::UriPlaylistBin) {
let (pending_pads, sender) = { let (pending_pads, channels) = {
let mut state_guard = self.state.lock().unwrap(); let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().unwrap(); let state = state_guard.as_mut().unwrap();
@ -1543,7 +1530,7 @@ impl UriPlaylistBin {
None => return, // element is being shutdown None => return, // element is being shutdown
}; };
let (topology, pending_pads, sender) = item.done_waiting_for_ss_eos(); let (topology, pending_pads, channels) = item.done_waiting_for_ss_eos();
state.waiting_for_pads = Some(item); state.waiting_for_pads = Some(item);
// remove now useless concat elements, missing ones will be added when handling src pads from decodebin // remove now useless concat elements, missing ones will be added when handling src pads from decodebin
@ -1603,7 +1590,7 @@ impl UriPlaylistBin {
state.streams_topology = topology; state.streams_topology = topology;
(pending_pads, sender) (pending_pads, channels)
}; };
// process decodebin src pads we already received and unblock them // process decodebin src pads we already received and unblock them
@ -1611,7 +1598,7 @@ impl UriPlaylistBin {
self.process_decodebin_pad(pad); self.process_decodebin_pad(pad);
} }
let _ = sender.send(true); channels.send(true);
} }
fn failed(&self, element: &super::UriPlaylistBin, error: PlaylistError) { fn failed(&self, element: &super::UriPlaylistBin, error: PlaylistError) {
@ -1692,3 +1679,25 @@ impl UriPlaylistBin {
} }
} }
} }
#[derive(Default, Clone, Debug)]
struct Channels {
senders: Arc<Mutex<Vec<mpsc::Sender<bool>>>>,
}
impl Channels {
fn get_receiver(&self) -> mpsc::Receiver<bool> {
let mut senders = self.senders.lock().unwrap();
let (sender, receiver) = mpsc::channel();
senders.push(sender);
receiver
}
fn send(&self, val: bool) {
let mut senders = self.senders.lock().unwrap();
// remove sender if sending failed
senders.retain(|sender| sender.send(val).is_ok());
}
}