hlssink3: Post hls-segment-added message

Posts a simple 'hls-segment-added' message with the segment location, start running time and duration.
With hlssink2, it was possible to catch 'splitmuxsink-fragment-closed', but since hlssink3 doesn't forward that message
(and hlscmafsink doesn't even use that mux), the new one was added to allow for listening for new fragments being added.

I extended the existing tests to check whether this message is posted correctly.
They theoretically only cover hlssink3, but hlscmafsink uses the same base class so it should be alright for now.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1677>
This commit is contained in:
Piotr Brzeziński 2024-08-05 18:54:05 +02:00 committed by GStreamer Marge Bot
parent 5172e8e520
commit 982a9a9aea
4 changed files with 64 additions and 19 deletions

View file

@ -371,6 +371,7 @@ impl HlsBaseSink {
&self, &self,
location: &str, location: &str,
running_time: Option<gst::ClockTime>, running_time: Option<gst::ClockTime>,
duration: gst::ClockTime,
mut segment: MediaSegment, mut segment: MediaSegment,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
@ -435,7 +436,16 @@ impl HlsBaseSink {
context.old_segment_locations.push(location.to_string()); context.old_segment_locations.push(location.to_string());
} }
self.write_playlist(context) self.write_playlist(context).map(|res| {
let s = gst::Structure::builder("hls-segment-added")
.field("location", location)
.field("running-time", running_time.unwrap())
.field("duration", duration)
.build();
self.post_message(gst::message::Element::builder(s).src(&*self.obj()).build());
res
})
} }
fn write_playlist( fn write_playlist(

View file

@ -407,7 +407,7 @@ impl HlsCmafSink {
fn add_segment( fn add_segment(
&self, &self,
duration: f32, duration: gst::ClockTime,
running_time: Option<gst::ClockTime>, running_time: Option<gst::ClockTime>,
location: String, location: String,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
@ -424,9 +424,10 @@ impl HlsCmafSink {
base_imp!(self).add_segment( base_imp!(self).add_segment(
&location, &location,
running_time, running_time,
duration,
MediaSegment { MediaSegment {
uri, uri,
duration, duration: duration.mseconds() as f32 / 1_000f32,
map, map,
..Default::default() ..Default::default()
}, },
@ -481,7 +482,7 @@ impl HlsCmafSink {
.downcast_ref::<gst::ClockTime>() .downcast_ref::<gst::ClockTime>()
.unwrap(); .unwrap();
let running_time = segment.to_running_time(first.pts().unwrap()); let running_time = segment.to_running_time(first.pts().unwrap());
let dur = first.duration().unwrap(); let duration = first.duration().unwrap();
let (mut stream, location) = self.on_new_fragment().map_err(|err| { let (mut stream, location) = self.on_new_fragment().map_err(|err| {
gst::error!( gst::error!(
@ -506,6 +507,6 @@ impl HlsCmafSink {
gst::FlowError::Error gst::FlowError::Error
})?; })?;
self.add_segment(dur.mseconds() as f32 / 1_000f32, running_time, location) self.add_segment(duration, running_time, location)
} }
} }

View file

@ -561,7 +561,7 @@ impl HlsSink3 {
} }
}; };
let duration = ((closed_at - opened_at).mseconds() as f32) / 1_000f32; let duration = closed_at - opened_at;
let running_time = state.fragment_running_time; let running_time = state.fragment_running_time;
drop(state); drop(state);
@ -571,9 +571,10 @@ impl HlsSink3 {
let _ = base_imp.add_segment( let _ = base_imp.add_segment(
&location, &location,
running_time, running_time,
duration,
MediaSegment { MediaSegment {
uri, uri,
duration, duration: duration.mseconds() as f32 / 1_000f32,
..Default::default() ..Default::default()
}, },
); );

View file

@ -65,6 +65,7 @@ enum HlsSinkEvent {
GetPlaylistStream(String), GetPlaylistStream(String),
GetFragmentStream(String), GetFragmentStream(String),
DeleteFragment(String), DeleteFragment(String),
SegmentAddedMessage(String),
} }
/// Represents a HLS playlist file that writes to a shared string. /// Represents a HLS playlist file that writes to a shared string.
@ -166,12 +167,15 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> {
} }
}); });
hlssink3.connect("delete-fragment", false, move |args| { hlssink3.connect("delete-fragment", false, {
let location = args[1].get::<String>().expect("No location given"); let hls_events_sender = hls_events_sender.clone();
hls_events_sender move |args| {
.try_send(HlsSinkEvent::DeleteFragment(location)) let location = args[1].get::<String>().expect("No location given");
.expect("Send delete fragment event"); hls_events_sender
Some(true.to_value()) .try_send(HlsSinkEvent::DeleteFragment(location))
.expect("Send delete fragment event");
Some(true.to_value())
}
}); });
try_or_pause!(pipeline.add_many([&video_src, &x264enc, &h264parse, &hlssink3,])); try_or_pause!(pipeline.add_many([&video_src, &x264enc, &h264parse, &hlssink3,]));
@ -196,6 +200,16 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> {
eos = true; eos = true;
break; break;
} }
MessageView::Element(msg) => {
if let Some(structure) = msg.structure() {
if structure.has_name("hls-segment-added") {
let location = structure.get::<String>("location").unwrap();
hls_events_sender
.try_send(HlsSinkEvent::SegmentAddedMessage(location))
.expect("Send segment added event");
}
}
}
MessageView::Error(..) => unreachable!(), MessageView::Error(..) => unreachable!(),
_ => (), _ => (),
} }
@ -214,17 +228,22 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> {
vec![ vec![
GetFragmentStream("segment00000.ts".to_string()), GetFragmentStream("segment00000.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()), GetPlaylistStream("playlist.m3u8".to_string()),
SegmentAddedMessage("segment00000.ts".to_string()),
GetFragmentStream("segment00001.ts".to_string()), GetFragmentStream("segment00001.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()), GetPlaylistStream("playlist.m3u8".to_string()),
SegmentAddedMessage("segment00001.ts".to_string()),
GetFragmentStream("segment00002.ts".to_string()), GetFragmentStream("segment00002.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()), GetPlaylistStream("playlist.m3u8".to_string()),
DeleteFragment("segment00000.ts".to_string()), DeleteFragment("segment00000.ts".to_string()),
SegmentAddedMessage("segment00002.ts".to_string()),
GetFragmentStream("segment00003.ts".to_string()), GetFragmentStream("segment00003.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()), GetPlaylistStream("playlist.m3u8".to_string()),
DeleteFragment("segment00001.ts".into()), DeleteFragment("segment00001.ts".into()),
SegmentAddedMessage("segment00003.ts".to_string()),
GetFragmentStream("segment00004.ts".to_string()), GetFragmentStream("segment00004.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()), GetPlaylistStream("playlist.m3u8".to_string()),
DeleteFragment("segment00002.ts".to_string()), DeleteFragment("segment00002.ts".to_string()),
SegmentAddedMessage("segment00004.ts".to_string()),
GetPlaylistStream("playlist.m3u8".to_string()), GetPlaylistStream("playlist.m3u8".to_string()),
] ]
}; };
@ -372,12 +391,15 @@ fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> {
} }
}); });
hlssink3.connect("delete-fragment", false, move |args| { hlssink3.connect("delete-fragment", false, {
let location = args[1].get::<String>().expect("No location given"); let hls_events_sender = hls_events_sender.clone();
hls_events_sender move |args| {
.try_send(HlsSinkEvent::DeleteFragment(location)) let location = args[1].get::<String>().expect("No location given");
.expect("Send delete fragment event"); hls_events_sender
Some(true.to_value()) .try_send(HlsSinkEvent::DeleteFragment(location))
.expect("Send delete fragment event");
Some(true.to_value())
}
}); });
try_or_pause!(pipeline.add_many([&video_src, &x264enc, &h264parse, &hlssink3,])); try_or_pause!(pipeline.add_many([&video_src, &x264enc, &h264parse, &hlssink3,]));
@ -402,6 +424,16 @@ fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> {
eos = true; eos = true;
break; break;
} }
MessageView::Element(msg) => {
if let Some(structure) = msg.structure() {
if structure.has_name("hls-segment-added") {
let location = structure.get::<String>("location").unwrap();
hls_events_sender
.try_send(HlsSinkEvent::SegmentAddedMessage(location))
.expect("Send segment added event");
}
}
}
MessageView::Error(..) => unreachable!(), MessageView::Error(..) => unreachable!(),
_ => (), _ => (),
} }
@ -420,6 +452,7 @@ fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> {
vec![ vec![
GetFragmentStream("/www/media/segments/my-own-filename-000.ts".to_string()), GetFragmentStream("/www/media/segments/my-own-filename-000.ts".to_string()),
GetPlaylistStream("/www/media/main.m3u8".to_string()), GetPlaylistStream("/www/media/main.m3u8".to_string()),
SegmentAddedMessage("/www/media/segments/my-own-filename-000.ts".to_string()),
GetPlaylistStream("/www/media/main.m3u8".to_string()), GetPlaylistStream("/www/media/main.m3u8".to_string()),
] ]
}; };