composition: Use a GstPad task to run the update pipeline thread

This commit is contained in:
Thibault Saunier 2014-06-27 12:15:10 +02:00
parent 6b193cdbda
commit 550aaf522d
2 changed files with 182 additions and 85 deletions

View file

@ -1,6 +1,8 @@
/* GStreamer /* GStreamer
* Copyright (C) 2001 Wim Taymans <wim.taymans@gmail.com> * Copyright (C) 2001 Wim Taymans <wim.taymans@gmail.com>
* 2004-2008 Edward Hervey <bilboed@bilboed.com> * 2004-2008 Edward Hervey <bilboed@bilboed.com>
* 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
* 2014 Thibault Saunier <tsaunier@gnome.org>
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public * modify it under the terms of the GNU Library General Public
@ -133,9 +135,12 @@ struct _GnlCompositionPrivate
GstPadEventFunction gnl_event_pad_func; GstPadEventFunction gnl_event_pad_func;
gboolean send_stream_start; gboolean send_stream_start;
GThread *update_pipeline_thread; GMainContext *mcontext;
GCond update_pipeline_cond; /* Ensure that when we remove all sources from the maincontext
GMutex update_pipeline_mutex; * we can not add any source, avoiding:
* "g_source_attach: assertion '!SOURCE_DESTROYED (source)' failed" */
GMutex mcontext_lock;
gboolean reset_time; gboolean reset_time;
@ -193,6 +198,7 @@ gnl_composition_event_handler (GstPad * ghostpad, GstObject * parent,
static void static void
compare_relink_single_node (GnlComposition * comp, GNode * node, compare_relink_single_node (GnlComposition * comp, GNode * node,
GNode * oldstack); GNode * oldstack);
static gboolean update_pipeline_func (GnlComposition * comp);
/* COMP_REAL_START: actual position to start current playback at. */ /* COMP_REAL_START: actual position to start current playback at. */
@ -236,22 +242,19 @@ compare_relink_single_node (GnlComposition * comp, GNode * node,
g_mutex_unlock (&comp->priv->flushing_lock); \ g_mutex_unlock (&comp->priv->flushing_lock); \
} G_STMT_END } G_STMT_END
#define WAIT_FOR_UPDATE_PIPELINE(comp) G_STMT_START { \ #define MAIN_CONTEXT_LOCK(comp) G_STMT_START { \
GST_INFO_OBJECT (comp, "waiting for EOS from thread %p", \ GST_LOG_OBJECT (comp, "Getting MAIN_CONTEXT_LOCK in thread %p", \
g_thread_self()); \ g_thread_self()); \
g_mutex_lock(&(comp->priv->update_pipeline_mutex)); \ g_mutex_lock(&((GnlComposition*)comp)->priv->mcontext_lock); \
g_cond_wait(&(comp->priv->update_pipeline_cond), \ GST_LOG_OBJECT (comp, "Got MAIN_CONTEXT_LOCK in thread %p", \
&(comp->priv->update_pipeline_mutex)); \ g_thread_self()); \
g_mutex_unlock(&(comp->priv->update_pipeline_mutex)); \ } G_STMT_END
} G_STMT_END
#define SIGNAL_UPDATE_PIPELINE(comp) { \ #define MAIN_CONTEXT_UNLOCK(comp) G_STMT_START { \
GST_INFO_OBJECT (comp, "signaling EOS from thread %p", \ g_mutex_unlock(&((GnlComposition*)comp)->priv->mcontext_lock); \
g_thread_self()); \ GST_LOG_OBJECT (comp, "Unlocked MAIN_CONTEXT_LOCK in thread %p", \
g_mutex_lock(&(comp->priv->update_pipeline_mutex)); \ g_thread_self()); \
g_cond_signal(&(comp->priv->update_pipeline_cond)); \ } G_STMT_END
g_mutex_unlock(&(comp->priv->update_pipeline_mutex)); \
} G_STMT_END
@ -267,6 +270,109 @@ struct _GnlCompositionEntry
gboolean seeked; gboolean seeked;
}; };
static void
_remove_all_sources (GnlComposition * comp)
{
GSource *source;
MAIN_CONTEXT_LOCK (comp);
while ((source =
g_main_context_find_source_by_user_data (comp->priv->mcontext,
comp))) {
g_source_destroy (source);
}
MAIN_CONTEXT_UNLOCK (comp);
}
static void
iterate_main_context_func (GnlComposition * comp)
{
if (comp->priv->running == FALSE) {
GST_DEBUG_OBJECT (comp, "Not running anymore");
return;
}
g_main_context_iteration (comp->priv->mcontext, TRUE);
}
static void
_start_srcpad_task (GnlComposition * comp)
{
GST_ERROR_OBJECT (comp, "Starting srcpad task");
comp->priv->running = TRUE;
gst_pad_start_task (GST_PAD (GNL_OBJECT_SRC (comp)),
(GstTaskFunction) iterate_main_context_func, comp, NULL);
}
static gboolean
_stop_srcpad_task (GnlComposition * comp, GstEvent * flush_start)
{
gboolean res = TRUE;
GnlObject *obj = GNL_OBJECT (comp);
GST_ERROR_OBJECT (comp, "%s srcpad task",
flush_start ? "Pausing" : "Stopping");
comp->priv->running = FALSE;
/* Clean the stack of GSource set on the MainContext */
g_main_context_wakeup (comp->priv->mcontext);
_remove_all_sources (comp);
if (flush_start) {
res = gst_pad_push_event (obj->srcpad, flush_start);
}
gst_pad_stop_task (obj->srcpad);
return res;
}
static gboolean
src_activate_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active)
{
GnlComposition *comp = GNL_COMPOSITION (parent);
if (gst_ghost_pad_activate_mode_default (pad, parent, mode, active) == FALSE) {
GST_WARNING_OBJECT (pad, "Could not activate ghost pad");
return FALSE;
}
GST_ERROR ("ACTIVATING SRCPAD TASK %i", active);
if (active == TRUE) {
switch (mode) {
case GST_PAD_MODE_PUSH:
{
GST_INFO_OBJECT (pad, "Activating pad!");
_start_srcpad_task (comp);
return TRUE;
}
default:
{
GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
return FALSE;
}
}
}
/* deactivating */
GST_INFO_OBJECT (comp, "Deactivating srcpad");
_stop_srcpad_task (comp, FALSE);
return TRUE;
}
static void
_add_update_gsource (GnlComposition * comp)
{
MAIN_CONTEXT_LOCK (comp);
g_main_context_invoke (comp->priv->mcontext,
(GSourceFunc) update_pipeline_func, comp);
MAIN_CONTEXT_UNLOCK (comp);
}
static void static void
gnl_composition_class_init (GnlCompositionClass * klass) gnl_composition_class_init (GnlCompositionClass * klass)
{ {
@ -401,6 +507,8 @@ gnl_composition_init (GnlComposition * comp)
g_direct_equal, NULL, (GDestroyNotify) hash_value_destroy); g_direct_equal, NULL, (GDestroyNotify) hash_value_destroy);
priv->deactivated_elements_state = GST_STATE_READY; priv->deactivated_elements_state = GST_STATE_READY;
priv->mcontext = g_main_context_new ();
g_mutex_init (&priv->mcontext_lock);
comp->priv = priv; comp->priv = priv;
@ -409,6 +517,8 @@ gnl_composition_init (GnlComposition * comp)
priv->gnl_event_pad_func = GST_PAD_EVENTFUNC (GNL_OBJECT_SRC (comp)); priv->gnl_event_pad_func = GST_PAD_EVENTFUNC (GNL_OBJECT_SRC (comp));
gst_pad_set_event_function (GNL_OBJECT_SRC (comp), gst_pad_set_event_function (GNL_OBJECT_SRC (comp),
GST_DEBUG_FUNCPTR (gnl_composition_event_handler)); GST_DEBUG_FUNCPTR (gnl_composition_event_handler));
gst_pad_set_activatemode_function (GNL_OBJECT_SRC (comp),
GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode));
} }
static void static void
@ -698,6 +808,7 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
gboolean reverse = (comp->priv->segment->rate < 0); gboolean reverse = (comp->priv->segment->rate < 0);
gboolean should_check_objects = FALSE; gboolean should_check_objects = FALSE;
GST_ERROR ("EOS");
COMP_FLUSHING_LOCK (comp); COMP_FLUSHING_LOCK (comp);
if (priv->flushing) { if (priv->flushing) {
GST_DEBUG_OBJECT (comp, "flushing, bailing out"); GST_DEBUG_OBJECT (comp, "flushing, bailing out");
@ -736,7 +847,7 @@ ghost_event_probe_handler (GstPad * ghostpad G_GNUC_UNUSED,
return GST_PAD_PROBE_OK; return GST_PAD_PROBE_OK;
} }
SIGNAL_UPDATE_PIPELINE (comp); _add_update_gsource (comp);
retval = GST_PAD_PROBE_DROP; retval = GST_PAD_PROBE_DROP;
} }
@ -1747,59 +1858,55 @@ set_child_caps (GValue * item, GValue * ret G_GNUC_UNUSED, GnlObject * comp)
return TRUE; return TRUE;
} }
static gpointer static gboolean
update_pipeline_func (GnlComposition * comp) update_pipeline_func (GnlComposition * comp)
{ {
while (comp->priv->running) { GnlCompositionPrivate *priv;
GnlCompositionPrivate *priv; gboolean reverse;
gboolean reverse;
WAIT_FOR_UPDATE_PIPELINE (comp);
/* Set up a non-initial seek on segment_stop */
priv = comp->priv;
reverse = (priv->segment->rate < 0.0);
if (!reverse) {
GST_DEBUG_OBJECT (comp,
"Setting segment->start to segment_stop:%" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->segment_stop));
priv->segment->start = priv->segment_stop;
} else {
GST_DEBUG_OBJECT (comp,
"Setting segment->stop to segment_start:%" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->segment_start));
priv->segment->stop = priv->segment_start;
}
seek_handling (comp, TRUE, TRUE);
if (!priv->current) {
/* If we're at the end, post SEGMENT_DONE, or push EOS */
GST_DEBUG_OBJECT (comp, "Nothing else to play");
if (!(priv->segment->flags & GST_SEEK_FLAG_SEGMENT)) {
GST_DEBUG_OBJECT (comp, "Real EOS should be sent now");
} else if (priv->segment->flags & GST_SEEK_FLAG_SEGMENT) {
gint64 epos;
if (GST_CLOCK_TIME_IS_VALID (priv->segment->stop))
epos = (MIN (priv->segment->stop, GNL_OBJECT_STOP (comp)));
else
epos = GNL_OBJECT_STOP (comp);
GST_LOG_OBJECT (comp, "Emitting segment done pos %" GST_TIME_FORMAT,
GST_TIME_ARGS (epos));
gst_element_post_message (GST_ELEMENT_CAST (comp),
gst_message_new_segment_done (GST_OBJECT (comp),
priv->segment->format, epos));
gst_pad_push_event (GNL_OBJECT (comp)->srcpad,
gst_event_new_segment_done (priv->segment->format, epos));
}
}
/* Set up a non-initial seek on segment_stop */
priv = comp->priv;
reverse = (priv->segment->rate < 0.0);
if (!reverse) {
GST_DEBUG_OBJECT (comp,
"Setting segment->start to segment_stop:%" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->segment_stop));
priv->segment->start = priv->segment_stop;
} else {
GST_DEBUG_OBJECT (comp,
"Setting segment->stop to segment_start:%" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->segment_start));
priv->segment->stop = priv->segment_start;
} }
return NULL; seek_handling (comp, TRUE, TRUE);
if (!priv->current) {
/* If we're at the end, post SEGMENT_DONE, or push EOS */
GST_DEBUG_OBJECT (comp, "Nothing else to play");
if (!(priv->segment->flags & GST_SEEK_FLAG_SEGMENT)) {
GST_DEBUG_OBJECT (comp, "Real EOS should be sent now");
} else if (priv->segment->flags & GST_SEEK_FLAG_SEGMENT) {
gint64 epos;
if (GST_CLOCK_TIME_IS_VALID (priv->segment->stop))
epos = (MIN (priv->segment->stop, GNL_OBJECT_STOP (comp)));
else
epos = GNL_OBJECT_STOP (comp);
GST_LOG_OBJECT (comp, "Emitting segment done pos %" GST_TIME_FORMAT,
GST_TIME_ARGS (epos));
gst_element_post_message (GST_ELEMENT_CAST (comp),
gst_message_new_segment_done (GST_OBJECT (comp),
priv->segment->format, epos));
gst_pad_push_event (GNL_OBJECT (comp)->srcpad,
gst_event_new_segment_done (priv->segment->format, epos));
}
}
return G_SOURCE_REMOVE;
} }
static GstStateChangeReturn static GstStateChangeReturn
@ -1813,12 +1920,6 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
switch (transition) { switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
comp->priv->running = TRUE;
comp->priv->update_pipeline_thread =
g_thread_new ("update_pipeline_thread",
(GThreadFunc) update_pipeline_func, comp);
break;
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
{ {
GstIterator *children; GstIterator *children;
@ -1863,13 +1964,9 @@ gnl_composition_change_state (GstElement * element, GstStateChange transition)
} }
break; break;
case GST_STATE_CHANGE_PAUSED_TO_READY: case GST_STATE_CHANGE_PAUSED_TO_READY:
gnl_composition_reset (comp); /* Fallthrough */
break;
case GST_STATE_CHANGE_READY_TO_NULL: case GST_STATE_CHANGE_READY_TO_NULL:
gnl_composition_reset (comp); gnl_composition_reset (comp);
comp->priv->running = FALSE;
SIGNAL_UPDATE_PIPELINE (comp);
g_thread_join (comp->priv->update_pipeline_thread);
break; break;
default: default:
break; break;
@ -2420,7 +2517,7 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush)
priv->segment_stop = GST_CLOCK_TIME_NONE; priv->segment_stop = GST_CLOCK_TIME_NONE;
} }
GST_INFO_OBJECT (comp, "Nothing else in the composition" GST_ERROR_OBJECT (comp, "Nothing else in the composition"
", update 'worked'"); ", update 'worked'");
return TRUE; return TRUE;
} }
@ -2436,11 +2533,11 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush)
pad = GNL_OBJECT_SRC (topelement); pad = GNL_OBJECT_SRC (topelement);
topentry = COMP_ENTRY (comp, topelement); topentry = COMP_ENTRY (comp, topelement);
GST_DEBUG_OBJECT (comp, GST_ERROR_OBJECT (comp,
"We have a valid toplevel element pad %s:%s", GST_DEBUG_PAD_NAME (pad)); "We have a valid toplevel element pad %s:%s", GST_DEBUG_PAD_NAME (pad));
/* Send seek event */ /* Send seek event */
GST_LOG_OBJECT (comp, "sending seek event"); GST_ERROR_OBJECT (comp, "sending seek event");
if (gst_pad_send_event (pad, event)) { if (gst_pad_send_event (pad, event)) {
/* Unconditionnaly set the ghostpad target to pad */ /* Unconditionnaly set the ghostpad target to pad */
GST_LOG_OBJECT (comp, GST_LOG_OBJECT (comp,
@ -2461,7 +2558,7 @@ _activate_new_stack (GnlComposition * comp, gboolean forcing_flush)
return FALSE; return FALSE;
} }
GST_LOG_OBJECT (comp, "New stack activated!"); GST_ERROR_OBJECT (comp, "New stack activated!");
return TRUE; return TRUE;
} }

View file

@ -59,7 +59,7 @@ test_simplest_full (void)
bus = gst_element_get_bus (GST_ELEMENT (pipeline)); bus = gst_element_get_bus (GST_ELEMENT (pipeline));
GST_DEBUG ("Setting pipeline to PLAYING"); GST_ERROR ("Setting pipeline to PLAYING");
ASSERT_OBJECT_REFCOUNT (source1, "source1", 1); ASSERT_OBJECT_REFCOUNT (source1, "source1", 1);
fail_if (gst_element_set_state (GST_ELEMENT (pipeline), fail_if (gst_element_set_state (GST_ELEMENT (pipeline),
@ -76,7 +76,7 @@ test_simplest_full (void)
fail_if (collect->expected_segments != NULL); fail_if (collect->expected_segments != NULL);
GST_DEBUG ("Resetted pipeline to READY"); GST_ERROR ("Resetted pipeline to READY");
/* Expected segments */ /* Expected segments */
collect->expected_segments = g_list_append (collect->expected_segments, collect->expected_segments = g_list_append (collect->expected_segments,
@ -84,7 +84,7 @@ test_simplest_full (void)
collect->expected_base = 0; collect->expected_base = 0;
collect->gotsegment = FALSE; collect->gotsegment = FALSE;
GST_DEBUG ("Setting pipeline to PLAYING again"); GST_ERROR ("Setting pipeline to PLAYING again");
fail_if (gst_element_set_state (GST_ELEMENT (pipeline), fail_if (gst_element_set_state (GST_ELEMENT (pipeline),
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE); GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE);