diff --git a/Cargo.lock b/Cargo.lock index 8fb7b573..7d7ecdaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,7 +1781,6 @@ name = "gst-plugin-uriplaylistbin" version = "0.8.3" dependencies = [ "anyhow", - "crossbeam-channel", "gst-plugin-version-helper", "gstreamer", "gstreamer-app", diff --git a/utils/uriplaylistbin/Cargo.toml b/utils/uriplaylistbin/Cargo.toml index af74f481..eb537edb 100644 --- a/utils/uriplaylistbin/Cargo.toml +++ b/utils/uriplaylistbin/Cargo.toml @@ -11,7 +11,6 @@ description = "Playlist Plugin" gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.18", version = "0.18", features = ["v1_14"] } once_cell = "1.0" anyhow = "1" -crossbeam-channel = "0.5" [dev-dependencies] gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.18", version = "0.18", features = ["v1_14"]} diff --git a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs index f98c8887..4198c991 100644 --- a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs +++ b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs @@ -7,7 +7,7 @@ // SPDX-License-Identifier: MPL-2.0 use std::sync::Arc; -use std::sync::{Mutex, MutexGuard}; +use std::sync::{mpsc, Mutex, MutexGuard}; use gst::glib; use gst::prelude::*; @@ -175,7 +175,7 @@ impl State { fn unblock_item(&mut self, element: &super::UriPlaylistBin) { if let Some(blocked) = self.blocked.take() { - let (messages, sender) = blocked.set_streaming(self.streams_topology.n_streams()); + let (messages, channels) = blocked.set_streaming(self.streams_topology.n_streams()); gst_log!( CAT, @@ -188,7 +188,8 @@ impl State { for msg in messages { let _ = element.post_message(msg); } - let _ = sender.send(true); + + channels.send(true); self.streaming.push(blocked); } @@ -218,9 +219,8 @@ enum ItemState { /// number of streamsynchronizer src pads which are not eos yet waiting_eos: u32, stream_collection_msg: gst::Message, - // channel used to block pads flow until streamsynchronizer is eos - sender: crossbeam_channel::Sender, - receiver: crossbeam_channel::Receiver, + // channels used to block pads flow until streamsynchronizer is eos + channels: Channels, }, /// Waiting that pads of all the streams have been created on decodebin. /// Required to ensure that streams are plugged to concat in the playlist order. @@ -230,9 +230,8 @@ enum ItemState { stream_collection_msg: gst::Message, /// concat sink pads which have been requested to handle this item concat_sink_pads: Vec<(gst::Element, gst::Pad)>, - // channel used to block pad flow in the Blocked state - sender: crossbeam_channel::Sender, - receiver: crossbeam_channel::Receiver, + // channels used to block pad flow in the Blocked state + channels: Channels, }, /// Pads have been linked to `concat` elements but are blocked until the next item is linked to `concat` as well. /// This is required to ensure gap-less transition between items. @@ -241,7 +240,7 @@ enum ItemState { stream_collection_msg: gst::Message, stream_selected_msg: Option, concat_sink_pads: Vec<(gst::Element, gst::Pad)>, - sender: crossbeam_channel::Sender, + channels: Channels, }, /// Buffers are flowing Streaming { @@ -333,15 +332,17 @@ impl Item { } } - fn receiver(&self) -> crossbeam_channel::Receiver { - let inner = self.inner.lock().unwrap(); + fn receiver(&self) -> mpsc::Receiver { + let mut inner = self.inner.lock().unwrap(); - match &inner.state { - ItemState::WaitingForPads { receiver, .. } => receiver.clone(), - ItemState::WaitingForStreamsynchronizerEos { receiver, .. } => receiver.clone(), + let channels = match &mut inner.state { + ItemState::WaitingForPads { channels, .. } => channels, + ItemState::WaitingForStreamsynchronizerEos { channels, .. } => channels, // receiver is no longer supposed to be accessed once in the `Blocked` state _ => panic!("invalid state: {:?}", inner.state), - } + }; + + channels.get_receiver() } fn add_blocked_pad(&self, pad: gst::Pad) { @@ -443,8 +444,6 @@ impl Item { ItemState::WaitingForStreamCollection { .. } )); - let (sender, receiver) = crossbeam_channel::unbounded::(); - match &inner.state { ItemState::WaitingForStreamCollection { uridecodebin } => { inner.state = ItemState::WaitingForPads { @@ -452,8 +451,7 @@ impl Item { n_pads_pendings: n_streams, stream_collection_msg: msg.copy(), concat_sink_pads: vec![], - sender, - receiver, + channels: Channels::default(), }; } _ => panic!("invalid state: {:?}", inner.state), @@ -465,7 +463,6 @@ impl Item { // having to wait until streamsynchronizer is flushed to internally reorganize the element. fn set_waiting_for_ss_eos(&self, waiting_eos: u32, msg: gst::message::StreamCollection) { let mut inner = self.inner.lock().unwrap(); - let (sender, receiver) = crossbeam_channel::unbounded::(); match &inner.state { ItemState::WaitingForStreamCollection { uridecodebin } => { @@ -475,8 +472,7 @@ impl Item { waiting_eos, // FIXME: save deep copy once https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/363 is fixed stream_collection_msg: msg.copy(), - sender, - receiver, + channels: Channels::default(), }; } _ => panic!("invalid state: {:?}", inner.state), @@ -485,13 +481,7 @@ impl Item { // from the WaitingForStreamsynchronizerEos state, called when the streamsynchronizer has been flushed // and the item can now be processed. - fn done_waiting_for_ss_eos( - &self, - ) -> ( - StreamsTopology, - Vec, - crossbeam_channel::Sender, - ) { + fn done_waiting_for_ss_eos(&self) -> (StreamsTopology, Vec, Channels) { let mut inner = self.inner.lock().unwrap(); match &inner.state { @@ -500,7 +490,7 @@ impl Item { decodebin_pads, waiting_eos, stream_collection_msg, - sender, + channels, .. } => { assert_eq!(*waiting_eos, 0); @@ -512,20 +502,17 @@ impl Item { _ => unreachable!(), }; let pending_pads = decodebin_pads.clone(); - let sender = sender.clone(); - - let (new_sender, new_receiver) = crossbeam_channel::unbounded::(); + let channels = channels.clone(); inner.state = ItemState::WaitingForPads { uridecodebin: uridecodebin.clone(), n_pads_pendings: topology.n_streams(), stream_collection_msg: stream_collection_msg.copy(), concat_sink_pads: vec![], - sender: new_sender, - receiver: new_receiver, + channels: Channels::default(), }; - (topology, pending_pads, sender) + (topology, pending_pads, channels) } _ => panic!("invalid state: {:?}", inner.state), } @@ -538,14 +525,14 @@ impl Item { match &mut inner.state { ItemState::WaitingForPads { uridecodebin, - sender, + channels, stream_collection_msg, concat_sink_pads, .. } => { inner.state = ItemState::Blocked { uridecodebin: uridecodebin.clone(), - sender: sender.clone(), + channels: channels.clone(), concat_sink_pads: concat_sink_pads.clone(), stream_collection_msg: stream_collection_msg.copy(), stream_selected_msg: None, @@ -557,16 +544,13 @@ impl Item { // from the Blocked state, called when the item streaming threads can be unblocked. // Return the queued messages from this item and the sender to unblock their pads - fn set_streaming( - &self, - n_streams: u32, - ) -> (Vec, crossbeam_channel::Sender) { + fn set_streaming(&self, n_streams: u32) -> (Vec, Channels) { let mut inner = self.inner.lock().unwrap(); match &mut inner.state { ItemState::Blocked { uridecodebin, - sender, + channels, stream_collection_msg, stream_selected_msg, concat_sink_pads, @@ -576,7 +560,7 @@ impl Item { if let Some(msg) = stream_selected_msg { messages.push(msg.copy()); } - let sender = sender.clone(); + let channels = channels.clone(); inner.state = ItemState::Streaming { uridecodebin: uridecodebin.clone(), @@ -584,7 +568,7 @@ impl Item { concat_sink_pads: concat_sink_pads.clone(), }; - (messages, sender) + (messages, channels) } _ => panic!("invalid state: {:?}", inner.state), } @@ -609,13 +593,13 @@ impl Item { } } - fn sender(&self) -> crossbeam_channel::Sender { + fn channels(&self) -> Channels { let inner = self.inner.lock().unwrap(); match &inner.state { - ItemState::WaitingForStreamsynchronizerEos { sender, .. } => sender.clone(), - ItemState::WaitingForPads { sender, .. } => sender.clone(), - ItemState::Blocked { sender, .. } => sender.clone(), + ItemState::WaitingForStreamsynchronizerEos { channels, .. } => channels.clone(), + ItemState::WaitingForPads { channels, .. } => channels.clone(), + ItemState::Blocked { channels, .. } => channels.clone(), _ => panic!("invalid state: {:?}", inner.state), } } @@ -955,13 +939,13 @@ impl ElementImpl for UriPlaylistBin { // As a result we have to explicitly unblock all receivers as dropping the sender // is not enough. if let Some(item) = state.waiting_for_ss_eos.take() { - let _ = item.sender().send(false); + item.channels().send(false); } if let Some(item) = state.waiting_for_pads.take() { - let _ = item.sender().send(false); + item.channels().send(false); } if let Some(item) = state.blocked.take() { - let _ = item.sender().send(false); + item.channels().send(false); } } @@ -1086,7 +1070,7 @@ impl UriPlaylistBin { if let Some(item) = state.waiting_for_ss_eos.as_ref() { // block pad until streamsynchronizer is eos let element_weak = element.downgrade(); - let receiver = item.receiver(); + let receiver = Mutex::new(item.receiver()); gst_debug!( CAT, @@ -1102,6 +1086,7 @@ impl UriPlaylistBin { }; let parent = pad.parent().unwrap(); + let receiver = receiver.lock().unwrap(); let _ = receiver.recv(); gst_log!( @@ -1409,7 +1394,7 @@ impl UriPlaylistBin { item.add_concat_sink_pad(&concat, &sink_pad); // block pad until next item is reaching the `Blocked` state - let receiver = item.receiver(); + let receiver = Mutex::new(item.receiver()); let element_weak = element.downgrade(); let item_clone = item.clone(); @@ -1431,6 +1416,8 @@ impl UriPlaylistBin { pad.name() ); + let receiver = receiver.lock().unwrap(); + if let Ok(false) = receiver.recv() { // we are shutting down so remove the probe. // Don't handle Err(_) here as if the item has multiple pads, the sender may be dropped in unblock_item() @@ -1534,7 +1521,7 @@ impl UriPlaylistBin { /// and so the elements can reorganize itself to handle a pending changes in /// streams topology. fn handle_topology_change(&self, element: &super::UriPlaylistBin) { - let (pending_pads, sender) = { + let (pending_pads, channels) = { let mut state_guard = self.state.lock().unwrap(); let state = state_guard.as_mut().unwrap(); @@ -1543,7 +1530,7 @@ impl UriPlaylistBin { None => return, // element is being shutdown }; - let (topology, pending_pads, sender) = item.done_waiting_for_ss_eos(); + let (topology, pending_pads, channels) = item.done_waiting_for_ss_eos(); state.waiting_for_pads = Some(item); // remove now useless concat elements, missing ones will be added when handling src pads from decodebin @@ -1603,7 +1590,7 @@ impl UriPlaylistBin { state.streams_topology = topology; - (pending_pads, sender) + (pending_pads, channels) }; // process decodebin src pads we already received and unblock them @@ -1611,7 +1598,7 @@ impl UriPlaylistBin { self.process_decodebin_pad(pad); } - let _ = sender.send(true); + channels.send(true); } fn failed(&self, element: &super::UriPlaylistBin, error: PlaylistError) { @@ -1692,3 +1679,25 @@ impl UriPlaylistBin { } } } + +#[derive(Default, Clone, Debug)] +struct Channels { + senders: Arc>>>, +} + +impl Channels { + fn get_receiver(&self) -> mpsc::Receiver { + let mut senders = self.senders.lock().unwrap(); + + let (sender, receiver) = mpsc::channel(); + senders.push(sender); + receiver + } + + fn send(&self, val: bool) { + let mut senders = self.senders.lock().unwrap(); + + // remove sender if sending failed + senders.retain(|sender| sender.send(val).is_ok()); + } +}