This is an attempt at not segfaulting on errors but reporting some usefull info instead.

Original commit message from CVS:
This is an attempt at not segfaulting on errors but reporting some
usefull info instead.
- bin changes so errors can propagate.
- changed the _FAST macros to _CAST because that is what they do.
- removed all references to cothreads out of the core, they are
really a scheduler issue, handler with a sched_private gpointer.
- added a live buffer count, for debugging buffer leaks.
- added error checking in gst_scheduler_state_transition this solves the
"out of cothreads" problem.
- GST_ELEMENT_NO_ENTRY == GST_ELEMENT_INFINITE_LOOP
- added 2 private element flasg for use by the scheduler
(_COTHREAD_STOPPING) is now
- added scheduler entry points:
- _yield : to create possible scheduling points.
- _interrupt: to stop execution of an element.
- _error: to signal en error condition to the scheduler.
- improved error messages for pads.
- signal gst_element_error where appropriate.
- added the a new bin to the parent before entering it so one can reference
its children.
- queue memleak fixes on dispose.
- added possible deadlock detection in queue (turned off be default)
- GstBasicScheduler is a real class of its own now, hiding its internal
variables.
- GST_ELEMENT_IS_COTHREAD_STOPPING is gone. either call explicit _yield
operations, or make a sane loop.
- Better state change handling in filesrc. Better error reporting/recovery
too.
- updated core plugins.
- detect non decoupled elements on scheduler boundries and error.
This commit is contained in:
Wim Taymans 2001-12-22 21:18:17 +00:00
parent 11456df888
commit 087dee1f62
29 changed files with 925 additions and 533 deletions

View file

@ -203,72 +203,69 @@ gst_autoplugcache_loop (GstElement *element)
* the playout pointer hits the end of cache again it has to start pulling.
*/
do {
/* the first time through, the current_playout pointer is going to be NULL */
if (cache->current_playout == NULL) {
/* get a buffer */
buf = gst_pad_pull (cache->sinkpad);
/* the first time through, the current_playout pointer is going to be NULL */
if (cache->current_playout == NULL) {
/* get a buffer */
buf = gst_pad_pull (cache->sinkpad);
/* add it to the cache, though cache == NULL */
gst_buffer_ref (buf);
cache->cache = g_list_prepend (cache->cache, buf);
cache->buffer_count++;
/* add it to the cache, though cache == NULL */
gst_buffer_ref (buf);
cache->cache = g_list_prepend (cache->cache, buf);
cache->buffer_count++;
/* set the current_playout pointer */
cache->current_playout = cache->cache;
/* set the current_playout pointer */
cache->current_playout = cache->cache;
g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
/* send the buffer on its way */
gst_pad_push (cache->srcpad, buf);
}
/* send the buffer on its way */
gst_pad_push (cache->srcpad, buf);
}
/* the steady state is where the playout is at the front of the cache */
else if (g_list_previous(cache->current_playout) == NULL) {
/* the steady state is where the playout is at the front of the cache */
else if (g_list_previous(cache->current_playout) == NULL) {
/* if we've been told to fire an empty signal (after a reset) */
if (cache->fire_empty) {
int oldstate = GST_STATE(cache);
GST_DEBUG(0,"at front of cache, about to pull, but firing signal\n");
/* if we've been told to fire an empty signal (after a reset) */
if (cache->fire_empty) {
int oldstate = GST_STATE(cache);
GST_DEBUG(0,"at front of cache, about to pull, but firing signal\n");
gst_object_ref (GST_OBJECT (cache));
g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[CACHE_EMPTY], 0, NULL);
if (GST_STATE(cache) != oldstate) {
gst_object_ref (GST_OBJECT (cache));
g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[CACHE_EMPTY], 0, NULL);
if (GST_STATE(cache) != oldstate) {
gst_object_ref (GST_OBJECT (cache));
GST_DEBUG(GST_CAT_AUTOPLUG, "state changed during signal, aborting\n");
cothread_switch(cothread_current_main());
}
gst_object_unref (GST_OBJECT (cache));
GST_DEBUG(GST_CAT_AUTOPLUG, "state changed during signal, aborting\n");
cothread_switch(cothread_current_main());
}
/* get a buffer */
buf = gst_pad_pull (cache->sinkpad);
/* add it to the front of the cache */
gst_buffer_ref (buf);
cache->cache = g_list_prepend (cache->cache, buf);
cache->buffer_count++;
/* set the current_playout pointer */
cache->current_playout = cache->cache;
/* send the buffer on its way */
gst_pad_push (cache->srcpad, buf);
gst_object_unref (GST_OBJECT (cache));
}
/* otherwise we're trundling through existing cached buffers */
else {
/* move the current_playout pointer */
cache->current_playout = g_list_previous (cache->current_playout);
/* get a buffer */
buf = gst_pad_pull (cache->sinkpad);
if (cache->fire_first) {
g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
cache->fire_first = FALSE;
}
/* add it to the front of the cache */
gst_buffer_ref (buf);
cache->cache = g_list_prepend (cache->cache, buf);
cache->buffer_count++;
/* push that buffer */
gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data));
/* set the current_playout pointer */
cache->current_playout = cache->cache;
/* send the buffer on its way */
gst_pad_push (cache->srcpad, buf);
}
/* otherwise we're trundling through existing cached buffers */
else {
/* move the current_playout pointer */
cache->current_playout = g_list_previous (cache->current_playout);
if (cache->fire_first) {
g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
cache->fire_first = FALSE;
}
} while (!GST_FLAG_IS_SET (element, GST_ELEMENT_COTHREAD_STOPPING));
/* push that buffer */
gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data));
}
}
static GstPadNegotiateReturn

View file

@ -166,7 +166,6 @@ cothread_create (cothread_context *ctx)
cothread_destroy (ctx->threads[slot]);
break;
}
}
sp = CURRENT_STACK_FRAME;

View file

@ -272,46 +272,44 @@ gst_aggregator_loop (GstElement *element)
aggregator = GST_AGGREGATOR (element);
do {
if (aggregator->sched == AGGREGATOR_LOOP ||
aggregator->sched == AGGREGATOR_LOOP_PEEK) {
GList *pads = aggregator->sinkpads;
if (aggregator->sched == AGGREGATOR_LOOP ||
aggregator->sched == AGGREGATOR_LOOP_PEEK) {
GList *pads = aggregator->sinkpads;
while (pads) {
GstPad *pad = GST_PAD (pads->data);
pads = g_list_next (pads);
while (pads) {
GstPad *pad = GST_PAD (pads->data);
pads = g_list_next (pads);
if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
buf = gst_pad_peek (pad);
if (buf == NULL)
continue;
if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
buf = gst_pad_peek (pad);
if (buf == NULL)
continue;
g_assert (buf == gst_pad_pull (pad));
debug = "loop_peek";
}
else {
buf = gst_pad_pull (pad);
debug = "loop";
}
gst_aggregator_push (aggregator, pad, buf, debug);
}
}
else {
if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
GstPad *pad;
debug = "loop_select";
pad = gst_pad_select (aggregator->sinkpads);
buf = gst_pad_pull (pad);
gst_aggregator_push (aggregator, pad, buf, debug);
g_assert (buf == gst_pad_pull (pad));
debug = "loop_peek";
}
else {
g_assert_not_reached ();
buf = gst_pad_pull (pad);
debug = "loop";
}
gst_aggregator_push (aggregator, pad, buf, debug);
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
}
else {
if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
GstPad *pad;
debug = "loop_select";
pad = gst_pad_select (aggregator->sinkpads);
buf = gst_pad_pull (pad);
gst_aggregator_push (aggregator, pad, buf, debug);
}
else {
g_assert_not_reached ();
}
}
}
/**

View file

@ -387,7 +387,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
case ARG_OUTPUT:
break;
case ARG_DATA:
src->data = g_value_get_int (value);
src->data = g_value_get_enum (value);
switch (src->data) {
case FAKESRC_DATA_ALLOCATE:
if (src->parent) {
@ -403,7 +403,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
}
break;
case ARG_SIZETYPE:
src->sizetype = g_value_get_int (value);
src->sizetype = g_value_get_enum (value);
break;
case ARG_SIZEMIN:
src->sizemin = g_value_get_int (value);
@ -415,7 +415,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
src->parentsize = g_value_get_int (value);
break;
case ARG_FILLTYPE:
src->filltype = g_value_get_int (value);
src->filltype = g_value_get_enum (value);
break;
case ARG_PATTERN:
break;
@ -455,13 +455,13 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS
g_value_set_boolean (value, src->loop_based);
break;
case ARG_OUTPUT:
g_value_set_int (value, src->output);
g_value_set_enum (value, src->output);
break;
case ARG_DATA:
g_value_set_int (value, src->data);
g_value_set_enum (value, src->data);
break;
case ARG_SIZETYPE:
g_value_set_int (value, src->sizetype);
g_value_set_enum (value, src->sizetype);
break;
case ARG_SIZEMIN:
g_value_set_int (value, src->sizemin);
@ -473,7 +473,7 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS
g_value_set_int (value, src->parentsize);
break;
case ARG_FILLTYPE:
g_value_set_int (value, src->filltype);
g_value_set_enum (value, src->filltype);
break;
case ARG_PATTERN:
g_value_set_string (value, src->pattern);
@ -689,49 +689,46 @@ static void
gst_fakesrc_loop(GstElement *element)
{
GstFakeSrc *src;
GList *pads;
g_return_if_fail(element != NULL);
g_return_if_fail(GST_IS_FAKESRC(element));
src = GST_FAKESRC (element);
do {
GList *pads;
pads = gst_element_get_pad_list (element);
pads = GST_ELEMENT (src)->pads;
while (pads) {
GstPad *pad = GST_PAD (pads->data);
GstBuffer *buf;
while (pads) {
GstPad *pad = GST_PAD (pads->data);
GstBuffer *buf;
if (src->rt_num_buffers == 0) {
src->eos = TRUE;
}
else {
if (src->rt_num_buffers > 0)
src->rt_num_buffers--;
}
if (src->eos) {
gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
return;
}
buf = gst_fakesrc_create_buffer (src);
GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
if (!src->silent) {
gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
}
g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
gst_pad_push (pad, buf);
pads = g_list_next (pads);
if (src->rt_num_buffers == 0) {
src->eos = TRUE;
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
else {
if (src->rt_num_buffers > 0)
src->rt_num_buffers--;
}
if (src->eos) {
gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
return;
}
buf = gst_fakesrc_create_buffer (src);
GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
if (!src->silent) {
gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
}
g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
gst_pad_push (pad, buf);
pads = g_list_next (pads);
}
}
static GstElementStateReturn

View file

@ -30,6 +30,7 @@
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>
#include <string.h>
/**********************************************************************
@ -327,10 +328,10 @@ gst_filesrc_map_region (GstFileSrc *src, off_t offset, size_t size)
/* mmap() the data into this new buffer */
GST_BUFFER_DATA(buf) = mmap (NULL, size, PROT_READ, MAP_SHARED, src->fd, offset);
if (GST_BUFFER_DATA(buf) == NULL) {
fprintf (stderr, "ERROR: gstfilesrc couldn't map file!\n");
gst_element_error (GST_ELEMENT (src), "couldn't map file");
} else if (GST_BUFFER_DATA(buf) == MAP_FAILED) {
g_error ("gstfilesrc mmap(0x%x, %d, 0x%llx) : %s",
size, src->fd, offset, sys_errlist[errno]);
gst_element_error (GST_ELEMENT (src), "mmap (0x%x, %d, 0x%llx) : %s",
size, src->fd, offset, strerror (errno));
}
#ifdef MADV_SEQUENTIAL
/* madvise to tell the kernel what to do with it */
@ -533,8 +534,8 @@ gst_filesrc_open_file (GstFileSrc *src)
/* open the file */
src->fd = open (src->filename, O_RDONLY);
if (src->fd < 0) {
perror ("open");
gst_element_error (GST_ELEMENT (src), g_strconcat("opening file \"", src->filename, "\"", NULL));
gst_element_error (GST_ELEMENT (src), "opening file \"%s\" (%s)",
src->filename, strerror (errno), NULL);
return FALSE;
} else {
/* find the file length */
@ -557,7 +558,6 @@ gst_filesrc_close_file (GstFileSrc *src)
{
g_return_if_fail (GST_FLAG_IS_SET (src, GST_FILESRC_OPEN));
g_print ("close\n");
/* close the file */
close (src->fd);
@ -565,6 +565,8 @@ gst_filesrc_close_file (GstFileSrc *src)
src->fd = 0;
src->filelen = 0;
src->curoffset = 0;
if (src->mapbuf)
gst_buffer_unref (src->mapbuf);
GST_FLAG_UNSET (src, GST_FILESRC_OPEN);
}
@ -575,17 +577,22 @@ gst_filesrc_change_state (GstElement *element)
{
GstFileSrc *src = GST_FILESRC(element);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
gst_filesrc_close_file (GST_FILESRC (element));
} if (GST_STATE_PENDING (element) == GST_STATE_READY) {
src->curoffset = 0;
} else {
if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
if (!gst_filesrc_open_file (GST_FILESRC (element)))
return GST_STATE_FAILURE;
}
switch (GST_STATE_TRANSITION (element)) {
case GST_STATE_NULL_TO_READY:
if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
if (!gst_filesrc_open_file (GST_FILESRC (element)))
return GST_STATE_FAILURE;
}
break;
case GST_STATE_READY_TO_NULL:
if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
gst_filesrc_close_file (GST_FILESRC (element));
break;
case GST_STATE_READY_TO_PAUSED:
case GST_STATE_PAUSED_TO_READY:
src->curoffset = 0;
default:
break;
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)

View file

@ -205,27 +205,24 @@ gst_identity_loop (GstElement *element)
identity = GST_IDENTITY (element);
do {
buf = gst_pad_pull (identity->sinkpad);
buf = gst_pad_pull (identity->sinkpad);
for (i=identity->duplicate; i; i--) {
if (!identity->silent)
g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n",
for (i=identity->duplicate; i; i--) {
if (!identity->silent)
g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n",
GST_DEBUG_PAD_NAME (identity->sinkpad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
buf);
if (i>1)
gst_buffer_ref (buf);
if (i>1)
gst_buffer_ref (buf);
gst_pad_push (identity->srcpad, buf);
gst_pad_push (identity->srcpad, buf);
if (identity->sleep_time)
usleep (identity->sleep_time);
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
if (identity->sleep_time)
usleep (identity->sleep_time);
}
}
static void

View file

@ -138,7 +138,6 @@ gst_bin_init (GstBin * bin)
bin->numchildren = 0;
bin->children = NULL;
bin->eoscond = g_cond_new ();
}
/**
@ -364,8 +363,8 @@ gst_bin_remove (GstBin * bin, GstElement * element)
}
void
gst_bin_child_state_change (GstBin * bin, GstElementState old, GstElementState new,
GstElement * child)
gst_bin_child_state_change (GstBin *bin, GstElementState old, GstElementState new,
GstElement *child)
{
gint old_idx = 0, new_idx = 0, i;
@ -394,6 +393,22 @@ gst_bin_child_state_change (GstBin * bin, GstElementState old, GstElementState n
GST_UNLOCK (bin);
}
void
gst_bin_child_error (GstBin *bin, GstElement *child)
{
if (GST_STATE (bin) != GST_STATE_NULL) {
/*
GST_STATE_PENDING (bin) = ((GST_STATE (bin) >> 1));
if (gst_element_set_state (bin, GST_STATE (bin)>>1) != GST_STATE_SUCCESS) {
gst_element_error (GST_ELEMENT (bin), "bin \"%s\" couldn't change state on error from child \"%s\"",
GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child));
}
*/
gst_element_info (GST_ELEMENT (bin), "bin \"%s\" stopped because child \"%s\" signalled an error",
GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child));
}
}
static void
gst_bin_send_event (GstElement *element, GstEvent *event)
{
@ -437,6 +452,9 @@ gst_bin_change_state (GstElement * element)
gst_element_set_state (child, old_state);
if (GST_ELEMENT_SCHED (child) == GST_ELEMENT_SCHED (element)) {
/* reset to what is was */
GST_STATE_PENDING (element) = old_state;
gst_bin_change_state (element);
return GST_STATE_FAILURE;
}
break;
@ -551,8 +569,6 @@ gst_bin_dispose (GObject * object)
bin->children = NULL;
bin->numchildren = 0;
g_cond_free (bin->eoscond);
G_OBJECT_CLASS (parent_class)->dispose (object);
}

View file

@ -38,15 +38,15 @@ extern GType _gst_bin_type;
# define GST_IS_BIN(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_BIN))
# define GST_IS_BIN_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_BIN))
#define GST_BIN_FAST(obj) ((GstBin*)(obj))
#define GST_BIN_CLASS_FAST(klass) ((GstBinClass*)(klass))
#define GST_BIN_CAST(obj) ((GstBin*)(obj))
#define GST_BIN_CLASS_CAST(klass) ((GstBinClass*)(klass))
#ifdef GST_TYPE_PARANOID
# define GST_BIN(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_BIN, GstBin))
# define GST_BIN_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_BIN, GstBinClass))
#else
# define GST_BIN GST_BIN_FAST
# define GST_BIN_CLASS GST_BIN_CLASS_FAST
# define GST_BIN GST_BIN_CAST
# define GST_BIN_CLASS GST_BIN_CLASS_CAST
#endif
typedef enum {
@ -71,11 +71,10 @@ struct _GstBin {
/* our children */
gint numchildren;
GList *children;
GCond *eoscond;
GstElementState child_states[GST_NUM_STATES];
cothread_context *threadcontext;
gpointer sched_private;
};
struct _GstBinClass {
@ -113,6 +112,7 @@ gboolean gst_bin_iterate (GstBin *bin);
/* internal */
void gst_bin_child_state_change (GstBin *bin, GstElementState oldstate,
GstElementState newstate, GstElement *child);
void gst_bin_child_error (GstBin *bin, GstElement *child);
#ifdef __cplusplus
}

View file

@ -31,6 +31,7 @@ GType _gst_buffer_type;
static GMemChunk *_gst_buffer_chunk;
static GMutex *_gst_buffer_chunk_lock;
static gint _gst_buffer_live;
void
_gst_buffer_initialize (void)
@ -58,6 +59,20 @@ _gst_buffer_initialize (void)
_gst_buffer_chunk_lock = g_mutex_new ();
_gst_buffer_type = g_type_register_static (G_TYPE_INT, "GstBuffer", &buffer_info, 0);
_gst_buffer_live = 0;
}
/**
* gst_buffer_print_stats:
*
* Print statistics about live buffers.
*/
void
gst_buffer_print_stats (void)
{
g_log (g_log_domain_gstreamer, G_LOG_LEVEL_INFO,
"%d live buffers", _gst_buffer_live);
}
/**
@ -74,6 +89,7 @@ gst_buffer_new (void)
g_mutex_lock (_gst_buffer_chunk_lock);
buffer = g_mem_chunk_alloc (_gst_buffer_chunk);
_gst_buffer_live++;
g_mutex_unlock (_gst_buffer_chunk_lock);
GST_INFO (GST_CAT_BUFFER,"creating new buffer %p",buffer);
@ -153,11 +169,12 @@ gst_buffer_create_sub (GstBuffer *parent,
g_mutex_lock (_gst_buffer_chunk_lock);
buffer = g_mem_chunk_alloc (_gst_buffer_chunk);
GST_DATA_TYPE(buffer) = _gst_buffer_type;
_gst_buffer_live++;
g_mutex_unlock (_gst_buffer_chunk_lock);
GST_INFO (GST_CAT_BUFFER,"creating new subbuffer %p from parent %p (size %u, offset %u)",
buffer, parent, size, offset);
GST_DATA_TYPE(buffer) = _gst_buffer_type;
buffer->lock = g_mutex_new ();
#ifdef HAVE_ATOMIC_H
atomic_set (&buffer->refcount, 1);
@ -292,6 +309,7 @@ gst_buffer_destroy (GstBuffer *buffer)
/* remove it entirely from memory */
g_mutex_lock (_gst_buffer_chunk_lock);
g_mem_chunk_free (_gst_buffer_chunk,buffer);
_gst_buffer_live--;
g_mutex_unlock (_gst_buffer_chunk_lock);
}

View file

@ -167,6 +167,8 @@ GstBuffer* gst_buffer_append (GstBuffer *buffer, GstBuffer *append);
gboolean gst_buffer_is_span_fast (GstBuffer *buf1, GstBuffer *buf2);
void gst_buffer_print_stats (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */

View file

@ -173,8 +173,8 @@ gst_element_init (GstElement *element)
element->numsinkpads = 0;
element->pads = NULL;
element->loopfunc = NULL;
element->threadstate = NULL;
element->sched = NULL;
element->sched_private = NULL;
element->state_mutex = g_mutex_new ();
element->state_cond = g_cond_new ();
}
@ -798,10 +798,25 @@ void
gst_element_error (GstElement *element, const gchar *error, ...)
{
va_list var_args;
GstObject *parent;
g_return_if_fail (GST_IS_ELEMENT (element));
g_return_if_fail (error != NULL);
va_start (var_args, error);
gst_element_message (element, "error", error, var_args);
va_end (var_args);
parent = GST_ELEMENT_PARENT (element);
if (parent && GST_IS_BIN (parent)) {
gst_bin_child_error (GST_BIN (parent), element);
}
if (element->sched) {
gst_scheduler_error (element->sched, element);
}
}
/**
@ -817,6 +832,9 @@ gst_element_info (GstElement *element, const gchar *info, ...)
{
va_list var_args;
g_return_if_fail (GST_IS_ELEMENT (element));
g_return_if_fail (info != NULL);
va_start (var_args, info);
gst_element_message (element, "info", info, var_args);
va_end (var_args);
@ -965,7 +983,7 @@ static GstElementStateReturn
gst_element_change_state (GstElement *element)
{
GstElementState old_state;
//GstEvent *event;
GstObject *parent;
g_return_val_if_fail (element != NULL, GST_STATE_FAILURE);
g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
@ -973,7 +991,7 @@ gst_element_change_state (GstElement *element)
old_state = GST_STATE (element);
if (GST_STATE_PENDING (element) == GST_STATE_VOID_PENDING || old_state == GST_STATE_PENDING (element)) {
GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)\n", GST_ELEMENT_NAME (element));
GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)", GST_ELEMENT_NAME (element));
return GST_STATE_SUCCESS;
}
@ -983,8 +1001,12 @@ gst_element_change_state (GstElement *element)
GST_STATE_TRANSITION (element));
/* tell the scheduler if we have one */
if (element->sched)
gst_scheduler_state_transition (element->sched, element, GST_STATE_TRANSITION (element));
if (element->sched) {
if (gst_scheduler_state_transition (element->sched, element, GST_STATE_TRANSITION (element))
!= GST_STATE_SUCCESS) {
return GST_STATE_FAILURE;
}
}
GST_STATE (element) = GST_STATE_PENDING (element);
GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
@ -993,11 +1015,11 @@ gst_element_change_state (GstElement *element)
g_cond_signal (element->state_cond);
g_mutex_unlock (element->state_mutex);
if (GST_ELEMENT_PARENT (element)) {
gst_bin_child_state_change (GST_BIN (GST_ELEMENT_PARENT (element)), old_state, GST_STATE (element), element);
parent = GST_ELEMENT_PARENT (element);
if (parent && GST_IS_BIN (parent)) {
gst_bin_child_state_change (GST_BIN (parent), old_state, GST_STATE (element), element);
}
//event = gst_event_new_state_change (old_state, GST_STATE (element));
//gst_element_send_event (element, event);
return GST_STATE_SUCCESS;
}
@ -1328,7 +1350,6 @@ gst_element_signal_eos (GstElement *element)
GST_DEBUG(GST_CAT_EVENT, "signaling EOS on element %s\n",GST_OBJECT_NAME(element));
g_signal_emit (G_OBJECT (element), gst_element_signals[EOS], 0);
GST_FLAG_SET(element,GST_ELEMENT_COTHREAD_STOPPING);
}

View file

@ -58,8 +58,8 @@ extern GType _gst_element_type;
#define GST_TYPE_ELEMENT (_gst_element_type)
#define GST_ELEMENT_FAST(obj) ((GstElement*)(obj))
#define GST_ELEMENT_CLASS_FAST(klass) ((GstElementClass*)(klass))
#define GST_ELEMENT_CAST(obj) ((GstElement*)(obj))
#define GST_ELEMENT_CLASS_CAST(klass) ((GstElementClass*)(klass))
#define GST_IS_ELEMENT(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_ELEMENT))
#define GST_IS_ELEMENT_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_ELEMENT))
@ -67,8 +67,8 @@ extern GType _gst_element_type;
# define GST_ELEMENT(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_ELEMENT, GstElement))
# define GST_ELEMENT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_ELEMENT, GstElementClass))
#else
# define GST_ELEMENT GST_ELEMENT_FAST
# define GST_ELEMENT_CLASS GST_ELEMENT_CLASS_FAST
# define GST_ELEMENT GST_ELEMENT_CAST
# define GST_ELEMENT_CLASS GST_ELEMENT_CLASS_CAST
#endif
typedef enum {
@ -82,18 +82,16 @@ typedef enum {
/* this element is incable of seeking (FIXME: does this apply to filters?) */
GST_ELEMENT_NO_SEEK,
/***** !!!!! need to have a flag that says that an element must
*not* be an entry into a scheduling chain !!!!! *****/
/* this element for some reason doesn't obey COTHREAD_STOPPING, or
has some other reason why it can't be the entry */
GST_ELEMENT_NO_ENTRY,
/* this element, for some reason, has a loop function that performs
* an infinite loop without calls to gst_element_yield () */
GST_ELEMENT_INFINITE_LOOP,
/* private flags that can be used by the scheduler */
GST_ELEMENT_SCHEDULER_PRIVATE1,
GST_ELEMENT_SCHEDULER_PRIVATE2,
/* there is a new loopfunction ready for placement */
GST_ELEMENT_NEW_LOOPFUNC,
/* the cothread holding this element needs to be stopped */
GST_ELEMENT_COTHREAD_STOPPING,
/* the element has to be scheduled as a cothread for any sanity */
GST_ELEMENT_USE_COTHREAD,
/* if this element can handle events */
GST_ELEMENT_EVENT_AWARE,
@ -103,7 +101,6 @@ typedef enum {
} GstElementFlags;
#define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED))
#define GST_ELEMENT_IS_COTHREAD_STOPPING(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_COTHREAD_STOPPING))
#define GST_ELEMENT_IS_EOS(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EOS))
#define GST_ELEMENT_IS_EVENT_AWARE(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EVENT_AWARE))
@ -127,9 +124,10 @@ struct _GstElement {
guint8 current_state;
guint8 pending_state;
GstElement *manager;
GstScheduler *sched;
GstElementLoopFunction loopfunc;
cothread_state *threadstate;
GstScheduler *sched;
gpointer sched_private;
/* element pads */
guint16 numpads;
@ -185,6 +183,8 @@ const gchar* gst_element_get_name (GstElement *element);
void gst_element_set_parent (GstElement *element, GstObject *parent);
GstObject* gst_element_get_parent (GstElement *element);
#define gst_element_yield(element) gst_scheduler_yield(GST_ELEMENT_SCHED(element),element)
#define gst_element_interrupt(element) gst_scheduler_interrupt(GST_ELEMENT_SCHED(element),element)
void gst_element_set_sched (GstElement *element, GstScheduler *sched);
GstScheduler* gst_element_get_sched (GstElement *element);

View file

@ -53,15 +53,15 @@ extern GType _gst_object_type;
# define GST_IS_OBJECT(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_OBJECT))
# define GST_IS_OBJECT_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_OBJECT))
#define GST_OBJECT_FAST(obj) ((GstObject*)(obj))
#define GST_OBJECT_CLASS_FAST(klass) ((GstObjectClass*)(klass))
#define GST_OBJECT_CAST(obj) ((GstObject*)(obj))
#define GST_OBJECT_CLASS_CAST(klass) ((GstObjectClass*)(klass))
#ifdef GST_TYPE_PARANOID
# define GST_OBJECT(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_OBJECT, GstObject))
# define GST_OBJECT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_OBJECT, GstObjectClass))
#else
# define GST_OBJECT GST_OBJECT_FAST
# define GST_OBJECT_CLASS GST_OBJECT_CLASS_FAST
# define GST_OBJECT GST_OBJECT_CAST
# define GST_OBJECT_CLASS GST_OBJECT_CLASS_CAST
#endif
/*typedef struct _GstObject GstObject; */
@ -115,7 +115,7 @@ struct _GstObjectClass {
#endif
};
#define GST_FLAGS(obj) (GST_OBJECT (obj)->flags)
#define GST_FLAGS(obj) (GST_OBJECT_CAST (obj)->flags)
#define GST_FLAG_IS_SET(obj,flag) (GST_FLAGS (obj) & (1<<(flag)))
#define GST_FLAG_SET(obj,flag) G_STMT_START{ (GST_FLAGS (obj) |= (1<<(flag))); }G_STMT_END
#define GST_FLAG_UNSET(obj,flag) G_STMT_START{ (GST_FLAGS (obj) &= ~(1<<(flag))); }G_STMT_END
@ -127,10 +127,10 @@ struct _GstObjectClass {
#define GST_OBJECT_FLOATING(obj) (GST_FLAG_IS_SET (obj, GST_FLOATING))
/* object locking */
#define GST_LOCK(obj) (g_mutex_lock(GST_OBJECT(obj)->lock))
#define GST_TRYLOCK(obj) (g_mutex_trylock(GST_OBJECT(obj)->lock))
#define GST_UNLOCK(obj) (g_mutex_unlock(GST_OBJECT(obj)->lock))
#define GST_GET_LOCK(obj) (GST_OBJECT(obj)->lock)
#define GST_LOCK(obj) (g_mutex_lock(GST_OBJECT_CAST(obj)->lock))
#define GST_TRYLOCK(obj) (g_mutex_trylock(GST_OBJECT_CAST(obj)->lock))
#define GST_UNLOCK(obj) (g_mutex_unlock(GST_OBJECT_CAST(obj)->lock))
#define GST_GET_LOCK(obj) (GST_OBJECT_CAST(obj)->lock)
/* normal GObject stuff */

View file

@ -192,6 +192,9 @@ gst_real_pad_init (GstRealPad *pad)
pad->direction = GST_PAD_UNKNOWN;
pad->peer = NULL;
pad->sched = NULL;
pad->sched_private = NULL;
pad->chainfunc = NULL;
pad->getfunc = NULL;
pad->getregionfunc = NULL;
@ -564,13 +567,11 @@ void
gst_pad_connect (GstPad *srcpad,
GstPad *sinkpad)
{
if (!gst_pad_try_connect (srcpad, sinkpad))
/* FIXME: g_critical is glib-2.0, not glib-1.2
g_critical ("couldn't connect %s:%s and %s:%s",
*/
if (!gst_pad_try_connect (srcpad, sinkpad)) {
g_warning ("couldn't connect %s:%s and %s:%s",
GST_DEBUG_PAD_NAME (srcpad),
GST_DEBUG_PAD_NAME (sinkpad));
}
}
/**
@ -1451,18 +1452,33 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
g_return_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC);
if (!peer) {
g_warning ("gst_pad_push but %s:%s is unconnected", GST_DEBUG_PAD_NAME (pad));
return;
g_warning ("push on pad %s:%s but it is unconnected", GST_DEBUG_PAD_NAME (pad));
}
if (peer->chainhandler) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n",
GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer)));
(peer->chainhandler) (GST_PAD_FAST (peer), buf);
}
else {
g_warning ("gst_pad_push but %s:%s has but no chainhandler", GST_DEBUG_PAD_NAME (peer));
if (peer->chainhandler) {
if (buf) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n",
GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer)));
(peer->chainhandler) (GST_PAD_CAST (peer), buf);
return;
}
else {
g_warning ("trying to push a NULL buffer on pad %s:%s", GST_DEBUG_PAD_NAME (peer));
return;
}
}
else {
g_warning ("(internal error) push on pad %s:%s but it has no chainhandler", GST_DEBUG_PAD_NAME (peer));
}
}
/* clean up the mess here */
if (buf != NULL) {
if (GST_IS_BUFFER (buf))
gst_buffer_unref (buf);
else
gst_pad_event_default (pad, GST_EVENT (buf));
}
}
#endif
@ -1486,18 +1502,33 @@ gst_pad_pull (GstPad *pad)
g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, NULL);
if (!peer) {
g_warning ("gst_pad_pull but %s:%s is unconnected", GST_DEBUG_PAD_NAME(pad));
return NULL;
gst_element_error (GST_PAD_PARENT (pad),
"pull on pad %s:%s but it was unconnected",
GST_ELEMENT_NAME (GST_PAD_PARENT (pad)), GST_PAD_NAME (pad),
NULL);
}
else {
if (peer->gethandler) {
GstBuffer *buf;
if (peer->gethandler) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n",
GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer));
return (peer->gethandler) (GST_PAD_FAST (peer));
} else {
g_warning ("gst_pad_pull but %s:%s has no gethandler", GST_DEBUG_PAD_NAME (peer));
return NULL;
GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n",
GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer));
buf = (peer->gethandler) (GST_PAD_CAST (peer));
if (buf)
return buf;
/* no null buffers allowed */
gst_element_error (GST_PAD_PARENT (pad),
"NULL buffer during pull on %s:%s", GST_DEBUG_PAD_NAME (pad), NULL);
} else {
gst_element_error (GST_PAD_PARENT (pad),
"(internal error) pull on pad %s:%s but the peer pad %s:%s has no gethandler",
GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer),
NULL);
}
}
return NULL;
}
#endif
@ -1535,8 +1566,8 @@ gst_pad_pullregion (GstPad *pad, GstRegionType type, guint64 offset, guint64 len
if (peer->pullregionfunc) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling pullregionfunc &%s of peer pad %s:%s\n",
GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_FAST (peer)));
result = (peer->pullregionfunc) (GST_PAD_FAST (peer), type, offset, len);
GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_CAST (peer)));
result = (peer->pullregionfunc) (GST_PAD_CAST (peer), type, offset, len);
} else {
GST_DEBUG (GST_CAT_DATAFLOW,"no pullregionfunc\n");
result = NULL;
@ -1986,6 +2017,8 @@ gst_pad_event_default (GstPad *pad, GstEvent *event)
pads = g_list_next (pads);
}
}
/* we have to try to schedule another element because this one is deisabled */
gst_element_yield (element);
break;
default:
g_warning ("no default handler for event\n");

View file

@ -48,8 +48,8 @@ extern GType _gst_ghost_pad_type;
*/
#define GST_TYPE_PAD (_gst_pad_type)
#define GST_PAD_FAST(obj) ((GstPad*)(obj))
#define GST_PAD_CLASS_FAST(klass) ((GstPadClass*)(klass))
#define GST_PAD_CAST(obj) ((GstPad*)(obj))
#define GST_PAD_CLASS_CAST(klass) ((GstPadClass*)(klass))
#define GST_IS_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_PAD))
#define GST_IS_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_REAL_PAD || \
G_OBJECT_TYPE(obj) == GST_TYPE_GHOST_PAD)
@ -59,8 +59,8 @@ extern GType _gst_ghost_pad_type;
# define GST_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_PAD, GstPad))
# define GST_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_PAD, GstPadClass))
#else
# define GST_PAD GST_PAD_FAST
# define GST_PAD_CLASS GST_PAD_CLASS_FAST
# define GST_PAD GST_PAD_CAST
# define GST_PAD_CLASS GST_PAD_CLASS_CAST
#endif
/*
@ -68,8 +68,8 @@ extern GType _gst_ghost_pad_type;
*/
#define GST_TYPE_REAL_PAD (_gst_real_pad_type)
#define GST_REAL_PAD_FAST(obj) ((GstRealPad*)(obj))
#define GST_REAL_PAD_CLASS_FAST(klass) ((GstRealPadClass*)(klass))
#define GST_REAL_PAD_CAST(obj) ((GstRealPad*)(obj))
#define GST_REAL_PAD_CLASS_CAST(klass) ((GstRealPadClass*)(klass))
#define GST_IS_REAL_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_REAL_PAD))
#define GST_IS_REAL_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_REAL_PAD)
#define GST_IS_REAL_PAD_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_REAL_PAD))
@ -78,8 +78,8 @@ extern GType _gst_ghost_pad_type;
# define GST_REAL_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_REAL_PAD, GstRealPad))
# define GST_REAL_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_REAL_PAD, GstRealPadClass))
#else
# define GST_REAL_PAD GST_REAL_PAD_FAST
# define GST_REAL_PAD_CLASS GST_REAL_PAD_CLASS_FAST
# define GST_REAL_PAD GST_REAL_PAD_CAST
# define GST_REAL_PAD_CLASS GST_REAL_PAD_CLASS_CAST
#endif
/*
@ -87,8 +87,8 @@ extern GType _gst_ghost_pad_type;
*/
#define GST_TYPE_GHOST_PAD (_gst_ghost_pad_type)
#define GST_GHOST_PAD_FAST(obj) ((GstGhostPad*)(obj))
#define GST_GHOST_PAD_CLASS_FAST(klass) ((GstGhostPadClass*)(klass))
#define GST_GHOST_PAD_CAST(obj) ((GstGhostPad*)(obj))
#define GST_GHOST_PAD_CLASS_CAST(klass) ((GstGhostPadClass*)(klass))
#define GST_IS_GHOST_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_GHOST_PAD))
#define GST_IS_GHOST_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_GHOST_PAD)
#define GST_IS_GHOST_PAD_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_GHOST_PAD))
@ -97,8 +97,8 @@ extern GType _gst_ghost_pad_type;
# define GST_GHOST_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_GHOST_PAD, GstGhostPad))
# define GST_GHOST_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_GHOST_PAD, GstGhostPadClass))
#else
# define GST_GHOST_PAD GST_GHOST_PAD_FAST
# define GST_GHOST_PAD_CLASS GST_GHOST_PAD_CLASS_FAST
# define GST_GHOST_PAD GST_GHOST_PAD_CAST
# define GST_GHOST_PAD_CLASS GST_GHOST_PAD_CLASS_CAST
#endif
@ -169,7 +169,8 @@ struct _GstRealPad {
GstCaps *caps;
GstPadDirection direction;
cothread_state *threadstate;
GstScheduler *sched;
gpointer sched_private;
GstRealPad *peer;
@ -178,8 +179,6 @@ struct _GstRealPad {
guint64 offset;
guint64 len;
GstScheduler *sched;
GstPadChainFunction chainfunc;
GstPadChainFunction chainhandler;
GstPadGetFunction getfunc;
@ -257,7 +256,7 @@ struct _GstGhostPadClass {
#define GST_GPAD_REALPAD(pad) (((GstGhostPad *)(pad))->realpad)
/* Generic */
#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
#define GST_PAD_DIRECTION(pad) GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad))
#define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE(pad))
#define GST_PAD_PEER(pad) GST_RPAD_PEER(GST_PAD_REALIZE(pad))

View file

@ -104,6 +104,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
GList *pads;
gint elementcount = 0;
gint retval = 0;
gboolean backref = FALSE;
priv->binlevel++;
@ -174,13 +175,14 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
GstElement *new;
GST_DEBUG (0, "have pad for element %s\n", element_name);
new = gst_bin_get_by_name (parent, element_name);
new = gst_bin_get_by_name_recurse_up (parent, element_name);
if (!new) {
GST_DEBUG (0, "element %s does not exist! trying to continue\n", element_name);
}
else {
previous = new;
srcpadname = ptr + 1;
backref = TRUE;
}
}
@ -321,6 +323,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
i++;
continue;
}
gst_bin_add (GST_BIN (parent), element);
j = gst_parse_launch_cmdline (argc - i, argv + i + 1, GST_BIN (element), priv);
/* check for parse error */
@ -347,9 +350,9 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
return GST_PARSE_ERROR_NOSUCH_ELEMENT;
}
GST_DEBUG (0, "CREATED element %s\n", GST_ELEMENT_NAME (element));
gst_bin_add (GST_BIN (parent), element);
}
gst_bin_add (GST_BIN (parent), element);
elementcount++;
g_slist_free (sinkpads);
@ -412,7 +415,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
else
GST_DEBUG (0, "have sink pad %s:%s\n", GST_DEBUG_PAD_NAME (GST_PARSE_LISTPAD (sinkpads)));
if (!srcpads && sinkpads && previous) {
if (!srcpads && sinkpads && previous && srcpadname) {
dyn_connect *connect = g_malloc (sizeof (dyn_connect));
connect->srcpadname = srcpadname;
@ -442,7 +445,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
sinkpads = NULL;
/* if we're the first element, ghost all the sinkpads */
if (elementcount == 1) {
if (elementcount == 1 && !backref) {
DEBUG ("first element, ghosting all of %s's sink pads to parent %s\n",
GST_ELEMENT_NAME (element), GST_ELEMENT_NAME (GST_ELEMENT (parent)));
pads = gst_element_get_pad_list (element);

View file

@ -65,11 +65,13 @@ enum {
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
ARG_MAY_DEADLOCK,
};
static void gst_queue_class_init (GstQueueClass *klass);
static void gst_queue_init (GstQueue *queue);
static void gst_queue_dispose (GObject *object);
static void gst_queue_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
@ -141,18 +143,22 @@ gst_queue_class_init (GstQueueClass *klass)
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY,
g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.",
GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE));
g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL,
g_param_spec_int("level","Level","How many buffers are in the queue.",
0,G_MAXINT,0,G_PARAM_READABLE));
g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL,
g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.",
0,G_MAXINT,100,G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
0, G_MAXINT, 0, G_PARAM_READABLE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
0, G_MAXINT, 100, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
TRUE, G_PARAM_READWRITE));
gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
}
@ -182,6 +188,7 @@ gst_queue_init (GstQueue *queue)
queue->size_buffers = 100; /* 100 buffers */
queue->size_bytes = 100 * 1024; /* 100KB */
queue->size_time = 1000000000LL; /* 1sec */
queue->may_deadlock = TRUE;
queue->qlock = g_mutex_new ();
queue->reader = FALSE;
@ -191,6 +198,18 @@ gst_queue_init (GstQueue *queue)
GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
static void
gst_queue_dispose (GObject *object)
{
GstQueue *queue = GST_QUEUE (object);
g_mutex_free (queue->qlock);
g_cond_free (queue->not_empty);
g_cond_free (queue->not_full);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static GstBufferPool*
gst_queue_get_bufferpool (GstPad *pad)
{
@ -334,10 +353,21 @@ restart:
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
gst_element_interrupt (GST_ELEMENT (queue));
goto restart;
}
g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
if (GST_STATE (queue) != GST_STATE_PLAYING) {
/* this means the other end is shut down */
/* try to signal to resolve the error */
if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
return;
}
else {
gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->writer)
@ -402,10 +432,20 @@ restart:
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
gst_element_interrupt (GST_ELEMENT (queue));
goto restart;
}
g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
if (GST_STATE (queue) != GST_STATE_PLAYING) {
/* this means the other end is shut down */
if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
return NULL;
}
else {
gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->reader)
@ -513,10 +553,13 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa
switch (prop_id) {
case ARG_LEAKY:
queue->leaky = g_value_get_int(value);
queue->leaky = g_value_get_int (value);
break;
case ARG_MAX_LEVEL:
queue->size_buffers = g_value_get_int(value);
queue->size_buffers = g_value_get_int (value);
break;
case ARG_MAY_DEADLOCK:
queue->may_deadlock = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -536,13 +579,16 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
switch (prop_id) {
case ARG_LEAKY:
g_value_set_int(value, queue->leaky);
g_value_set_int (value, queue->leaky);
break;
case ARG_LEVEL:
g_value_set_int(value, queue->level_buffers);
g_value_set_int (value, queue->level_buffers);
break;
case ARG_MAX_LEVEL:
g_value_set_int(value, queue->size_buffers);
g_value_set_int (value, queue->size_buffers);
break;
case ARG_MAY_DEADLOCK:
g_value_set_boolean (value, queue->may_deadlock);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);

View file

@ -74,6 +74,7 @@ struct _GstQueue {
guint64 size_time; /* size of queue in time */
gint leaky; /* whether the queue is leaky, and if so at which end */
gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
GMutex *qlock; /* lock for queue (vs object lock) */
/* we are single reader and single writer queue */

View file

@ -76,6 +76,8 @@ gst_scheduler_init (GstScheduler *sched)
void
gst_scheduler_setup (GstScheduler *sched)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
if (CLASS (sched)->setup)
CLASS (sched)->setup (sched);
}
@ -89,6 +91,8 @@ gst_scheduler_setup (GstScheduler *sched)
void
gst_scheduler_reset (GstScheduler *sched)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
if (CLASS (sched)->reset)
CLASS (sched)->reset (sched);
}
@ -104,6 +108,10 @@ gst_scheduler_reset (GstScheduler *sched)
void
gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_PAD (srcpad));
g_return_if_fail (GST_IS_PAD (sinkpad));
if (CLASS (sched)->pad_connect)
CLASS (sched)->pad_connect (sched, srcpad, sinkpad);
}
@ -119,6 +127,10 @@ gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
void
gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_PAD (srcpad));
g_return_if_fail (GST_IS_PAD (sinkpad));
if (CLASS (sched)->pad_disconnect)
CLASS (sched)->pad_disconnect (sched, srcpad, sinkpad);
}
@ -135,6 +147,9 @@ gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkp
GstPad *
gst_scheduler_pad_select (GstScheduler *sched, GList *padlist)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (padlist != NULL);
if (CLASS (sched)->pad_select)
CLASS (sched)->pad_select (sched, padlist);
}
@ -149,6 +164,9 @@ gst_scheduler_pad_select (GstScheduler *sched, GList *padlist)
void
gst_scheduler_add_element (GstScheduler *sched, GstElement *element)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
if (CLASS (sched)->add_element)
CLASS (sched)->add_element (sched, element);
}
@ -161,11 +179,16 @@ gst_scheduler_add_element (GstScheduler *sched, GstElement *element)
*
* Tell the scheduler that an element changed its state.
*/
void
GstElementStateReturn
gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
{
g_return_val_if_fail (GST_IS_SCHEDULER (sched), GST_STATE_FAILURE);
g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
if (CLASS (sched)->state_transition)
CLASS (sched)->state_transition (sched, element, transition);
return CLASS (sched)->state_transition (sched, element, transition);
return GST_STATE_SUCCESS;
}
/**
@ -178,6 +201,9 @@ gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint t
void
gst_scheduler_remove_element (GstScheduler *sched, GstElement *element)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
if (CLASS (sched)->remove_element)
CLASS (sched)->remove_element (sched, element);
}
@ -192,6 +218,9 @@ gst_scheduler_remove_element (GstScheduler *sched, GstElement *element)
void
gst_scheduler_lock_element (GstScheduler *sched, GstElement *element)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
if (CLASS (sched)->lock_element)
CLASS (sched)->lock_element (sched, element);
}
@ -206,10 +235,64 @@ gst_scheduler_lock_element (GstScheduler *sched, GstElement *element)
void
gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
if (CLASS (sched)->unlock_element)
CLASS (sched)->unlock_element (sched, element);
}
/**
* gst_scheduler_error:
* @sched: the schedulerr
* @element: the element with the error
*
* Tell the scheduler an element was in error
*/
void
gst_scheduler_error (GstScheduler *sched, GstElement *element)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
if (CLASS (sched)->error)
CLASS (sched)->error (sched, element);
}
/**
* gst_scheduler_yield:
* @sched: the schedulerr
* @element: the element requesting a yield
*
* Tell the scheduler to schedule another element.
*/
void
gst_scheduler_yield (GstScheduler *sched, GstElement *element)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
if (CLASS (sched)->yield)
CLASS (sched)->yield (sched, element);
}
/**
* gst_scheduler_interrupt:
* @sched: the schedulerr
* @element: the element requesting an interrupt
*
* Tell the scheduler to interrupt execution of this element.
*/
void
gst_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
if (CLASS (sched)->interrupt)
CLASS (sched)->interrupt (sched, element);
}
/**
* gst_scheduler_iterate:
* @sched: the schedulerr
@ -221,6 +304,8 @@ gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
gboolean
gst_scheduler_iterate (GstScheduler *sched)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
if (CLASS (sched)->iterate)
CLASS (sched)->iterate (sched);
}
@ -235,6 +320,8 @@ gst_scheduler_iterate (GstScheduler *sched)
void
gst_scheduler_show (GstScheduler *sched)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
if (CLASS (sched)->show)
CLASS (sched)->show (sched);
}

View file

@ -54,31 +54,31 @@ struct _GstScheduler {
GstObject object;
GstElement *parent;
GList *elements;
gint num_elements;
GList *chains;
gint num_chains;
};
struct _GstSchedulerClass {
GstObjectClass parent_class;
/* virtual methods */
void (*setup) (GstScheduler *sched);
void (*reset) (GstScheduler *sched);
void (*add_element) (GstScheduler *sched, GstElement *element);
void (*remove_element) (GstScheduler *sched, GstElement *element);
void (*state_transition) (GstScheduler *sched, GstElement *element, gint transition);
void (*remove_element) (GstScheduler *sched, GstElement *element);
GstElementStateReturn
(*state_transition) (GstScheduler *sched, GstElement *element, gint transition);
void (*lock_element) (GstScheduler *sched, GstElement *element);
void (*unlock_element) (GstScheduler *sched, GstElement *element);
void (*yield) (GstScheduler *sched, GstElement *element);
void (*interrupt) (GstScheduler *sched, GstElement *element);
void (*error) (GstScheduler *sched, GstElement *element);
void (*pad_connect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void (*pad_disconnect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void (*pad_select) (GstScheduler *sched, GList *padlist);
gboolean (*iterate) (GstScheduler *sched);
/* for debugging */
void (*show) (GstScheduler *sched);
/* signals go here */
};
GType gst_scheduler_get_type (void);
@ -89,9 +89,12 @@ void gst_scheduler_setup (GstScheduler *sched);
void gst_scheduler_reset (GstScheduler *sched);
void gst_scheduler_add_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_remove_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
GstElementStateReturn gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
void gst_scheduler_lock_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_yield (GstScheduler *sched, GstElement *element);
void gst_scheduler_interrupt (GstScheduler *sched, GstElement *element);
void gst_scheduler_error (GstScheduler *sched, GstElement *element);
void gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
GstPad* gst_scheduler_pad_select (GstScheduler *sched, GList *padlist);

View file

@ -147,8 +147,6 @@ gst_thread_init (GstThread *thread)
thread->ppid = getpid();
thread->thread_id = -1;
/* gst_element_set_manager(GST_ELEMENT(thread),GST_ELEMENT(thread)); */
}
static void
@ -303,7 +301,8 @@ gst_thread_change_state (GstElement * element)
break;
case GST_STATE_PLAYING_TO_PAUSED:
{
GList *elements = (element->sched)->elements;
//GList *elements = (element->sched)->elements;
GList *elements = gst_bin_get_list(GST_BIN (thread));
THR_INFO ("pausing thread");
@ -313,7 +312,7 @@ gst_thread_change_state (GstElement * element)
* + the pending state was already set by gstelement.c::set_state()
* + find every queue we manage, and signal its empty and full conditions
*/
g_mutex_lock (thread->lock);
g_mutex_lock (thread->lock);
GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);

View file

@ -197,7 +197,7 @@ gst_typefind_chain (GstPad *pad, GstBuffer *buf)
if (GST_STATE(typefind) != oldstate) {
gst_object_unref (GST_OBJECT (typefind));
GST_DEBUG(0, "state changed during signal, aborting\n");
cothread_switch(cothread_current_main());
gst_element_yield (typefind);
}
gst_object_unref (GST_OBJECT (typefind));
}

View file

@ -25,8 +25,18 @@
typedef struct _GstSchedulerChain GstSchedulerChain;
#define GST_PAD_THREADSTATE(pad) (cothread_state*) (GST_PAD_CAST (pad)->sched_private)
#define GST_ELEMENT_THREADSTATE(elem) (cothread_state*) (GST_ELEMENT_CAST (elem)->sched_private)
#define GST_BIN_THREADCONTEXT(bin) (cothread_context*) (GST_BIN_CAST (bin)->sched_private)
#define GST_ELEMENT_COTHREAD_STOPPING GST_ELEMENT_SCHEDULER_PRIVATE1
#define GST_ELEMENT_IS_COTHREAD_STOPPING(element) GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
typedef struct _GstBasicScheduler GstBasicScheduler;
typedef struct _GstBasicSchedulerClass GstBasicSchedulerClass;
struct _GstSchedulerChain {
GstScheduler *sched;
GstBasicScheduler *sched;
GList *disabled;
@ -39,10 +49,37 @@ struct _GstSchedulerChain {
gboolean schedule;
};
#define GST_TYPE_BASIC_SCHEDULER \
(gst_basic_scheduler_get_type())
#define GST_BASIC_SCHEDULER(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BASIC_SCHEDULER,GstBasicScheduler))
#define GST_BASIC_SCHEDULER_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BASIC_SCHEDULER,GstBasicSchedulerClass))
#define GST_IS_BASIC_SCHEDULER(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BASIC_SCHEDULER))
#define GST_IS_BASIC_SCHEDULER_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASIC_SCHEDULER))
#define GST_BASIC_SCHEDULER_CAST(sched) ((GstBasicScheduler *)(sched))
struct _GstBasicScheduler {
GstScheduler parent;
GList *elements;
gint num_elements;
GList *chains;
gint num_chains;
};
struct _GstBasicSchedulerClass {
GstSchedulerClass parent_class;
};
static GType _gst_basic_scheduler_type = 0;
static void gst_basic_scheduler_class_init (GstSchedulerClass * klass);
static void gst_basic_scheduler_init (GstScheduler * scheduler);
static void gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass);
static void gst_basic_scheduler_init (GstBasicScheduler * scheduler);
static void gst_basic_scheduler_dispose (GObject *object);
@ -50,9 +87,13 @@ static void gst_basic_scheduler_setup (GstScheduler *sched);
static void gst_basic_scheduler_reset (GstScheduler *sched);
static void gst_basic_scheduler_add_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_remove_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
static GstElementStateReturn
gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_error (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static GstPad* gst_basic_scheduler_pad_select (GstScheduler *sched, GList *padlist);
@ -67,13 +108,13 @@ gst_basic_scheduler_get_type (void)
{
if (!_gst_basic_scheduler_type) {
static const GTypeInfo scheduler_info = {
sizeof (GstSchedulerClass),
sizeof (GstBasicSchedulerClass),
NULL,
NULL,
(GClassInitFunc) gst_basic_scheduler_class_init,
NULL,
NULL,
sizeof (GstScheduler),
sizeof (GstBasicScheduler),
0,
(GInstanceInitFunc) gst_basic_scheduler_init,
NULL
@ -85,40 +126,48 @@ gst_basic_scheduler_get_type (void)
}
static void
gst_basic_scheduler_class_init (GstSchedulerClass * klass)
gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass)
{
GObjectClass *gobject_class;
GstObjectClass *gstobject_class;
GstSchedulerClass *gstscheduler_class;
gobject_class = (GObjectClass*)klass;
gstobject_class = (GstObjectClass*)klass;
gstscheduler_class = (GstSchedulerClass*)klass;
parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_basic_scheduler_dispose);
klass->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
klass->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
klass->add_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
klass->remove_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
klass->state_transition = GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition);
klass->lock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element);
klass->unlock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element);
klass->pad_connect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect);
klass->pad_disconnect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect);
klass->pad_select = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
klass->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
gstscheduler_class->add_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
gstscheduler_class->remove_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
gstscheduler_class->state_transition = GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition);
gstscheduler_class->lock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element);
gstscheduler_class->unlock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element);
gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_basic_scheduler_yield);
gstscheduler_class->interrupt = GST_DEBUG_FUNCPTR (gst_basic_scheduler_interrupt);
gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_basic_scheduler_error);
gstscheduler_class->pad_connect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect);
gstscheduler_class->pad_disconnect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect);
gstscheduler_class->pad_select = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
}
static void
gst_basic_scheduler_init (GstScheduler *scheduler)
gst_basic_scheduler_init (GstBasicScheduler *scheduler)
{
scheduler->elements = NULL;
scheduler->num_elements = 0;
scheduler->chains = NULL;
scheduler->num_chains = 0;
}
static void
gst_basic_scheduler_dispose (GObject *object)
{
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@ -152,7 +201,7 @@ GstPluginDesc plugin_desc = {
static int
gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[])
{
GstElement *element = GST_ELEMENT (argv);
GstElement *element = GST_ELEMENT_CAST (argv);
G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
GST_DEBUG_ENTER ("(%d,'%s')", argc, name);
@ -162,6 +211,7 @@ gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[])
GST_DEBUG_FUNCPTR_NAME (element->loopfunc), name);
(element->loopfunc) (element);
GST_DEBUG (GST_CAT_DATAFLOW, "element %s ended loop function\n", name);
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
@ -172,7 +222,7 @@ gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[])
static int
gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
{
GstElement *element = GST_ELEMENT (argv);
GstElement *element = GST_ELEMENT_CAST (argv);
G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
GST_DEBUG_ENTER ("(\"%s\")", name);
@ -189,7 +239,9 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
pads = g_list_next (pads);
if (!GST_IS_REAL_PAD (pad))
continue;
realpad = GST_REAL_PAD (pad);
realpad = GST_REAL_PAD_CAST (pad);
if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK) {
GstBuffer *buf;
@ -204,10 +256,14 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s\n", name,
GST_PAD_NAME (pad));
GST_RPAD_CHAINFUNC (realpad) (pad, buf);
GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name,
GST_PAD_NAME (pad));
}
}
GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name,
GST_PAD_NAME (pad));
else {
gst_element_error (element, "NULL buffer detected. Is \"%s:%s\" connected?",
name, GST_PAD_NAME (pad), NULL);
}
}
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
@ -220,7 +276,7 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
static int
gst_basic_scheduler_src_wrapper (int argc, char *argv[])
{
GstElement *element = GST_ELEMENT (argv);
GstElement *element = GST_ELEMENT_CAST (argv);
GList *pads;
GstRealPad *realpad;
GstBuffer *buf = NULL;
@ -231,9 +287,12 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
do {
pads = element->pads;
while (pads) {
if (!GST_IS_REAL_PAD (pads->data))
continue;
realpad = (GstRealPad *) (pads->data);
realpad = GST_REAL_PAD_CAST (pads->data);
pads = g_list_next (pads);
if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling _getfunc for %s:%s\n", GST_DEBUG_PAD_NAME (realpad));
@ -243,7 +302,7 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
/* fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name); */
/* else */
buf =
(GST_RPAD_GETREGIONFUNC (realpad)) ((GstPad *) realpad, realpad->regiontype,
(GST_RPAD_GETREGIONFUNC (realpad)) (GST_PAD_CAST (realpad), realpad->regiontype,
realpad->offset, realpad->len);
realpad->regiontype = GST_REGION_VOID;
}
@ -252,12 +311,12 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
/* if (GST_RPAD_GETFUNC(realpad) == NULL) */
/* fprintf(stderr,"error, no getfunc in \"%s\"\n", name); */
/* else */
buf = GST_RPAD_GETFUNC (realpad) ((GstPad *) realpad);
buf = GST_RPAD_GETFUNC (realpad) (GST_PAD_CAST (realpad));
}
GST_DEBUG (GST_CAT_DATAFLOW, "calling gst_pad_push on pad %s:%s\n",
GST_DEBUG_PAD_NAME (realpad));
gst_pad_push ((GstPad *) realpad, buf);
gst_pad_push (GST_PAD_CAST (realpad), buf);
}
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
@ -281,8 +340,8 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
*/
while (GST_RPAD_BUFPEN (pad) != NULL) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
@ -295,8 +354,8 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
/* now fill the bufferpen and switch so it can be consumed */
GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n",
GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
}
@ -314,12 +373,12 @@ gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf)
/* now fill the bufferpen and switch so it can be consumed */
GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n",
GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)),
gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
GST_FLAG_UNSET (GST_PAD_PARENT (pad), GST_ELEMENT_COTHREAD_STOPPING);
cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
g_print ("done switching\n");
GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
@ -339,8 +398,8 @@ gst_basic_scheduler_gethandler_proxy (GstPad * pad)
while (GST_RPAD_BUFPEN (pad) == NULL) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
GST_ELEMENT_NAME (GST_ELEMENT (GST_PAD_PARENT (pad))),
GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
@ -374,8 +433,8 @@ gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guin
/* we will loop switching to the peer until it's filled up the bufferpen */
while (GST_RPAD_BUFPEN (pad) == NULL) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
@ -392,7 +451,7 @@ gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guin
}
static void
static gboolean
gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
{
GList *elements;
@ -403,15 +462,19 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
GST_DEBUG (GST_CAT_SCHEDULING, "chain is using COTHREADS\n");
g_assert (bin->threadcontext != NULL);
g_assert (GST_BIN_THREADCONTEXT (bin) != NULL);
/* walk through all the chain's elements */
elements = chain->elements;
while (elements) {
element = GST_ELEMENT (elements->data);
gboolean decoupled;
gint same_sched = 0;
element = GST_ELEMENT_CAST (elements->data);
elements = g_list_next (elements);
decoupled = (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED) ? TRUE : FALSE);
/* start out without a wrapper function, we select it later */
wrapper_function = NULL;
@ -423,7 +486,7 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
else {
/* otherwise we need to decide what kind of cothread */
/* if it's not DECOUPLED, we decide based on whether it's a source or not */
if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
if (!decoupled) {
/* if it doesn't have any sinks, it must be a source (duh) */
if (element->numsinkpads == 0) {
wrapper_function = GST_DEBUG_FUNCPTR (gst_basic_scheduler_src_wrapper);
@ -441,16 +504,28 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
/* now we have to walk through the pads to set up their state */
pads = gst_element_get_pad_list (element);
while (pads) {
GstPad *peerpad;
pad = GST_PAD (pads->data);
pads = g_list_next (pads);
if (!GST_IS_REAL_PAD (pad))
continue;
peerpad = GST_PAD (GST_RPAD_PEER (pad));
/* if the element is DECOUPLED or outside the manager, we have to chain */
if ((wrapper_function == NULL) ||
(GST_RPAD_PEER (pad) &&
(GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
) {
(peerpad && (GST_ELEMENT_CAST (GST_PAD_PARENT (peerpad))->sched != GST_SCHEDULER (chain->sched)))) {
if (!decoupled && GST_RPAD_PEER (pad) &&
!GST_FLAG_IS_SET (GST_PAD_PARENT (peerpad), GST_ELEMENT_DECOUPLED)) {
/* whoa non decoupled with different schedulers */
gst_element_error (element, "element \"%s\" is not decoupled but has pads in different schedulers",
GST_ELEMENT_NAME (element), NULL);
return FALSE;
}
/* set the chain proxies */
if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) {
GST_DEBUG (GST_CAT_SCHEDULING, "copying chain function into push proxy for %s:%s\n",
@ -464,17 +539,17 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
GST_RPAD_PULLREGIONFUNC (pad) = GST_RPAD_GETREGIONFUNC (pad);
}
/* otherwise we really are a cothread */
}
/* otherwise we really are a cothread */
else {
if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded push proxy for sinkpad %s:%s\n",
GST_DEBUG_PAD_NAME (pad));
GST_DEBUG_PAD_NAME (pad));
GST_RPAD_CHAINHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy);
}
else {
GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded pull proxy for srcpad %s:%s\n",
GST_DEBUG_PAD_NAME (pad));
GST_DEBUG_PAD_NAME (pad));
GST_RPAD_GETHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_gethandler_proxy);
GST_RPAD_PULLREGIONFUNC (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pullregionfunc_proxy);
}
@ -483,17 +558,24 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
/* need to set up the cothread now */
if (wrapper_function != NULL) {
if (element->threadstate == NULL) {
/* FIXME handle cothread_create returning NULL */
element->threadstate = cothread_create (bin->threadcontext);
GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n", element->threadstate,
if (GST_ELEMENT_THREADSTATE (element) == NULL) {
GST_ELEMENT_THREADSTATE (element) = cothread_create (GST_BIN_THREADCONTEXT (bin));
if (GST_ELEMENT_THREADSTATE (element) == NULL) {
gst_element_error (element, "could not create cothread for \"%s\"",
GST_ELEMENT_NAME (element), NULL);
return FALSE;
}
GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n",
GST_ELEMENT_THREADSTATE (element),
GST_ELEMENT_NAME (element));
}
cothread_setfunc (element->threadstate, wrapper_function, 0, (char **) element);
cothread_setfunc (GST_ELEMENT_THREADSTATE (element), wrapper_function, 0, (char **) element);
GST_DEBUG (GST_CAT_SCHEDULING, "set wrapper function for '%s' to &%s\n",
GST_ELEMENT_NAME (element), GST_DEBUG_FUNCPTR_NAME (wrapper_function));
}
}
return TRUE;
}
/*
@ -533,7 +615,7 @@ gst_basic_scheduler_chained_chain (GstBin *bin, _GstBinChain *chain) {
static GstSchedulerChain *
gst_basic_scheduler_chain_new (GstScheduler * sched)
gst_basic_scheduler_chain_new (GstBasicScheduler * sched)
{
GstSchedulerChain *chain = g_new (GstSchedulerChain, 1);
@ -559,7 +641,7 @@ gst_basic_scheduler_chain_new (GstScheduler * sched)
static void
gst_basic_scheduler_chain_destroy (GstSchedulerChain * chain)
{
GstScheduler *sched = chain->sched;
GstBasicScheduler *sched = chain->sched;
/* remove the chain from the schedulers' list of chains */
sched->chains = g_list_remove (sched->chains, chain);
@ -582,14 +664,14 @@ gst_basic_scheduler_chain_add_element (GstSchedulerChain * chain, GstElement * e
chain);
/* set the sched pointer for the element */
element->sched = chain->sched;
element->sched = GST_SCHEDULER (chain->sched);
/* add the element to the list of 'disabled' elements */
chain->disabled = g_list_prepend (chain->disabled, element);
chain->num_elements++;
}
static void
static gboolean
gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain, GstElement * element)
{
GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),
@ -602,7 +684,7 @@ gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain, GstElement
chain->elements = g_list_prepend (chain->elements, element);
/* reschedule the chain */
gst_basic_scheduler_cothreaded_chain (GST_BIN (chain->sched->parent), chain);
return gst_basic_scheduler_cothreaded_chain (GST_BIN (GST_SCHEDULER (chain->sched)->parent), chain);
}
static void
@ -633,9 +715,9 @@ gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain, GstElement
gst_basic_scheduler_chain_disable_element (chain, element);
}
/* we have to check for a threadstate here because a queue doesn't have one */
if (element->threadstate) {
cothread_free (element->threadstate);
element->threadstate = NULL;
if (GST_ELEMENT_THREADSTATE (element)) {
cothread_free (GST_ELEMENT_THREADSTATE (element));
GST_ELEMENT_THREADSTATE (element) = NULL;
}
/* remove the element from the list of elements */
@ -645,13 +727,10 @@ gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain, GstElement
/* if there are no more elements in the chain, destroy the chain */
if (chain->num_elements == 0)
gst_basic_scheduler_chain_destroy (chain);
/* unset the sched pointer for the element */
element->sched = NULL;
}
static void
gst_basic_scheduler_chain_elements (GstScheduler * sched, GstElement * element1, GstElement * element2)
gst_basic_scheduler_chain_elements (GstBasicScheduler * sched, GstElement * element1, GstElement * element2)
{
GList *chains;
GstSchedulerChain *chain;
@ -722,7 +801,7 @@ gst_basic_scheduler_chain_elements (GstScheduler * sched, GstElement * element1,
/* find the chain within the scheduler that holds the element, if any */
static GstSchedulerChain *
gst_basic_scheduler_find_chain (GstScheduler * sched, GstElement * element)
gst_basic_scheduler_find_chain (GstBasicScheduler * sched, GstElement * element)
{
GList *chains;
GstSchedulerChain *chain;
@ -788,9 +867,9 @@ gst_basic_scheduler_setup (GstScheduler *sched)
GstBin *bin = GST_BIN (sched->parent);
/* first create thread context */
if (bin->threadcontext == NULL) {
if (GST_BIN_THREADCONTEXT (bin) == NULL) {
GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context\n");
bin->threadcontext = cothread_context_init ();
GST_BIN_THREADCONTEXT (bin) = cothread_context_init ();
}
}
@ -799,18 +878,18 @@ gst_basic_scheduler_reset (GstScheduler *sched)
{
cothread_context *ctx;
GstBin *bin = GST_BIN (GST_SCHED_PARENT (sched));
GList *elements = sched->elements;
GList *elements = GST_BASIC_SCHEDULER_CAST (sched)->elements;
while (elements) {
GST_ELEMENT (elements->data)->threadstate = NULL;
GST_ELEMENT_THREADSTATE (elements->data) = NULL;
elements = g_list_next (elements);
}
ctx = GST_BIN (GST_SCHED_PARENT (sched))->threadcontext;
ctx = GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched));
cothread_context_free (ctx);
GST_BIN (GST_SCHED_PARENT (sched))->threadcontext = NULL;
GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched)) = NULL;
}
static void
@ -820,9 +899,7 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
GstPad *pad;
GstElement *peerelement;
GstSchedulerChain *chain;
g_return_if_fail (element != NULL);
g_return_if_fail (GST_IS_ELEMENT (element));
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
/* if it's already in this scheduler, don't bother doing anything */
if (GST_ELEMENT_SCHED (element) == sched)
@ -844,11 +921,11 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
return;
/* first add it to the list of elements that are to be scheduled */
sched->elements = g_list_prepend (sched->elements, element);
sched->num_elements++;
bsched->elements = g_list_prepend (bsched->elements, element);
bsched->num_elements++;
/* create a chain to hold it, and add */
chain = gst_basic_scheduler_chain_new (sched);
chain = gst_basic_scheduler_chain_new (bsched);
gst_basic_scheduler_chain_add_element (chain, element);
/* set the sched pointer in all the pads */
@ -870,7 +947,7 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
if (GST_ELEMENT_SCHED (element) == GST_ELEMENT_SCHED (peerelement)) {
GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, chaining together");
/* make sure that the two elements are in the same chain */
gst_basic_scheduler_chain_elements (sched, element, peerelement);
gst_basic_scheduler_chain_elements (bsched, element, peerelement);
}
}
}
@ -880,67 +957,104 @@ static void
gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element)
{
GstSchedulerChain *chain;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
g_return_if_fail (element != NULL);
g_return_if_fail (GST_IS_ELEMENT (element));
if (g_list_find (sched->elements, element)) {
if (g_list_find (bsched->elements, element)) {
GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler",
GST_ELEMENT_NAME (element));
/* find what chain the element is in */
chain = gst_basic_scheduler_find_chain (sched, element);
chain = gst_basic_scheduler_find_chain (bsched, element);
/* remove it from its chain */
gst_basic_scheduler_chain_remove_element (chain, element);
/* remove it from the list of elements */
sched->elements = g_list_remove (sched->elements, element);
sched->num_elements--;
bsched->elements = g_list_remove (bsched->elements, element);
bsched->num_elements--;
/* unset the scheduler pointer in the element */
GST_ELEMENT_SCHED (element) = NULL;
}
}
static void
static GstElementStateReturn
gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
{
GstSchedulerChain *chain;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
/* find the chain the element is in */
chain = gst_basic_scheduler_find_chain (sched, element);
chain = gst_basic_scheduler_find_chain (bsched, element);
/* remove it from the chain */
if (chain) {
if (transition == GST_STATE_PLAYING_TO_PAUSED)
gst_basic_scheduler_chain_disable_element (chain, element);
if (transition == GST_STATE_PAUSED_TO_PLAYING)
gst_basic_scheduler_chain_enable_element (chain, element);
if (!gst_basic_scheduler_chain_enable_element (chain, element)) {
GST_INFO (GST_CAT_SCHEDULING, "could not enable element \"%s\"", GST_ELEMENT_NAME (element));
return GST_STATE_FAILURE;
}
}
else {
GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" not found in any chain, no state change", GST_ELEMENT_NAME (element));
}
return GST_STATE_SUCCESS;
}
static void
gst_basic_scheduler_lock_element (GstScheduler * sched, GstElement * element)
{
if (element->threadstate)
cothread_lock (element->threadstate);
if (GST_ELEMENT_THREADSTATE (element))
cothread_lock (GST_ELEMENT_THREADSTATE (element));
}
static void
gst_basic_scheduler_unlock_element (GstScheduler * sched, GstElement * element)
{
if (element->threadstate)
cothread_unlock (element->threadstate);
if (GST_ELEMENT_THREADSTATE (element))
cothread_unlock (GST_ELEMENT_THREADSTATE (element));
}
static void
gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad * srcpad, GstPad * sinkpad)
gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element)
{
if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) {
cothread_switch (cothread_current_main ());
}
}
static void
gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
cothread_switch (cothread_current_main ());
}
static void
gst_basic_scheduler_error (GstScheduler *sched, GstElement *element)
{
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
if (GST_ELEMENT_THREADSTATE (element)) {
GstSchedulerChain *chain;
chain = gst_basic_scheduler_find_chain (bsched, element);
if (chain)
gst_basic_scheduler_chain_disable_element (chain, element);
GST_STATE_PENDING (GST_SCHEDULER (sched)->parent) = GST_STATE_PAUSED;
cothread_switch (cothread_current_main ());
}
}
static void
gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad *srcpad, GstPad *sinkpad)
{
GstElement *srcelement, *sinkelement;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
srcelement = GST_PAD_PARENT (srcpad);
g_return_if_fail (srcelement != NULL);
@ -955,7 +1069,7 @@ gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad * srcpad, GstPad *
if (GST_ELEMENT_SCHED (srcelement) == GST_ELEMENT_SCHED (sinkelement)) {
GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same scheduler, chaining together",
GST_DEBUG_PAD_NAME (sinkpad));
gst_basic_scheduler_chain_elements (sched, srcelement, sinkelement);
gst_basic_scheduler_chain_elements (bsched, srcelement, sinkelement);
}
}
@ -965,21 +1079,22 @@ gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPa
GstSchedulerChain *chain;
GstElement *element1, *element2;
GstSchedulerChain *chain1, *chain2;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
/* we need to have the parent elements of each pad */
element1 = GST_ELEMENT (GST_PAD_PARENT (srcpad));
element2 = GST_ELEMENT (GST_PAD_PARENT (sinkpad));
element1 = GST_ELEMENT_CAST (GST_PAD_PARENT (srcpad));
element2 = GST_ELEMENT_CAST (GST_PAD_PARENT (sinkpad));
/* first task is to remove the old chain they belonged to.
* this can be accomplished by taking either of the elements,
* since they are guaranteed to be in the same chain
* FIXME is it potentially better to make an attempt at splitting cleaner??
*/
chain1 = gst_basic_scheduler_find_chain (sched, element1);
chain2 = gst_basic_scheduler_find_chain (sched, element2);
chain1 = gst_basic_scheduler_find_chain (bsched, element1);
chain2 = gst_basic_scheduler_find_chain (bsched, element2);
if (chain1 != chain2) {
/* elements not in the same chain don't need to be separated */
@ -992,14 +1107,14 @@ gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPa
gst_basic_scheduler_chain_destroy (chain1);
/* now create a new chain to hold element1 and build it from scratch */
chain1 = gst_basic_scheduler_chain_new (sched);
chain1 = gst_basic_scheduler_chain_new (bsched);
gst_basic_scheduler_chain_recursive_add (chain1, element1);
}
/* check the other element to see if it landed in the newly created chain */
if (gst_basic_scheduler_find_chain (sched, element2) == NULL) {
if (gst_basic_scheduler_find_chain (bsched, element2) == NULL) {
/* if not in chain, create chain and build from scratch */
chain2 = gst_basic_scheduler_chain_new (sched);
chain2 = gst_basic_scheduler_chain_new (bsched);
gst_basic_scheduler_chain_recursive_add (chain2, element2);
}
}
@ -1034,7 +1149,7 @@ gst_basic_scheduler_pad_select (GstScheduler * sched, GList * padlist)
if (pad != NULL) {
GstRealPad *peer = GST_RPAD_PEER (pad);
cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (peer)));
g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)),
gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
@ -1055,14 +1170,13 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
GstElement *entry;
gboolean eos = FALSE;
GList *elements;
gint scheduled = 0;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
GST_DEBUG_ENTER ("(\"%s\")", GST_ELEMENT_NAME (bin));
g_return_val_if_fail (bin != NULL, TRUE);
g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
/* step through all the chains */
chains = sched->chains;
chains = bsched->chains;
if (chains == NULL)
return FALSE;
@ -1080,14 +1194,14 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
GST_DEBUG (GST_CAT_SCHEDULING, "there are %d elements in this chain\n", chain->num_elements);
elements = chain->elements;
while (elements) {
entry = GST_ELEMENT (elements->data);
entry = GST_ELEMENT_CAST (elements->data);
elements = g_list_next (elements);
if (GST_FLAG_IS_SET (entry, GST_ELEMENT_DECOUPLED)) {
GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is DECOUPLED, skipping\n",
GST_ELEMENT_NAME (entry));
entry = NULL;
}
else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_NO_ENTRY)) {
else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_INFINITE_LOOP)) {
GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is not valid, skipping\n",
GST_ELEMENT_NAME (entry));
entry = NULL;
@ -1099,7 +1213,13 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
GST_ELEMENT_NAME (entry), entry);
cothread_switch (entry->threadstate);
if (GST_ELEMENT_THREADSTATE (entry)) {
cothread_switch (GST_ELEMENT_THREADSTATE (entry));
}
else {
GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch not possible, element has no threadstate\n");
return FALSE;
}
/* following is a check to see if the chain was interrupted due to a
* top-half state_change(). (i.e., if there's a pending state.)
@ -1113,16 +1233,18 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
GST_STATE_PENDING (GST_SCHEDULER (sched)->parent));
return FALSE;
}
scheduled++;
}
else {
GST_INFO (GST_CAT_DATAFLOW, "NO ENTRY INTO CHAIN!");
eos = TRUE;
if (scheduled == 0)
eos = TRUE;
}
}
else {
GST_INFO (GST_CAT_DATAFLOW, "NO ENABLED ELEMENTS IN CHAIN!!");
eos = TRUE;
if (scheduled == 0)
eos = TRUE;
}
}
@ -1137,6 +1259,7 @@ gst_basic_scheduler_show (GstScheduler * sched)
GList *chains, *elements;
GstElement *element;
GstSchedulerChain *chain;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
if (sched == NULL) {
g_print ("scheduler doesn't exist for this element\n");
@ -1147,8 +1270,8 @@ gst_basic_scheduler_show (GstScheduler * sched)
g_print ("SCHEDULER DUMP FOR MANAGING BIN \"%s\"\n", GST_ELEMENT_NAME (sched->parent));
g_print ("scheduler has %d elements in it: ", sched->num_elements);
elements = sched->elements;
g_print ("scheduler has %d elements in it: ", bsched->num_elements);
elements = bsched->elements;
while (elements) {
element = GST_ELEMENT (elements->data);
elements = g_list_next (elements);
@ -1157,8 +1280,8 @@ gst_basic_scheduler_show (GstScheduler * sched)
}
g_print ("\n");
g_print ("scheduler has %d chains in it\n", sched->num_chains);
chains = sched->chains;
g_print ("scheduler has %d chains in it\n", bsched->num_chains);
chains = bsched->chains;
while (chains) {
chain = (GstSchedulerChain *) (chains->data);
chains = g_list_next (chains);

View file

@ -272,46 +272,44 @@ gst_aggregator_loop (GstElement *element)
aggregator = GST_AGGREGATOR (element);
do {
if (aggregator->sched == AGGREGATOR_LOOP ||
aggregator->sched == AGGREGATOR_LOOP_PEEK) {
GList *pads = aggregator->sinkpads;
if (aggregator->sched == AGGREGATOR_LOOP ||
aggregator->sched == AGGREGATOR_LOOP_PEEK) {
GList *pads = aggregator->sinkpads;
while (pads) {
GstPad *pad = GST_PAD (pads->data);
pads = g_list_next (pads);
while (pads) {
GstPad *pad = GST_PAD (pads->data);
pads = g_list_next (pads);
if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
buf = gst_pad_peek (pad);
if (buf == NULL)
continue;
if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
buf = gst_pad_peek (pad);
if (buf == NULL)
continue;
g_assert (buf == gst_pad_pull (pad));
debug = "loop_peek";
}
else {
buf = gst_pad_pull (pad);
debug = "loop";
}
gst_aggregator_push (aggregator, pad, buf, debug);
}
}
else {
if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
GstPad *pad;
debug = "loop_select";
pad = gst_pad_select (aggregator->sinkpads);
buf = gst_pad_pull (pad);
gst_aggregator_push (aggregator, pad, buf, debug);
g_assert (buf == gst_pad_pull (pad));
debug = "loop_peek";
}
else {
g_assert_not_reached ();
buf = gst_pad_pull (pad);
debug = "loop";
}
gst_aggregator_push (aggregator, pad, buf, debug);
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
}
else {
if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
GstPad *pad;
debug = "loop_select";
pad = gst_pad_select (aggregator->sinkpads);
buf = gst_pad_pull (pad);
gst_aggregator_push (aggregator, pad, buf, debug);
}
else {
g_assert_not_reached ();
}
}
}
/**

View file

@ -387,7 +387,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
case ARG_OUTPUT:
break;
case ARG_DATA:
src->data = g_value_get_int (value);
src->data = g_value_get_enum (value);
switch (src->data) {
case FAKESRC_DATA_ALLOCATE:
if (src->parent) {
@ -403,7 +403,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
}
break;
case ARG_SIZETYPE:
src->sizetype = g_value_get_int (value);
src->sizetype = g_value_get_enum (value);
break;
case ARG_SIZEMIN:
src->sizemin = g_value_get_int (value);
@ -415,7 +415,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
src->parentsize = g_value_get_int (value);
break;
case ARG_FILLTYPE:
src->filltype = g_value_get_int (value);
src->filltype = g_value_get_enum (value);
break;
case ARG_PATTERN:
break;
@ -455,13 +455,13 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS
g_value_set_boolean (value, src->loop_based);
break;
case ARG_OUTPUT:
g_value_set_int (value, src->output);
g_value_set_enum (value, src->output);
break;
case ARG_DATA:
g_value_set_int (value, src->data);
g_value_set_enum (value, src->data);
break;
case ARG_SIZETYPE:
g_value_set_int (value, src->sizetype);
g_value_set_enum (value, src->sizetype);
break;
case ARG_SIZEMIN:
g_value_set_int (value, src->sizemin);
@ -473,7 +473,7 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS
g_value_set_int (value, src->parentsize);
break;
case ARG_FILLTYPE:
g_value_set_int (value, src->filltype);
g_value_set_enum (value, src->filltype);
break;
case ARG_PATTERN:
g_value_set_string (value, src->pattern);
@ -689,49 +689,46 @@ static void
gst_fakesrc_loop(GstElement *element)
{
GstFakeSrc *src;
GList *pads;
g_return_if_fail(element != NULL);
g_return_if_fail(GST_IS_FAKESRC(element));
src = GST_FAKESRC (element);
do {
GList *pads;
pads = gst_element_get_pad_list (element);
pads = GST_ELEMENT (src)->pads;
while (pads) {
GstPad *pad = GST_PAD (pads->data);
GstBuffer *buf;
while (pads) {
GstPad *pad = GST_PAD (pads->data);
GstBuffer *buf;
if (src->rt_num_buffers == 0) {
src->eos = TRUE;
}
else {
if (src->rt_num_buffers > 0)
src->rt_num_buffers--;
}
if (src->eos) {
gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
return;
}
buf = gst_fakesrc_create_buffer (src);
GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
if (!src->silent) {
gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
}
g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
gst_pad_push (pad, buf);
pads = g_list_next (pads);
if (src->rt_num_buffers == 0) {
src->eos = TRUE;
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
else {
if (src->rt_num_buffers > 0)
src->rt_num_buffers--;
}
if (src->eos) {
gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
return;
}
buf = gst_fakesrc_create_buffer (src);
GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
if (!src->silent) {
gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
}
g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
gst_pad_push (pad, buf);
pads = g_list_next (pads);
}
}
static GstElementStateReturn

View file

@ -30,6 +30,7 @@
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>
#include <string.h>
/**********************************************************************
@ -327,10 +328,10 @@ gst_filesrc_map_region (GstFileSrc *src, off_t offset, size_t size)
/* mmap() the data into this new buffer */
GST_BUFFER_DATA(buf) = mmap (NULL, size, PROT_READ, MAP_SHARED, src->fd, offset);
if (GST_BUFFER_DATA(buf) == NULL) {
fprintf (stderr, "ERROR: gstfilesrc couldn't map file!\n");
gst_element_error (GST_ELEMENT (src), "couldn't map file");
} else if (GST_BUFFER_DATA(buf) == MAP_FAILED) {
g_error ("gstfilesrc mmap(0x%x, %d, 0x%llx) : %s",
size, src->fd, offset, sys_errlist[errno]);
gst_element_error (GST_ELEMENT (src), "mmap (0x%x, %d, 0x%llx) : %s",
size, src->fd, offset, strerror (errno));
}
#ifdef MADV_SEQUENTIAL
/* madvise to tell the kernel what to do with it */
@ -533,8 +534,8 @@ gst_filesrc_open_file (GstFileSrc *src)
/* open the file */
src->fd = open (src->filename, O_RDONLY);
if (src->fd < 0) {
perror ("open");
gst_element_error (GST_ELEMENT (src), g_strconcat("opening file \"", src->filename, "\"", NULL));
gst_element_error (GST_ELEMENT (src), "opening file \"%s\" (%s)",
src->filename, strerror (errno), NULL);
return FALSE;
} else {
/* find the file length */
@ -557,7 +558,6 @@ gst_filesrc_close_file (GstFileSrc *src)
{
g_return_if_fail (GST_FLAG_IS_SET (src, GST_FILESRC_OPEN));
g_print ("close\n");
/* close the file */
close (src->fd);
@ -565,6 +565,8 @@ gst_filesrc_close_file (GstFileSrc *src)
src->fd = 0;
src->filelen = 0;
src->curoffset = 0;
if (src->mapbuf)
gst_buffer_unref (src->mapbuf);
GST_FLAG_UNSET (src, GST_FILESRC_OPEN);
}
@ -575,17 +577,22 @@ gst_filesrc_change_state (GstElement *element)
{
GstFileSrc *src = GST_FILESRC(element);
if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
gst_filesrc_close_file (GST_FILESRC (element));
} if (GST_STATE_PENDING (element) == GST_STATE_READY) {
src->curoffset = 0;
} else {
if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
if (!gst_filesrc_open_file (GST_FILESRC (element)))
return GST_STATE_FAILURE;
}
switch (GST_STATE_TRANSITION (element)) {
case GST_STATE_NULL_TO_READY:
if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
if (!gst_filesrc_open_file (GST_FILESRC (element)))
return GST_STATE_FAILURE;
}
break;
case GST_STATE_READY_TO_NULL:
if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
gst_filesrc_close_file (GST_FILESRC (element));
break;
case GST_STATE_READY_TO_PAUSED:
case GST_STATE_PAUSED_TO_READY:
src->curoffset = 0;
default:
break;
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)

View file

@ -205,27 +205,24 @@ gst_identity_loop (GstElement *element)
identity = GST_IDENTITY (element);
do {
buf = gst_pad_pull (identity->sinkpad);
buf = gst_pad_pull (identity->sinkpad);
for (i=identity->duplicate; i; i--) {
if (!identity->silent)
g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n",
for (i=identity->duplicate; i; i--) {
if (!identity->silent)
g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n",
GST_DEBUG_PAD_NAME (identity->sinkpad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
buf);
if (i>1)
gst_buffer_ref (buf);
if (i>1)
gst_buffer_ref (buf);
gst_pad_push (identity->srcpad, buf);
gst_pad_push (identity->srcpad, buf);
if (identity->sleep_time)
usleep (identity->sleep_time);
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
if (identity->sleep_time)
usleep (identity->sleep_time);
}
}
static void

View file

@ -65,11 +65,13 @@ enum {
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
ARG_MAY_DEADLOCK,
};
static void gst_queue_class_init (GstQueueClass *klass);
static void gst_queue_init (GstQueue *queue);
static void gst_queue_dispose (GObject *object);
static void gst_queue_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
@ -141,18 +143,22 @@ gst_queue_class_init (GstQueueClass *klass)
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY,
g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.",
GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE));
g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL,
g_param_spec_int("level","Level","How many buffers are in the queue.",
0,G_MAXINT,0,G_PARAM_READABLE));
g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL,
g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.",
0,G_MAXINT,100,G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
0, G_MAXINT, 0, G_PARAM_READABLE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
0, G_MAXINT, 100, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
TRUE, G_PARAM_READWRITE));
gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
}
@ -182,6 +188,7 @@ gst_queue_init (GstQueue *queue)
queue->size_buffers = 100; /* 100 buffers */
queue->size_bytes = 100 * 1024; /* 100KB */
queue->size_time = 1000000000LL; /* 1sec */
queue->may_deadlock = TRUE;
queue->qlock = g_mutex_new ();
queue->reader = FALSE;
@ -191,6 +198,18 @@ gst_queue_init (GstQueue *queue)
GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
static void
gst_queue_dispose (GObject *object)
{
GstQueue *queue = GST_QUEUE (object);
g_mutex_free (queue->qlock);
g_cond_free (queue->not_empty);
g_cond_free (queue->not_full);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static GstBufferPool*
gst_queue_get_bufferpool (GstPad *pad)
{
@ -334,10 +353,21 @@ restart:
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
gst_element_interrupt (GST_ELEMENT (queue));
goto restart;
}
g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
if (GST_STATE (queue) != GST_STATE_PLAYING) {
/* this means the other end is shut down */
/* try to signal to resolve the error */
if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
return;
}
else {
gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->writer)
@ -402,10 +432,20 @@ restart:
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
gst_element_interrupt (GST_ELEMENT (queue));
goto restart;
}
g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
if (GST_STATE (queue) != GST_STATE_PLAYING) {
/* this means the other end is shut down */
if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
return NULL;
}
else {
gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->reader)
@ -513,10 +553,13 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa
switch (prop_id) {
case ARG_LEAKY:
queue->leaky = g_value_get_int(value);
queue->leaky = g_value_get_int (value);
break;
case ARG_MAX_LEVEL:
queue->size_buffers = g_value_get_int(value);
queue->size_buffers = g_value_get_int (value);
break;
case ARG_MAY_DEADLOCK:
queue->may_deadlock = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@ -536,13 +579,16 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
switch (prop_id) {
case ARG_LEAKY:
g_value_set_int(value, queue->leaky);
g_value_set_int (value, queue->leaky);
break;
case ARG_LEVEL:
g_value_set_int(value, queue->level_buffers);
g_value_set_int (value, queue->level_buffers);
break;
case ARG_MAX_LEVEL:
g_value_set_int(value, queue->size_buffers);
g_value_set_int (value, queue->size_buffers);
break;
case ARG_MAY_DEADLOCK:
g_value_set_boolean (value, queue->may_deadlock);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);

View file

@ -74,6 +74,7 @@ struct _GstQueue {
guint64 size_time; /* size of queue in time */
gint leaky; /* whether the queue is leaky, and if so at which end */
gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
GMutex *qlock; /* lock for queue (vs object lock) */
/* we are single reader and single writer queue */