Delete old segment files

This commit is contained in:
Rafael Caricio 2021-05-16 18:20:12 +02:00
parent e127b9e698
commit c337dd69e4
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
3 changed files with 62 additions and 55 deletions

View file

@ -9,7 +9,7 @@ Progress:
- [x] Support all properties exposed by the `hlssink2` plugin;
- [x] Write TS content to fragment stream (defaults to filesystem);
- [x] Write HLS playlist m3u8 file;
- [ ] Delete old fragments;
- [x] Delete old fragments;
- [ ] Signal to acquire fragment stream;
- [ ] Signal to acquire HLS playlist stream;
- [ ] Signal to delete a fragment file;

View file

@ -2,18 +2,18 @@ use gio::prelude::*;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_info, gst_trace};
use gst::{gst_debug, gst_error, gst_info, gst_trace, gst_warning};
use crate::{output::StreamWriter, playlist::PlaylistRenderState};
use m3u8_rs::playlist::{MediaPlaylist, MediaPlaylistType, MediaSegment};
use once_cell::sync::Lazy;
use std::fs::{File, OpenOptions};
use std::fs;
use std::path;
use std::sync::{Arc, Mutex};
const DEFAULT_LOCATION: &str = "segment%05d.ts";
const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8";
const DEFAULT_MAX_FILES: u32 = 10;
const DEFAULT_MAX_NUM_SEGMENT_FILES: u32 = 10;
const DEFAULT_TARGET_DURATION: u32 = 15;
const DEFAULT_PLAYLIST_LENGTH: u32 = 5;
const DEFAULT_SEND_KEYFRAME_REQUESTS: bool = true;
@ -34,7 +34,7 @@ struct Settings {
playlist_location: String, // TODO: Evaluate the use of `PathBuf` instead.
playlist_root: Option<String>, // TODO: Evaluate the use of `PathBuf` instead.
playlist_length: u32,
max_files: u32,
max_num_segment_files: usize,
target_duration: u32,
send_keyframe_requests: bool,
@ -52,7 +52,7 @@ impl Default for Settings {
playlist_location: String::from(DEFAULT_PLAYLIST_LOCATION),
playlist_root: None,
playlist_length: DEFAULT_PLAYLIST_LENGTH,
max_files: DEFAULT_MAX_FILES,
max_num_segment_files: DEFAULT_MAX_NUM_SEGMENT_FILES as usize,
target_duration: DEFAULT_TARGET_DURATION,
send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS,
@ -69,11 +69,11 @@ enum State {
Started {
playlist: MediaPlaylist,
playlist_render_state: PlaylistRenderState,
playlist_index: u32,
playlist_index: usize,
current_segment_file: Option<File>,
fragment_opened_at: Option<gst::ClockTime>,
current_segment_location: Option<String>,
old_segment_locations: Vec<String>,
},
}
@ -126,7 +126,7 @@ impl FlexHlsSink {
playlist_index: 0,
current_segment_location: None,
fragment_opened_at: None,
current_segment_file: None,
old_segment_locations: Vec::new(),
};
}
@ -145,13 +145,12 @@ impl FlexHlsSink {
);
let mut state = self.state.lock().unwrap();
let (current_segment_location, current_segment_file) = match &mut *state {
let current_segment_location = match &mut *state {
State::Stopped => return Err("Not in Started state".to_string()),
State::Started {
current_segment_location,
current_segment_file,
..
} => (current_segment_location, current_segment_file),
} => current_segment_location,
};
let settings = self.settings.lock().unwrap();
@ -162,19 +161,13 @@ impl FlexHlsSink {
.replace(BACKWARDS_COMPATIBLE_PLACEHOLDER, &seq_num);
gst_trace!(CAT, "Segment location formatted: {}", segment_file_location);
let segment_file = File::create(&segment_file_location).map_err(move |err| {
gst_error!(CAT, "Could not create a new segment file");
err.to_string()
})?;
*current_segment_location = Some(segment_file_location.clone());
*current_segment_file = Some(segment_file);
let giostreamsink = settings.giostreamsink.as_ref().unwrap();
// TODO: this should be a call to the signal exposed by this plugin
let stream = self
.get_fragment_stream(element, &segment_file_location)
.new_file_stream(element, &segment_file_location)
.map_err(|err| err.to_string())?;
let giostreamsink = settings.giostreamsink.as_ref().unwrap();
giostreamsink.set_property("stream", &stream).unwrap();
gst_info!(
@ -185,19 +178,7 @@ impl FlexHlsSink {
Ok(segment_file_location)
}
fn get_fragment_stream<P>(
&self,
element: &super::FlexHlsSink,
location: &P,
) -> Result<gio::WriteOutputStream, String>
where
P: AsRef<path::Path>,
{
let file_stream = File::create(location).map_err(|err| err.to_string())?;
Ok(gio::WriteOutputStream::new(file_stream))
}
fn get_playlist_stream<P>(
fn new_file_stream<P>(
&self,
element: &super::FlexHlsSink,
location: &P,
@ -206,7 +187,7 @@ impl FlexHlsSink {
P: AsRef<path::Path>,
{
let element_weak = element.downgrade();
let file = OpenOptions::new()
let file = fs::OpenOptions::new()
.write(true)
.create(true)
.open(location)
@ -214,7 +195,7 @@ impl FlexHlsSink {
let error_msg = gst::error_msg!(
gst::ResourceError::OpenWrite,
[
"Could not open playlist file {} for writing: {}",
"Could not open file {} for writing: {}",
location.as_ref().to_str().unwrap(),
err.to_string(),
]
@ -242,23 +223,26 @@ impl FlexHlsSink {
current_segment_location,
playlist_render_state,
playlist_index,
old_segment_locations,
..
} => {
gst_info!(CAT, "COUNT {}", playlist.segments.len());
// TODO: Add new entry to the playlist
let segment_location = current_segment_location
.as_ref()
.take()
.ok_or_else(|| gst::StateChangeError)?;
let segment_duration = fragment_closed_at
- fragment_opened_at
.as_ref()
.ok_or_else(|| gst::StateChangeError)?;
playlist.segments.push(MediaSegment {
uri: segment_location.to_string(),
duration: segment_duration.seconds().unwrap() as f32,
uri: segment_location.clone(),
duration: {
let fragment_opened_at =
fragment_opened_at.as_ref().ok_or(gst::StateChangeError)?;
let segment_duration = fragment_closed_at - fragment_opened_at;
segment_duration.seconds().ok_or(gst::StateChangeError)? as f32
},
title: None,
byte_range: None,
discontinuity: false,
@ -268,16 +252,28 @@ impl FlexHlsSink {
daterange: None,
});
let (playlist_location, max_num_segments, max_playlist_length) = {
let settings = self.settings.lock().unwrap();
(
settings.playlist_location.clone(),
settings.max_num_segment_files,
settings.playlist_length as usize,
)
};
// TODO: remove old segments from playlist
if playlist.segments.len() > max_playlist_length {
for _ in 0..playlist.segments.len() - max_playlist_length {
let _ = playlist.segments.remove(0);
}
}
*playlist_index += 1;
playlist.media_sequence = *playlist_index as i32 - playlist.segments.len() as i32;
let playlist_location = {
let settings = self.settings.lock().unwrap();
settings.playlist_location.clone()
};
// TODO: this should be a call to the signal exposed by this plugin
let playlist_file = self
.get_playlist_stream(&element, &playlist_location)
.new_file_stream(&element, &playlist_location)
.map_err(|_| gst::StateChangeError)?;
let mut playlist_stream = StreamWriter(playlist_file);
@ -292,9 +288,16 @@ impl FlexHlsSink {
*playlist_render_state = PlaylistRenderState::Started;
// TODO: clean up (delete) old segment files
*current_segment_location = None;
old_segment_locations.push(segment_location);
if old_segment_locations.len() > max_num_segments {
for _ in 0..old_segment_locations.len() - max_num_segments {
let old_segment_location = old_segment_locations.remove(0);
// TODO: trigger event to delete segment location
let _ = fs::remove_file(&old_segment_location).map_err(|err| {
gst_warning!(CAT, "Could not delete segment file: {}", err.to_string());
});
}
}
}
};
@ -411,7 +414,7 @@ impl ObjectImpl for FlexHlsSink {
"Maximum number of files to keep on disk. Once the maximum is reached, old files start to be deleted to make room for new ones.",
0,
u32::MAX,
DEFAULT_MAX_FILES,
DEFAULT_MAX_NUM_SEGMENT_FILES,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
@ -477,7 +480,8 @@ impl ObjectImpl for FlexHlsSink {
.expect("type checked upstream");
}
"max-files" => {
settings.max_files = value.get().expect("type checked upstream");
let max_files: u32 = value.get().expect("type checked upstream");
settings.max_num_segment_files = max_files as usize;
}
"target-duration" => {
settings.target_duration = value.get().expect("type checked upstream");
@ -511,7 +515,10 @@ impl ObjectImpl for FlexHlsSink {
"location" => settings.location.to_value(),
"playlist-location" => settings.playlist_location.to_value(),
"playlist-root" => settings.playlist_root.to_value(),
"max-files" => settings.max_files.to_value(),
"max-files" => {
let max_files = settings.max_num_segment_files as u32;
max_files.to_value()
}
"target-duration" => settings.target_duration.to_value(),
"playlist-length" => settings.playlist_length.to_value(),
"send-keyframe-requests" => settings.send_keyframe_requests.to_value(),

View file

@ -110,7 +110,7 @@ fn test_basic_element_with_video_content() {
fn test_basic_element_properties() {
init();
const BUFFER_NB: i32 = 3;
const BUFFER_NB: i32 = 200;
let pipeline = gst::Pipeline::new(Some("audio_pipeline"));