diff --git a/ChangeLog b/ChangeLog index e7a6b3e2d6..2acceb4515 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,19 @@ +2004-11-07 Martin Soto + + * gst/schedulers/fairscheduler.c: + * gst/schedulers/faircothreads.c: + * gst/schedulers/faircothreads.h: + New cothread based scheduler: Fair scheduler. + * gst/schedulers/gthread-cothreads.h: + Add the standard #if around the whole file. + Defining symbol GTHREAD_COTHREADS_NO_DEFINITIONS will now prevent + compilation of the functions defined in this file. This is + necessary to be able to use this file as a normal header. + * gst/schedulers/Makefile.am: Add compiling support for fair + scheduler. + * docs/gst/Makefile.am (IGNORE_HFILES): Exclude internal fair + scheduler cothreads layer from documentation generation. + 2004-11-07 Ronald S. Bultje * gst/autoplug/gstspideridentity.c: diff --git a/docs/gst/Makefile.am b/docs/gst/Makefile.am index c76cc63019..3a0556ea6a 100644 --- a/docs/gst/Makefile.am +++ b/docs/gst/Makefile.am @@ -70,6 +70,7 @@ IGNORE_HFILES= \ cothreads.h \ cothreads_compat.h \ gthread-cothreads.h \ + faircothreads.h \ types.h \ grammar.tab.h \ gstmarshal.h \ diff --git a/gst/schedulers/Makefile.am b/gst/schedulers/Makefile.am index 33306c21a9..d58c65eac4 100644 --- a/gst/schedulers/Makefile.am +++ b/gst/schedulers/Makefile.am @@ -17,13 +17,15 @@ plugin_LTLIBRARIES = \ libgstbasicgthreadscheduler.la \ libgstentrygthreadscheduler.la \ libgstoptscheduler.la \ - libgstoptgthreadscheduler.la + libgstoptgthreadscheduler.la \ + libgstfairgthreadscheduler.la AS_LIBTOOL_LIBS = \ $(omegaschedulers_nola) \ libgstbasicgthreadscheduler \ libgstentrygthreadscheduler \ libgstoptscheduler \ - libgstoptgthreadscheduler + libgstoptgthreadscheduler \ + libgstfairgthreadscheduler if GST_DISABLE_OMEGA_COTHREADS else @@ -69,6 +71,11 @@ libgstoptgthreadscheduler_la_CFLAGS = $(GST_OBJ_CFLAGS) -D_COTHREADS_GTHREAD -DU libgstoptgthreadscheduler_la_LIBADD = $(GST_OBJ_LIBS) libgstoptgthreadscheduler_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(as_libtool_LDFLAGS) +libgstfairgthreadscheduler_la_SOURCES = fairscheduler.c faircothreads.c +libgstfairgthreadscheduler_la_CFLAGS = $(GST_OBJ_CFLAGS) -D_COTHREADS_GTHREAD +libgstfairgthreadscheduler_la_LIBADD = $(GST_OBJ_LIBS) +libgstfairgthreadscheduler_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(as_libtool_LDFLAGS) + noinst_HEADERS = cothreads_compat.h gthread-cothreads.h if AS_LIBTOOL_WIN32 diff --git a/gst/schedulers/faircothreads.c b/gst/schedulers/faircothreads.c new file mode 100644 index 0000000000..721b199aa8 --- /dev/null +++ b/gst/schedulers/faircothreads.c @@ -0,0 +1,612 @@ +/* GStreamer + * Copyright (C) 2004 Martin Soto + * + * faircothread.c: High level cothread implementation for the fair scheduler. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. 
+ */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include + +#include + +#ifdef _COTHREADS_PTH +#include "pth-cothreads.h" +#else +#include "cothreads_compat.h" +#endif + +#include "faircothreads.h" + +#if !defined(GST_DISABLE_GST_DEBUG) && defined(FAIRSCHEDULER_USE_GETTID) +#include +#include + +_syscall0 (pid_t, gettid) +#endif + GST_DEBUG_CATEGORY_EXTERN (debug_fair_ct); +#define GST_CAT_DEFAULT debug_fair_ct + + +/* + * Support for Asynchronous Operations + */ + + enum + { + ASYNC_OP_CHANGE_STATE = 1, + ASYNC_OP_AWAKE + }; + + typedef struct _AsyncOp AsyncOp; + typedef struct _AsyncOpChangeState AsyncOpChangeState; + typedef struct _AsyncOpAwake AsyncOpAwake; + + struct _AsyncOp + { + int type; + }; + + struct _AsyncOpChangeState + { + AsyncOp parent; + GstFairSchedulerCothread *ct; /* Cothread whose state will be + changed. */ + gint new_state; /* New state for the cothread. */ + }; + + struct _AsyncOpAwake + { + AsyncOp parent; + GstFairSchedulerCothread *ct; /* Cothread to awake. */ + gint priority; /* Priority for the cothread. */ + }; + + + static gchar *gst_fairscheduler_ct_state_names[] = { + "stopped", + "suspended", + "running" + }; + + +/* + * Helpers + */ + +static int +cothread_base_func (int argc, char **argv) +{ + GstFairSchedulerCothread *ct; + + g_return_val_if_fail (argc >= 1, -1); + + ct = (GstFairSchedulerCothread *) argv[0]; + + GST_INFO ("queue %p: Cothread %p starting", ct->queue, ct); +#ifndef GST_DISABLE_GST_DEBUG +#ifdef FAIRSCHEDULER_USE_GETTID + ct->pid = gettid (); +#else + ct->pid = 0; +#endif +#endif + + /* Call the thread function. This looks sort of funny, but there's + no other way I know of doing it. */ + switch (argc - 1) { + case 0: + ct->func (ct, NULL); + break; + case 1: + ct->func (ct, argv[1], NULL); + break; + case 2: + ct->func (ct, argv[1], argv[2], NULL); + break; + case 3: + ct->func (ct, argv[1], argv[2], argv[3], NULL); + break; + case 4: + ct->func (ct, argv[1], argv[2], argv[3], argv[4], NULL); + break; + case 5: + ct->func (ct, argv[1], argv[2], argv[3], argv[4], argv[5], NULL); + break; + case 6: + ct->func (ct, argv[1], argv[2], argv[3], argv[4], argv[5], argv[6], NULL); + break; + case 7: + ct->func (ct, argv[1], argv[2], argv[3], argv[4], argv[5], + argv[6], argv[7], NULL); + break; + default: + g_return_val_if_reached (-1); + break; + } + + /* After the cothread function is finished, we go to the stopped + state. */ + gst_fair_scheduler_cothread_change_state (ct, + GST_FAIRSCHEDULER_CTSTATE_STOPPED); + + return 0; +} + + +static void +cothread_activate (GstFairSchedulerCothread * ct, gint priority) +{ + GST_DEBUG ("queue %p: activating cothread %p", ct->queue, ct); + + if (priority > 0) { + g_queue_push_head (ct->queue->ct_queue, ct); + } else { + g_queue_push_tail (ct->queue->ct_queue, ct); + } +} + + +static void +cothread_deactivate (GstFairSchedulerCothread * ct) +{ + GList *node; + + GST_DEBUG ("queue %p: deactivating cothread %p", ct->queue, ct); + + /* Find the node. 
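The cothread to remove can be anywhere in the queue, so look it up in the GQueue's underlying GList.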
*/ + node = g_list_find (ct->queue->ct_queue->head, ct); + if (node == NULL) { + return; + } + + if (node->next == NULL) { + g_queue_pop_tail (ct->queue->ct_queue); + } else { + ct->queue->ct_queue->head = + g_list_remove_link (ct->queue->ct_queue->head, node); + } +} + + +static void +queue_async_op (GstFairSchedulerCothreadQueue * queue, AsyncOp * op) +{ + g_mutex_lock (queue->async_mutex); + g_queue_push_tail (queue->async_queue, op); + g_cond_signal (queue->new_async_op); + g_mutex_unlock (queue->async_mutex); +} + + +/* + * Cothreads API + */ + +extern GstFairSchedulerCothreadQueue * +gst_fair_scheduler_cothread_queue_new (void) +{ + GstFairSchedulerCothreadQueue *new; + + new = g_malloc (sizeof (GstFairSchedulerCothreadQueue)); + + new->context = NULL; + new->ct_queue = g_queue_new (); + + new->async_queue = g_queue_new (); + new->async_mutex = g_mutex_new (); + new->new_async_op = g_cond_new (); + + return new; +} + + +extern void +gst_fair_scheduler_cothread_queue_destroy (GstFairSchedulerCothreadQueue * + queue) +{ + GList *iter; + + /* Destroy all remaining cothreads. */ + for (iter = queue->ct_queue->head; iter != NULL; iter = iter->next) { + gst_fair_scheduler_cothread_destroy ( + (GstFairSchedulerCothread *) iter->data); + } + g_queue_free (queue->ct_queue); + + for (iter = queue->async_queue->head; iter != NULL; iter = iter->next) { + g_free (iter->data); + } + g_queue_free (queue->async_queue); + + g_mutex_free (queue->async_mutex); + g_cond_free (queue->new_async_op); + + g_free (queue); +} + + +extern void +gst_fair_scheduler_cothread_queue_start (GstFairSchedulerCothreadQueue * queue) +{ + if (queue->context == NULL) { + do_cothreads_init (NULL); + queue->context = do_cothread_context_init (); + } +} + + +extern void +gst_fair_scheduler_cothread_queue_stop (GstFairSchedulerCothreadQueue * queue) +{ + if (queue->context != NULL) { + do_cothread_context_destroy (queue->context); + } +} + + +gboolean +gst_fair_scheduler_cothread_queue_iterate (GstFairSchedulerCothreadQueue * + queue) +{ + GstFairSchedulerCothread *ct; + + g_return_val_if_fail (queue->context != NULL, FALSE); + + GST_LOG ("queue %p: iterating", queue); + + /* Perform any pending asynchronous operations. Checking the queue + is safe and more efficient without locking the mutex. */ + if (!g_queue_is_empty (queue->async_queue)) { + AsyncOp *basic_op; + + GST_LOG ("queue %p: processing asynchronous operations", queue); + + g_mutex_lock (queue->async_mutex); + + while (!g_queue_is_empty (queue->async_queue)) { + basic_op = (AsyncOp *) g_queue_pop_head (queue->async_queue); + + switch (basic_op->type) { + case ASYNC_OP_CHANGE_STATE: + { + AsyncOpChangeState *op = (AsyncOpChangeState *) basic_op; + + gst_fair_scheduler_cothread_change_state (op->ct, op->new_state); + } + break; + case ASYNC_OP_AWAKE: + { + AsyncOpAwake *op = (AsyncOpAwake *) basic_op; + + gst_fair_scheduler_cothread_awake (op->ct, op->priority); + } + break; + default: + g_return_val_if_reached (FALSE); + break; + } + + g_free (basic_op); + } + + g_mutex_unlock (queue->async_mutex); + } + + /* First cothread in the queue (if any) should get control. */ + ct = g_queue_peek_head (queue->ct_queue); + + if (ct == NULL) { + GTimeVal timeout; + + g_get_current_time (&timeout); + g_time_val_add (&timeout, 5000); + + /* No cothread available, wait until some other thread queues an + operation. 
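The g_cond_timed_wait below is bounded by the 5 millisecond timeout computed above, so the iteration returns even if no operation ever arrives.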
*/ + g_mutex_lock (queue->async_mutex); + g_cond_timed_wait (queue->new_async_op, queue->async_mutex, &timeout); + g_mutex_unlock (queue->async_mutex); + + return FALSE; + } + + g_return_val_if_fail (ct->state == GST_FAIRSCHEDULER_CTSTATE_RUNNING, FALSE); + + /* Check for a cothread mutex. */ + if (ct->mutex != NULL) { + g_mutex_lock (ct->mutex); + ct->mutex = NULL; + } + + GST_LOG ("queue %p: giving control to %p", queue, ct); + + /* Handle control to the cothread. */ + do_cothread_switch (ct->execst); + + return TRUE; +} + + +void +gst_fair_scheduler_cothread_queue_show (GstFairSchedulerCothreadQueue * queue) +{ + GList *iter; + GstFairSchedulerCothread *ct; + + g_print ("\n Running cothreads (last is active):\n"); + + for (iter = queue->ct_queue->tail; iter != NULL; iter = iter->prev) { + ct = (GstFairSchedulerCothread *) iter->data; + + g_print (" %p: %s (%d)\n", ct, ct->readable_name->str, ct->pid); + } +} + + +GstFairSchedulerCothread * +gst_fair_scheduler_cothread_new (GstFairSchedulerCothreadQueue * queue, + GstFairSchedulerCtFunc function, gpointer first_arg, ...) +{ + GstFairSchedulerCothread *new; + va_list ap; + gpointer arg; + + new = g_malloc (sizeof (GstFairSchedulerCothread)); + + new->queue = queue; + new->func = function; + + /* The first parameter is always the cothread structure itself. */ + new->argv[0] = (char *) new; + new->argc = 1; + + /* Store the parameters. */ + va_start (ap, first_arg); + arg = first_arg; + while (new->argc < GST_FAIRSCHEDULER_MAX_CTARGS && arg != NULL) { + new->argv[new->argc] = (char *) arg; + new->argc++; + arg = va_arg (ap, gpointer); + } + + /* Make sure we don't have more parameters than we can handle. */ + g_return_val_if_fail (arg == NULL, NULL); + + /* Creation of the actual execution state is defered to transition + to running/suspended. */ + new->execst = NULL; + + /* All cothreads are created in the stopped state. */ + new->state = GST_FAIRSCHEDULER_CTSTATE_STOPPED; + + new->mutex = NULL; + +#ifndef GST_DISABLE_GST_DEBUG + new->readable_name = g_string_new (""); + new->pid = 0; +#endif + + GST_DEBUG ("queue %p: cothread %p created", queue, new); + + return new; +} + + +void +gst_fair_scheduler_cothread_destroy (GstFairSchedulerCothread * ct) +{ + GST_DEBUG ("queue %p: destroying cothread %p", ct->queue, ct); + + if (ct->state != GST_FAIRSCHEDULER_CTSTATE_STOPPED) { + cothread_deactivate (ct); + } + + if (ct->execst != NULL) { + do_cothread_destroy (ct->execst); + } +#ifndef GST_DISABLE_GST_DEBUG + g_string_free (ct->readable_name, TRUE); +#endif + + g_free (ct); +} + + +void +gst_fair_scheduler_cothread_change_state (GstFairSchedulerCothread * ct, + gint new_state) +{ + if (new_state == ct->state) { + return; + } + + GST_DEBUG ("queue %p: changing state of %p from %s to %s", ct->queue, ct, + gst_fairscheduler_ct_state_names[ct->state], + gst_fairscheduler_ct_state_names[new_state]); + + switch (ct->state) { + case GST_FAIRSCHEDULER_CTSTATE_STOPPED: + /* (Re)Initialize the cothread. */ + if (ct->execst == NULL) { + /* Initialize cothread's execution state. */ + do_cothread_create (ct->execst, ct->queue->context, + cothread_base_func, ct->argc, ct->argv); + GST_LOG_OBJECT (ct->queue, + "cothread %p has exec state %p", ct, ct->execst); + } else { + /* Reset cothread's execution state. 
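The existing execution state is reused; do_cothread_setfunc just installs the base function and its arguments again.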
*/ + do_cothread_setfunc (ct->execst, ct->queue->context, + cothread_base_func, ct->argc, ct->argv); + } + + ct->sleeping = FALSE; + + if (new_state == GST_FAIRSCHEDULER_CTSTATE_RUNNING) { + cothread_activate (ct, 0); + } + + break; + + case GST_FAIRSCHEDULER_CTSTATE_RUNNING: + if (!ct->sleeping) { + cothread_deactivate (ct); + } + break; + + case GST_FAIRSCHEDULER_CTSTATE_SUSPENDED: + if (new_state == GST_FAIRSCHEDULER_CTSTATE_RUNNING && !ct->sleeping) { + cothread_activate (ct, 0); + } + break; + } + + ct->state = new_state; +} + + +void +gst_fair_scheduler_cothread_change_state_async (GstFairSchedulerCothread * ct, + gint new_state) +{ + AsyncOpChangeState *op; + + /* Queue an asynchronous operation. */ + op = g_new (AsyncOpChangeState, 1); + op->parent.type = ASYNC_OP_CHANGE_STATE; + op->ct = ct; + op->new_state = new_state; + + queue_async_op (ct->queue, (AsyncOp *) op); +} + + +void +gst_fair_scheduler_cothread_sleep (GstFairSchedulerCothreadQueue * queue) +{ + gst_fair_scheduler_cothread_sleep_mutex (queue, NULL); +} + + +/* + * Go to sleep but unblock the mutex while sleeping. + */ +void +gst_fair_scheduler_cothread_sleep_mutex (GstFairSchedulerCothreadQueue * queue, + GMutex * mutex) +{ + GstFairSchedulerCothread *ct; + + g_return_if_fail (queue->context != NULL); + + /* The sleep operation can be invoked when the cothread is already + deactivated. */ + ct = gst_fair_scheduler_cothread_current (queue); + if (ct != NULL && ct->execst == do_cothread_get_current (queue->context)) { + ct = g_queue_pop_head (queue->ct_queue); + ct->sleeping = TRUE; + } + + ct->mutex = mutex; + if (mutex != NULL) { + g_mutex_unlock (mutex); + } + + GST_LOG ("queue %p: cothread going to sleep", queue); + + /* Switch back to the main cothread. */ + do_cothread_switch (do_cothread_get_main (queue->context)); +} + + +void +gst_fair_scheduler_cothread_yield (GstFairSchedulerCothreadQueue * queue) +{ + gst_fair_scheduler_cothread_yield_mutex (queue, NULL); +} + + +void +gst_fair_scheduler_cothread_yield_mutex (GstFairSchedulerCothreadQueue * queue, + GMutex * mutex) +{ + GstFairSchedulerCothread *ct; + + g_return_if_fail (queue->context != NULL); + + /* The yield operation can be invoked when the cothread is already + deactivated. */ + ct = gst_fair_scheduler_cothread_current (queue); + if (ct != NULL && ct->execst == do_cothread_get_current (queue->context)) { + ct = g_queue_pop_head (queue->ct_queue); + g_queue_push_tail (queue->ct_queue, ct); + } + + ct->mutex = mutex; + if (mutex != NULL) { + g_mutex_unlock (mutex); + } + + GST_LOG ("queue %p: cothread yielding control", queue); + + /* Switch back to the main cothread. */ + do_cothread_switch (do_cothread_get_main (queue->context)); +} + + +void +gst_fair_scheduler_cothread_awake (GstFairSchedulerCothread * ct, gint priority) +{ + g_return_if_fail (ct->state != GST_FAIRSCHEDULER_CTSTATE_STOPPED); + + if (!ct->sleeping) { + /* Cothread is already awake. */ + return; + } + + ct->sleeping = FALSE; + + if (ct->state == GST_FAIRSCHEDULER_CTSTATE_RUNNING) { + cothread_activate (ct, priority); + } +} + + +void +gst_fair_scheduler_cothread_awake_async (GstFairSchedulerCothread * ct, + gint priority) +{ + AsyncOpAwake *op; + + /* Queue an asynchronous operation. 
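It will be picked up and executed by gst_fair_scheduler_cothread_queue_iterate in the thread that drives the queue.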
*/ + op = g_new (AsyncOpAwake, 1); + op->parent.type = ASYNC_OP_AWAKE; + op->ct = ct; + op->priority = priority; + + queue_async_op (ct->queue, (AsyncOp *) op); +} + + +GstFairSchedulerCothread * +gst_fair_scheduler_cothread_current (GstFairSchedulerCothreadQueue * queue) +{ + return g_queue_peek_head (queue->ct_queue); +} diff --git a/gst/schedulers/faircothreads.h b/gst/schedulers/faircothreads.h new file mode 100644 index 0000000000..98faeca730 --- /dev/null +++ b/gst/schedulers/faircothreads.h @@ -0,0 +1,163 @@ +/* GStreamer + * Copyright (C) 2004 Martin Soto + * + * faircothread.h: High level cothread implementation for the fair scheduler. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __FAIRCOTHREADS_H__ +#define __FAIRCOTHREADS_H__ + + +#ifdef _COTHREADS_PTH +#include "pth-cothreads.h" +#else +#define GTHREAD_COTHREADS_NO_DEFINITIONS +#include "cothreads_compat.h" +#endif + + +typedef struct _GstFairSchedulerCothread GstFairSchedulerCothread; +typedef struct _GstFairSchedulerCothreadQueue GstFairSchedulerCothreadQueue; + +/* Possible states of a cothread. */ +enum +{ + GST_FAIRSCHEDULER_CTSTATE_STOPPED, + GST_FAIRSCHEDULER_CTSTATE_SUSPENDED, + GST_FAIRSCHEDULER_CTSTATE_RUNNING, +}; + +/* Maximum number of cothread parameters. */ +#define GST_FAIRSCHEDULER_MAX_CTARGS 7 + +/* Cothread function type. */ +typedef void (*GstFairSchedulerCtFunc) (GstFairSchedulerCothread * ct, + gpointer first_arg, ...); + +struct _GstFairSchedulerCothread { + GstFairSchedulerCothreadQueue *queue; + /* Cothread queue this cothread + belongs to. */ + GstFairSchedulerCtFunc func; /* Cothread function. */ + char *argv[1 + GST_FAIRSCHEDULER_MAX_CTARGS]; /* + Arguments for the cothread function. + argv[0] is always the cothread + object itself. */ + int argc; /* Number of stored parameters. */ + + cothread *execst; /* Execution state for this cothread. */ + gint state; /* Current cothread state. */ + gboolean sleeping; /* Is this cothread sleeping? */ + + GMutex *mutex; /* If not null, a mutex to lock before + giving control to this cothread. */ + +#ifndef GST_DISABLE_GST_DEBUG + GString *readable_name; /* Readable name for this cothread. */ + gint pid; /* Process or thread id associated to + this cothread. */ +#endif +}; + +struct _GstFairSchedulerCothreadQueue { + cothread_context *context; /* Cothread context. */ + GQueue *ct_queue; /* Queue of currently running + cothreads. New cothreads are pushed + on the tail. If a cothread is + executing, it is the one in the + head. */ + + /* Asynchronous support. */ + GQueue *async_queue; /* Queue storing asynchronous + operations (operations on the queue + requested potentially from other + threads. */ + GMutex *async_mutex; /* Mutex to protect acceses to + async_queue. 
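This is also the mutex paired with new_async_op when waiting for new operations.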
*/ + GCond *new_async_op; /* Condition variable to signal the + presence of a new asynchronous + operation in the queue. */ +}; + + +extern GstFairSchedulerCothreadQueue * +gst_fair_scheduler_cothread_queue_new (void); + +extern void +gst_fair_scheduler_cothread_queue_destroy ( + GstFairSchedulerCothreadQueue * queue); + +extern void +gst_fair_scheduler_cothread_queue_start ( + GstFairSchedulerCothreadQueue * queue); + +extern void +gst_fair_scheduler_cothread_queue_stop ( + GstFairSchedulerCothreadQueue * queue); + +extern gboolean +gst_fair_scheduler_cothread_queue_iterate ( + GstFairSchedulerCothreadQueue * queue); + +extern void +gst_fair_scheduler_cothread_queue_show ( + GstFairSchedulerCothreadQueue * queue); + + +extern GstFairSchedulerCothread * +gst_fair_scheduler_cothread_new (GstFairSchedulerCothreadQueue * queue, + GstFairSchedulerCtFunc function, gpointer first_arg, ...); + +extern void +gst_fair_scheduler_cothread_destroy (GstFairSchedulerCothread * ct); + +extern void +gst_fair_scheduler_cothread_change_state (GstFairSchedulerCothread * ct, + gint new_state); + +extern void +gst_fair_scheduler_cothread_change_state_async ( + GstFairSchedulerCothread * ct, gint new_state); + +extern void +gst_fair_scheduler_cothread_sleep (GstFairSchedulerCothreadQueue * queue); + +extern void +gst_fair_scheduler_cothread_sleep_mutex ( + GstFairSchedulerCothreadQueue * queue, GMutex * mutex); + +extern void +gst_fair_scheduler_cothread_yield (GstFairSchedulerCothreadQueue * queue); + +extern void +gst_fair_scheduler_cothread_yield_mutex ( + GstFairSchedulerCothreadQueue * queue, GMutex * mutex); + +extern void +gst_fair_scheduler_cothread_awake (GstFairSchedulerCothread * ct, + gint priority); + +extern void +gst_fair_scheduler_cothread_awake_async (GstFairSchedulerCothread * ct, + gint priority); + +extern GstFairSchedulerCothread * +gst_fair_scheduler_cothread_current (GstFairSchedulerCothreadQueue * queue); + + +#endif /* __FAIRCOTHREADS_H__ */ diff --git a/gst/schedulers/fairscheduler.c b/gst/schedulers/fairscheduler.c new file mode 100644 index 0000000000..a395646faa --- /dev/null +++ b/gst/schedulers/fairscheduler.c @@ -0,0 +1,1405 @@ +/* GStreamer + * Copyright (C) 2004 Martin Soto + * + * gstfairscheduler.c: Fair cothread based scheduler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. 
+ */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include + +#include +#include + +#include "faircothreads.h" + + +GST_DEBUG_CATEGORY_STATIC (debug_fair); +#define GST_CAT_DEFAULT debug_fair +GST_DEBUG_CATEGORY (debug_fair_ct); +GST_DEBUG_CATEGORY_STATIC (debug_fair_queues); + + +typedef struct _GstFairScheduler GstFairScheduler; +typedef struct _GstFairSchedulerClass GstFairSchedulerClass; +typedef struct _GstFairSchedulerPrivElem GstFairSchedulerPrivElem; +typedef struct _GstFairSchedulerPrivLink GstFairSchedulerPrivLink; +typedef struct _GstFairSchedulerWaitEntry GstFairSchedulerWaitEntry; + + +#define GST_TYPE_FAIR_SCHEDULER \ + (gst_fair_scheduler_get_type()) +#define GST_FAIR_SCHEDULER(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_FAIR_SCHEDULER,GstFairScheduler)) +#define GST_FAIR_SCHEDULER_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_FAIR_SCHEDULER,GstFairSchedulerClass)) +#define GST_IS_FAIR_SCHEDULER(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_FAIR_SCHEDULER)) +#define GST_IS_FAIR_SCHEDULER_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_FAIR_SCHEDULER)) + + +/* Private scheduler data associated to an element. */ +struct _GstFairSchedulerPrivElem +{ + GstFairSchedulerCothread *elem_ct; + /* Element's cothread. */ + GArray *chain_get_pads; /* Pads in this element with either a + get or a chain function. */ +}; + +#define ELEM_PRIVATE(element) \ + ((GstFairSchedulerPrivElem *) GST_ELEMENT(element)->sched_private) + + +/* Private scheduler data associated to a pad link. This structure is + always stored in the source pad of the link. */ +struct _GstFairSchedulerPrivLink +{ + GstFairScheduler *owner; /* The "owner" of this link. */ + + GstData *bufpen; /* A placeholder for one buffer. */ + GstFairSchedulerCothread *waiting_writer; + /* Cothread waiting to write. */ + GstFairSchedulerCothread *waiting_reader; + /* Cothread waiting to read. */ + + GstFairSchedulerCothread *decoupled_ct; + /* Cothread to handle the decoupled + pad in this link (if any). */ + gulong decoupled_signal_id; /* Id for the signal handler + responsible for managing the + cothread. */ + + /* Queue optimizations. */ + gulong queue_blocked_signal_id; + /* Id for the signal handler connected + to the under/overrun signal of a + queue. */ + GstFairSchedulerCothread *waiting_for_queue; + /* Cothread waiting for a queue to + unblock. */ +}; + +#define LINK_PRIVATE(pad) \ + ((GstFairSchedulerPrivLink *) \ + (GST_PAD_IS_SRC (pad) ? \ + GST_REAL_PAD(pad)->sched_private : \ + GST_RPAD_PEER (GST_REAL_PAD(pad))->sched_private)) + + +/* An entry in the clock wait list. */ +struct _GstFairSchedulerWaitEntry +{ + GstFairSchedulerCothread *ct; /* The waiting cothread. */ + GstClockTime time; /* The clock time it should wake up + on. */ +}; + + +struct _GstFairScheduler +{ + GstScheduler parent; + + GstFairSchedulerCothreadQueue *cothreads; + /* The queue handling the cothreads + for the scheduler. */ + + /* Scheduling control. */ + gboolean in_element; /* True if we are running element + code. */ + + /* Clock wait support. */ + GSList *waiting; /* List of waiting cothreads. Elements + are GstFairSchedulerWaitEntry + structures. */ + + /* Timing statistics. */ + GTimer *iter_timer; /* Iteration timer. */ + guint iter_count; /* Iteration count. */ + +#ifndef GST_DISABLE_GST_DEBUG + GList *elements; /* List of all registered elements + (needed only for debugging. */ + GList *sources; /* List of all source pads involved in + registered links (needed only for + debugging. 
*/ +#endif +}; + + +struct _GstFairSchedulerClass +{ + GstSchedulerClass parent_class; +}; + + +static GType _gst_fair_scheduler_type = 0; + + +enum +{ + ARG_0, +}; + + +/* Standard GObject Operations */ + +static GType gst_fair_scheduler_get_type (void); + +static void gst_fair_scheduler_class_init (GstFairSchedulerClass * klass); + +static void gst_fair_scheduler_init (GObject * object); + +static void gst_fair_scheduler_dispose (GObject * object); + +static void +gst_fair_scheduler_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static void +gst_fair_scheduler_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); + + +/* Cothread Function Wrappers */ + +static void +gst_fair_scheduler_loop_wrapper (GstFairSchedulerCothread * ct, + GstElement * element); + + +/* Chain, Get and Event Handlers */ + +static void gst_fair_scheduler_chain_handler (GstPad * pad, GstData * data); + +static GstData *gst_fair_scheduler_get_handler (GstPad * pad); + + +/* GstScheduler Operations */ + +static void gst_fair_scheduler_setup (GstScheduler * sched); + +static void gst_fair_scheduler_reset (GstScheduler * sched); + +static void +gst_fair_scheduler_add_element (GstScheduler * sched, GstElement * element); + +static void +gst_fair_scheduler_remove_element (GstScheduler * sched, GstElement * element); + +static GstElementStateReturn +gst_fair_scheduler_state_transition (GstScheduler * sched, + GstElement * element, gint transition); + +static void +decoupled_state_transition (GstElement * element, gint old_state, + gint new_state, gpointer user_data); + +static void +gst_fair_scheduler_scheduling_change (GstScheduler * sched, + GstElement * element); + +static gboolean +gst_fair_scheduler_yield (GstScheduler * sched, GstElement * element); + +static gboolean +gst_fair_scheduler_interrupt (GstScheduler * sched, GstElement * element); + +static void +gst_fair_scheduler_error (GstScheduler * sched, GstElement * element); + +static void +gst_fair_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, + GstPad * sinkpad); + +static void +gst_fair_scheduler_pad_unlink (GstScheduler * sched, GstPad * srcpad, + GstPad * sinkpad); + +static GstData *gst_fair_scheduler_pad_select (GstScheduler * sched, + GstPad ** pulled_from, GstPad ** pads); + +static GstClockReturn +gst_fair_scheduler_clock_wait (GstScheduler * sched, GstElement * element, + GstClockID id, GstClockTimeDiff * jitter); + +static GstSchedulerState gst_fair_scheduler_iterate (GstScheduler * sched); + +static void gst_fair_scheduler_show (GstScheduler * sched); + + +static GstSchedulerClass *parent_class = NULL; + + +/* + * Standard GObject Operations + */ + +static GType +gst_fair_scheduler_get_type (void) +{ + if (!_gst_fair_scheduler_type) { + static const GTypeInfo scheduler_info = { + sizeof (GstFairSchedulerClass), + NULL, + NULL, + (GClassInitFunc) gst_fair_scheduler_class_init, + NULL, + NULL, + sizeof (GstFairScheduler), + 0, + (GInstanceInitFunc) gst_fair_scheduler_init, + NULL + }; + + _gst_fair_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER, + "GstFair" COTHREADS_NAME_CAPITAL "Scheduler", &scheduler_info, 0); + } + return _gst_fair_scheduler_type; +} + + +static void +gst_fair_scheduler_class_init (GstFairSchedulerClass * klass) +{ + GObjectClass *gobject_class; + GstObjectClass *gstobject_class; + GstSchedulerClass *gstscheduler_class; + + gobject_class = (GObjectClass *) klass; + gstobject_class = (GstObjectClass *) klass; + gstscheduler_class 
= (GstSchedulerClass *) klass; + + parent_class = g_type_class_ref (GST_TYPE_SCHEDULER); + + gobject_class->set_property = gst_fair_scheduler_set_property; + gobject_class->get_property = gst_fair_scheduler_get_property; + gobject_class->dispose = gst_fair_scheduler_dispose; + + gstscheduler_class->setup = gst_fair_scheduler_setup; + gstscheduler_class->reset = gst_fair_scheduler_reset; + gstscheduler_class->add_element = gst_fair_scheduler_add_element; + gstscheduler_class->remove_element = gst_fair_scheduler_remove_element; + gstscheduler_class->state_transition = gst_fair_scheduler_state_transition; + gstscheduler_class->scheduling_change = gst_fair_scheduler_scheduling_change; + gstscheduler_class->yield = gst_fair_scheduler_yield; + gstscheduler_class->interrupt = gst_fair_scheduler_interrupt; + gstscheduler_class->error = gst_fair_scheduler_error; + gstscheduler_class->pad_link = gst_fair_scheduler_pad_link; + gstscheduler_class->pad_unlink = gst_fair_scheduler_pad_unlink; + gstscheduler_class->pad_select = gst_fair_scheduler_pad_select; + gstscheduler_class->clock_wait = gst_fair_scheduler_clock_wait; + gstscheduler_class->iterate = gst_fair_scheduler_iterate; + gstscheduler_class->show = gst_fair_scheduler_show; +} + + +static void +gst_fair_scheduler_init (GObject * object) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (object); + + fsched->cothreads = gst_fair_scheduler_cothread_queue_new (); + + /* Proudly suporting the select operation since 2004! */ + GST_FLAG_SET (fsched, GST_SCHEDULER_FLAG_NEW_API); + + fsched->in_element = FALSE; + + fsched->waiting = NULL; + + fsched->iter_timer = g_timer_new (); + +#ifndef GST_DISABLE_GST_DEBUG + fsched->elements = NULL; + fsched->sources = NULL; +#endif +} + + +static void +gst_fair_scheduler_dispose (GObject * object) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (object); + + GST_WARNING_OBJECT (fsched, "disposing"); + + g_slist_free (fsched->waiting); + + g_timer_destroy (fsched->iter_timer); + + gst_fair_scheduler_cothread_queue_destroy (fsched->cothreads); + +#ifndef GST_DISABLE_GST_DEBUG + g_list_free (fsched->elements); + g_list_free (fsched->sources); +#endif + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + + +static void +gst_fair_scheduler_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + /*GstFairScheduler *fsched = GST_FAIR_SCHEDULER (object); */ + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + + +static void +gst_fair_scheduler_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + /*GstFairScheduler *fsched = GST_FAIR_SCHEDULER (object); */ + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + + +/* + * Helpers + */ + +static GstFairSchedulerPrivLink * +get_link_priv (GstPad * pad) +{ + GstFairSchedulerPrivLink *priv; + + GstRealPad *real = GST_PAD_REALIZE (pad); + + if (GST_RPAD_DIRECTION (real) == GST_PAD_SINK) { + real = GST_RPAD_PEER (real); + } + + priv = LINK_PRIVATE (real); + g_return_val_if_fail (priv != NULL, NULL); + + return priv; +} + + +static void +set_cothread_state (GstFairSchedulerCothread * ct, GstElementState state) +{ + guint ct_state; + + switch (state) { + case GST_STATE_PLAYING: + ct_state = GST_FAIRSCHEDULER_CTSTATE_RUNNING; + break; + case GST_STATE_PAUSED: + ct_state = GST_FAIRSCHEDULER_CTSTATE_SUSPENDED; + break; + default: + ct_state = GST_FAIRSCHEDULER_CTSTATE_STOPPED; + 
break; + } + + gst_fair_scheduler_cothread_change_state_async (ct, ct_state); +} + + +static GstPad * +find_ready_pad (GstPad ** pads) +{ + GstPad *pad; + GstFairSchedulerPrivLink *priv; + int i; + + for (i = 0; pads[i] != NULL; i++) { + pad = pads[i]; + priv = LINK_PRIVATE (pad); + + if (GST_PAD_IS_SRC (pad) && priv->bufpen == NULL) { + return pad; + } else if (GST_PAD_IS_SINK (pad) && priv->bufpen != NULL) { + return pad; + } + } + + return NULL; +} + + +static GstPad * +gst_fair_scheduler_internal_select (GstFairScheduler * fsched, GstPad ** pads) +{ + GstPad *pad; + GstFairSchedulerPrivLink *priv; + int i; + + pad = find_ready_pad (pads); + if (pad == NULL) { + /* Register the current cothread as waiting writer/reader for + every pad on the list. */ + for (i = 0; pads[i] != NULL; i++) { + pad = pads[i]; + priv = LINK_PRIVATE (pad); + + if (GST_PAD_IS_SRC (pad)) { + g_return_val_if_fail (priv->waiting_writer == NULL, NULL); + priv->waiting_writer = + gst_fair_scheduler_cothread_current (fsched->cothreads); + } else { + g_return_val_if_fail (priv->waiting_reader == NULL, NULL); + priv->waiting_reader = + gst_fair_scheduler_cothread_current (fsched->cothreads); + } + } + + /* Sleep until at least one of the pads becomes ready. */ + gst_fair_scheduler_cothread_sleep (fsched->cothreads); + + /* Deregister from all pads. */ + for (i = 0; pads[i] != NULL; i++) { + pad = pads[i]; + priv = LINK_PRIVATE (pad); + + if (GST_PAD_IS_SRC (pad)) { + priv->waiting_writer = NULL; + } else { + priv->waiting_reader = NULL; + } + } + + /* This time it should work. */ + pad = find_ready_pad (pads); + } + + /* At this point, we must have a pad to return. */ + g_return_val_if_fail (pad != NULL, NULL); + + return pad; +} + + +/* + * Cothread Function Wrappers + */ + +static void +gst_fair_scheduler_loop_wrapper (GstFairSchedulerCothread * ct, + GstElement * element) +{ + GST_DEBUG ("Queue %p: entering loop wrapper for '%s'", ct->queue, + GST_OBJECT_NAME (element)); + + g_return_if_fail (element->loopfunc != NULL); + + gst_object_ref (GST_OBJECT (element)); + + while (gst_element_get_state (element) == GST_STATE_PLAYING) { + element->loopfunc (element); + } + + gst_object_unref (GST_OBJECT (element)); + + GST_DEBUG ("Queue %p: leaving loop wrapper for '%s'", ct->queue, + GST_OBJECT_NAME (element)); +} + + +static void +gst_fair_scheduler_chain_get_wrapper (GstFairSchedulerCothread * ct, + GstElement * element) +{ + GstData *data; + GstPad *pad; + GstFairScheduler *fsched = + GST_FAIR_SCHEDULER (gst_element_get_scheduler (element)); + + GST_DEBUG ("Queue %p: entering chain/get wrapper for '%s'", ct->queue, + GST_OBJECT_NAME (element)); + + gst_object_ref (GST_OBJECT (element)); + + while (gst_element_get_state (element) == GST_STATE_PLAYING) { + /* Run a select on the pad list. 
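The select returns a pad that can be serviced immediately, sleeping until one of the element's pads becomes ready if necessary.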
*/ + pad = gst_fair_scheduler_internal_select (fsched, + (GstPad **) ELEM_PRIVATE (element)->chain_get_pads->data); + + if (GST_PAD_IS_SRC (pad)) { + g_return_if_fail (GST_RPAD_GETFUNC (pad) != NULL); + data = gst_pad_call_get_function (pad); + gst_pad_push (pad, data); + } else { + g_return_if_fail (GST_RPAD_CHAINFUNC (pad) != NULL); + data = gst_pad_pull (pad); + gst_pad_call_chain_function (pad, data); + } + } + + gst_object_unref (GST_OBJECT (element)); + + GST_DEBUG ("Queue %p: leaving chain/get wrapper for '%s'", ct->queue, + GST_OBJECT_NAME (element)); +} + + +static void +gst_fair_scheduler_queue_read_blocked_handler (GstQueue * queue, GstPad * pad) +{ + GstFairSchedulerPrivLink *priv; + + priv = LINK_PRIVATE (pad); + + GST_CAT_LOG_OBJECT (debug_fair_queues, priv->owner, + "entering \"blocked\" handler for pad '%s:%s'", GST_DEBUG_PAD_NAME (pad)); + + gst_fair_scheduler_cothread_sleep (priv->owner->cothreads); + + GST_CAT_LOG_OBJECT (debug_fair_queues, priv->owner, + "leaving \"blocked\" handler for queue '%s:%s'", + GST_DEBUG_PAD_NAME (pad)); +} + + +static void +gst_fair_scheduler_decoupled_chain_wrapper (GstFairSchedulerCothread * ct, + GstPad * pad) +{ + GstElement *parent = GST_PAD_PARENT (pad); + GstFairSchedulerPrivLink *priv; + GstData *data; + + g_return_if_fail (GST_RPAD_CHAINFUNC (pad) != NULL); + + priv = LINK_PRIVATE (pad); + + GST_DEBUG ("Queue %p: entering chain wrapper loop for '%s:%s'", ct->queue, + GST_DEBUG_PAD_NAME (pad)); + + gst_object_ref (GST_OBJECT (parent)); + + while (gst_element_get_state (parent) == GST_STATE_PLAYING) { + data = gst_pad_pull (pad); + + gst_pad_call_chain_function (pad, data); + + if (priv->waiting_for_queue != NULL) { + gst_fair_scheduler_cothread_awake_async (priv->waiting_for_queue, 0); + } + } + + gst_object_unref (GST_OBJECT (parent)); + + GST_DEBUG ("Queue %p: leaving chain wrapper loop for '%s:%s'", + ct->queue, GST_DEBUG_PAD_NAME (pad)); +} + + +static void +gst_fair_scheduler_decoupled_get_wrapper (GstFairSchedulerCothread * ct, + GstPad * pad) +{ + GstElement *parent = GST_PAD_PARENT (pad); + GstFairSchedulerPrivLink *priv, *sink_priv = NULL; + GstData *data; + + g_return_if_fail (GST_RPAD_GETFUNC (pad) != NULL); + + priv = LINK_PRIVATE (pad); + + if (GST_IS_QUEUE (parent)) { + /* Decoupled elements are almost always queues. We optimize for + this case. The signal handler stops the cothread when the queue + has no material available. */ + + priv->queue_blocked_signal_id = g_signal_connect (parent, + "underrun", + (GCallback) gst_fair_scheduler_queue_read_blocked_handler, pad); + + /* Register this cothread at the opposite side of the queue. */ + sink_priv = LINK_PRIVATE (gst_element_get_pad (parent, "sink")); + sink_priv->waiting_for_queue = ct; + } + + GST_DEBUG ("Queue %p: entering get wrapper loop for '%s:%s'", ct->queue, + GST_DEBUG_PAD_NAME (pad)); + + gst_object_ref (GST_OBJECT (parent)); + + while (gst_element_get_state (parent) == GST_STATE_PLAYING) { + data = gst_pad_call_get_function (pad); + gst_pad_push (pad, data); + } + + gst_object_unref (GST_OBJECT (parent)); + + GST_DEBUG ("Queue %p: leaving get wrapper loop for '%s:%s'", ct->queue, + GST_DEBUG_PAD_NAME (pad)); + + if (GST_IS_QUEUE (parent)) { + sink_priv->waiting_for_queue = NULL; + + /* Disconnect from the signal. 
*/ + g_signal_handler_disconnect (parent, priv->queue_blocked_signal_id); + priv->queue_blocked_signal_id = 0; + } +} + + +/* + * Chain and Get Handlers + */ + +static void +gst_fair_scheduler_chain_handler (GstPad * pad, GstData * data) +{ + GstFairSchedulerPrivLink *priv = get_link_priv (pad); + GstFairScheduler *fsched = priv->owner; + + while (priv->bufpen != NULL) { + /* The buffer is full. Sleep until it's available again. */ + if (priv->waiting_writer != NULL) { + GST_ERROR_OBJECT (fsched, + "concurrent writers not supported, pad '%s:%s', waiting %p, " + "current %p, ", GST_DEBUG_PAD_NAME (pad), + priv->waiting_writer, + gst_fair_scheduler_cothread_current (fsched->cothreads)); + return; + } + priv->waiting_writer = + gst_fair_scheduler_cothread_current (fsched->cothreads); + gst_fair_scheduler_cothread_sleep (fsched->cothreads); + + /* After sleeping we must be at the head. */ + g_return_if_fail (priv->waiting_writer == + gst_fair_scheduler_cothread_current (fsched->cothreads)); + priv->waiting_writer = NULL; + } + + g_return_if_fail (priv->bufpen == NULL); + + /* Fill the bufpen. */ + priv->bufpen = data; + + /* If there's a waiting reader, wake it up. */ + if (priv->waiting_reader != NULL) { + gst_fair_scheduler_cothread_awake (priv->waiting_reader, 0); + } + + GST_LOG_OBJECT (fsched, "pushed data <%p> on pad '%s:%s'", + data, GST_DEBUG_PAD_NAME (GST_RPAD_PEER (pad))); +} + + +static GstData * +gst_fair_scheduler_get_handler (GstPad * pad) +{ + GstFairSchedulerPrivLink *priv = get_link_priv (pad); + GstFairScheduler *fsched = priv->owner; + GstData *ret; + + while (priv->bufpen == NULL) { + /* The buffer is empty. Sleep until there's something to read. */ + if (priv->waiting_reader != NULL) { + GST_ERROR_OBJECT (fsched, "concurrent readers not supported"); + return NULL; + } + priv->waiting_reader = + gst_fair_scheduler_cothread_current (fsched->cothreads); + gst_fair_scheduler_cothread_sleep (fsched->cothreads); + + /* We should still be there after sleeping. */ + g_return_val_if_fail (priv->waiting_reader == + gst_fair_scheduler_cothread_current (fsched->cothreads), NULL); + priv->waiting_reader = NULL; + } + + g_return_val_if_fail (priv->bufpen != NULL, NULL); + + /* Empty the bufpen. */ + ret = priv->bufpen; + priv->bufpen = NULL; + + /* If there's a waiting writer, wake it up. */ + if (priv->waiting_writer != NULL) { + gst_fair_scheduler_cothread_awake (priv->waiting_writer, 0); + } + + GST_LOG_OBJECT (fsched, "pulled data <%p> from pad '%s:%s'", + ret, GST_DEBUG_PAD_NAME (GST_RPAD_PEER (pad))); + + return ret; +} + + +/* + * GstScheduler Entry Points + */ + +static void +gst_fair_scheduler_setup (GstScheduler * sched) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + + GST_DEBUG_OBJECT (fsched, "setting up scheduler"); + + /* Initialize the cothread system. */ + gst_fair_scheduler_cothread_queue_start (fsched->cothreads); + + fsched->iter_count = 0; + g_timer_start (fsched->iter_timer); +} + + +static void +gst_fair_scheduler_reset (GstScheduler * sched) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + + GST_DEBUG_OBJECT (fsched, "resetting scheduler"); + + g_timer_stop (fsched->iter_timer); + { + gulong msecs; + double elapsed = g_timer_elapsed (fsched->iter_timer, &msecs); + + GST_INFO_OBJECT (fsched, + "%u iterations in %0.3fs, %.0f iterations/sec.", + fsched->iter_count, elapsed, fsched->iter_count / elapsed); + } + + /* Shut down the cothreads system. 
*/ + gst_fair_scheduler_cothread_queue_stop (fsched->cothreads); +} + + +static void +gst_fair_scheduler_add_element (GstScheduler * sched, GstElement * element) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + GstFairSchedulerPrivElem *priv; + + if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + /* Decoupled elements don't have their own cothread. Their pads do + have one, though, but it is assigned in the link operation. */ + return; + } + + GST_DEBUG_OBJECT (fsched, "adding element '%s'", GST_OBJECT_NAME (element)); + + g_return_if_fail (ELEM_PRIVATE (element) == NULL); + + priv = g_malloc (sizeof (GstFairSchedulerPrivElem)); + + /* Create the element's cothread. */ + if (element->loopfunc != NULL) { + priv->elem_ct = + gst_fair_scheduler_cothread_new (fsched->cothreads, + (GstFairSchedulerCtFunc) gst_fair_scheduler_loop_wrapper, + element, NULL); +#ifndef GST_DISABLE_GST_DEBUG + g_string_printf (priv->elem_ct->readable_name, "%s:loop", + GST_OBJECT_NAME (element)); +#endif + GST_CAT_INFO_OBJECT (debug_fair_ct, fsched, + "cothread %p is loop for element '%s'", + priv->elem_ct, GST_OBJECT_NAME (element)); + } else { + priv->elem_ct = + gst_fair_scheduler_cothread_new (fsched->cothreads, + (GstFairSchedulerCtFunc) gst_fair_scheduler_chain_get_wrapper, + element, NULL); +#ifndef GST_DISABLE_GST_DEBUG + g_string_printf (priv->elem_ct->readable_name, "%s:chain/get", + GST_OBJECT_NAME (element)); +#endif + GST_CAT_INFO_OBJECT (debug_fair_ct, fsched, + "cothread %p is chain/get for element '%s'", + priv->elem_ct, GST_OBJECT_NAME (element)); + } + + set_cothread_state (priv->elem_ct, gst_element_get_state (element)); + + priv->chain_get_pads = g_array_new (TRUE, FALSE, sizeof (GstPad *)); + + element->sched_private = priv; + +#ifndef GST_DISABLE_GST_DEBUG + fsched->elements = g_list_prepend (fsched->elements, element); +#endif +} + + +static void +gst_fair_scheduler_remove_element (GstScheduler * sched, GstElement * element) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + GstFairSchedulerPrivElem *priv = ELEM_PRIVATE (element); + + if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + return; + } + + GST_DEBUG_OBJECT (fsched, "removing element '%s'", GST_OBJECT_NAME (element)); + + g_return_if_fail (priv != NULL); + + /* Clean up the cothread. */ + g_return_if_fail (priv->elem_ct != NULL); + gst_fair_scheduler_cothread_destroy (priv->elem_ct); + +#ifndef GST_DISABLE_GST_DEBUG + fsched->elements = g_list_remove (fsched->elements, element); +#endif + + g_free (priv); + element->sched_private = NULL; +} + + +static void +gst_fair_scheduler_pad_link (GstScheduler * sched, GstPad * srcpad, + GstPad * sinkpad) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + GstFairSchedulerPrivLink *priv; + GstElement *src_parent, *sink_parent; + + g_return_if_fail (LINK_PRIVATE (srcpad) == NULL); + + GST_DEBUG_OBJECT (fsched, "linking pads '%s:%s' and '%s:%s'", + GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad)); + + /* Initialize the private information block. 
*/ + priv = g_malloc (sizeof (GstFairSchedulerPrivLink)); + + priv->owner = fsched; + priv->bufpen = NULL; + priv->waiting_writer = NULL; + priv->waiting_reader = NULL; + priv->decoupled_ct = NULL; + priv->decoupled_signal_id = 0; + priv->queue_blocked_signal_id = 0; + priv->waiting_for_queue = NULL; + + GST_REAL_PAD (srcpad)->sched_private = priv; + + src_parent = GST_PAD_PARENT (srcpad); + sink_parent = GST_PAD_PARENT (sinkpad); + + if (GST_RPAD_GETFUNC (srcpad) != NULL) { + if (GST_FLAG_IS_SET (src_parent, GST_ELEMENT_DECOUPLED)) { + /* Pad is decoupled. Create a separate cothread to run its get + function. */ + priv->decoupled_ct = + gst_fair_scheduler_cothread_new (fsched->cothreads, + (GstFairSchedulerCtFunc) gst_fair_scheduler_decoupled_get_wrapper, + srcpad, NULL); +#ifndef GST_DISABLE_GST_DEBUG + g_string_printf (priv->decoupled_ct->readable_name, "%s:%s:get", + GST_DEBUG_PAD_NAME (srcpad)); +#endif + GST_CAT_INFO_OBJECT (debug_fair_ct, fsched, + "cothread %p is get for pad '%s:%s'", + priv->decoupled_ct, GST_DEBUG_PAD_NAME (srcpad)); + + /* Connect to the state change signal of the decoupled element + in order to manage the state of this cothread. */ + priv->decoupled_signal_id = g_signal_connect (src_parent, + "state-change", (GCallback) decoupled_state_transition, + priv->decoupled_ct); + + set_cothread_state (priv->decoupled_ct, + gst_element_get_state (src_parent)); + } else { + g_array_append_val (ELEM_PRIVATE (src_parent)->chain_get_pads, srcpad); + } + } + + if (GST_RPAD_CHAINFUNC (sinkpad) != NULL) { + if (GST_FLAG_IS_SET (sink_parent, GST_ELEMENT_DECOUPLED)) { + /* Pad is decoupled. Create a separate cothread to run its chain + function. */ + priv->decoupled_ct = + gst_fair_scheduler_cothread_new (fsched->cothreads, + (GstFairSchedulerCtFunc) gst_fair_scheduler_decoupled_chain_wrapper, + sinkpad, NULL); +#ifndef GST_DISABLE_GST_DEBUG + g_string_printf (priv->decoupled_ct->readable_name, "%s:%s:chain", + GST_DEBUG_PAD_NAME (srcpad)); +#endif + GST_CAT_INFO_OBJECT (debug_fair_ct, fsched, + "cothread %p is chain for pad '%s:%s'", + priv->decoupled_ct, GST_DEBUG_PAD_NAME (sinkpad)); + + /* Connect to the state change signal of the decoupled element + in order to manage the state of this cothread. */ + priv->decoupled_signal_id = g_signal_connect (sink_parent, + "state-change", (GCallback) decoupled_state_transition, + priv->decoupled_ct); + + set_cothread_state (priv->decoupled_ct, + gst_element_get_state (sink_parent)); + } else { + g_array_append_val (ELEM_PRIVATE (sink_parent)->chain_get_pads, sinkpad); + } + } + + /* Set the data handlers. 
*/ + GST_RPAD_GETHANDLER (srcpad) = gst_fair_scheduler_get_handler; + GST_RPAD_EVENTHANDLER (srcpad) = GST_RPAD_EVENTFUNC (srcpad); + + GST_RPAD_CHAINHANDLER (sinkpad) = gst_fair_scheduler_chain_handler; + GST_RPAD_EVENTHANDLER (sinkpad) = GST_RPAD_EVENTFUNC (sinkpad); + +#ifndef GST_DISABLE_GST_DEBUG + fsched->sources = g_list_prepend (fsched->sources, srcpad); +#endif +} + + +static void +array_remove_pad (GArray * array, GstPad * pad) +{ + int i; + + for (i = 0; i < array->len; i++) { + if (g_array_index (array, GstPad *, i) == pad) { + g_array_remove_index_fast (array, i); + break; + } + } +} + + +static void +gst_fair_scheduler_pad_unlink (GstScheduler * sched, GstPad * srcpad, + GstPad * sinkpad) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + GstFairSchedulerPrivLink *priv; + GstElement *src_parent, *sink_parent; + + priv = LINK_PRIVATE (srcpad); + g_return_if_fail (priv != NULL); + + GST_DEBUG_OBJECT (fsched, "unlinking pads '%s:%s' and '%s:%s'", + GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad)); + + src_parent = GST_PAD_PARENT (srcpad); + sink_parent = GST_PAD_PARENT (sinkpad); + + if (GST_RPAD_GETFUNC (srcpad) != NULL) { + if (GST_FLAG_IS_SET (src_parent, GST_ELEMENT_DECOUPLED)) { + gst_fair_scheduler_cothread_destroy (priv->decoupled_ct); + } else { + array_remove_pad (ELEM_PRIVATE (src_parent)->chain_get_pads, srcpad); + } + } + + if (GST_RPAD_CHAINFUNC (sinkpad) != NULL) { + if (GST_FLAG_IS_SET (sink_parent, GST_ELEMENT_DECOUPLED)) { + gst_fair_scheduler_cothread_destroy (priv->decoupled_ct); + } else { + array_remove_pad (ELEM_PRIVATE (sink_parent)->chain_get_pads, sinkpad); + } + } + + if (priv->decoupled_signal_id != 0) { + g_signal_handler_disconnect (sink_parent, priv->decoupled_signal_id); + } + if (priv->queue_blocked_signal_id != 0) { + g_signal_handler_disconnect (sink_parent, priv->queue_blocked_signal_id); + } + + if (priv->bufpen != NULL) { + gst_data_unref (priv->bufpen); + } + g_free (priv); + + GST_REAL_PAD (srcpad)->sched_private = NULL; + +#ifndef GST_DISABLE_GST_DEBUG + fsched->sources = g_list_remove (fsched->sources, srcpad); +#endif +} + + +static GstElementStateReturn +gst_fair_scheduler_state_transition (GstScheduler * sched, + GstElement * element, gint transition) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + gint old_state, new_state; + + GST_DEBUG_OBJECT (sched, "Element %s changing from %s to %s", + GST_ELEMENT_NAME (element), + gst_element_state_get_name (transition >> 8), + gst_element_state_get_name (transition & 0xff)); + + if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + return GST_STATE_SUCCESS; + } + + /* The parent element must be handled specially. */ + if (GST_IS_BIN (element)) { + if (GST_SCHEDULER_PARENT (sched) == element) { + switch (transition) { + case GST_STATE_PLAYING_TO_PAUSED: + GST_INFO_OBJECT (fsched, "setting scheduler state to stopped"); + GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED; + break; + case GST_STATE_PAUSED_TO_PLAYING: + GST_INFO_OBJECT (fsched, "setting scheduler state to running"); + GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_RUNNING; + break; + } + } + return GST_STATE_SUCCESS; + } + + /* FIXME: Are there eny GStreamer macros for doing this? 
*/ + old_state = transition >> 8; + new_state = transition & 0xff; + if (old_state < new_state) { + set_cothread_state (ELEM_PRIVATE (element)->elem_ct, transition & 0xff); + } + + return GST_STATE_SUCCESS; +} + + +static void +decoupled_state_transition (GstElement * element, gint old_state, + gint new_state, gpointer user_data) +{ + GstFairSchedulerCothread *ct = (GstFairSchedulerCothread *) user_data; + + /* This function is only responsible for activating the + cothread. The wrapper function itself does the deactivation. This + is necessary to avoid weird interactions between multiple + threads. */ + if (old_state < new_state) { + set_cothread_state (ct, new_state); + } +} + + +static void +gst_fair_scheduler_scheduling_change (GstScheduler * sched, + GstElement * element) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + + GST_WARNING_OBJECT (fsched, "operation not implemented"); +} + + +static gboolean +gst_fair_scheduler_yield (GstScheduler * sched, GstElement * element) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + + g_return_val_if_fail (fsched->in_element, FALSE); + + /* FIXME: What's the difference between yield and interrupt? */ + gst_fair_scheduler_cothread_yield (fsched->cothreads); + + return FALSE; +} + + +static gboolean +gst_fair_scheduler_interrupt (GstScheduler * sched, GstElement * element) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + + g_return_val_if_fail (fsched->in_element, FALSE); + + gst_fair_scheduler_cothread_yield (fsched->cothreads); + + return FALSE; +} + + +static void +gst_fair_scheduler_error (GstScheduler * sched, GstElement * element) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + + GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_STOPPED; + if (fsched->in_element) { + gst_fair_scheduler_cothread_yield (fsched->cothreads); + } +} + + +static gint +wait_entry_compare (const GstFairSchedulerWaitEntry * first, + const GstFairSchedulerWaitEntry * second) +{ + if (first->time < second->time) { + return -1; + } else if (first->time == second->time) { + return 0; + } else { + return 1; + } +} + + +static GstData * +gst_fair_scheduler_pad_select (GstScheduler * sched, + GstPad ** pulled_from, GstPad ** pads) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + + *pulled_from = gst_fair_scheduler_internal_select (fsched, pads); + g_return_val_if_fail (GST_PAD_IS_SINK (*pulled_from), NULL); + + return gst_pad_pull (*pulled_from); +} + + +static GstClockReturn +gst_fair_scheduler_clock_wait (GstScheduler * sched, GstElement * element, + GstClockID id, GstClockTimeDiff * jitter) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + GstClockEntry *clock_entry = (GstClockEntry *) id; + GstClockTime requested, now; + GstFairSchedulerWaitEntry *entry; + + g_return_val_if_fail (sched->current_clock != NULL, GST_CLOCK_ERROR); + g_return_val_if_fail (sched->current_clock == + GST_CLOCK_ENTRY_CLOCK (clock_entry), GST_CLOCK_ERROR); + + now = gst_clock_get_time (sched->current_clock); + requested = GST_CLOCK_ENTRY_TIME (clock_entry); + + if (requested <= now) { + /* It is already too late. */ + if (jitter) { + *jitter = now - requested; + } + return GST_CLOCK_EARLY; + } + + /* Insert a wait entry. */ + entry = g_malloc (sizeof (GstFairSchedulerWaitEntry)); + entry->ct = gst_fair_scheduler_cothread_current (fsched->cothreads); + entry->time = requested; + fsched->waiting = g_slist_insert_sorted (fsched->waiting, entry, + (GCompareFunc) wait_entry_compare); + + /* Go to sleep until it is time... 
*/ + gst_fair_scheduler_cothread_sleep (fsched->cothreads); + + if (jitter) { + now = gst_clock_get_time (sched->current_clock); + *jitter = now - requested; + } + + /* FIXME: Is this the right value to return? */ + return GST_CLOCK_EARLY; +} + + +static GstSchedulerState +gst_fair_scheduler_iterate (GstScheduler * sched) +{ + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + GstFairSchedulerWaitEntry *entry; + GSList *activate = NULL, *node; + GstClockTime now; + gboolean res; + + /* Count a new iteration for the stats. */ + ++fsched->iter_count; + + /* Check for waiting cothreads. */ + if (fsched->waiting != NULL && sched->current_clock != NULL) { + now = gst_clock_get_time (sched->current_clock); + + /* We need to activate all cothreads whose waiting time was + already reached by the clock. The following code makes sure + that the cothread with the earlier waiting time will be + scheduled first. */ + + /* Move all ready cothreads to the activate list. */ + while (fsched->waiting != NULL) { + entry = (GstFairSchedulerWaitEntry *) fsched->waiting->data; + + if (entry->time > now) { + break; + } + + /* Extract a node from the begining of the waiting + list. */ + node = fsched->waiting; + fsched->waiting = fsched->waiting->next; + + /* Add it to the beginning of the activate list. */ + node->next = activate; + activate = node; + } + + /* Activate the threads in the activate list. */ + while (activate != NULL) { + entry = (GstFairSchedulerWaitEntry *) activate->data; + gst_fair_scheduler_cothread_awake (entry->ct, 1); + activate = g_slist_delete_link (activate, activate); + g_free (entry); + } + } + + /* Handle control to the next cothread. */ + fsched->in_element = TRUE; + res = gst_fair_scheduler_cothread_queue_iterate (fsched->cothreads); + fsched->in_element = FALSE; + + return res ? 
GST_SCHEDULER_STATE_RUNNING : GST_SCHEDULER_STATE_STOPPED; +} + + +static void +gst_fair_scheduler_show (GstScheduler * sched) +{ +#ifndef GST_DISABLE_GST_DEBUG + GstFairScheduler *fsched = GST_FAIR_SCHEDULER (sched); + GstElement *element; + GstPad *pad; + GstFairSchedulerPrivLink *link_priv; + GstFairSchedulerWaitEntry *entry; + GList *iter1; + GSList *iter2; + GList *iterpads; + + g_print ("Fair scheduler at %p:\n", fsched); + + g_print ("\n Registered elements:\n"); + + for (iter1 = fsched->elements; iter1 != NULL; iter1 = iter1->next) { + element = GST_ELEMENT (iter1->data); + + g_print ("\n %p: %s (%s)\n", element, GST_ELEMENT_NAME (element), + g_type_name (G_OBJECT_TYPE (element))); + + if (GST_IS_BIN (element)) { + continue; + } + + for (iterpads = GST_ELEMENT_PADS (element); iterpads != NULL; + iterpads = iterpads->next) { + pad = GST_PAD (iterpads->data); + + if (GST_IS_GHOST_PAD (pad)) { + continue; + } + + if (GST_PAD_IS_SINK (pad)) { + g_print (" Sink "); + } else { + g_print (" Source "); + } + + g_print ("'%s'", GST_PAD_NAME (pad)); + + link_priv = LINK_PRIVATE (pad); + + if (link_priv == NULL) { + g_print (", unlinked"); + } else { + if (link_priv->bufpen != NULL) { + g_print (", buffer in bufpen"); + } + if (link_priv->waiting_writer != NULL) { + g_print (", waiting writer '%s'", + link_priv->waiting_writer->readable_name->str); + } + if (link_priv->waiting_reader != NULL) { + g_print (", waiting reader '%s'", + link_priv->waiting_reader->readable_name->str); + } + if (link_priv->waiting_for_queue != NULL) { + g_print (", waiting for queue '%s'", + link_priv->waiting_for_queue->readable_name->str); + } + } + + g_print ("\n"); + } + } + + gst_fair_scheduler_cothread_queue_show (fsched->cothreads); + + g_print ("\n Waiting cothreads (current time %" GST_TIME_FORMAT "):\n", + GST_TIME_ARGS (gst_clock_get_time (sched->current_clock))); + + for (iter2 = fsched->waiting; iter2 != NULL; iter2 = iter2->next) { + entry = (GstFairSchedulerWaitEntry *) iter2->data; + g_print (" %p: %s (%d), time = %" GST_TIME_FORMAT "\n", entry->ct, + entry->ct->readable_name->str, entry->ct->pid, + GST_TIME_ARGS (entry->time)); + } +#else + g_print ("Sorry, the 'show' method only works when " + "debugging is activated."); +#endif +} + + +/* + * Plugin Initialization + */ +static gboolean +plugin_init (GstPlugin * plugin) +{ + GstSchedulerFactory *factory; + + GST_DEBUG_CATEGORY_INIT (debug_fair, "fair", 0, "fair scheduler"); + GST_DEBUG_CATEGORY_INIT (debug_fair_ct, "fairct", 0, + "fair scheduler cothreads"); + GST_DEBUG_CATEGORY_INIT (debug_fair_queues, "fairqueues", 0, + "fair scheduler queue related optimizations"); + + factory = gst_scheduler_factory_new ("fair" COTHREADS_NAME, + "A fair scheduler based on " COTHREADS_NAME " cothreads", + gst_fair_scheduler_get_type ()); + + if (factory != NULL) { + gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory)); + } else { + g_warning ("could not register scheduler: fair"); + } + return TRUE; +} + + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "gstfair" COTHREADS_NAME "scheduler", + "A 'fair' type scheduler based on " COTHREADS_NAME " cothreads", + plugin_init, VERSION, GST_LICENSE, GST_PACKAGE, GST_ORIGIN); diff --git a/gst/schedulers/gthread-cothreads.h b/gst/schedulers/gthread-cothreads.h index eeff012724..8051cfa509 100644 --- a/gst/schedulers/gthread-cothreads.h +++ b/gst/schedulers/gthread-cothreads.h @@ -20,6 +20,9 @@ * Boston, MA 02111-1307, USA. 
*/ +#ifndef __GTHREAD_COTHREADS_H__ +#define __GTHREAD_COTHREADS_H__ + #include #include @@ -60,6 +63,8 @@ struct _cothread { cothread_context * context; }; +#ifndef GTHREAD_COTHREADS_NO_DEFINITIONS + /* define functions * Functions starting with "do_" are used by the scheduler. */ @@ -210,3 +215,7 @@ do_cothread_destroy (cothread *thread) #define do_cothread_get_current(context) ((context)->current) #define do_cothread_get_main(context) ((context)->main) + +#endif /* GTHREAD_COTHREADS_NO_DEFINITIONS */ + +#endif /* __GTHREAD_COTHREADS_H__ */
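Usage note (not part of the patch): below is a minimal sketch of how the cothread queue layer added by this commit could be driven on its own, using only the API declared in faircothreads.h. The cothread function my_task, the "hello" argument and the fixed iteration count are illustrative assumptions, not code from the patch.

/* Minimal usage sketch (illustrative only): drive the fair scheduler
   cothread queue directly.  my_task is a made-up cothread function. */
#include "faircothreads.h"

static void
my_task (GstFairSchedulerCothread * ct, gpointer first_arg, ...)
{
  /* first_arg is the first extra argument handed to
     gst_fair_scheduler_cothread_new. */
  g_print ("cothread %p got argument '%s'\n", ct, (gchar *) first_arg);

  /* Give control back to the main cothread; the queue schedules this
     cothread again on a later iteration. */
  gst_fair_scheduler_cothread_yield (ct->queue);
}

int
main (void)
{
  GstFairSchedulerCothreadQueue *queue;
  GstFairSchedulerCothread *ct;
  gint i;

  queue = gst_fair_scheduler_cothread_queue_new ();
  gst_fair_scheduler_cothread_queue_start (queue);

  ct = gst_fair_scheduler_cothread_new (queue, my_task, "hello", NULL);
  gst_fair_scheduler_cothread_change_state (ct,
      GST_FAIRSCHEDULER_CTSTATE_RUNNING);

  /* Each iteration hands control to the cothread at the head of the
     queue, or waits briefly when none is runnable. */
  for (i = 0; i < 4; i++) {
    gst_fair_scheduler_cothread_queue_iterate (queue);
  }

  gst_fair_scheduler_cothread_destroy (ct);
  gst_fair_scheduler_cothread_queue_stop (queue);
  gst_fair_scheduler_cothread_queue_destroy (queue);

  return 0;
}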