Fix #109730, opt scheduler backport

Original commit message from CVS:
Fix #109730, opt scheduler backport
This commit is contained in:
Wim Taymans 2003-04-04 20:04:49 +00:00
parent 3d3d5be932
commit bd96e8b45a

View file

@ -101,6 +101,8 @@ typedef enum {
typedef struct _GstOptSchedulerChain GstOptSchedulerChain;
struct _GstOptSchedulerChain {
gint refcount;
GstOptScheduler *sched;
GstOptSchedulerChainFlags flags;
@ -144,6 +146,8 @@ struct _GstOptSchedulerGroup {
GstOptSchedulerGroupFlags flags; /* flags for this group */
GstOptSchedulerGroupType type; /* flags for this group */
gint refcount;
GSList *elements; /* elements of this group */
gint num_elements;
gint num_enabled;
@ -154,12 +158,25 @@ struct _GstOptSchedulerGroup {
#ifdef USE_COTHREADS
cothread *cothread; /* the cothread of this group */
#endif
#else
GroupScheduleFunction schedulefunc;
#endif
int argc;
char **argv;
};
/* some group operations */
static GstOptSchedulerGroup* ref_group (GstOptSchedulerGroup *group);
#ifndef USE_COTHREADS
static GstOptSchedulerGroup* ref_group_by_count (GstOptSchedulerGroup *group, gint count);
#endif
static GstOptSchedulerGroup* unref_group (GstOptSchedulerGroup *group);
static void destroy_group (GstOptSchedulerGroup *group);
static void group_element_set_enabled (GstOptSchedulerGroup *group,
GstElement *element, gboolean enabled);
static void chain_group_set_enabled (GstOptSchedulerChain *chain,
GstOptSchedulerGroup *group, gboolean enabled);
/*
* Scheduler private data for an element
*/
@ -206,7 +223,7 @@ static gboolean gst_opt_scheduler_yield (GstScheduler *sched, GstElement *ele
static gboolean gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element);
static void gst_opt_scheduler_error (GstScheduler *sched, GstElement *element);
static void gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static void gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static void gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static GstPad* gst_opt_scheduler_pad_select (GstScheduler *sched, GList *padlist);
static GstClockReturn gst_opt_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
GstClockID id, GstClockTimeDiff *jitter);
@ -279,12 +296,16 @@ gst_opt_scheduler_class_init (GstOptSchedulerClass *klass)
gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_opt_scheduler_yield);
gstscheduler_class->interrupt = GST_DEBUG_FUNCPTR (gst_opt_scheduler_interrupt);
gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_opt_scheduler_error);
gstscheduler_class->pad_link = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
gstscheduler_class->pad_link = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
gstscheduler_class->pad_unlink = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_unlink);
gstscheduler_class->pad_select = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_select);
gstscheduler_class->clock_wait = GST_DEBUG_FUNCPTR (gst_opt_scheduler_clock_wait);
gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_opt_scheduler_iterate);
gstscheduler_class->show = GST_DEBUG_FUNCPTR (gst_opt_scheduler_show);
#ifdef USE_COTHREADS
do_cothreads_init(NULL);
#endif
}
static void
@ -308,9 +329,15 @@ plugin_init (GModule *module, GstPlugin *plugin)
gst_plugin_set_longname (plugin, "An optimal scheduler");
#ifdef USE_COTHREADS
factory = gst_scheduler_factory_new ("opt"COTHREADS_NAME,
"An optimal scheduler using "COTHREADS_NAME" cothreads",
gst_opt_scheduler_get_type());
#else
factory = gst_scheduler_factory_new ("opt",
"An optimal scheduler using no cothreads",
gst_opt_scheduler_get_type());
#endif
if (factory != NULL) {
gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
@ -330,46 +357,23 @@ GstPluginDesc plugin_desc = {
static void
delete_chain (GstOptSchedulerChain *chain)
destroy_chain (GstOptSchedulerChain *chain)
{
GSList *groups;
GstOptScheduler *osched;
GST_INFO (GST_CAT_SCHEDULING, "delete chain %p", chain);
GST_INFO (GST_CAT_SCHEDULING, "destroy chain %p", chain);
g_assert (chain->num_groups == 0);
g_assert (chain->groups == NULL);
osched = chain->sched;
osched->chains = g_slist_remove (osched->chains, chain);
groups = chain->groups;
while (groups) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
gst_object_unref (GST_OBJECT (osched));
/* clear all group's chain pointers, so they can never reference us again */
if (group->chain == chain)
group->chain = NULL;
groups = g_slist_next (groups);
}
g_slist_free (chain->groups);
g_free (chain);
}
static GstOptSchedulerChain*
add_to_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
{
GST_INFO (GST_CAT_SCHEDULING, "adding group %p to chain %p", group, chain);
g_assert (group->chain == NULL);
chain->groups = g_slist_prepend (chain->groups, group);
group->chain = chain;
chain->num_groups++;
return chain;
}
static GstOptSchedulerChain*
create_chain (GstOptScheduler *osched)
{
@ -377,7 +381,10 @@ create_chain (GstOptScheduler *osched)
chain = g_new0 (GstOptSchedulerChain, 1);
chain->sched = osched;
chain->refcount = 1;
chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
gst_object_ref (GST_OBJECT (osched));
osched->chains = g_slist_prepend (osched->chains, chain);
GST_INFO (GST_CAT_SCHEDULING, "new chain %p", chain);
@ -385,6 +392,50 @@ create_chain (GstOptScheduler *osched)
return chain;
}
static GstOptSchedulerChain*
ref_chain (GstOptSchedulerChain *chain)
{
GST_INFO (GST_CAT_SCHEDULING, "ref chain %p %d->%d", chain,
chain->refcount, chain->refcount+1);
chain->refcount++;
return chain;
}
static GstOptSchedulerChain*
unref_chain (GstOptSchedulerChain *chain)
{
GST_INFO (GST_CAT_SCHEDULING, "unref chain %p %d->%d", chain,
chain->refcount, chain->refcount-1);
if (--chain->refcount == 0) {
destroy_chain (chain);
chain = NULL;
}
return chain;
}
static GstOptSchedulerChain*
add_to_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
{
GST_INFO (GST_CAT_SCHEDULING, "adding group %p to chain %p", group, chain);
g_assert (group->chain == NULL);
group = ref_group (group);
group->chain = ref_chain (chain);
chain->groups = g_slist_prepend (chain->groups, group);
chain->num_groups++;
if (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group)) {
chain_group_set_enabled (chain, group, TRUE);
}
return chain;
}
static GstOptSchedulerChain*
remove_from_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
{
@ -396,18 +447,15 @@ remove_from_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
g_assert (group);
g_assert (group->chain == chain);
group->chain = NULL;
chain->groups = g_slist_remove (chain->groups, group);
chain->num_groups--;
unref_group (group);
group->chain = NULL;
if (chain->num_groups == 0) {
GST_INFO (GST_CAT_SCHEDULING, "chain %p is empty, removing", chain);
delete_chain (chain);
return NULL;
}
if (chain->num_groups == 0)
chain = unref_chain (chain);
chain = unref_chain (chain);
return chain;
}
@ -423,15 +471,26 @@ merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
if (chain1 == chain2 || chain2 == NULL)
return chain1;
ref_chain (chain2);
walk = chain2->groups;
while (walk) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) walk->data;
walk = g_slist_next (walk);
GST_INFO (GST_CAT_SCHEDULING, "reparenting group %p from chain %p to %p",
group, chain2, chain1);
group->chain = NULL;
add_to_chain (chain1, group);
chain2->num_groups--;
chain2 = unref_chain (chain2);
group->chain = ref_chain (chain1);
chain1->groups = g_slist_prepend (chain1->groups, group);
chain1->num_groups++;
}
delete_chain (chain2);
g_slist_free (chain2->groups);
chain2->groups = NULL;
unref_chain (chain2);
return chain1;
}
@ -476,6 +535,44 @@ chain_group_set_enabled (GstOptSchedulerChain *chain, GstOptSchedulerGroup *grou
}
}
static GstOptSchedulerGroup*
ref_group (GstOptSchedulerGroup *group)
{
GST_INFO (GST_CAT_SCHEDULING, "ref group %p %d->%d", group,
group->refcount, group->refcount+1);
group->refcount++;
return group;
}
#ifndef USE_COTHREADS
static GstOptSchedulerGroup*
ref_group_by_count (GstOptSchedulerGroup *group, gint count)
{
GST_INFO (GST_CAT_SCHEDULING, "ref group %p %d->%d", group,
group->refcount, group->refcount+count);
group->refcount += count;
return group;
}
#endif
static GstOptSchedulerGroup*
unref_group (GstOptSchedulerGroup *group)
{
GST_INFO (GST_CAT_SCHEDULING, "unref group %p %d->%d", group,
group->refcount, group->refcount-1);
if (--group->refcount == 1) {
destroy_group (group);
group = NULL;
}
return group;
}
static GstOptSchedulerGroup*
add_to_group (GstOptSchedulerGroup *group, GstElement *element)
{
@ -485,16 +582,22 @@ add_to_group (GstOptSchedulerGroup *group, GstElement *element)
GST_INFO (GST_CAT_SCHEDULING, "adding element \"%s\" to group %p", GST_ELEMENT_NAME (element), group);
if (GST_ELEMENT_IS_DECOUPLED (element)) {
GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" is decoupled, not adding to group %p", GST_ELEMENT_NAME (element), group);
GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" is decoupled, not adding to group %p",
GST_ELEMENT_NAME (element), group);
return group;
}
g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);
GST_ELEMENT_SCHED_GROUP (element) = ref_group (group);
gst_object_ref (GST_OBJECT (element));
group->elements = g_slist_prepend (group->elements, element);
group->num_elements++;
GST_ELEMENT_SCHED_GROUP (element) = group;
if (gst_element_get_state (element) == GST_STATE_PLAYING) {
group_element_set_enabled (group, element, TRUE);
}
return group;
}
@ -506,6 +609,8 @@ create_group (GstOptSchedulerChain *chain, GstElement *element)
group = g_new0 (GstOptSchedulerGroup, 1);
GST_INFO (GST_CAT_SCHEDULING, "new group %p", group);
group->refcount = 1;
group->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;
add_to_group (group, element);
add_to_chain (chain, group);
@ -519,7 +624,7 @@ destroy_group_scheduler (GstOptSchedulerGroup *group)
g_assert (group);
if (group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)
g_warning ("removing running element");
g_warning ("destroying running group scheduler");
#ifdef USE_COTHREADS
if (group->cothread) {
@ -536,29 +641,18 @@ destroy_group_scheduler (GstOptSchedulerGroup *group)
}
static void
delete_group (GstOptSchedulerGroup *group)
destroy_group (GstOptSchedulerGroup *group)
{
GSList *elements;
GST_INFO (GST_CAT_SCHEDULING, "delete group %p", group);
GST_INFO (GST_CAT_SCHEDULING, "destroy group %p", group);
g_assert (group != NULL);
g_assert (group->chain == NULL);
g_assert (group->elements == NULL);
remove_from_chain (group->chain, group);
if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
destroy_group_scheduler (group);
/* remove all elements from the group */
elements = group->elements;
while (elements) {
GstElement *element = GST_ELEMENT (elements->data);
GST_ELEMENT_SCHED_GROUP (element) = NULL;
elements = g_slist_next (elements);
}
g_slist_free (group->elements);
g_free (group);
}
@ -569,18 +663,25 @@ remove_from_group (GstOptSchedulerGroup *group, GstElement *element)
g_assert (group != NULL);
g_assert (element != NULL);
g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
group->elements = g_slist_remove (group->elements, element);
group->num_elements--;
/* if the element was an entry point in the group, clear the group's
* entry point */
if (group->entry == element) {
group->entry = NULL;
}
GST_ELEMENT_SCHED_GROUP (element) = NULL;
gst_object_unref (GST_OBJECT (element));
if (group->num_elements == 0) {
GST_INFO (GST_CAT_SCHEDULING, "group %p is empty, deleting", group);
remove_from_chain (group->chain, group);
delete_group (group);
return NULL;
group = unref_group (group);
}
group = unref_group (group);
return group;
}
@ -594,9 +695,9 @@ merge_groups (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
if (group1 == group2 || group2 == NULL)
return group1;
while (group2) {
while (group2 && group2->elements) {
GstElement *element = (GstElement *)group2->elements->data;
group2 = remove_from_group (group2, element);
add_to_group (group1, element);
}
@ -658,8 +759,10 @@ group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gbo
static gboolean
schedule_group (GstOptSchedulerGroup *group)
{
if (!group->entry)
if (!group->entry) {
GST_INFO (GST_CAT_SCHEDULING, "not scheduling group %p without entry", group);
return FALSE;
}
#ifdef USE_COTHREADS
if (group->cothread)
@ -668,6 +771,11 @@ schedule_group (GstOptSchedulerGroup *group)
g_warning ("(internal error): trying to schedule group without cothread");
return TRUE;
#else
if (group->schedulefunc == NULL) {
GST_INFO (GST_CAT_SCHEDULING, "not scheduling group %p without schedulefunc",
group);
return FALSE;
}
group->schedulefunc (group->argc, group->argv);
return TRUE;
#endif
@ -677,7 +785,8 @@ schedule_group (GstOptSchedulerGroup *group)
static void
gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
{
GST_INFO (GST_CAT_SCHEDULING, "entering scheduler run queue recursion %d", osched->recursion);
GST_INFO (GST_CAT_SCHEDULING, "entering scheduler run queue recursion %d %d",
osched->recursion, g_list_length (osched->runqueue));
/* make sure we don't exceed max_recursion */
if (osched->recursion > osched->max_recursion) {
@ -689,18 +798,24 @@ gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
while (osched->runqueue) {
GstOptSchedulerGroup *group;
gboolean res;
group = (GstOptSchedulerGroup *) osched->runqueue->data;
/* runqueue hols refcount to group */
osched->runqueue = g_list_remove (osched->runqueue, group);
GST_INFO (GST_CAT_SCHEDULING, "scheduling %p", group);
if (!schedule_group (group)) {
res = schedule_group (group);
if (!res) {
g_warning ("error scheduling group %p", group);
group_error_handler (group);
}
GST_INFO (GST_CAT_SCHEDULING, "done scheduling %p", group);
else {
GST_INFO (GST_CAT_SCHEDULING, "done scheduling %p", group);
}
unref_group (group);
}
GST_INFO (GST_CAT_SCHEDULING, "run queue length after scheduling %d", g_list_length (osched->runqueue));
@ -710,7 +825,7 @@ gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
#endif
/* a chain is scheduled by picking the first active group and scheduling it */
static void
static void
schedule_chain (GstOptSchedulerChain *chain)
{
GSList *groups;
@ -722,10 +837,8 @@ schedule_chain (GstOptSchedulerChain *chain)
while (groups) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
groups = g_slist_next (groups);
if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
ref_group (group);
GST_INFO (GST_CAT_SCHEDULING, "scheduling group %p in chain %p",
group, chain);
@ -733,14 +846,18 @@ schedule_chain (GstOptSchedulerChain *chain)
schedule_group (group);
#else
osched->recursion = 0;
ref_group (group);
osched->runqueue = g_list_append (osched->runqueue, group);
gst_opt_scheduler_schedule_run_queue (osched);
#endif
GST_INFO (GST_CAT_SCHEDULING, "done scheduling group %p in chain %p",
group, chain);
unref_group (group);
break;
}
groups = g_slist_next (groups);
}
}
@ -804,6 +921,9 @@ loop_group_schedule_function (int argc, char *argv[])
entry->loopfunc (entry);
GST_INFO (GST_CAT_SCHEDULING, "loopfunc ended of element %s in group %p",
GST_ELEMENT_NAME (entry), group);
group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
return 0;
@ -849,6 +969,8 @@ gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstBuffer *buffer)
#else
GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)) = g_list_append (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad)), buffer);
if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
ref_group (group);
GST_INFO (GST_CAT_SCHEDULING, "adding %p to runqueue", group);
osched->runqueue = g_list_append (osched->runqueue, group);
}
#endif
@ -863,47 +985,80 @@ gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstBuffer *buffer)
static GstBuffer*
gst_opt_scheduler_get_wrapper (GstPad *srcpad)
{
GstBuffer *buffer = NULL;
GstBuffer *buffer;
GstOptSchedulerGroup *group;
GstOptScheduler *osched;
gboolean disabled;
GST_INFO (GST_CAT_SCHEDULING, "get wrapper, removing buffer from bufpen");
if (GST_PAD_BUFLIST (srcpad))
/* first try to grab a queued buffer */
if (GST_PAD_BUFLIST (srcpad)) {
buffer = GST_PAD_BUFLIST (srcpad)->data;
while (!buffer) {
GstOptSchedulerGroup *group;
GstOptScheduler *osched;
GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), buffer);
group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
osched = group->chain->sched;
GST_INFO (GST_CAT_SCHEDULING, "get wrapper, returning queued buffer %d",
g_list_length (GST_PAD_BUFLIST (srcpad)));
return buffer;
}
/* else we need to schedule the peer element */
group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (srcpad));
osched = group->chain->sched;
buffer = NULL;
disabled = FALSE;
do {
#ifdef USE_COTHREADS
schedule_group (group);
#else
if (!(group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING)) {
ref_group_by_count (group, 2);
osched->runqueue = g_list_append (osched->runqueue, group);
GST_INFO (GST_CAT_SCHEDULING, "recursing into scheduler group %p", group);
gst_opt_scheduler_schedule_run_queue (osched);
GST_INFO (GST_CAT_SCHEDULING, "return from recurse group %p", group);
/* if the other group was disabled we might have to break out of the loop */
disabled = GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group);
group = unref_group (group);
/* group is gone */
if (group == NULL) {
/* if the group was gone we also might have to break out of the loop */
disabled = TRUE;
}
}
else {
/* in this case, the group was running and we wanted to swtich to it,
* this is not allowed in the optimal scheduler (yet) */
g_warning ("deadlock detected, disabling group %p", group);
group_error_handler (group);
return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
}
#endif
/* if the scheduler interrupted, make sure we send an INTERRUPTED event to the
* > * loop based element */
* loop based element */
if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
return GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
GST_INFO (GST_CAT_SCHEDULING, "scheduler interrupted, return interrupt event");
buffer = GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
}
if (GST_PAD_BUFLIST (srcpad)) {
buffer = (GstBuffer *) GST_PAD_BUFLIST (srcpad)->data;
else {
if (GST_PAD_BUFLIST (srcpad)) {
buffer = (GstBuffer *) GST_PAD_BUFLIST (srcpad)->data;
GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), buffer);
}
else if (disabled) {
/* no buffer in queue and peer group was disabled */
buffer = GST_BUFFER (gst_event_new (GST_EVENT_INTERRUPT));
}
}
}
GST_PAD_BUFLIST (srcpad) = g_list_remove (GST_PAD_BUFLIST (srcpad), buffer);
while (buffer == NULL);
GST_INFO (GST_CAT_SCHEDULING, "get wrapper, returning buffer %d",
g_list_length (GST_PAD_BUFLIST (srcpad)));
GST_INFO (GST_CAT_SCHEDULING, "get wrapper, returning buffer %p, queue length %d",
buffer, g_list_length (GST_PAD_BUFLIST (srcpad)));
return buffer;
}
@ -921,6 +1076,56 @@ gst_opt_scheduler_chain_wrapper (GstPad *sinkpad, GstBuffer *buffer)
}
}
static void
clear_queued (GstData *data, gpointer user_data)
{
gst_data_unref (data);
}
static void
pad_clear_queued (GstPad *srcpad, gpointer user_data)
{
GList *buflist = GST_PAD_BUFLIST (srcpad);
if (buflist) {
GST_INFO (GST_CAT_SCHEDULING, "need to clear some buffers");
g_list_foreach (buflist, (GFunc) clear_queued, NULL);
g_list_free (buflist);
GST_PAD_BUFLIST (srcpad) = NULL;
}
}
static gboolean
gst_opt_scheduler_event_wrapper (GstPad *srcpad, GstEvent *event)
{
gboolean flush;
GST_INFO (GST_CAT_SCHEDULING, "intercepting event %d on pad %s:%s",
GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (srcpad));
/* figure out if this is a flush event */
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH:
flush = TRUE;
break;
case GST_EVENT_SEEK:
case GST_EVENT_SEEK_SEGMENT:
flush = GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH;
break;
default:
flush = FALSE;
break;
}
if (flush) {
GST_INFO (GST_CAT_SCHEDULING, "event is flush");
pad_clear_queued (srcpad, NULL);
}
return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
}
/* setup the scheduler context for a group. The right schedule function
* is selected based on the group type and cothreads are created if
* needed */
@ -993,7 +1198,7 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
switch (transition) {
case GST_STATE_PAUSED_TO_PLAYING:
/* an element withut a group has to be an unlinked src, sink
/* an element without a group has to be an unlinked src, sink
* filter element */
if (!group) {
GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" has no group", GST_ELEMENT_NAME (element));
@ -1010,6 +1215,13 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
if (group)
group_element_set_enabled (group, element, FALSE);
break;
case GST_STATE_PAUSED_TO_READY:
{
GList *pads = (GList *) gst_element_get_pad_list (element);
g_list_foreach (pads, (GFunc) pad_clear_queued, NULL);
break;
}
default:
break;
}
@ -1157,6 +1369,7 @@ gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
ctx = g_new0 (GstOptSchedulerCtx, 1);
GST_ELEMENT_SCHED_CONTEXT (element) = ctx;
ctx->flags = GST_OPT_SCHEDULER_CTX_DISABLED;
/* loop based elements *always* end up in their own group. It can eventually
* be merged with another group when a link is made */
@ -1189,15 +1402,7 @@ gst_opt_scheduler_remove_element (GstScheduler *sched, GstElement *element)
/* the element is guaranteed to live in it's own group/chain now */
get_group (element, &group);
if (group) {
GstOptSchedulerChain *chain;
GST_ELEMENT_SCHED_GROUP (element) = NULL;
chain = group->chain;
if (chain) {
remove_from_chain (chain, group);
}
delete_group (group);
remove_from_group (group, element);
}
g_free (GST_ELEMENT_SCHED_CONTEXT (element));
@ -1250,6 +1455,7 @@ gst_opt_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
GST_INFO (GST_CAT_SCHEDULING, "scheduler set interrupted state");
osched->state = GST_OPT_SCHEDULER_STATE_INTERRUPTED;
}
return TRUE;
@ -1350,6 +1556,8 @@ gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad
GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
else
GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad);
GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
/* the two elements should be put into the same group,
* this also means that they are in the same chain automatically */
@ -1374,6 +1582,8 @@ gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad
GST_RPAD_CHAINHANDLER (sinkpad) = GST_RPAD_CHAINFUNC (sinkpad);
else
GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_chain_wrapper;
GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad);
GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
/* the two elements should be put into the same group,
* this also means that they are in the same chain automatically,
@ -1385,6 +1595,8 @@ gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad
GST_INFO (GST_CAT_SCHEDULING, "get to loop based link");
GST_RPAD_GETHANDLER (srcpad) = GST_RPAD_GETFUNC (srcpad);
GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad);
GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
/* the two elements should be put into the same group,
* this also means that they are in the same chain automatically,
@ -1401,6 +1613,10 @@ gst_opt_scheduler_pad_link (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad
GST_RPAD_CHAINHANDLER (sinkpad) = gst_opt_scheduler_loop_wrapper;
GST_RPAD_GETHANDLER (srcpad) = gst_opt_scheduler_get_wrapper;
GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad);
/* events on the srcpad have to be intercepted as we might need to
* flush the buffer lists */
GST_RPAD_EVENTHANDLER (srcpad) = gst_opt_scheduler_event_wrapper;
group1 = GST_ELEMENT_SCHED_GROUP (element1);
group2 = GST_ELEMENT_SCHED_GROUP (element2);
@ -1488,6 +1704,15 @@ gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkp
get_group (element1, &group1);
get_group (element2, &group2);
/* for decoupled elements (that are never put into a group) we use the
* group of the peer element for the remainder of the algorithm */
if (GST_ELEMENT_IS_DECOUPLED (element1)) {
group1 = group2;
}
if (GST_ELEMENT_IS_DECOUPLED (element2)) {
group2 = group1;
}
/* if one the elements has no group (anymore) we don't really care
* about the link */
if (!group1 || !group2) {
@ -1522,30 +1747,38 @@ gst_opt_scheduler_pad_unlink (GstScheduler *sched, GstPad *srcpad, GstPad *sinkp
if (still_link1 && still_link2) {
GST_INFO (GST_CAT_SCHEDULING, "elements still have links with other elements in the group");
/* FIXME it's possible that we have to break the chain */
g_warning ("pad unlink might break chain, implement me");
return;
}
/* now check which one of the elements we can remove from the group */
if (!still_link1) {
GST_INFO (GST_CAT_SCHEDULING, "element1 is separated from the group");
/* see if the element was an entry point for the group */
if (group->entry == element1) {
/* we're going to remove the element so we need to clear it as the
* entry point */
group->entry = NULL;
/* we only remove elements that are not the entry point of a loop based
* group and are not decoupled */
if (!(group->entry == element1 &&
group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
!GST_ELEMENT_IS_DECOUPLED (element1))
{
GST_INFO (GST_CAT_SCHEDULING, "element1 is separated from the group");
remove_from_group (group, element1);
}
else {
GST_INFO (GST_CAT_SCHEDULING, "element1 is decoupled or entry in loop based group");
}
remove_from_group (group, element1);
}
if (!still_link2) {
GST_INFO (GST_CAT_SCHEDULING, "element2 is separated from the group");
/* see if the element was an entry point for the group */
if (group->entry == element2) {
/* we're going to remove the element so we need to clear it as the
* entry point */
group->entry = NULL;
/* we only remove elements that are not the entry point of a loop based
* group and are not decoupled */
if (!(group->entry == element2 &&
group->type == GST_OPT_SCHEDULER_GROUP_LOOP) &&
!GST_ELEMENT_IS_DECOUPLED (element2))
{
GST_INFO (GST_CAT_SCHEDULING, "element2 is separated from the group");
remove_from_group (group, element2);
}
else {
GST_INFO (GST_CAT_SCHEDULING, "element2 is decoupled or entry in loop based group");
}
remove_from_group (group, element2);
}
}
}
@ -1585,8 +1818,8 @@ gst_opt_scheduler_iterate (GstScheduler *sched)
chains = osched->chains;
while (chains) {
GstOptSchedulerChain *chain = (GstOptSchedulerChain *) chains->data;
chains = g_slist_next (chains);
ref_chain (chain);
/* if the chain is not disabled, schedule it */
if (!GST_OPT_SCHEDULER_CHAIN_IS_DISABLED (chain)) {
schedule_chain (chain);
@ -1597,7 +1830,16 @@ gst_opt_scheduler_iterate (GstScheduler *sched)
if (osched->state == GST_OPT_SCHEDULER_STATE_ERROR) {
GST_INFO (GST_CAT_SCHEDULING, "scheduler %p is in error", sched);
break;
}
else if (osched->state == GST_OPT_SCHEDULER_STATE_INTERRUPTED) {
GST_INFO (GST_CAT_SCHEDULING, "scheduler %p is interrupted, continue with next chain", sched);
osched->state = GST_OPT_SCHEDULER_STATE_RUNNING;
}
GST_INFO (GST_CAT_SCHEDULING, "iterate scheduled %p", chain);
chains = g_slist_next (chains);
unref_chain (chain);
}
/* at this point it's possible that the scheduler state is
@ -1639,15 +1881,16 @@ gst_opt_scheduler_show (GstScheduler *sched)
GSList *groups = chain->groups;
chains = g_slist_next (chains);
g_print ("+- chain %p: %d groups, %d enabled, flags %d\n", chain, chain->num_groups, chain->num_enabled, chain->flags);
g_print ("+- chain %p: refcount %d, %d groups, %d enabled, flags %d\n",
chain, chain->refcount, chain->num_groups, chain->num_enabled, chain->flags);
while (groups) {
GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
GSList *elements = group->elements;
groups = g_slist_next (groups);
g_print (" +- group %p: %d elements, %d enabled, flags %d, entry %s, %s\n",
group, group->num_elements, group->num_enabled, group->flags,
g_print (" +- group %p: refcount %d, %d elements, %d enabled, flags %d, entry %s, %s\n",
group, group->refcount, group->num_elements, group->num_enabled, group->flags,
(group->entry ? GST_ELEMENT_NAME (group->entry): "(none)"),
(group->type == GST_OPT_SCHEDULER_GROUP_GET ? "get-based" : "loop-based") );