nle: Seek the whole stack on initialization

Instead of seeking each nleobject separately to setup new stack, wait
for the whole stack to preroll and then seek that newly setup stack,
leading to the same code path and seek 'tweaking' as when processing
a seek on the composition (without stack changes).

This is mandatory to properly handle filter that tweak segments to handle
time remapping for example.
This commit is contained in:
Thibault Saunier 2020-02-06 12:39:12 -03:00
parent 17500f6836
commit c63586e0a7
2 changed files with 174 additions and 74 deletions

View file

@ -79,7 +79,7 @@ typedef enum
} NleUpdateStackReason;
static const char *UPDATE_PIPELINE_REASONS[] = {
"Initialize", "Commit", "EOS", "Seek"
"Initialize", "Commit", "EOS", "Seek", "None"
};
typedef struct
@ -194,6 +194,8 @@ struct _NleCompositionPrivate
/* 0 means that we already received the right caps or segment */
gint seqnum_to_restart_task;
gboolean waiting_serialized_query_or_buffer;
GstEvent *stack_initialization_seek;
volatile gboolean stack_initialization_seek_sent;
gboolean tearing_down_stack;
gboolean suppress_child_error;
@ -258,7 +260,7 @@ static void _update_pipeline_func (NleComposition * comp,
static void _commit_func (NleComposition * comp,
UpdateCompositionData * ucompo);
static GstEvent *get_new_seek_event (NleComposition * comp, gboolean initial,
gboolean updatestoponly);
gboolean updatestoponly, NleUpdateStackReason reason);
static gboolean _nle_composition_add_object (NleComposition * comp,
NleObject * object);
static gboolean _nle_composition_remove_object (NleComposition * comp,
@ -453,6 +455,22 @@ _start_task (NleComposition * comp)
GST_OBJECT_UNLOCK (comp);
}
static gboolean
_pause_task (NleComposition * comp)
{
GST_OBJECT_LOCK (comp);
if (comp->task == NULL) {
GST_INFO_OBJECT (comp, "No task set, it must have been stopped, returning");
GST_OBJECT_UNLOCK (comp);
return FALSE;
}
gst_task_pause (comp->task);
GST_OBJECT_UNLOCK (comp);
return TRUE;
}
static gboolean
_stop_task (NleComposition * comp)
{
@ -546,16 +564,36 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd)
GstSeekType cur_type, stop_type;
gint64 cur, stop;
NleCompositionPrivate *priv = comp->priv;
gboolean initializing_stack = priv->stack_initialization_seek == seekd->event;
NleUpdateStackReason reason =
initializing_stack ? COMP_UPDATE_STACK_NONE : COMP_UPDATE_STACK_ON_SEEK;
GstClockTime segment_start, segment_stop;
gboolean reverse;
gst_event_parse_seek (seekd->event, &rate, &format, &flags,
&cur_type, &cur, &stop_type, &stop);
reverse = rate < 0;
GST_DEBUG_OBJECT (seekd->comp,
"start:%" GST_TIME_FORMAT " -- stop:%" GST_TIME_FORMAT " flags:%d",
GST_TIME_ARGS (cur), GST_TIME_ARGS (stop), flags);
if (!initializing_stack) {
segment_start = cur;
segment_stop = stop;
} else {
/* During plain playback (no seek), the segment->stop doesn't
* evolve when going from stack to stack, only the start does
* (in reverse playback, the logic is reversed) */
segment_start = reverse ? priv->segment->start : cur;
segment_stop = reverse ? stop : priv->segment->stop;
}
gst_segment_do_seek (priv->segment,
rate, format, flags, cur_type, cur, stop_type, stop, NULL);
rate, format, flags, cur_type, segment_start, stop_type, segment_stop,
NULL);
gst_segment_do_seek (priv->seek_segment,
rate, format, flags, cur_type, cur, stop_type, stop, NULL);
@ -576,8 +614,9 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd)
}
#endif
_post_start_composition_update (seekd->comp,
gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
if (!initializing_stack)
_post_start_composition_update (seekd->comp,
gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
/* crop the segment start/stop values */
/* Only crop segment start value if we don't have a default object */
@ -587,15 +626,21 @@ _seek_pipeline_func (NleComposition * comp, SeekData * seekd)
priv->segment->stop =
MIN (priv->segment->stop, NLE_OBJECT_STOP (seekd->comp));
priv->next_base_time = 0;
comp->priv->flush_seqnum = comp->priv->seek_seqnum =
gst_event_get_seqnum (seekd->event);
seek_handling (seekd->comp, gst_event_get_seqnum (seekd->event),
COMP_UPDATE_STACK_ON_SEEK);
if (initializing_stack) {
GST_INFO_OBJECT (seekd->comp, "Pausing task to run initializing seek.");
_pause_task (seekd->comp);
} else {
priv->next_base_time = 0;
comp->priv->flush_seqnum = comp->priv->seek_seqnum =
gst_event_get_seqnum (seekd->event);
}
_post_start_composition_update_done (seekd->comp,
gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
seek_handling (seekd->comp, gst_event_get_seqnum (seekd->event), reason);
if (!initializing_stack)
_post_start_composition_update_done (seekd->comp,
gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
}
/* Must be called with OBJECTS_LOCK taken */
@ -838,7 +883,7 @@ _add_action_locked (NleComposition * comp, GCallback func,
GST_INFO_OBJECT (comp, "Adding Action for function: %p:%s",
action, GST_DEBUG_FUNCPTR_NAME (func));
if (func == G_CALLBACK (_emit_commited_signal_func))
if (priority == G_PRIORITY_HIGH)
priv->actions = g_list_prepend (priv->actions, action);
else
priv->actions = g_list_append (priv->actions, action);
@ -858,6 +903,17 @@ _add_action (NleComposition * comp, GCallback func,
ACTIONS_UNLOCK (comp);
}
static SeekData *
create_seek_data (NleComposition * comp, GstEvent * event)
{
SeekData *seekd = g_slice_new0 (SeekData);
seekd->comp = comp;
seekd->event = event;
return seekd;
}
static void
_add_seek_action (NleComposition * comp, GstEvent * event)
{
@ -904,11 +960,8 @@ _add_seek_action (NleComposition * comp, GstEvent * event)
}
}
GST_DEBUG_OBJECT (comp, "Adding Action");
seekd = g_slice_new0 (SeekData);
seekd->comp = comp;
seekd->event = event;
GST_DEBUG_OBJECT (comp, "Adding seek Action");
seekd = create_seek_data (comp, event);
comp->priv->next_eos_seqnum = 0;
comp->priv->real_eos_seqnum = 0;
@ -1160,6 +1213,7 @@ nle_composition_dispose (GObject * object)
g_list_free (priv->objects_stop);
g_list_free_full (priv->actions, (GDestroyNotify) _remove_each_action);
g_clear_object (&priv->stack_initialization_seek);
nle_composition_reset_target_pad (comp);
@ -1261,7 +1315,6 @@ nle_composition_reset (NleComposition * comp)
nle_composition_reset_target_pad (comp);
priv->initialized = FALSE;
priv->send_stream_start = TRUE;
priv->real_eos_seqnum = 0;
priv->seek_seqnum = 0;
priv->next_eos_seqnum = 0;
@ -1280,8 +1333,33 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
NleCompositionPrivate *priv = comp->priv;
GstEvent *event;
if (GST_IS_BUFFER (info->data) ||
(GST_IS_QUERY (info->data) && GST_QUERY_IS_SERIALIZED (info->data))) {
if (GST_IS_BUFFER (info->data) || (GST_IS_QUERY (info->data)
&& GST_QUERY_IS_SERIALIZED (info->data))) {
if (priv->stack_initialization_seek) {
if (g_atomic_int_compare_and_exchange
(&priv->stack_initialization_seek_sent, FALSE, TRUE)) {
_add_action (comp, G_CALLBACK (_seek_pipeline_func),
create_seek_data (comp,
gst_event_ref (priv->stack_initialization_seek)),
G_PRIORITY_HIGH);
GST_OBJECT_LOCK (comp);
if (comp->task)
gst_task_start (comp->task);
GST_OBJECT_UNLOCK (comp);
priv->send_stream_start =
priv->updating_reason == COMP_UPDATE_STACK_INITIALIZE;
}
GST_DEBUG_OBJECT (comp,
"Dropping %" GST_PTR_FORMAT " while sending initializing stack seek",
info->data);
return GST_PAD_PROBE_DROP;
}
if (priv->waiting_serialized_query_or_buffer) {
GST_INFO_OBJECT (comp, "update_pipeline DONE");
_restart_task (comp);
@ -1299,6 +1377,12 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
if (_is_ready_to_restart_task (comp, event))
_restart_task (comp);
if (g_atomic_int_compare_and_exchange
(&priv->stack_initialization_seek_sent, TRUE, FALSE)) {
GST_INFO_OBJECT (comp, "Done seeking initialization stack.");
gst_clear_event (&priv->stack_initialization_seek);
}
if (gst_event_get_seqnum (event) != comp->priv->flush_seqnum) {
GST_INFO_OBJECT (comp, "Dropping FLUSH_STOP %d -- %d",
gst_event_get_seqnum (event), priv->flush_seqnum);
@ -1353,6 +1437,15 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
retval = GST_PAD_PROBE_DROP;
}
break;
case GST_EVENT_CAPS:
{
if (priv->stack_initialization_seek) {
GST_INFO_OBJECT (comp,
"Waiting for preroll to send initializing seek, dropping caps.");
return GST_PAD_PROBE_DROP;
}
break;
}
case GST_EVENT_SEGMENT:
{
guint64 rstart, rstop;
@ -1361,6 +1454,11 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
GstEvent *event2;
/* next_base_time */
if (priv->stack_initialization_seek) {
GST_INFO_OBJECT (comp, "Waiting for preroll to send initializing seek");
return GST_PAD_PROBE_DROP;
}
if (_is_ready_to_restart_task (comp, event))
_restart_task (comp);
@ -1495,7 +1593,7 @@ nle_composition_commit_func (NleObject * object, gboolean recurse)
*/
static GstEvent *
get_new_seek_event (NleComposition * comp, gboolean initial,
gboolean updatestoponly)
gboolean updatestoponly, NleUpdateStackReason reason)
{
GstSeekFlags flags = GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH;
gint64 start, stop;
@ -1517,12 +1615,18 @@ get_new_seek_event (NleComposition * comp, gboolean initial,
GST_TIME_FORMAT, GST_TIME_ARGS (priv->segment->stop),
GST_TIME_ARGS (priv->current_stack_stop));
start = GST_CLOCK_TIME_IS_VALID (priv->segment->start)
? MAX (priv->segment->start, priv->current_stack_start)
: priv->current_stack_start;
stop = GST_CLOCK_TIME_IS_VALID (priv->segment->stop)
? MIN (priv->segment->stop, priv->current_stack_stop)
: priv->current_stack_stop;
if (reason == COMP_UPDATE_STACK_INITIALIZE
|| reason == COMP_UPDATE_STACK_ON_EOS) {
start = priv->current_stack_start;
stop = priv->current_stack_stop;
} else {
start = GST_CLOCK_TIME_IS_VALID (priv->segment->start)
? MAX (priv->segment->start, priv->current_stack_start)
: priv->current_stack_start;
stop = GST_CLOCK_TIME_IS_VALID (priv->segment->stop)
? MIN (priv->segment->stop, priv->current_stack_stop)
: priv->current_stack_stop;
}
if (updatestoponly) {
starttype = GST_SEEK_TYPE_NONE;
@ -1673,6 +1777,7 @@ _seek_current_stack (NleComposition * comp, GstEvent * event,
GST_INFO_OBJECT (comp, "Seeking itself %" GST_PTR_FORMAT, event);
if (!peer) {
gst_event_unref (event);
GST_ERROR_OBJECT (comp, "Can't seek because no pad available - "
"no children in the composition ready to be used, the duration is 0, "
"or not committed yet");
@ -1719,7 +1824,8 @@ seek_handling (NleComposition * comp, gint32 seqnum,
update_pipeline (comp, comp->priv->segment->stop, seqnum,
update_stack_reason);
} else {
GstEvent *toplevel_seek = get_new_seek_event (comp, FALSE, FALSE);
GstEvent *toplevel_seek = get_new_seek_event (comp, FALSE, FALSE,
update_stack_reason);
gst_event_set_seqnum (toplevel_seek, seqnum);
_set_real_eos_seqnum_from_seek (comp, toplevel_seek);
@ -1873,7 +1979,6 @@ nle_composition_reset_target_pad (NleComposition * comp)
nle_object_ghost_pad_set_target (NLE_OBJECT (comp),
NLE_OBJECT_SRC (comp), NULL);
priv->send_stream_start = TRUE;
}
/* nle_composition_ghost_pad_set_target:
@ -2243,7 +2348,7 @@ get_clean_toplevel_stack (NleComposition * comp, GstClockTime * timestamp,
return NULL;
}
GST_DEBUG ("start:%" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT,
GST_DEBUG_OBJECT (comp, "start:%" GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT,
GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
if (stack) {
@ -2366,6 +2471,7 @@ _restart_task (NleComposition * comp)
comp->priv->seqnum_to_restart_task = 0;
comp->priv->waiting_serialized_query_or_buffer = FALSE;
gst_clear_event (&comp->priv->stack_initialization_seek);
comp->priv->updating_reason = COMP_UPDATE_STACK_NONE;
GST_OBJECT_LOCK (comp);
@ -2817,7 +2923,6 @@ _relink_single_node (NleComposition * comp, GNode * node,
NleObject *newparent;
GNode *node_it;
GstPad *srcpad = NULL, *sinkpad = NULL;
GstEvent *translated_seek;
if (G_UNLIKELY (!node))
return;
@ -2839,11 +2944,6 @@ _relink_single_node (NleComposition * comp, GNode * node,
gst_bin_add (GST_BIN (comp->priv->current_bin), GST_ELEMENT (newobj));
gst_element_sync_state_with_parent (GST_ELEMENT_CAST (newobj));
translated_seek = nle_object_translate_incoming_seek (newobj,
gst_event_ref (toplevel_seek));
gst_element_send_event (GST_ELEMENT (newobj), translated_seek);
/* link to parent if needed. */
if (newparent) {
_link_to_parent (comp, newobj, newparent);
@ -2976,7 +3076,7 @@ beach:
}
static inline gboolean
_activate_new_stack (NleComposition * comp)
_activate_new_stack (NleComposition * comp, GstEvent * toplevel_seek)
{
GstPad *pad;
GstElement *topelement;
@ -2992,10 +3092,15 @@ _activate_new_stack (NleComposition * comp)
GST_DEBUG_OBJECT (comp, "Nothing else in the composition"
", update 'worked'");
gst_event_unref (toplevel_seek);
goto resync_state;
}
/* The stack is entirely ready, send seek out synchronously */
/* The stack is entirely ready, stack initializing seek once ready */
GST_INFO_OBJECT (comp, "Activating stack with seek: %" GST_PTR_FORMAT,
toplevel_seek);
comp->priv->stack_initialization_seek = toplevel_seek;
topelement = GST_ELEMENT (priv->current->data);
/* Get toplevel object source pad */
pad = NLE_OBJECT_SRC (topelement);
@ -3008,6 +3113,7 @@ _activate_new_stack (NleComposition * comp)
GST_DEBUG_OBJECT (comp, "New stack activated!");
resync_state:
g_atomic_int_set (&priv->stack_initialization_seek_sent, FALSE);
gst_element_set_locked_state (priv->current_bin, FALSE);
GST_DEBUG ("going back to parent state");
@ -3226,7 +3332,8 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum,
}
#endif
toplevel_seek = get_new_seek_event (comp, TRUE, updatestoponly);
toplevel_seek =
get_new_seek_event (comp, TRUE, updatestoponly, update_reason);
gst_event_set_seqnum (toplevel_seek, seqnum);
_set_real_eos_seqnum_from_seek (comp, toplevel_seek);
@ -3236,7 +3343,7 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum,
if (tear_down) {
_dump_stack (comp, update_reason, stack);
_deactivate_stack (comp, update_reason);
_relink_new_stack (comp, stack, toplevel_seek);
_relink_new_stack (comp, stack, gst_event_ref (toplevel_seek));
}
/* Unlock all elements in new stack */
@ -3258,24 +3365,17 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum,
comp->priv->updating_reason = update_reason;
comp->priv->seqnum_to_restart_task = seqnum;
GST_OBJECT_LOCK (comp);
if (comp->task == NULL) {
GST_INFO_OBJECT (comp,
"No task set, it must have been stopped, returning");
GST_OBJECT_UNLOCK (comp);
return FALSE;
if (!_pause_task (comp)) {
gst_event_unref (toplevel_seek);
return FALSE;
}
gst_task_pause (comp->task);
GST_OBJECT_UNLOCK (comp);
}
/* Activate stack */
if (tear_down)
return _activate_new_stack (comp);
else
return _seek_current_stack (comp, toplevel_seek,
_have_to_flush_downstream (update_reason));
return _activate_new_stack (comp, toplevel_seek);
return _seek_current_stack (comp, toplevel_seek,
_have_to_flush_downstream (update_reason));
}
static gboolean

View file

@ -443,7 +443,7 @@ nle_source_send_event (GstElement * element, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
g_mutex_lock (&source->priv->seek_lock);
source->priv->seek_event = event;
gst_event_replace (&source->priv->seek_event, event);
g_mutex_unlock (&source->priv->seek_lock);
break;
default:
@ -459,6 +459,7 @@ ghost_seek_pad (GstElement * source, gpointer user_data)
{
NleSourcePrivate *priv = NLE_SOURCE (source)->priv;
g_assert (!NLE_OBJECT (source)->in_composition);
g_mutex_lock (&priv->seek_lock);
if (priv->seek_event) {
GstEvent *seek_event = priv->seek_event;
@ -482,7 +483,7 @@ pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source)
GstPadProbeReturn res = GST_PAD_PROBE_OK;
GST_OBJECT_LOCK (source);
if (!priv->areblocked) {
if (!priv->areblocked && priv->seek_event) {
GST_INFO_OBJECT (pad, "Blocked now, launching seek");
priv->areblocked = TRUE;
gst_element_call_async (GST_ELEMENT (source), ghost_seek_pad, NULL, NULL);
@ -492,9 +493,6 @@ pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source)
}
if (priv->probeid && GST_EVENT_SEQNUM (info->data) == priv->flush_seqnum) {
GST_INFO_OBJECT (source, "Seeking now done: %" GST_PTR_FORMAT
" - %d ? %d", info->data, GST_EVENT_SEQNUM (info->data),
priv->flush_seqnum);
priv->flush_seqnum = GST_SEQNUM_INVALID;
priv->areblocked = FALSE;
priv->probeid = 0;
@ -526,33 +524,35 @@ nle_source_prepare (NleObject * object)
return FALSE;
}
if (object->in_composition == FALSE) {
gst_element_send_event (GST_ELEMENT_CAST (parent),
gst_event_new_seek (1.0, GST_FORMAT_TIME,
GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, object->start, GST_SEEK_TYPE_SET, object->stop));
}
GST_LOG_OBJECT (source, "srcpad:%p, dynamicpads:%d",
object->srcpad, priv->dynamicpads);
if (!priv->staticpad && !(get_valid_src_pad (source, source->element, &pad))) {
GST_DEBUG_OBJECT (source, "Couldn't find a valid source pad");
gst_object_unref (parent);
return FALSE;
} else {
if (priv->staticpad)
pad = gst_object_ref (priv->staticpad);
priv->ghostedpad = pad;
}
if (priv->staticpad)
pad = gst_object_ref (priv->staticpad);
priv->ghostedpad = pad;
if (object->in_composition == FALSE) {
g_mutex_lock (&source->priv->seek_lock);
source->priv->seek_event =
gst_event_new_seek (1.0, GST_FORMAT_TIME,
GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH,
GST_SEEK_TYPE_SET, object->start, GST_SEEK_TYPE_SET, object->stop);
g_mutex_unlock (&source->priv->seek_lock);
GST_OBJECT_LOCK (source);
priv->probeid = gst_pad_add_probe (pad,
GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, (GstPadProbeCallback) pad_brobe_cb,
source, NULL);
GST_OBJECT_UNLOCK (source);
gst_object_unref (pad);
}
GST_LOG_OBJECT (source, "srcpad:%p, dynamicpads:%d",
object->srcpad, priv->dynamicpads);
gst_object_unref (pad);
gst_object_unref (parent);
return TRUE;