nlecomposition: Restart the main task on FLUSH_STOP

It means stop using a dedicated probe to restart task so that the main probe does not
drop the FLUSH_STOP event before we have a chance to restart the task. (and this is
for sure cleaner/and simpler to read).
This commit is contained in:
Thibault Saunier 2014-09-19 12:49:52 +02:00
parent 37f50193fd
commit 02665dae91

View file

@ -76,7 +76,8 @@ typedef enum
COMP_UPDATE_STACK_INITIALIZE,
COMP_UPDATE_STACK_ON_COMMIT,
COMP_UPDATE_STACK_ON_EOS,
COMP_UPDATE_STACK_ON_SEEK
COMP_UPDATE_STACK_ON_SEEK,
COMP_UPDATE_STACK_NONE
} NleUpdateStackReason;
static const char *UPDATE_PIPELINE_REASONS[] = {
@ -170,13 +171,12 @@ struct _NleCompositionPrivate
gint next_eos_seqnum;
gint flush_seqnum;
/* While we do not get a buffer on our srcpad,
* we are not commited */
gulong commited_probeid;
/* 0 means that we already received the right caps or segment */
gint awaited_caps_seqnum;
gint seqnum_to_restart_task;
gboolean tearing_down_stack;
NleUpdateStackReason updating_reason;
};
typedef struct _Action
@ -243,10 +243,12 @@ static void _deactivate_stack (NleComposition * comp,
static gboolean _set_real_eos_seqnum_from_seek (NleComposition * comp,
GstEvent * event);
static void _emit_commited_signal_func (NleComposition * comp, gpointer udata);
static void _restart_task (UpdateCompositionData * ucompo);
static void _restart_task (NleComposition * comp);
static void
_add_action (NleComposition * comp, GCallback func, gpointer data,
gint priority);
static gboolean
_is_ready_to_restart_task (NleComposition * comp, GstEvent * event);
/* COMP_REAL_START: actual position to start current playback at. */
@ -383,7 +385,11 @@ _start_task (NleComposition * comp)
task = comp->task;
if (task == NULL) {
gchar *taskname =
g_strdup_printf ("%s_update_management", GST_OBJECT_NAME (comp));
task = gst_task_new ((GstTaskFunction) _execute_actions, comp, NULL);
gst_object_set_name (GST_OBJECT_CAST (task), taskname);
gst_task_set_lock (task, GET_TASK_LOCK (comp));
GST_INFO_OBJECT (comp, "created task %p", task);
comp->task = task;
@ -716,12 +722,6 @@ _add_add_object_action (NleComposition * comp, NleObject * object)
G_PRIORITY_DEFAULT);
}
static void
_free_update_compo_data (gpointer updatepipelinedata)
{
g_slice_free (UpdateCompositionData, updatepipelinedata);
}
static void
_free_action (Action * action, gpointer udata)
{
@ -1104,8 +1104,12 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP:
if (_is_ready_to_restart_task (comp, event))
_restart_task (comp);
if (gst_event_get_seqnum (event) != comp->priv->flush_seqnum) {
GST_INFO_OBJECT (comp, "Dropping flush stop");
GST_INFO_OBJECT (comp, "Dropping flush stop %d -- %d",
gst_event_get_seqnum (event), priv->seqnum_to_restart_task);
retval = GST_PAD_PROBE_DROP;
} else {
GST_INFO_OBJECT (comp, "Forwarding our flush stop with seqnum %i",
@ -1144,6 +1148,9 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
GstEvent *event2;
/* next_base_time */
if (_is_ready_to_restart_task (comp, event))
_restart_task (comp);
gst_event_parse_segment (event, &segment);
gst_segment_copy_into (segment, &copy);
@ -1173,19 +1180,11 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
GST_INFO_OBJECT (comp, "Got EOS, last EOS seqnum id : %i current "
"seq num is: %i", comp->priv->real_eos_seqnum, seqnum);
if (priv->commited_probeid && comp->priv->awaited_caps_seqnum == 0) {
UpdateCompositionData ucompo;
if (_is_ready_to_restart_task (comp, event)) {
GST_INFO_OBJECT (comp, "We got an EOS right after seeing the right"
" segment, restarting task");
ucompo.comp = comp;
GST_FIXME_OBJECT (comp, "Check it we need to emit or not!!");
ucompo.reason = COMP_UPDATE_STACK_ON_COMMIT;
ucompo.seqnum = seqnum;
gst_pad_remove_probe (NLE_OBJECT_SRC (comp), priv->commited_probeid);
_restart_task (&ucompo);
_restart_task (comp);
}
if (g_atomic_int_compare_and_exchange (&comp->priv->real_eos_seqnum,
@ -2062,70 +2061,51 @@ _emit_commited_signal_func (NleComposition * comp, gpointer udata)
}
static void
_restart_task (UpdateCompositionData * ucompo)
_restart_task (NleComposition * comp)
{
NleComposition *comp = ucompo->comp;
GST_INFO_OBJECT (comp, "Restarting task! after %s DONE",
UPDATE_PIPELINE_REASONS[ucompo->reason]);
UPDATE_PIPELINE_REASONS[comp->priv->updating_reason]);
if (ucompo->reason == COMP_UPDATE_STACK_ON_COMMIT)
if (comp->priv->updating_reason == COMP_UPDATE_STACK_ON_COMMIT)
_add_action (comp, G_CALLBACK (_emit_commited_signal_func), comp,
G_PRIORITY_HIGH);
comp->priv->awaited_caps_seqnum = 0;
comp->priv->commited_probeid = 0;
comp->priv->seqnum_to_restart_task = 0;
comp->priv->updating_reason = COMP_UPDATE_STACK_NONE;
gst_task_start (comp->task);
}
static gboolean
_is_ready_to_restart_task (UpdateCompositionData * ucompo, GstEvent * event)
_is_ready_to_restart_task (NleComposition * comp, GstEvent * event)
{
NleComposition *comp = ucompo->comp;
if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS ||
GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
gint seqnum = gst_event_get_seqnum (event);
gint seqnum = gst_event_get_seqnum (event);
if (comp->priv->awaited_caps_seqnum == seqnum) {
gchar *name = g_strdup_printf ("new-stack__%" GST_TIME_FORMAT "--%"
GST_TIME_FORMAT "", GST_TIME_ARGS (comp->priv->segment_start),
GST_TIME_ARGS (comp->priv->segment_stop));
if (comp->priv->seqnum_to_restart_task == seqnum) {
gchar *name = g_strdup_printf ("new-stack__%" GST_TIME_FORMAT "--%"
GST_TIME_FORMAT "", GST_TIME_ARGS (comp->priv->segment_start),
GST_TIME_ARGS (comp->priv->segment_stop));
GST_INFO_OBJECT (comp, "Got %s with proper seqnum"
" done with stack reconfiguration %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
GST_INFO_OBJECT (comp, "Got %s with proper seqnum"
" done with stack reconfiguration %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (comp),
GST_DEBUG_GRAPH_SHOW_ALL, name);
g_free (name);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (comp),
GST_DEBUG_GRAPH_SHOW_ALL, name);
g_free (name);
return TRUE;
return TRUE;
} else {
GST_ERROR_OBJECT (comp, "WARNING: Caps seqnum %i != wanted %i",
seqnum, comp->priv->awaited_caps_seqnum);
}
} else if (comp->priv->seqnum_to_restart_task) {
GST_INFO_OBJECT (comp, "WARNING: %s seqnum %i != wanted %i",
GST_EVENT_TYPE_NAME (event), seqnum,
comp->priv->seqnum_to_restart_task);
}
return FALSE;
}
static GstPadProbeReturn
_is_update_done_cb (GstPad * pad, GstPadProbeInfo * info,
UpdateCompositionData * ucompo)
{
if (_is_ready_to_restart_task (ucompo, info->data)) {
_restart_task (ucompo);
return GST_PAD_PROBE_REMOVE;
}
return GST_PAD_PROBE_OK;
}
static void
_commit_func (NleComposition * comp, UpdateCompositionData * ucompo)
{
@ -2842,12 +2822,9 @@ update_pipeline (NleComposition * comp, GstClockTime currenttime, gint32 seqnum,
ucompo->comp = comp;
ucompo->reason = update_reason;
comp->priv->updating_reason = update_reason;
ucompo->seqnum = seqnum;
comp->priv->awaited_caps_seqnum = seqnum;
priv->commited_probeid = gst_pad_add_probe (NLE_OBJECT_SRC (comp),
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
(GstPadProbeCallback) _is_update_done_cb, ucompo,
_free_update_compo_data);
comp->priv->seqnum_to_restart_task = seqnum;
GST_OBJECT_LOCK (comp);
if (comp->task == NULL) {