From c63586e0a7d808bcd3d3a652114523751b41ec60 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Thu, 6 Feb 2020 12:39:12 -0300 Subject: [PATCH] nle: Seek the whole stack on initialization Instead of seeking each nleobject separately to setup new stack, wait for the whole stack to preroll and then seek that newly setup stack, leading to the same code path and seek 'tweaking' as when processing a seek on the composition (without stack changes). This is mandatory to properly handle filter that tweak segments to handle time remapping for example. --- plugins/nle/nlecomposition.c | 208 ++++++++++++++++++++++++++--------- plugins/nle/nlesource.c | 40 +++---- 2 files changed, 174 insertions(+), 74 deletions(-) diff --git a/plugins/nle/nlecomposition.c b/plugins/nle/nlecomposition.c index f0cf426f4d..97ffb9431c 100644 --- a/plugins/nle/nlecomposition.c +++ b/plugins/nle/nlecomposition.c @@ -79,7 +79,7 @@ typedef enum } NleUpdateStackReason; static const char *UPDATE_PIPELINE_REASONS[] = { - "Initialize", "Commit", "EOS", "Seek" + "Initialize", "Commit", "EOS", "Seek", "None" }; typedef struct @@ -194,6 +194,8 @@ struct _NleCompositionPrivate /* 0 means that we already received the right caps or segment */ gint seqnum_to_restart_task; gboolean waiting_serialized_query_or_buffer; + GstEvent *stack_initialization_seek; + volatile gboolean stack_initialization_seek_sent; gboolean tearing_down_stack; gboolean suppress_child_error; @@ -258,7 +260,7 @@ static void _update_pipeline_func (NleComposition * comp, static void _commit_func (NleComposition * comp, UpdateCompositionData * ucompo); static GstEvent *get_new_seek_event (NleComposition * comp, gboolean initial, - gboolean updatestoponly); + gboolean updatestoponly, NleUpdateStackReason reason); static gboolean _nle_composition_add_object (NleComposition * comp, NleObject * object); static gboolean _nle_composition_remove_object (NleComposition * comp, @@ -453,6 +455,22 @@ _start_task (NleComposition * comp) GST_OBJECT_UNLOCK (comp); } +static gboolean +_pause_task (NleComposition * comp) +{ + GST_OBJECT_LOCK (comp); + if (comp->task == NULL) { + GST_INFO_OBJECT (comp, "No task set, it must have been stopped, returning"); + GST_OBJECT_UNLOCK (comp); + return FALSE; + } + + gst_task_pause (comp->task); + GST_OBJECT_UNLOCK (comp); + + return TRUE; +} + static gboolean _stop_task (NleComposition * comp) { @@ -546,16 +564,36 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd) GstSeekType cur_type, stop_type; gint64 cur, stop; NleCompositionPrivate *priv = comp->priv; + gboolean initializing_stack = priv->stack_initialization_seek == seekd->event; + NleUpdateStackReason reason = + initializing_stack ? COMP_UPDATE_STACK_NONE : COMP_UPDATE_STACK_ON_SEEK; + GstClockTime segment_start, segment_stop; + gboolean reverse; gst_event_parse_seek (seekd->event, &rate, &format, &flags, &cur_type, &cur, &stop_type, &stop); + reverse = rate < 0; + GST_DEBUG_OBJECT (seekd->comp, "start:%" GST_TIME_FORMAT " -- stop:%" GST_TIME_FORMAT " flags:%d", GST_TIME_ARGS (cur), GST_TIME_ARGS (stop), flags); + if (!initializing_stack) { + segment_start = cur; + segment_stop = stop; + } else { + /* During plain playback (no seek), the segment->stop doesn't + * evolve when going from stack to stack, only the start does + * (in reverse playback, the logic is reversed) */ + segment_start = reverse ? priv->segment->start : cur; + segment_stop = reverse ? stop : priv->segment->stop; + } + gst_segment_do_seek (priv->segment, - rate, format, flags, cur_type, cur, stop_type, stop, NULL); + rate, format, flags, cur_type, segment_start, stop_type, segment_stop, + NULL); + gst_segment_do_seek (priv->seek_segment, rate, format, flags, cur_type, cur, stop_type, stop, NULL); @@ -576,8 +614,9 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd) } #endif - _post_start_composition_update (seekd->comp, - gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK); + if (!initializing_stack) + _post_start_composition_update (seekd->comp, + gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK); /* crop the segment start/stop values */ /* Only crop segment start value if we don't have a default object */ @@ -587,15 +626,21 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd) priv->segment->stop = MIN (priv->segment->stop, NLE_OBJECT_STOP (seekd->comp)); - priv->next_base_time = 0; - comp->priv->flush_seqnum = comp->priv->seek_seqnum = - gst_event_get_seqnum (seekd->event); - seek_handling (seekd->comp, gst_event_get_seqnum (seekd->event), - COMP_UPDATE_STACK_ON_SEEK); + if (initializing_stack) { + GST_INFO_OBJECT (seekd->comp, "Pausing task to run initializing seek."); + _pause_task (seekd->comp); + } else { + priv->next_base_time = 0; + comp->priv->flush_seqnum = comp->priv->seek_seqnum = + gst_event_get_seqnum (seekd->event); + } - _post_start_composition_update_done (seekd->comp, - gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK); + seek_handling (seekd->comp, gst_event_get_seqnum (seekd->event), reason); + + if (!initializing_stack) + _post_start_composition_update_done (seekd->comp, + gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK); } /* Must be called with OBJECTS_LOCK taken */ @@ -838,7 +883,7 @@ _add_action_locked (NleComposition * comp, GCallback func, GST_INFO_OBJECT (comp, "Adding Action for function: %p:%s", action, GST_DEBUG_FUNCPTR_NAME (func)); - if (func == G_CALLBACK (_emit_commited_signal_func)) + if (priority == G_PRIORITY_HIGH) priv->actions = g_list_prepend (priv->actions, action); else priv->actions = g_list_append (priv->actions, action); @@ -858,6 +903,17 @@ _add_action (NleComposition * comp, GCallback func, ACTIONS_UNLOCK (comp); } +static SeekData * +create_seek_data (NleComposition * comp, GstEvent * event) +{ + SeekData *seekd = g_slice_new0 (SeekData); + + seekd->comp = comp; + seekd->event = event; + + return seekd; +} + static void _add_seek_action (NleComposition * comp, GstEvent * event) { @@ -904,11 +960,8 @@ _add_seek_action (NleComposition * comp, GstEvent * event) } } - GST_DEBUG_OBJECT (comp, "Adding Action"); - - seekd = g_slice_new0 (SeekData); - seekd->comp = comp; - seekd->event = event; + GST_DEBUG_OBJECT (comp, "Adding seek Action"); + seekd = create_seek_data (comp, event); comp->priv->next_eos_seqnum = 0; comp->priv->real_eos_seqnum = 0; @@ -1160,6 +1213,7 @@ nle_composition_dispose (GObject * object) g_list_free (priv->objects_stop); g_list_free_full (priv->actions, (GDestroyNotify) _remove_each_action); + g_clear_object (&priv->stack_initialization_seek); nle_composition_reset_target_pad (comp); @@ -1261,7 +1315,6 @@ nle_composition_reset (NleComposition * comp) nle_composition_reset_target_pad (comp); priv->initialized = FALSE; - priv->send_stream_start = TRUE; priv->real_eos_seqnum = 0; priv->seek_seqnum = 0; priv->next_eos_seqnum = 0; @@ -1280,8 +1333,33 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, NleCompositionPrivate *priv = comp->priv; GstEvent *event; - if (GST_IS_BUFFER (info->data) || - (GST_IS_QUERY (info->data) && GST_QUERY_IS_SERIALIZED (info->data))) { + if (GST_IS_BUFFER (info->data) || (GST_IS_QUERY (info->data) + && GST_QUERY_IS_SERIALIZED (info->data))) { + + if (priv->stack_initialization_seek) { + if (g_atomic_int_compare_and_exchange + (&priv->stack_initialization_seek_sent, FALSE, TRUE)) { + _add_action (comp, G_CALLBACK (_seek_pipeline_func), + create_seek_data (comp, + gst_event_ref (priv->stack_initialization_seek)), + G_PRIORITY_HIGH); + + GST_OBJECT_LOCK (comp); + if (comp->task) + gst_task_start (comp->task); + GST_OBJECT_UNLOCK (comp); + + priv->send_stream_start = + priv->updating_reason == COMP_UPDATE_STACK_INITIALIZE; + } + + GST_DEBUG_OBJECT (comp, + "Dropping %" GST_PTR_FORMAT " while sending initializing stack seek", + info->data); + + return GST_PAD_PROBE_DROP; + } + if (priv->waiting_serialized_query_or_buffer) { GST_INFO_OBJECT (comp, "update_pipeline DONE"); _restart_task (comp); @@ -1299,6 +1377,12 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, if (_is_ready_to_restart_task (comp, event)) _restart_task (comp); + if (g_atomic_int_compare_and_exchange + (&priv->stack_initialization_seek_sent, TRUE, FALSE)) { + GST_INFO_OBJECT (comp, "Done seeking initialization stack."); + gst_clear_event (&priv->stack_initialization_seek); + } + if (gst_event_get_seqnum (event) != comp->priv->flush_seqnum) { GST_INFO_OBJECT (comp, "Dropping FLUSH_STOP %d -- %d", gst_event_get_seqnum (event), priv->flush_seqnum); @@ -1353,6 +1437,15 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, retval = GST_PAD_PROBE_DROP; } break; + case GST_EVENT_CAPS: + { + if (priv->stack_initialization_seek) { + GST_INFO_OBJECT (comp, + "Waiting for preroll to send initializing seek, dropping caps."); + return GST_PAD_PROBE_DROP; + } + break; + } case GST_EVENT_SEGMENT: { guint64 rstart, rstop; @@ -1361,6 +1454,11 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, GstEvent *event2; /* next_base_time */ + if (priv->stack_initialization_seek) { + GST_INFO_OBJECT (comp, "Waiting for preroll to send initializing seek"); + return GST_PAD_PROBE_DROP; + } + if (_is_ready_to_restart_task (comp, event)) _restart_task (comp); @@ -1495,7 +1593,7 @@ nle_composition_commit_func (NleObject * object, gboolean recurse) */ static GstEvent * get_new_seek_event (NleComposition * comp, gboolean initial, - gboolean updatestoponly) + gboolean updatestoponly, NleUpdateStackReason reason) { GstSeekFlags flags = GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH; gint64 start, stop; @@ -1517,12 +1615,18 @@ get_new_seek_event (NleComposition * comp, gboolean initial, GST_TIME_FORMAT, GST_TIME_ARGS (priv->segment->stop), GST_TIME_ARGS (priv->current_stack_stop)); - start = GST_CLOCK_TIME_IS_VALID (priv->segment->start) - ? MAX (priv->segment->start, priv->current_stack_start) - : priv->current_stack_start; - stop = GST_CLOCK_TIME_IS_VALID (priv->segment->stop) - ? MIN (priv->segment->stop, priv->current_stack_stop) - : priv->current_stack_stop; + if (reason == COMP_UPDATE_STACK_INITIALIZE + || reason == COMP_UPDATE_STACK_ON_EOS) { + start = priv->current_stack_start; + stop = priv->current_stack_stop; + } else { + start = GST_CLOCK_TIME_IS_VALID (priv->segment->start) + ? MAX (priv->segment->start, priv->current_stack_start) + : priv->current_stack_start; + stop = GST_CLOCK_TIME_IS_VALID (priv->segment->stop) + ? MIN (priv->segment->stop, priv->current_stack_stop) + : priv->current_stack_stop; + } if (updatestoponly) { starttype = GST_SEEK_TYPE_NONE; @@ -1673,6 +1777,7 @@ _seek_current_stack (NleComposition * comp, GstEvent * event, GST_INFO_OBJECT (comp, "Seeking itself %" GST_PTR_FORMAT, event); if (!peer) { + gst_event_unref (event); GST_ERROR_OBJECT (comp, "Can't seek because no pad available - " "no children in the composition ready to be used, the duration is 0, " "or not committed yet"); @@ -1719,7 +1824,8 @@ seek_handling (NleComposition * comp, gint32 seqnum, update_pipeline (comp, comp->priv->segment->stop, seqnum, update_stack_reason); } else { - GstEvent *toplevel_seek = get_new_seek_event (comp, FALSE, FALSE); + GstEvent *toplevel_seek = get_new_seek_event (comp, FALSE, FALSE, + update_stack_reason); gst_event_set_seqnum (toplevel_seek, seqnum); _set_real_eos_seqnum_from_seek (comp, toplevel_seek); @@ -1873,7 +1979,6 @@ nle_composition_reset_target_pad (NleComposition * comp) nle_object_ghost_pad_set_target (NLE_OBJECT (comp), NLE_OBJECT_SRC (comp), NULL); - priv->send_stream_start = TRUE; } /* nle_composition_ghost_pad_set_target: @@ -2243,7 +2348,7 @@ get_clean_toplevel_stack (NleComposition * comp, GstClockTime * timestamp, return NULL; } - GST_DEBUG ("start:%" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT, + GST_DEBUG_OBJECT (comp, "start:%" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop)); if (stack) { @@ -2366,6 +2471,7 @@ _restart_task (NleComposition * comp) comp->priv->seqnum_to_restart_task = 0; comp->priv->waiting_serialized_query_or_buffer = FALSE; + gst_clear_event (&comp->priv->stack_initialization_seek); comp->priv->updating_reason = COMP_UPDATE_STACK_NONE; GST_OBJECT_LOCK (comp); @@ -2817,7 +2923,6 @@ _relink_single_node (NleComposition * comp, GNode * node, NleObject *newparent; GNode *node_it; GstPad *srcpad = NULL, *sinkpad = NULL; - GstEvent *translated_seek; if (G_UNLIKELY (!node)) return; @@ -2839,11 +2944,6 @@ _relink_single_node (NleComposition * comp, GNode * node, gst_bin_add (GST_BIN (comp->priv->current_bin), GST_ELEMENT (newobj)); gst_element_sync_state_with_parent (GST_ELEMENT_CAST (newobj)); - translated_seek = nle_object_translate_incoming_seek (newobj, - gst_event_ref (toplevel_seek)); - - gst_element_send_event (GST_ELEMENT (newobj), translated_seek); - /* link to parent if needed. */ if (newparent) { _link_to_parent (comp, newobj, newparent); @@ -2976,7 +3076,7 @@ beach: } static inline gboolean -_activate_new_stack (NleComposition * comp) +_activate_new_stack (NleComposition * comp, GstEvent * toplevel_seek) { GstPad *pad; GstElement *topelement; @@ -2992,10 +3092,15 @@ _activate_new_stack (NleComposition * comp) GST_DEBUG_OBJECT (comp, "Nothing else in the composition" ", update 'worked'"); + gst_event_unref (toplevel_seek); goto resync_state; } - /* The stack is entirely ready, send seek out synchronously */ + /* The stack is entirely ready, stack initializing seek once ready */ + GST_INFO_OBJECT (comp, "Activating stack with seek: %" GST_PTR_FORMAT, + toplevel_seek); + comp->priv->stack_initialization_seek = toplevel_seek; + topelement = GST_ELEMENT (priv->current->data); /* Get toplevel object source pad */ pad = NLE_OBJECT_SRC (topelement); @@ -3008,6 +3113,7 @@ _activate_new_stack (NleComposition * comp) GST_DEBUG_OBJECT (comp, "New stack activated!"); resync_state: + g_atomic_int_set (&priv->stack_initialization_seek_sent, FALSE); gst_element_set_locked_state (priv->current_bin, FALSE); GST_DEBUG ("going back to parent state"); @@ -3226,7 +3332,8 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum, } #endif - toplevel_seek = get_new_seek_event (comp, TRUE, updatestoponly); + toplevel_seek = + get_new_seek_event (comp, TRUE, updatestoponly, update_reason); gst_event_set_seqnum (toplevel_seek, seqnum); _set_real_eos_seqnum_from_seek (comp, toplevel_seek); @@ -3236,7 +3343,7 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum, if (tear_down) { _dump_stack (comp, update_reason, stack); _deactivate_stack (comp, update_reason); - _relink_new_stack (comp, stack, toplevel_seek); + _relink_new_stack (comp, stack, gst_event_ref (toplevel_seek)); } /* Unlock all elements in new stack */ @@ -3258,24 +3365,17 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum, comp->priv->updating_reason = update_reason; comp->priv->seqnum_to_restart_task = seqnum; - GST_OBJECT_LOCK (comp); - if (comp->task == NULL) { - GST_INFO_OBJECT (comp, - "No task set, it must have been stopped, returning"); - GST_OBJECT_UNLOCK (comp); - return FALSE; + if (!_pause_task (comp)) { + gst_event_unref (toplevel_seek); + return FALSE; } - - gst_task_pause (comp->task); - GST_OBJECT_UNLOCK (comp); } /* Activate stack */ if (tear_down) - return _activate_new_stack (comp); - else - return _seek_current_stack (comp, toplevel_seek, - _have_to_flush_downstream (update_reason)); + return _activate_new_stack (comp, toplevel_seek); + return _seek_current_stack (comp, toplevel_seek, + _have_to_flush_downstream (update_reason)); } static gboolean diff --git a/plugins/nle/nlesource.c b/plugins/nle/nlesource.c index 36ad2779f9..8fc9e711d4 100644 --- a/plugins/nle/nlesource.c +++ b/plugins/nle/nlesource.c @@ -443,7 +443,7 @@ nle_source_send_event (GstElement * element, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: g_mutex_lock (&source->priv->seek_lock); - source->priv->seek_event = event; + gst_event_replace (&source->priv->seek_event, event); g_mutex_unlock (&source->priv->seek_lock); break; default: @@ -459,6 +459,7 @@ ghost_seek_pad (GstElement * source, gpointer user_data) { NleSourcePrivate *priv = NLE_SOURCE (source)->priv; + g_assert (!NLE_OBJECT (source)->in_composition); g_mutex_lock (&priv->seek_lock); if (priv->seek_event) { GstEvent *seek_event = priv->seek_event; @@ -482,7 +483,7 @@ pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) GstPadProbeReturn res = GST_PAD_PROBE_OK; GST_OBJECT_LOCK (source); - if (!priv->areblocked) { + if (!priv->areblocked && priv->seek_event) { GST_INFO_OBJECT (pad, "Blocked now, launching seek"); priv->areblocked = TRUE; gst_element_call_async (GST_ELEMENT (source), ghost_seek_pad, NULL, NULL); @@ -492,9 +493,6 @@ pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) } if (priv->probeid && GST_EVENT_SEQNUM (info->data) == priv->flush_seqnum) { - GST_INFO_OBJECT (source, "Seeking now done: %" GST_PTR_FORMAT - " - %d ? %d", info->data, GST_EVENT_SEQNUM (info->data), - priv->flush_seqnum); priv->flush_seqnum = GST_SEQNUM_INVALID; priv->areblocked = FALSE; priv->probeid = 0; @@ -526,33 +524,35 @@ nle_source_prepare (NleObject * object) return FALSE; } - if (object->in_composition == FALSE) { - gst_element_send_event (GST_ELEMENT_CAST (parent), - gst_event_new_seek (1.0, GST_FORMAT_TIME, - GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH, - GST_SEEK_TYPE_SET, object->start, GST_SEEK_TYPE_SET, object->stop)); - } - - GST_LOG_OBJECT (source, "srcpad:%p, dynamicpads:%d", - object->srcpad, priv->dynamicpads); - if (!priv->staticpad && !(get_valid_src_pad (source, source->element, &pad))) { GST_DEBUG_OBJECT (source, "Couldn't find a valid source pad"); gst_object_unref (parent); return FALSE; - } else { - if (priv->staticpad) - pad = gst_object_ref (priv->staticpad); - priv->ghostedpad = pad; + } + + if (priv->staticpad) + pad = gst_object_ref (priv->staticpad); + priv->ghostedpad = pad; + + if (object->in_composition == FALSE) { + g_mutex_lock (&source->priv->seek_lock); + source->priv->seek_event = + gst_event_new_seek (1.0, GST_FORMAT_TIME, + GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH, + GST_SEEK_TYPE_SET, object->start, GST_SEEK_TYPE_SET, object->stop); + g_mutex_unlock (&source->priv->seek_lock); GST_OBJECT_LOCK (source); priv->probeid = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, (GstPadProbeCallback) pad_brobe_cb, source, NULL); GST_OBJECT_UNLOCK (source); - gst_object_unref (pad); } + GST_LOG_OBJECT (source, "srcpad:%p, dynamicpads:%d", + object->srcpad, priv->dynamicpads); + + gst_object_unref (pad); gst_object_unref (parent); return TRUE;