diff --git a/Cargo.toml b/Cargo.toml index e2065554..796c6568 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "net/rusoto", "utils/fallbackswitch", "utils/togglerecord", + "utils/uriplaylistbin", "video/cdg", "video/closedcaption", "video/videofx", @@ -46,6 +47,7 @@ default-members = [ "net/rusoto", "utils/fallbackswitch", "utils/togglerecord", + "utils/uriplaylistbin", "video/cdg", "video/ffv1", "video/flavors", diff --git a/meson.build b/meson.build index 8dbbef34..772d714d 100644 --- a/meson.build +++ b/meson.build @@ -58,6 +58,7 @@ plugins = { # https://github.com/qnighy/libwebp-sys2-rs/issues/4 'gst-plugin-webp': 'libgstrswebp', 'gst-plugin-videofx': 'libgstvideofx', + 'gst-plugin-uriplaylistbin': 'libgsturiplaylistbin', } extra_env = {} diff --git a/utils/uriplaylistbin/Cargo.toml b/utils/uriplaylistbin/Cargo.toml new file mode 100644 index 00000000..1b102265 --- /dev/null +++ b/utils/uriplaylistbin/Cargo.toml @@ -0,0 +1,50 @@ +[package] +name = "gst-plugin-uriplaylistbin" +version = "0.8.0" +authors = ["Guillaume Desmottes "] +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +license = "MPL-2.0" +edition = "2018" +description = "Playlist Plugin" + +[dependencies] +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } +once_cell = "1.0" +anyhow = "1" +crossbeam-channel = "0.5" + +[dev-dependencies] +gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"]} +structopt = "0.3" +url = "2.2" +more-asserts = "0.2" + +[lib] +name = "gsturiplaylistbin" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[[example]] +name = "playlist" +path = "examples/playlist.rs" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +# GStreamer 1.14 is required for static linking +static = ["gst/v1_14"] +capi = [] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/utils/uriplaylistbin/build.rs b/utils/uriplaylistbin/build.rs new file mode 100644 index 00000000..cda12e57 --- /dev/null +++ b/utils/uriplaylistbin/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/utils/uriplaylistbin/examples/playlist.rs b/utils/uriplaylistbin/examples/playlist.rs new file mode 100644 index 00000000..ee81000b --- /dev/null +++ b/utils/uriplaylistbin/examples/playlist.rs @@ -0,0 +1,137 @@ +// Copyright (C) 2021 OneStream Live +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::{ + collections::HashMap, + path::Path, + sync::{Arc, Mutex}, +}; + +use gst::prelude::*; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +#[structopt(name = "playlist", about = "An example of uriplaylistbin usage.")] +struct Opt { + #[structopt(default_value = "1", short = "i", long = "iterations")] + iterations: u32, + uris: Vec, +} + +fn create_pipeline(uris: Vec, iterations: u32) -> anyhow::Result { + let pipeline = gst::Pipeline::new(None); + let playlist = gst::ElementFactory::make("uriplaylistbin", None)?; + + pipeline.add(&playlist)?; + + playlist.set_property("uris", &uris); + playlist.set_property("iterations", &iterations); + + let sink_bins = Arc::new(Mutex::new(HashMap::new())); + let sink_bins_clone = sink_bins.clone(); + + let pipeline_weak = pipeline.downgrade(); + playlist.connect_pad_added(move |_playlist, src_pad| { + let pipeline = match pipeline_weak.upgrade() { + None => return, + Some(pipeline) => pipeline, + }; + let pad_name = src_pad.name(); + + let sink = if pad_name.starts_with("audio") { + gst::parse_bin_from_description("audioconvert ! audioresample ! autoaudiosink", true) + .unwrap() + } else if pad_name.starts_with("video") { + gst::parse_bin_from_description("videoconvert ! autovideosink", true).unwrap() + } else { + unimplemented!(); + }; + + pipeline.add(&sink).unwrap(); + sink.sync_state_with_parent().unwrap(); + + let sink_pad = sink.static_pad("sink").unwrap(); + src_pad.link(&sink_pad).unwrap(); + + sink_bins.lock().unwrap().insert(pad_name, sink); + }); + + let pipeline_weak = pipeline.downgrade(); + playlist.connect_pad_removed(move |_playlist, pad| { + let pipeline = match pipeline_weak.upgrade() { + None => return, + Some(pipeline) => pipeline, + }; + + // remove sink bin that was handling the pad + let sink_bins = sink_bins_clone.lock().unwrap(); + let sink = sink_bins.get(&pad.name()).unwrap(); + pipeline.remove(sink).unwrap(); + let _ = sink.set_state(gst::State::Null); + }); + + Ok(pipeline) +} + +fn main() -> anyhow::Result<()> { + gst::init().unwrap(); + gsturiplaylistbin::plugin_register_static().expect("Failed to register uriplaylistbin plugin"); + + let opt = Opt::from_args(); + if opt.uris.is_empty() { + anyhow::bail!("Need at least one URI to play"); + } + + let uris = opt + .uris + .into_iter() + .map(|uri| { + let p = Path::new(&uri); + match p.canonicalize() { + Ok(p) => format!("file://{}", p.to_str().unwrap().to_string()), + _ => uri, + } + }) + .collect(); + + { + let pipeline = create_pipeline(uris, opt.iterations)?; + + pipeline + .set_state(gst::State::Playing) + .expect("Unable to set the pipeline to the `Playing` state"); + + let bus = pipeline.bus().unwrap(); + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + match msg.view() { + MessageView::Error(err) => { + eprintln!( + "Error received from element {:?}: {}", + err.src().map(|s| s.path_string()), + err.error() + ); + eprintln!("Debugging information: {:?}", err.debug()); + break; + } + MessageView::Eos(..) => break, + _ => (), + } + } + + pipeline + .set_state(gst::State::Null) + .expect("Unable to set the pipeline to the `Null` state"); + } + + unsafe { + gst::deinit(); + } + + Ok(()) +} diff --git a/utils/uriplaylistbin/src/lib.rs b/utils/uriplaylistbin/src/lib.rs new file mode 100644 index 00000000..38c32e8e --- /dev/null +++ b/utils/uriplaylistbin/src/lib.rs @@ -0,0 +1,28 @@ +// Copyright (C) 2021 OneStream Live +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; + +mod uriplaylistbin; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + uriplaylistbin::register(plugin)?; + Ok(()) +} + +gst::plugin_define!( + uriplaylistbin, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "LGPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs new file mode 100644 index 00000000..906cf524 --- /dev/null +++ b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs @@ -0,0 +1,1572 @@ +// Copyright (C) 2021 OneStream Live +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::sync::Arc; +use std::sync::Mutex; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_debug, gst_error, gst_fixme, gst_info, gst_log, gst_warning}; + +use once_cell::sync::Lazy; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "uriplaylistbin", + gst::DebugColorFlags::empty(), + Some("Uri Playlist Bin"), + ) +}); + +/// how many items are allowed to be prepared and waiting in the pipeline +const MAX_STREAMING_ITEMS: usize = 2; + +#[derive(Debug)] +enum PlaylistError { + PluginMissing { error: anyhow::Error }, + ItemFailed { error: anyhow::Error, item: Item }, +} + +impl std::fmt::Display for PlaylistError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PlaylistError::PluginMissing { error } => { + write!(f, "{}", error) + } + PlaylistError::ItemFailed { error, item } => { + write!(f, "{} (URI: {})", error, item.uri()) + } + } + } +} + +impl std::error::Error for PlaylistError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + PlaylistError::PluginMissing { error } | PlaylistError::ItemFailed { error, .. } => { + Some(error.as_ref()) + } + } + } +} + +/// Number of different streams currently handled by the element +#[derive(Debug, Clone, PartialEq)] +struct StreamsTopology { + audio: u32, + video: u32, + text: u32, +} + +impl Default for StreamsTopology { + fn default() -> Self { + Self { + audio: 0, + video: 0, + text: 0, + } + } +} + +impl StreamsTopology { + fn n_streams(&self) -> u32 { + self.audio + self.video + self.text + } +} + +impl<'a> From for StreamsTopology { + fn from(collection: gst::StreamCollection) -> Self { + let (mut audio, mut video, mut text) = (0, 0, 0); + for stream in collection.iter() { + match stream.stream_type() { + gst::StreamType::AUDIO => audio += 1, + gst::StreamType::VIDEO => video += 1, + gst::StreamType::TEXT => text += 1, + _ => {} + } + } + + Self { audio, video, text } + } +} + +#[derive(Debug, Clone)] +struct Settings { + uris: Vec, + iterations: u32, +} + +impl Default for Settings { + fn default() -> Self { + Self { + uris: vec![], + iterations: 1, + } + } +} + +struct State { + streamsynchronizer: gst::Element, + concat_audio: Vec, + concat_video: Vec, + concat_text: Vec, + + playlist: Playlist, + + /// the current number of streams handled by the element + streams_topology: StreamsTopology, + // true if the element stopped because of an error + errored: bool, + + // we have max one item in one of each of those states + waiting_for_stream_collection: Option, + waiting_for_ss_eos: Option, + waiting_for_pads: Option, + blocked: Option, + // multiple items can be streaming, `concat` elements will block them all but the active one + streaming: Vec, + // items which have been fully played, waiting to be cleaned up + done: Vec, +} + +impl State { + fn new(uris: Vec, iterations: u32, streamsynchronizer: gst::Element) -> Self { + Self { + concat_audio: vec![], + concat_video: vec![], + concat_text: vec![], + streamsynchronizer, + playlist: Playlist::new(uris, iterations), + streams_topology: StreamsTopology::default(), + errored: false, + waiting_for_stream_collection: None, + waiting_for_ss_eos: None, + waiting_for_pads: None, + blocked: None, + streaming: vec![], + done: vec![], + } + } + + /// Return the item whose decodebin is either `src` or an ancestor of `src` + fn find_item_from_src(&self, src: &gst::Object) -> Option { + // iterate in all the places we store `Item`, ordering does not matter + // as one decodebin element can be in only one Item. + let mut items = self + .waiting_for_stream_collection + .iter() + .chain(self.waiting_for_ss_eos.iter()) + .chain(self.waiting_for_pads.iter()) + .chain(self.blocked.iter()) + .chain(self.streaming.iter()) + .chain(self.done.iter()); + + items + .find(|item| { + let decodebin = item.uridecodebin(); + let from_decodebin = src == &decodebin; + let bin = decodebin.downcast_ref::().unwrap(); + from_decodebin || src.has_as_ancestor(bin) + }) + .cloned() + } + + fn unblock_item(&mut self, element: &super::UriPlaylistBin) { + if let Some(blocked) = self.blocked.take() { + let (messages, sender) = blocked.set_streaming(self.streams_topology.n_streams()); + + gst_log!( + CAT, + obj: element, + "send pending message of item #{} and unblock its pads", + blocked.index() + ); + + // send pending messages then unblock pads + for msg in messages { + let _ = element.post_message(msg); + } + let _ = sender.send(true); + + self.streaming.push(blocked); + } + } +} + +#[derive(Default)] +pub struct UriPlaylistBin { + settings: Mutex, + state: Mutex>, +} + +#[derive(Debug, Clone)] +enum ItemState { + /// Waiting to create a decodebin element + Pending, + /// Waiting to receive the stream collection from its decodebin element + WaitingForStreamCollection { uridecodebin: gst::Element }, + /// Waiting for streamsynchronizer to be eos on all its src pads. + /// Only used to block item whose streams topology is different from the one + /// currently handled by the element. In such case we need to wait for + /// streamsynchronizer to be flushed before adding/removing concat elements. + WaitingForStreamsynchronizerEos { + uridecodebin: gst::Element, + /// src pads from decodebin currently blocked + decodebin_pads: Vec, + /// number of streamsynchronizer src pads which are not eos yet + waiting_eos: u32, + stream_collection_msg: gst::Message, + // channel used to block pads flow until streamsynchronizer is eos + sender: crossbeam_channel::Sender, + receiver: crossbeam_channel::Receiver, + }, + /// Waiting that pads of all the streams have been created on decodebin. + /// Required to ensure that streams are plugged to concat in the playlist order. + WaitingForPads { + uridecodebin: gst::Element, + n_pads_pendings: u32, + stream_collection_msg: gst::Message, + /// concat sink pads which have been requested to handle this item + concat_sink_pads: Vec<(gst::Element, gst::Pad)>, + // channel used to block pad flow in the Blocked state + sender: crossbeam_channel::Sender, + receiver: crossbeam_channel::Receiver, + }, + /// Pads have been linked to `concat` elements but are blocked until the next item is linked to `concat` as well. + /// This is required to ensure gap-less transition between items. + Blocked { + uridecodebin: gst::Element, + stream_collection_msg: gst::Message, + stream_selected_msg: Option, + concat_sink_pads: Vec<(gst::Element, gst::Pad)>, + sender: crossbeam_channel::Sender, + receiver: crossbeam_channel::Receiver, + }, + /// Buffers are flowing + Streaming { + uridecodebin: gst::Element, + concat_sink_pads: Vec<(gst::Element, gst::Pad)>, + // number of pads which are not eos yet + waiting_eos: u32, + }, + /// Item has been fully streamed + Done { + uridecodebin: gst::Element, + concat_sink_pads: Vec<(gst::Element, gst::Pad)>, + }, +} + +#[derive(Debug, Clone)] +struct Item { + inner: Arc>, +} + +impl Item { + fn new(uri: String, index: usize) -> Self { + let inner = ItemInner { + uri, + index, + state: ItemState::Pending, + }; + + Self { + inner: Arc::new(Mutex::new(inner)), + } + } + + fn uri(&self) -> String { + let inner = self.inner.lock().unwrap(); + inner.uri.clone() + } + + fn index(&self) -> usize { + let inner = self.inner.lock().unwrap(); + inner.index + } + + fn uridecodebin(&self) -> gst::Element { + let inner = self.inner.lock().unwrap(); + + match &inner.state { + ItemState::WaitingForStreamCollection { uridecodebin } + | ItemState::WaitingForStreamsynchronizerEos { uridecodebin, .. } + | ItemState::WaitingForPads { uridecodebin, .. } + | ItemState::Blocked { uridecodebin, .. } + | ItemState::Streaming { uridecodebin, .. } + | ItemState::Done { uridecodebin, .. } => uridecodebin.clone(), + _ => unreachable!(), + } + } + + fn concat_sink_pads(&self) -> Vec<(gst::Element, gst::Pad)> { + let inner = self.inner.lock().unwrap(); + + match &inner.state { + ItemState::WaitingForPads { + concat_sink_pads, .. + } + | ItemState::Blocked { + concat_sink_pads, .. + } + | ItemState::Streaming { + concat_sink_pads, .. + } + | ItemState::Done { + concat_sink_pads, .. + } => concat_sink_pads.clone(), + _ => unreachable!(), + } + } + + fn dec_n_pads_pending(&self) -> u32 { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::WaitingForPads { + n_pads_pendings, .. + } => { + *n_pads_pendings -= 1; + *n_pads_pendings + } + _ => unreachable!(), + } + } + + fn receiver(&self) -> crossbeam_channel::Receiver { + let inner = self.inner.lock().unwrap(); + + match &inner.state { + ItemState::WaitingForPads { receiver, .. } => receiver.clone(), + ItemState::WaitingForStreamsynchronizerEos { receiver, .. } => receiver.clone(), + // receiver is no longer supposed to be accessed once in the `Blocked` state + _ => unreachable!(), + } + } + + fn add_blocked_pad(&self, pad: gst::Pad) { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::WaitingForStreamsynchronizerEos { decodebin_pads, .. } => { + decodebin_pads.push(pad); + } + _ => unreachable!(), + } + } + + /// decrement waiting_eos on a WaitingForStreamsynchronizeEos item, returns if all the streams are now eos or not + fn dec_waiting_eos_ss(&self) -> bool { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::WaitingForStreamsynchronizerEos { waiting_eos, .. } => { + *waiting_eos -= 1; + *waiting_eos == 0 + } + _ => unreachable!(), + } + } + + fn is_streaming(&self) -> bool { + let inner = self.inner.lock().unwrap(); + + matches!(&inner.state, ItemState::Streaming { .. }) + } + + /// queue the stream-selected message of a blocked item + fn add_stream_selected(&self, msg: gst::Message) { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::Blocked { + stream_selected_msg, + .. + } => { + *stream_selected_msg = Some(msg); + } + _ => unreachable!(), + } + } + + // decrement waiting_eos on a Streaming item, returns if all the streams are now eos or not + fn dec_waiting_eos(&self) -> bool { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::Streaming { waiting_eos, .. } => { + *waiting_eos -= 1; + *waiting_eos == 0 + } + _ => unreachable!(), + } + } + + fn add_concat_sink_pad(&self, concat: &gst::Element, sink_pad: &gst::Pad) { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::WaitingForPads { + concat_sink_pads, .. + } => { + concat_sink_pads.push((concat.clone(), sink_pad.clone())); + } + _ => unreachable!(), + } + } + + // change state methods + + // from the Pending state, called when starting to process the item + fn set_waiting_for_stream_collection(&self) -> Result<(), PlaylistError> { + let mut inner = self.inner.lock().unwrap(); + + let uridecodebin = gst::ElementFactory::make( + "uridecodebin3", + Some(&format!("playlist-decodebin-{}", inner.index)), + ) + .map_err(|e| PlaylistError::PluginMissing { error: e.into() })?; + uridecodebin.set_property("uri", &inner.uri); + + assert!(matches!(inner.state, ItemState::Pending)); + inner.state = ItemState::WaitingForStreamCollection { uridecodebin }; + + Ok(()) + } + + // from the WaitingForStreamCollection state, called when we received the item stream collection + // and its stream topology matches what is currently being processed by the element. + fn set_waiting_for_pads(&self, n_streams: u32, msg: gst::message::StreamCollection) { + let mut inner = self.inner.lock().unwrap(); + assert!(matches!( + inner.state, + ItemState::WaitingForStreamCollection { .. } + )); + + let (sender, receiver) = crossbeam_channel::unbounded::(); + + match &inner.state { + ItemState::WaitingForStreamCollection { uridecodebin } => { + inner.state = ItemState::WaitingForPads { + uridecodebin: uridecodebin.clone(), + n_pads_pendings: n_streams, + stream_collection_msg: msg.copy(), + concat_sink_pads: vec![], + sender, + receiver, + }; + } + _ => unreachable!(), + } + } + + // from the WaitingForStreamCollection state, called when we received the item stream collection + // but its stream topology does not match what is currently being processed by the element, + // having to wait until streamsynchronizer is flushed to internally reorganize the element. + fn set_waiting_for_ss_eos(&self, waiting_eos: u32, msg: gst::message::StreamCollection) { + let mut inner = self.inner.lock().unwrap(); + let (sender, receiver) = crossbeam_channel::unbounded::(); + + match &inner.state { + ItemState::WaitingForStreamCollection { uridecodebin } => { + inner.state = ItemState::WaitingForStreamsynchronizerEos { + uridecodebin: uridecodebin.clone(), + decodebin_pads: vec![], + waiting_eos, + // FIXME: save deep copy once https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/363 is fixed + stream_collection_msg: msg.copy(), + sender, + receiver, + }; + } + _ => unreachable!(), + } + } + + // from the WaitingForStreamsynchronizerEos state, called when the streamsynchronizer has been flushed + // and the item can now be processed. + fn done_waiting_for_ss_eos( + &self, + ) -> ( + StreamsTopology, + Vec, + crossbeam_channel::Sender, + ) { + let mut inner = self.inner.lock().unwrap(); + + match &inner.state { + ItemState::WaitingForStreamsynchronizerEos { + uridecodebin, + decodebin_pads, + waiting_eos, + stream_collection_msg, + sender, + .. + } => { + assert_eq!(*waiting_eos, 0); + + let topology = match stream_collection_msg.view() { + gst::MessageView::StreamCollection(stream_collection_msg) => { + StreamsTopology::from(stream_collection_msg.stream_collection()) + } + _ => unreachable!(), + }; + let pending_pads = decodebin_pads.clone(); + let sender = sender.clone(); + + let (new_sender, new_receiver) = crossbeam_channel::unbounded::(); + + inner.state = ItemState::WaitingForPads { + uridecodebin: uridecodebin.clone(), + n_pads_pendings: topology.n_streams(), + stream_collection_msg: stream_collection_msg.copy(), + concat_sink_pads: vec![], + sender: new_sender, + receiver: new_receiver, + }; + + (topology, pending_pads, sender) + } + _ => unreachable!(), + } + } + + // from the WaitingForPads state, called when all the pads from decodebin have been added and connected to concat elements. + fn set_blocked(&self) { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::WaitingForPads { + uridecodebin, + sender, + receiver, + stream_collection_msg, + concat_sink_pads, + .. + } => { + inner.state = ItemState::Blocked { + uridecodebin: uridecodebin.clone(), + sender: sender.clone(), + receiver: receiver.clone(), + concat_sink_pads: concat_sink_pads.clone(), + stream_collection_msg: stream_collection_msg.copy(), + stream_selected_msg: None, + }; + } + _ => unreachable!(), + } + } + + // from the Blocked state, called when the item streaming threads can be unblocked. + // Return the queued messages from this item and the sender to unblock their pads + fn set_streaming( + &self, + n_streams: u32, + ) -> (Vec, crossbeam_channel::Sender) { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::Blocked { + uridecodebin, + sender, + stream_collection_msg, + stream_selected_msg, + concat_sink_pads, + .. + } => { + let mut messages = vec![stream_collection_msg.copy()]; + if let Some(msg) = stream_selected_msg { + messages.push(msg.copy()); + } + let sender = sender.clone(); + + inner.state = ItemState::Streaming { + uridecodebin: uridecodebin.clone(), + waiting_eos: n_streams, + concat_sink_pads: concat_sink_pads.clone(), + }; + + (messages, sender) + } + _ => unreachable!(), + } + } + + // from the Streaming state, called when the item has been fully processed and can be cleaned up + fn set_done(&self) { + let mut inner = self.inner.lock().unwrap(); + + match &mut inner.state { + ItemState::Streaming { + uridecodebin, + concat_sink_pads, + .. + } => { + inner.state = ItemState::Done { + uridecodebin: uridecodebin.clone(), + concat_sink_pads: concat_sink_pads.clone(), + }; + } + _ => unreachable!(), + } + } +} + +#[derive(Debug, Clone)] +struct ItemInner { + uri: String, + index: usize, + state: ItemState, +} + +struct Playlist { + items: Box + Send>, +} + +impl Playlist { + fn new(uris: Vec, iterations: u32) -> Self { + fn infinite_iter(uris: Vec) -> Box + Send + Sync> { + Box::new( + uris.into_iter() + .cycle() + .enumerate() + .map(|(index, uri)| Item::new(uri, index)), + ) + } + fn finite_iter( + uris: Vec, + iterations: u32, + ) -> Box + Send + Sync> { + let n = (iterations as usize) + .checked_mul(uris.len()) + .unwrap_or(usize::MAX); + + Box::new( + uris.into_iter() + .cycle() + .take(n) + .enumerate() + .map(|(index, uri)| Item::new(uri, index)), + ) + } + + let items = if iterations == 0 { + infinite_iter(uris) + } else { + finite_iter(uris, iterations) + }; + + Self { items } + } + + fn next(&mut self) -> Result, PlaylistError> { + let item = match self.items.next() { + None => return Ok(None), + Some(item) => item, + }; + + item.set_waiting_for_stream_collection()?; + + Ok(Some(item)) + } +} + +fn stream_type_from_pad_name(name: &str) -> anyhow::Result<(gst::StreamType, usize)> { + if let Some(index) = name.strip_prefix("audio_") { + Ok((gst::StreamType::AUDIO, index.parse().unwrap())) + } else if let Some(index) = name.strip_prefix("video_") { + Ok((gst::StreamType::VIDEO, index.parse().unwrap())) + } else if let Some(index) = name.strip_prefix("text_") { + Ok((gst::StreamType::TEXT, index.parse().unwrap())) + } else { + Err(anyhow::anyhow!("type of pad {} not supported", name)) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for UriPlaylistBin { + const NAME: &'static str = "GstUriPlaylistBin"; + type Type = super::UriPlaylistBin; + type ParentType = gst::Bin; +} + +impl ObjectImpl for UriPlaylistBin { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoxed::new( + "uris", + "URIs", + "URIs of the medias to play", + Vec::::static_type(), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecUInt::new( + "iterations", + "Iterations", + "Number of time the playlist items should be played each (0 = unlimited)", + 0, + u32::MAX, + 1, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property( + &self, + obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "uris" => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get().expect("type checked upstream"); + gst_info!( + CAT, + obj: obj, + "Changing uris from {:?} to {:?}", + settings.uris, + new_value, + ); + settings.uris = new_value; + } + "iterations" => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get().expect("type checked upstream"); + gst_info!( + CAT, + obj: obj, + "Changing iterations from {:?} to {:?}", + settings.iterations, + new_value, + ); + settings.iterations = new_value; + } + _ => unimplemented!(), + } + } + + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "uris" => { + let settings = self.settings.lock().unwrap(); + settings.uris.to_value() + } + "iterations" => { + let settings = self.settings.lock().unwrap(); + settings.iterations.to_value() + } + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK); + obj.set_element_flags(gst::ElementFlags::SOURCE); + } +} + +impl GstObjectImpl for UriPlaylistBin {} + +impl BinImpl for UriPlaylistBin { + fn handle_message(&self, element: &Self::Type, msg: gst::Message) { + match msg.view() { + gst::MessageView::StreamCollection(stream_collection_msg) => { + if let Err(e) = self.handle_stream_collection(element, stream_collection_msg) { + self.failed(element, e); + } + // stream collection will be send when the item starts streaming + return; + } + gst::MessageView::StreamsSelected(stream_selected) => { + if !self.handle_stream_selected(element, stream_selected) { + return; + } + } + gst::MessageView::Error(error) => { + // find item which raised the error + let self_ = UriPlaylistBin::from_instance(element); + let mut state_guard = self_.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + + let src = error.src().unwrap(); + let item = state.find_item_from_src(&src); + + drop(state_guard); + + if let Some(item) = item { + // handle the error message so we can add the failing uri as error details + + self.failed( + element, + PlaylistError::ItemFailed { + error: anyhow::anyhow!( + "Error when processing item #{} ({}): {}", + item.index(), + item.uri(), + error.error().to_string() + ), + item, + }, + ); + return; + } + } + _ => (), + } + + self.parent_handle_message(element, msg) + } +} + +impl ElementImpl for UriPlaylistBin { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Playlist Source", + "Generic/Source", + "Sequentially play uri streams", + "Guillaume Desmottes ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let audio_src_pad_template = gst::PadTemplate::new( + "audio_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(); + + let video_src_pad_template = gst::PadTemplate::new( + "video_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(); + + let text_src_pad_template = gst::PadTemplate::new( + "text_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &gst::Caps::new_any(), + ) + .unwrap(); + + vec![ + audio_src_pad_template, + video_src_pad_template, + text_src_pad_template, + ] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + if transition == gst::StateChange::NullToReady { + if let Err(e) = self.start(element) { + self.failed(element, e); + return Err(gst::StateChangeError); + } + } + + self.parent_change_state(element, transition) + } +} + +impl UriPlaylistBin { + fn start(&self, element: &super::UriPlaylistBin) -> Result<(), PlaylistError> { + gst_debug!(CAT, obj: element, "Starting"); + { + let mut state_guard = self.state.lock().unwrap(); + assert!(state_guard.is_none()); + + let streamsynchronizer = + gst::ElementFactory::make("streamsynchronizer", Some("playlist-streamsync")) + .map_err(|e| PlaylistError::PluginMissing { error: e.into() })?; + + element.add(&streamsynchronizer).unwrap(); + + let settings = self.settings.lock().unwrap(); + + *state_guard = Some(State::new( + settings.uris.clone(), + settings.iterations, + streamsynchronizer, + )); + } + + self.start_next_item(element)?; + + Ok(()) + } + + fn start_next_item(&self, element: &super::UriPlaylistBin) -> Result<(), PlaylistError> { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + + // clean up done items, so uridecodebin elements and concat sink pads don't pile up in the pipeline + while let Some(done) = state.done.pop() { + let uridecodebin = done.uridecodebin(); + gst_log!(CAT, obj: element, "remove {} from bin", uridecodebin.name()); + + for (concat, sink_pad) in done.concat_sink_pads() { + // calling release_request_pad() while holding the pad stream lock would deadlock + concat.call_async(move |concat| { + concat.release_request_pad(&sink_pad); + }); + } + + // can't change state from the streaming thread + let uridecodebin_clone = uridecodebin.clone(); + element.call_async(move |_element| { + let _ = uridecodebin_clone.set_state(gst::State::Null); + }); + + element.remove(&uridecodebin).unwrap(); + } + + if state.waiting_for_stream_collection.is_some() + || state.waiting_for_pads.is_some() + || state.waiting_for_ss_eos.is_some() + { + // another item is being prepared + return Ok(()); + } + + let n_streaming = state.streaming.len(); + if n_streaming > MAX_STREAMING_ITEMS { + gst_log!( + CAT, + obj: element, + "Too many items streaming ({}), wait before starting the next one", + n_streaming + ); + + return Ok(()); + } + + let item = match state.playlist.next()? { + Some(item) => item, + None => { + gst_debug!(CAT, obj: element, "no more item to queue",); + + // unblock last item + state.unblock_item(element); + + return Ok(()); + } + }; + + gst_debug!( + CAT, + obj: element, + "start decoding item #{}: {}", + item.index(), + item.uri() + ); + + let uridecodebin = item.uridecodebin(); + + element.add(&uridecodebin).unwrap(); + + let element_weak = element.downgrade(); + let uridecodebin_clone = uridecodebin.clone(); + + let item_clone = item.clone(); + assert!(state.waiting_for_stream_collection.is_none()); + state.waiting_for_stream_collection = Some(item); + + uridecodebin.connect_pad_added(move |_uridecodebin, src_pad| { + let element = match element_weak.upgrade() { + Some(element) => element, + None => return, + }; + let self_ = UriPlaylistBin::from_instance(&element); + + let item = { + let mut state_guard = self_.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + state.waiting_for_ss_eos.as_ref().cloned() + }; + + if let Some(item) = item { + // block pad until streamsynchronizer is eos + let element_weak = element.downgrade(); + let receiver = item.receiver(); + + gst_debug!( + CAT, + obj: &element, + "Block pad {} until streamsynchronizer is flushed", + src_pad.name(), + ); + + src_pad.add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, move |pad, _info| { + let element = match element_weak.upgrade() { + Some(element) => element, + None => return gst::PadProbeReturn::Remove, + }; + let parent = pad.parent().unwrap(); + + let _ = receiver.recv(); + + gst_log!( + CAT, + obj: &element, + "pad {}:{} has been unblocked", + parent.name(), + pad.name() + ); + + gst::PadProbeReturn::Remove + }); + + item.add_blocked_pad(src_pad.clone()); + } else { + self_.process_decodebin_pad(src_pad); + } + }); + + drop(state_guard); + + uridecodebin_clone + .sync_state_with_parent() + .map_err(|e| PlaylistError::ItemFailed { + error: e.into(), + item: item_clone, + })?; + + Ok(()) + } + + fn handle_stream_collection( + &self, + element: &super::UriPlaylistBin, + stream_collection_msg: gst::message::StreamCollection, + ) -> Result<(), PlaylistError> { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + let src = stream_collection_msg.src().unwrap(); + + if let Some(item) = state.waiting_for_stream_collection.clone() { + // check message is from the decodebin we are waiting for + let uridecodebin = item.uridecodebin(); + + if src.has_as_ancestor(&uridecodebin) { + let topology = StreamsTopology::from(stream_collection_msg.stream_collection()); + + gst_debug!( + CAT, + obj: element, + "got stream collection from {}: {:?}", + src.name(), + topology + ); + + if state.streams_topology.n_streams() == 0 { + state.streams_topology = topology.clone(); + } + + assert!(state.waiting_for_pads.is_none()); + + if state.streams_topology != topology { + gst_debug!( + CAT, + obj: element, "streams topoly changed ('{:?}' -> '{:?}'), waiting for streamsynchronize to be flushed", + state.streams_topology, topology); + item.set_waiting_for_ss_eos( + state.streams_topology.n_streams(), + stream_collection_msg, + ); + state.waiting_for_ss_eos = Some(item); + + // unblock previous item as we need it to be flushed out of streamsynchronizer + state.unblock_item(element); + } else { + item.set_waiting_for_pads(topology.n_streams(), stream_collection_msg); + state.waiting_for_pads = Some(item); + } + + state.waiting_for_stream_collection = None; + } + } + Ok(()) + } + + // return true if the message can be forwarded + fn handle_stream_selected( + &self, + element: &super::UriPlaylistBin, + stream_selected_msg: gst::message::StreamsSelected, + ) -> bool { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + let src = stream_selected_msg.src().unwrap(); + + if let Some(item) = state.blocked.clone() { + let uridecodebin = item.uridecodebin(); + + if src.has_as_ancestor(&uridecodebin) { + // stream-selected message is from the blocked item, queue the message until it's unblocked + gst_debug!( + CAT, + obj: element, + "queue stream-selected message from {} as item is currently blocked", + src.name(), + ); + + item.add_stream_selected(stream_selected_msg.copy()); + false + } else { + true + } + } else { + true + } + } + + fn process_decodebin_pad(&self, src_pad: &gst::Pad) { + let element = self.instance(); + + let start_next = { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + + if state.errored { + return; + } + + let item = state.waiting_for_pads.clone().unwrap(); + + // Parse the pad name to extract the stream type and its index. + // We could get the type from the Stream object from the StreamStart sticky event but we'd still have + // to parse the name for the index. + let pad_name = src_pad.name(); + let (stream_type, stream_index) = match stream_type_from_pad_name(&pad_name) { + Ok((stream_type, stream_index)) => (stream_type, stream_index), + Err(e) => { + gst_warning!(CAT, obj: &element, "Ignoring pad {}: {}", pad_name, e); + return; + } + }; + + let concat = match stream_type { + gst::StreamType::AUDIO => state.concat_audio.get(stream_index), + gst::StreamType::VIDEO => state.concat_video.get(stream_index), + gst::StreamType::TEXT => state.concat_text.get(stream_index), + _ => unreachable!(), // early return on unsupported streams above + }; + + let concat = match concat { + None => { + gst_debug!( + CAT, + obj: &element, + "stream {} from item #{}: creating concat element", + pad_name, + item.index() + ); + + let concat = match gst::ElementFactory::make( + "concat", + Some(&format!( + "playlist-concat-{}-{}", + stream_type.name(), + stream_index + )), + ) { + Ok(concat) => concat, + Err(_) => { + drop(state_guard); + self.failed( + &element, + PlaylistError::PluginMissing { + error: anyhow::anyhow!("element 'concat' missing"), + }, + ); + return; + } + }; + + // this is done by the streamsynchronizer element downstream + concat.set_property("adjust-base", false); + + element.add(&concat).unwrap(); + + concat.sync_state_with_parent().unwrap(); + + // link concat elements to streamsynchronizer + let concat_src = concat.static_pad("src").unwrap(); + let sync_sink = state + .streamsynchronizer + .request_pad_simple("sink_%u") + .unwrap(); + concat_src.link(&sync_sink).unwrap(); + + let element_weak = element.downgrade(); + + // add event probe on streamsynchronizer src pad. Will only be used when we are waiting for the + // streamsynchronizer to be flushed in order to handle streams topology changes. + let src_pad_name = sync_sink.name().to_string().replace("sink", "src"); + let sync_src = state.streamsynchronizer.static_pad(&src_pad_name).unwrap(); + sync_src.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_pad, info| { + match info.data { + Some(gst::PadProbeData::Event(ref ev)) + if ev.type_() == gst::EventType::Eos => + { + let element = match element_weak.upgrade() { + Some(element) => element, + None => return gst::PadProbeReturn::Remove, + }; + let self_ = UriPlaylistBin::from_instance(&element); + + let item = { + let mut state_guard = self_.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + state.waiting_for_ss_eos.as_ref().cloned() + }; + + if let Some(item) = item { + if item.dec_waiting_eos_ss() { + gst_debug!(CAT, obj: &element, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item"); + self_.handle_topology_change(&element); + gst::PadProbeReturn::Drop + } else { + gst::PadProbeReturn::Drop + } + } else { + gst::PadProbeReturn::Pass + } + } + _ => gst::PadProbeReturn::Pass, + } + }); + + // ghost streamsynchronizer src pad + let sync_src_name = sync_sink.name().as_str().replace("sink", "src"); + let src = state.streamsynchronizer.static_pad(&sync_src_name).unwrap(); + let ghost = gst::GhostPad::with_target(Some(pad_name.as_str()), &src).unwrap(); + ghost.set_active(true).unwrap(); + + // proxy sticky events + src.sticky_events_foreach(|event| { + let _ = ghost.store_sticky_event(&event); + Ok(Some(event)) + }); + + unsafe { + ghost.set_event_function(|pad, parent, event| match event.view() { + gst::EventView::SelectStreams(_) => { + // TODO: handle select-streams event + let element = parent.unwrap(); + gst_fixme!( + CAT, + obj: element, + "select-streams event not supported ('{:?}')", + event + ); + false + } + _ => pad.event_default(parent, event), + }); + } + + element.add_pad(&ghost).unwrap(); + + match stream_type { + gst::StreamType::AUDIO => { + state.concat_audio.push(concat.clone()); + } + gst::StreamType::VIDEO => { + state.concat_video.push(concat.clone()); + } + gst::StreamType::TEXT => { + state.concat_text.push(concat.clone()); + } + _ => unreachable!(), // early return on unsupported streams above + } + + concat + } + Some(concat) => { + gst_debug!( + CAT, + obj: &element, + "stream {} from item #{}: re-using concat element {}", + pad_name, + item.index(), + concat.name() + ); + + concat.clone() + } + }; + + let sink_pad = concat.request_pad_simple("sink_%u").unwrap(); + src_pad.link(&sink_pad).unwrap(); + + item.add_concat_sink_pad(&concat, &sink_pad); + + // block pad until next item is reaching the `Blocked` state + let receiver = item.receiver(); + let element_weak = element.downgrade(); + let item_clone = item.clone(); + + sink_pad.add_probe(gst::PadProbeType::BLOCK_DOWNSTREAM, move |pad, info| { + let element = match element_weak.upgrade() { + Some(element) => element, + None => return gst::PadProbeReturn::Remove, + }; + let parent = pad.parent().unwrap(); + let item = &item_clone; + + if !item.is_streaming() { + // block pad until next item is ready + gst_log!( + CAT, + obj: &element, + "blocking pad {}:{} until next item is ready", + parent.name(), + pad.name() + ); + + let _ = receiver.recv(); + + gst_log!( + CAT, + obj: &element, + "pad {}:{} has been unblocked", + parent.name(), + pad.name() + ); + + gst::PadProbeReturn::Pass + } else { + match info.data { + Some(gst::PadProbeData::Event(ref ev)) + if ev.type_() == gst::EventType::Eos => + { + if item.dec_waiting_eos() { + // all the streams are eos, item is now done + gst_log!( + CAT, + obj: &element, + "all streams of item #{} are eos", + item.index() + ); + + let self_ = UriPlaylistBin::from_instance(&element); + { + let mut state_guard = self_.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + + let index = item.index(); + + let removed = state + .streaming + .iter() + .position(|i| i.index() == index) + .map(|e| state.streaming.remove(e)); + + if let Some(item) = removed { + item.set_done(); + state.done.push(item); + } + } + + if let Err(e) = self_.start_next_item(&element) { + self_.failed(&element, e); + } + } + + gst::PadProbeReturn::Remove + } + _ => gst::PadProbeReturn::Pass, + } + } + }); + + if item.dec_n_pads_pending() == 0 { + // we got all the pads + gst_debug!( + CAT, + obj: &element, + "got all the pads for item #{}", + item.index() + ); + + // all pads have been linked to concat, unblock previous item + state.unblock_item(&element); + + state.waiting_for_pads = None; + // block item until the next one is fully linked to concat + item.set_blocked(); + state.blocked = Some(item); + + true + } else { + false + } + }; + + if start_next { + gst_debug!( + CAT, + obj: &element, + "got all pending streams, queue next item" + ); + + if let Err(e) = self.start_next_item(&element) { + self.failed(&element, e); + } + } + } + + /// called when all previous items have been flushed from streamsynchronizer + /// and so the elements can reorganize itself to handle a pending changes in + /// streams topology. + fn handle_topology_change(&self, element: &super::UriPlaylistBin) { + let (pending_pads, sender) = { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + + let item = state.waiting_for_ss_eos.take().unwrap(); + let (topology, pending_pads, sender) = item.done_waiting_for_ss_eos(); + state.waiting_for_pads = Some(item); + + // remove now useless concat elements, missing ones will be added when handling src pads from decodebin + + fn remove_useless_concat( + n_stream: usize, + concats: &mut Vec, + element: &super::UriPlaylistBin, + streamsynchronizer: &gst::Element, + ) { + while n_stream < concats.len() { + // need to remove concat elements + let concat = concats.pop().unwrap(); + gst_log!(CAT, obj: element, "remove {}", concat.name()); + + let concat_src = concat.static_pad("src").unwrap(); + let ss_sink = concat_src.peer().unwrap(); + + // unlink and remove sink pad from streamsynchronizer + concat_src.unlink(&ss_sink).unwrap(); + streamsynchronizer.release_request_pad(&ss_sink); + + // remove associated ghost pad + let src_pads = element.src_pads(); + let ghost = src_pads + .iter() + .find(|pad| { + let ghost = pad.downcast_ref::().unwrap(); + ghost.target().is_none() + }) + .unwrap(); + element.remove_pad(ghost).unwrap(); + + element.remove(&concat).unwrap(); + let _ = concat.set_state(gst::State::Null); + } + } + + remove_useless_concat( + topology.audio as usize, + &mut state.concat_audio, + element, + &state.streamsynchronizer, + ); + remove_useless_concat( + topology.video as usize, + &mut state.concat_video, + element, + &state.streamsynchronizer, + ); + remove_useless_concat( + topology.text as usize, + &mut state.concat_text, + element, + &state.streamsynchronizer, + ); + + state.streams_topology = topology; + + (pending_pads, sender) + }; + + // process decodebin src pads we already received and unblock them + for pad in pending_pads.iter() { + self.process_decodebin_pad(pad); + } + + let _ = sender.send(true); + } + + fn failed(&self, element: &super::UriPlaylistBin, error: PlaylistError) { + { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + + if state.errored { + return; + } + state.errored = true; + + if let Some(blocked) = state.blocked.take() { + // unblock streaming thread + blocked.set_streaming(state.streams_topology.n_streams()); + } + } + let error_msg = error.to_string(); + gst_error!(CAT, obj: element, "{}", error_msg); + + match error { + PlaylistError::PluginMissing { .. } => { + gst::element_error!(element, gst::CoreError::MissingPlugin, [&error_msg]); + } + PlaylistError::ItemFailed { item, .. } => { + // remove failing uridecodebin + let uridecodebin = item.uridecodebin(); + uridecodebin.call_async(move |uridecodebin| { + let _ = uridecodebin.set_state(gst::State::Null); + }); + let _ = element.remove(&uridecodebin); + + let details = gst::Structure::builder("details"); + let details = details.field("uri", item.uri()); + + gst::element_error!( + element, + gst::LibraryError::Failed, + [&error_msg], + details: details.build() + ); + } + } + } +} diff --git a/utils/uriplaylistbin/src/uriplaylistbin/mod.rs b/utils/uriplaylistbin/src/uriplaylistbin/mod.rs new file mode 100644 index 00000000..a3fa7c16 --- /dev/null +++ b/utils/uriplaylistbin/src/uriplaylistbin/mod.rs @@ -0,0 +1,30 @@ +// Copyright (C) 2021 OneStream Live +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct UriPlaylistBin(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; +} + +// GStreamer elements need to be thread-safe. For the private implementation this is automatically +// enforced but for the public wrapper type we need to specify this manually. +unsafe impl Send for UriPlaylistBin {} +unsafe impl Sync for UriPlaylistBin {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "uriplaylistbin", + gst::Rank::None, + UriPlaylistBin::static_type(), + ) +} diff --git a/utils/uriplaylistbin/tests/sample.mkv b/utils/uriplaylistbin/tests/sample.mkv new file mode 100644 index 00000000..331e063b Binary files /dev/null and b/utils/uriplaylistbin/tests/sample.mkv differ diff --git a/utils/uriplaylistbin/tests/sample.ogg b/utils/uriplaylistbin/tests/sample.ogg new file mode 100644 index 00000000..fd79fe10 Binary files /dev/null and b/utils/uriplaylistbin/tests/sample.ogg differ diff --git a/utils/uriplaylistbin/tests/uriplaylistbin.rs b/utils/uriplaylistbin/tests/uriplaylistbin.rs new file mode 100644 index 00000000..e8e4842c --- /dev/null +++ b/utils/uriplaylistbin/tests/uriplaylistbin.rs @@ -0,0 +1,299 @@ +// Copyright (C) 2021 OneStream Live +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::path::PathBuf; + +use gst::prelude::*; +use gst::MessageView; +use more_asserts::assert_ge; + +struct TestMedia { + uri: String, + len: gst::ClockTime, +} + +fn file_name_to_uri(name: &str) -> String { + let input_path = { + let mut r = PathBuf::new(); + r.push(env!("CARGO_MANIFEST_DIR")); + r.push("tests"); + r.push(name); + r + }; + + let url = url::Url::from_file_path(&input_path).unwrap(); + url.to_string() +} + +impl TestMedia { + fn ogg() -> Self { + Self { + uri: file_name_to_uri("sample.ogg"), + len: gst::ClockTime::from_mseconds(510), + } + } + + fn mkv() -> Self { + Self { + uri: file_name_to_uri("sample.mkv"), + len: gst::ClockTime::from_mseconds(510), + } + } + + fn missing_file() -> Self { + Self { + uri: "file:///not-there.ogg".to_string(), + len: gst::ClockTime::from_mseconds(10), + } + } + + fn missing_http() -> Self { + Self { + uri: "http:///not-there.ogg".to_string(), + len: gst::ClockTime::from_mseconds(10), + } + } +} + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gsturiplaylistbin::plugin_register_static() + .expect("Failed to register uriplaylistbin plugin"); + }); +} + +fn test( + medias: Vec, + n_streams: u32, + iterations: u32, + check_streams: bool, +) -> Vec { + init(); + + let playlist_len = medias.len() * (iterations as usize); + + let pipeline = gst::Pipeline::new(None); + let playlist = gst::ElementFactory::make("uriplaylistbin", None).unwrap(); + let mq = gst::ElementFactory::make("multiqueue", None).unwrap(); + + pipeline.add_many(&[&playlist, &mq]).unwrap(); + + let total_len = gst::ClockTime::from_nseconds( + medias + .iter() + .map(|t| t.len.nseconds() * (iterations as u64)) + .sum(), + ); + + let uris: Vec = medias.iter().map(|t| t.uri.clone()).collect(); + + playlist.set_property("uris", &uris); + playlist.set_property("iterations", &iterations); + + let mq_clone = mq.clone(); + playlist.connect_pad_added(move |_playlist, src_pad| { + let mq_sink = mq_clone.request_pad_simple("sink_%u").unwrap(); + src_pad.link(&mq_sink).unwrap(); + }); + + let pipeline_weak = pipeline.downgrade(); + mq.connect_pad_added(move |_mq, pad| { + if pad.direction() != gst::PadDirection::Src { + return; + } + + let pipeline = match pipeline_weak.upgrade() { + Some(pipeline) => pipeline, + None => return, + }; + + let sink = gst::ElementFactory::make("fakesink", None).unwrap(); + pipeline.add(&sink).unwrap(); + sink.sync_state_with_parent().unwrap(); + + pad.link(&sink.static_pad("sink").unwrap()).unwrap(); + }); + + pipeline.set_state(gst::State::Playing).unwrap(); + + let bus = pipeline.bus().unwrap(); + let mut events = vec![]; + + loop { + let msg = bus.iter_timed(gst::ClockTime::NONE).next().unwrap(); + + match msg.view() { + MessageView::Error(_) | MessageView::Eos(..) => { + events.push(msg.clone()); + break; + } + // check stream related messages + MessageView::StreamCollection(_) | MessageView::StreamsSelected(_) => { + events.push(msg.clone()) + } + _ => {} + } + } + + // check we actually played all files and all streams + fn stream_end_ts(sink: &gst::Element) -> gst::ClockTime { + let sample: gst::Sample = sink.property("last-sample"); + let buffer = sample.buffer().unwrap(); + let pts = buffer.pts().unwrap(); + let segment = sample.segment().unwrap(); + let segment = segment.downcast_ref::().unwrap(); + let rt = segment.to_running_time(pts).unwrap(); + + rt + buffer.duration().unwrap() + } + + if check_streams { + // check all streams have been fully played + let mut n = 0; + for sink in pipeline.iterate_sinks() { + let sink = sink.unwrap(); + assert_ge!( + stream_end_ts(&sink), + total_len, + "{}: {} < {}", + sink.name(), + stream_end_ts(&sink), + total_len + ); + n += 1; + } + assert_eq!(n, n_streams); + + // check stream-collection and streams-selected message ordering + let mut events = events.clone().into_iter(); + + for _ in 0..playlist_len { + let decodebin = assert_stream_collection(events.next().unwrap(), n_streams as usize); + assert_eq!( + assert_stream_selected(events.next().unwrap(), n_streams as usize), + decodebin + ); + } + } + + pipeline.set_state(gst::State::Null).unwrap(); + + events +} + +fn assert_eos(msg: gst::Message) { + assert!(matches!(msg.view(), MessageView::Eos(_))); +} + +fn assert_error(msg: gst::Message, failing: TestMedia) { + match msg.view() { + MessageView::Error(err) => { + let details = err.details().unwrap(); + assert_eq!(details.get::<&str>("uri").unwrap(), failing.uri); + } + _ => { + panic!("last message is not an error"); + } + } +} + +fn assert_stream_collection(msg: gst::Message, n_streams: usize) -> gst::Object { + match msg.view() { + MessageView::StreamCollection(sc) => { + let collection = sc.stream_collection(); + assert_eq!(collection.len(), n_streams); + sc.src().unwrap() + } + _ => { + panic!("message is not a stream collection"); + } + } +} + +fn assert_stream_selected(msg: gst::Message, n_streams: usize) -> gst::Object { + match msg.view() { + MessageView::StreamsSelected(ss) => { + let collection = ss.stream_collection(); + assert_eq!(collection.len(), n_streams); + ss.src().unwrap() + } + _ => { + panic!("message is not stream selected"); + } + } +} + +#[test] +fn single_audio() { + let events = test(vec![TestMedia::ogg()], 1, 1, true).into_iter(); + assert_eos(events.last().unwrap()); +} + +#[test] +fn single_video() { + let events = test(vec![TestMedia::mkv()], 2, 1, true).into_iter(); + assert_eos(events.last().unwrap()); +} + +#[test] +fn multi_audio() { + let events = test( + vec![TestMedia::ogg(), TestMedia::ogg(), TestMedia::ogg()], + 1, + 1, + true, + ) + .into_iter(); + assert_eos(events.last().unwrap()); +} + +#[test] +fn multi_audio_video() { + let events = test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 1, true).into_iter(); + assert_eos(events.last().unwrap()); +} + +#[test] +fn iterations() { + let events = test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 2, true).into_iter(); + assert_eos(events.last().unwrap()); +} + +#[test] +fn nb_streams_increasing() { + let events = test(vec![TestMedia::ogg(), TestMedia::mkv()], 2, 1, false).into_iter(); + assert_eos(events.last().unwrap()); +} + +#[test] +fn missing_file() { + let events = test( + vec![TestMedia::ogg(), TestMedia::missing_file()], + 1, + 1, + false, + ) + .into_iter(); + assert_error(events.last().unwrap(), TestMedia::missing_file()); +} + +#[test] +fn missing_http() { + let events = test( + vec![TestMedia::ogg(), TestMedia::missing_http()], + 1, + 1, + false, + ) + .into_iter(); + assert_error(events.last().unwrap(), TestMedia::missing_http()); +}