diff --git a/plugins/nle/nlecomposition.c b/plugins/nle/nlecomposition.c index f0cf426f4d..97ffb9431c 100644 --- a/plugins/nle/nlecomposition.c +++ b/plugins/nle/nlecomposition.c @@ -79,7 +79,7 @@ typedef enum } NleUpdateStackReason; static const char *UPDATE_PIPELINE_REASONS[] = { - "Initialize", "Commit", "EOS", "Seek" + "Initialize", "Commit", "EOS", "Seek", "None" }; typedef struct @@ -194,6 +194,8 @@ struct _NleCompositionPrivate /* 0 means that we already received the right caps or segment */ gint seqnum_to_restart_task; gboolean waiting_serialized_query_or_buffer; + GstEvent *stack_initialization_seek; + volatile gboolean stack_initialization_seek_sent; gboolean tearing_down_stack; gboolean suppress_child_error; @@ -258,7 +260,7 @@ static void _update_pipeline_func (NleComposition * comp, static void _commit_func (NleComposition * comp, UpdateCompositionData * ucompo); static GstEvent *get_new_seek_event (NleComposition * comp, gboolean initial, - gboolean updatestoponly); + gboolean updatestoponly, NleUpdateStackReason reason); static gboolean _nle_composition_add_object (NleComposition * comp, NleObject * object); static gboolean _nle_composition_remove_object (NleComposition * comp, @@ -453,6 +455,22 @@ _start_task (NleComposition * comp) GST_OBJECT_UNLOCK (comp); } +static gboolean +_pause_task (NleComposition * comp) +{ + GST_OBJECT_LOCK (comp); + if (comp->task == NULL) { + GST_INFO_OBJECT (comp, "No task set, it must have been stopped, returning"); + GST_OBJECT_UNLOCK (comp); + return FALSE; + } + + gst_task_pause (comp->task); + GST_OBJECT_UNLOCK (comp); + + return TRUE; +} + static gboolean _stop_task (NleComposition * comp) { @@ -546,16 +564,36 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd) GstSeekType cur_type, stop_type; gint64 cur, stop; NleCompositionPrivate *priv = comp->priv; + gboolean initializing_stack = priv->stack_initialization_seek == seekd->event; + NleUpdateStackReason reason = + initializing_stack ? COMP_UPDATE_STACK_NONE : COMP_UPDATE_STACK_ON_SEEK; + GstClockTime segment_start, segment_stop; + gboolean reverse; gst_event_parse_seek (seekd->event, &rate, &format, &flags, &cur_type, &cur, &stop_type, &stop); + reverse = rate < 0; + GST_DEBUG_OBJECT (seekd->comp, "start:%" GST_TIME_FORMAT " -- stop:%" GST_TIME_FORMAT " flags:%d", GST_TIME_ARGS (cur), GST_TIME_ARGS (stop), flags); + if (!initializing_stack) { + segment_start = cur; + segment_stop = stop; + } else { + /* During plain playback (no seek), the segment->stop doesn't + * evolve when going from stack to stack, only the start does + * (in reverse playback, the logic is reversed) */ + segment_start = reverse ? priv->segment->start : cur; + segment_stop = reverse ? stop : priv->segment->stop; + } + gst_segment_do_seek (priv->segment, - rate, format, flags, cur_type, cur, stop_type, stop, NULL); + rate, format, flags, cur_type, segment_start, stop_type, segment_stop, + NULL); + gst_segment_do_seek (priv->seek_segment, rate, format, flags, cur_type, cur, stop_type, stop, NULL); @@ -576,8 +614,9 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd) } #endif - _post_start_composition_update (seekd->comp, - gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK); + if (!initializing_stack) + _post_start_composition_update (seekd->comp, + gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK); /* crop the segment start/stop values */ /* Only crop segment start value if we don't have a default object */ @@ -587,15 +626,21 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd) priv->segment->stop = MIN (priv->segment->stop, NLE_OBJECT_STOP (seekd->comp)); - priv->next_base_time = 0; - comp->priv->flush_seqnum = comp->priv->seek_seqnum = - gst_event_get_seqnum (seekd->event); - seek_handling (seekd->comp, gst_event_get_seqnum (seekd->event), - COMP_UPDATE_STACK_ON_SEEK); + if (initializing_stack) { + GST_INFO_OBJECT (seekd->comp, "Pausing task to run initializing seek."); + _pause_task (seekd->comp); + } else { + priv->next_base_time = 0; + comp->priv->flush_seqnum = comp->priv->seek_seqnum = + gst_event_get_seqnum (seekd->event); + } - _post_start_composition_update_done (seekd->comp, - gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK); + seek_handling (seekd->comp, gst_event_get_seqnum (seekd->event), reason); + + if (!initializing_stack) + _post_start_composition_update_done (seekd->comp, + gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK); } /* Must be called with OBJECTS_LOCK taken */ @@ -838,7 +883,7 @@ _add_action_locked (NleComposition * comp, GCallback func, GST_INFO_OBJECT (comp, "Adding Action for function: %p:%s", action, GST_DEBUG_FUNCPTR_NAME (func)); - if (func == G_CALLBACK (_emit_commited_signal_func)) + if (priority == G_PRIORITY_HIGH) priv->actions = g_list_prepend (priv->actions, action); else priv->actions = g_list_append (priv->actions, action); @@ -858,6 +903,17 @@ _add_action (NleComposition * comp, GCallback func, ACTIONS_UNLOCK (comp); } +static SeekData * +create_seek_data (NleComposition * comp, GstEvent * event) +{ + SeekData *seekd = g_slice_new0 (SeekData); + + seekd->comp = comp; + seekd->event = event; + + return seekd; +} + static void _add_seek_action (NleComposition * comp, GstEvent * event) { @@ -904,11 +960,8 @@ _add_seek_action (NleComposition * comp, GstEvent * event) } } - GST_DEBUG_OBJECT (comp, "Adding Action"); - - seekd = g_slice_new0 (SeekData); - seekd->comp = comp; - seekd->event = event; + GST_DEBUG_OBJECT (comp, "Adding seek Action"); + seekd = create_seek_data (comp, event); comp->priv->next_eos_seqnum = 0; comp->priv->real_eos_seqnum = 0; @@ -1160,6 +1213,7 @@ nle_composition_dispose (GObject * object) g_list_free (priv->objects_stop); g_list_free_full (priv->actions, (GDestroyNotify) _remove_each_action); + g_clear_object (&priv->stack_initialization_seek); nle_composition_reset_target_pad (comp); @@ -1261,7 +1315,6 @@ nle_composition_reset (NleComposition * comp) nle_composition_reset_target_pad (comp); priv->initialized = FALSE; - priv->send_stream_start = TRUE; priv->real_eos_seqnum = 0; priv->seek_seqnum = 0; priv->next_eos_seqnum = 0; @@ -1280,8 +1333,33 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, NleCompositionPrivate *priv = comp->priv; GstEvent *event; - if (GST_IS_BUFFER (info->data) || - (GST_IS_QUERY (info->data) && GST_QUERY_IS_SERIALIZED (info->data))) { + if (GST_IS_BUFFER (info->data) || (GST_IS_QUERY (info->data) + && GST_QUERY_IS_SERIALIZED (info->data))) { + + if (priv->stack_initialization_seek) { + if (g_atomic_int_compare_and_exchange + (&priv->stack_initialization_seek_sent, FALSE, TRUE)) { + _add_action (comp, G_CALLBACK (_seek_pipeline_func), + create_seek_data (comp, + gst_event_ref (priv->stack_initialization_seek)), + G_PRIORITY_HIGH); + + GST_OBJECT_LOCK (comp); + if (comp->task) + gst_task_start (comp->task); + GST_OBJECT_UNLOCK (comp); + + priv->send_stream_start = + priv->updating_reason == COMP_UPDATE_STACK_INITIALIZE; + } + + GST_DEBUG_OBJECT (comp, + "Dropping %" GST_PTR_FORMAT " while sending initializing stack seek", + info->data); + + return GST_PAD_PROBE_DROP; + } + if (priv->waiting_serialized_query_or_buffer) { GST_INFO_OBJECT (comp, "update_pipeline DONE"); _restart_task (comp); @@ -1299,6 +1377,12 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, if (_is_ready_to_restart_task (comp, event)) _restart_task (comp); + if (g_atomic_int_compare_and_exchange + (&priv->stack_initialization_seek_sent, TRUE, FALSE)) { + GST_INFO_OBJECT (comp, "Done seeking initialization stack."); + gst_clear_event (&priv->stack_initialization_seek); + } + if (gst_event_get_seqnum (event) != comp->priv->flush_seqnum) { GST_INFO_OBJECT (comp, "Dropping FLUSH_STOP %d -- %d", gst_event_get_seqnum (event), priv->flush_seqnum); @@ -1353,6 +1437,15 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, retval = GST_PAD_PROBE_DROP; } break; + case GST_EVENT_CAPS: + { + if (priv->stack_initialization_seek) { + GST_INFO_OBJECT (comp, + "Waiting for preroll to send initializing seek, dropping caps."); + return GST_PAD_PROBE_DROP; + } + break; + } case GST_EVENT_SEGMENT: { guint64 rstart, rstop; @@ -1361,6 +1454,11 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED, GstEvent *event2; /* next_base_time */ + if (priv->stack_initialization_seek) { + GST_INFO_OBJECT (comp, "Waiting for preroll to send initializing seek"); + return GST_PAD_PROBE_DROP; + } + if (_is_ready_to_restart_task (comp, event)) _restart_task (comp); @@ -1495,7 +1593,7 @@ nle_composition_commit_func (NleObject * object, gboolean recurse) */ static GstEvent * get_new_seek_event (NleComposition * comp, gboolean initial, - gboolean updatestoponly) + gboolean updatestoponly, NleUpdateStackReason reason) { GstSeekFlags flags = GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH; gint64 start, stop; @@ -1517,12 +1615,18 @@ get_new_seek_event (NleComposition * comp, gboolean initial, GST_TIME_FORMAT, GST_TIME_ARGS (priv->segment->stop), GST_TIME_ARGS (priv->current_stack_stop)); - start = GST_CLOCK_TIME_IS_VALID (priv->segment->start) - ? MAX (priv->segment->start, priv->current_stack_start) - : priv->current_stack_start; - stop = GST_CLOCK_TIME_IS_VALID (priv->segment->stop) - ? MIN (priv->segment->stop, priv->current_stack_stop) - : priv->current_stack_stop; + if (reason == COMP_UPDATE_STACK_INITIALIZE + || reason == COMP_UPDATE_STACK_ON_EOS) { + start = priv->current_stack_start; + stop = priv->current_stack_stop; + } else { + start = GST_CLOCK_TIME_IS_VALID (priv->segment->start) + ? MAX (priv->segment->start, priv->current_stack_start) + : priv->current_stack_start; + stop = GST_CLOCK_TIME_IS_VALID (priv->segment->stop) + ? MIN (priv->segment->stop, priv->current_stack_stop) + : priv->current_stack_stop; + } if (updatestoponly) { starttype = GST_SEEK_TYPE_NONE; @@ -1673,6 +1777,7 @@ _seek_current_stack (NleComposition * comp, GstEvent * event, GST_INFO_OBJECT (comp, "Seeking itself %" GST_PTR_FORMAT, event); if (!peer) { + gst_event_unref (event); GST_ERROR_OBJECT (comp, "Can't seek because no pad available - " "no children in the composition ready to be used, the duration is 0, " "or not committed yet"); @@ -1719,7 +1824,8 @@ seek_handling (NleComposition * comp, gint32 seqnum, update_pipeline (comp, comp->priv->segment->stop, seqnum, update_stack_reason); } else { - GstEvent *toplevel_seek = get_new_seek_event (comp, FALSE, FALSE); + GstEvent *toplevel_seek = get_new_seek_event (comp, FALSE, FALSE, + update_stack_reason); gst_event_set_seqnum (toplevel_seek, seqnum); _set_real_eos_seqnum_from_seek (comp, toplevel_seek); @@ -1873,7 +1979,6 @@ nle_composition_reset_target_pad (NleComposition * comp) nle_object_ghost_pad_set_target (NLE_OBJECT (comp), NLE_OBJECT_SRC (comp), NULL); - priv->send_stream_start = TRUE; } /* nle_composition_ghost_pad_set_target: @@ -2243,7 +2348,7 @@ get_clean_toplevel_stack (NleComposition * comp, GstClockTime * timestamp, return NULL; } - GST_DEBUG ("start:%" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT, + GST_DEBUG_OBJECT (comp, "start:%" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop)); if (stack) { @@ -2366,6 +2471,7 @@ _restart_task (NleComposition * comp) comp->priv->seqnum_to_restart_task = 0; comp->priv->waiting_serialized_query_or_buffer = FALSE; + gst_clear_event (&comp->priv->stack_initialization_seek); comp->priv->updating_reason = COMP_UPDATE_STACK_NONE; GST_OBJECT_LOCK (comp); @@ -2817,7 +2923,6 @@ _relink_single_node (NleComposition * comp, GNode * node, NleObject *newparent; GNode *node_it; GstPad *srcpad = NULL, *sinkpad = NULL; - GstEvent *translated_seek; if (G_UNLIKELY (!node)) return; @@ -2839,11 +2944,6 @@ _relink_single_node (NleComposition * comp, GNode * node, gst_bin_add (GST_BIN (comp->priv->current_bin), GST_ELEMENT (newobj)); gst_element_sync_state_with_parent (GST_ELEMENT_CAST (newobj)); - translated_seek = nle_object_translate_incoming_seek (newobj, - gst_event_ref (toplevel_seek)); - - gst_element_send_event (GST_ELEMENT (newobj), translated_seek); - /* link to parent if needed. */ if (newparent) { _link_to_parent (comp, newobj, newparent); @@ -2976,7 +3076,7 @@ beach: } static inline gboolean -_activate_new_stack (NleComposition * comp) +_activate_new_stack (NleComposition * comp, GstEvent * toplevel_seek) { GstPad *pad; GstElement *topelement; @@ -2992,10 +3092,15 @@ _activate_new_stack (NleComposition * comp) GST_DEBUG_OBJECT (comp, "Nothing else in the composition" ", update 'worked'"); + gst_event_unref (toplevel_seek); goto resync_state; } - /* The stack is entirely ready, send seek out synchronously */ + /* The stack is entirely ready, stack initializing seek once ready */ + GST_INFO_OBJECT (comp, "Activating stack with seek: %" GST_PTR_FORMAT, + toplevel_seek); + comp->priv->stack_initialization_seek = toplevel_seek; + topelement = GST_ELEMENT (priv->current->data); /* Get toplevel object source pad */ pad = NLE_OBJECT_SRC (topelement); @@ -3008,6 +3113,7 @@ _activate_new_stack (NleComposition * comp) GST_DEBUG_OBJECT (comp, "New stack activated!"); resync_state: + g_atomic_int_set (&priv->stack_initialization_seek_sent, FALSE); gst_element_set_locked_state (priv->current_bin, FALSE); GST_DEBUG ("going back to parent state"); @@ -3226,7 +3332,8 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum, } #endif - toplevel_seek = get_new_seek_event (comp, TRUE, updatestoponly); + toplevel_seek = + get_new_seek_event (comp, TRUE, updatestoponly, update_reason); gst_event_set_seqnum (toplevel_seek, seqnum); _set_real_eos_seqnum_from_seek (comp, toplevel_seek); @@ -3236,7 +3343,7 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum, if (tear_down) { _dump_stack (comp, update_reason, stack); _deactivate_stack (comp, update_reason); - _relink_new_stack (comp, stack, toplevel_seek); + _relink_new_stack (comp, stack, gst_event_ref (toplevel_seek)); } /* Unlock all elements in new stack */ @@ -3258,24 +3365,17 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum, comp->priv->updating_reason = update_reason; comp->priv->seqnum_to_restart_task = seqnum; - GST_OBJECT_LOCK (comp); - if (comp->task == NULL) { - GST_INFO_OBJECT (comp, - "No task set, it must have been stopped, returning"); - GST_OBJECT_UNLOCK (comp); - return FALSE; + if (!_pause_task (comp)) { + gst_event_unref (toplevel_seek); + return FALSE; } - - gst_task_pause (comp->task); - GST_OBJECT_UNLOCK (comp); } /* Activate stack */ if (tear_down) - return _activate_new_stack (comp); - else - return _seek_current_stack (comp, toplevel_seek, - _have_to_flush_downstream (update_reason)); + return _activate_new_stack (comp, toplevel_seek); + return _seek_current_stack (comp, toplevel_seek, + _have_to_flush_downstream (update_reason)); } static gboolean diff --git a/plugins/nle/nlesource.c b/plugins/nle/nlesource.c index 36ad2779f9..8fc9e711d4 100644 --- a/plugins/nle/nlesource.c +++ b/plugins/nle/nlesource.c @@ -443,7 +443,7 @@ nle_source_send_event (GstElement * element, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: g_mutex_lock (&source->priv->seek_lock); - source->priv->seek_event = event; + gst_event_replace (&source->priv->seek_event, event); g_mutex_unlock (&source->priv->seek_lock); break; default: @@ -459,6 +459,7 @@ ghost_seek_pad (GstElement * source, gpointer user_data) { NleSourcePrivate *priv = NLE_SOURCE (source)->priv; + g_assert (!NLE_OBJECT (source)->in_composition); g_mutex_lock (&priv->seek_lock); if (priv->seek_event) { GstEvent *seek_event = priv->seek_event; @@ -482,7 +483,7 @@ pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) GstPadProbeReturn res = GST_PAD_PROBE_OK; GST_OBJECT_LOCK (source); - if (!priv->areblocked) { + if (!priv->areblocked && priv->seek_event) { GST_INFO_OBJECT (pad, "Blocked now, launching seek"); priv->areblocked = TRUE; gst_element_call_async (GST_ELEMENT (source), ghost_seek_pad, NULL, NULL); @@ -492,9 +493,6 @@ pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source) } if (priv->probeid && GST_EVENT_SEQNUM (info->data) == priv->flush_seqnum) { - GST_INFO_OBJECT (source, "Seeking now done: %" GST_PTR_FORMAT - " - %d ? %d", info->data, GST_EVENT_SEQNUM (info->data), - priv->flush_seqnum); priv->flush_seqnum = GST_SEQNUM_INVALID; priv->areblocked = FALSE; priv->probeid = 0; @@ -526,33 +524,35 @@ nle_source_prepare (NleObject * object) return FALSE; } - if (object->in_composition == FALSE) { - gst_element_send_event (GST_ELEMENT_CAST (parent), - gst_event_new_seek (1.0, GST_FORMAT_TIME, - GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH, - GST_SEEK_TYPE_SET, object->start, GST_SEEK_TYPE_SET, object->stop)); - } - - GST_LOG_OBJECT (source, "srcpad:%p, dynamicpads:%d", - object->srcpad, priv->dynamicpads); - if (!priv->staticpad && !(get_valid_src_pad (source, source->element, &pad))) { GST_DEBUG_OBJECT (source, "Couldn't find a valid source pad"); gst_object_unref (parent); return FALSE; - } else { - if (priv->staticpad) - pad = gst_object_ref (priv->staticpad); - priv->ghostedpad = pad; + } + + if (priv->staticpad) + pad = gst_object_ref (priv->staticpad); + priv->ghostedpad = pad; + + if (object->in_composition == FALSE) { + g_mutex_lock (&source->priv->seek_lock); + source->priv->seek_event = + gst_event_new_seek (1.0, GST_FORMAT_TIME, + GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH, + GST_SEEK_TYPE_SET, object->start, GST_SEEK_TYPE_SET, object->stop); + g_mutex_unlock (&source->priv->seek_lock); GST_OBJECT_LOCK (source); priv->probeid = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, (GstPadProbeCallback) pad_brobe_cb, source, NULL); GST_OBJECT_UNLOCK (source); - gst_object_unref (pad); } + GST_LOG_OBJECT (source, "srcpad:%p, dynamicpads:%d", + object->srcpad, priv->dynamicpads); + + gst_object_unref (pad); gst_object_unref (parent); return TRUE;