From 5fb71dd55b89afad202e66f071b0615dedf51871 Mon Sep 17 00:00:00 2001 From: Vivia Nikolaidou Date: Mon, 24 Oct 2022 16:49:47 +0300 Subject: [PATCH] inputselector: Fix waiting on sync-mode=clock Basically copy over what clocksync does, but taking into account that we have multiple upstream latencies. Part-of: --- .../plugins/elements/gstinputselector.c | 117 +++++++++++++++--- .../plugins/elements/gstinputselector.h | 2 + 2 files changed, 99 insertions(+), 20 deletions(-) diff --git a/subprojects/gstreamer/plugins/elements/gstinputselector.c b/subprojects/gstreamer/plugins/elements/gstinputselector.c index 069e7329f0..f673c46217 100644 --- a/subprojects/gstreamer/plugins/elements/gstinputselector.c +++ b/subprojects/gstreamer/plugins/elements/gstinputselector.c @@ -165,6 +165,8 @@ struct _GstSelectorPad gboolean sending_cached_buffers; GQueue *cached_buffers; + + GstClockID clock_id; }; struct _GstSelectorPadCachedBuffer @@ -345,6 +347,11 @@ gst_selector_pad_reset (GstSelectorPad * pad) gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED); pad->sending_cached_buffers = FALSE; gst_selector_pad_free_cached_buffers (pad); + if (pad->clock_id) { + gst_clock_id_unschedule (pad->clock_id); + gst_clock_id_unref (pad->clock_id); + } + pad->clock_id = NULL; GST_OBJECT_UNLOCK (pad); } @@ -474,6 +481,10 @@ gst_input_selector_eos_wait (GstInputSelector * self, GstSelectorPad * pad, gst_pad_push_event (self->srcpad, gst_event_ref (eos_event)); GST_INPUT_SELECTOR_LOCK (self); + if (pad->clock_id) { + GST_DEBUG_OBJECT (pad, "unlock clock wait"); + gst_clock_id_unschedule (pad->clock_id); + } /* Wake up other pads so they can continue when syncing to * running time, as this pad just switched to EOS and * may enable others to progress */ @@ -558,6 +569,10 @@ gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event) selpad->flushing = TRUE; sel->eos = FALSE; selpad->group_done = FALSE; + if (selpad->clock_id) { + GST_DEBUG_OBJECT (selpad, "unlock clock wait"); + gst_clock_id_unschedule (selpad->clock_id); + } GST_INPUT_SELECTOR_BROADCAST (sel); break; case GST_EVENT_FLUSH_STOP: @@ -750,7 +765,6 @@ gst_input_selector_wait_running_time (GstInputSelector * sel, while (TRUE) { GstPad *active_sinkpad; GstSelectorPad *active_selpad; - GstClock *clock; gint64 cur_running_time; GstClockTime running_time; @@ -774,21 +788,7 @@ gst_input_selector_wait_running_time (GstInputSelector * sel, } cur_running_time = GST_CLOCK_TIME_NONE; - if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) { - clock = gst_element_get_clock (GST_ELEMENT_CAST (sel)); - if (clock) { - GstClockTime base_time; - - cur_running_time = gst_clock_get_time (clock); - base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel)); - if (base_time <= cur_running_time) - cur_running_time -= base_time; - else - cur_running_time = 0; - - gst_object_unref (clock); - } - } else { + if (sel->sync_mode != GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) { GstSegment *active_seg; active_seg = &active_selpad->segment; @@ -818,9 +818,61 @@ gst_input_selector_wait_running_time (GstInputSelector * sel, break; } - if (selpad != active_selpad && !sel->eos && !sel->flushing - && !selpad->flushing && (cur_running_time == GST_CLOCK_TIME_NONE - || running_time >= cur_running_time)) { + if (selpad == active_selpad || sel->eos || sel->flushing + || selpad->flushing) { + GST_DEBUG_OBJECT (selpad, "Waiting aborted. Unblocking"); + GST_INPUT_SELECTOR_UNLOCK (sel); + break; + } + + if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK + && GST_CLOCK_TIME_IS_VALID (running_time)) { + GstClock *clock; + GstClockReturn cret; + GstClockTime base_time; + GstClockTimeDiff jitter; + GstClockID clock_id; + + base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel)); + if (!GST_CLOCK_TIME_IS_VALID (base_time)) { + GST_DEBUG_OBJECT (selpad, "sync-mode=clock but no base time. Blocking"); + GST_INPUT_SELECTOR_WAIT (sel); + continue; + } + + clock = gst_element_get_clock (GST_ELEMENT_CAST (sel)); + if (!clock) { + GST_DEBUG_OBJECT (selpad, "sync-mode=clock but no clock. Blocking"); + GST_INPUT_SELECTOR_WAIT (sel); + continue; + } + + /* FIXME: If no upstream latency was queried yet, do one now */ + clock_id = + gst_clock_new_single_shot_id (clock, + running_time + base_time + sel->upstream_latency); + selpad->clock_id = gst_clock_id_ref (clock_id); + GST_INPUT_SELECTOR_UNLOCK (sel); + + gst_object_unref (clock); + cret = gst_clock_id_wait (clock_id, &jitter); + gst_clock_id_unref (clock_id); + + GST_DEBUG_OBJECT (sel, "Clock returned %d, jitter %" GST_STIME_FORMAT, + cret, GST_STIME_ARGS (jitter)); + + GST_INPUT_SELECTOR_LOCK (sel); + if (selpad->clock_id) { + gst_clock_id_unref (selpad->clock_id); + selpad->clock_id = NULL; + } + if (cret == GST_CLOCK_OK || + cret == GST_CLOCK_EARLY || cret == GST_CLOCK_DONE) { + GST_INPUT_SELECTOR_UNLOCK (sel); + break; + } + } else if (!GST_CLOCK_TIME_IS_VALID (cur_running_time) + || running_time >= cur_running_time) { GST_DEBUG_OBJECT (selpad, "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %" GST_TIME_FORMAT, GST_TIME_ARGS (running_time), @@ -1333,6 +1385,8 @@ gst_input_selector_init (GstInputSelector * sel) g_cond_init (&sel->cond); sel->eos = FALSE; + sel->upstream_latency = 0; + /* lets give a change for downstream to do something on * active-pad change before we start pushing new buffers */ g_signal_connect_data (sel, "notify::active-pad", @@ -1714,7 +1768,16 @@ retry: GST_ERROR_OBJECT (pad, "minimum latency bigger than maximum latency"); } - gst_query_set_latency (query, fold_data.live, fold_data.min, fold_data.max); + GST_INPUT_SELECTOR_LOCK (sel); + if (fold_data.live) + sel->upstream_latency = fold_data.min; + else + sel->upstream_latency = 0; + + gst_query_set_latency (query, fold_data.live + || (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK), + fold_data.min, fold_data.max); + GST_INPUT_SELECTOR_UNLOCK (sel); } else { GST_LOG_OBJECT (pad, "latency query failed"); } @@ -1876,6 +1939,7 @@ gst_input_selector_reset (GstInputSelector * sel) } } sel->have_group_id = TRUE; + sel->upstream_latency = 0; GST_INPUT_SELECTOR_UNLOCK (sel); } @@ -1902,6 +1966,19 @@ gst_input_selector_change_state (GstElement * element, GST_INPUT_SELECTOR_BROADCAST (self); GST_INPUT_SELECTOR_UNLOCK (self); break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED:{ + GList *walk; + + for (walk = GST_ELEMENT_CAST (self)->sinkpads; walk; + walk = g_list_next (walk)) { + GstSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data); + if (selpad->clock_id) { + GST_DEBUG_OBJECT (selpad, "unlock clock wait"); + gst_clock_id_unschedule (selpad->clock_id); + } + } + break; + } default: break; } diff --git a/subprojects/gstreamer/plugins/elements/gstinputselector.h b/subprojects/gstreamer/plugins/elements/gstinputselector.h index 9d2eb6aa5c..79a55780b6 100644 --- a/subprojects/gstreamer/plugins/elements/gstinputselector.h +++ b/subprojects/gstreamer/plugins/elements/gstinputselector.h @@ -80,6 +80,8 @@ struct _GstInputSelector { gboolean eos; gboolean eos_sent; gboolean flushing; + + GstClockTime upstream_latency; }; struct _GstInputSelectorClass {