composition: Implement the logic to PAUSE the task while executing actions

We need to wait for the pipeline update to be actually finished before we can start another
action. That means that we pause the task until one buffer from the new stack is
outputed.

Co-Authored by: Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
This commit is contained in:
Thibault Saunier 2014-07-07 23:07:15 +02:00
parent 6e476ddd24
commit 054569cce7

View file

@ -172,6 +172,12 @@ struct _GnlCompositionPrivate
gboolean seeking_itself;
gint real_eos_seqnum;
/* While we do not get a buffer on our srcpad,
* we are not commited */
gulong commited_probeid;
/* 0 means that we already received the right segment */
gint awaited_segment_seqnum;
};
static guint _signals[LAST_SIGNAL] = { 0 };
@ -222,6 +228,10 @@ static void _relink_single_node (GnlComposition * comp, GNode * node,
GstEvent * toplevel_seek);
static gboolean update_pipeline_func (GnlComposition * comp);
static gboolean _commit_func (GnlComposition * comp);
static gboolean lock_child_state (GValue * item, GValue * ret,
gpointer udata G_GNUC_UNUSED);
static gboolean
set_child_caps (GValue * item, GValue * ret G_GNUC_UNUSED, GnlObject * comp);
static GstEvent *get_new_seek_event (GnlComposition * comp, gboolean initial,
gboolean updatestoponly);
static gboolean
@ -298,6 +308,12 @@ struct _GnlCompositionEntry
{
GnlObject *object;
GnlComposition *comp;
/* handler id for block probe */
gulong probeid;
gulong dataprobeid;
gboolean seeked;
};
static void
@ -774,6 +790,20 @@ gnl_composition_class_init (GnlCompositionClass * klass)
static void
hash_value_destroy (GnlCompositionEntry * entry)
{
GstPad *srcpad;
GstElement *element = GST_ELEMENT (entry->object);
srcpad = GNL_OBJECT_SRC (element);
if (entry->probeid) {
gst_pad_remove_probe (srcpad, entry->probeid);
entry->probeid = 0;
}
if (entry->dataprobeid) {
gst_pad_remove_probe (srcpad, entry->dataprobeid);
entry->dataprobeid = 0;
}
g_slice_free (GnlCompositionEntry, entry);
}
@ -947,6 +977,42 @@ signal_duration_change (GnlComposition * comp)
gst_message_new_duration_changed (GST_OBJECT_CAST (comp)));
}
static gboolean
unblock_child_pads (GValue * item, GValue * ret G_GNUC_UNUSED,
GnlComposition * comp)
{
GstPad *pad;
GstElement *child = g_value_get_object (item);
GnlCompositionEntry *entry = COMP_ENTRY (comp, child);
GST_DEBUG_OBJECT (child, "unblocking pads");
pad = GNL_OBJECT_SRC (child);
if (entry->probeid) {
gst_pad_remove_probe (pad, entry->probeid);
entry->probeid = 0;
}
return TRUE;
}
static void
unblock_children (GnlComposition * comp)
{
GstIterator *children;
children = gst_bin_iterate_elements (GST_BIN (comp->priv->current_bin));
retry:
if (G_UNLIKELY (gst_iterator_fold (children,
(GstIteratorFoldFunction) unblock_child_pads, NULL,
comp) == GST_ITERATOR_RESYNC)) {
gst_iterator_resync (children);
goto retry;
}
gst_iterator_free (children);
}
static gboolean
reset_child (GValue * item, GValue * ret G_GNUC_UNUSED, gpointer user_data)
{
@ -956,6 +1022,9 @@ reset_child (GValue * item, GValue * ret G_GNUC_UNUSED, gpointer user_data)
GnlObject *object;
GstPad *srcpad, *peerpad;
GST_DEBUG_OBJECT (child, "unlocking state");
gst_element_set_locked_state (child, FALSE);
entry = COMP_ENTRY (comp, child);
object = entry->object;
srcpad = object->srcpad;
@ -968,6 +1037,18 @@ reset_child (GValue * item, GValue * ret G_GNUC_UNUSED, gpointer user_data)
return TRUE;
}
static gboolean
lock_child_state (GValue * item, GValue * ret G_GNUC_UNUSED,
gpointer udata G_GNUC_UNUSED)
{
GstElement *child = g_value_get_object (item);
GST_DEBUG_OBJECT (child, "locking state");
gst_element_set_locked_state (child, TRUE);
return TRUE;
}
static void
reset_children (GnlComposition * comp)
{
@ -1953,6 +2034,16 @@ get_clean_toplevel_stack (GnlComposition * comp, GstClockTime * timestamp,
}
static gboolean
set_child_caps (GValue * item, GValue * ret G_GNUC_UNUSED, GnlObject * comp)
{
GstElement *child = g_value_get_object (item);
gnl_object_set_caps ((GnlObject *) child, comp->caps);
return TRUE;
}
/* Must be called with OBJECTS_LOCK taken */
static void
_set_current_bin_to_ready (GnlComposition * comp)
@ -2009,6 +2100,82 @@ _process_pending_entries (GnlComposition * comp)
g_hash_table_remove_all (priv->pending_io);
}
static gboolean
_emit_commited_signal_func (GnlComposition * comp)
{
GST_INFO_OBJECT (comp, "Emiting COMMITED now that the stack "
"is ready");
g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
return G_SOURCE_REMOVE;
}
static GstPadProbeReturn
_add_emit_commited_and_restart_task (GnlComposition * comp)
{
GST_ERROR_OBJECT (comp, "Setup commit and restart task!");
MAIN_CONTEXT_LOCK (comp);
g_main_context_invoke_full (comp->priv->mcontext, G_PRIORITY_HIGH,
(GSourceFunc) _emit_commited_signal_func, comp, NULL);
MAIN_CONTEXT_UNLOCK (comp);
comp->priv->awaited_segment_seqnum = 0;
comp->priv->commited_probeid = 0;
gst_task_start (comp->task);
return GST_PAD_PROBE_REMOVE;
}
static GstPadProbeReturn
_commit_done_cb (GstPad * pad, GstPadProbeInfo * info, GnlComposition * comp)
{
if (comp->priv->awaited_segment_seqnum) {
if (GST_IS_EVENT (info->data)) {
gint seqnum = gst_event_get_seqnum (info->data);
GST_DEBUG_OBJECT (comp, "Got event %s -- with seqnum: %i "
"(awaited_segment_seqnum: %i)",
GST_EVENT_TYPE_NAME (info->data), seqnum,
comp->priv->awaited_segment_seqnum);
if (seqnum == comp->priv->awaited_segment_seqnum) {
if (GST_EVENT_TYPE (info->data) == GST_EVENT_EOS) {
GST_INFO_OBJECT (comp, "Received EOS even before"
" receiving SEGMENT with proper seqnum -> we are done");
return _add_emit_commited_and_restart_task (comp);
} else if (GST_EVENT_TYPE (info->data) == GST_EVENT_SEGMENT) {
GST_INFO_OBJECT (comp, "Got segment event with right seqnum"
" now waiting for a buffer to restart playing with our "
" children");
comp->priv->awaited_segment_seqnum = 0;
}
}
}
return GST_PAD_PROBE_OK;
} else if (GST_IS_BUFFER (info->data)) {
GST_INFO_OBJECT (comp, "Got %" GST_PTR_FORMAT " concidering commit "
"as done", info->data);
return _add_emit_commited_and_restart_task (comp);
}
GST_INFO_OBJECT (comp, "Got %" GST_PTR_FORMAT " still waiting for a buffer",
info->data);
return GST_PAD_PROBE_OK;
}
static inline gboolean
_commit_values (GnlComposition * comp)
{
@ -2033,6 +2200,8 @@ _commit_func (GnlComposition * comp)
GstClockTime curpos;
GnlCompositionPrivate *priv = comp->priv;
GST_INFO_OBJECT (comp, "Commiting state");
COMP_OBJECTS_LOCK (comp);
/* Get current so that it represent the duration it was
@ -2043,7 +2212,7 @@ _commit_func (GnlComposition * comp)
if (_commit_values (comp) == FALSE) {
COMP_OBJECTS_UNLOCK (comp);
GST_ERROR_OBJECT (comp, "Nothing to commit, leaving");
GST_INFO_OBJECT (comp, "Nothing to commit, leaving");
g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, FALSE);
@ -2057,18 +2226,26 @@ _commit_func (GnlComposition * comp)
(priv->objects_stop, (GCompareFunc) objects_stop_compare);
if (priv->initialized == FALSE) {
GST_DEBUG_OBJECT (comp, "Not initialized yet, just updating values");
update_start_stop_duration (comp);
g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
} else {
/* And update the pipeline at current position if needed */
update_start_stop_duration (comp);
update_pipeline (comp, curpos, TRUE, TRUE);
if (!priv->current) {
GST_INFO_OBJECT (comp, "No new stack set, we can go and keep acting on"
" our children");
g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
}
}
COMP_OBJECTS_UNLOCK (comp);
GST_ERROR ("emitted signal");
g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, TRUE);
return G_SOURCE_REMOVE;
}
@ -2139,6 +2316,7 @@ _set_all_children_state (GnlComposition * comp, GstState state)
static GstStateChangeReturn
gnl_composition_change_state (GstElement * element, GstStateChange transition)
{
GstIterator *children;
GnlComposition *comp = (GnlComposition *) element;
GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
@ -2149,6 +2327,32 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
gnl_composition_reset (comp);
/* state-lock all elements */
GST_DEBUG_OBJECT (comp,
"Setting all children to READY and locking their state");
children = gst_bin_iterate_elements (GST_BIN (comp->priv->current_bin));
while (G_UNLIKELY (gst_iterator_fold (children,
(GstIteratorFoldFunction) lock_child_state, NULL,
NULL) == GST_ITERATOR_RESYNC)) {
gst_iterator_resync (children);
}
gst_iterator_free (children);
/* Set caps on all objects */
if (G_UNLIKELY (!gst_caps_is_any (GNL_OBJECT (comp)->caps))) {
children = gst_bin_iterate_elements (GST_BIN (comp->priv->current_bin));
while (G_UNLIKELY (gst_iterator_fold (children,
(GstIteratorFoldFunction) set_child_caps, NULL,
comp) == GST_ITERATOR_RESYNC)) {
gst_iterator_resync (children);
}
gst_iterator_free (children);
}
_add_initialize_stack_gsource (comp);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
@ -2168,6 +2372,15 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
if (ret == GST_STATE_CHANGE_FAILURE)
return ret;
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
case GST_STATE_CHANGE_READY_TO_NULL:
unblock_children (comp);
break;
default:
break;
}
return ret;
}
@ -2464,21 +2677,45 @@ _deactivate_stack (GnlComposition * comp)
if (ptarget)
gst_object_unref (ptarget);
/* priv->current = NULL;
*/
}
static void
_relink_new_stack (GnlComposition * comp, GNode * stack,
GstEvent * toplevel_seek)
{
GnlCompositionPrivate *priv = comp->priv;
GST_ERROR ("RElinking new stack");
_relink_single_node (comp, stack, toplevel_seek);
GST_ERROR ("Reseting seqnum to %i", gst_event_get_seqnum (toplevel_seek));
GNL_OBJECT (comp)->wanted_seqnum = gst_event_get_seqnum (toplevel_seek);
gst_event_unref (toplevel_seek);
gst_element_set_locked_state (comp->priv->current_bin, FALSE);
gst_element_sync_state_with_parent (comp->priv->current_bin);
gst_element_set_locked_state (priv->current_bin, FALSE);
gst_element_sync_state_with_parent (priv->current_bin);
}
/* static void
* unlock_activate_stack (GnlComposition * comp, GNode * node, GstState state)
* {
* GNode *child;
*
* GST_LOG_OBJECT (comp, "object:%s",
* GST_ELEMENT_NAME ((GstElement *) (node->data)));
*
* gst_element_set_locked_state ((GstElement *) (node->data), FALSE);
* gst_element_set_state (GST_ELEMENT (node->data), state);
*
* for (child = node->children; child; child = child->next)
* unlock_activate_stack (comp, child, state);
* }
*/
static gboolean
are_same_stacks (GNode * stack1, GNode * stack2)
{
@ -2678,8 +2915,8 @@ update_pipeline (GnlComposition * comp, GstClockTime currenttime,
}
toplevel_seek = get_new_seek_event (comp, TRUE, updatestoponly);
stack_seqnum = gst_event_get_seqnum (toplevel_seek);
if (_is_last_stack (comp)) {
stack_seqnum = gst_event_get_seqnum (toplevel_seek);
g_atomic_int_set (&priv->real_eos_seqnum, stack_seqnum);
GST_ERROR_OBJECT (comp, "Seeting up last stack, seqnum is: %i",
@ -2696,11 +2933,17 @@ update_pipeline (GnlComposition * comp, GstClockTime currenttime,
GST_DEBUG_OBJECT (comp, "Setting current stack");
priv->current = stack;
if (!samestack && stack) {
GST_DEBUG_OBJECT (comp, "activating objects in new stack to %s",
gst_element_state_get_name (nextstate));
unlock_activate_stack (comp, stack, nextstate);
GST_DEBUG_OBJECT (comp, "Finished activating objects in new stack");
if (priv->current) {
GST_INFO_OBJECT (comp, "New stack set and ready to run, probing src pad"
" and stopping children thread until we are actually ready with"
" that new stack");
comp->priv->awaited_segment_seqnum = stack_seqnum;
priv->commited_probeid = gst_pad_add_probe (GNL_OBJECT_SRC (comp),
GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) _commit_done_cb, comp, NULL);
gst_task_pause (comp->task);
}
/* Activate stack */