gst-plugins-rs/net/hlssink3/src/hlssink4/imp.rs
Sanchayan Maity 1fc80c288d net/hlssink3: Add hlssink4
`hlssink4` adds support for the following as per RFC 8216

- Multivariant/master playlist
- Alternate Renditions
- Variant Streams
2024-04-10 11:13:06 +05:30

1475 lines
52 KiB
Rust

// Copyright (C) 2024, asymptotic.io
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/*
* `hlssink4` for supporting multi-variant playlist with alternate renditions
* and variant streams. Builds on top of and requires `hlscmafsink`/`hlssink3`.
*
* TODO:
*
* - Support for closed captions
* - Support for WebVTT subtitles
*
* NOT SUPPORTED:
*
* - Muxed audio and video with alternate renditions
* - Simple Media playlist. Use `hlssink3` for the same
*/
use crate::hlssink4::{HlsSink4AlternativeMediaType, HlsSink4MuxerType, HlsSink4PlaylistType};
use gio::prelude::*;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use m3u8_rs::{
AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylistType, VariantStream,
};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::path;
use std::str::FromStr;
use std::sync::Mutex;
/* Fallbacks for optional fields of the per-pad rendition/variant structures. */
const DEFAULT_AUTO_SELECT: bool = false;
const DEFAULT_FORCED: bool = false;
const DEFAULT_I_FRAMES_ONLY_PLAYLIST: bool = false;
const DEFAULT_IS_DEFAULT: bool = false;
/* Defaults for the element level settings. */
const DEFAULT_MAX_NUM_SEGMENT_FILES: u32 = 10;
const DEFAULT_MUXER_TYPE: HlsSink4MuxerType = HlsSink4MuxerType::Cmaf;
const DEFAULT_PLAYLIST_LENGTH: u32 = 5;
const DEFAULT_PLAYLIST_TYPE: HlsSink4PlaylistType = HlsSink4PlaylistType::Unspecified;
const DEFAULT_SEND_KEYFRAME_REQUESTS: bool = true;
const DEFAULT_TARGET_DURATION: u32 = 15;
/* File name templates appended to the directory of a media playlist. */
const DEFAULT_TS_LOCATION: &str = "segment%05d.ts";
const DEFAULT_INIT_LOCATION: &str = "init%05d.mp4";
const DEFAULT_CMAF_LOCATION: &str = "segment%05d.m4s";
const DEFAULT_MASTER_PLAYLIST_LOCATION: &str = "master.m3u8";
/* Names of the signals registered by the element. */
const SIGNAL_DELETE_FRAGMENT: &str = "delete-fragment";
const SIGNAL_GET_FRAGMENT_STREAM: &str = "get-fragment-stream";
const SIGNAL_GET_INIT_STREAM: &str = "get-init-stream";
const SIGNAL_GET_MASTER_PLAYLIST_STREAM: &str = "get-master-playlist-stream";
const SIGNAL_GET_PLAYLIST_STREAM: &str = "get-playlist-stream";
/* Debug category used by every log statement in this file. */
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("hlssink4", gst::DebugColorFlags::empty(), Some("HLS sink"))
});
impl From<HlsSink4AlternativeMediaType> for AlternativeMediaType {
fn from(media_type: HlsSink4AlternativeMediaType) -> Self {
match media_type {
HlsSink4AlternativeMediaType::Audio => AlternativeMediaType::Audio,
HlsSink4AlternativeMediaType::Video => AlternativeMediaType::Video,
}
}
}
/*
 * Translate the `m3u8_rs` media type into the element's enum. Only audio
 * and video are handled; every other media type panics via
 * `unimplemented!()`.
 */
impl From<AlternativeMediaType> for HlsSink4AlternativeMediaType {
    fn from(value: AlternativeMediaType) -> Self {
        match value {
            AlternativeMediaType::Audio => Self::Audio,
            AlternativeMediaType::Video => Self::Video,
            AlternativeMediaType::ClosedCaptions
            | AlternativeMediaType::Subtitles
            | AlternativeMediaType::Other(_) => unimplemented!(),
        }
    }
}
/*
 * Parse the textual media type of an alternate rendition.
 *
 * Accepts "AUDIO"/"audio" and "VIDEO"/"video". Any other input is
 * reported through `Err` with a descriptive message instead of
 * panicking, so that callers can decide how to handle it.
 */
impl FromStr for HlsSink4AlternativeMediaType {
    type Err = String;

    fn from_str(s: &str) -> Result<HlsSink4AlternativeMediaType, String> {
        match s {
            "AUDIO" | "audio" => Ok(HlsSink4AlternativeMediaType::Audio),
            "VIDEO" | "video" => Ok(HlsSink4AlternativeMediaType::Video),
            other => Err(format!("Unsupported alternative media type: {other}")),
        }
    }
}
/* Upper-case textual form of the media type ("AUDIO" / "VIDEO"). */
impl Display for HlsSink4AlternativeMediaType {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            HlsSink4AlternativeMediaType::Audio => "AUDIO",
            HlsSink4AlternativeMediaType::Video => "VIDEO",
        };
        f.write_str(label)
    }
}
/* Lower-case textual form of the playlist type. */
impl Display for HlsSink4PlaylistType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            HlsSink4PlaylistType::Unspecified => "unspecified",
            HlsSink4PlaylistType::Event => "event",
            HlsSink4PlaylistType::Vod => "vod",
        };
        f.write_str(label)
    }
}
impl From<HlsSink4PlaylistType> for Option<MediaPlaylistType> {
fn from(pl_type: HlsSink4PlaylistType) -> Self {
use HlsSink4PlaylistType::*;
match pl_type {
Unspecified => None,
Event => Some(MediaPlaylistType::Event),
Vod => Some(MediaPlaylistType::Vod),
}
}
}
impl From<Option<&MediaPlaylistType>> for HlsSink4PlaylistType {
fn from(inner_pl_type: Option<&MediaPlaylistType>) -> Self {
use HlsSink4PlaylistType::*;
match inner_pl_type {
Some(MediaPlaylistType::Event) => Event,
Some(MediaPlaylistType::Vod) => Vod,
None | Some(MediaPlaylistType::Other(_)) => Unspecified,
}
}
}
/*
 * Settings of a pad that represents an alternate rendition. They are
 * converted into an `m3u8_rs::AlternativeMedia` entry for the master
 * playlist.
 */
#[derive(Clone, Default)]
struct AlternateRendition {
/* Kind of media carried by the rendition (audio or video). */
media_type: HlsSink4AlternativeMediaType,
/*
* While the URI is optional for an alternate rendition when
* the media type is audio or video, we keep it required here
* because of the way we handle media as non-muxed and each
* media having its own HLS sink element downstream.
*
* We do not support muxed audio video for renditions.
*/
uri: String,
/* Rendition group id; also the key under which the pad's caps are tracked. */
group_id: String,
/* Optional language of the rendition. */
language: Option<String>,
/* Name of the rendition. */
name: String,
/* Copied verbatim into the `default`, `autoselect` and `forced` fields of `AlternativeMedia`. */
default: bool,
autoselect: bool,
forced: bool,
}
impl From<&AlternateRendition> for AlternativeMedia {
fn from(rendition: &AlternateRendition) -> Self {
Self {
media_type: AlternativeMediaType::from(rendition.media_type),
uri: Some(rendition.uri.clone()),
group_id: rendition.group_id.clone(),
language: rendition.language.clone(),
name: rendition.name.clone(),
default: rendition.default,
autoselect: rendition.autoselect,
forced: rendition.forced,
..Default::default()
}
}
}
impl From<gst::Structure> for AlternateRendition {
fn from(s: gst::Structure) -> Self {
AlternateRendition {
media_type: s.get::<&str>("media_type").map_or(
HlsSink4AlternativeMediaType::Audio,
|media| {
HlsSink4AlternativeMediaType::from_str(media).expect("Failed to get media type")
},
),
uri: s.get("uri").expect("uri missing in alternate rendition"),
group_id: s
.get("group_id")
.expect("group_id missing in alternate rendition"),
language: s.get("language").unwrap_or(None),
name: s.get("name").expect("name missing in alternate rendition"),
default: s.get("default").unwrap_or(DEFAULT_IS_DEFAULT),
autoselect: s.get("autoselect").unwrap_or(DEFAULT_AUTO_SELECT),
forced: s.get("forced").unwrap_or(DEFAULT_FORCED),
}
}
}
impl From<AlternateRendition> for gst::Structure {
fn from(obj: AlternateRendition) -> Self {
gst::Structure::builder("pad-settings")
.field("media_type", obj.media_type)
.field("uri", obj.uri)
.field("group_id", obj.group_id)
.field("language", obj.language)
.field("name", obj.name)
.field("default", obj.default)
.field("autoselect", obj.autoselect)
.field("forced", obj.forced)
.build()
}
}
/*
 * Settings of a pad that represents a variant stream. They are converted
 * into an `m3u8_rs::VariantStream` entry for the master playlist.
 */
#[derive(Clone, Debug, Default)]
struct Variant {
/* No effect when using hlscmafsink which is the default */
is_i_frame: bool,
/*
* For a variant to have muxed audio and video, set the URI on the
* variant pad property of the audio and video pads to be the same.
*/
uri: String,
/* Bandwidth of the variant, copied into `VariantStream::bandwidth`. */
bandwidth: u64,
/* Codec string; when unset it is derived from the caps seen on the pads. */
codecs: Option<String>,
/* Group ids of the audio/video rendition groups this variant refers to. */
audio: Option<String>,
video: Option<String>,
}
/*
 * Read a variant stream from the structure set on the pad.
 *
 * `uri` and `bandwidth` are mandatory and their absence panics. The
 * bandwidth is accepted either as `u64`, which is what the property
 * getter writes into the structure, or as a non-negative `i32`, which is
 * what applications typically provide. A negative bandwidth is rejected
 * instead of being silently wrapped into a huge unsigned value.
 */
impl From<gst::Structure> for Variant {
    fn from(s: gst::Structure) -> Self {
        let is_i_frame = s
            .get("is-i-frame")
            .unwrap_or(DEFAULT_I_FRAMES_ONLY_PLAYLIST);
        let uri: String = s.get("uri").expect("uri missing in variant stream");
        let bandwidth = s
            .get::<u64>("bandwidth")
            .ok()
            .or_else(|| {
                s.get::<i32>("bandwidth").ok().map(|bw| {
                    assert!(bw >= 0, "bandwidth must not be negative in variant stream");
                    bw as u64
                })
            })
            .expect("bandwidth missing in variant stream");

        Variant {
            is_i_frame,
            uri,
            bandwidth,
            audio: s.get("audio").unwrap_or(None),
            video: s.get("video").unwrap_or(None),
            /*
             * This will be constructed from caps when using `hlscmafsink`.
             * Needs to be set otherwise.
             */
            codecs: s.get("codecs").unwrap_or(None),
        }
    }
}
impl From<Variant> for gst::Structure {
fn from(obj: Variant) -> Self {
gst::Structure::builder("variant-stream")
.field("is-i-frame", obj.is_i_frame)
.field("uri", obj.uri)
.field("bandwidth", obj.bandwidth)
.field("codecs", obj.codecs)
.field("audio", obj.audio)
.field("video", obj.video)
.build()
}
}
impl From<&Variant> for VariantStream {
fn from(variant: &Variant) -> Self {
Self {
is_i_frame: variant.is_i_frame,
uri: variant.uri.clone(),
bandwidth: variant.bandwidth,
codecs: variant.codecs.clone(),
audio: variant.audio.clone(),
video: variant.video.clone(),
..Default::default()
}
}
}
impl From<VariantStream> for Variant {
fn from(value: VariantStream) -> Self {
Self {
is_i_frame: value.is_i_frame,
uri: value.uri,
bandwidth: value.bandwidth,
codecs: value.codecs,
audio: value.audio,
video: value.video,
}
}
}
/* Helper functions */
/*
 * Record `caps` under `id`, creating the list for `id` on first use.
 */
fn accumulate_codec_caps(
    codecs: &mut HashMap<String, Vec<gst::Caps>>,
    caps: gst::Caps,
    id: String,
) {
    let tracked = codecs.entry(id).or_default();
    tracked.push(caps);
    /*
     * TODO: Should we move to itertools unique?
     *
     * A pad can see more than one CAPS event. The master playlist is
     * only written once CAPS for all the pads are known, so that the
     * codec string of a variant can be generated correctly. Multiple
     * events would make the count higher than expected, hence a newly
     * added cap that is a subset of the one before it is dropped again.
     * For a list holding a single entry this is a no-op.
     */
    tracked.dedup_by(|newer, older| newer.is_subset(older));
}
/*
 * Compute the CODECS attribute for `variant`.
 *
 * A codec string provided by the user always wins. Otherwise the caps
 * tracked for the variant's audio group, video group and URI, in that
 * order, are converted to MIME codec strings which are then sorted,
 * de-duplicated and joined with ",". Returns `Ok(None)` when no caps are
 * known and the first conversion error otherwise.
 */
fn build_codec_string_for_variant(
    variant: &Variant,
    codecs: &HashMap<String, Vec<gst::Caps>>,
) -> Result<Option<String>, glib::BoolError> {
    /*
     * mpegtsmux only accepts stream-format as byte-stream for H264/H265.
     * The pbutils helper used relies on codec_data for figuring out the
     * profile and level information, however codec_data is absent in the
     * case of stream-format being byte-stream.
     *
     * If the profile and level information are missing from the codecs
     * field, clients like hls.js or Video.js which rely on browsers
     * built-in HLS support fail to load the media source. This can be
     * checked by running something like below in the browser console.
     *
     * MediaSource.isTypeSupported('video/mp4;codecs=avc1')
     * MediaSource.isTypeSupported('video/mp4;codecs=avc1.42000c')
     *
     * The first one will return a false.
     *
     * The use of `hlscmafsink` with the muxer type being CMAF by default
     * uses the `avc` stream-format where `codec_data` is included and
     * the get mime codec helper retrieves the codec string.
     *
     * If the user opts for MPEGTS instead, either they should provide
     * the codec string or expect the codec string to not have the profile
     * level information. Though gst-play and ffplay still work with such
     * a multi-variant playlist where CODECS does not contain profile info
     * for video.
     */
    if let Some(user_codecs) = variant.codecs.as_ref() {
        return Ok(Some(user_codecs.clone()));
    }

    /* Keys under which caps relevant for this variant may be tracked. */
    let lookup_keys = [
        variant.audio.as_deref(),
        variant.video.as_deref(),
        Some(variant.uri.as_str()),
    ];

    let mut mime_codecs = lookup_keys
        .iter()
        .copied()
        .flatten()
        .filter_map(|key| codecs.get(key))
        .flatten()
        .map(|caps| {
            gst_pbutils::codec_utils_caps_get_mime_codec(caps).map(|codec| codec.to_string())
        })
        .collect::<Result<Vec<String>, glib::BoolError>>()?;

    if mime_codecs.is_empty() {
        return Ok(None);
    }

    mime_codecs.sort();
    mime_codecs.dedup();
    Ok(Some(mime_codecs.join(",")))
}
/*
 * Find or create the HLS sink for the variant with the given URI.
 *
 * Returns `(reused, sink name, sink element)`. Only the MPEG-TS muxer type
 * looks for an already existing sink of the same name in the bin; for CMAF
 * a new sink is always created.
 */
fn get_existing_hlssink_for_variant(
    elem: &HlsSink4,
    uri: String,
    muxer_type: HlsSink4MuxerType,
) -> (bool, String, gst::Element) {
    let sink_name = hlssink_name(uri, muxer_type);

    let existing = match muxer_type {
        HlsSink4MuxerType::MpegTs => elem.obj().by_name(&sink_name),
        HlsSink4MuxerType::Cmaf => None,
    };

    match existing {
        Some(hlssink) => (true, sink_name, hlssink),
        None => {
            let hlssink = hlssink_element(muxer_type, sink_name.clone());
            (false, sink_name, hlssink)
        }
    }
}
/*
 * Create the HLS sink matching `muxer_type` and name it `sink_name`.
 * Panics when the required element factory is not available.
 */
fn hlssink_element(muxer_type: HlsSink4MuxerType, sink_name: String) -> gst::Element {
    let (factory_name, unavailable_msg) = match muxer_type {
        HlsSink4MuxerType::Cmaf => ("hlscmafsink", "hlscmafsink must be available"),
        HlsSink4MuxerType::MpegTs => ("hlssink3", "hlssink3 must be available"),
    };

    gst::ElementFactory::make(factory_name)
        .name(sink_name)
        .build()
        .expect(unavailable_msg)
}
/* Name of the HLS sink for `uri`: the factory name, a dash and the URI. */
fn hlssink_name(uri: String, muxer_type: HlsSink4MuxerType) -> String {
    let prefix = match muxer_type {
        HlsSink4MuxerType::Cmaf => "hlscmafsink",
        HlsSink4MuxerType::MpegTs => "hlssink3",
    };

    format!("{prefix}-{uri}")
}
/*
 * Get the sink pad of `hlssink` that the ghost pad should target.
 *
 * For the CMAF muxer type this is the static "sink" pad. For MPEG-TS a
 * "video" or "audio" request pad is requested depending on `is_video`.
 * Panics when the pad cannot be obtained.
 */
fn hlssink_pad(hlssink: &gst::Element, muxer_type: HlsSink4MuxerType, is_video: bool) -> gst::Pad {
    match (muxer_type, is_video) {
        (HlsSink4MuxerType::Cmaf, _) => hlssink
            .static_pad("sink")
            .expect("hlscmafsink always has a sink pad"),
        (HlsSink4MuxerType::MpegTs, true) => hlssink
            .request_pad_simple("video")
            .expect("hlssink3 always has a video pad"),
        (HlsSink4MuxerType::MpegTs, false) => hlssink
            .request_pad_simple("audio")
            .expect("hlssink3 always has an audio pad"),
    }
}
fn hlssink_setup_paths(
pad: &HlsSink4Pad,
hlssink: &gst::Element,
muxer_type: HlsSink4MuxerType,
master_playlist_location: String,
uri: String,
) {
let master_playlist_parts: Vec<&str> = master_playlist_location.split('/').collect();
let segment_location = if master_playlist_parts.is_empty() {
uri
} else {
let master_playlist_root =
&master_playlist_parts[..master_playlist_parts.len() - 1].join("/");
format!("{master_playlist_root}/{uri}")
};
let parts: Vec<&str> = segment_location.split('/').collect();
if parts.is_empty() {
gst::error!(CAT, imp: pad, "URI must be relative to master");
gst::element_error!(
pad.parent(),
gst::ResourceError::Failed,
["URI must be relative to master"]
);
return;
}
let segment_playlist_root = &parts[..parts.len() - 1].join("/");
hlssink.set_property("playlist-location", segment_location);
match muxer_type {
HlsSink4MuxerType::Cmaf => {
hlssink.set_property(
"init-location",
format!("{segment_playlist_root}/{DEFAULT_INIT_LOCATION}"),
);
hlssink.set_property(
"location",
format!("{segment_playlist_root}/{DEFAULT_CMAF_LOCATION}"),
);
}
HlsSink4MuxerType::MpegTs => {
hlssink.set_property(
"location",
format!("{segment_playlist_root}/{DEFAULT_TS_LOCATION}"),
);
}
}
}
/*
* The EXT-X-MEDIA tag is used to relate Media Playlists that contain
* alternative Renditions. An EXT-X-MEDIA tag must have TYPE of media.
* We use the existence of the field to decide whether the user meant
* a requested media to be an alternate rendition or a variant stream
* by setting the corresponding property.
*/
/*
 * True when the structure carries a string `media_type` field naming
 * audio or video, in either all upper or all lower case.
 */
fn is_alternate_rendition(s: &gst::Structure) -> bool {
    s.get::<&str>("media_type")
        .map_or(false, |media| matches!(media, "AUDIO" | "VIDEO" | "audio" | "video"))
}
/* Helper functions end */
/*
* A pad/media requested represents either an alternate rendition or
* a variant stream.
*/
#[derive(Clone)]
enum HlsSink4PadSettings {
/* The pad carries an alternate rendition (EXT-X-MEDIA). */
PadAlternative(AlternateRendition),
/* The pad carries a variant stream. */
PadVariant(Variant),
}
/* Until configured, a pad is treated as a variant with default settings. */
impl Default for HlsSink4PadSettings {
    fn default() -> Self {
        Self::PadVariant(Variant::default())
    }
}
/*
 * Interpret the structure as an alternate rendition when it carries a
 * recognised `media_type`, and as a variant stream otherwise.
 */
impl From<gst::Structure> for HlsSink4PadSettings {
    fn from(s: gst::Structure) -> Self {
        if is_alternate_rendition(&s) {
            Self::PadAlternative(AlternateRendition::from(s))
        } else {
            Self::PadVariant(Variant::from(s))
        }
    }
}
/* Serialise whichever kind of settings the pad currently holds. */
impl From<HlsSink4PadSettings> for gst::Structure {
    fn from(obj: HlsSink4PadSettings) -> Self {
        match obj {
            HlsSink4PadSettings::PadAlternative(rendition) => gst::Structure::from(rendition),
            HlsSink4PadSettings::PadVariant(variant) => gst::Structure::from(variant),
        }
    }
}
/*
 * Private data of the request pads of `hlssink4`. Holds what was set
 * through the `alternate-rendition` or `variant` property.
 */
#[derive(Default)]
pub(crate) struct HlsSink4Pad {
settings: Mutex<HlsSink4PadSettings>,
}
/* Registers the pad type as a subclass of `gst::GhostPad`. */
#[glib::object_subclass]
impl ObjectSubclass for HlsSink4Pad {
const NAME: &'static str = "HlsSink4Pad";
type Type = super::HlsSink4Pad;
type ParentType = gst::GhostPad;
}
/*
 * Properties of the pad. Setting `alternate-rendition` or `variant` does
 * more than storing a value: it creates (or, for muxed MPEG-TS variants,
 * reuses) the downstream HLS sink, adds it to the parent bin and points
 * the ghost pad at it.
 */
impl ObjectImpl for HlsSink4Pad {
/* Both properties are boxed `gst::Structure`s. */
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecBoxed::builder::<gst::Structure>("alternate-rendition")
.nick("Rendition")
.blurb("Alternate Rendition")
.mutable_ready()
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("variant")
.nick("Variant")
.blurb("Variant Stream")
.mutable_ready()
.build(),
]
});
PROPERTIES.as_ref()
}
/*
 * Panics when the structure lacks a mandatory field, when the pad has no
 * parent yet, or when adding the sink / setting the target fails.
 *
 * The parent's `settings` lock is taken before its `state` lock and both
 * are released before the pad's own `settings` lock is taken.
 */
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"alternate-rendition" => {
let s = value
.get::<gst::Structure>()
.expect("Must be a valid AlternateRendition");
let rendition = AlternateRendition::from(s);
let alternative_media = AlternativeMedia::from(&rendition);
let parent = self.parent();
let elem = parent.imp();
let elem_settings = elem.settings.lock().unwrap();
let muxer_type = elem_settings.muxer_type;
let mut state = elem.state.lock().unwrap();
let obj = self.obj();
let pad_name = obj.name();
/* Whether the pad carries video is derived from the pad name. */
let is_video = pad_name.contains("video");
/* A rendition always gets a HLS sink of its own. */
let sink_name = hlssink_name(rendition.uri.clone(), muxer_type);
let hlssink = hlssink_element(muxer_type, sink_name.clone());
let peer_pad = hlssink_pad(&hlssink, muxer_type, is_video);
elem.setup_hlssink(&hlssink, &elem_settings);
hlssink_setup_paths(
self,
&hlssink,
muxer_type,
elem_settings.master_playlist_location.clone(),
rendition.uri.clone(),
);
parent
.add(&hlssink)
.expect("Failed to add hlssink for rendition");
self.obj()
.set_target(Some(&peer_pad))
.expect("Failed to set target for rendition");
self.obj()
.set_active(true)
.expect("Failed to activate rendition pad");
/* Remember which sink serves this pad and the playlist entry. */
state.pads.insert(pad_name.to_string(), sink_name);
state.alternatives.push(alternative_media);
drop(elem_settings);
drop(state);
let mut settings = self.settings.lock().unwrap();
*settings = HlsSink4PadSettings::PadAlternative(rendition);
}
"variant" => {
let s = value
.get::<gst::Structure>()
.expect("Must be a valid Variant");
let variant = Variant::from(s);
let parent = self.parent();
let elem = parent.imp();
let elem_settings = elem.settings.lock().unwrap();
let muxer_type = elem_settings.muxer_type;
let mut state = elem.state.lock().unwrap();
let obj = self.obj();
let pad_name = obj.name();
/* Whether the pad carries video is derived from the pad name. */
let is_video = pad_name.contains("video");
/*
* If the variant is to have muxed audio and video, look for
* a hlssink with the same URI.
*/
let (muxed, sink_name, hlssink) = get_existing_hlssink_for_variant(
elem,
variant.uri.clone(),
elem_settings.muxer_type,
);
let peer_pad = hlssink_pad(&hlssink, muxer_type, is_video);
/* A reused sink is already configured, added and has its variant recorded. */
if !muxed {
elem.setup_hlssink(&hlssink, &elem_settings);
hlssink_setup_paths(
self,
&hlssink,
muxer_type,
elem_settings.master_playlist_location.clone(),
variant.uri.clone(),
);
parent
.add(&hlssink)
.expect("Failed to add hlssink for variant");
state.variants.push(variant.clone());
}
if muxer_type == HlsSink4MuxerType::MpegTs && is_video && variant.is_i_frame {
hlssink.set_property("i-frames-only", true);
}
self.obj()
.set_target(Some(&peer_pad))
.expect("Failed to set target for variant");
self.obj()
.set_active(true)
.expect("Failed to activate variant pad");
state.pads.insert(pad_name.to_string(), sink_name);
drop(elem_settings);
drop(state);
let mut settings = self.settings.lock().unwrap();
*settings = HlsSink4PadSettings::PadVariant(variant);
}
_ => unimplemented!(),
}
}
/*
 * Both properties return the currently stored settings as a structure,
 * irrespective of whether they describe a rendition or a variant.
 */
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"alternate-rendition" | "variant" => {
Into::<gst::Structure>::into(settings.clone()).to_value()
}
_ => unimplemented!(),
}
}
}
/* No overrides are needed for the remaining parent classes of the pad. */
impl GstObjectImpl for HlsSink4Pad {}
impl PadImpl for HlsSink4Pad {}
impl ProxyPadImpl for HlsSink4Pad {}
impl GhostPadImpl for HlsSink4Pad {}
impl HlsSink4Pad {
    /*
     * The `hlssink4` element owning this pad. Panics when the pad has no
     * parent or when the parent is not a `hlssink4`.
     */
    fn parent(&self) -> super::HlsSink4 {
        let parent_obj = self
            .obj()
            .parent()
            .expect("Pad should have a parent at this stage");

        parent_obj
            .downcast::<super::HlsSink4>()
            .expect("Wrong Element type")
    }
}
/* Runtime bookkeeping of the element, guarded by `HlsSink4::state`. */
#[derive(Default)]
struct State {
/* Number of CAPS events seen so far; compared against the number of sink pads. */
all_mimes: u32,
/* Counters used to name newly requested `audio_%u` / `video_%u` pads. */
audio_pad_serial: u32,
video_pad_serial: u32,
/* Pad name -> name of the HLS sink serving that pad. */
pads: HashMap<String, String>,
/* Alternate renditions to list in the master playlist. */
alternatives: Vec<AlternativeMedia>,
/* Variant streams to list in the master playlist. */
variants: Vec<Variant>,
/* Caps seen per rendition group id or, for variants without a group, per URI. */
codecs: HashMap<String, Vec<gst::Caps>>,
/* NOTE(review): presumably set once the master playlist was written; the writer is outside this view. */
wrote_manifest: bool,
}
/* Property backed settings of the element, guarded by `HlsSink4::settings`. */
#[derive(Debug)]
struct Settings {
/* Location of the master playlist; media playlist URIs are resolved against its directory. */
master_playlist_location: String,
/* Selects between `hlscmafsink` and `hlssink3` for the underlying sinks. */
muxer_type: HlsSink4MuxerType,
/* Below settings will be applied to all underlying hlscmafsink/hlssink3 */
playlist_length: u32,
playlist_type: Option<HlsSink4PlaylistType>,
max_num_segment_files: usize,
send_keyframe_requests: bool,
target_duration: u32,
}
/* Every setting starts out with its `DEFAULT_*` constant. */
impl Default for Settings {
    fn default() -> Self {
        let master_playlist_location = String::from(DEFAULT_MASTER_PLAYLIST_LOCATION);
        let max_num_segment_files = DEFAULT_MAX_NUM_SEGMENT_FILES as usize;

        Self {
            master_playlist_location,
            muxer_type: DEFAULT_MUXER_TYPE,
            playlist_length: DEFAULT_PLAYLIST_LENGTH,
            playlist_type: Some(DEFAULT_PLAYLIST_TYPE),
            max_num_segment_files,
            send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS,
            target_duration: DEFAULT_TARGET_DURATION,
        }
    }
}
/*
 * Private data of the `hlssink4` element. Where both locks are needed,
 * the pad property setters take `settings` before `state`.
 */
#[derive(Default)]
pub struct HlsSink4 {
settings: Mutex<Settings>,
state: Mutex<State>,
}
/* Registers the element type as a subclass of `gst::Bin`. */
#[glib::object_subclass]
impl ObjectSubclass for HlsSink4 {
const NAME: &'static str = "GstHlsSink4";
type Type = super::HlsSink4;
type ParentType = gst::Bin;
}
impl ObjectImpl for HlsSink4 {
fn constructed(&self) {
self.parent_constructed();
}
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("master-playlist-location")
.nick("Master Playlist file location")
.blurb("Location of the master playlist file to write")
.default_value(Some(DEFAULT_MASTER_PLAYLIST_LOCATION))
.build(),
glib::ParamSpecUInt::builder("max-files")
.nick("Max files")
.blurb("Maximum number of files to keep on disk. Once the maximum is reached, old files start to be deleted to make room for new ones.")
.build(),
glib::ParamSpecEnum::builder_with_default("muxer-type", DEFAULT_MUXER_TYPE)
.nick("Muxer Type")
.blurb("The muxer to use, cmafmux or mpegtsmux, accordingly selects hlssink3 or hlscmafsink")
.build(),
glib::ParamSpecUInt::builder("playlist-length")
.nick("Playlist length")
.blurb("Length of HLS playlist. To allow players to conform to section 6.3.3 of the HLS specification, this should be at least 3. If set to 0, the playlist will be infinite.")
.default_value(DEFAULT_PLAYLIST_LENGTH)
.build(),
glib::ParamSpecEnum::builder_with_default("playlist-type", DEFAULT_PLAYLIST_TYPE)
.nick("Playlist Type")
.blurb("The type of the playlist to use. When VOD type is set, the playlist will be live until the pipeline ends execution.")
.build(),
glib::ParamSpecBoolean::builder("send-keyframe-requests")
.nick("Send Keyframe Requests")
.blurb("Send keyframe requests to ensure correct fragmentation. If this is disabled then the input must have keyframes in regular intervals.")
.default_value(DEFAULT_SEND_KEYFRAME_REQUESTS)
.build(),
glib::ParamSpecUInt::builder("target-duration")
.nick("Target duration")
.blurb("The target duration in seconds of a segment/file. (0 - disabled, useful for management of segment duration by the streaming server)")
.default_value(DEFAULT_TARGET_DURATION)
.build(),
]
});
&PROPERTIES
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().expect("Failed to get settings lock");
gst::debug!(
CAT,
imp: self,
"Setting property '{}' to '{:?}'",
pspec.name(),
value
);
match pspec.name() {
"master-playlist-location" => {
settings.master_playlist_location =
value.get::<String>().expect("type checked upstream");
}
"max-files" => {
let max_files: u32 = value.get().expect("type checked upstream");
settings.max_num_segment_files = max_files as usize;
}
"muxer-type" => {
settings.muxer_type = value
.get::<HlsSink4MuxerType>()
.expect("type checked upstream");
}
"playlist-length" => {
settings.playlist_length = value.get().expect("type checked upstream");
}
"playlist-type" => {
settings.playlist_type = value
.get::<HlsSink4PlaylistType>()
.expect("type checked upstream")
.into();
}
"target-duration" => {
settings.target_duration = value.get().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().expect("Failed to get settings lock");
match pspec.name() {
"master-playlist-location" => settings.master_playlist_location.to_value(),
"max-files" => {
let max_files = settings.max_num_segment_files as u32;
max_files.to_value()
}
"muxer-type" => settings.muxer_type.to_value(),
"playlist-length" => settings.playlist_length.to_value(),
"playlist-type" => settings
.playlist_type
.unwrap_or(DEFAULT_PLAYLIST_TYPE)
.to_value(),
"send-keyframe-requests" => settings.send_keyframe_requests.to_value(),
"target-duration" => settings.target_duration.to_value(),
_ => unimplemented!(),
}
}
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: Lazy<Vec<glib::subclass::Signal>> = Lazy::new(|| {
vec![
glib::subclass::Signal::builder(SIGNAL_GET_MASTER_PLAYLIST_STREAM)
.param_types([String::static_type()])
.return_type::<Option<gio::OutputStream>>()
.class_handler(|_, args| {
let master_playlist_location = args[1].get::<String>().expect("signal arg");
let elem = args[0].get::<super::HlsSink4>().expect("signal arg");
let imp = elem.imp();
Some(
imp.new_file_stream(&master_playlist_location)
.ok()
.to_value(),
)
})
.accumulator(|_hint, ret, value| {
/* First signal handler wins */
*ret = value.clone();
false
})
.build(),
/* We will proxy the below signals from the underlying hlssink3/hlscmafsink */
glib::subclass::Signal::builder(SIGNAL_DELETE_FRAGMENT)
.param_types([String::static_type()])
.return_type::<bool>()
.class_handler(|_, args| {
let fragment_location = args[1].get::<String>().expect("signal arg");
let elem = args[0].get::<super::HlsSink4>().expect("signal arg");
let imp = elem.imp();
imp.delete_fragment(&fragment_location);
Some(true.to_value())
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
glib::subclass::Signal::builder(SIGNAL_GET_FRAGMENT_STREAM)
.param_types([String::static_type()])
.return_type::<Option<gio::OutputStream>>()
.class_handler(|_, args| {
let fragment_location = args[1].get::<String>().expect("signal arg");
let elem = args[0].get::<super::HlsSink4>().expect("signal arg");
let imp = elem.imp();
Some(imp.new_file_stream(&fragment_location).ok().to_value())
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
glib::subclass::Signal::builder(SIGNAL_GET_INIT_STREAM)
.param_types([String::static_type()])
.return_type::<Option<gio::OutputStream>>()
.class_handler(|_, args| {
let elem = args[0].get::<super::HlsSink4>().expect("signal arg");
let init_location = args[1].get::<String>().expect("signal arg");
let imp = elem.imp();
Some(imp.new_file_stream(&init_location).ok().to_value())
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
glib::subclass::Signal::builder(SIGNAL_GET_PLAYLIST_STREAM)
.param_types([String::static_type()])
.return_type::<Option<gio::OutputStream>>()
.class_handler(|_, args| {
let playlist_location = args[1].get::<String>().expect("signal arg");
let elem = args[0].get::<super::HlsSink4>().expect("signal arg");
let imp = elem.imp();
Some(imp.new_file_stream(&playlist_location).ok().to_value())
})
.accumulator(|_hint, ret, value| {
// First signal handler wins
*ret = value.clone();
false
})
.build(),
]
});
SIGNALS.as_ref()
}
}
/* No `GstObject` virtual methods are overridden by the element. */
impl GstObjectImpl for HlsSink4 {}
/* Element behaviour: state changes, metadata, pad templates and request pads. */
impl ElementImpl for HlsSink4 {
/*
 * Validates the configured alternate renditions and variants before
 * chaining up. The validation runs on every transition; on failure an
 * element error is posted and the state change fails.
 */
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let state = self.state.lock().unwrap();
if !self.validate_alternate_rendition_and_variants(&state.alternatives, &state.variants) {
gst::element_error!(
self.obj(),
gst::ResourceError::Settings,
["Validation of alternate rendition and variants failed"]
);
return Err(gst::StateChangeError);
}
/* Release the state lock before chaining up to the parent class. */
drop(state);
self.parent_change_state(transition)
}
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"HTTP Live Streaming sink",
"Sink/Muxer",
"HTTP Live Streaming sink",
"Sanchayan Maity <sanchayan@asymptotic.io>",
)
});
Some(&*ELEMENT_METADATA)
}
/* Two request sink pad templates, `audio_%u` and `video_%u`, both accepting any caps. */
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps_any = gst::Caps::new_any();
let audio_pad_template = gst::PadTemplate::with_gtype(
"audio_%u",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps_any,
super::HlsSink4Pad::static_type(),
)
.unwrap();
let video_pad_template = gst::PadTemplate::with_gtype(
"video_%u",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps_any,
super::HlsSink4Pad::static_type(),
)
.unwrap();
vec![audio_pad_template, video_pad_template]
});
PAD_TEMPLATES.as_ref()
}
/*
 * Creates a new ghost pad for the given template. The requested name
 * and caps are ignored; the pad name is built from the per-media
 * serial kept in `State`. The pad has no target yet, it is set when
 * one of the pad properties is set.
 */
fn request_new_pad(
&self,
templ: &gst::PadTemplate,
_name: Option<&str>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let mut state = self.state.lock().unwrap();
match templ.name_template() {
"audio_%u" => {
let audio_pad_name = format!("audio_{}", state.audio_pad_serial);
let sink_pad = gst::PadBuilder::<super::HlsSink4Pad>::from_template(templ)
.name(audio_pad_name.clone())
.event_function(|pad, parent, event| {
HlsSink4::catch_panic_pad_function(
parent,
|| false,
|this| this.sink_event(pad, event),
)
})
.flags(gst::PadFlags::FIXED_CAPS)
.build();
state.audio_pad_serial += 1;
self.obj()
.add_pad(&sink_pad)
.expect("Failed to add audio pad");
Some(sink_pad.upcast())
}
"video_%u" => {
let video_pad_name = format!("video_{}", state.video_pad_serial);
let sink_pad = gst::PadBuilder::<super::HlsSink4Pad>::from_template(templ)
.name(video_pad_name.clone())
.event_function(|pad, parent, event| {
HlsSink4::catch_panic_pad_function(
parent,
|| false,
|this| this.sink_event(pad, event),
)
})
.flags(gst::PadFlags::FIXED_CAPS)
.build();
state.video_pad_serial += 1;
self.obj()
.add_pad(&sink_pad)
.expect("Failed to add video pad");
Some(sink_pad.upcast())
}
other_name => {
gst::warning!(
CAT,
imp: self,
"requested_new_pad: name \"{}\" is not one of audio, video",
other_name
);
None
}
}
}
/*
 * Deactivates the pad, removes the HLS sink recorded for it from the
 * bin and finally removes the pad itself. The pad's entry in
 * `State::pads` is only dropped when the sink was still found in the
 * bin.
 */
fn release_pad(&self, pad: &gst::Pad) {
pad.set_active(false).unwrap();
let ghost_pad = pad.downcast_ref::<gst::GhostPad>().unwrap();
let pad_name = ghost_pad.name().to_string();
let mut state = self.state.lock().unwrap();
if let Some(hlssink_name) = state.pads.get(&pad_name.to_string()) {
if let Some(hlssink) = self.obj().by_name(hlssink_name) {
if let Err(err) = self.obj().remove(&hlssink) {
gst::error!(CAT, imp: self, "Failed to remove hlssink for pad: {} with error: {}", pad_name, err);
}
state.pads.remove(&pad_name);
}
}
self.obj().remove_pad(pad).expect("Failed to remove pad");
}
}
impl BinImpl for HlsSink4 {
    /*
     * EOS and error messages of the children are logged. Every message,
     * logged or not, is then handed to the parent class unchanged.
     */
    fn handle_message(&self, message: gst::Message) {
        use gst::MessageView;

        match message.view() {
            MessageView::Eos(eos) => {
                gst::debug!(CAT, imp: self, "Got EOS from {:?}", eos.src());
            }
            MessageView::Error(err) => {
                gst::error!(CAT, imp: self, "Got error: {} {:?}", err.error(), err.debug());
            }
            _ => {}
        }

        self.parent_handle_message(message)
    }
}
/* The children exposed through the child proxy interface are the element's pads. */
impl ChildProxyImpl for HlsSink4 {
    fn children_count(&self) -> u32 {
        self.obj().num_pads() as u32
    }

    fn child_by_name(&self, name: &str) -> Option<glib::Object> {
        let pads = self.obj().pads();
        for pad in pads {
            if pad.name() == name {
                return Some(pad.upcast());
            }
        }
        None
    }

    fn child_by_index(&self, index: u32) -> Option<glib::Object> {
        let pads = self.obj().pads();
        let pad = pads.get(index as usize).cloned()?;
        Some(pad.upcast())
    }
}
impl HlsSink4 {
fn sink_event(&self, hlspad: &super::HlsSink4Pad, event: gst::Event) -> bool {
let pad = hlspad.upcast_ref::<gst::Pad>();
gst::log!(CAT, obj: pad, "Handling event {event:?}");
if let gst::EventView::Caps(ev) = event.view() {
let pad_settings = hlspad.imp().settings.lock().unwrap().to_owned();
let mut state = self.state.lock().unwrap();
let wrote_manifest = state.wrote_manifest;
let caps = ev.caps();
state.all_mimes += 1;
/*
* Keep track of caps for every pad. Depending on whether a
* requested pad/media is an alternate rendition or variant
* stream, track the caps as per group id.
*/
match pad_settings {
HlsSink4PadSettings::PadAlternative(ref a) => {
accumulate_codec_caps(&mut state.codecs, caps.to_owned(), a.group_id.clone());
}
HlsSink4PadSettings::PadVariant(ref v) => {
if let Some(group_id) = &v.video {
accumulate_codec_caps(&mut state.codecs, caps.to_owned(), group_id.clone());
} else if let Some(group_id) = &v.audio {
accumulate_codec_caps(&mut state.codecs, caps.to_owned(), group_id.clone());
} else {
/*
* Variant streams which do not have AUDIO or VIDEO
* set and thus are not associated with any rendition
* groups, are tracked via their URI.
*/
accumulate_codec_caps(&mut state.codecs, caps.to_owned(), v.uri.clone());
}
}
}
/*
* Write the master playlist only if we have got caps on all the
* sink pads.
*/
let write_manifest = state.all_mimes == self.obj().num_sink_pads() as u32;
drop(state);
drop(pad_settings);
if !wrote_manifest && write_manifest {
let mut state = self.state.lock().unwrap();
let codecs = state.codecs.clone();
for variant in state.variants.iter_mut() {
match build_codec_string_for_variant(variant, &codecs) {
Ok(codec_str) => variant.codecs = codec_str,
Err(e) => {
gst::error!(CAT, imp: self, "Failed to build codec string with error: {}", e);
gst::element_error!(
self.obj(),
gst::ResourceError::Failed,
["Failed to build codec string with error: {}", e]
);
}
}
}
drop(state);
self.write_master_playlist();
}
gst::debug!(CAT, imp: self, "Received caps {:?} on pad: {}", caps, pad.name());
}
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
/// Configures a freshly created child sink from this element's settings and
/// re-emits the child's fragment/playlist signals on this element.
///
/// `send-keyframe-requests` is only set for the MPEG-TS muxer type, and the
/// init-stream signal is only forwarded for the CMAF muxer type.
fn setup_hlssink(&self, hlssink: &gst::Element, settings: &Settings) {
    let is_mpegts = settings.muxer_type == HlsSink4MuxerType::MpegTs;
    let is_cmaf = settings.muxer_type == HlsSink4MuxerType::Cmaf;

    /* Propagate some settings to the underlying hlscmafsink/hlssink3 */
    let playlist_type = settings
        .playlist_type
        .unwrap_or(DEFAULT_PLAYLIST_TYPE)
        .to_string();

    hlssink.set_property("max-files", settings.max_num_segment_files as u32);
    hlssink.set_property("playlist-length", settings.playlist_length);
    hlssink.set_property_from_str("playlist-type", &playlist_type);
    if is_mpegts {
        hlssink.set_property("send-keyframe-requests", settings.send_keyframe_requests);
    }
    hlssink.set_property("target-duration", settings.target_duration);

    /* Signals of the child that are re-emitted under the same name on this element */
    let forwarded_signals = [
        SIGNAL_DELETE_FRAGMENT,
        SIGNAL_GET_FRAGMENT_STREAM,
        SIGNAL_GET_PLAYLIST_STREAM,
    ]
    .into_iter()
    .chain(is_cmaf.then_some(SIGNAL_GET_INIT_STREAM));

    for signal in forwarded_signals {
        /* A weak reference is captured; the handler yields `None` once the element is gone. */
        let self_weak = self.downgrade();

        hlssink.connect(signal, false, move |args| -> Option<glib::Value> {
            let this = self_weak.upgrade()?;
            let location = args[1].get::<&str>().unwrap();

            /* The delete signal returns a boolean, all others an optional output stream. */
            let value = if signal == SIGNAL_DELETE_FRAGMENT {
                this.obj()
                    .emit_by_name::<bool>(signal, &[&location])
                    .to_value()
            } else {
                this.obj()
                    .emit_by_name::<Option<gio::OutputStream>>(signal, &[&location])
                    .to_value()
            };

            Some(value)
        });
    }
}
/// Checks that the configured alternate renditions and variant streams are
/// consistent with each other.
///
/// Returns `false` (after logging the reason) when:
///
/// - `variants` is empty,
/// - an alternate rendition's GROUP-ID is not referenced by any variant
///   (audio renditions are matched against the variants' AUDIO group ids,
///   every other media type against the VIDEO group ids), or
/// - two alternate renditions share the same NAME.
///
/// Returns `true` otherwise.
fn validate_alternate_rendition_and_variants(
    &self,
    alternatives: &[AlternativeMedia],
    variants: &[Variant],
) -> bool {
    if variants.is_empty() {
        gst::error!(CAT, imp: self, "Empty variant stream");
        return false;
    }

    for alternate in alternatives {
        let group_id = &alternate.group_id;

        let has_matching_variant = if alternate.media_type == AlternativeMediaType::Audio {
            variants
                .iter()
                .any(|variant| variant.audio.as_ref() == Some(group_id))
        } else {
            variants
                .iter()
                .any(|variant| variant.video.as_ref() == Some(group_id))
        };

        if !has_matching_variant {
            gst::error!(CAT, imp: self, "No matching GROUP-ID for alternate rendition in variant stream");
            return false;
        }
    }

    // NAME in alternate renditions must be unique. A set is used because
    // `Vec::dedup` only drops *consecutive* duplicates and would therefore
    // miss equal names that are not adjacent in `alternatives`.
    //
    // NOTE(review): RFC 8216 only demands unique NAMEs within one rendition
    // group; uniqueness is enforced across all renditions here, as before.
    let mut seen_names = std::collections::HashSet::with_capacity(alternatives.len());
    let names_unique = alternatives
        .iter()
        .all(|alt| seen_names.insert(alt.name.as_str()));

    if !names_unique {
        gst::error!(CAT, imp: self, "Duplicate NAME not allowed in alternate rendition");
        return false;
    }

    true
}
/// Builds the master playlist from the variants and alternate renditions in
/// the state and writes it to the stream obtained from the
/// `SIGNAL_GET_MASTER_PLAYLIST_STREAM` signal.
///
/// `state.wrote_manifest` is set before the write is attempted, so the flag
/// is set even when the write fails afterwards. Failures (no stream returned
/// by the signal, or an error while writing/flushing) are logged and posted
/// as element errors.
///
/// # Panics
///
/// Panics if the master playlist location cannot be converted to a string.
fn write_master_playlist(&self) {
    let mut state = self.state.lock().unwrap();
    let variant_streams = state.variants.iter().map(VariantStream::from).collect();
    let alternatives = state.alternatives.clone();
    state.wrote_manifest = true;
    drop(state);

    let settings = self.settings.lock().unwrap();
    let master_playlist_location = settings.master_playlist_location.clone();
    let master_playlist_filename = path::Path::new(&master_playlist_location)
        .to_str()
        .expect("Master playlist path to string conversion failed");
    // Neither lock is held while the signal is emitted below.
    drop(settings);

    let playlist = MasterPlaylist {
        version: Some(4),
        variants: variant_streams,
        alternatives,
        ..Default::default()
    };

    match self.obj().emit_by_name::<Option<gio::OutputStream>>(
        SIGNAL_GET_MASTER_PLAYLIST_STREAM,
        &[&master_playlist_filename],
    ) {
        Some(s) => {
            let mut stream = s.into_write();

            // Flush explicitly so that a failure to push out buffered data
            // is reported here instead of being lost when the stream is
            // dropped.
            let write_result = playlist
                .write_to(&mut stream)
                .and_then(|()| std::io::Write::flush(&mut stream));

            if let Err(err) = write_result {
                gst::error!(CAT, imp: self, "Failed to write master playlist with error: {}", err);
                gst::element_error!(
                    self.obj(),
                    gst::ResourceError::Settings,
                    ["Failed to write master playlist with error: {}", err]
                );
            }
        }
        None => {
            gst::error!(CAT, imp: self, "Could not get stream to write master playlist");
            gst::element_error!(
                self.obj(),
                gst::ResourceError::Settings,
                ["Could not get stream to write master playlist"]
            );
        }
    }
}
/// Creates (or truncates) the file at `location` and wraps it in a
/// `gio::OutputStream`.
///
/// # Errors
///
/// If the file cannot be created, an `OpenWrite` resource error is posted on
/// the element and the I/O error's text is returned as the `Err` value.
fn new_file_stream<P>(&self, location: &P) -> Result<gio::OutputStream, String>
where
    P: AsRef<path::Path>,
{
    let file = File::create(location).map_err(|err| {
        // `Path::display` is used so that a non-UTF-8 path cannot trigger a
        // panic while the error is being reported.
        let error_msg = gst::error_msg!(
            gst::ResourceError::OpenWrite,
            [
                "Could not open file {} for writing: {}",
                location.as_ref().display(),
                err,
            ]
        );
        self.post_error_message(error_msg);
        err.to_string()
    })?;

    Ok(gio::WriteOutputStream::new(file).upcast())
}
/// Removes the segment file at `location` from disk.
///
/// Deletion is best effort: a failure is only logged as a warning and is not
/// reported to the caller.
fn delete_fragment<P>(&self, location: &P)
where
    P: AsRef<path::Path>,
{
    if let Err(err) = fs::remove_file(location) {
        gst::warning!(
            CAT,
            imp: self,
            "Could not delete segment file: {}",
            err
        );
    }
}
}