Better, cleaner state management of the scheduler by adding scheduler state flags.

Original commit message from CVS:
Better, cleaner state management of the scheduler by adding scheduler state
flags.
typefind has to interrupt instead of yield.
Fix a leak in the queue when it's dropping buffers.
This commit is contained in:
Wim Taymans 2001-12-24 15:14:03 +00:00
parent 109f7a0d64
commit 7ec6702121
7 changed files with 131 additions and 72 deletions

View file

@ -717,7 +717,16 @@ gst_bin_iterate_func (GstBin * bin)
{ {
/* only iterate if this is the manager bin */ /* only iterate if this is the manager bin */
if (GST_ELEMENT_SCHED (bin)->parent == GST_ELEMENT (bin)) { if (GST_ELEMENT_SCHED (bin)->parent == GST_ELEMENT (bin)) {
return gst_scheduler_iterate (GST_ELEMENT_SCHED (bin)); GstSchedulerState state;
state = gst_scheduler_iterate (GST_ELEMENT_SCHED (bin));
if (state == GST_SCHEDULER_STATE_RUNNING) {
return TRUE;
}
else if (state == GST_SCHEDULER_STATE_ERROR) {
gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
}
} }
else { else {
g_warning ("bin \"%d\" can't be iterated on!\n", GST_ELEMENT_NAME (bin)); g_warning ("bin \"%d\" can't be iterated on!\n", GST_ELEMENT_NAME (bin));

View file

@ -1478,7 +1478,7 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
if (GST_IS_BUFFER (buf)) if (GST_IS_BUFFER (buf))
gst_buffer_unref (buf); gst_buffer_unref (buf);
else else
gst_pad_event_default (pad, GST_EVENT (buf)); gst_event_free (GST_EVENT (buf));
} }
} }
#endif #endif
@ -2018,7 +2018,7 @@ gst_pad_event_default (GstPad *pad, GstEvent *event)
} }
} }
gst_event_free (event); gst_event_free (event);
/* we have to try to schedule another element because this one is deisabled */ /* we have to try to schedule another element because this one is disabled */
gst_element_yield (element); gst_element_yield (element);
break; break;
default: default:

View file

@ -360,6 +360,8 @@ restart:
/* this means the other end is shut down */ /* this means the other end is shut down */
/* try to signal to resolve the error */ /* try to signal to resolve the error */
if (!queue->may_deadlock) { if (!queue->may_deadlock) {
if (GST_IS_BUFFER (buf)) gst_buffer_unref (buf);
else gst_event_free (GST_EVENT (buf));
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
return; return;

View file

@ -45,38 +45,46 @@ extern "C" {
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SCHEDULER)) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SCHEDULER))
#define GST_SCHED_PARENT(sched) ((sched)->parent) #define GST_SCHEDULER_PARENT(sched) ((sched)->parent)
#define GST_SCHEDULER_STATE(sched) ((sched)->state)
/*typedef struct _GstScheduler GstScheduler; */ /*typedef struct _GstScheduler GstScheduler; */
/*typedef struct _GstSchedulerClass GstSchedulerClass; */ /*typedef struct _GstSchedulerClass GstSchedulerClass; */
typedef enum {
GST_SCHEDULER_STATE_NONE,
GST_SCHEDULER_STATE_RUNNING,
GST_SCHEDULER_STATE_STOPPED,
GST_SCHEDULER_STATE_ERROR,
} GstSchedulerState;
struct _GstScheduler { struct _GstScheduler {
GstObject object; GstObject object;
GstElement *parent; GstElement *parent;
GstSchedulerState state;
}; };
struct _GstSchedulerClass { struct _GstSchedulerClass {
GstObjectClass parent_class; GstObjectClass parent_class;
/* virtual methods */ /* virtual methods */
void (*setup) (GstScheduler *sched); void (*setup) (GstScheduler *sched);
void (*reset) (GstScheduler *sched); void (*reset) (GstScheduler *sched);
void (*add_element) (GstScheduler *sched, GstElement *element); void (*add_element) (GstScheduler *sched, GstElement *element);
void (*remove_element) (GstScheduler *sched, GstElement *element); void (*remove_element) (GstScheduler *sched, GstElement *element);
GstElementStateReturn GstElementStateReturn (*state_transition) (GstScheduler *sched, GstElement *element, gint transition);
(*state_transition) (GstScheduler *sched, GstElement *element, gint transition); void (*lock_element) (GstScheduler *sched, GstElement *element);
void (*lock_element) (GstScheduler *sched, GstElement *element); void (*unlock_element) (GstScheduler *sched, GstElement *element);
void (*unlock_element) (GstScheduler *sched, GstElement *element); void (*yield) (GstScheduler *sched, GstElement *element);
void (*yield) (GstScheduler *sched, GstElement *element); void (*interrupt) (GstScheduler *sched, GstElement *element);
void (*interrupt) (GstScheduler *sched, GstElement *element); void (*error) (GstScheduler *sched, GstElement *element);
void (*error) (GstScheduler *sched, GstElement *element); void (*pad_connect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void (*pad_connect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); void (*pad_disconnect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void (*pad_disconnect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); void (*pad_select) (GstScheduler *sched, GList *padlist);
void (*pad_select) (GstScheduler *sched, GList *padlist); GstSchedulerState (*iterate) (GstScheduler *sched);
gboolean (*iterate) (GstScheduler *sched);
/* for debugging */ /* for debugging */
void (*show) (GstScheduler *sched); void (*show) (GstScheduler *sched);
/* signals go here */ /* signals go here */
}; };

View file

@ -196,7 +196,7 @@ gst_typefind_chain (GstPad *pad, GstBuffer *buf)
typefind->caps); typefind->caps);
if (GST_STATE(typefind) != oldstate) { if (GST_STATE(typefind) != oldstate) {
GST_DEBUG(0, "state changed during signal, aborting\n"); GST_DEBUG(0, "state changed during signal, aborting\n");
gst_element_yield (GST_ELEMENT (typefind)); gst_element_interrupt (GST_ELEMENT (typefind));
} }
gst_object_unref (GST_OBJECT (typefind)); gst_object_unref (GST_OBJECT (typefind));

View file

@ -62,6 +62,13 @@ struct _GstSchedulerChain {
#define GST_BASIC_SCHEDULER_CAST(sched) ((GstBasicScheduler *)(sched)) #define GST_BASIC_SCHEDULER_CAST(sched) ((GstBasicScheduler *)(sched))
typedef enum {
GST_BASIC_SCHEDULER_STATE_NONE,
GST_BASIC_SCHEDULER_STATE_STOPPED,
GST_BASIC_SCHEDULER_STATE_ERROR,
GST_BASIC_SCHEDULER_STATE_RUNNING,
} GstBasicSchedulerState;
struct _GstBasicScheduler { struct _GstBasicScheduler {
GstScheduler parent; GstScheduler parent;
@ -70,6 +77,8 @@ struct _GstBasicScheduler {
GList *chains; GList *chains;
gint num_chains; gint num_chains;
GstBasicSchedulerState state;
}; };
struct _GstBasicSchedulerClass { struct _GstBasicSchedulerClass {
@ -78,28 +87,29 @@ struct _GstBasicSchedulerClass {
static GType _gst_basic_scheduler_type = 0; static GType _gst_basic_scheduler_type = 0;
static void gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass); static void gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass);
static void gst_basic_scheduler_init (GstBasicScheduler * scheduler); static void gst_basic_scheduler_init (GstBasicScheduler * scheduler);
static void gst_basic_scheduler_dispose (GObject *object); static void gst_basic_scheduler_dispose (GObject *object);
static void gst_basic_scheduler_setup (GstScheduler *sched); static void gst_basic_scheduler_setup (GstScheduler *sched);
static void gst_basic_scheduler_reset (GstScheduler *sched); static void gst_basic_scheduler_reset (GstScheduler *sched);
static void gst_basic_scheduler_add_element (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_add_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_remove_element (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_remove_element (GstScheduler *sched, GstElement *element);
static GstElementStateReturn static GstElementStateReturn
gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition); gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_error (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_error (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static GstPad* gst_basic_scheduler_pad_select (GstScheduler *sched, GList *padlist); static GstPad* gst_basic_scheduler_pad_select (GstScheduler *sched, GList *padlist);
static gboolean gst_basic_scheduler_iterate (GstScheduler *sched); static GstSchedulerState
gst_basic_scheduler_iterate (GstScheduler *sched);
static void gst_basic_scheduler_show (GstScheduler *sched); static void gst_basic_scheduler_show (GstScheduler *sched);
static GstSchedulerClass *parent_class = NULL; static GstSchedulerClass *parent_class = NULL;
@ -877,7 +887,7 @@ static void
gst_basic_scheduler_reset (GstScheduler *sched) gst_basic_scheduler_reset (GstScheduler *sched)
{ {
cothread_context *ctx; cothread_context *ctx;
GstBin *bin = GST_BIN (GST_SCHED_PARENT (sched)); GstBin *bin = GST_BIN (GST_SCHEDULER_PARENT (sched));
GList *elements = GST_BASIC_SCHEDULER_CAST (sched)->elements; GList *elements = GST_BASIC_SCHEDULER_CAST (sched)->elements;
while (elements) { while (elements) {
@ -885,11 +895,11 @@ gst_basic_scheduler_reset (GstScheduler *sched)
elements = g_list_next (elements); elements = g_list_next (elements);
} }
ctx = GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched)); ctx = GST_BIN_THREADCONTEXT (GST_SCHEDULER_PARENT (sched));
cothread_context_free (ctx); cothread_context_free (ctx);
GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched)) = NULL; GST_BIN_THREADCONTEXT (GST_SCHEDULER_PARENT (sched)) = NULL;
} }
static void static void
@ -984,21 +994,41 @@ gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element,
GstSchedulerChain *chain; GstSchedulerChain *chain;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
/* find the chain the element is in */ /* check if our parent changed state */
chain = gst_basic_scheduler_find_chain (bsched, element); if (GST_SCHEDULER_PARENT (sched) == element) {
GST_INFO (GST_CAT_SCHEDULING, "parent \"%s\" changed state", GST_ELEMENT_NAME (element));
/* remove it from the chain */ if (transition == GST_STATE_PLAYING_TO_PAUSED) {
if (chain) { GST_INFO (GST_CAT_SCHEDULING, "setting scheduler state to stopped");
if (transition == GST_STATE_PLAYING_TO_PAUSED) GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED;
gst_basic_scheduler_chain_disable_element (chain, element); }
if (transition == GST_STATE_PAUSED_TO_PLAYING) else if (transition == GST_STATE_PAUSED_TO_PLAYING) {
if (!gst_basic_scheduler_chain_enable_element (chain, element)) { GST_INFO (GST_CAT_SCHEDULING, "setting scheduler state to running");
GST_INFO (GST_CAT_SCHEDULING, "could not enable element \"%s\"", GST_ELEMENT_NAME (element)); GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING;
return GST_STATE_FAILURE; }
} else {
GST_INFO (GST_CAT_SCHEDULING, "no interesting state change, doing nothing");
}
} }
else { else if (transition == GST_STATE_PLAYING_TO_PAUSED ||
GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" not found in any chain, no state change", GST_ELEMENT_NAME (element)); transition == GST_STATE_PAUSED_TO_PLAYING) {
/* find the chain the element is in */
chain = gst_basic_scheduler_find_chain (bsched, element);
/* remove it from the chain */
if (chain) {
if (transition == GST_STATE_PLAYING_TO_PAUSED) {
gst_basic_scheduler_chain_disable_element (chain, element);
}
else if (transition == GST_STATE_PAUSED_TO_PLAYING) {
if (!gst_basic_scheduler_chain_enable_element (chain, element)) {
GST_INFO (GST_CAT_SCHEDULING, "could not enable element \"%s\"", GST_ELEMENT_NAME (element));
return GST_STATE_FAILURE;
}
}
}
else {
GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" not found in any chain, no state change", GST_ELEMENT_NAME (element));
}
} }
return GST_STATE_SUCCESS; return GST_STATE_SUCCESS;
@ -1044,7 +1074,7 @@ gst_basic_scheduler_error (GstScheduler *sched, GstElement *element)
if (chain) if (chain)
gst_basic_scheduler_chain_disable_element (chain, element); gst_basic_scheduler_chain_disable_element (chain, element);
GST_STATE_PENDING (GST_SCHEDULER (sched)->parent) = GST_STATE_PAUSED; GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_ERROR;
cothread_switch (cothread_current_main ()); cothread_switch (cothread_current_main ());
} }
@ -1161,14 +1191,13 @@ gst_basic_scheduler_pad_select (GstScheduler * sched, GList * padlist)
return pad; return pad;
} }
static gboolean static GstSchedulerState
gst_basic_scheduler_iterate (GstScheduler * sched) gst_basic_scheduler_iterate (GstScheduler * sched)
{ {
GstBin *bin = GST_BIN (sched->parent); GstBin *bin = GST_BIN (sched->parent);
GList *chains; GList *chains;
GstSchedulerChain *chain; GstSchedulerChain *chain;
GstElement *entry; GstElement *entry;
gboolean eos = FALSE;
GList *elements; GList *elements;
gint scheduled = 0; gint scheduled = 0;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
@ -1179,7 +1208,7 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
chains = bsched->chains; chains = bsched->chains;
if (chains == NULL) if (chains == NULL)
return FALSE; return GST_SCHEDULER_STATE_STOPPED;
while (chains) { while (chains) {
chain = (GstSchedulerChain *) (chains->data); chain = (GstSchedulerChain *) (chains->data);
@ -1210,7 +1239,10 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
break; break;
} }
if (entry) { if (entry) {
GstSchedulerState state;
GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING); GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)\n", GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
GST_ELEMENT_NAME (entry), entry); GST_ELEMENT_NAME (entry), entry);
if (GST_ELEMENT_THREADSTATE (entry)) { if (GST_ELEMENT_THREADSTATE (entry)) {
@ -1218,9 +1250,11 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
} }
else { else {
GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch not possible, element has no threadstate\n"); GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch not possible, element has no threadstate\n");
return FALSE; return GST_SCHEDULER_STATE_ERROR;
} }
state = GST_SCHEDULER_STATE (sched);
/* following is a check to see if the chain was interrupted due to a /* following is a check to see if the chain was interrupted due to a
* top-half state_change(). (i.e., if there's a pending state.) * top-half state_change(). (i.e., if there's a pending state.)
* *
@ -1228,28 +1262,32 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
* execute the state change. * execute the state change.
*/ */
GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch ended or interrupted\n"); GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch ended or interrupted\n");
if (GST_STATE_PENDING (GST_SCHEDULER (sched)->parent) != GST_STATE_VOID_PENDING) {
GST_DEBUG (GST_CAT_DATAFLOW, "handle pending state %d\n", if (state != GST_SCHEDULER_STATE_RUNNING) {
GST_STATE_PENDING (GST_SCHEDULER (sched)->parent)); GST_INFO (GST_CAT_DATAFLOW, "scheduler is not running, in state %d", state);
return FALSE; return state;
} }
scheduled++; scheduled++;
} }
else { else {
GST_INFO (GST_CAT_DATAFLOW, "NO ENTRY INTO CHAIN!"); GST_INFO (GST_CAT_DATAFLOW, "no entry in this chain, trying the next one");
if (scheduled == 0)
eos = TRUE;
} }
} }
else { else {
GST_INFO (GST_CAT_DATAFLOW, "NO ENABLED ELEMENTS IN CHAIN!!"); GST_INFO (GST_CAT_DATAFLOW, "no enabled elements in this chain, trying the next one");
if (scheduled == 0)
eos = TRUE;
} }
} }
GST_DEBUG (GST_CAT_DATAFLOW, "leaving (%s)\n", GST_ELEMENT_NAME (bin)); GST_DEBUG (GST_CAT_DATAFLOW, "leaving (%s)\n", GST_ELEMENT_NAME (bin));
return !eos; if (scheduled == 0) {
GST_INFO (GST_CAT_DATAFLOW, "nothing was scheduled, return STOPPED");
return GST_SCHEDULER_STATE_STOPPED;
}
else {
GST_INFO (GST_CAT_DATAFLOW, "scheduler still running, return RUNNING");
return GST_SCHEDULER_STATE_RUNNING;
}
} }

View file

@ -360,6 +360,8 @@ restart:
/* this means the other end is shut down */ /* this means the other end is shut down */
/* try to signal to resolve the error */ /* try to signal to resolve the error */
if (!queue->may_deadlock) { if (!queue->may_deadlock) {
if (GST_IS_BUFFER (buf)) gst_buffer_unref (buf);
else gst_event_free (GST_EVENT (buf));
g_mutex_unlock (queue->qlock); g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
return; return;