gst/schedulers/gstoptimalscheduler.c: Added lock to protect scheduler data structures.

Original commit message from CVS:
* gst/schedulers/gstoptimalscheduler.c:
(gst_opt_scheduler_class_init), (gst_opt_scheduler_init),
(gst_opt_scheduler_finalize), (remove_decoupled), (schedule_chain),
(get_invalid_call), (chain_invalid_call),
(get_group_schedule_function), (loop_group_schedule_function),
(gst_opt_scheduler_loop_wrapper), (gst_opt_scheduler_get_wrapper),
(gst_opt_scheduler_state_transition),
(gst_opt_scheduler_add_element),
(gst_opt_scheduler_remove_element), (gst_opt_scheduler_interrupt),
(gst_opt_scheduler_error), (gst_opt_scheduler_pad_link),
(gst_opt_scheduler_pad_unlink), (gst_opt_scheduler_iterate),
(gst_opt_scheduler_show):
Added lock to protect scheduler data structures.
This commit is contained in:
Wim Taymans 2005-02-02 15:31:06 +00:00
parent 4bdd55b8d1
commit f8114cc178
2 changed files with 183 additions and 21 deletions

View file

@ -1,3 +1,19 @@
2005-02-02 Wim Taymans <wim@fluendo.com>
* gst/schedulers/gstoptimalscheduler.c:
(gst_opt_scheduler_class_init), (gst_opt_scheduler_init),
(gst_opt_scheduler_finalize), (remove_decoupled), (schedule_chain),
(get_invalid_call), (chain_invalid_call),
(get_group_schedule_function), (loop_group_schedule_function),
(gst_opt_scheduler_loop_wrapper), (gst_opt_scheduler_get_wrapper),
(gst_opt_scheduler_state_transition),
(gst_opt_scheduler_add_element),
(gst_opt_scheduler_remove_element), (gst_opt_scheduler_interrupt),
(gst_opt_scheduler_error), (gst_opt_scheduler_pad_link),
(gst_opt_scheduler_pad_unlink), (gst_opt_scheduler_iterate),
(gst_opt_scheduler_show):
Added lock to protect scheduler data structures.
2005-02-01 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* testsuite/threads/threadi.c: (cb_data):

View file

@ -73,10 +73,15 @@ typedef enum
}
GstOptSchedulerState;
#define GST_OPT_LOCK(sched) (g_static_rec_mutex_lock (&((GstOptScheduler *)sched)->lock))
#define GST_OPT_UNLOCK(sched) (g_static_rec_mutex_unlock (&((GstOptScheduler *)sched)->lock))
struct _GstOptScheduler
{
GstScheduler parent;
GStaticRecMutex lock;
GstOptSchedulerState state;
#ifdef USE_COTHREADS
@ -254,7 +259,7 @@ static void chain_recursively_migrate_group (GstOptSchedulerChain * chain,
GstOptSchedulerGroup * group);
static void chain_group_set_enabled (GstOptSchedulerChain * chain,
GstOptSchedulerGroup * group, gboolean enabled);
static void schedule_chain (GstOptSchedulerChain * chain);
static gboolean schedule_chain (GstOptSchedulerChain * chain);
/*
@ -323,6 +328,7 @@ static void gst_opt_scheduler_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_opt_scheduler_dispose (GObject * object);
static void gst_opt_scheduler_finalize (GObject * object);
static void gst_opt_scheduler_setup (GstScheduler * sched);
static void gst_opt_scheduler_reset (GstScheduler * sched);
@ -391,6 +397,7 @@ gst_opt_scheduler_class_init (GstOptSchedulerClass * klass)
gobject_class->get_property =
GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_opt_scheduler_finalize);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS,
g_param_spec_int ("iterations", "Iterations",
@ -431,6 +438,8 @@ gst_opt_scheduler_class_init (GstOptSchedulerClass * klass)
static void
gst_opt_scheduler_init (GstOptScheduler * scheduler)
{
g_static_rec_mutex_init (&scheduler->lock);
scheduler->elements = NULL;
scheduler->iterations = 1;
scheduler->max_recursion = 100;
@ -445,6 +454,16 @@ gst_opt_scheduler_dispose (GObject * object)
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_opt_scheduler_finalize (GObject * object)
{
GstOptScheduler *osched = GST_OPT_SCHEDULER (object);
g_static_rec_mutex_free (&osched->lock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static gboolean
plugin_init (GstPlugin * plugin)
{
@ -897,6 +916,46 @@ add_to_group (GstOptSchedulerGroup * group, GstElement * element,
return group;
}
/* we need to remove a decoupled element from the scheduler, this
* involves finding all the groups that have this element as an
* entry point and disabling them. */
static void
remove_decoupled (GstScheduler * sched, GstElement * element)
{
GSList *chains;
GList *schedulers;
GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
GST_DEBUG_OBJECT (sched, "removing decoupled element \"%s\"",
GST_OBJECT_NAME (element));
for (chains = osched->chains; chains; chains = g_slist_next (chains)) {
GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
GSList *groups;
for (groups = chain->groups; groups; groups = g_slist_next (groups)) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
if (group->entry) {
GST_DEBUG_OBJECT (sched, "group %p, entry %s", group,
GST_STR_NULL (GST_OBJECT_NAME (group->entry)));
}
if (group->entry == element) {
GST_DEBUG ("clearing element %p \"%s\" as entry from group %p",
element, GST_ELEMENT_NAME (element), group);
group->entry = NULL;
group->type = GST_OPT_SCHEDULER_GROUP_UNKNOWN;
}
}
}
/* and remove from any child scheduler */
for (schedulers = sched->schedulers; schedulers;
schedulers = g_list_next (schedulers)) {
remove_decoupled (GST_SCHEDULER (schedulers->data), element);
}
}
static GstOptSchedulerGroup *
remove_from_group (GstOptSchedulerGroup * group, GstElement * element)
{
@ -1235,11 +1294,12 @@ gst_opt_scheduler_schedule_run_queue (GstOptScheduler * osched,
#endif
/* a chain is scheduled by picking the first active group and scheduling it */
static void
static gboolean
schedule_chain (GstOptSchedulerChain * chain)
{
GSList *groups;
GstOptScheduler *osched;
gboolean scheduled = FALSE;
osched = chain->sched;
@ -1249,6 +1309,7 @@ schedule_chain (GstOptSchedulerChain * chain)
sort_chain (chain);
GST_OPT_SCHEDULER_CHAIN_SET_CLEAN (chain);
/* FIXME handle the case where the groups change during scheduling */
groups = chain->groups;
while (groups) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
@ -1268,6 +1329,8 @@ schedule_chain (GstOptSchedulerChain * chain)
gst_opt_scheduler_schedule_run_queue (osched, NULL);
#endif
scheduled = TRUE;
GST_LOG ("done scheduling group %p in chain %p", group, chain);
unref_group (group);
break;
@ -1275,6 +1338,31 @@ schedule_chain (GstOptSchedulerChain * chain)
groups = g_slist_next (groups);
}
return scheduled;
}
/* this function is inserted in the gethandler when you are not
* supposed to call _pull on the pad. */
static GstData *
get_invalid_call (GstPad * pad)
{
GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
("get on pad %s:%s but the peer is operating chain based and so is not "
"allowed to pull, fix the element.", GST_DEBUG_PAD_NAME (pad)));
return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
}
/* this function is inserted in the chainhandler when you are not
* supposed to call _push on the pad. */
static void
chain_invalid_call (GstPad * pad, GstData * data)
{
GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
("chain on pad %s:%s but the pad is get based",
GST_DEBUG_PAD_NAME (pad)));
gst_data_unref (data);
}
/* a get-based group is scheduled by getting a buffer from the get based
@ -1287,16 +1375,21 @@ get_group_schedule_function (int argc, char *argv[])
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
GstElement *entry = group->entry;
const GList *pads;
GstOptScheduler *osched;
/* what if the entry point disappeared? */
if (!entry)
if (entry == NULL || group->chain == NULL)
return 0;
osched = group->chain->sched;
pads = gst_element_get_pad_list (entry);
GST_LOG ("executing get-based group %p", group);
group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;
GST_OPT_UNLOCK (osched);
while (pads) {
GstData *data;
GstPad *pad = GST_PAD (pads->data);
@ -1320,6 +1413,7 @@ get_group_schedule_function (int argc, char *argv[])
gst_pad_push (pad, data);
}
}
GST_OPT_LOCK (osched);
group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
@ -1329,7 +1423,8 @@ get_group_schedule_function (int argc, char *argv[])
/* a loop-based group is scheduled by calling the loop function
* on the entry point.
* We also set the running flag on this group for as long as this
* function is running. */
* function is running.
* This function should be called with the scheduler lock held. */
static int
loop_group_schedule_function (int argc, char *argv[])
{
@ -1343,9 +1438,16 @@ loop_group_schedule_function (int argc, char *argv[])
GST_DEBUG ("calling loopfunc of element %s in group %p",
GST_ELEMENT_NAME (entry), group);
if (entry->loopfunc)
if (group->chain == NULL)
return 0;
if (entry->loopfunc) {
GstOptScheduler *osched = group->chain->sched;
GST_OPT_UNLOCK (osched);
entry->loopfunc (entry);
else
GST_OPT_LOCK (osched);
} else
group_error_handler (group);
GST_LOG ("returned from loopfunc of element %s in group %p",
@ -1386,6 +1488,7 @@ gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data)
GST_LOG ("chain handler for loop-based pad %" GST_PTR_FORMAT, sinkpad);
GST_OPT_LOCK (osched);
#ifdef USE_COTHREADS
if (GST_PAD_DATALIST (peer)) {
g_warning ("deadlock detected, disabling group %p", group);
@ -1408,6 +1511,7 @@ gst_opt_scheduler_loop_wrapper (GstPad * sinkpad, GstData * data)
}
}
#endif
GST_OPT_UNLOCK (osched);
GST_LOG ("%d datas left on %s:%s's datapen after chain handler",
g_list_length (GST_PAD_DATALIST (peer)), GST_DEBUG_PAD_NAME (peer));
@ -1449,6 +1553,7 @@ gst_opt_scheduler_get_wrapper (GstPad * srcpad)
data = NULL;
disabled = FALSE;
GST_OPT_LOCK (osched);
do {
GST_LOG ("scheduling upstream group %p to fill datapen", group);
#ifdef USE_COTHREADS
@ -1479,7 +1584,8 @@ gst_opt_scheduler_get_wrapper (GstPad * srcpad)
* this is not allowed in the optimal scheduler (yet) */
g_warning ("deadlock detected, disabling group %p", group);
group_error_handler (group);
return GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
data = GST_DATA (gst_event_new (GST_EVENT_INTERRUPT));
goto done;
}
#endif
/* if the scheduler interrupted, make sure we send an INTERRUPTED event
@ -1500,6 +1606,11 @@ gst_opt_scheduler_get_wrapper (GstPad * srcpad)
}
while (data == NULL);
#ifndef USE_COTHREADS
done:
#endif
GST_OPT_UNLOCK (osched);
GST_LOG ("get handler, returning data %p, queue length %d",
data, g_list_length (GST_PAD_DATALIST (srcpad)));
@ -1560,6 +1671,7 @@ gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element,
GST_ELEMENT_NAME (element) ? GST_ELEMENT_NAME (element) : "(null)",
transition);
GST_OPT_LOCK (sched);
/* we check the state of the managing pipeline here */
if (GST_IS_BIN (element)) {
if (GST_SCHEDULER_PARENT (sched) == element) {
@ -1579,12 +1691,14 @@ gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element,
GST_LOG ("no interesting state change, doing nothing");
}
}
return res;
goto done;
}
/* we don't care about decoupled elements after this */
if (GST_ELEMENT_IS_DECOUPLED (element))
return GST_STATE_SUCCESS;
if (GST_ELEMENT_IS_DECOUPLED (element)) {
res = GST_STATE_SUCCESS;
goto done;
}
/* get the group of the element */
group = GST_ELEMENT_SCHED_GROUP (element);
@ -1617,6 +1731,11 @@ gst_opt_scheduler_state_transition (GstScheduler * sched, GstElement * element,
break;
}
//gst_scheduler_show (sched);
done:
GST_OPT_UNLOCK (sched);
return res;
}
@ -1889,10 +2008,12 @@ gst_opt_scheduler_add_element (GstScheduler * sched, GstElement * element)
GstOptSchedulerGroup *group;
GstOptSchedulerChain *chain;
GST_OPT_LOCK (sched);
chain = create_chain (osched);
group = create_group (chain, element, GST_OPT_SCHEDULER_GROUP_LOOP);
group->entry = element;
GST_OPT_UNLOCK (sched);
GST_LOG ("added element \"%s\" as loop based entry",
GST_ELEMENT_NAME (element));
@ -1907,10 +2028,13 @@ gst_opt_scheduler_remove_element (GstScheduler * sched, GstElement * element)
GST_DEBUG_OBJECT (sched, "removing element \"%s\"",
GST_OBJECT_NAME (element));
GST_OPT_LOCK (sched);
/* decoupled elements are not added to the scheduler lists and should therefore
* not be removed */
if (GST_ELEMENT_IS_DECOUPLED (element))
return;
if (GST_ELEMENT_IS_DECOUPLED (element)) {
remove_decoupled (sched, element);
goto done;
}
/* the element is guaranteed to live in it's own group/chain now */
get_group (element, &group);
@ -1920,6 +2044,9 @@ gst_opt_scheduler_remove_element (GstScheduler * sched, GstElement * element)
g_free (GST_ELEMENT (element)->sched_private);
GST_ELEMENT (element)->sched_private = NULL;
done:
GST_OPT_UNLOCK (sched);
}
static gboolean
@ -1956,8 +2083,10 @@ gst_opt_scheduler_interrupt (GstScheduler * sched, GstElement * element)
{
GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
GST_OPT_LOCK (sched);
GST_INFO ("scheduler set interrupted state");
osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
GST_OPT_UNLOCK (sched);
}
return TRUE;
#endif
@ -1969,11 +2098,13 @@ gst_opt_scheduler_error (GstScheduler * sched, GstElement * element)
GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
GstOptSchedulerGroup *group;
GST_OPT_LOCK (sched);
get_group (element, &group);
if (group)
group_error_handler (group);
osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
GST_OPT_UNLOCK (sched);
}
/* link pads, merge groups and chains */
@ -1991,6 +2122,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
src_element = GST_PAD_PARENT (srcpad);
sink_element = GST_PAD_PARENT (sinkpad);
GST_OPT_LOCK (sched);
/* first we need to figure out what type of link we're dealing
* with */
if (src_element->loopfunc && sink_element->loopfunc)
@ -2014,7 +2146,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
if (group->entry != sink_element) {
g_error
("internal error: cannot schedule get to loop in multi-loop based group");
return;
goto done;
}
}
} else
@ -2033,7 +2165,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
if (group->entry != src_element) {
g_error ("internal error: cannot schedule get to chain "
"with mixed loop/chain based group");
return;
goto done;
}
}
} else
@ -2055,7 +2187,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
GST_LOG ("get to chain based link");
/* setup get/chain handlers */
GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function;
GST_RPAD_GETHANDLER (srcpad) = get_invalid_call;
GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function;
/* the two elements should be put into the same group,
@ -2081,6 +2213,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
case GST_OPT_CHAIN_TO_CHAIN:
GST_LOG ("loop/chain to chain based link");
GST_RPAD_GETHANDLER (srcpad) = get_invalid_call;
GST_RPAD_CHAINHANDLER (sinkpad) = gst_pad_call_chain_function;
/* the two elements should be put into the same group, this also means
@ -2095,6 +2228,7 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
GST_LOG ("get to loop based link");
GST_RPAD_GETHANDLER (srcpad) = gst_pad_call_get_function;
GST_RPAD_CHAINHANDLER (sinkpad) = chain_invalid_call;
/* the two elements should be put into the same group, this also means
* that they are in the same chain automatically, sink_element is
@ -2110,8 +2244,8 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
GST_LOG ("chain/loop to loop based link");
GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper;
GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
/* events on the srcpad have to be intercepted as we might need to
* flush the buffer lists, so override the given eventfunc */
GST_RPAD_EVENTHANDLER (srcpad) = gst_opt_scheduler_event_wrapper;
@ -2145,6 +2279,8 @@ gst_opt_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
g_error ("(internal error) invalid element link, what are you doing?");
break;
}
done:
GST_OPT_UNLOCK (sched);
}
static void
@ -2513,6 +2649,7 @@ gst_opt_scheduler_pad_unlink (GstScheduler * sched,
src_element = GST_PAD_PARENT (srcpad);
sink_element = GST_PAD_PARENT (sinkpad);
GST_OPT_LOCK (sched);
get_group (src_element, &group1);
get_group (sink_element, &group2);
@ -2530,7 +2667,7 @@ gst_opt_scheduler_pad_unlink (GstScheduler * sched,
if (!group1 || !group2) {
GST_LOG
("one (or both) of the elements is not in a group, not interesting");
return;
goto done;
}
/* easy part, groups are different */
@ -2614,6 +2751,8 @@ gst_opt_scheduler_pad_unlink (GstScheduler * sched,
}
/* at this point the group can be freed and gone, so don't touch */
}
done:
GST_OPT_UNLOCK (sched);
}
/* a scheduler iteration is done by looping and scheduling the active chains */
@ -2622,7 +2761,10 @@ gst_opt_scheduler_iterate (GstScheduler * sched)
{
GstSchedulerState state = GST_SCHEDULER_STATE_STOPPED;
GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
gint iterations = osched->iterations;
gint iterations;
GST_OPT_LOCK (sched);
iterations = osched->iterations;
osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
@ -2632,7 +2774,8 @@ gst_opt_scheduler_iterate (GstScheduler * sched)
gboolean scheduled = FALSE;
GSList *chains;
/* we have to schedule each of the scheduler chains now */
/* we have to schedule each of the scheduler chains now.
* FIXME handle the case where the chains change during iterations. */
chains = osched->chains;
while (chains) {
GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
@ -2641,8 +2784,7 @@ gst_opt_scheduler_iterate (GstScheduler * sched)
/* if the chain is not disabled, schedule it */
if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
GST_LOG ("scheduling chain %p", chain);
schedule_chain (chain);
scheduled = TRUE;
scheduled = schedule_chain (chain);
GST_LOG ("scheduled chain %p", chain);
} else {
GST_LOG ("not scheduling disabled chain %p", chain);
@ -2681,6 +2823,7 @@ gst_opt_scheduler_iterate (GstScheduler * sched)
if (iterations > 0)
iterations--;
}
GST_OPT_UNLOCK (sched);
return state;
}
@ -2692,6 +2835,8 @@ gst_opt_scheduler_show (GstScheduler * sched)
GstOptScheduler *osched = GST_OPT_SCHEDULER (sched);
GSList *chains;
GST_OPT_LOCK (sched);
g_print ("iterations: %d\n", osched->iterations);
g_print ("max recursion: %d\n", osched->max_recursion);
@ -2739,6 +2884,7 @@ gst_opt_scheduler_show (GstScheduler * sched)
}
}
}
GST_OPT_UNLOCK (sched);
}
static void