diff --git a/Cargo.lock b/Cargo.lock index c67683e1..1db6c839 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3211,6 +3211,7 @@ dependencies = [ "gstreamer", "gstreamer-app", "more-asserts", + "tempfile", "thiserror 2.0.9", "url", ] diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 4ae10c68..684b1a43 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -15734,6 +15734,30 @@ } }, "properties": { + "cache": { + "blurb": "Cache playlist items from the network to disk so they are downloaded only once when playing multiple iterations.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, + "cache-dir": { + "blurb": "The directory where playlist items are downloaded to, if 'cache' is enabled. If not set (default), the XDG cache directory is used.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, "current-iteration": { "blurb": "The index of the current playlist iteration, or 0 if the iterations property is 0 (unlimited playlist)", "conditionally-available": false, diff --git a/utils/uriplaylistbin/Cargo.toml b/utils/uriplaylistbin/Cargo.toml index 836ecac5..7ba5d556 100644 --- a/utils/uriplaylistbin/Cargo.toml +++ b/utils/uriplaylistbin/Cargo.toml @@ -18,6 +18,7 @@ thiserror = "2" gst-app.workspace = true url = "2.2" more-asserts = "0.3" +tempfile = "3" [lib] name = "gsturiplaylistbin" diff --git a/utils/uriplaylistbin/examples/playlist.rs b/utils/uriplaylistbin/examples/playlist.rs index a927648a..d6c98b48 100644 --- a/utils/uriplaylistbin/examples/playlist.rs +++ b/utils/uriplaylistbin/examples/playlist.rs @@ -20,14 +20,25 @@ use gst::prelude::*; struct Opt { #[clap(short, default_value = "1")] iterations: u32, + #[clap(long, help = "Enable items cache")] + cache: bool, + #[clap(long, help = "Cache directory")] + cache_dir: Option, uris: Vec, } -fn create_pipeline(uris: Vec, iterations: u32) -> anyhow::Result { +fn create_pipeline( + uris: Vec, + iterations: u32, + cache: bool, + cache_dir: Option, +) -> anyhow::Result { let pipeline = gst::Pipeline::default(); let playlist = gst::ElementFactory::make("uriplaylistbin") .property("uris", &uris) .property("iterations", iterations) + .property("cache", cache) + .property("cache-dir", cache_dir) .build()?; pipeline.add(&playlist)?; @@ -117,7 +128,7 @@ fn main() -> anyhow::Result<()> { .collect(); { - let pipeline = create_pipeline(uris, opt.iterations)?; + let pipeline = create_pipeline(uris, opt.iterations, opt.cache, opt.cache_dir)?; pipeline .set_state(gst::State::Playing) diff --git a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs index 3a7e57be..59a84fd8 100644 --- a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs +++ b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs @@ -8,6 +8,7 @@ use std::{ collections::{HashMap, VecDeque}, + path::PathBuf, sync::{Arc, Mutex, MutexGuard}, }; @@ -35,6 +36,8 @@ enum PlaylistError { struct Settings { uris: Vec, iterations: u32, + cache: bool, + cache_dir: Option, } impl Default for Settings { @@ -42,6 +45,8 @@ impl Default for Settings { Self { uris: vec![], iterations: 1, + cache: false, + cache_dir: None, } } } @@ -55,6 +60,8 @@ struct State { current_item: Option, /// key are src pads from uridecodebin pads: HashMap, + /// URIs cached on disk, only used if `cache` property is enabled. + cached_uris: HashMap, // read-only properties current_iteration: u32, @@ -77,6 +84,7 @@ impl State { pending_current_items: VecDeque::new(), current_item: None, pads: HashMap::new(), + cached_uris: HashMap::new(), current_iteration: 0, current_uri_index: 0, } @@ -195,6 +203,30 @@ impl ObjectImpl for UriPlaylistBin { .default_value(1) .mutable_playing() .build(), + /** + * GstUriPlaylistBin:cache: + * + * Cache playlist items from the network to disk so they are downloaded only once when playing multiple iterations. + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecBoolean::builder("cache") + .nick("Cache") + .blurb("Cache playlist items from the network to disk so they are downloaded only once when playing multiple iterations.") + .mutable_ready() + .build(), + /** + * GstUriPlaylistBin:cache-dir: + * + * The directory where playlist items are downloaded to, if 'cache' is enabled. If not set (default), the XDG cache directory is used. + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecString::builder("cache-dir") + .nick("Cache directory") + .blurb("The directory where playlist items are downloaded to, if 'cache' is enabled. If not set (default), the XDG cache directory is used.") + .mutable_ready() + .build(), glib::ParamSpecUInt::builder("current-iteration") .nick("Current iteration") .blurb("The index of the current playlist iteration, or 0 if the iterations property is 0 (unlimited playlist)") @@ -246,6 +278,30 @@ impl ObjectImpl for UriPlaylistBin { } } } + "cache" => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get().expect("type checked upstream"); + gst::info!( + CAT, + imp = self, + "Changing cache from {:?} to {:?}", + settings.cache, + new_value, + ); + settings.cache = new_value; + } + "cache-dir" => { + let mut settings = self.settings.lock().unwrap(); + let new_value = value.get().expect("type checked upstream"); + gst::info!( + CAT, + imp = self, + "Changing cache-dir from {:?} to {:?}", + settings.cache_dir, + new_value, + ); + settings.cache_dir = new_value; + } _ => unimplemented!(), } } @@ -276,6 +332,14 @@ impl ObjectImpl for UriPlaylistBin { .unwrap_or(0) .to_value() } + "cache" => { + let settings = self.settings.lock().unwrap(); + settings.cache.to_value() + } + "cache-dir" => { + let settings = self.settings.lock().unwrap(); + settings.cache_dir.to_value() + } _ => unimplemented!(), } } @@ -411,10 +475,16 @@ impl UriPlaylistBin { let mut state_guard = self.state.lock().unwrap(); assert!(state_guard.is_none()); + let settings = self.settings.lock().unwrap(); + // No need to enable caching if we play only one iteration + let download = settings.cache && settings.iterations != 1; let uridecodebin = gst::ElementFactory::make("uridecodebin3") .name("playlist-uridecodebin") + .property("download", download) + .property("download-dir", &settings.cache_dir) .build() .map_err(|e| PlaylistError::PluginMissing { error: e.into() })?; + drop(settings); let streamsynchronizer = gst::ElementFactory::make("streamsynchronizer") .name("playlist-streamsynchronizer") @@ -494,12 +564,81 @@ impl UriPlaylistBin { }); let bin_weak = self.obj().downgrade(); - uridecodebin.connect("about-to-finish", false, move |_args| { + uridecodebin.connect("about-to-finish", false, move |args| { + let uridecodebin = args[0].get::().unwrap(); let bin = bin_weak.upgrade()?; let self_ = bin.imp(); gst::debug!(CAT, obj = bin, "current URI about to finish"); + let cache = self_.settings.lock().unwrap().cache; + + // `about-to-finish` is emitted when the file has been fully buffered so we are sure it has been fully written to disk. + if cache { + // retrieve cached path of the current item + let download_path = uridecodebin + .iterate_recurse() + .find(|e| { + e.factory() + .map(|factory| factory.name()) + .unwrap_or_default() + == "downloadbuffer" + }) + .map(|downloadbuffer| downloadbuffer.property::("temp-location")) + .map(PathBuf::from) + .and_then(|path| path.canonicalize().ok()); + + // urisourcebin uses downloadbuffer only with some specific URI scheme (http, etc). + // So if it has not been used assume it's a local file and loop using the original (or already cached) URI. + if let Some(path) = download_path { + let mut state = self_.state.lock().unwrap(); + if let Some(state) = state.as_mut() { + let uri = uridecodebin.property::("uri"); + // downloadbuffer will remove the file as soon as it's done with it so we need to make a copy. + let mut link_path = path.clone(); + link_path.set_file_name(format!( + "item-{}-{}", + state.current_uri_index, + path.file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default() + )); + + let mut cached = true; + + // Try first creating a hard link to prevent a full copy. + if let Err(err) = std::fs::hard_link(&path, &link_path) { + gst::warning!( + CAT, + imp = self_, + "Failed to hard link cached item, try copy: '{err}'" + ); + + if let Err(err) = std::fs::copy(&path, &link_path) { + // Hard links are only supported with NTFS on Windows so fallback to copy. + gst::warning!( + CAT, + imp = self_, + "Failed to copy cached item: '{err}'" + ); + + cached = false; + } + } + + if cached { + gst::log!( + CAT, + imp = self_, + "URI {uri} cached to {}", + link_path.display() + ); + state.cached_uris.insert(uri, link_path); + } + } + } + } + let _ = self_.start_next_item(); None @@ -549,17 +688,21 @@ impl UriPlaylistBin { } }; - gst::debug!( - CAT, - imp = self, - "start next item #{}: {}", - item.index(), - item.uri() - ); + let mut uri = item.uri(); + if let Some(path) = state.cached_uris.get(&uri) { + uri = gst::glib::filename_to_uri(path, None).unwrap().to_string(); + gst::debug!( + CAT, + imp = self, + "start next item from cache #{}: {uri}", + item.index(), + ); + } else { + gst::debug!(CAT, imp = self, "start next item #{}: {uri}", item.index(),); + } // don't hold the mutex when updating `uri` to prevent deadlocks. let uridecodebin = state.uridecodebin.clone(); - let uri = item.uri(); state.pending_current_items.push_back(Some(item)); @@ -640,6 +783,13 @@ impl UriPlaylistBin { } let mut state_guard = self.state.lock().unwrap(); + + if let Some(state) = state_guard.as_ref() { + for cached in state.cached_uris.values() { + let _ = std::fs::remove_file(cached); + } + } + *state_guard = None; } } diff --git a/utils/uriplaylistbin/tests/uriplaylistbin.rs b/utils/uriplaylistbin/tests/uriplaylistbin.rs index 94f513ac..aae75b43 100644 --- a/utils/uriplaylistbin/tests/uriplaylistbin.rs +++ b/utils/uriplaylistbin/tests/uriplaylistbin.rs @@ -44,6 +44,13 @@ impl TestMedia { } } + fn mkv_http() -> Self { + Self { + uri: "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/raw/main/utils/uriplaylistbin/tests/sample.mkv?ref_type=heads&inline=false".to_string(), + len: 510.mseconds(), + } + } + fn mkv() -> Self { Self { uri: file_name_to_uri("sample.mkv"), @@ -106,6 +113,7 @@ fn test( iterations: u32, check_streams: bool, iterations_change: Option, + cache: bool, ) -> (Vec, u32, u64) { init(); @@ -113,10 +121,19 @@ fn test( let uris: Vec = medias.iter().map(|t| t.uri.clone()).collect(); + // create a temp directory to store the cache + let cache_dir = + cache.then(|| tempfile::tempdir().expect("failed to create temp cache directory")); + let pipeline = Pipeline(gst::Pipeline::default()); let playlist = gst::ElementFactory::make("uriplaylistbin") .property("uris", &uris) .property("iterations", iterations) + .property("cache", cache) + .property( + "cache-dir", + cache_dir.as_ref().map(|dir| dir.path().to_str().unwrap()), + ) .build() .unwrap(); let mq = gst::ElementFactory::make("multiqueue").build().unwrap(); @@ -254,6 +271,13 @@ fn test( } } + if let Some(cache_dir) = cache_dir { + let dir = std::fs::read_dir(cache_dir.path()).expect("failed to read cache dir"); + // all items should have been cached if we looped the playlist + let n_cached_files = if iterations > 1 { uris.len() } else { 0 }; + assert_eq!(dir.count(), n_cached_files); + } + let current_iteration = playlist.property::("current-iteration"); let current_uri_index = playlist.property::("current-uri-index"); @@ -311,7 +335,7 @@ fn assert_stream_selected(msg: gst::Message, n_streams: usize) -> gst::Object { #[test] fn single_audio() { let (events, current_iteration, current_uri_index) = - test(vec![TestMedia::ogg()], 1, 1, true, None); + test(vec![TestMedia::ogg()], 1, 1, true, None, false); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 0); assert_eq!(current_uri_index, 0); @@ -320,7 +344,7 @@ fn single_audio() { #[test] fn single_video() { let (events, current_iteration, current_uri_index) = - test(vec![TestMedia::mkv()], 2, 1, true, None); + test(vec![TestMedia::mkv()], 2, 1, true, None, false); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 0); assert_eq!(current_uri_index, 0); @@ -334,6 +358,7 @@ fn multi_audio() { 1, true, None, + false, ); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 0); @@ -342,8 +367,14 @@ fn multi_audio() { #[test] fn multi_audio_video() { - let (events, current_iteration, current_uri_index) = - test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 1, true, None); + let (events, current_iteration, current_uri_index) = test( + vec![TestMedia::mkv(), TestMedia::mkv()], + 2, + 1, + true, + None, + false, + ); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 0); assert_eq!(current_uri_index, 1); @@ -351,8 +382,14 @@ fn multi_audio_video() { #[test] fn iterations() { - let (events, current_iteration, current_uri_index) = - test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 2, true, None); + let (events, current_iteration, current_uri_index) = test( + vec![TestMedia::mkv(), TestMedia::mkv()], + 2, + 2, + true, + None, + false, + ); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 1); assert_eq!(current_uri_index, 1); @@ -362,8 +399,14 @@ fn iterations() { // FIXME: racy: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/514 #[ignore] fn nb_streams_increasing() { - let (events, current_iteration, current_uri_index) = - test(vec![TestMedia::ogg(), TestMedia::mkv()], 2, 1, false, None); + let (events, current_iteration, current_uri_index) = test( + vec![TestMedia::ogg(), TestMedia::mkv()], + 2, + 1, + false, + None, + false, + ); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 0); assert_eq!(current_uri_index, 1); @@ -377,6 +420,7 @@ fn missing_file() { 1, false, None, + false, ); assert_error( events.into_iter().last().unwrap(), @@ -394,6 +438,7 @@ fn missing_http() { 1, false, None, + false, ); assert_error( events.into_iter().last().unwrap(), @@ -415,6 +460,7 @@ fn increase_iterations() { when_ss: 2, iterations: 8, }), + false, ); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 7); @@ -435,6 +481,7 @@ fn decrease_iterations() { when_ss: 2, iterations: 1, }), + false, ); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 2); @@ -453,8 +500,19 @@ fn infinite_to_finite() { when_ss: 2, iterations: 4, }), + false, ); assert_eos(events.into_iter().last().unwrap()); assert_eq!(current_iteration, 3); assert_eq!(current_uri_index, 0); } + +#[test] +/// cache HTTP playlist items +fn cache() { + let (events, current_iteration, current_uri_index) = + test(vec![TestMedia::mkv_http()], 2, 3, true, None, true); + assert_eos(events.into_iter().last().unwrap()); + assert_eq!(current_iteration, 2); + assert_eq!(current_uri_index, 0); +}