composition: Post messages on the bus when it updates itself

And properly set the seqnums of those messages so that the application,
parents have the exact information about what is going on and why.
This commit is contained in:
Thibault Saunier 2014-07-25 10:55:52 +02:00
parent 9bd96f9770
commit 463626ceae
2 changed files with 142 additions and 64 deletions

View file

@ -48,7 +48,6 @@ GST_DEBUG_CATEGORY_STATIC (gnlcomposition_debug);
G_DEFINE_TYPE_WITH_CODE (GnlComposition, gnl_composition, GNL_TYPE_OBJECT,
_do_init);
enum
{
PROP_0,
@ -96,6 +95,14 @@ typedef struct
GnlObject *object;
} ChildIOData;
typedef struct
{
GnlComposition *comp;
gint32 seqnum;
GnlUpdateStackReason reason;
} UpdateCompositionData;
struct _GnlCompositionPrivate
{
gboolean dispose_has_run;
@ -197,13 +204,15 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition);
static inline void gnl_composition_reset_target_pad (GnlComposition * comp);
static gboolean
seek_handling (GnlComposition * comp, GnlUpdateStackReason update_stack_reason);
seek_handling (GnlComposition * comp, gint32 seqnum,
GnlUpdateStackReason update_stack_reason);
static gint objects_start_compare (GnlObject * a, GnlObject * b);
static gint objects_stop_compare (GnlObject * a, GnlObject * b);
static GstClockTime get_current_position (GnlComposition * comp);
static gboolean update_pipeline (GnlComposition * comp,
GstClockTime currenttime, GnlUpdateStackReason update_stack_reason);
GstClockTime currenttime, gint32 seqnum,
GnlUpdateStackReason update_stack_reason);
static gboolean gnl_composition_commit_func (GnlObject * object,
gboolean recurse);
static void update_start_stop_duration (GnlComposition * comp);
@ -213,8 +222,8 @@ gnl_composition_event_handler (GstPad * ghostpad, GstObject * parent,
GstEvent * event);
static void _relink_single_node (GnlComposition * comp, GNode * node,
GstEvent * toplevel_seek);
static gboolean _update_pipeline_func (GnlComposition * comp);
static gboolean _commit_func (GnlComposition * comp);
static gboolean _update_pipeline_func (UpdateCompositionData * ucompo);
static gboolean _commit_func (UpdateCompositionData * ucompo);
static GstEvent *get_new_seek_event (GnlComposition * comp, gboolean initial,
gboolean updatestoponly);
static gboolean
@ -226,7 +235,7 @@ static void _deactivate_stack (GnlComposition * comp,
static gboolean _set_real_eos_seqnum_from_seek (GnlComposition * comp,
GstEvent * event);
static gboolean _emit_commited_signal_func (GnlComposition * comp);
static void _restart_task (GnlComposition * comp, gboolean emit_commit);
static void _restart_task (UpdateCompositionData * ucompo);
/* COMP_REAL_START: actual position to start current playback at. */
@ -391,6 +400,33 @@ join_failed:
return res;
}
static void
_post_start_composition_update (GnlComposition * comp,
gint32 seqnum, GnlUpdateStackReason reason)
{
GstMessage *msg;
msg = gst_message_new_element (GST_OBJECT (comp),
gst_structure_new ("GnlCompositionStartUpdate",
"reason", G_TYPE_STRING, UPDATE_PIPELINE_REASONS[reason], NULL));
gst_message_set_seqnum (msg, seqnum);
gst_element_post_message (GST_ELEMENT (comp), msg);
}
static void
_post_start_composition_update_done (GnlComposition * comp,
gint32 seqnum, GnlUpdateStackReason reason)
{
GstMessage *msg = gst_message_new_element (GST_OBJECT (comp),
gst_structure_new ("GnlCompositionUpdateDone",
"reason", G_TYPE_STRING, UPDATE_PIPELINE_REASONS[reason],
NULL));
gst_message_set_seqnum (msg, seqnum);
gst_element_post_message (GST_ELEMENT (comp), msg);
}
static void
_free_seek_data (SeekData * seekd)
{
@ -432,6 +468,9 @@ _seek_pipeline_func (SeekData * seekd)
goto beach;
}
_post_start_composition_update (seekd->comp,
gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
/* crop the segment start/stop values */
/* Only crop segment start value if we don't have a default object */
if (priv->expandables == NULL)
@ -442,7 +481,11 @@ _seek_pipeline_func (SeekData * seekd)
priv->next_base_time = 0;
seek_handling (seekd->comp, COMP_UPDATE_STACK_ON_SEEK);
seek_handling (seekd->comp, gst_event_get_seqnum (seekd->event),
COMP_UPDATE_STACK_ON_SEEK);
_post_start_composition_update_done (seekd->comp,
gst_event_get_seqnum (seekd->event), COMP_UPDATE_STACK_ON_SEEK);
beach:
return G_SOURCE_REMOVE;
@ -490,17 +533,20 @@ _add_seek_gsource (GnlComposition * comp, GstEvent * event)
static gboolean
_initialize_stack_func (GnlComposition * comp)
_initialize_stack_func (UpdateCompositionData * ucompo)
{
GnlComposition *comp = ucompo->comp;
GnlCompositionPrivate *priv = comp->priv;
_post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
comp->priv->next_base_time = 0;
/* set ghostpad target */
if (!(update_pipeline (comp, COMP_REAL_START (comp),
COMP_UPDATE_STACK_INITIALIZE))) {
ucompo->seqnum, COMP_UPDATE_STACK_INITIALIZE))) {
GST_FIXME_OBJECT (comp, "PLEASE signal state change failure ASYNC");
}
_post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
priv->initialized = TRUE;
return G_SOURCE_REMOVE;
@ -608,6 +654,29 @@ _add_add_object_gsource (GnlComposition * comp, GnlObject * object)
_free_child_io_data, G_PRIORITY_DEFAULT);
}
static void
_free_update_compo_data (gpointer updatepipelinedata)
{
g_slice_free (UpdateCompositionData, updatepipelinedata);
}
static void
_add_update_compo_gsource (GnlComposition * comp,
GSourceFunc func, GnlUpdateStackReason reason)
{
UpdateCompositionData *ucompo = g_slice_new0 (UpdateCompositionData);
ucompo->comp = comp;
ucompo->reason = reason;
ucompo->seqnum = gst_util_seqnum_next ();
GST_INFO_OBJECT (comp, "Updating because: %s -- Setting seqnum: %i",
UPDATE_PIPELINE_REASONS[reason], ucompo->seqnum);
_add_gsource (comp, (GSourceFunc) func, ucompo,
_free_update_compo_data, G_PRIORITY_DEFAULT);
}
static void
gnl_composition_class_init (GnlCompositionClass * klass)
{
@ -750,7 +819,6 @@ gnl_composition_dispose (GObject * object)
iter = priv->expandables;
g_print ("ITER IS: %p\n", iter);
while (iter) {
GList *next = iter->next;
@ -947,14 +1015,18 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
"seq num is: %i", comp->priv->real_eos_seqnum, seqnum);
if (priv->commited_probeid && comp->priv->awaited_caps_seqnum == 0) {
UpdateCompositionData ucompo;
GST_INFO_OBJECT (comp, "We got an EOS right after seeing the right"
" segment, restarting task");
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (comp),
GST_DEBUG_GRAPH_SHOW_ALL, "eos-after-segment");
gst_pad_remove_probe (GNL_OBJECT_SRC (comp), priv->commited_probeid);
ucompo.comp = comp;
GST_FIXME_OBJECT (comp, "Check it we need to emit or not!!");
_restart_task (comp, TRUE);
ucompo.reason = COMP_UPDATE_STACK_ON_COMMIT;
ucompo.seqnum = seqnum;
gst_pad_remove_probe (GNL_OBJECT_SRC (comp), priv->commited_probeid);
_restart_task (&ucompo);
}
if (g_atomic_int_compare_and_exchange (&comp->priv->real_eos_seqnum,
@ -967,8 +1039,8 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
}
if (priv->next_eos_seqnum == seqnum)
_add_gsource (comp, (GSourceFunc) _update_pipeline_func, comp,
NULL, G_PRIORITY_DEFAULT);
_add_update_compo_gsource (comp, (GSourceFunc) _update_pipeline_func,
COMP_UPDATE_STACK_ON_EOS);
else
GST_INFO_OBJECT (comp,
"Got an EOS but it seqnum %i != next eos seqnum %i", seqnum,
@ -1024,8 +1096,8 @@ have_to_update_pipeline (GnlComposition * comp,
static gboolean
gnl_composition_commit_func (GnlObject * object, gboolean recurse)
{
_add_gsource (GNL_COMPOSITION (object), (GSourceFunc) _commit_func,
GNL_COMPOSITION (object), NULL, G_PRIORITY_DEFAULT);
_add_update_compo_gsource (GNL_COMPOSITION (object),
(GSourceFunc) _commit_func, COMP_UPDATE_STACK_ON_COMMIT);
return TRUE;
}
@ -1205,19 +1277,23 @@ _seek_current_stack (GnlComposition * comp, GstEvent * event,
*/
static gboolean
seek_handling (GnlComposition * comp, GnlUpdateStackReason update_stack_reason)
seek_handling (GnlComposition * comp, gint32 seqnum,
GnlUpdateStackReason update_stack_reason)
{
GST_DEBUG_OBJECT (comp, "Seek hnalding update pipeline reason: %s",
GST_DEBUG_OBJECT (comp, "Seek hanlding update pipeline reason: %s",
UPDATE_PIPELINE_REASONS[update_stack_reason]);
if (have_to_update_pipeline (comp, update_stack_reason)) {
if (comp->priv->segment->rate >= 0.0)
update_pipeline (comp, comp->priv->segment->start, update_stack_reason);
update_pipeline (comp, comp->priv->segment->start, seqnum,
update_stack_reason);
else
update_pipeline (comp, comp->priv->segment->stop, update_stack_reason);
update_pipeline (comp, comp->priv->segment->stop, seqnum,
update_stack_reason);
} else {
GstEvent *toplevel_seek = get_new_seek_event (comp, FALSE, FALSE);
gst_event_set_seqnum (toplevel_seek, seqnum);
_set_real_eos_seqnum_from_seek (comp, toplevel_seek);
_seek_current_stack (comp, toplevel_seek,
@ -1792,7 +1868,7 @@ _set_current_bin_to_ready (GnlComposition * comp, gboolean flush_downstream)
flush_event = gst_event_new_flush_start ();
priv->flush_seqnum = gst_event_get_seqnum (flush_event);
GST_INFO_OBJECT (comp, "sending flushes downstream with seqnum %d",
priv->flush_seqnum);
priv->flush_seqnum);
gst_pad_push_event (ptarget, flush_event);
flush_event = gst_event_new_flush_stop (TRUE);
@ -1852,12 +1928,14 @@ _emit_commited_signal_func (GnlComposition * comp)
}
static void
_restart_task (GnlComposition * comp, gboolean emit_commit)
_restart_task (UpdateCompositionData * ucompo)
{
GST_INFO_OBJECT (comp, "Restarting task! (%semiting commit done)",
emit_commit ? "" : "NOT ");
GnlComposition *comp = ucompo->comp;
if (emit_commit)
GST_INFO_OBJECT (comp, "Restarting task! after %s DONE",
UPDATE_PIPELINE_REASONS[ucompo->reason]);
if (ucompo->reason == COMP_UPDATE_STACK_ON_COMMIT)
_add_gsource (comp, (GSourceFunc) _emit_commited_signal_func, comp, NULL,
G_PRIORITY_HIGH);
@ -1868,8 +1946,10 @@ _restart_task (GnlComposition * comp, gboolean emit_commit)
}
static gboolean
_is_ready_to_restart_task (GnlComposition * comp, GstEvent * event)
_is_ready_to_restart_task (UpdateCompositionData * ucompo, GstEvent * event)
{
GnlComposition *comp = ucompo->comp;
if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS ||
GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
gint seqnum = gst_event_get_seqnum (event);
@ -1879,6 +1959,7 @@ _is_ready_to_restart_task (GnlComposition * comp, GstEvent * event)
gchar *name = g_strdup_printf ("new-stack__%" GST_TIME_FORMAT "--%"
GST_TIME_FORMAT "", GST_TIME_ARGS (comp->priv->segment_start),
GST_TIME_ARGS (comp->priv->segment_stop));
GST_INFO_OBJECT (comp, "Got %s with proper seqnum"
" done with stack reconfiguration %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
@ -1899,23 +1980,11 @@ _is_ready_to_restart_task (GnlComposition * comp, GstEvent * event)
}
static GstPadProbeReturn
_commit_done_cb (GstPad * pad, GstPadProbeInfo * info, GnlComposition * comp)
_is_update_done_cb (GstPad * pad, GstPadProbeInfo * info,
UpdateCompositionData * ucompo)
{
if (_is_ready_to_restart_task (comp, info->data)) {
_restart_task (comp, TRUE);
return GST_PAD_PROBE_REMOVE;
}
return GST_PAD_PROBE_OK;
}
static GstPadProbeReturn
_stack_setup_done_cb (GstPad * pad, GstPadProbeInfo * info,
GnlComposition * comp)
{
if (_is_ready_to_restart_task (comp, info->data)) {
_restart_task (comp, FALSE);
if (_is_ready_to_restart_task (ucompo, info->data)) {
_restart_task (ucompo);
return GST_PAD_PROBE_REMOVE;
}
@ -1942,11 +2011,14 @@ _commit_values (GnlComposition * comp)
}
static gboolean
_commit_func (GnlComposition * comp)
_commit_func (UpdateCompositionData * ucompo)
{
GstClockTime curpos;
GnlComposition *comp = ucompo->comp;
GnlCompositionPrivate *priv = comp->priv;
_post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
priv->next_base_time = 0;
GST_INFO_OBJECT (comp, "Commiting state");
@ -1960,6 +2032,7 @@ _commit_func (GnlComposition * comp)
GST_INFO_OBJECT (comp, "Nothing to commit, leaving");
g_signal_emit (comp, _signals[COMMITED_SIGNAL], 0, FALSE);
_post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
return G_SOURCE_REMOVE;
}
@ -1981,7 +2054,7 @@ _commit_func (GnlComposition * comp)
/* And update the pipeline at current position if needed */
update_start_stop_duration (comp);
update_pipeline (comp, curpos, COMP_UPDATE_STACK_ON_COMMIT);
update_pipeline (comp, curpos, ucompo->seqnum, COMP_UPDATE_STACK_ON_COMMIT);
if (!priv->current) {
GST_INFO_OBJECT (comp, "No new stack set, we can go and keep acting on"
@ -1991,17 +2064,21 @@ _commit_func (GnlComposition * comp)
}
}
_post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
return G_SOURCE_REMOVE;
}
static gboolean
_update_pipeline_func (GnlComposition * comp)
_update_pipeline_func (UpdateCompositionData * ucompo)
{
GnlCompositionPrivate *priv;
gboolean reverse;
GnlComposition *comp = ucompo->comp;
GnlCompositionPrivate *priv = comp->priv;
_post_start_composition_update (comp, ucompo->seqnum, ucompo->reason);
/* Set up a non-initial seek on segment_stop */
priv = comp->priv;
reverse = (priv->segment->rate < 0.0);
if (!reverse) {
GST_DEBUG_OBJECT (comp,
@ -2015,7 +2092,7 @@ _update_pipeline_func (GnlComposition * comp)
priv->segment->stop = priv->segment_start;
}
seek_handling (comp, COMP_UPDATE_STACK_ON_EOS);
seek_handling (comp, ucompo->seqnum, COMP_UPDATE_STACK_ON_EOS);
/* Post segment done if last seek was a segment seek */
if (!priv->current && (priv->segment->flags & GST_SEEK_FLAG_SEGMENT)) {
@ -2035,6 +2112,7 @@ _update_pipeline_func (GnlComposition * comp)
gst_event_new_segment_done (priv->segment->format, epos));
}
_post_start_composition_update_done (comp, ucompo->seqnum, ucompo->reason);
return G_SOURCE_REMOVE;
}
@ -2073,8 +2151,8 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
GST_DEBUG_OBJECT (comp,
"Setting all children to READY and locking their state");
_add_gsource (comp, (GSourceFunc) _initialize_stack_func, comp,
NULL, G_PRIORITY_DEFAULT);
_add_update_compo_gsource (comp, (GSourceFunc) _initialize_stack_func,
COMP_UPDATE_STACK_INITIALIZE);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
_stop_task (comp);
@ -2562,7 +2640,7 @@ _set_real_eos_seqnum_from_seek (GnlComposition * comp, GstEvent * event)
* WITH OBJECTS LOCK TAKEN
*/
static gboolean
update_pipeline (GnlComposition * comp, GstClockTime currenttime,
update_pipeline (GnlComposition * comp, GstClockTime currenttime, gint32 seqnum,
GnlUpdateStackReason update_reason)
{
@ -2581,10 +2659,10 @@ update_pipeline (GnlComposition * comp, GstClockTime currenttime,
_assert_proper_thread (comp);
GST_DEBUG_OBJECT (comp,
GST_INFO_OBJECT (comp,
"currenttime:%" GST_TIME_FORMAT
" Reason: %s", GST_TIME_ARGS (currenttime),
UPDATE_PIPELINE_REASONS[update_reason]);
" Reason: %s, Seqnum: %i", GST_TIME_ARGS (currenttime),
UPDATE_PIPELINE_REASONS[update_reason], seqnum);
if (!GST_CLOCK_TIME_IS_VALID (currenttime))
return FALSE;
@ -2637,6 +2715,7 @@ update_pipeline (GnlComposition * comp, GstClockTime currenttime,
#endif
toplevel_seek = get_new_seek_event (comp, TRUE, updatestoponly);
gst_event_set_seqnum (toplevel_seek, seqnum);
_set_real_eos_seqnum_from_seek (comp, toplevel_seek);
_remove_all_update_sources (comp);
@ -2654,21 +2733,20 @@ update_pipeline (GnlComposition * comp, GstClockTime currenttime,
priv->current = stack;
if (priv->current) {
GstPadProbeCallback _stack_setup_done;
UpdateCompositionData *ucompo = g_slice_new0 (UpdateCompositionData);
GST_INFO_OBJECT (comp, "New stack set and ready to run, probing src pad"
" and stopping children thread until we are actually ready with"
" that new stack");
if (update_reason == COMP_UPDATE_STACK_ON_COMMIT)
_stack_setup_done = (GstPadProbeCallback) _commit_done_cb;
else
_stack_setup_done = (GstPadProbeCallback) _stack_setup_done_cb;
ucompo->comp = comp;
ucompo->reason = update_reason;
ucompo->seqnum = seqnum;
comp->priv->awaited_caps_seqnum = priv->next_eos_seqnum;
priv->commited_probeid = gst_pad_add_probe (GNL_OBJECT_SRC (comp),
GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
(GstPadProbeCallback) _stack_setup_done, comp, NULL);
(GstPadProbeCallback) _is_update_done_cb, ucompo,
_free_update_compo_data);
gst_task_pause (comp->task);
}

View file

@ -218,9 +218,9 @@ GST_START_TEST (test_dispose_on_commit)
fail_unless (gst_element_link (composition, fakesink) == TRUE);
ASSERT_OBJECT_REFCOUNT (composition, "composition", 1);
g_signal_emit_by_name (composition, "commit", TRUE, &ret);
ASSERT_OBJECT_REFCOUNT (composition, "composition", 1);
gst_object_unref (pipeline);
}