// Copyright (C) 2021 OneStream Live // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at // . // // SPDX-License-Identifier: MPL-2.0 use std::{ collections::HashMap, path::Path, sync::{Arc, Mutex}, }; use gst::prelude::*; use structopt::StructOpt; #[derive(Debug, StructOpt)] #[structopt(name = "playlist", about = "An example of uriplaylistbin usage.")] struct Opt { #[structopt(default_value = "1", short = "i", long = "iterations")] iterations: u32, uris: Vec, } fn create_pipeline(uris: Vec, iterations: u32) -> anyhow::Result { let pipeline = gst::Pipeline::new(None); let playlist = gst::ElementFactory::make("uriplaylistbin", None)?; pipeline.add(&playlist)?; playlist.set_property("uris", &uris); playlist.set_property("iterations", &iterations); let sink_bins = Arc::new(Mutex::new(HashMap::new())); let sink_bins_clone = sink_bins.clone(); let pipeline_weak = pipeline.downgrade(); playlist.connect_pad_added(move |_playlist, src_pad| { let pipeline = match pipeline_weak.upgrade() { None => return, Some(pipeline) => pipeline, }; let pad_name = src_pad.name(); let sink = if pad_name.starts_with("audio") { gst::parse_bin_from_description("audioconvert ! audioresample ! autoaudiosink", true) .unwrap() } else if pad_name.starts_with("video") { gst::parse_bin_from_description("videoconvert ! autovideosink", true).unwrap() } else { unimplemented!(); }; pipeline.add(&sink).unwrap(); sink.sync_state_with_parent().unwrap(); let sink_pad = sink.static_pad("sink").unwrap(); src_pad.link(&sink_pad).unwrap(); sink_bins.lock().unwrap().insert(pad_name, sink); }); let pipeline_weak = pipeline.downgrade(); playlist.connect_pad_removed(move |_playlist, pad| { let pipeline = match pipeline_weak.upgrade() { None => return, Some(pipeline) => pipeline, }; // remove sink bin that was handling the pad let sink_bins = sink_bins_clone.lock().unwrap(); let sink = sink_bins.get(&pad.name()).unwrap(); pipeline.remove(sink).unwrap(); let _ = sink.set_state(gst::State::Null); }); Ok(pipeline) } fn main() -> anyhow::Result<()> { gst::init().unwrap(); gsturiplaylistbin::plugin_register_static().expect("Failed to register uriplaylistbin plugin"); let opt = Opt::from_args(); if opt.uris.is_empty() { anyhow::bail!("Need at least one URI to play"); } let uris = opt .uris .into_iter() .map(|uri| { let p = Path::new(&uri); match p.canonicalize() { Ok(p) => format!("file://{}", p.to_str().unwrap().to_string()), _ => uri, } }) .collect(); { let pipeline = create_pipeline(uris, opt.iterations)?; pipeline .set_state(gst::State::Playing) .expect("Unable to set the pipeline to the `Playing` state"); let bus = pipeline.bus().unwrap(); for msg in bus.iter_timed(gst::ClockTime::NONE) { use gst::MessageView; match msg.view() { MessageView::Error(err) => { eprintln!( "Error received from element {:?}: {}", err.src().map(|s| s.path_string()), err.error() ); eprintln!("Debugging information: {:?}", err.debug()); break; } MessageView::Eos(..) => break, _ => (), } } pipeline .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); } unsafe { gst::deinit(); } Ok(()) }