matroskamux: Add support for latency timeouts in live pipelines

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7510>
This commit is contained in:
Sebastian Dröge 2024-09-13 10:55:03 +03:00 committed by GStreamer Marge Bot
parent 945a7bdfc4
commit 12b434ae9d
2 changed files with 63 additions and 12 deletions

View file

@ -239,6 +239,7 @@ static gboolean gst_matroska_mux_src_event (GstAggregator * agg,
static gboolean gst_matroska_mux_stop (GstAggregator * agg); static gboolean gst_matroska_mux_stop (GstAggregator * agg);
static GstBuffer *gst_matroska_mux_clip (GstAggregator * agg, static GstBuffer *gst_matroska_mux_clip (GstAggregator * agg,
GstAggregatorPad * agg_pad, GstBuffer * buffer); GstAggregatorPad * agg_pad, GstBuffer * buffer);
static GstClockTime gst_matroska_mux_get_next_time (GstAggregator * agg);
static GstPad *gst_matroska_mux_request_new_pad (GstElement * element, static GstPad *gst_matroska_mux_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps); GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
@ -415,6 +416,8 @@ gst_matroska_mux_class_init (GstMatroskaMuxClass * klass)
GST_DEBUG_FUNCPTR (gst_matroska_mux_sink_event); GST_DEBUG_FUNCPTR (gst_matroska_mux_sink_event);
gstaggregator_class->src_event = gstaggregator_class->src_event =
GST_DEBUG_FUNCPTR (gst_matroska_mux_src_event); GST_DEBUG_FUNCPTR (gst_matroska_mux_src_event);
gstaggregator_class->get_next_time =
GST_DEBUG_FUNCPTR (gst_matroska_mux_get_next_time);
parent_class = g_type_class_peek_parent (klass); parent_class = g_type_class_peek_parent (klass);
@ -662,6 +665,7 @@ gst_matroska_mux_stop (GstAggregator * agg)
/* reset timers */ /* reset timers */
mux->duration = 0; mux->duration = 0;
mux->last_pos = 0;
/* reset cluster */ /* reset cluster */
mux->cluster = 0; mux->cluster = 0;
@ -701,6 +705,20 @@ gst_matroska_mux_src_event (GstAggregator * agg, GstEvent * event)
return GST_AGGREGATOR_CLASS (parent_class)->src_event (agg, event); return GST_AGGREGATOR_CLASS (parent_class)->src_event (agg, event);
} }
static GstClockTime
gst_matroska_mux_get_next_time (GstAggregator * agg)
{
GstMatroskaMux *mux = GST_MATROSKA_MUX (agg);
GstSegment segment;
GstClockTime next_time;
gst_segment_init (&segment, GST_FORMAT_TIME);
next_time =
gst_segment_to_running_time (&segment, GST_FORMAT_TIME, mux->last_pos);
return next_time;
}
static void static void
gst_matroska_mux_free_codec_priv (GstMatroskaTrackContext * context) gst_matroska_mux_free_codec_priv (GstMatroskaTrackContext * context)
{ {
@ -4174,7 +4192,7 @@ gst_matroska_mux_find_best_pad (GstMatroskaMux * mux, gboolean timeout)
buffer = gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (mux_pad)); buffer = gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (mux_pad));
if (!buffer) { if (!buffer) {
if (!GST_PAD_IS_EOS (mux_pad)) { if (!timeout && !GST_PAD_IS_EOS (mux_pad)) {
best = NULL; best = NULL;
best_time = GST_CLOCK_TIME_NONE; best_time = GST_CLOCK_TIME_NONE;
break; break;
@ -4203,11 +4221,31 @@ gst_matroska_mux_find_best_pad (GstMatroskaMux * mux, gboolean timeout)
return best; return best;
} }
static gboolean
gst_matroska_mux_all_pads_eos (GstMatroskaMux * mux)
{
GList *l;
GST_OBJECT_LOCK (mux);
for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
GstMatroskaMuxPad *pad = GST_MATROSKA_MUX_PAD (l->data);
if (gst_aggregator_pad_has_buffer (GST_AGGREGATOR_PAD (pad))
|| !gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad))) {
GST_OBJECT_UNLOCK (mux);
return FALSE;
}
}
GST_OBJECT_UNLOCK (mux);
return TRUE;
}
static GstFlowReturn static GstFlowReturn
gst_matroska_mux_aggregate (GstAggregator * agg, gboolean timeout) gst_matroska_mux_aggregate (GstAggregator * agg, gboolean timeout)
{ {
GstMatroskaMux *mux = GST_MATROSKA_MUX (agg); GstMatroskaMux *mux = GST_MATROSKA_MUX (agg);
GstClockTime buffer_timestamp; GstClockTime buffer_timestamp, end_ts = GST_CLOCK_TIME_NONE;
GstEbmlWrite *ebml = mux->ebml_write; GstEbmlWrite *ebml = mux->ebml_write;
GstMatroskaMuxPad *best = NULL; GstMatroskaMuxPad *best = NULL;
GstBuffer *buf; GstBuffer *buf;
@ -4229,15 +4267,20 @@ gst_matroska_mux_aggregate (GstAggregator * agg, gboolean timeout)
best = gst_matroska_mux_find_best_pad (mux, timeout); best = gst_matroska_mux_find_best_pad (mux, timeout);
/* if there is no best pad, we have reached EOS */ /* if there is no best pad, we have reached EOS or timed out without any
* buffers */
if (best == NULL) { if (best == NULL) {
GST_DEBUG_OBJECT (mux, "No best pad. Finishing..."); if (gst_matroska_mux_all_pads_eos (mux)) {
GST_DEBUG_OBJECT (mux, "All pads EOS. Finishing...");
if (!mux->ebml_write->streamable) { if (!mux->ebml_write->streamable) {
gst_matroska_mux_finish (mux); gst_matroska_mux_finish (mux);
} else { } else {
GST_DEBUG_OBJECT (mux, "... but streamable, nothing to finish"); GST_DEBUG_OBJECT (mux, "... but streamable, nothing to finish");
} }
ret = GST_FLOW_EOS; ret = GST_FLOW_EOS;
} else {
ret = GST_AGGREGATOR_FLOW_NEED_DATA;
}
goto exit; goto exit;
} }
@ -4271,8 +4314,7 @@ gst_matroska_mux_aggregate (GstAggregator * agg, gboolean timeout)
/* make note of first and last encountered timestamps, so we can calculate /* make note of first and last encountered timestamps, so we can calculate
* the actual duration later when we send an updated header on eos */ * the actual duration later when we send an updated header on eos */
if (GST_CLOCK_TIME_IS_VALID (buffer_timestamp)) { if (GST_CLOCK_TIME_IS_VALID (buffer_timestamp)) {
GstClockTime start_ts = buffer_timestamp; end_ts = buffer_timestamp;
GstClockTime end_ts = start_ts;
if (GST_BUFFER_DURATION_IS_VALID (buf)) if (GST_BUFFER_DURATION_IS_VALID (buf))
end_ts += GST_BUFFER_DURATION (buf); end_ts += GST_BUFFER_DURATION (buf);
@ -4283,8 +4325,8 @@ gst_matroska_mux_aggregate (GstAggregator * agg, gboolean timeout)
best->end_ts = end_ts; best->end_ts = end_ts;
if (G_UNLIKELY (best->start_ts == GST_CLOCK_TIME_NONE || if (G_UNLIKELY (best->start_ts == GST_CLOCK_TIME_NONE ||
start_ts < best->start_ts)) buffer_timestamp < best->start_ts))
best->start_ts = start_ts; best->start_ts = buffer_timestamp;
} }
if ((gst_buffer_get_size (buf) == 0 && if ((gst_buffer_get_size (buf) == 0 &&
@ -4297,6 +4339,14 @@ gst_matroska_mux_aggregate (GstAggregator * agg, gboolean timeout)
ret = gst_matroska_mux_write_data (mux, best, buf); ret = gst_matroska_mux_write_data (mux, best, buf);
} }
if (GST_CLOCK_TIME_IS_VALID (buffer_timestamp)) {
if (GST_CLOCK_TIME_IS_VALID (end_ts)
&& mux->last_pos < end_ts)
mux->last_pos = end_ts;
else if (mux->last_pos < buffer_timestamp)
mux->last_pos = buffer_timestamp;
}
exit: exit:
gst_clear_object (&best); gst_clear_object (&best);

View file

@ -126,6 +126,7 @@ struct _GstMatroskaMux {
guint64 earliest_time; guint64 earliest_time;
/* length, position (time, ns) */ /* length, position (time, ns) */
guint64 duration; guint64 duration;
GstClockTime last_pos;
/* byte-positions of master-elements (for replacing contents) */ /* byte-positions of master-elements (for replacing contents) */
guint64 segment_pos, guint64 segment_pos,