gstreamer/gst/schedulers/entryscheduler.c
Wim Taymans 6cacf76cd9 Added GstBin test.
Original commit message from CVS:
Added GstBin test.
Added GstSystemClock test.
Implemented clock distribution code in GstBin.
Implemented iterate sinks method for future use.
Rearranged gstelement.h
Fix GstIterator comparison bug.
Moved some code to GstPipeline, mostly clocking related.
2005-03-09 16:10:59 +00:00

1175 lines
34 KiB
C

/* GStreamer
* Copyright (C) 2004 Benjamin Otte <otte@gnome.org>
*
* gstentryscheduler.c: A scheduler based on entries
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <gst/gst.h>
#include "cothreads_compat.h"
#include "../gst-i18n-lib.h"
/* Debug category used as the default for all GST_*_OBJECT logging calls in
 * this file; it is initialised in plugin_init(). */
GST_DEBUG_CATEGORY_STATIC (debug_scheduler);
#define GST_CAT_DEFAULT debug_scheduler

/* Builds the name of the type-getter function by token pasting. */
#define GET_TYPE(x) gst_entry_ ## x ## _scheduler_get_type

/* GObject-style type, cast and type-check macros for the entry scheduler. */
#define GST_TYPE_ENTRY_SCHEDULER \
  (GET_TYPE (COTHREADS_TYPE) ())
#define GST_ENTRY_SCHEDULER(obj) \
  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_ENTRY_SCHEDULER,GstEntryScheduler))
#define GST_ENTRY_SCHEDULER_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_ENTRY_SCHEDULER,GstEntrySchedulerClass))
#define GST_IS_ENTRY_SCHEDULER(obj) \
  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_ENTRY_SCHEDULER))
/* The parameter used to be called "obj" while the expansion referenced
 * "klass", so the macro only compiled when the caller happened to have a
 * variable named klass in scope. */
#define GST_IS_ENTRY_SCHEDULER_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_ENTRY_SCHEDULER))

/* Like g_assert(), but dumps the scheduler state first when the assertion
 * does not hold. The assertion expression appears twice in the expansion,
 * so it must be free of side effects. */
#define SCHED_ASSERT(sched, assertion) G_STMT_START{ \
  if (!(assertion)) \
    gst_scheduler_show (GST_SCHEDULER (sched)); \
  g_assert (assertion); \
}G_STMT_END
/* What an element's cothread is currently waiting for. */
typedef enum
{
  WAIT_FOR_NOTHING = 0,
  WAIT_FOR_MUM = 1,
  WAIT_FOR_PADS = 2,            /* pad must be scheduled */
  /* add more */
  WAIT_FOR_ANYTHING = 3
} WaitInfo;
/* Tag stored at the start of every schedulable entry. */
typedef enum
{
  ENTRY_UNDEFINED = 0,          /* value of a zero-initialised entry */
  ENTRY_COTHREAD = 1,
  ENTRY_LINK = 2
} EntryType;
/* Common header of a schedulable entry. CothreadPrivate and LinkPrivate both
 * start with an Entry, so a pointer to either can be inspected through the
 * ENTRY_IS_* macros below. */
typedef struct
{
EntryType type;
}
Entry;
/* Type-tag tests; x may be any pointer to a struct that begins with Entry. */
#define ENTRY_IS_COTHREAD(x) (((Entry *)(x))->type == ENTRY_COTHREAD)
#define ENTRY_IS_LINK(x) (((Entry *)(x))->type == ENTRY_LINK)
/* Forward typedefs; the structs themselves are defined further down. */
typedef struct _GstEntryScheduler GstEntryScheduler;
typedef struct _GstEntrySchedulerClass GstEntrySchedulerClass;
/* Per-cothread scheduling state. The Entry member must stay first so the
 * struct can be handled as an Entry. */
typedef struct
{
Entry entry;
/* pointer to scheduler */
GstEntryScheduler *sched;
/* pointer to element */
GstElement *element;
/* the main function of the cothread */
int (*main) (int argc, gchar ** argv);
/* whether the given pad is schedulable */
gboolean (*can_schedule) (GstRealPad * pad);
/* what the element is currently waiting for */
WaitInfo wait;
/* cothread of element */
cothread *thread;
/* pad to schedule next */
GstRealPad *schedule_pad;
}
CothreadPrivate;
/* The element's sched_private pointer, viewed as a CothreadPrivate. */
#define ELEMENT_PRIVATE(element) ((CothreadPrivate *) GST_ELEMENT (element)->sched_private)
/* The element's scheduler, cast to the entry scheduler type. */
#define SCHED(element) (GST_ENTRY_SCHEDULER ((element)->scheduler))
/* Per-link scheduling state, shared by the two pads of a link (both pads'
 * sched_private point at the same LinkPrivate). The Entry member must stay
 * first so the struct can be handled as an Entry. */
typedef struct
{
Entry entry;
/* pads */
GstRealPad *srcpad;
GstRealPad *sinkpad;
/* private struct of srcpad's element, needed for decoupled elements */
CothreadPrivate *src;
/* private struct of sinkpad's element */
CothreadPrivate *sink;
/* current data */
GstData *bufpen;
}
LinkPrivate;
/* The pad's sched_private pointer, viewed as a LinkPrivate. */
#define PAD_PRIVATE(pad) ((LinkPrivate *) (GST_REAL_PAD (pad))->sched_private)
/* Instance structure of the entry scheduler. The lists hold Entry pointers
 * (CothreadPrivate or LinkPrivate). */
struct _GstEntryScheduler
{
GstScheduler scheduler;
cothread_context *context;
GList *schedule_now; /* entry points that must be scheduled this
iteration */
GList *schedule_possible; /* possible entry points */
GList *waiting; /* elements waiting for the clock */
gboolean error; /* if an element threw an error */
GSList *reaping; /* cothreads we need to destroy but can't */
};
/* Class structure; adds nothing to the parent class. */
struct _GstEntrySchedulerClass
{
GstSchedulerClass scheduler_class;
};
/* GObject class and instance initialisers, referenced by the type
 * registration below. */
static void gst_entry_scheduler_class_init (gpointer g_class, gpointer data);
static void gst_entry_scheduler_init (GstEntryScheduler * object);
/*
 * Returns the GType of the entry scheduler. The type is registered as a
 * static subtype of GST_TYPE_SCHEDULER on the first call and cached in a
 * function-local static afterwards.
 */
GType GET_TYPE (COTHREADS_TYPE) (void)
{
  static GType registered_type = 0;
  static const GTypeInfo type_info = {
    sizeof (GstEntrySchedulerClass),    /* class_size */
    NULL,                       /* base_init */
    NULL,                       /* base_finalize */
    gst_entry_scheduler_class_init,     /* class_init */
    NULL,                       /* class_finalize */
    NULL,                       /* class_data */
    sizeof (GstEntryScheduler), /* instance_size */
    0,                          /* n_preallocs */
    (GInstanceInitFunc) gst_entry_scheduler_init        /* instance_init */
  };

  if (registered_type != 0)
    return registered_type;

  registered_type = g_type_register_static (GST_TYPE_SCHEDULER,
      "GstEntry" COTHREADS_NAME_CAPITAL "Scheduler", &type_info, 0);
  return registered_type;
}
/* cothread main functions, one per element flavour */
static int gst_entry_scheduler_loop_wrapper (int argc, char **argv);
static int gst_entry_scheduler_get_wrapper (int argc, char **argv);
static int gst_entry_scheduler_chain_wrapper (int argc, char **argv);
/* GstSchedulerClass method implementations (installed in class_init) */
static void gst_entry_scheduler_setup (GstScheduler * sched);
static void gst_entry_scheduler_reset (GstScheduler * sched);
static void gst_entry_scheduler_add_element (GstScheduler * sched,
GstElement * element);
static void gst_entry_scheduler_remove_element (GstScheduler * sched,
GstElement * element);
static GstElementStateReturn gst_entry_scheduler_state_transition (GstScheduler
* sched, GstElement * element, gint transition);
static gboolean gst_entry_scheduler_yield (GstScheduler * sched,
GstElement * element);
static gboolean gst_entry_scheduler_interrupt (GstScheduler * sched,
GstElement * element);
static void gst_entry_scheduler_error (GstScheduler * sched,
GstElement * element);
static void gst_entry_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad,
GstPad * sinkpad);
static void gst_entry_scheduler_pad_unlink (GstScheduler * sched,
GstPad * srcpad, GstPad * sinkpad);
static GstData *gst_entry_scheduler_pad_select (GstScheduler * sched,
GstPad ** pulled_from, GstPad ** pads);
static GstSchedulerState gst_entry_scheduler_iterate (GstScheduler * sched);
static void gst_entry_scheduler_show (GstScheduler * scheduler);
/* internal scheduling helpers used before their definition */
static gboolean can_schedule_pad (GstRealPad * pad);
static void schedule_next_element (GstEntryScheduler * sched);
/*
 * Class initialiser: fills in the GstSchedulerClass method table with this
 * file's implementations and runs the cothread backend's init hook.
 * clock_wait is explicitly left unimplemented.
 */
static void
gst_entry_scheduler_class_init (gpointer klass, gpointer class_data)
{
  GstSchedulerClass *vtable = GST_SCHEDULER_CLASS (klass);

  /* lifecycle */
  vtable->setup = gst_entry_scheduler_setup;
  vtable->reset = gst_entry_scheduler_reset;

  /* element management */
  vtable->add_element = gst_entry_scheduler_add_element;
  vtable->remove_element = gst_entry_scheduler_remove_element;
  vtable->state_transition = gst_entry_scheduler_state_transition;

  /* control transfer requested by elements */
  vtable->yield = gst_entry_scheduler_yield;
  vtable->interrupt = gst_entry_scheduler_interrupt;
  vtable->error = gst_entry_scheduler_error;

  /* pad management */
  vtable->pad_link = gst_entry_scheduler_pad_link;
  vtable->pad_unlink = gst_entry_scheduler_pad_unlink;
  vtable->pad_select = gst_entry_scheduler_pad_select;

  /* not implemented by this scheduler */
  vtable->clock_wait = NULL;

  /* running and debugging */
  vtable->iterate = gst_entry_scheduler_iterate;
  vtable->show = gst_entry_scheduler_show;

  do_cothreads_init (NULL);
}
/*
 * Instance initialiser: the only per-instance setup is flagging the
 * scheduler with GST_SCHEDULER_FLAG_NEW_API.
 */
static void
gst_entry_scheduler_init (GstEntryScheduler * scheduler)
{
  GST_FLAG_SET (scheduler, GST_SCHEDULER_FLAG_NEW_API);
}
/*
* We have to set up 5 different element types here:
* - loopbased
* - chainbased
* - chainbased PADS of decoupled elements
* - getbased
* - getbased PADS of decoupled elements
*/
/*
* LOOPBASED
*/
/* Private data of a loop-based element: the generic cothread data (which
 * must stay first so the struct can be used as a CothreadPrivate) plus the
 * NULL-terminated pad array the element is currently selecting on, or NULL
 * when it is not inside gst_entry_scheduler_pad_select(). */
typedef struct
{
CothreadPrivate element;
GstPad **sinkpads;
}
LoopPrivate;
/* The element's sched_private pointer, viewed as a LoopPrivate. */
#define LOOP_PRIVATE(x) ((LoopPrivate *) ELEMENT_PRIVATE (x))
/*
 * can_schedule callback for loop-based elements.
 * A source pad is never schedulable. A sink pad is schedulable only while
 * its element has a pad array registered in LoopPrivate.sinkpads and the pad
 * is a member of that NULL-terminated array.
 */
static gboolean
_can_schedule_loop (GstRealPad * pad)
{
  LoopPrivate *priv;
  GstPad **candidate;

  g_assert (PAD_PRIVATE (pad));

  if (GST_PAD_IS_SRC (pad))
    return FALSE;

  priv = LOOP_PRIVATE (gst_pad_get_parent (GST_PAD (pad)));
  g_assert (priv);

  if (priv->sinkpads == NULL)
    return FALSE;

  for (candidate = priv->sinkpads; *candidate != NULL; candidate++) {
    if (GST_REAL_PAD (*candidate) == pad)
      return TRUE;
  }

  return FALSE;
}
/*
 * Cothread main function for loop-based elements.
 *
 * argv is not an argument vector: it carries the element's CothreadPrivate
 * (see the cast below); argc is not used.
 *
 * Calls the element's loopfunc over and over, handing control to the next
 * schedulable entry after each call. The loop has no exit condition, so the
 * trailing return is never reached.
 */
static int
gst_entry_scheduler_loop_wrapper (int argc, char **argv)
{
CothreadPrivate *priv = (CothreadPrivate *) argv;
GstElement *element = priv->element;
priv->wait = WAIT_FOR_NOTHING;
do {
g_assert (priv->wait == WAIT_FOR_NOTHING);
GST_LOG_OBJECT (SCHED (element), "calling loopfunc for element %s",
GST_ELEMENT_NAME (element));
if (element->loopfunc) {
element->loopfunc (element);
} else {
/* the loopfunc vanished between iterations: report it as an element error */
GST_ELEMENT_ERROR (element, CORE, SCHEDULER, (_("badly behaving plugin")),
("loop-based element %s removed loopfunc during processing",
GST_OBJECT_NAME (element)));
}
GST_LOG_OBJECT (SCHED (element), "done calling loopfunc for element %s",
GST_OBJECT_NAME (element));
/* mark the element runnable again before giving up control */
priv->wait = WAIT_FOR_NOTHING;
schedule_next_element (SCHED (element));
} while (TRUE);
return 0;
}
/*
 * Allocates the private data for a loop-based element. The allocation is a
 * zeroed LoopPrivate; the returned pointer is its embedded CothreadPrivate,
 * which is the first member and therefore shares the same address.
 */
static CothreadPrivate *
setup_loop (GstEntryScheduler * sched, GstElement * element)
{
  LoopPrivate *loop = g_new0 (LoopPrivate, 1);
  CothreadPrivate *base = &loop->element;

  base->can_schedule = _can_schedule_loop;
  base->wait = WAIT_FOR_NOTHING;
  base->main = gst_entry_scheduler_loop_wrapper;
  base->element = element;

  return base;
}
/*
* CHAINBASED
*/
/*
 * Takes the pending data out of the pad's link. The pen is emptied first and
 * the function then asserts that there actually was data in it.
 * The sched argument is not used.
 */
static GstData *
get_buffer (GstEntryScheduler * sched, GstRealPad * pad)
{
  LinkPrivate *link = PAD_PRIVATE (pad);
  GstData *data;

  data = link->bufpen;
  link->bufpen = NULL;

  g_assert (data);

  return data;
}
/*
 * Cothread main function for chain-based elements (and chain-based pads of
 * decoupled elements).
 *
 * argv is not an argument vector: it carries the CothreadPrivate (see the
 * cast below); argc is not used.
 *
 * Each iteration expects priv->schedule_pad to be a sink pad whose link has
 * data in its pen, hands that data to the pad's chain function and then
 * gives control to the next schedulable entry. The loop has no exit
 * condition, so the trailing return is never reached.
 */
static int
gst_entry_scheduler_chain_wrapper (int argc, char **argv)
{
CothreadPrivate *priv = (CothreadPrivate *) argv;
GstElement *element = priv->element;
priv->wait = WAIT_FOR_PADS;
do {
GstRealPad *pad = priv->schedule_pad;
g_assert (priv->wait == WAIT_FOR_PADS);
g_assert (pad);
g_assert (GST_PAD_IS_SINK (pad));
g_assert (PAD_PRIVATE (pad)->bufpen != NULL);
GST_LOG_OBJECT (priv->sched, "calling chainfunc for pad %s:%s",
GST_DEBUG_PAD_NAME (pad));
if (pad->chainfunc) {
/* get_buffer() empties the pen, so ownership of data moves to the call */
GstData *data = get_buffer (priv->sched, pad);
gst_pad_call_chain_function (GST_PAD (pad), data);
/* don't do anything after here with the pad, it might already be dead!
the element is still alive though */
} else {
GST_ELEMENT_ERROR (element, CORE, SCHEDULER, (_("badly behaving plugin")),
("chain-based element %s removed chainfunc of pad during processing",
GST_OBJECT_NAME (element)));
/* nobody will consume the pending data: drop it and empty the pen */
gst_data_unref (PAD_PRIVATE (pad)->bufpen);
PAD_PRIVATE (pad)->bufpen = NULL;
}
GST_LOG_OBJECT (priv->sched, "done calling chainfunc for element %s",
GST_OBJECT_NAME (element));
/* ready for the next pad before giving up control */
priv->wait = WAIT_FOR_PADS;
schedule_next_element (priv->sched);
} while (TRUE);
return 0;
}
/*
 * can_schedule callback for chain-based cothreads.
 * Source pads are never schedulable; a sink pad is schedulable when the
 * cothread on the sink side of its link is waiting for pads.
 */
static gboolean
_can_schedule_chain (GstRealPad * pad)
{
  LinkPrivate *link;

  g_assert (PAD_PRIVATE (pad));

  if (GST_PAD_IS_SRC (pad))
    return FALSE;

  link = PAD_PRIVATE (pad);
  return link->sink->wait == WAIT_FOR_PADS;
}
/*
 * Allocates zeroed private data for a chain-based cothread and fills in the
 * chain-specific parts (main function, initial wait state, can_schedule
 * callback). Neither argument is used here; the remaining fields are left
 * zeroed for the caller to fill in.
 */
static CothreadPrivate *
setup_chain (GstEntryScheduler * sched, GstElement * element)
{
  CothreadPrivate *chain = g_new0 (CothreadPrivate, 1);

  chain->can_schedule = _can_schedule_chain;
  chain->wait = WAIT_FOR_PADS;
  chain->main = gst_entry_scheduler_chain_wrapper;

  return chain;
}
/*
* GETBASED
*/
/*
 * Cothread main function for get-based elements (and get-based pads of
 * decoupled elements).
 *
 * argv is not an argument vector: it carries the CothreadPrivate (see the
 * cast below); argc is not used.
 *
 * Each iteration expects priv->schedule_pad to be a source pad whose link
 * has an empty pen, calls the pad's get function, stores the result in the
 * pen and puts the link at the front of schedule_now. Control then goes to
 * the next schedulable entry. The loop has no exit condition, so the
 * trailing return is never reached.
 */
static int
gst_entry_scheduler_get_wrapper (int argc, char **argv)
{
CothreadPrivate *priv = (CothreadPrivate *) argv;
GstElement *element = priv->element;
priv->wait = WAIT_FOR_PADS;
do {
GstRealPad *pad = priv->schedule_pad;
g_assert (pad);
g_assert (GST_PAD_IS_SRC (pad));
g_assert (PAD_PRIVATE (pad)->bufpen == NULL);
GST_LOG_OBJECT (priv->sched, "calling getfunc for pad %s:%s",
GST_DEBUG_PAD_NAME (pad));
if (pad->getfunc) {
GstData *data = gst_pad_call_get_function (GST_PAD (pad));
/* make sure the pad still exists and is linked */
if (!g_list_find (element->pads, pad)) {
GST_ELEMENT_ERROR (element, CORE, SCHEDULER,
(_("badly behaving plugin")),
("get-based element %s removed pad during getfunc",
GST_OBJECT_NAME (element)));
/* no link to deliver to: drop the data */
gst_data_unref (data);
} else if (!GST_PAD_PEER (pad)) {
GST_ELEMENT_ERROR (element, CORE, SCHEDULER,
(_("badly behaving plugin")),
("get-based element %s unlinked pad during getfunc",
GST_OBJECT_NAME (element)));
/* no link to deliver to: drop the data */
gst_data_unref (data);
} else {
/* park the data in the link and request that the link be scheduled */
PAD_PRIVATE (pad)->bufpen = data;
priv->sched->schedule_now =
g_list_prepend (priv->sched->schedule_now, PAD_PRIVATE (pad));
}
} else {
GST_ELEMENT_ERROR (element, CORE, SCHEDULER, (_("badly behaving plugin")),
("get-based element %s removed getfunc during processing",
GST_OBJECT_NAME (element)));
}
GST_LOG_OBJECT (priv->sched, "done calling getfunc for element %s",
GST_ELEMENT_NAME (element));
/* ready for the next pad before giving up control */
priv->wait = WAIT_FOR_PADS;
schedule_next_element (priv->sched);
} while (TRUE);
return 0;
}
/*
 * can_schedule callback for get-based cothreads; the pad must be a source
 * pad. It is schedulable when all three conditions hold, checked in this
 * order: the link's pen is empty, the source-side cothread is waiting for
 * pads, and the sink pad at the other end of the link is schedulable.
 */
static gboolean
_can_schedule_get (GstRealPad * pad)
{
  LinkPrivate *link;

  g_assert (PAD_PRIVATE (pad));
  g_assert (GST_PAD_IS_SRC (pad));

  link = PAD_PRIVATE (pad);

  if (link->bufpen != NULL)
    return FALSE;
  if (link->src->wait != WAIT_FOR_PADS)
    return FALSE;

  return can_schedule_pad (link->sinkpad);
}
/*
 * Allocates zeroed private data for a get-based cothread and fills in the
 * get-specific parts (main function, initial wait state, can_schedule
 * callback). Neither argument is used here; the remaining fields are left
 * zeroed for the caller to fill in.
 */
static CothreadPrivate *
setup_get (GstEntryScheduler * sched, GstElement * element)
{
  CothreadPrivate *getter = g_new0 (CothreadPrivate, 1);

  getter->can_schedule = _can_schedule_get;
  getter->wait = WAIT_FOR_PADS;
  getter->main = gst_entry_scheduler_get_wrapper;

  return getter;
}
/*
* scheduling functions
*/
/*
 * Tells whether the given linked pad may be scheduled right now.
 * The pad's parent element must be PLAYING; the final decision is delegated
 * to the can_schedule callback of the cothread on the pad's own side of the
 * link (sink side for sink pads, source side otherwise).
 */
static gboolean
can_schedule_pad (GstRealPad * pad)
{
  LinkPrivate *link = PAD_PRIVATE (pad);
  CothreadPrivate *owner;

  g_assert (link);

  if (GST_STATE (gst_pad_get_parent (GST_PAD (pad))) != GST_STATE_PLAYING)
    return FALSE;

  owner = GST_PAD_IS_SINK (pad) ? link->sink : link->src;
  return owner->can_schedule (pad);
}
/*
 * Tells whether an entry may be scheduled right now.
 *
 * Link entries: a link with data in its pen is judged by its sink side, an
 * empty link by its source side. That side's cothread must be waiting for
 * pads and the corresponding pad must pass can_schedule_pad().
 *
 * Cothread entries: the cothread must be waiting for nothing and its
 * element must be PLAYING. Decoupled elements are then always schedulable;
 * other elements only if none of their linked source pads still has data
 * pending in its pen.
 */
static gboolean
can_schedule (Entry * entry)
{
  if (ENTRY_IS_LINK (entry)) {
    LinkPrivate *link = (LinkPrivate *) entry;
    gboolean filled = (link->bufpen != NULL);
    CothreadPrivate *side = filled ? link->sink : link->src;
    GstRealPad *pad = filled ? link->sinkpad : link->srcpad;

    return side->wait == WAIT_FOR_PADS && can_schedule_pad (pad);
  }

  if (ENTRY_IS_COTHREAD (entry)) {
    CothreadPrivate *priv = (CothreadPrivate *) entry;
    GList *walk;

    if (priv->wait != WAIT_FOR_NOTHING)
      return FALSE;
    if (GST_STATE (priv->element) != GST_STATE_PLAYING)
      return FALSE;

    if (GST_FLAG_IS_SET (priv->element, GST_ELEMENT_DECOUPLED)) {
      g_assert (PAD_PRIVATE (priv->schedule_pad));
      return TRUE;
    }

    for (walk = priv->element->pads; walk; walk = g_list_next (walk)) {
      GstPad *pad = GST_PAD (walk->data);

      if (!GST_PAD_IS_SRC (pad))
        continue;
      if (!PAD_PRIVATE (pad))
        continue;
      if (PAD_PRIVATE (pad)->bufpen != NULL)
        return FALSE;
    }
    return TRUE;
  }

  g_assert_not_reached ();
  return FALSE;
}
/*
 * Switches to the given cothread while keeping element reference counts in
 * step with which cothread is running: the element owning the target
 * cothread gets a reference, the element owning the current cothread loses
 * one. Only cothread entries in schedule_possible are considered.
 *
 * When the caller was the main cothread, the code after do_cothread_switch()
 * destroys every cothread queued in scheduler->reaping and empties that
 * list.
 *
 * NOTE(review): the "switch to same cothread, ignoring" branch only logs;
 * execution continues and the switch is still performed. Confirm whether an
 * early return was intended.
 */
static void
safe_cothread_switch (GstEntryScheduler * scheduler, cothread * thread)
{
GList *list;
cothread *cur = do_cothread_get_current (scheduler->context);
if (cur == thread) {
GST_LOG_OBJECT (scheduler, "switch to same cothread, ignoring");
}
for (list = scheduler->schedule_possible; list; list = g_list_next (list)) {
if (ENTRY_IS_COTHREAD (list->data)) {
CothreadPrivate *priv = (CothreadPrivate *) list->data;
/* reference the element we are about to run ... */
if (priv->thread == thread)
gst_object_ref (GST_OBJECT (priv->element));
/* ... and release the element we are leaving */
if (priv->thread == cur)
gst_object_unref (GST_OBJECT (priv->element));
}
}
do_cothread_switch (thread);
/* cur was captured before the switch; only the main cothread reaps */
if (cur == do_cothread_get_main (scheduler->context)) {
GSList *walk;
for (walk = scheduler->reaping; walk; walk = g_slist_next (walk)) {
do_cothread_destroy (walk->data);
}
g_slist_free (scheduler->reaping);
scheduler->reaping = NULL;
}
}
/* the meat - no guarantee as to which cothread this function is called */
/*
 * Runs the given entry, which must currently be schedulable (asserted).
 *
 * The entry is removed from schedule_now and moved to the end of
 * schedule_possible. For a link entry the cothread to run is the sink side
 * when the pen holds data and the source side otherwise, and that cothread's
 * schedule_pad is set to the matching pad. For a cothread entry the entry
 * itself is run. The cothread is created on first use, then switched to.
 */
static void
schedule (GstEntryScheduler * sched, Entry * entry)
{
CothreadPrivate *schedule_me;
g_assert (can_schedule (entry));
sched->schedule_now = g_list_remove (sched->schedule_now, entry);
/* remove + append moves the entry to the end of schedule_possible */
sched->schedule_possible = g_list_remove (sched->schedule_possible, entry);
sched->schedule_possible = g_list_append (sched->schedule_possible, entry);
if (ENTRY_IS_LINK (entry)) {
LinkPrivate *link = (LinkPrivate *) entry;
if (link->bufpen) {
schedule_me = link->sink;
schedule_me->schedule_pad = link->sinkpad;
} else {
schedule_me = link->src;
schedule_me->schedule_pad = link->srcpad;
}
GST_DEBUG_OBJECT (sched, "scheduling pad %s:%s",
GST_DEBUG_PAD_NAME (schedule_me->schedule_pad));
} else if (ENTRY_IS_COTHREAD (entry)) {
schedule_me = (CothreadPrivate *) entry;
GST_DEBUG_OBJECT (sched, "scheduling element %s",
GST_OBJECT_NAME (schedule_me->element));
} else {
/* unknown entry type: flag an error and fall back to the main cothread */
g_assert_not_reached ();
GST_DEBUG_OBJECT (sched, "scheduling main after error");
sched->error = TRUE;
safe_cothread_switch (sched, do_cothread_get_main (sched->context));
return;
}
if (!schedule_me->thread) {
GST_LOG_OBJECT (sched, "creating cothread for %p (element %s)", schedule_me,
GST_OBJECT_NAME (schedule_me->element));
/* the CothreadPrivate itself is passed where argv is expected; the wrapper
functions cast it back */
do_cothread_create (schedule_me->thread, sched->context, schedule_me->main,
0, (gchar **) schedule_me);
}
safe_cothread_switch (sched, schedule_me->thread);
}
/* this function will die a horrible death if you have cyclic pipelines */
/*
 * Starting at the given entry, searches downstream for the first entry that
 * can be scheduled and returns it, or NULL when there is none.
 * A link that cannot be scheduled forwards the search to its sink-side
 * cothread; a non-decoupled cothread forwards it, in pad list order, to the
 * links of its linked source pads. The search stops at decoupled elements.
 */
static Entry *
schedule_forward (Entry * entry)
{
  GstElement *element;
  GList *walk;

  if (can_schedule (entry))
    return entry;

  if (ENTRY_IS_LINK (entry))
    return schedule_forward ((Entry *) ((LinkPrivate *) entry)->sink);

  if (!ENTRY_IS_COTHREAD (entry)) {
    g_assert_not_reached ();
    return NULL;
  }

  element = ((CothreadPrivate *) entry)->element;
  if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED))
    return NULL;

  for (walk = element->pads; walk; walk = g_list_next (walk)) {
    Entry *found;

    /* only linked source pads lead downstream */
    if (GST_PAD_IS_SINK (walk->data) || !PAD_PRIVATE (walk->data))
      continue;

    found = schedule_forward ((Entry *) PAD_PRIVATE (walk->data));
    if (found)
      return found;
  }

  return NULL;
}
/*
 * Picks what runs next and switches to it.
 *
 * - After an error, control goes straight to the main cothread.
 * - A non-empty waiting list is not supported yet and asserts.
 * - Otherwise each entry of schedule_now is tried in order via
 *   schedule_forward(); the first schedulable result is run. If entries are
 *   pending but none can run, the error flag is set.
 * - In every case that does not run an entry, control goes to the main
 *   cothread.
 */
static void
schedule_next_element (GstEntryScheduler * scheduler)
{
if (scheduler->error) {
GST_DEBUG_OBJECT (scheduler, "scheduling main after error");
safe_cothread_switch (scheduler, do_cothread_get_main (scheduler->context));
} else if (scheduler->waiting) {
/* FIXME: write me */
g_assert_not_reached ();
} else if (scheduler->schedule_now) {
GList *test;
for (test = scheduler->schedule_now; test; test = g_list_next (test)) {
Entry *entry = schedule_forward ((Entry *) test->data);
if (entry) {
schedule (scheduler, entry);
return;
}
}
/* waiting is known to be empty in this branch, see the else-if above */
if (!scheduler->waiting) {
GST_ERROR_OBJECT (scheduler,
"have stuff that must be scheduled, but nothing that can be scheduled");
scheduler->error = TRUE;
}
}
GST_DEBUG_OBJECT (scheduler, "scheduling main");
safe_cothread_switch (scheduler, do_cothread_get_main (scheduler->context));
}
/*
* handlers to attach to pads
*/
/*
 * Chain handler installed on linked sink pads by
 * gst_entry_scheduler_pad_link(). Called when data is pushed over the link.
 *
 * The data is parked in the link's pen and the link is appended to
 * schedule_now. If the pen is already full, the scheduler's error flag is
 * set and the new data is dropped. Either way the source-side cothread is
 * marked runnable and control is handed to the next schedulable entry.
 *
 * pad:  the pad the handler was invoked on
 * data: the pushed data; ownership passes to this function
 */
static void
gst_entry_scheduler_chain_handler (GstPad * pad, GstData * data)
{
  LinkPrivate *priv = PAD_PRIVATE (pad);
  CothreadPrivate *thread = priv->src;
  GstEntryScheduler *sched = thread->sched;

  GST_LOG_OBJECT (sched, "putting data %p in pen of pad %s:%s",
      data, GST_DEBUG_PAD_NAME (pad));

  if (priv->bufpen != NULL) {
    /* the two literals used to concatenate to "bufpenof pad" */
    GST_ERROR_OBJECT (sched, "scheduling error: trying to push data in bufpen "
        "of pad %s:%s, but bufpen was full", GST_DEBUG_PAD_NAME (pad));
    sched->error = TRUE;
    gst_data_unref (data);
  } else {
    priv->bufpen = data;
    sched->schedule_now = g_list_append (sched->schedule_now, priv);
  }

  /* the pushing side has nothing left to wait for */
  thread->wait = WAIT_FOR_NOTHING;
  schedule_next_element (sched);

  GST_LOG_OBJECT (sched, "done");
}
/*
 * Get handler installed on linked source pads by
 * gst_entry_scheduler_pad_link(). A pull is implemented as a pad_select()
 * over a one-element pad array that contains only the peer of the given pad;
 * the selected pad must therefore be that peer (asserted).
 */
static GstData *
gst_entry_scheduler_get_handler (GstPad * pad)
{
  GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (gst_pad_get_scheduler (pad));
  GstPad *pads[2];
  GstPad *ret;
  GstData *data;

  /* from here on "pad" is the peer of the pad we were called with */
  pad = GST_PAD_PEER (pad);
  pads[0] = pad;
  pads[1] = NULL;

  GST_LOG_OBJECT (sched, "pad %s:%s pulls", GST_DEBUG_PAD_NAME (pad));

  data = gst_entry_scheduler_pad_select (GST_SCHEDULER (sched), &ret, pads);
  g_assert (pad == ret);

  GST_LOG_OBJECT (sched, "done with %s:%s", GST_DEBUG_PAD_NAME (pad));

  return data;
}
/*
 * Event handler installed on both pads of a link. It currently does no
 * scheduling work and simply forwards the event to the pad's own event
 * function, returning that function's result.
 */
static gboolean
gst_entry_scheduler_event_handler (GstPad * srcpad, GstEvent * event)
{
  /* FIXME: need to do more here? */
  return GST_RPAD_EVENTFUNC (srcpad) (srcpad, event);
}
/*
* Entry points for this scheduler.
*/
/*
 * Returns data from one of the given pads, giving up control until data is
 * available if necessary.
 *
 * scheduler:   the scheduler
 * pulled_from: receives the pad the data came from
 * pads:        NULL-terminated array of pads to select on
 *
 * If a pad's link already has data pending, that data is returned
 * immediately. Otherwise the calling element, which must be loop-based
 * (asserted), registers the pad array, marks itself as waiting for pads and
 * gives up control; when it is resumed, the data is taken from the pad
 * stored in its schedule_pad.
 *
 * NOTE(review): with an empty pads array, pad stays NULL when the parent
 * element is looked up after the loop - confirm callers never pass one.
 */
static GstData *
gst_entry_scheduler_pad_select (GstScheduler * scheduler, GstPad ** pulled_from,
GstPad ** pads)
{
GstData *data;
GstRealPad *pad = NULL;
GstElement *element = NULL;
GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (scheduler);
gint i = 0;
/* sanity check */
/* look for a pad whose link already has data waiting */
while (pads[i]) {
pad = GST_REAL_PAD (pads[i++]);
if (PAD_PRIVATE (pad)->bufpen) {
sched->schedule_now =
g_list_remove (sched->schedule_now, PAD_PRIVATE (pad));
goto found;
}
}
/* nothing pending: pad is now the last array entry; use it to find the
calling element */
element = gst_pad_get_parent (GST_PAD (pad));
g_assert (element);
g_assert (ELEMENT_PRIVATE (element)->main ==
gst_entry_scheduler_loop_wrapper);
/* publish the pads we accept data from, then give up control */
LOOP_PRIVATE (element)->sinkpads = pads;
ELEMENT_PRIVATE (element)->wait = WAIT_FOR_PADS;
schedule_next_element (SCHED (element));
/* resumed: the pad to read from was stored in schedule_pad */
LOOP_PRIVATE (element)->sinkpads = NULL;
pad = ELEMENT_PRIVATE (element)->schedule_pad;
g_assert (PAD_PRIVATE (pad)->bufpen);
found:
data = get_buffer (sched, pad);
/* the data is still returned when pulled_from is NULL */
g_return_val_if_fail (pulled_from, data);
*pulled_from = GST_PAD (pad);
return data;
}
/*
 * GstSchedulerClass::setup implementation: creates the cothread context if
 * the scheduler does not have one yet. Calling it again is a no-op.
 */
static void
gst_entry_scheduler_setup (GstScheduler * sched)
{
  GstEntryScheduler *self = GST_ENTRY_SCHEDULER (sched);

  /* first create thread context */
  if (self->context != NULL)
    return;

  GST_DEBUG_OBJECT (sched, "initializing cothread context");
  self->context = do_cothread_context_init ();
}
/*
 * Destroys the cothread of the given private data, or defers that when it is
 * not safe: destruction only happens immediately when running in the main
 * cothread; otherwise the cothread is queued on scheduler->reaping. In both
 * cases the private data's thread pointer is cleared.
 */
static void
safe_cothread_destroy (CothreadPrivate * thread)
{
  GstEntryScheduler *scheduler = thread->sched;
  gboolean in_main;

  in_main = (do_cothread_get_current (scheduler->context) ==
      do_cothread_get_main (scheduler->context));

  if (!in_main) {
    GST_WARNING_OBJECT (scheduler, "delaying destruction of cothread %p",
        thread->thread);
    scheduler->reaping = g_slist_prepend (scheduler->reaping, thread->thread);
  } else {
    do_cothread_destroy (thread->thread);
  }

  thread->thread = NULL;
}
/*
 * Destroys (or queues for destruction) the cothread of every cothread entry
 * in schedule_possible that currently has one. The entries themselves stay
 * in the list.
 */
static void
gst_entry_scheduler_remove_all_cothreads (GstEntryScheduler * scheduler)
{
  GList *walk;

  for (walk = scheduler->schedule_possible; walk; walk = g_list_next (walk)) {
    CothreadPrivate *priv;

    if (!ENTRY_IS_COTHREAD (walk->data))
      continue;

    priv = (CothreadPrivate *) walk->data;
    if (priv->thread)
      safe_cothread_destroy (priv);
  }
}
/*
 * GstSchedulerClass::reset implementation: tears down all cothreads and the
 * cothread context. Does nothing when there is no context. Bails out with a
 * GLib warning, leaving everything in place, if cothreads are still waiting
 * to be reaped.
 */
static void
gst_entry_scheduler_reset (GstScheduler * sched)
{
  GstEntryScheduler *scheduler = GST_ENTRY_SCHEDULER (sched);

  if (!scheduler->context)
    return;

  g_return_if_fail (scheduler->reaping == NULL);

  gst_entry_scheduler_remove_all_cothreads (scheduler);
  do_cothread_context_destroy (scheduler->context);
  scheduler->context = NULL;
}
/*
 * Creates the private cothread data for an element through setup_func,
 * completes the common fields, registers it at the front of
 * schedule_possible and replays the state transitions the element has
 * already gone through (NULL->READY, READY->PAUSED, PAUSED->PLAYING) so the
 * scheduler catches up with the element's current state.
 *
 * Returns the new private data; storing it somewhere is up to the caller.
 */
static CothreadPrivate *
_setup_cothread (GstEntryScheduler * sched, GstElement * element,
    CothreadPrivate * (*setup_func) (GstEntryScheduler *, GstElement *))
{
  /* { state that must have been reached, transition to replay } */
  const gint catch_up[][2] = {
    {GST_STATE_READY, GST_STATE_NULL_TO_READY},
    {GST_STATE_PAUSED, GST_STATE_READY_TO_PAUSED},
    {GST_STATE_PLAYING, GST_STATE_PAUSED_TO_PLAYING}
  };
  CothreadPrivate *priv;
  guint step;

  priv = setup_func (sched, element);
  priv->entry.type = ENTRY_COTHREAD;
  priv->sched = sched;
  priv->element = element;
  sched->schedule_possible = g_list_prepend (sched->schedule_possible, priv);

  /* the element state is re-read for every step, as each transition is
     replayed only if that state has been reached */
  for (step = 0; step < G_N_ELEMENTS (catch_up); step++) {
    if ((gint) GST_STATE (element) >= catch_up[step][0])
      gst_entry_scheduler_state_transition (GST_SCHEDULER (sched), element,
          catch_up[step][1]);
  }

  return priv;
}
/*
 * GstSchedulerClass::add_element implementation.
 * Decoupled elements are ignored here. Loop-based elements get their
 * cothread data right away; elements without a loopfunc are left without
 * private data at this point.
 */
static void
gst_entry_scheduler_add_element (GstScheduler * scheduler, GstElement * element)
{
  GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (scheduler);

  if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
    GST_INFO_OBJECT (sched, "decoupled element %s added, ignoring",
        GST_OBJECT_NAME (element));
    return;
  }

  g_assert (element->sched_private == NULL);

  if (!element->loopfunc)
    return;

  element->sched_private = _setup_cothread (sched, element, setup_loop);
}
/*
 * Unregisters and frees cothread private data: removes it from all three
 * scheduler lists, destroys (or queues for destruction) its cothread if it
 * has one, and frees the structure. priv is invalid afterwards.
 */
static void
_remove_cothread (CothreadPrivate * priv)
{
  GstEntryScheduler *sched = priv->sched;

  sched->schedule_possible = g_list_remove (sched->schedule_possible, priv);
  sched->schedule_now = g_list_remove (sched->schedule_now, priv);
  sched->waiting = g_list_remove (sched->waiting, priv);

  if (priv->thread != NULL)
    safe_cothread_destroy (priv);

  g_free (priv);
}
/*
 * GstSchedulerClass::remove_element implementation.
 * Decoupled elements are ignored here (their per-pad cothread data is
 * released in gst_entry_scheduler_pad_unlink()). For any other element the
 * cothread private data, if present, is released and the element's
 * sched_private pointer is cleared.
 */
static void
gst_entry_scheduler_remove_element (GstScheduler * scheduler,
    GstElement * element)
{
  if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
    /* the message used to say "added", copied from add_element */
    GST_INFO_OBJECT (scheduler, "decoupled element %s removed, ignoring",
        GST_OBJECT_NAME (element));
    return;
  }

  if (element->sched_private) {
    _remove_cothread (element->sched_private);
    element->sched_private = NULL;
  }
}
/*
 * GstSchedulerClass::state_transition implementation.
 *
 * Only PAUSED->READY needs work: when the scheduler's parent makes that
 * transition all cothreads are torn down, and the element's own cothread is
 * torn down if it still has one. The other known transitions succeed
 * without doing anything, as do all transitions of decoupled elements.
 * Unknown transition values produce a warning and GST_STATE_FAILURE.
 */
static GstElementStateReturn
gst_entry_scheduler_state_transition (GstScheduler * scheduler,
    GstElement * element, gint transition)
{
  GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (scheduler);

  if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED))
    return GST_STATE_SUCCESS;

  switch (transition) {
    case GST_STATE_PAUSED_TO_READY:
      /* check if our parent changed state */
      if (element == scheduler->parent)
        gst_entry_scheduler_remove_all_cothreads (sched);

      if (element->sched_private != NULL
          && ELEMENT_PRIVATE (element)->thread != NULL)
        safe_cothread_destroy (ELEMENT_PRIVATE (element));
      break;

    case GST_STATE_NULL_TO_READY:
    case GST_STATE_READY_TO_PAUSED:
    case GST_STATE_PAUSED_TO_PLAYING:
    case GST_STATE_PLAYING_TO_PAUSED:
    case GST_STATE_READY_TO_NULL:
      /* nothing to do */
      break;

    default:
      g_warning ("invalid state change %d for element %s", transition,
          GST_OBJECT_NAME (element));
      return GST_STATE_FAILURE;
  }

  return GST_STATE_SUCCESS;
}
/*
 * GstSchedulerClass::yield implementation.
 * An element without private scheduler data cannot be switched away from;
 * TRUE is returned for it immediately. Otherwise the element is marked
 * runnable, control is handed to the next schedulable entry, and FALSE is
 * returned once the element is resumed.
 */
static gboolean
gst_entry_scheduler_yield (GstScheduler * sched, GstElement * element)
{
  CothreadPrivate *priv = ELEMENT_PRIVATE (element);

  /* g_assert (ELEMENT_PRIVATE (element)); */
  /* FIXME: queue thinks it may just interrupt, is that ok? */
  if (priv == NULL)
    return TRUE;

  priv->wait = WAIT_FOR_NOTHING;
  schedule_next_element (GST_ENTRY_SCHEDULER (sched));

  return FALSE;
}
/*
 * GstSchedulerClass::interrupt implementation. Interrupting is currently
 * handled exactly like yielding.
 */
static gboolean
gst_entry_scheduler_interrupt (GstScheduler * sched, GstElement * element)
{
  gboolean result;

  /* FIXME? */
  result = gst_entry_scheduler_yield (sched, element);
  return result;
}
/*
 * GstSchedulerClass::error implementation: records that an element reported
 * an error. The flag is acted upon the next time scheduling decisions are
 * made; the element argument is not used.
 */
static void
gst_entry_scheduler_error (GstScheduler * scheduler, GstElement * element)
{
  GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (scheduler);

  sched->error = TRUE;
}
/*
 * GstSchedulerClass::pad_link implementation: creates the LinkPrivate shared
 * by both pads, makes sure each side has cothread private data, installs
 * the scheduler's handlers on the pads and registers the link in
 * schedule_possible.
 *
 * Source side: a decoupled element gets fresh get-based private data for
 * this link. Any other element reuses its existing private data; if it has
 * none yet, it becomes chain-based when it has at least one sink pad and
 * get-based otherwise.
 *
 * Sink side: a decoupled element gets fresh chain-based private data for
 * this link. Any other element reuses its existing private data, which must
 * not be get-based (asserted), or becomes chain-based.
 */
static void
gst_entry_scheduler_pad_link (GstScheduler * scheduler, GstPad * srcpad,
GstPad * sinkpad)
{
GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (scheduler);
LinkPrivate *priv;
GstElement *element;
priv = g_new0 (LinkPrivate, 1);
priv->entry.type = ENTRY_LINK;
/* wrap srcpad */
element = gst_pad_get_parent (srcpad);
priv->srcpad = GST_REAL_PAD (srcpad);
if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
/* per-link private data; not stored in element->sched_private */
priv->src = _setup_cothread (sched, element, setup_get);
} else {
priv->src = ELEMENT_PRIVATE (element);
if (!priv->src) {
GList *list;
/* an element with any sink pad is treated as chain-based */
for (list = element->pads; list; list = g_list_next (list)) {
if (GST_PAD_IS_SINK (list->data)) {
priv->src = _setup_cothread (sched, element, setup_chain);
break;
}
}
/* no sink pads at all: a pure source, so get-based */
if (!priv->src)
priv->src = _setup_cothread (sched, element, setup_get);
element->sched_private = priv->src;
}
}
GST_RPAD_GETHANDLER (srcpad) = gst_entry_scheduler_get_handler;
GST_RPAD_EVENTHANDLER (srcpad) = gst_entry_scheduler_event_handler;
GST_REAL_PAD (srcpad)->sched_private = priv;
/* wrap sinkpad */
element = gst_pad_get_parent (sinkpad);
priv->sinkpad = GST_REAL_PAD (sinkpad);
if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
/* per-link private data; not stored in element->sched_private */
priv->sink = _setup_cothread (sched, element, setup_chain);
} else {
priv->sink = ELEMENT_PRIVATE (element);
if (priv->sink) {
/* LOOP or CHAIN */
g_assert (priv->sink->main != gst_entry_scheduler_get_wrapper);
} else {
priv->sink = _setup_cothread (sched, element, setup_chain);
element->sched_private = priv->sink;
}
}
GST_RPAD_CHAINHANDLER (sinkpad) = gst_entry_scheduler_chain_handler;
GST_RPAD_EVENTHANDLER (sinkpad) = gst_entry_scheduler_event_handler;
GST_REAL_PAD (sinkpad)->sched_private = priv;
sched->schedule_possible = g_list_prepend (sched->schedule_possible, priv);
}
/*
 * GstSchedulerClass::pad_unlink implementation: undoes
 * gst_entry_scheduler_pad_link(). The handlers and sched_private pointers
 * are cleared on both pads, per-link cothread data of decoupled elements is
 * released, pending data in the link's pen is discarded with a warning, and
 * the link is removed from the scheduler lists and freed.
 *
 * The LinkPrivate is looked up through srcpad only.
 * Cothread data of non-decoupled elements is left alone here.
 */
static void
gst_entry_scheduler_pad_unlink (GstScheduler * scheduler, GstPad * srcpad,
GstPad * sinkpad)
{
GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (scheduler);
LinkPrivate *priv;
GstElement *element;
priv = PAD_PRIVATE (srcpad);
/* wrap srcpad */
element = gst_pad_get_parent (srcpad);
if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED))
_remove_cothread (priv->src);
GST_RPAD_GETHANDLER (srcpad) = NULL;
GST_RPAD_EVENTHANDLER (srcpad) = NULL;
GST_REAL_PAD (srcpad)->sched_private = NULL;
/* wrap sinkpad */
element = gst_pad_get_parent (sinkpad);
if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED))
_remove_cothread (priv->sink);
GST_RPAD_CHAINHANDLER (sinkpad) = NULL;
GST_RPAD_EVENTHANDLER (sinkpad) = NULL;
GST_REAL_PAD (sinkpad)->sched_private = NULL;
if (priv->bufpen) {
GST_WARNING_OBJECT (sched,
"found data in bufpen while unlinking %s:%s and %s:%s, discarding",
GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
gst_data_unref (priv->bufpen);
}
/* forget the link everywhere before freeing it */
sched->schedule_now = g_list_remove (sched->schedule_now, priv);
sched->schedule_possible = g_list_remove (sched->schedule_possible, priv);
g_free (priv);
}
/*
 * GstSchedulerClass::iterate implementation: runs one scheduling iteration.
 *
 * The error flag is cleared first. If entries are already pending in
 * schedule_now they are run; otherwise the first schedulable entry of
 * schedule_possible is put at the front of schedule_now and moved to the end
 * of schedule_possible. If there is anything to run, control is handed to
 * schedule_next_element().
 *
 * Returns GST_SCHEDULER_STATE_ERROR when the error flag is set or
 * schedule_now is still non-empty afterwards,
 * GST_SCHEDULER_STATE_STOPPED when nothing could be scheduled, and
 * GST_SCHEDULER_STATE_RUNNING otherwise.
 */
static GstSchedulerState
gst_entry_scheduler_iterate (GstScheduler * scheduler)
{
GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (scheduler);
GList *entries = sched->schedule_possible;
GstSchedulerState ret = GST_SCHEDULER_STATE_STOPPED;
GST_LOG_OBJECT (sched, "starting iteration in bin %s",
GST_ELEMENT_NAME (scheduler->parent));
sched->error = FALSE;
if (sched->schedule_now) {
ret = GST_SCHEDULER_STATE_RUNNING;
} else {
while (entries) {
if (can_schedule ((Entry *) entries->data)) {
Entry *entry = entries->data;
ret = GST_SCHEDULER_STATE_RUNNING;
sched->schedule_now = g_list_prepend (sched->schedule_now, entry);
/* remove + append moves the entry to the end of schedule_possible */
sched->schedule_possible =
g_list_remove (sched->schedule_possible, entry);
sched->schedule_possible =
g_list_append (sched->schedule_possible, entry);
/* the list was modified, so stop walking it */
break;
}
entries = g_list_next (entries);
}
}
if (ret == GST_SCHEDULER_STATE_RUNNING)
schedule_next_element (sched);
if (sched->error || sched->schedule_now) {
GST_ERROR_OBJECT (sched, "returning error because of %s",
sched->error ? "element error" : "unschedulable elements");
#if 0
gst_entry_scheduler_show (scheduler);
#endif
return GST_SCHEDULER_STATE_ERROR;
#if 0
} else if (GST_STATE (GST_SCHEDULER (sched)->parent) == GST_STATE_PLAYING &&
ret == GST_SCHEDULER_STATE_STOPPED && scheduler->schedulers == NULL) {
GST_ERROR_OBJECT (sched,
"returning error because we contain running elements and we didn't do a thing");
gst_entry_scheduler_show (scheduler);
return GST_SCHEDULER_STATE_ERROR;
#endif
} else if (ret == GST_SCHEDULER_STATE_STOPPED) {
GST_INFO_OBJECT (sched, "done iterating returning STOPPED");
return GST_SCHEDULER_STATE_STOPPED;
} else {
return ret;
}
}
/*
 * Returns a static, human readable description of the cothread's wait
 * state. Only WAIT_FOR_NOTHING and WAIT_FOR_PADS are expected; any other
 * value trips an assertion (and yields "" if assertions are disabled).
 */
static const gchar *
print_state (CothreadPrivate * priv)
{
  if (priv->wait == WAIT_FOR_NOTHING)
    return "runnable";

  if (priv->wait == WAIT_FOR_PADS)
    return "waiting for pads";

  /* WAIT_FOR_ANYTHING, WAIT_FOR_MUM and unknown values */
  g_assert_not_reached ();
  return "";
}
/*
 * Prints a one-line description of an entry to stdout via g_print().
 * Links show both pads (marked " (active)" when the pad is schedulable) and
 * " FILLED" when data is pending; cothreads show the element name and wait
 * state. The line starts with "OK" when the entry itself is schedulable.
 * The sched argument is not used.
 */
static void
print_entry (GstEntryScheduler * sched, Entry * entry)
{
  if (ENTRY_IS_LINK (entry)) {
    LinkPrivate *link = (LinkPrivate *) entry;
    const gchar *marker;

    marker = can_schedule (entry) ? "OK" : " ";
    g_print (" %s", marker);

    marker = can_schedule_pad (link->srcpad) ? " (active)" : "";
    g_print (" %s:%s%s =>", GST_DEBUG_PAD_NAME (link->srcpad), marker);

    marker = can_schedule_pad (link->sinkpad) ? " (active)" : "";
    g_print (" %s:%s%s", GST_DEBUG_PAD_NAME (link->sinkpad), marker);

    marker = link->bufpen ? " FILLED" : "";
    g_print ("%s\n", marker);
    return;
  }

  if (ENTRY_IS_COTHREAD (entry)) {
    CothreadPrivate *priv = (CothreadPrivate *) entry;

    g_print (" %s %s (%s)\n", can_schedule (entry) ? "OK" : " ",
        GST_ELEMENT_NAME (priv->element), print_state (priv));
    return;
  }

  g_assert_not_reached ();
}
/*
 * GstSchedulerClass::show implementation: dumps the three entry lists
 * (waiting, schedule_now, schedule_possible) to stdout, each preceded by a
 * heading, with one line per entry.
 */
static void
gst_entry_scheduler_show (GstScheduler * scheduler)
{
  GstEntryScheduler *sched = GST_ENTRY_SCHEDULER (scheduler);
  const gchar *headings[3];
  GList *sections[3];
  guint i;

  headings[0] = "entry points waiting:\n";
  sections[0] = sched->waiting;
  headings[1] = "entry points to schedule now:\n";
  sections[1] = sched->schedule_now;
  headings[2] = "entry points that might be scheduled:\n";
  sections[2] = sched->schedule_possible;

  for (i = 0; i < 3; i++) {
    GList *walk;

    g_print ("%s", headings[i]);
    for (walk = sections[i]; walk; walk = g_list_next (walk))
      print_entry (sched, (Entry *) walk->data);
  }
}
/*
 * Plugin entry point: registers the scheduler type under the name
 * "entry" COTHREADS_NAME and initialises this file's debug category.
 *
 * Returns FALSE when the scheduler could not be registered (the debug
 * category is not initialised in that case), TRUE otherwise.
 */
static gboolean
plugin_init (GstPlugin * plugin)
{
  if (!gst_scheduler_register (plugin, "entry" COTHREADS_NAME,
          "A entry scheduler using " COTHREADS_NAME " cothreads",
          GST_TYPE_ENTRY_SCHEDULER))
    return FALSE;

  /* the description used to lack the space before "scheduler", running the
     cothread name and the word together */
  GST_DEBUG_CATEGORY_INIT (debug_scheduler, "entry" COTHREADS_NAME, 0,
      "entry " COTHREADS_NAME " scheduler");

  return TRUE;
}
/* Plugin descriptor: plugin name and description are built from
 * COTHREADS_NAME, and plugin_init is the function run when the plugin is
 * loaded. */
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR, "gstentry" COTHREADS_NAME "scheduler", "an entry scheduler using " COTHREADS_NAME " cothreads", /* FIXME */
plugin_init, VERSION, GST_LICENSE, GST_PACKAGE, GST_ORIGIN)