gst/gstbin.c (gst_bin_remove): Debugging fixes.

Original commit message from CVS:
2004-03-07  Andy Wingo  <wingo@pobox.com>

* gst/gstbin.c (gst_bin_remove): Debugging fixes.

* gst/schedulers/gstoptimalscheduler.c (destroy_group): Assert
there are no links to other groups when a group is destroyed.
(gst_opt_scheduler_pad_unlink): If the unlink means an element is
removed from a group, make sure the link count to elements linked
to other pads is appropriately decremented. This really fixes
#135672.

The 1.60->1.61 patch has been reapplied in light of this fix.

* gst/gstelement.c (gst_element_dispose): Really protect against
multiple invocations this time.
This commit is contained in:
Andy Wingo 2004-03-07 14:33:13 +00:00
parent 558ac743bb
commit b6a5699ca6
5 changed files with 380 additions and 231 deletions

View file

@ -1,3 +1,19 @@
2004-03-07 Andy Wingo <wingo@pobox.com>
* gst/gstbin.c (gst_bin_remove): Debugging fixes.
* gst/schedulers/gstoptimalscheduler.c (destroy_group): Assert
there are no links to other groups when a group is destroyed.
(gst_opt_scheduler_pad_unlink): If the unlink means an element is
removed from a group, make sure the link count to elements linked
to other pads is appropriately decremented. This really fixes
#135672.
The 1.60->1.61 patch has been reapplied in light of this fix.
* gst/gstelement.c (gst_element_dispose): Really protect against
multiple invocations this time.
2004-03-06 Thomas Vander Stichele <thomas at apestaart dot org>
* docs/gst/gstreamer-sections.txt:
@ -272,6 +288,7 @@
* docs/manual/debugging.xml:
fix manual for new debugging system
>>>>>>> 1.292
2004-02-25 Andy Wingo <wingo@pobox.com>
* gst/gstpad.c (gst_pad_link_prepare): Re-add

View file

@ -110,10 +110,6 @@ All GstElements can be serialized to an XML presentation and subsequently loaded
</para>
@:
@:
@:
@gstxml: the object which received the signal.
@arg1:
@arg2:
@ -123,7 +119,7 @@ All GstElements can be serialized to an XML presentation and subsequently loaded
</para>
@gstxml: the object which received the signal.
@arg1:
@arg2:
@:
@:
@:

View file

@ -564,7 +564,8 @@ gst_bin_remove (GstBin *bin, GstElement *element)
{
GstBinClass *bclass;
GST_CAT_DEBUG (GST_CAT_PARENTAGE, "[%s]: trying to remove child %s", GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (element));
GST_CAT_DEBUG (GST_CAT_PARENTAGE, "[%s]: trying to remove child %s",
GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (element));
g_return_if_fail (GST_IS_BIN (bin));
g_return_if_fail (GST_IS_ELEMENT (element));

View file

@ -2947,14 +2947,17 @@ gst_element_dispose (GObject *object)
element->numpads = 0;
if (element->state_mutex)
g_mutex_free (element->state_mutex);
element->state_mutex = NULL;
if (element->state_cond)
g_cond_free (element->state_cond);
element->state_cond = NULL;
if (element->prop_value_queue)
g_async_queue_unref (element->prop_value_queue);
element->prop_value_queue = NULL;
if (element->property_mutex)
g_mutex_free (element->property_mutex);
element->property_mutex = NULL;
gst_object_replace ((GstObject **)&element->sched, NULL);
gst_object_replace ((GstObject **)&element->clock, NULL);

View file

@ -183,20 +183,81 @@ struct _GstOptSchedulerGroup {
};
/* some group operations */
static GstOptSchedulerGroup* ref_group (GstOptSchedulerGroup *group);
#ifndef USE_COTHREADS
/*
static GstOptSchedulerGroup* ref_group_by_count (GstOptSchedulerGroup *group, gint count);
* A group is a set of elements through which data can flow without switching
* cothreads or without invoking the scheduler's run queue.
*/
#endif
static GstOptSchedulerGroup* ref_group (GstOptSchedulerGroup *group);
static GstOptSchedulerGroup* unref_group (GstOptSchedulerGroup *group);
static GstOptSchedulerGroup* create_group (GstOptSchedulerChain *chain,
GstElement *element,
GstOptSchedulerGroupType type);
static void destroy_group (GstOptSchedulerGroup *group);
static GstOptSchedulerGroup* add_to_group (GstOptSchedulerGroup *group,
GstElement *element);
static GstOptSchedulerGroup* remove_from_group (GstOptSchedulerGroup *group,
GstElement *element);
static GstOptSchedulerGroup* merge_groups (GstOptSchedulerGroup *group1,
GstOptSchedulerGroup *group2);
static void setup_group_scheduler (GstOptScheduler *osched,
GstOptSchedulerGroup *group);
static void destroy_group_scheduler (GstOptSchedulerGroup *group);
static void group_error_handler (GstOptSchedulerGroup *group);
static void group_element_set_enabled (GstOptSchedulerGroup *group,
GstElement *element, gboolean enabled);
GstElement *element,
gboolean enabled);
static gboolean schedule_group (GstOptSchedulerGroup *group);
/*
* A chain is a set of groups that are linked to each other.
*/
static void destroy_chain (GstOptSchedulerChain *chain);
static GstOptSchedulerChain* create_chain (GstOptScheduler *osched);
static GstOptSchedulerChain* ref_chain (GstOptSchedulerChain *chain);
static GstOptSchedulerChain* unref_chain (GstOptSchedulerChain *chain);
static GstOptSchedulerChain* add_to_chain (GstOptSchedulerChain *chain,
GstOptSchedulerGroup *group);
static GstOptSchedulerChain* remove_from_chain (GstOptSchedulerChain *chain,
GstOptSchedulerGroup *group);
static GstOptSchedulerChain* merge_chains (GstOptSchedulerChain *chain1,
GstOptSchedulerChain *chain2);
static void chain_recursively_migrate_group (GstOptSchedulerChain *chain,
GstOptSchedulerGroup *group);
static void chain_group_set_enabled (GstOptSchedulerChain *chain,
GstOptSchedulerGroup *group, gboolean enabled);
GstOptSchedulerGroup *group,
gboolean enabled);
static void schedule_chain (GstOptSchedulerChain *chain);
/*
* The schedule functions are the entry points for cothreads, or called directly
* by gst_opt_scheduler_schedule_run_queue
*/
static int get_group_schedule_function (int argc, char *argv[]);
static int loop_group_schedule_function (int argc, char *argv[]);
static int unknown_group_schedule_function (int argc, char *argv[]);
/*
* These wrappers are set on the pads as the chain handler (what happens when
* gst_pad_push is called) or get handler (for gst_pad_pull).
*/
static void gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstData *data);
static GstData* gst_opt_scheduler_get_wrapper (GstPad *srcpad);
static void gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstData *data);
/*
* Without cothreads, gst_pad_push or gst_pad_pull on a loop-based group will
* just queue the peer element on a list. We need to actually run the queue
* instead of relying on cothreads to do the switch for us.
*/
#ifndef USE_COTHREADS
static void gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched);
#endif
/*
* Scheduler private data for an element
*/
@ -212,6 +273,10 @@ struct _GstOptSchedulerCtx {
GstOptSchedulerCtxFlags flags; /* flags for this element */
};
/*
* Implementation of GstScheduler
*/
enum
{
ARG_0,
@ -219,7 +284,6 @@ enum
ARG_MAX_RECURSION,
};
static void gst_opt_scheduler_class_init (GstOptSchedulerClass *klass);
static void gst_opt_scheduler_init (GstOptScheduler *scheduler);
@ -379,42 +443,6 @@ GST_PLUGIN_DEFINE (
);
static void
destroy_chain (GstOptSchedulerChain *chain)
{
GstOptScheduler *osched;
GST_LOG ( "destroy chain %p", chain);
g_assert (chain->num_groups == 0);
g_assert (chain->groups == NULL);
osched = chain->sched;
osched->chains = g_slist_remove (osched->chains, chain);
gst_object_unref (GST_OBJECT (osched));
g_free (chain);
}
static GstOptSchedulerChain*
create_chain (GstOptScheduler *osched)
{
GstOptSchedulerChain *chain;
chain = g_new0 (GstOptSchedulerChain, 1);
chain->sched = osched;
chain->refcount = 1;
chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
gst_object_ref (GST_OBJECT (osched));
osched->chains = g_slist_prepend (osched->chains, chain);
GST_LOG ( "new chain %p", chain);
return chain;
}
static GstOptSchedulerChain*
ref_chain (GstOptSchedulerChain *chain)
{
@ -439,6 +467,42 @@ unref_chain (GstOptSchedulerChain *chain)
return chain;
}
static GstOptSchedulerChain*
create_chain (GstOptScheduler *osched)
{
GstOptSchedulerChain *chain;
chain = g_new0 (GstOptSchedulerChain, 1);
chain->sched = osched;
chain->refcount = 1;
chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
gst_object_ref (GST_OBJECT (osched));
osched->chains = g_slist_prepend (osched->chains, chain);
GST_LOG ( "new chain %p", chain);
return chain;
}
static void
destroy_chain (GstOptSchedulerChain *chain)
{
GstOptScheduler *osched;
GST_LOG ( "destroy chain %p", chain);
g_assert (chain->num_groups == 0);
g_assert (chain->groups == NULL);
osched = chain->sched;
osched->chains = g_slist_remove (osched->chains, chain);
gst_object_unref (GST_OBJECT (osched));
g_free (chain);
}
static GstOptSchedulerChain*
add_to_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
{
@ -449,7 +513,17 @@ add_to_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
group = ref_group (group);
group->chain = ref_chain (chain);
/* The first non-disabled group in the chain's group list will be the entry
point for the chain. Because buffers can accumulate in loop elements' peer
bufpens, we preferentially schedule loop groups before get groups to avoid
unnecessary execution of get-based groups when the bufpens are already
full. */
if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
chain->groups = g_slist_prepend (chain->groups, group);
else
chain->groups = g_slist_append (chain->groups, group);
chain->num_groups++;
if (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group)) {
@ -491,10 +565,17 @@ merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
GST_LOG ("merging chain %p and %p", chain1, chain2);
/* FIXME: document how chain2 can be NULL */
if (chain1 == chain2 || chain2 == NULL)
return chain1;
ref_chain (chain2);
/* switch if it's more efficient */
if (chain1->num_groups < chain2->num_groups) {
GstOptSchedulerChain *tmp = chain2;
chain2 = chain1;
chain1 = tmp;
}
walk = chain2->groups;
while (walk) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
@ -503,17 +584,15 @@ merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
GST_LOG ("reparenting group %p from chain %p to %p",
group, chain2, chain1);
group->chain = NULL;
chain2->num_groups--;
chain2 = unref_chain (chain2);
ref_group (group);
group->chain = ref_chain (chain1);
chain1->groups = g_slist_prepend (chain1->groups, group);
chain1->num_groups++;
remove_from_chain (chain2, group);
add_to_chain (chain1, group);
unref_group (group);
}
g_slist_free (chain2->groups);
chain2->groups = NULL;
unref_chain (chain2);
/* chain2 is now freed, if nothing else was referencing it before */
return chain1;
}
@ -521,8 +600,8 @@ merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
static void
chain_group_set_enabled (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group, gboolean enabled)
{
g_assert (chain != NULL);
g_assert (group != NULL);
g_assert (chain != NULL);
GST_LOG ("request to %d group %p in chain %p, have %d groups enabled out of %d",
enabled, group, chain, chain->num_enabled, chain->num_groups);
@ -539,6 +618,10 @@ chain_group_set_enabled (GstOptSchedulerChain *chain, GstOptSchedulerGroup *grou
GST_DEBUG ("enable group %p in chain %p, now %d groups enabled out of %d", group, chain,
chain->num_enabled, chain->num_groups);
/* OK to call even if the scheduler (cothread context / schedulerfunc) was
setup already -- will get destroyed when the group is destroyed */
setup_group_scheduler (chain->sched, group);
if (chain->num_enabled == chain->num_groups) {
GST_DEBUG ("enable chain %p", chain);
GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
@ -594,28 +677,13 @@ ref_group (GstOptSchedulerGroup *group)
return group;
}
#ifndef USE_COTHREADS
/* remove me
static GstOptSchedulerGroup*
ref_group_by_count (GstOptSchedulerGroup *group, gint count)
{
GST_LOG ("ref group %p %d->%d", group,
group->refcount, group->refcount+count);
group->refcount += count;
return group;
}
*/
#endif
static GstOptSchedulerGroup*
unref_group (GstOptSchedulerGroup *group)
{
GST_LOG ("unref group %p %d->%d", group,
group->refcount, group->refcount-1);
if (--group->refcount == 1) {
if (--group->refcount == 0) {
destroy_group (group);
group = NULL;
}
@ -623,6 +691,43 @@ unref_group (GstOptSchedulerGroup *group)
return group;
}
static GstOptSchedulerGroup*
create_group (GstOptSchedulerChain *chain, GstElement *element,
GstOptSchedulerGroupType type)
{
GstOptSchedulerGroup *group;
group = g_new0 (GstOptSchedulerGroup, 1);
GST_LOG ("new group %p", group);
group->refcount = 1; /* float... */
group->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;
group->type = type;
add_to_group (group, element);
add_to_chain (chain, group);
group = unref_group (group); /* ...and sink. */
/* group's refcount is now 2 (one for the element, one for the chain) */
return group;
}
static void
destroy_group (GstOptSchedulerGroup *group)
{
GST_LOG ("destroy group %p", group);
g_assert (group != NULL);
g_assert (group->elements == NULL);
g_assert (group->chain == NULL);
g_assert (group->group_links == NULL);
if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
destroy_group_scheduler (group);
g_free (group);
}
static GstOptSchedulerGroup*
add_to_group (GstOptSchedulerGroup *group, GstElement *element)
{
@ -639,6 +744,7 @@ add_to_group (GstOptSchedulerGroup *group, GstElement *element)
g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
/* Ref the group... */
GST_ELEMENT_SCHED_GROUP (element) = ref_group (group);
gst_object_ref (GST_OBJECT (element));
@ -649,26 +755,100 @@ add_to_group (GstOptSchedulerGroup *group, GstElement *element)
group_element_set_enabled (group, element, TRUE);
}
/* Ref the group... */
ref_group (group);
return group;
}
/* if the element is linked to elements from other groups, you must decrement
the link count prior to calling this function */
static GstOptSchedulerGroup*
remove_from_group (GstOptSchedulerGroup *group, GstElement *element)
{
GST_DEBUG ("removing element \"%s\" from group %p", GST_ELEMENT_NAME (element), group);
g_assert (group != NULL);
g_assert (element != NULL);
g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
group->elements = g_slist_remove (group->elements, element);
group->num_elements--;
/* if the element was an entry point in the group, clear the group's
* entry point */
if (group->entry == element) {
group->entry = NULL;
}
GST_ELEMENT_SCHED_GROUP (element) = NULL;
gst_object_unref (GST_OBJECT (element));
if (group->num_elements == 0) {
GST_LOG ("group %p is now empty", group);
/* don't know in what case group->chain would be NULL, but putting this here
in deference to 0.8 -- remove me in 0.9 */
if (group->chain) {
GST_LOG ("removing group %p from its chain", group);
chain_group_set_enabled (group->chain, group, FALSE);
remove_from_chain (group->chain, group);
}
}
group = unref_group (group);
return group;
}
/* FIXME need to check if the groups are of the same type -- otherwise need to
setup the scheduler again, if it is setup */
static GstOptSchedulerGroup*
create_group (GstOptSchedulerChain *chain, GstElement *element)
merge_groups (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
{
GstOptSchedulerGroup *group;
g_assert (group1 != NULL);
group = g_new0 (GstOptSchedulerGroup, 1);
GST_LOG ("new group %p", group);
group->refcount = 1;
group->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;
GST_DEBUG ("merging groups %p and %p", group1, group2);
add_to_group (group, element);
add_to_chain (chain, group);
if (group1 == group2 || group2 == NULL)
return group1;
return group;
while (group2 && group2->elements) {
GstElement *element = (GstElement *)group2->elements->data;
group2 = remove_from_group (group2, element);
add_to_group (group1, element);
}
return group1;
}
/* setup the scheduler context for a group. The right schedule function
* is selected based on the group type and cothreads are created if
* needed */
static void
setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group)
{
GroupScheduleFunction wrapper;
wrapper = unknown_group_schedule_function;
/* figure out the wrapper function for this group */
if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
wrapper = get_group_schedule_function;
else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
wrapper = loop_group_schedule_function;
#ifdef USE_COTHREADS
if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
do_cothread_create (group->cothread, osched->context,
(cothread_func) wrapper, 0, (char **) group);
}
else {
do_cothread_setfunc (group->cothread, osched->context,
(cothread_func) wrapper, 0, (char **) group);
}
#else
group->schedulefunc = wrapper;
group->argc = 0;
group->argv = (char **) group;
#endif
group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
}
static void
@ -693,71 +873,6 @@ destroy_group_scheduler (GstOptSchedulerGroup *group)
group->flags &= ~GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
}
static void
destroy_group (GstOptSchedulerGroup *group)
{
GST_LOG ("destroy group %p", group);
g_assert (group != NULL);
g_assert (group->elements == NULL);
remove_from_chain (group->chain, group);
if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
destroy_group_scheduler (group);
g_free (group);
}
static GstOptSchedulerGroup*
remove_from_group (GstOptSchedulerGroup *group, GstElement *element)
{
GST_DEBUG ("removing element \"%s\" from group %p", GST_ELEMENT_NAME (element), group);
g_assert (group != NULL);
g_assert (element != NULL);
g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
group->elements = g_slist_remove (group->elements, element);
group->num_elements--;
/* if the element was an entry point in the group, clear the group's
* entry point */
if (group->entry == element) {
group->entry = NULL;
}
GST_ELEMENT_SCHED_GROUP (element) = NULL;
gst_object_unref (GST_OBJECT (element));
if (group->num_elements == 0) {
group = unref_group (group);
}
group = unref_group (group);
return group;
}
static GstOptSchedulerGroup*
merge_groups (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
{
g_assert (group1 != NULL);
GST_DEBUG ("merging groups %p and %p", group1, group2);
if (group1 == group2 || group2 == NULL)
return group1;
while (group2 && group2->elements) {
GstElement *element = (GstElement *)group2->elements->data;
group2 = remove_from_group (group2, element);
add_to_group (group1, element);
}
return group1;
}
static void
group_error_handler (GstOptSchedulerGroup *group)
{
@ -779,6 +894,11 @@ group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gbo
GST_LOG ("request to %d element %s in group %p, have %d elements enabled out of %d",
enabled, GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
/* Note that if an unlinked PLAYING element is added to a bin, we have to
create a new group to hold the element, and this function will be called
before the group is added to the chain. Thus we have a valid case for
group->chain==NULL. */
if (enabled) {
if (group->num_enabled < group->num_elements)
group->num_enabled++;
@ -787,10 +907,15 @@ group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gbo
GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
if (group->num_enabled == group->num_elements) {
if (!group->chain) {
GST_DEBUG ("enable chainless group %p", group);
GST_OPT_SCHEDULER_GROUP_ENABLE (group);
} else {
GST_LOG ("enable group %p", group);
chain_group_set_enabled (group->chain, group, TRUE);
}
}
}
else {
if (group->num_enabled > 0)
group->num_enabled--;
@ -799,11 +924,16 @@ group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gbo
GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
if (group->num_enabled == 0) {
if (!group->chain) {
GST_DEBUG ("disable chainless group %p", group);
GST_OPT_SCHEDULER_GROUP_DISABLE (group);
} else {
GST_LOG ("disable group %p", group);
chain_group_set_enabled (group->chain, group, FALSE);
}
}
}
}
/* a group is scheduled by doing a cothread switch to it or
* by calling the schedule function. In the non-cothread case
@ -855,9 +985,12 @@ schedule_group (GstOptSchedulerGroup *group)
static void
gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
{
GST_LOG_OBJECT (osched, "entering scheduler run queue recursion %d %d",
GST_LOG_OBJECT (osched, "running queue: %d groups, recursed %d times",
g_list_length (osched->runqueue),
osched->recursion, g_list_length (osched->runqueue));
/* note that we have a ref on each group on the queue (unref after running) */
/* make sure we don't exceed max_recursion */
if (osched->recursion > osched->max_recursion) {
osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
@ -872,7 +1005,7 @@ gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
group = (GstOptSchedulerGroup *) osched->runqueue->data;
/* runqueue hols refcount to group */
/* runqueue holds refcount to group */
osched->runqueue = g_list_remove (osched->runqueue, group);
GST_LOG_OBJECT (osched, "scheduling group %p", group);
@ -945,7 +1078,7 @@ get_group_schedule_function (int argc, char *argv[])
GstElement *entry = group->entry;
const GList *pads = gst_element_get_pad_list (entry);
GST_LOG ("get wrapper of group %p", group);
GST_LOG ("executing get-based group %p", group);
group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
@ -986,7 +1119,7 @@ loop_group_schedule_function (int argc, char *argv[])
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
GstElement *entry = group->entry;
GST_LOG ("loop wrapper of group %p", group);
GST_LOG ("executing loop-based group %p", group);
group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
@ -1024,26 +1157,31 @@ gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstData *data)
{
GstOptSchedulerGroup *group;
GstOptScheduler *osched;
GST_LOG ("loop wrapper, putting buffer in bufpen");
GstRealPad *peer;
group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
osched = group->chain->sched;
peer = GST_RPAD_PEER (sinkpad);
GST_LOG ("chain handler for loop-based pad %" GST_PTR_FORMAT, sinkpad);
#ifdef USE_COTHREADS
if (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))) {
if (GST_PAD_BUFLIST (peer)) {
g_warning ("deadlock detected, disabling group %p", group);
group_error_handler (group);
}
else {
GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), data);
GST_LOG ("queueing data %p on %s:%s's bufpen", data,
GST_DEBUG_PAD_NAME (peer));
GST_PAD_BUFLIST (peer) = g_list_append (GST_PAD_BUFLIST (peer), data);
schedule_group (group);
}
#else
GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), data);
GST_LOG ("queueing data %p on %s:%s's bufpen", data,
GST_DEBUG_PAD_NAME (peer));
GST_PAD_BUFLIST (peer) = g_list_append (GST_PAD_BUFLIST (peer), data);
if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
GST_LOG ("adding %p to runqueue", group);
GST_LOG ("adding group %p to runqueue", group);
if (!g_list_find (osched->runqueue, group))
{
ref_group (group);
@ -1052,8 +1190,8 @@ gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstData *data)
}
#endif
GST_LOG ("after loop wrapper buflist %d",
g_list_length (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))));
GST_LOG ("%d buffers left on %s:%s's bufpen after chain handler",
g_list_length (GST_PAD_BUFLIST (peer)));
}
/* this function is called by a loop based element that performs a
@ -1067,15 +1205,14 @@ gst_opt_scheduler_get_wrapper (GstPad *srcpad)
GstOptScheduler *osched;
gboolean disabled;
GST_LOG ("get wrapper, removing buffer from bufpen");
GST_LOG ("get handler for %" GST_PTR_FORMAT, srcpad);
/* first try to grab a queued buffer */
if (GST_PAD_BUFLIST (srcpad)) {
data = GST_PAD_BUFLIST (srcpad)->data;
GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), data);
GST_LOG ("get wrapper, returning queued data %d",
g_list_length (GST_PAD_BUFLIST (srcpad)));
GST_LOG ("returning popped queued data %p", data);
return data;
}
@ -1087,6 +1224,7 @@ gst_opt_scheduler_get_wrapper (GstPad *srcpad)
disabled = FALSE;
do {
GST_LOG ("scheduling upstream group %p to fill bufpen", group);
#ifdef USE_COTHREADS
schedule_group (group);
#else
@ -1099,9 +1237,9 @@ gst_opt_scheduler_get_wrapper (GstPad *srcpad)
osched->runqueue = g_list_append (osched->runqueue, group);
}
GST_LOG_OBJECT (osched, "recursing into scheduler group %p", group);
GST_LOG ("recursing into scheduler group %p", group);
gst_opt_scheduler_schedule_run_queue (osched);
GST_LOG_OBJECT (osched, "return from recurse group %p", group);
GST_LOG ("return from recurse group %p", group);
/* if the other group was disabled we might have to break out of the loop */
disabled = GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group);
@ -1139,7 +1277,7 @@ gst_opt_scheduler_get_wrapper (GstPad *srcpad)
}
while (data == NULL);
GST_LOG ("get wrapper, returning data %p, queue length %d",
GST_LOG ("get handler, returning data %p, queue length %d",
data, g_list_length (GST_PAD_BUFLIST (srcpad)));
return data;
@ -1207,44 +1345,9 @@ gst_opt_scheduler_event_wrapper (GstPad *srcpad, GstEvent *event)
return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
}
/* setup the scheduler context for a group. The right schedule function
* is selected based on the group type and cothreads are created if
* needed */
static void
setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group)
{
GroupScheduleFunction wrapper;
wrapper = unknown_group_schedule_function;
/* figure out the wrapper function for this group */
if (group->type == GST_OPT_SCHEDULER_GROUP_GET)
wrapper = get_group_schedule_function;
else if (group->type == GST_OPT_SCHEDULER_GROUP_LOOP)
wrapper = loop_group_schedule_function;
#ifdef USE_COTHREADS
if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
do_cothread_create (group->cothread, osched->context,
(cothread_func) wrapper, 0, (char **) group);
}
else {
do_cothread_setfunc (group->cothread, osched->context,
(cothread_func) wrapper, 0, (char **) group);
}
#else
group->schedulefunc = wrapper;
group->argc = 0;
group->argv = (char **) group;
#endif
group->flags |= GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
}
static GstElementStateReturn
gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
{
GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
GstOptSchedulerGroup *group;
GstElementStateReturn res = GST_STATE_SUCCESS;
@ -1288,7 +1391,6 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
}
/* else construct the scheduling context of this group and enable it */
else {
setup_group_scheduler (osched, group);
group_element_set_enabled (group, element, TRUE);
}
break;
@ -1340,7 +1442,8 @@ get_group (GstElement *element, GstOptSchedulerGroup **group)
* will also merge the chains.
*/
static GstOptSchedulerGroup*
group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *element2)
group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *element2,
GstOptSchedulerGroupType type)
{
GstOptSchedulerGroup *group1, *group2, *group = NULL;
@ -1356,7 +1459,7 @@ group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *eleme
GST_ELEMENT_NAME (element1), GST_ELEMENT_NAME (element2));
chain = create_chain (osched);
group = create_group (chain, element1);
group = create_group (chain, element1, type);
add_to_group (group, element2);
}
/* the first element has a group */
@ -1548,9 +1651,8 @@ gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
chain = create_chain (osched);
group = create_group (chain, element);
group = create_group (chain, element, GST_OPT_SCHEDULER_GROUP_LOOP);
group->entry = element;
group->type = GST_OPT_SCHEDULER_GROUP_LOOP;
GST_LOG ("added element \"%s\" as loop based entry", GST_ELEMENT_NAME (element));
}
@ -1733,13 +1835,13 @@ gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad
/* the two elements should be put into the same group,
* this also means that they are in the same chain automatically */
group = group_elements (osched, element1, element2);
group = group_elements (osched, element1, element2,
GST_OPT_SCHEDULER_GROUP_GET);
/* if there is not yet an entry in the group, select the source
* element as the entry point */
if (!group->entry) {
group->entry = element1;
group->type = GST_OPT_SCHEDULER_GROUP_GET;
GST_DEBUG ("setting \"%s\" as entry point of _get-based group %p",
GST_ELEMENT_NAME (element1), group);
@ -1759,7 +1861,7 @@ gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad
* this also means that they are in the same chain automatically,
* in case of a loop-based element1, there will be a group for element1 and
* element2 will be added to it. */
group_elements (osched, element1, element2);
group_elements (osched, element1, element2, GST_OPT_SCHEDULER_GROUP_LOOP);
break;
case GST_OPT_GET_TO_LOOP:
GST_LOG ("get to loop based link");
@ -1770,7 +1872,7 @@ gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad
* this also means that they are in the same chain automatically,
* element2 is loop-based so it already has a group where element1
* will be added to */
group_elements (osched, element1, element2);
group_elements (osched, element1, element2, GST_OPT_SCHEDULER_GROUP_LOOP);
break;
case GST_OPT_CHAIN_TO_LOOP:
case GST_OPT_LOOP_TO_LOOP:
@ -1796,7 +1898,8 @@ gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad
/* create a new group for element1 as it cannot be merged into another group
* here. we create the group in the same chain as the loop-based element. */
GST_DEBUG ("creating new group for element %s", GST_ELEMENT_NAME (element1));
group1 = create_group (group2->chain, element1);
group1 = create_group (group2->chain, element1,
GST_OPT_SCHEDULER_GROUP_LOOP);
}
else {
/* both elements are already in a group, make sure they are added to
@ -2010,7 +2113,22 @@ gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkp
group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
!GST_ELEMENT_IS_DECOUPLED (element1))
{
GList *l;
GstPad *pad;
GstOptSchedulerGroup *peer_group;
GST_LOG ("element1 is separated from the group");
/* have to decrement links to other groups from other pads */
for (l=element1->pads; l; l=l->next) {
pad = (GstPad*)l->data;
if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
if (peer_group && peer_group != group)
group_dec_link (group, peer_group);
}
}
remove_from_group (group, element1);
}
else {
@ -2024,7 +2142,22 @@ gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkp
group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
!GST_ELEMENT_IS_DECOUPLED (element2))
{
GList *l;
GstPad *pad;
GstOptSchedulerGroup *peer_group;
GST_LOG ("element2 is separated from the group");
/* have to decrement links to other groups from other pads */
for (l=element2->pads; l; l=l->next) {
pad = (GstPad*)l->data;
if (GST_IS_REAL_PAD (pad) && GST_PAD_PEER (pad)) {
get_group (GST_PAD_PARENT (GST_PAD_PEER (pad)), &peer_group);
if (peer_group && peer_group != group)
group_dec_link (group, peer_group);
}
}
remove_from_group (group, element2);
}
else {
@ -2052,7 +2185,7 @@ gst_opt_scheduler_iterate (GstScheduler *sched)
osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
GST_DEBUG ("iterating scheduler %p", sched);
GST_DEBUG_OBJECT (sched, "iterating");
while (iterations) {
gboolean scheduled = FALSE;
@ -2066,6 +2199,7 @@ gst_opt_scheduler_iterate (GstScheduler *sched)
ref_chain (chain);
/* if the chain is not disabled, schedule it */
if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
GST_LOG ("scheduling chain %p", chain);
schedule_chain (chain);
scheduled = TRUE;
}
@ -2080,8 +2214,6 @@ gst_opt_scheduler_iterate (GstScheduler *sched)
osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
}
GST_LOG_OBJECT (sched, "iterate scheduled %p", chain);
chains = g_slist_next (chains);
unref_chain (chain);
}