implemented threadsafe property set/get system as discussed in docs/random/wingo/threadsafe-properties some cleanups ...

Original commit message from CVS:
* implemented threadsafe property set/get system as discussed in
docs/random/wingo/threadsafe-properties
* some cleanups
* this change will cause binary incompatibilities, better rebuild them plugins

now, off to drink :-)
This commit is contained in:
Andy Wingo 2002-05-26 03:23:25 +00:00
parent 9a6e87b6b6
commit d2ed0906a6
6 changed files with 490 additions and 57 deletions

View file

@ -23,6 +23,7 @@
/* #define GST_DEBUG_ENABLED */
#include <glib.h>
#include <stdarg.h>
#include <gobject/gvaluecollector.h>
#include "gst_private.h"
#include "gstelement.h"
@ -54,9 +55,9 @@ static void gst_element_class_init (GstElementClass *klass);
static void gst_element_init (GstElement *element);
static void gst_element_base_class_init (GstElementClass *klass);
static void gst_element_set_property (GObject *object, guint prop_id,
static void gst_element_real_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
static void gst_element_get_property (GObject *object, guint prop_id, GValue *value,
static void gst_element_real_get_property (GObject *object, guint prop_id, GValue *value,
GParamSpec *pspec);
static void gst_element_dispatch_properties_changed (GObject * object, guint n_pspecs, GParamSpec **pspecs);
@ -139,8 +140,8 @@ gst_element_class_init (GstElementClass *klass)
2, G_TYPE_OBJECT, G_TYPE_PARAM);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_element_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_element_get_property);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_element_real_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_element_real_get_property);
/* see the comments at gst_element_dispatch_properties_changed */
gobject_class->dispatch_properties_changed
@ -167,8 +168,8 @@ gst_element_base_class_init (GstElementClass *klass)
gobject_class = (GObjectClass*) klass;
gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_element_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_element_get_property);
gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_element_real_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_element_real_get_property);
}
static void
@ -188,7 +189,7 @@ gst_element_init (GstElement *element)
}
static void
gst_element_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
gst_element_real_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
{
GstElementClass *oclass = CLASS (object);
@ -197,7 +198,7 @@ gst_element_set_property (GObject *object, guint prop_id, const GValue *value, G
}
static void
gst_element_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
gst_element_real_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
{
GstElementClass *oclass = CLASS (object);
@ -237,6 +238,339 @@ gst_element_dispatch_properties_changed (GObject *object,
}
}
typedef struct {
const GParamSpec *pspec;
const GValue *value;
} prop_value_t;
static void
element_set_property (GstElement *element, const GParamSpec *pspec, const GValue *value)
{
prop_value_t *prop_value = g_new0 (prop_value_t, 1);
g_message ("Setting property %s::%s to %s for object %s\n", G_OBJECT_TYPE_NAME (element),
pspec->name, g_strdup_value_contents (value), GST_OBJECT_NAME (element));
prop_value->pspec = pspec;
prop_value->value = value;
g_async_queue_push (element->prop_value_queue, prop_value);
}
static void
element_get_property (GstElement *element, const GParamSpec *pspec, GValue *value)
{
g_message ("Getting property %s::%s to %s for object %s\n", G_OBJECT_TYPE_NAME (element),
pspec->name, g_strdup_value_contents (value), GST_OBJECT_NAME (element));
g_mutex_lock (element->property_mutex);
g_object_get_property ((GObject*)element, pspec->name, value);
g_mutex_unlock (element->property_mutex);
}
static void
gst_element_threadsafe_properties_pre_run (GstElement *element)
{
GST_DEBUG (GST_CAT_THREAD, "locking element %s", GST_OBJECT_NAME (element));
g_mutex_lock (element->property_mutex);
gst_element_set_pending_properties (element);
}
static void
gst_element_threadsafe_properties_post_run (GstElement *element)
{
GST_DEBUG (GST_CAT_THREAD, "unlocking element %s", GST_OBJECT_NAME (element));
g_mutex_unlock (element->property_mutex);
}
void
gst_element_enable_threadsafe_properties (GstElement *element)
{
g_return_if_fail (GST_IS_ELEMENT (element));
GST_FLAG_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES);
element->pre_run_func = gst_element_threadsafe_properties_pre_run;
element->post_run_func = gst_element_threadsafe_properties_post_run;
if (!element->prop_value_queue)
element->prop_value_queue = g_async_queue_new ();
if (!element->property_mutex)
element->property_mutex = g_mutex_new ();
}
void
gst_element_disable_threadsafe_properties (GstElement *element)
{
g_return_if_fail (GST_IS_ELEMENT (element));
GST_FLAG_UNSET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES);
element->pre_run_func = NULL;
element->post_run_func = NULL;
/* let's keep around that async queue */
}
void
gst_element_set_pending_properties (GstElement *element)
{
prop_value_t *prop_value;
while ((prop_value = g_async_queue_try_pop (element->prop_value_queue))) {
g_object_set_property ((GObject*)element, prop_value->pspec->name, prop_value->value);
g_free (prop_value);
}
}
/* following 6 functions taken mostly from gobject.c */
void
gst_element_set (GstElement *element, const gchar *first_property_name, ...)
{
va_list var_args;
g_return_if_fail (GST_IS_ELEMENT (element));
va_start (var_args, first_property_name);
gst_element_set_valist (element, first_property_name, var_args);
va_end (var_args);
}
void
gst_element_get (GstElement *element, const gchar *first_property_name, ...)
{
va_list var_args;
g_return_if_fail (GST_IS_ELEMENT (element));
va_start (var_args, first_property_name);
gst_element_get_valist (element, first_property_name, var_args);
va_end (var_args);
}
void
gst_element_set_valist (GstElement *element, const gchar *first_property_name, va_list var_args)
{
const gchar *name;
GObject *object;
g_return_if_fail (GST_IS_ELEMENT (element));
object = (GObject*)element;
if (!GST_FLAG_IS_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES)) {
g_object_set_valist (object, first_property_name, var_args);
return;
}
g_object_ref (object);
name = first_property_name;
while (name)
{
GValue value = { 0, };
GParamSpec *pspec;
gchar *error = NULL;
pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (object), name);
if (!pspec)
{
g_warning ("%s: object class `%s' has no property named `%s'",
G_STRLOC,
G_OBJECT_TYPE_NAME (object),
name);
break;
}
if (!(pspec->flags & G_PARAM_WRITABLE))
{
g_warning ("%s: property `%s' of object class `%s' is not writable",
G_STRLOC,
pspec->name,
G_OBJECT_TYPE_NAME (object));
break;
}
g_value_init (&value, G_PARAM_SPEC_VALUE_TYPE (pspec));
G_VALUE_COLLECT (&value, var_args, 0, &error);
if (error)
{
g_warning ("%s: %s", G_STRLOC, error);
g_free (error);
/* we purposely leak the value here, it might not be
* in a sane state if an error condition occoured
*/
break;
}
element_set_property (element, pspec, &value);
g_value_unset (&value);
name = va_arg (var_args, gchar*);
}
g_object_unref (object);
}
void
gst_element_get_valist (GstElement *element, const gchar *first_property_name, va_list var_args)
{
const gchar *name;
GObject *object;
g_return_if_fail (GST_IS_ELEMENT (element));
object = (GObject*)element;
if (!GST_FLAG_IS_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES)) {
g_object_get_valist (object, first_property_name, var_args);
return;
}
g_object_ref (object);
name = first_property_name;
while (name)
{
GValue value = { 0, };
GParamSpec *pspec;
gchar *error;
pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (object), name);
if (!pspec)
{
g_warning ("%s: object class `%s' has no property named `%s'",
G_STRLOC,
G_OBJECT_TYPE_NAME (object),
name);
break;
}
if (!(pspec->flags & G_PARAM_READABLE))
{
g_warning ("%s: property `%s' of object class `%s' is not readable",
G_STRLOC,
pspec->name,
G_OBJECT_TYPE_NAME (object));
break;
}
g_value_init (&value, G_PARAM_SPEC_VALUE_TYPE (pspec));
element_get_property (element, pspec, &value);
G_VALUE_LCOPY (&value, var_args, 0, &error);
if (error)
{
g_warning ("%s: %s", G_STRLOC, error);
g_free (error);
g_value_unset (&value);
break;
}
g_value_unset (&value);
name = va_arg (var_args, gchar*);
}
g_object_unref (object);
}
void
gst_element_set_property (GstElement *element, const gchar *property_name, const GValue *value)
{
GParamSpec *pspec;
GObject *object;
g_return_if_fail (GST_IS_ELEMENT (element));
g_return_if_fail (property_name != NULL);
g_return_if_fail (G_IS_VALUE (value));
object = (GObject*)element;
if (!GST_FLAG_IS_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES)) {
g_object_set_property (object, property_name, value);
return;
}
g_object_ref (object);
pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (object), property_name);
if (!pspec)
g_warning ("%s: object class `%s' has no property named `%s'",
G_STRLOC,
G_OBJECT_TYPE_NAME (object),
property_name);
else
element_set_property (element, pspec, value);
g_object_unref (object);
}
void
gst_element_get_property (GstElement *element, const gchar *property_name, GValue *value)
{
GParamSpec *pspec;
GObject *object;
g_return_if_fail (GST_IS_ELEMENT (element));
g_return_if_fail (property_name != NULL);
g_return_if_fail (G_IS_VALUE (value));
object = (GObject*)element;
if (!GST_FLAG_IS_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES)) {
g_object_get_property (object, property_name, value);
return;
}
g_object_ref (object);
pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (object), property_name);
if (!pspec)
g_warning ("%s: object class `%s' has no property named `%s'",
G_STRLOC,
G_OBJECT_TYPE_NAME (object),
property_name);
else
{
GValue *prop_value, tmp_value = { 0, };
/* auto-conversion of the callers value type
*/
if (G_VALUE_TYPE (value) == G_PARAM_SPEC_VALUE_TYPE (pspec))
{
g_value_reset (value);
prop_value = value;
}
else if (!g_value_type_transformable (G_PARAM_SPEC_VALUE_TYPE (pspec), G_VALUE_TYPE (value)))
{
g_warning ("can't retrieve property `%s' of type `%s' as value of type `%s'",
pspec->name,
g_type_name (G_PARAM_SPEC_VALUE_TYPE (pspec)),
G_VALUE_TYPE_NAME (value));
g_object_unref (object);
return;
}
else
{
g_value_init (&tmp_value, G_PARAM_SPEC_VALUE_TYPE (pspec));
prop_value = &tmp_value;
}
element_get_property (element, pspec, prop_value);
if (prop_value != value)
{
g_value_transform (prop_value, value);
g_value_unset (&tmp_value);
}
}
g_object_unref (object);
}
static GstPad*
gst_element_request_pad (GstElement *element, GstPadTemplate *templ, const gchar* name)
{
@ -1618,6 +1952,12 @@ gst_element_dispose (GObject *object)
g_mutex_free (element->state_mutex);
g_cond_free (element->state_cond);
if (element->prop_value_queue)
g_async_queue_unref (element->prop_value_queue);
element->prop_value_queue = NULL;
if (element->property_mutex)
g_mutex_free (element->property_mutex);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@ -1897,7 +2237,7 @@ gst_element_state_get_name (GstElementState state)
static void
gst_element_populate_std_props (GObjectClass * klass,
const char *prop_name, guint arg_id, GParamFlags flags)
const gchar *prop_name, guint arg_id, GParamFlags flags)
{
GQuark prop_id = g_quark_from_string (prop_name);
GParamSpec *pspec;
@ -2004,7 +2344,7 @@ gst_element_populate_std_props (GObjectClass * klass,
* the flags determine readability / writeability.
**/
void
gst_element_class_install_std_props (GstElementClass * klass, const char *first_name, ...)
gst_element_class_install_std_props (GstElementClass * klass, const gchar *first_name, ...)
{
const char *name;

View file

@ -32,9 +32,7 @@
#include <gst/gstclock.h>
#include <gst/gstpluginfeature.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
#define GST_NUM_STATES 4
@ -80,23 +78,22 @@ typedef enum {
GST_ELEMENT_THREAD_SUGGESTED,
/* this element is incable of seeking (FIXME: does this apply to filters?) */
GST_ELEMENT_NO_SEEK,
/* this element, for some reason, has a loop function that performs
* an infinite loop without calls to gst_element_yield () */
GST_ELEMENT_INFINITE_LOOP,
/* there is a new loopfunction ready for placement */
GST_ELEMENT_NEW_LOOPFUNC,
/* if this element can handle events */
GST_ELEMENT_EVENT_AWARE,
/* use threadsafe property get/set implementation */
GST_ELEMENT_USE_THREADSAFE_PROPERTIES,
/* private flags that can be used by the scheduler */
GST_ELEMENT_SCHEDULER_PRIVATE1,
GST_ELEMENT_SCHEDULER_PRIVATE2,
/* there is a new loopfunction ready for placement */
GST_ELEMENT_NEW_LOOPFUNC,
/* if this element can handle events */
GST_ELEMENT_EVENT_AWARE,
/* use some padding for future expansion */
GST_ELEMENT_FLAG_LAST = GST_OBJECT_FLAG_LAST + 12,
GST_ELEMENT_FLAG_LAST = GST_OBJECT_FLAG_LAST + 16,
} GstElementFlags;
#define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED))
@ -111,14 +108,14 @@ typedef enum {
#define GST_ELEMENT_CLOCK(obj) (((GstElement*)(obj))->clock)
#define GST_ELEMENT_PADS(obj) ((obj)->pads)
/*typedef struct _GstElement GstElement;*/
/*typedef struct _GstElementClass GstElementClass;*/
typedef struct _GstElementFactory GstElementFactory;
typedef struct _GstElementFactoryClass GstElementFactoryClass;
typedef void (*GstElementLoopFunction) (GstElement *element);
typedef void (*GstElementSetClockFunction) (GstElement *element, GstClock *clock);
typedef GstClock* (*GstElementGetClockFunction) (GstElement *element);
typedef void (*GstElementPreRunFunction) (GstElement *element);
typedef void (*GstElementPostRunFunction) (GstElement *element);
struct _GstElement {
GstObject object;
@ -143,6 +140,11 @@ struct _GstElement {
GMutex *state_mutex;
GCond *state_cond;
GstElementPreRunFunction pre_run_func;
GstElementPostRunFunction post_run_func;
GAsyncQueue *prop_value_queue;
GMutex *property_mutex;
};
struct _GstElementClass {
@ -155,12 +157,12 @@ struct _GstElementClass {
gint numpadtemplates;
/* signal callbacks */
void (*state_change) (GstElement *element, GstElementState old, GstElementState state);
void (*new_pad) (GstElement *element, GstPad *pad);
void (*pad_removed) (GstElement *element, GstPad *pad);
void (*error) (GstElement *element, GstElement *source, gchar *error);
void (*eos) (GstElement *element);
void (*deep_notify) (GstObject *object, GstObject *orig, GParamSpec *pspec);
void (*state_change) (GstElement *element, GstElementState old, GstElementState state);
void (*new_pad) (GstElement *element, GstPad *pad);
void (*pad_removed) (GstElement *element, GstPad *pad);
void (*error) (GstElement *element, GstElement *source, gchar *error);
void (*eos) (GstElement *element);
void (*deep_notify) (GstObject *object, GstObject *orig, GParamSpec *pspec);
/* local pointers for get/set */
void (*set_property) (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec);
@ -175,6 +177,9 @@ struct _GstElementClass {
};
void gst_element_class_add_pad_template (GstElementClass *klass, GstPadTemplate *templ);
void gst_element_class_install_std_props (GstElementClass *klass,
const gchar *first_name, ...);
GType gst_element_get_type (void);
#define gst_element_destroy(element) gst_object_destroy (GST_OBJECT (element))
@ -182,6 +187,22 @@ GType gst_element_get_type (void);
void gst_element_set_loop_function (GstElement *element,
GstElementLoopFunction loop);
/* threadsafe versions of their g_object_* counterparts */
void gst_element_set (GstElement *element, const gchar *first_property_name, ...);
void gst_element_get (GstElement *element, const gchar *first_property_name, ...);
void gst_element_set_valist (GstElement *element, const gchar *first_property_name,
va_list var_args);
void gst_element_get_valist (GstElement *element, const gchar *first_property_name,
va_list var_args);
void gst_element_set_property (GstElement *element, const gchar *property_name,
const GValue *value);
void gst_element_get_property (GstElement *element, const gchar *property_name,
GValue *value);
void gst_element_enable_threadsafe_properties (GstElement *element);
void gst_element_disable_threadsafe_properties (GstElement *element);
void gst_element_set_pending_properties (GstElement *element);
void gst_element_set_name (GstElement *element, const gchar *name);
const gchar* gst_element_get_name (GstElement *element);
@ -248,9 +269,6 @@ const gchar* gst_element_state_get_name (GstElementState state);
GstElementFactory* gst_element_get_factory (GstElement *element);
void gst_element_class_install_std_props (GstElementClass *klass,
const char *first_name, ...);
GstBin* gst_element_get_managing_bin (GstElement *element);
@ -314,9 +332,7 @@ GstElement* gst_element_factory_create (GstElementFactory *factory,
/* FIXME this name is wrong, probably so is the one above it */
GstElement* gst_element_factory_make (const gchar *factoryname, const gchar *name);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GST_ELEMENT_H__ */

View file

@ -84,7 +84,7 @@ struct _GstObject {
/* locking for all sorts of things (like the refcount) */
GMutex *lock;
/* this objects parent */
/* this object's parent */
GstObject *parent;
guint32 flags;

View file

@ -1791,7 +1791,9 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
GstBuffer*
gst_pad_pull (GstPad *pad)
{
GstRealPad *peer = GST_RPAD_PEER(pad);
GstRealPad *peer;
peer = GST_RPAD_PEER (pad);
GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
@ -1811,8 +1813,10 @@ gst_pad_pull (GstPad *pad)
GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer));
buf = (peer->gethandler) (GST_PAD_CAST (peer));
if (buf)
return buf;
/* no null buffers allowed */
gst_element_error (GST_PAD_PARENT (pad),
"NULL buffer during pull on %s:%s", GST_DEBUG_PAD_NAME (pad), NULL);

View file

@ -290,6 +290,14 @@ gst_thread_change_state (GstElement * element)
g_mutex_unlock (thread->lock);
break;
case GST_STATE_PAUSED_TO_PLAYING:
{
/* fixme: recurse into sub-bins */
const GList *elements = gst_bin_get_list (GST_BIN (thread));
while (elements) {
gst_element_enable_threadsafe_properties ((GstElement*)elements->data);
elements = g_list_next (elements);
}
THR_DEBUG ("telling thread to start spinning");
g_mutex_lock (thread->lock);
THR_DEBUG ("signaling");
@ -299,9 +307,10 @@ gst_thread_change_state (GstElement * element)
THR_DEBUG ("got ack");
g_mutex_unlock (thread->lock);
break;
}
case GST_STATE_PLAYING_TO_PAUSED:
{
GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
const GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread));
THR_INFO ("pausing thread");
@ -374,6 +383,8 @@ gst_thread_change_state (GstElement * element)
}
}
}
gst_element_disable_threadsafe_properties (element);
}
THR_DEBUG ("telling thread to pause, signaling");
g_cond_signal (thread->cond);

View file

@ -21,7 +21,7 @@
*/
/*#define GST_DEBUG_ENABLED */
#include <gst/gst.h>
#include "../gst.h"
#include "cothreads_compat.h"
@ -62,6 +62,7 @@ struct _GstSchedulerChain {
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASIC_SCHEDULER))
#define GST_BASIC_SCHEDULER_CAST(sched) ((GstBasicScheduler *)(sched))
#define SCHED(element) GST_BASIC_SCHEDULER_CAST (GST_ELEMENT_SCHED (element))
typedef enum {
GST_BASIC_SCHEDULER_STATE_NONE,
@ -87,6 +88,7 @@ struct _GstBasicScheduler {
GstBasicSchedulerState state;
cothread_context *context;
GstElement *current;
};
struct _GstBasicSchedulerClass {
@ -123,6 +125,18 @@ static void gst_basic_scheduler_show (GstScheduler *sched);
static GstSchedulerClass *parent_class = NULL;
#define do_element_switch(element) G_STMT_START{ \
GstElement *from = SCHED (element)->current; \
if (from->post_run_func) \
from->post_run_func (from); \
SCHED (element)->current = element; \
do_cothread_switch (GST_ELEMENT_THREADSTATE (element)); \
/* we assume other cothread switches will set ->current \
* properly, no need to do it from this side */ \
if (from->pre_run_func) \
from->pre_run_func (from); \
}G_STMT_END
static GType
gst_basic_scheduler_get_type (void)
{
@ -312,6 +326,9 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
GST_DEBUG_ENTER ("(%d,\"%s\")", argc, name);
if (element->pre_run_func)
element->pre_run_func (element);
do {
pads = element->pads;
while (pads) {
@ -358,8 +375,12 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
static void
gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
{
GstRealPad *peer = GST_RPAD_PEER (pad);
gint loop_count = 100;
GstElement *parent;
GstRealPad *peer;
parent = GST_PAD_PARENT (pad);
peer = GST_RPAD_PEER (pad);
GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
GST_DEBUG (GST_CAT_DATAFLOW, "putting buffer %p in peer \"%s:%s\"'s pen", buf,
@ -370,8 +391,9 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
*/
while (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) != NULL && --loop_count) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen %d",
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)), loop_count);
do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
GST_ELEMENT_THREADSTATE (parent), loop_count);
do_element_switch (parent);
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
@ -381,16 +403,19 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
}
if (loop_count == 0) {
gst_element_error (GST_PAD_PARENT (pad),
gst_element_error (parent,
"(internal error) maximum number of switches exceeded");
return;
}
g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL);
/* now fill the bufferpen and switch so it can be consumed */
GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p",
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
do_element_switch (parent);
GST_DEBUG (GST_CAT_DATAFLOW, "done switching");
}
@ -398,6 +423,10 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
static void
gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf)
{
GstElement *parent;
parent = GST_PAD_PARENT (pad);
GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
GST_DEBUG (GST_CAT_DATAFLOW, "putting buffer %p in peer's pen", buf);
@ -406,11 +435,11 @@ gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf)
/* now fill the bufferpen and switch so it can be consumed */
GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p",
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
GST_ELEMENT_THREADSTATE (parent));
parent->select_pad = pad;
do_element_switch (parent);
GST_DEBUG (GST_CAT_DATAFLOW, "done switching");
}
@ -419,7 +448,11 @@ static GstBuffer *
gst_basic_scheduler_gethandler_proxy (GstPad * pad)
{
GstBuffer *buf;
GstRealPad *peer = GST_RPAD_PEER (pad);
GstElement *parent;
GstRealPad *peer;
parent = GST_PAD_PARENT (pad);
peer = GST_RPAD_PEER (pad);
GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
@ -427,16 +460,17 @@ gst_basic_scheduler_gethandler_proxy (GstPad * pad)
/* we will loop switching to the peer until it's filled up the bufferpen */
while (GST_RPAD_BUFPEN (pad) == NULL) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen",
GST_ELEMENT_NAME (GST_ELEMENT (GST_PAD_PARENT (pad))),
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
GST_ELEMENT_NAME (parent),
GST_ELEMENT_THREADSTATE (parent));
do_element_switch (parent);
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!");
pad = (GstPad *) GST_RPAD_PEER (peer);
if (!pad) {
gst_element_error (GST_ELEMENT (GST_PAD_PARENT (peer)), "pad unconnected");
gst_element_error (parent, "pad unconnected");
}
}
}
@ -453,7 +487,11 @@ static GstBuffer *
gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guint64 offset, guint64 len)
{
GstBuffer *buf;
GstRealPad *peer = GST_RPAD_PEER (pad);
GstElement *parent;
GstRealPad *peer;
parent = GST_PAD_PARENT (pad);
peer = GST_RPAD_PEER (pad);
GST_DEBUG_ENTER ("%s:%s,%d,%lld,%lld", GST_DEBUG_PAD_NAME (pad), type, offset, len);
@ -466,8 +504,9 @@ gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guin
/* we will loop switching to the peer until it's filled up the bufferpen */
while (GST_RPAD_BUFPEN (pad) == NULL) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen",
GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
GST_ELEMENT_THREADSTATE (parent));
do_element_switch (parent);
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
@ -1064,7 +1103,13 @@ static void
gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element)
{
if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) {
if (element->post_run_func)
element->post_run_func (element);
SCHED (element)->current = NULL;
do_cothread_switch (do_cothread_get_main (((GstBasicScheduler *) sched)->context));
/* no need to do a pre_run, the cothread is stopping */
}
}
@ -1072,6 +1117,11 @@ static gboolean
gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
if (element->post_run_func)
element->post_run_func (element);
SCHED (element)->current = NULL;
do_cothread_switch (do_cothread_get_main (((GstBasicScheduler *) sched)->context));
return FALSE;
@ -1091,6 +1141,10 @@ gst_basic_scheduler_error (GstScheduler *sched, GstElement *element)
GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_ERROR;
if (element->post_run_func)
element->post_run_func (element);
SCHED (element)->current = NULL;
do_cothread_switch (do_cothread_get_main (((GstBasicScheduler *) sched)->context));
}
}
@ -1192,7 +1246,7 @@ gst_basic_scheduler_pad_select (GstScheduler * sched, GList * padlist)
if (pad != NULL) {
GstRealPad *peer = GST_RPAD_PEER (pad);
do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (peer)));
do_element_switch (GST_PAD_PARENT (peer));
pad = GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad;
@ -1266,7 +1320,15 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)",
GST_ELEMENT_NAME (entry), entry);
if (GST_ELEMENT_THREADSTATE (entry)) {
if (entry->pre_run_func)
entry->pre_run_func (entry);
bsched->current = entry;
do_cothread_switch (GST_ELEMENT_THREADSTATE (entry));
if (bsched->current && bsched->current->post_run_func)
bsched->current->post_run_func (bsched->current);
state = GST_SCHEDULER_STATE (sched);
/* if something changed, return - go on else */
if (GST_FLAG_IS_SET(bsched, GST_BASIC_SCHEDULER_CHANGE) &&