From 982a9a9aea16f38674bebf529d09c2eef1882d4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Brzezi=C5=84ski?= Date: Mon, 5 Aug 2024 18:54:05 +0200 Subject: [PATCH] hlssink3: Post hls-segment-added message Posts a simple 'hls-segment-added' message with the segment location, start running time and duration. With hlssink2, it was possible to catch 'splitmuxsink-fragment-closed', but since hlssink3 doesn't forward that message (and hlscmafsink doesn't even use that mux), the new one was added to allow for listening for new fragments being added. I extended the existing tests to check whether this message is posted correctly. They theoretically only cover hlssink3, but hlscmafsink uses the same base class so it should be alright for now. Part-of: --- net/hlssink3/src/hlsbasesink.rs | 12 +++++- net/hlssink3/src/hlscmafsink/imp.rs | 9 +++-- net/hlssink3/src/hlssink3/imp.rs | 5 ++- net/hlssink3/tests/hlssink3.rs | 57 +++++++++++++++++++++++------ 4 files changed, 64 insertions(+), 19 deletions(-) diff --git a/net/hlssink3/src/hlsbasesink.rs b/net/hlssink3/src/hlsbasesink.rs index 58ca16a48..ee7aa4564 100644 --- a/net/hlssink3/src/hlsbasesink.rs +++ b/net/hlssink3/src/hlsbasesink.rs @@ -371,6 +371,7 @@ impl HlsBaseSink { &self, location: &str, running_time: Option, + duration: gst::ClockTime, mut segment: MediaSegment, ) -> Result { let mut state = self.state.lock().unwrap(); @@ -435,7 +436,16 @@ impl HlsBaseSink { context.old_segment_locations.push(location.to_string()); } - self.write_playlist(context) + self.write_playlist(context).map(|res| { + let s = gst::Structure::builder("hls-segment-added") + .field("location", location) + .field("running-time", running_time.unwrap()) + .field("duration", duration) + .build(); + self.post_message(gst::message::Element::builder(s).src(&*self.obj()).build()); + + res + }) } fn write_playlist( diff --git a/net/hlssink3/src/hlscmafsink/imp.rs b/net/hlssink3/src/hlscmafsink/imp.rs index 8de2a7779..282605184 100644 --- a/net/hlssink3/src/hlscmafsink/imp.rs +++ b/net/hlssink3/src/hlscmafsink/imp.rs @@ -407,7 +407,7 @@ impl HlsCmafSink { fn add_segment( &self, - duration: f32, + duration: gst::ClockTime, running_time: Option, location: String, ) -> Result { @@ -424,9 +424,10 @@ impl HlsCmafSink { base_imp!(self).add_segment( &location, running_time, + duration, MediaSegment { uri, - duration, + duration: duration.mseconds() as f32 / 1_000f32, map, ..Default::default() }, @@ -481,7 +482,7 @@ impl HlsCmafSink { .downcast_ref::() .unwrap(); let running_time = segment.to_running_time(first.pts().unwrap()); - let dur = first.duration().unwrap(); + let duration = first.duration().unwrap(); let (mut stream, location) = self.on_new_fragment().map_err(|err| { gst::error!( @@ -506,6 +507,6 @@ impl HlsCmafSink { gst::FlowError::Error })?; - self.add_segment(dur.mseconds() as f32 / 1_000f32, running_time, location) + self.add_segment(duration, running_time, location) } } diff --git a/net/hlssink3/src/hlssink3/imp.rs b/net/hlssink3/src/hlssink3/imp.rs index 792e0e125..c35a0c42b 100644 --- a/net/hlssink3/src/hlssink3/imp.rs +++ b/net/hlssink3/src/hlssink3/imp.rs @@ -561,7 +561,7 @@ impl HlsSink3 { } }; - let duration = ((closed_at - opened_at).mseconds() as f32) / 1_000f32; + let duration = closed_at - opened_at; let running_time = state.fragment_running_time; drop(state); @@ -571,9 +571,10 @@ impl HlsSink3 { let _ = base_imp.add_segment( &location, running_time, + duration, MediaSegment { uri, - duration, + duration: duration.mseconds() as f32 / 1_000f32, ..Default::default() }, ); diff --git a/net/hlssink3/tests/hlssink3.rs b/net/hlssink3/tests/hlssink3.rs index eeeb2e116..c84ff83b4 100644 --- a/net/hlssink3/tests/hlssink3.rs +++ b/net/hlssink3/tests/hlssink3.rs @@ -65,6 +65,7 @@ enum HlsSinkEvent { GetPlaylistStream(String), GetFragmentStream(String), DeleteFragment(String), + SegmentAddedMessage(String), } /// Represents a HLS playlist file that writes to a shared string. @@ -166,12 +167,15 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> { } }); - hlssink3.connect("delete-fragment", false, move |args| { - let location = args[1].get::().expect("No location given"); - hls_events_sender - .try_send(HlsSinkEvent::DeleteFragment(location)) - .expect("Send delete fragment event"); - Some(true.to_value()) + hlssink3.connect("delete-fragment", false, { + let hls_events_sender = hls_events_sender.clone(); + move |args| { + let location = args[1].get::().expect("No location given"); + hls_events_sender + .try_send(HlsSinkEvent::DeleteFragment(location)) + .expect("Send delete fragment event"); + Some(true.to_value()) + } }); try_or_pause!(pipeline.add_many([&video_src, &x264enc, &h264parse, &hlssink3,])); @@ -196,6 +200,16 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> { eos = true; break; } + MessageView::Element(msg) => { + if let Some(structure) = msg.structure() { + if structure.has_name("hls-segment-added") { + let location = structure.get::("location").unwrap(); + hls_events_sender + .try_send(HlsSinkEvent::SegmentAddedMessage(location)) + .expect("Send segment added event"); + } + } + } MessageView::Error(..) => unreachable!(), _ => (), } @@ -214,17 +228,22 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> { vec![ GetFragmentStream("segment00000.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), + SegmentAddedMessage("segment00000.ts".to_string()), GetFragmentStream("segment00001.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), + SegmentAddedMessage("segment00001.ts".to_string()), GetFragmentStream("segment00002.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), DeleteFragment("segment00000.ts".to_string()), + SegmentAddedMessage("segment00002.ts".to_string()), GetFragmentStream("segment00003.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), DeleteFragment("segment00001.ts".into()), + SegmentAddedMessage("segment00003.ts".to_string()), GetFragmentStream("segment00004.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), DeleteFragment("segment00002.ts".to_string()), + SegmentAddedMessage("segment00004.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), ] }; @@ -372,12 +391,15 @@ fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> { } }); - hlssink3.connect("delete-fragment", false, move |args| { - let location = args[1].get::().expect("No location given"); - hls_events_sender - .try_send(HlsSinkEvent::DeleteFragment(location)) - .expect("Send delete fragment event"); - Some(true.to_value()) + hlssink3.connect("delete-fragment", false, { + let hls_events_sender = hls_events_sender.clone(); + move |args| { + let location = args[1].get::().expect("No location given"); + hls_events_sender + .try_send(HlsSinkEvent::DeleteFragment(location)) + .expect("Send delete fragment event"); + Some(true.to_value()) + } }); try_or_pause!(pipeline.add_many([&video_src, &x264enc, &h264parse, &hlssink3,])); @@ -402,6 +424,16 @@ fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> { eos = true; break; } + MessageView::Element(msg) => { + if let Some(structure) = msg.structure() { + if structure.has_name("hls-segment-added") { + let location = structure.get::("location").unwrap(); + hls_events_sender + .try_send(HlsSinkEvent::SegmentAddedMessage(location)) + .expect("Send segment added event"); + } + } + } MessageView::Error(..) => unreachable!(), _ => (), } @@ -420,6 +452,7 @@ fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> { vec![ GetFragmentStream("/www/media/segments/my-own-filename-000.ts".to_string()), GetPlaylistStream("/www/media/main.m3u8".to_string()), + SegmentAddedMessage("/www/media/segments/my-own-filename-000.ts".to_string()), GetPlaylistStream("/www/media/main.m3u8".to_string()), ] };