mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-02-21 07:06:19 +00:00
fmp4mux: In live pipelines use the current fragment end time as timeout
This allows muxing even if some streams are sparse or have big gaps.
This commit is contained in:
parent
ffea0e2d2d
commit
987e4efc02
2 changed files with 256 additions and 6 deletions
|
@ -500,12 +500,15 @@ impl FMP4Mux {
|
||||||
element: &super::FMP4Mux,
|
element: &super::FMP4Mux,
|
||||||
state: &mut State,
|
state: &mut State,
|
||||||
settings: &Settings,
|
settings: &Settings,
|
||||||
|
timeout: bool,
|
||||||
at_eos: bool,
|
at_eos: bool,
|
||||||
) -> Result<Option<gst::BufferList>, gst::FlowError> {
|
) -> Result<Option<gst::BufferList>, gst::FlowError> {
|
||||||
let class = element.class();
|
let class = element.class();
|
||||||
|
|
||||||
if at_eos {
|
if at_eos {
|
||||||
gst::info!(CAT, obj: element, "Draining at EOS");
|
gst::info!(CAT, obj: element, "Draining at EOS");
|
||||||
|
} else if timeout {
|
||||||
|
gst::info!(CAT, obj: element, "Draining at timeout");
|
||||||
} else {
|
} else {
|
||||||
for stream in &state.streams {
|
for stream in &state.streams {
|
||||||
if !stream.fragment_filled && !stream.sinkpad.is_eos() {
|
if !stream.fragment_filled && !stream.sinkpad.is_eos() {
|
||||||
|
@ -525,7 +528,8 @@ impl FMP4Mux {
|
||||||
|
|
||||||
for stream in &mut state.streams {
|
for stream in &mut state.streams {
|
||||||
assert!(
|
assert!(
|
||||||
at_eos
|
timeout
|
||||||
|
|| at_eos
|
||||||
|| stream.sinkpad.is_eos()
|
|| stream.sinkpad.is_eos()
|
||||||
|| stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
|
|| stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
|
||||||
);
|
);
|
||||||
|
@ -540,7 +544,7 @@ impl FMP4Mux {
|
||||||
|
|
||||||
let fragment_start_pts = state.fragment_start_pts.unwrap();
|
let fragment_start_pts = state.fragment_start_pts.unwrap();
|
||||||
while let Some(gop) = stream.queued_gops.pop_back() {
|
while let Some(gop) = stream.queued_gops.pop_back() {
|
||||||
assert!(gop.final_end_pts);
|
assert!(timeout || gop.final_end_pts);
|
||||||
|
|
||||||
let end_pts = gop.end_pts;
|
let end_pts = gop.end_pts;
|
||||||
gops.push(gop);
|
gops.push(gop);
|
||||||
|
@ -674,7 +678,7 @@ impl FMP4Mux {
|
||||||
let mut buffer_list = None;
|
let mut buffer_list = None;
|
||||||
|
|
||||||
if interleaved_buffers.is_empty() {
|
if interleaved_buffers.is_empty() {
|
||||||
assert!(at_eos);
|
assert!(timeout || at_eos);
|
||||||
} else {
|
} else {
|
||||||
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
|
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
|
||||||
let min_earliest_pts = min_earliest_pts.unwrap();
|
let min_earliest_pts = min_earliest_pts.unwrap();
|
||||||
|
@ -1181,6 +1185,11 @@ impl ElementImpl for FMP4Mux {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AggregatorImpl for FMP4Mux {
|
impl AggregatorImpl for FMP4Mux {
|
||||||
|
fn next_time(&self, _aggregator: &Self::Type) -> Option<gst::ClockTime> {
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
state.fragment_start_pts
|
||||||
|
}
|
||||||
|
|
||||||
fn sink_query(
|
fn sink_query(
|
||||||
&self,
|
&self,
|
||||||
aggregator: &Self::Type,
|
aggregator: &Self::Type,
|
||||||
|
@ -1365,7 +1374,7 @@ impl AggregatorImpl for FMP4Mux {
|
||||||
fn aggregate(
|
fn aggregate(
|
||||||
&self,
|
&self,
|
||||||
aggregator: &Self::Type,
|
aggregator: &Self::Type,
|
||||||
_timeout: bool,
|
timeout: bool,
|
||||||
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||||
let settings = self.settings.lock().unwrap().clone();
|
let settings = self.settings.lock().unwrap().clone();
|
||||||
|
|
||||||
|
@ -1468,7 +1477,7 @@ impl AggregatorImpl for FMP4Mux {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Some(oldest_gop) => {
|
Some(oldest_gop) => {
|
||||||
if !oldest_gop.final_earliest_pts {
|
if !timeout && !oldest_gop.final_earliest_pts {
|
||||||
earliest_pts = None;
|
earliest_pts = None;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1517,7 +1526,7 @@ impl AggregatorImpl for FMP4Mux {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If enough GOPs were queued, drain and create the output fragment
|
// If enough GOPs were queued, drain and create the output fragment
|
||||||
self.drain(aggregator, &mut state, &settings, all_eos)?
|
self.drain(aggregator, &mut state, &settings, timeout, all_eos)?
|
||||||
};
|
};
|
||||||
|
|
||||||
for (sinkpad, event) in upstream_events {
|
for (sinkpad, event) in upstream_events {
|
||||||
|
|
|
@ -403,3 +403,244 @@ fn test_buffer_flags_multi_stream() {
|
||||||
let ev = h1.pull_event().unwrap();
|
let ev = h1.pull_event().unwrap();
|
||||||
assert_eq!(ev.type_(), gst::EventType::Eos);
|
assert_eq!(ev.type_(), gst::EventType::Eos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_live_timeout() {
|
||||||
|
init();
|
||||||
|
|
||||||
|
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
|
||||||
|
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
|
||||||
|
|
||||||
|
h1.use_testclock();
|
||||||
|
|
||||||
|
// 5s fragment duration
|
||||||
|
h1.element()
|
||||||
|
.unwrap()
|
||||||
|
.set_property("fragment-duration", gst::ClockTime::from_seconds(5));
|
||||||
|
|
||||||
|
h1.set_src_caps(
|
||||||
|
gst::Caps::builder("video/x-h264")
|
||||||
|
.field("width", 1920i32)
|
||||||
|
.field("height", 1080i32)
|
||||||
|
.field("framerate", gst::Fraction::new(30, 1))
|
||||||
|
.field("stream-format", "avc")
|
||||||
|
.field("alignment", "au")
|
||||||
|
.field("codec_data", gst::Buffer::with_size(1).unwrap())
|
||||||
|
.build(),
|
||||||
|
);
|
||||||
|
h1.play();
|
||||||
|
|
||||||
|
h2.set_src_caps(
|
||||||
|
gst::Caps::builder("audio/mpeg")
|
||||||
|
.field("mpegversion", 4i32)
|
||||||
|
.field("channels", 1i32)
|
||||||
|
.field("rate", 44100i32)
|
||||||
|
.field("stream-format", "raw")
|
||||||
|
.field("base-profile", "lc")
|
||||||
|
.field("profile", "lc")
|
||||||
|
.field("level", "2")
|
||||||
|
.field(
|
||||||
|
"codec_data",
|
||||||
|
gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
|
||||||
|
)
|
||||||
|
.build(),
|
||||||
|
);
|
||||||
|
h2.play();
|
||||||
|
|
||||||
|
let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000);
|
||||||
|
|
||||||
|
// Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag
|
||||||
|
for i in 0..7 {
|
||||||
|
let mut buffer = gst::Buffer::with_size(1).unwrap();
|
||||||
|
{
|
||||||
|
let buffer = buffer.get_mut().unwrap();
|
||||||
|
buffer.set_pts(gst::ClockTime::from_seconds(i));
|
||||||
|
buffer.set_dts(gst::ClockTime::from_seconds(i));
|
||||||
|
buffer.set_duration(gst::ClockTime::SECOND);
|
||||||
|
if i != 0 && i != 5 {
|
||||||
|
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
|
||||||
|
|
||||||
|
// Skip buffer 4th and 6th buffer (end of fragment / stream)
|
||||||
|
if i == 4 || i == 6 {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
let mut buffer = gst::Buffer::with_size(1).unwrap();
|
||||||
|
{
|
||||||
|
let buffer = buffer.get_mut().unwrap();
|
||||||
|
buffer.set_pts(gst::ClockTime::from_seconds(i));
|
||||||
|
buffer.set_dts(gst::ClockTime::from_seconds(i));
|
||||||
|
buffer.set_duration(gst::ClockTime::SECOND);
|
||||||
|
}
|
||||||
|
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
|
||||||
|
}
|
||||||
|
|
||||||
|
if i == 2 {
|
||||||
|
let ev = loop {
|
||||||
|
let ev = h1.pull_upstream_event().unwrap();
|
||||||
|
if ev.type_() != gst::EventType::Reconfigure
|
||||||
|
&& ev.type_() != gst::EventType::Latency
|
||||||
|
{
|
||||||
|
break ev;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
|
||||||
|
assert_eq!(
|
||||||
|
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
|
||||||
|
gst_video::UpstreamForceKeyUnitEvent {
|
||||||
|
running_time: Some(gst::ClockTime::from_seconds(5)),
|
||||||
|
all_headers: true,
|
||||||
|
count: 0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let ev = loop {
|
||||||
|
let ev = h2.pull_upstream_event().unwrap();
|
||||||
|
if ev.type_() != gst::EventType::Reconfigure
|
||||||
|
&& ev.type_() != gst::EventType::Latency
|
||||||
|
{
|
||||||
|
break ev;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
|
||||||
|
assert_eq!(
|
||||||
|
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
|
||||||
|
gst_video::UpstreamForceKeyUnitEvent {
|
||||||
|
running_time: Some(gst::ClockTime::from_seconds(5)),
|
||||||
|
all_headers: true,
|
||||||
|
count: 0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance time and crank the clock: this should bring us to the end of the first fragment
|
||||||
|
h1.set_time(gst::ClockTime::from_seconds(5)).unwrap();
|
||||||
|
h1.crank_single_clock_wait().unwrap();
|
||||||
|
|
||||||
|
let header = h1.pull().unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
header.flags(),
|
||||||
|
gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
|
||||||
|
);
|
||||||
|
assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset));
|
||||||
|
assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset));
|
||||||
|
|
||||||
|
let fragment_header = h1.pull().unwrap();
|
||||||
|
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
|
||||||
|
assert_eq!(
|
||||||
|
fragment_header.pts(),
|
||||||
|
Some(gst::ClockTime::ZERO + output_offset)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
fragment_header.dts(),
|
||||||
|
Some(gst::ClockTime::ZERO + output_offset)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
fragment_header.duration(),
|
||||||
|
Some(gst::ClockTime::from_seconds(5))
|
||||||
|
);
|
||||||
|
|
||||||
|
for i in 0..5 {
|
||||||
|
for j in 0..2 {
|
||||||
|
// Skip gap events that don't result in buffers
|
||||||
|
if j == 1 && i == 4 {
|
||||||
|
// Advance time and crank the clock another time. This brings us at the end of the
|
||||||
|
// EOS.
|
||||||
|
h1.set_time(gst::ClockTime::from_seconds(7)).unwrap();
|
||||||
|
h1.crank_single_clock_wait().unwrap();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let buffer = h1.pull().unwrap();
|
||||||
|
if i == 4 && j == 0 {
|
||||||
|
assert_eq!(
|
||||||
|
buffer.flags(),
|
||||||
|
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
|
||||||
|
);
|
||||||
|
} else if i == 5 && j == 0 {
|
||||||
|
assert_eq!(buffer.flags(), gst::BufferFlags::HEADER);
|
||||||
|
} else {
|
||||||
|
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
buffer.pts(),
|
||||||
|
Some(gst::ClockTime::from_seconds(i) + output_offset)
|
||||||
|
);
|
||||||
|
|
||||||
|
if j == 0 {
|
||||||
|
assert_eq!(
|
||||||
|
buffer.dts(),
|
||||||
|
Some(gst::ClockTime::from_seconds(i) + output_offset)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
assert!(buffer.dts().is_none());
|
||||||
|
}
|
||||||
|
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
h1.push_event(gst::event::Eos::new());
|
||||||
|
h2.push_event(gst::event::Eos::new());
|
||||||
|
|
||||||
|
let fragment_header = h1.pull().unwrap();
|
||||||
|
assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER);
|
||||||
|
assert_eq!(
|
||||||
|
fragment_header.pts(),
|
||||||
|
Some(gst::ClockTime::from_seconds(5) + output_offset)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
fragment_header.dts(),
|
||||||
|
Some(gst::ClockTime::from_seconds(5) + output_offset)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
fragment_header.duration(),
|
||||||
|
Some(gst::ClockTime::from_seconds(2))
|
||||||
|
);
|
||||||
|
|
||||||
|
for i in 5..7 {
|
||||||
|
for j in 0..2 {
|
||||||
|
// Skip gap events that don't result in buffers
|
||||||
|
if j == 1 && i == 6 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let buffer = h1.pull().unwrap();
|
||||||
|
if i == 6 && j == 0 {
|
||||||
|
assert_eq!(
|
||||||
|
buffer.flags(),
|
||||||
|
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
|
||||||
|
}
|
||||||
|
assert_eq!(
|
||||||
|
buffer.pts(),
|
||||||
|
Some(gst::ClockTime::from_seconds(i) + output_offset)
|
||||||
|
);
|
||||||
|
if j == 0 {
|
||||||
|
assert_eq!(
|
||||||
|
buffer.dts(),
|
||||||
|
Some(gst::ClockTime::from_seconds(i) + output_offset)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
assert!(buffer.dts().is_none());
|
||||||
|
}
|
||||||
|
assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let ev = h1.pull_event().unwrap();
|
||||||
|
assert_eq!(ev.type_(), gst::EventType::StreamStart);
|
||||||
|
let ev = h1.pull_event().unwrap();
|
||||||
|
assert_eq!(ev.type_(), gst::EventType::Caps);
|
||||||
|
let ev = h1.pull_event().unwrap();
|
||||||
|
assert_eq!(ev.type_(), gst::EventType::Segment);
|
||||||
|
let ev = h1.pull_event().unwrap();
|
||||||
|
assert_eq!(ev.type_(), gst::EventType::Eos);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue