From 97dba9046b11dd62d2aa273fbf6b69d870f10ed4 Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Thu, 7 Apr 2022 14:19:56 +0200 Subject: [PATCH] uridecodebin: stop using crossbeam_channel I give up on crossbeam_channel. For some reasons some receivers are not always unblocked and I was not able to reproduce using simpler test cases. Use with mpsc channels instead which are more reliable. --- utils/uriplaylistbin/Cargo.toml | 1 - .../uriplaylistbin/src/uriplaylistbin/imp.rs | 127 ++++++++++-------- 2 files changed, 68 insertions(+), 60 deletions(-) diff --git a/utils/uriplaylistbin/Cargo.toml b/utils/uriplaylistbin/Cargo.toml index 06e8a649..0bfe1f83 100644 --- a/utils/uriplaylistbin/Cargo.toml +++ b/utils/uriplaylistbin/Cargo.toml @@ -11,7 +11,6 @@ description = "Playlist Plugin" gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } once_cell = "1.0" anyhow = "1" -crossbeam-channel = "0.5" [dev-dependencies] gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } diff --git a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs index ad05123a..359ab7af 100644 --- a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs +++ b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs @@ -7,7 +7,7 @@ // SPDX-License-Identifier: MPL-2.0 use std::sync::Arc; -use std::sync::{Mutex, MutexGuard}; +use std::sync::{mpsc, Mutex, MutexGuard}; use gst::glib; use gst::prelude::*; @@ -174,7 +174,7 @@ impl State { fn unblock_item(&mut self, element: &super::UriPlaylistBin) { if let Some(blocked) = self.blocked.take() { - let (messages, sender) = blocked.set_streaming(self.streams_topology.n_streams()); + let (messages, channels) = blocked.set_streaming(self.streams_topology.n_streams()); gst::log!( CAT, @@ -187,7 +187,8 @@ impl State { for msg in messages { let _ = element.post_message(msg); } - let _ = sender.send(true); + + channels.send(true); self.streaming.push(blocked); } @@ -217,9 +218,8 @@ enum ItemState { /// number of streamsynchronizer src pads which are not eos yet waiting_eos: u32, stream_collection_msg: gst::Message, - // channel used to block pads flow until streamsynchronizer is eos - sender: crossbeam_channel::Sender, - receiver: crossbeam_channel::Receiver, + // channels used to block pads flow until streamsynchronizer is eos + channels: Channels, }, /// Waiting that pads of all the streams have been created on decodebin. /// Required to ensure that streams are plugged to concat in the playlist order. @@ -229,9 +229,8 @@ enum ItemState { stream_collection_msg: gst::Message, /// concat sink pads which have been requested to handle this item concat_sink_pads: Vec<(gst::Element, gst::Pad)>, - // channel used to block pad flow in the Blocked state - sender: crossbeam_channel::Sender, - receiver: crossbeam_channel::Receiver, + // channels used to block pad flow in the Blocked state + channels: Channels, }, /// Pads have been linked to `concat` elements but are blocked until the next item is linked to `concat` as well. /// This is required to ensure gap-less transition between items. @@ -240,7 +239,7 @@ enum ItemState { stream_collection_msg: gst::Message, stream_selected_msg: Option, concat_sink_pads: Vec<(gst::Element, gst::Pad)>, - sender: crossbeam_channel::Sender, + channels: Channels, }, /// Buffers are flowing Streaming { @@ -332,15 +331,17 @@ impl Item { } } - fn receiver(&self) -> crossbeam_channel::Receiver { - let inner = self.inner.lock().unwrap(); + fn receiver(&self) -> mpsc::Receiver { + let mut inner = self.inner.lock().unwrap(); - match &inner.state { - ItemState::WaitingForPads { receiver, .. } => receiver.clone(), - ItemState::WaitingForStreamsynchronizerEos { receiver, .. } => receiver.clone(), + let channels = match &mut inner.state { + ItemState::WaitingForPads { channels, .. } => channels, + ItemState::WaitingForStreamsynchronizerEos { channels, .. } => channels, // receiver is no longer supposed to be accessed once in the `Blocked` state _ => panic!("invalid state: {:?}", inner.state), - } + }; + + channels.get_receiver() } fn add_blocked_pad(&self, pad: gst::Pad) { @@ -442,8 +443,6 @@ impl Item { ItemState::WaitingForStreamCollection { .. } )); - let (sender, receiver) = crossbeam_channel::unbounded::(); - match &inner.state { ItemState::WaitingForStreamCollection { uridecodebin } => { inner.state = ItemState::WaitingForPads { @@ -451,8 +450,7 @@ impl Item { n_pads_pendings: n_streams, stream_collection_msg: msg.copy(), concat_sink_pads: vec![], - sender, - receiver, + channels: Channels::default(), }; } _ => panic!("invalid state: {:?}", inner.state), @@ -464,7 +462,6 @@ impl Item { // having to wait until streamsynchronizer is flushed to internally reorganize the element. fn set_waiting_for_ss_eos(&self, waiting_eos: u32, msg: &gst::message::StreamCollection) { let mut inner = self.inner.lock().unwrap(); - let (sender, receiver) = crossbeam_channel::unbounded::(); match &inner.state { ItemState::WaitingForStreamCollection { uridecodebin } => { @@ -474,8 +471,7 @@ impl Item { waiting_eos, // FIXME: save deep copy once https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/363 is fixed stream_collection_msg: msg.copy(), - sender, - receiver, + channels: Channels::default(), }; } _ => panic!("invalid state: {:?}", inner.state), @@ -484,13 +480,7 @@ impl Item { // from the WaitingForStreamsynchronizerEos state, called when the streamsynchronizer has been flushed // and the item can now be processed. - fn done_waiting_for_ss_eos( - &self, - ) -> ( - StreamsTopology, - Vec, - crossbeam_channel::Sender, - ) { + fn done_waiting_for_ss_eos(&self) -> (StreamsTopology, Vec, Channels) { let mut inner = self.inner.lock().unwrap(); match &inner.state { @@ -499,7 +489,7 @@ impl Item { decodebin_pads, waiting_eos, stream_collection_msg, - sender, + channels, .. } => { assert_eq!(*waiting_eos, 0); @@ -511,20 +501,17 @@ impl Item { _ => unreachable!(), }; let pending_pads = decodebin_pads.clone(); - let sender = sender.clone(); - - let (new_sender, new_receiver) = crossbeam_channel::unbounded::(); + let channels = channels.clone(); inner.state = ItemState::WaitingForPads { uridecodebin: uridecodebin.clone(), n_pads_pendings: topology.n_streams(), stream_collection_msg: stream_collection_msg.copy(), concat_sink_pads: vec![], - sender: new_sender, - receiver: new_receiver, + channels: Channels::default(), }; - (topology, pending_pads, sender) + (topology, pending_pads, channels) } _ => panic!("invalid state: {:?}", inner.state), } @@ -537,14 +524,14 @@ impl Item { match &mut inner.state { ItemState::WaitingForPads { uridecodebin, - sender, + channels, stream_collection_msg, concat_sink_pads, .. } => { inner.state = ItemState::Blocked { uridecodebin: uridecodebin.clone(), - sender: sender.clone(), + channels: channels.clone(), concat_sink_pads: concat_sink_pads.clone(), stream_collection_msg: stream_collection_msg.copy(), stream_selected_msg: None, @@ -556,16 +543,13 @@ impl Item { // from the Blocked state, called when the item streaming threads can be unblocked. // Return the queued messages from this item and the sender to unblock their pads - fn set_streaming( - &self, - n_streams: u32, - ) -> (Vec, crossbeam_channel::Sender) { + fn set_streaming(&self, n_streams: u32) -> (Vec, Channels) { let mut inner = self.inner.lock().unwrap(); match &mut inner.state { ItemState::Blocked { uridecodebin, - sender, + channels, stream_collection_msg, stream_selected_msg, concat_sink_pads, @@ -575,7 +559,7 @@ impl Item { if let Some(msg) = stream_selected_msg { messages.push(msg.copy()); } - let sender = sender.clone(); + let channels = channels.clone(); inner.state = ItemState::Streaming { uridecodebin: uridecodebin.clone(), @@ -583,7 +567,7 @@ impl Item { concat_sink_pads: concat_sink_pads.clone(), }; - (messages, sender) + (messages, channels) } _ => panic!("invalid state: {:?}", inner.state), } @@ -608,13 +592,13 @@ impl Item { } } - fn sender(&self) -> crossbeam_channel::Sender { + fn channels(&self) -> Channels { let inner = self.inner.lock().unwrap(); match &inner.state { - ItemState::WaitingForStreamsynchronizerEos { sender, .. } => sender.clone(), - ItemState::WaitingForPads { sender, .. } => sender.clone(), - ItemState::Blocked { sender, .. } => sender.clone(), + ItemState::WaitingForStreamsynchronizerEos { channels, .. } => channels.clone(), + ItemState::WaitingForPads { channels, .. } => channels.clone(), + ItemState::Blocked { channels, .. } => channels.clone(), _ => panic!("invalid state: {:?}", inner.state), } } @@ -954,13 +938,13 @@ impl ElementImpl for UriPlaylistBin { // As a result we have to explicitly unblock all receivers as dropping the sender // is not enough. if let Some(item) = state.waiting_for_ss_eos.take() { - let _ = item.sender().send(false); + item.channels().send(false); } if let Some(item) = state.waiting_for_pads.take() { - let _ = item.sender().send(false); + item.channels().send(false); } if let Some(item) = state.blocked.take() { - let _ = item.sender().send(false); + item.channels().send(false); } } @@ -1085,7 +1069,7 @@ impl UriPlaylistBin { if let Some(item) = state.waiting_for_ss_eos.as_ref() { // block pad until streamsynchronizer is eos let element_weak = element.downgrade(); - let receiver = item.receiver(); + let receiver = Mutex::new(item.receiver()); gst::debug!( CAT, @@ -1101,6 +1085,7 @@ impl UriPlaylistBin { }; let parent = pad.parent().unwrap(); + let receiver = receiver.lock().unwrap(); let _ = receiver.recv(); gst::log!( @@ -1408,7 +1393,7 @@ impl UriPlaylistBin { item.add_concat_sink_pad(&concat, &sink_pad); // block pad until next item is reaching the `Blocked` state - let receiver = item.receiver(); + let receiver = Mutex::new(item.receiver()); let element_weak = element.downgrade(); let item_clone = item.clone(); @@ -1430,6 +1415,8 @@ impl UriPlaylistBin { pad.name() ); + let receiver = receiver.lock().unwrap(); + if let Ok(false) = receiver.recv() { // we are shutting down so remove the probe. // Don't handle Err(_) here as if the item has multiple pads, the sender may be dropped in unblock_item() @@ -1533,7 +1520,7 @@ impl UriPlaylistBin { /// and so the elements can reorganize itself to handle a pending changes in /// streams topology. fn handle_topology_change(&self, element: &super::UriPlaylistBin) { - let (pending_pads, sender) = { + let (pending_pads, channels) = { let mut state_guard = self.state.lock().unwrap(); let state = state_guard.as_mut().unwrap(); @@ -1542,7 +1529,7 @@ impl UriPlaylistBin { None => return, // element is being shutdown }; - let (topology, pending_pads, sender) = item.done_waiting_for_ss_eos(); + let (topology, pending_pads, channels) = item.done_waiting_for_ss_eos(); state.waiting_for_pads = Some(item); // remove now useless concat elements, missing ones will be added when handling src pads from decodebin @@ -1602,7 +1589,7 @@ impl UriPlaylistBin { state.streams_topology = topology; - (pending_pads, sender) + (pending_pads, channels) }; // process decodebin src pads we already received and unblock them @@ -1610,7 +1597,7 @@ impl UriPlaylistBin { self.process_decodebin_pad(pad); } - let _ = sender.send(true); + channels.send(true); } fn failed(&self, element: &super::UriPlaylistBin, error: PlaylistError) { @@ -1691,3 +1678,25 @@ impl UriPlaylistBin { } } } + +#[derive(Default, Clone, Debug)] +struct Channels { + senders: Arc>>>, +} + +impl Channels { + fn get_receiver(&self) -> mpsc::Receiver { + let mut senders = self.senders.lock().unwrap(); + + let (sender, receiver) = mpsc::channel(); + senders.push(sender); + receiver + } + + fn send(&self, val: bool) { + let mut senders = self.senders.lock().unwrap(); + + // remove sender if sending failed + senders.retain(|sender| sender.send(val).is_ok()); + } +}