uriplaylistbin: add caching

Add optional caching feature preventing to re-download playlist items
for each iteration.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2028>
This commit is contained in:
Guillaume Desmottes 2025-01-06 14:38:20 +01:00 committed by GStreamer Marge Bot
parent 70ed528c7a
commit 1b761f27ef
6 changed files with 264 additions and 19 deletions

1
Cargo.lock generated
View file

@ -3211,6 +3211,7 @@ dependencies = [
"gstreamer",
"gstreamer-app",
"more-asserts",
"tempfile",
"thiserror 2.0.9",
"url",
]

View file

@ -15734,6 +15734,30 @@
}
},
"properties": {
"cache": {
"blurb": "Cache playlist items from the network to disk so they are downloaded only once when playing multiple iterations.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "ready",
"readable": true,
"type": "gboolean",
"writable": true
},
"cache-dir": {
"blurb": "The directory where playlist items are downloaded to, if 'cache' is enabled. If not set (default), the XDG cache directory is used.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "ready",
"readable": true,
"type": "gchararray",
"writable": true
},
"current-iteration": {
"blurb": "The index of the current playlist iteration, or 0 if the iterations property is 0 (unlimited playlist)",
"conditionally-available": false,

View file

@ -18,6 +18,7 @@ thiserror = "2"
gst-app.workspace = true
url = "2.2"
more-asserts = "0.3"
tempfile = "3"
[lib]
name = "gsturiplaylistbin"

View file

@ -20,14 +20,25 @@ use gst::prelude::*;
struct Opt {
#[clap(short, default_value = "1")]
iterations: u32,
#[clap(long, help = "Enable items cache")]
cache: bool,
#[clap(long, help = "Cache directory")]
cache_dir: Option<String>,
uris: Vec<String>,
}
fn create_pipeline(uris: Vec<String>, iterations: u32) -> anyhow::Result<gst::Pipeline> {
fn create_pipeline(
uris: Vec<String>,
iterations: u32,
cache: bool,
cache_dir: Option<String>,
) -> anyhow::Result<gst::Pipeline> {
let pipeline = gst::Pipeline::default();
let playlist = gst::ElementFactory::make("uriplaylistbin")
.property("uris", &uris)
.property("iterations", iterations)
.property("cache", cache)
.property("cache-dir", cache_dir)
.build()?;
pipeline.add(&playlist)?;
@ -117,7 +128,7 @@ fn main() -> anyhow::Result<()> {
.collect();
{
let pipeline = create_pipeline(uris, opt.iterations)?;
let pipeline = create_pipeline(uris, opt.iterations, opt.cache, opt.cache_dir)?;
pipeline
.set_state(gst::State::Playing)

View file

@ -8,6 +8,7 @@
use std::{
collections::{HashMap, VecDeque},
path::PathBuf,
sync::{Arc, Mutex, MutexGuard},
};
@ -35,6 +36,8 @@ enum PlaylistError {
struct Settings {
uris: Vec<String>,
iterations: u32,
cache: bool,
cache_dir: Option<String>,
}
impl Default for Settings {
@ -42,6 +45,8 @@ impl Default for Settings {
Self {
uris: vec![],
iterations: 1,
cache: false,
cache_dir: None,
}
}
}
@ -55,6 +60,8 @@ struct State {
current_item: Option<Item>,
/// key are src pads from uridecodebin
pads: HashMap<gst::Pad, Pads>,
/// URIs cached on disk, only used if `cache` property is enabled.
cached_uris: HashMap<String, PathBuf>,
// read-only properties
current_iteration: u32,
@ -77,6 +84,7 @@ impl State {
pending_current_items: VecDeque::new(),
current_item: None,
pads: HashMap::new(),
cached_uris: HashMap::new(),
current_iteration: 0,
current_uri_index: 0,
}
@ -195,6 +203,30 @@ impl ObjectImpl for UriPlaylistBin {
.default_value(1)
.mutable_playing()
.build(),
/**
* GstUriPlaylistBin:cache:
*
* Cache playlist items from the network to disk so they are downloaded only once when playing multiple iterations.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecBoolean::builder("cache")
.nick("Cache")
.blurb("Cache playlist items from the network to disk so they are downloaded only once when playing multiple iterations.")
.mutable_ready()
.build(),
/**
* GstUriPlaylistBin:cache-dir:
*
* The directory where playlist items are downloaded to, if 'cache' is enabled. If not set (default), the XDG cache directory is used.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecString::builder("cache-dir")
.nick("Cache directory")
.blurb("The directory where playlist items are downloaded to, if 'cache' is enabled. If not set (default), the XDG cache directory is used.")
.mutable_ready()
.build(),
glib::ParamSpecUInt::builder("current-iteration")
.nick("Current iteration")
.blurb("The index of the current playlist iteration, or 0 if the iterations property is 0 (unlimited playlist)")
@ -246,6 +278,30 @@ impl ObjectImpl for UriPlaylistBin {
}
}
}
"cache" => {
let mut settings = self.settings.lock().unwrap();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
imp = self,
"Changing cache from {:?} to {:?}",
settings.cache,
new_value,
);
settings.cache = new_value;
}
"cache-dir" => {
let mut settings = self.settings.lock().unwrap();
let new_value = value.get().expect("type checked upstream");
gst::info!(
CAT,
imp = self,
"Changing cache-dir from {:?} to {:?}",
settings.cache_dir,
new_value,
);
settings.cache_dir = new_value;
}
_ => unimplemented!(),
}
}
@ -276,6 +332,14 @@ impl ObjectImpl for UriPlaylistBin {
.unwrap_or(0)
.to_value()
}
"cache" => {
let settings = self.settings.lock().unwrap();
settings.cache.to_value()
}
"cache-dir" => {
let settings = self.settings.lock().unwrap();
settings.cache_dir.to_value()
}
_ => unimplemented!(),
}
}
@ -411,10 +475,16 @@ impl UriPlaylistBin {
let mut state_guard = self.state.lock().unwrap();
assert!(state_guard.is_none());
let settings = self.settings.lock().unwrap();
// No need to enable caching if we play only one iteration
let download = settings.cache && settings.iterations != 1;
let uridecodebin = gst::ElementFactory::make("uridecodebin3")
.name("playlist-uridecodebin")
.property("download", download)
.property("download-dir", &settings.cache_dir)
.build()
.map_err(|e| PlaylistError::PluginMissing { error: e.into() })?;
drop(settings);
let streamsynchronizer = gst::ElementFactory::make("streamsynchronizer")
.name("playlist-streamsynchronizer")
@ -494,12 +564,81 @@ impl UriPlaylistBin {
});
let bin_weak = self.obj().downgrade();
uridecodebin.connect("about-to-finish", false, move |_args| {
uridecodebin.connect("about-to-finish", false, move |args| {
let uridecodebin = args[0].get::<gst::Bin>().unwrap();
let bin = bin_weak.upgrade()?;
let self_ = bin.imp();
gst::debug!(CAT, obj = bin, "current URI about to finish");
let cache = self_.settings.lock().unwrap().cache;
// `about-to-finish` is emitted when the file has been fully buffered so we are sure it has been fully written to disk.
if cache {
// retrieve cached path of the current item
let download_path = uridecodebin
.iterate_recurse()
.find(|e| {
e.factory()
.map(|factory| factory.name())
.unwrap_or_default()
== "downloadbuffer"
})
.map(|downloadbuffer| downloadbuffer.property::<String>("temp-location"))
.map(PathBuf::from)
.and_then(|path| path.canonicalize().ok());
// urisourcebin uses downloadbuffer only with some specific URI scheme (http, etc).
// So if it has not been used assume it's a local file and loop using the original (or already cached) URI.
if let Some(path) = download_path {
let mut state = self_.state.lock().unwrap();
if let Some(state) = state.as_mut() {
let uri = uridecodebin.property::<String>("uri");
// downloadbuffer will remove the file as soon as it's done with it so we need to make a copy.
let mut link_path = path.clone();
link_path.set_file_name(format!(
"item-{}-{}",
state.current_uri_index,
path.file_name()
.and_then(|name| name.to_str())
.unwrap_or_default()
));
let mut cached = true;
// Try first creating a hard link to prevent a full copy.
if let Err(err) = std::fs::hard_link(&path, &link_path) {
gst::warning!(
CAT,
imp = self_,
"Failed to hard link cached item, try copy: '{err}'"
);
if let Err(err) = std::fs::copy(&path, &link_path) {
// Hard links are only supported with NTFS on Windows so fallback to copy.
gst::warning!(
CAT,
imp = self_,
"Failed to copy cached item: '{err}'"
);
cached = false;
}
}
if cached {
gst::log!(
CAT,
imp = self_,
"URI {uri} cached to {}",
link_path.display()
);
state.cached_uris.insert(uri, link_path);
}
}
}
}
let _ = self_.start_next_item();
None
@ -549,17 +688,21 @@ impl UriPlaylistBin {
}
};
gst::debug!(
CAT,
imp = self,
"start next item #{}: {}",
item.index(),
item.uri()
);
let mut uri = item.uri();
if let Some(path) = state.cached_uris.get(&uri) {
uri = gst::glib::filename_to_uri(path, None).unwrap().to_string();
gst::debug!(
CAT,
imp = self,
"start next item from cache #{}: {uri}",
item.index(),
);
} else {
gst::debug!(CAT, imp = self, "start next item #{}: {uri}", item.index(),);
}
// don't hold the mutex when updating `uri` to prevent deadlocks.
let uridecodebin = state.uridecodebin.clone();
let uri = item.uri();
state.pending_current_items.push_back(Some(item));
@ -640,6 +783,13 @@ impl UriPlaylistBin {
}
let mut state_guard = self.state.lock().unwrap();
if let Some(state) = state_guard.as_ref() {
for cached in state.cached_uris.values() {
let _ = std::fs::remove_file(cached);
}
}
*state_guard = None;
}
}

View file

@ -44,6 +44,13 @@ impl TestMedia {
}
}
fn mkv_http() -> Self {
Self {
uri: "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/raw/main/utils/uriplaylistbin/tests/sample.mkv?ref_type=heads&inline=false".to_string(),
len: 510.mseconds(),
}
}
fn mkv() -> Self {
Self {
uri: file_name_to_uri("sample.mkv"),
@ -106,6 +113,7 @@ fn test(
iterations: u32,
check_streams: bool,
iterations_change: Option<IterationsChange>,
cache: bool,
) -> (Vec<gst::Message>, u32, u64) {
init();
@ -113,10 +121,19 @@ fn test(
let uris: Vec<String> = medias.iter().map(|t| t.uri.clone()).collect();
// create a temp directory to store the cache
let cache_dir =
cache.then(|| tempfile::tempdir().expect("failed to create temp cache directory"));
let pipeline = Pipeline(gst::Pipeline::default());
let playlist = gst::ElementFactory::make("uriplaylistbin")
.property("uris", &uris)
.property("iterations", iterations)
.property("cache", cache)
.property(
"cache-dir",
cache_dir.as_ref().map(|dir| dir.path().to_str().unwrap()),
)
.build()
.unwrap();
let mq = gst::ElementFactory::make("multiqueue").build().unwrap();
@ -254,6 +271,13 @@ fn test(
}
}
if let Some(cache_dir) = cache_dir {
let dir = std::fs::read_dir(cache_dir.path()).expect("failed to read cache dir");
// all items should have been cached if we looped the playlist
let n_cached_files = if iterations > 1 { uris.len() } else { 0 };
assert_eq!(dir.count(), n_cached_files);
}
let current_iteration = playlist.property::<u32>("current-iteration");
let current_uri_index = playlist.property::<u64>("current-uri-index");
@ -311,7 +335,7 @@ fn assert_stream_selected(msg: gst::Message, n_streams: usize) -> gst::Object {
#[test]
fn single_audio() {
let (events, current_iteration, current_uri_index) =
test(vec![TestMedia::ogg()], 1, 1, true, None);
test(vec![TestMedia::ogg()], 1, 1, true, None, false);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 0);
assert_eq!(current_uri_index, 0);
@ -320,7 +344,7 @@ fn single_audio() {
#[test]
fn single_video() {
let (events, current_iteration, current_uri_index) =
test(vec![TestMedia::mkv()], 2, 1, true, None);
test(vec![TestMedia::mkv()], 2, 1, true, None, false);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 0);
assert_eq!(current_uri_index, 0);
@ -334,6 +358,7 @@ fn multi_audio() {
1,
true,
None,
false,
);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 0);
@ -342,8 +367,14 @@ fn multi_audio() {
#[test]
fn multi_audio_video() {
let (events, current_iteration, current_uri_index) =
test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 1, true, None);
let (events, current_iteration, current_uri_index) = test(
vec![TestMedia::mkv(), TestMedia::mkv()],
2,
1,
true,
None,
false,
);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 0);
assert_eq!(current_uri_index, 1);
@ -351,8 +382,14 @@ fn multi_audio_video() {
#[test]
fn iterations() {
let (events, current_iteration, current_uri_index) =
test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 2, true, None);
let (events, current_iteration, current_uri_index) = test(
vec![TestMedia::mkv(), TestMedia::mkv()],
2,
2,
true,
None,
false,
);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 1);
assert_eq!(current_uri_index, 1);
@ -362,8 +399,14 @@ fn iterations() {
// FIXME: racy: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/514
#[ignore]
fn nb_streams_increasing() {
let (events, current_iteration, current_uri_index) =
test(vec![TestMedia::ogg(), TestMedia::mkv()], 2, 1, false, None);
let (events, current_iteration, current_uri_index) = test(
vec![TestMedia::ogg(), TestMedia::mkv()],
2,
1,
false,
None,
false,
);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 0);
assert_eq!(current_uri_index, 1);
@ -377,6 +420,7 @@ fn missing_file() {
1,
false,
None,
false,
);
assert_error(
events.into_iter().last().unwrap(),
@ -394,6 +438,7 @@ fn missing_http() {
1,
false,
None,
false,
);
assert_error(
events.into_iter().last().unwrap(),
@ -415,6 +460,7 @@ fn increase_iterations() {
when_ss: 2,
iterations: 8,
}),
false,
);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 7);
@ -435,6 +481,7 @@ fn decrease_iterations() {
when_ss: 2,
iterations: 1,
}),
false,
);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 2);
@ -453,8 +500,19 @@ fn infinite_to_finite() {
when_ss: 2,
iterations: 4,
}),
false,
);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 3);
assert_eq!(current_uri_index, 0);
}
#[test]
/// cache HTTP playlist items
fn cache() {
let (events, current_iteration, current_uri_index) =
test(vec![TestMedia::mkv_http()], 2, 3, true, None, true);
assert_eos(events.into_iter().last().unwrap());
assert_eq!(current_iteration, 2);
assert_eq!(current_uri_index, 0);
}