- Reworked the clock to prepare for async notifications

Original commit message from CVS:
- Reworked the clock to prepare for async notifications
- moved some common scheduler checking to gstbin
- added some vmethods to gstbin for future use
- more fixes to the optimal scheduler
- use new clock api in the schedulers
This commit is contained in:
Wim Taymans 2002-11-02 13:54:34 +00:00
parent 83380aa595
commit 91b824a34c
9 changed files with 671 additions and 402 deletions

View file

@ -139,9 +139,6 @@ gst_bin_init (GstBin * bin)
bin->post_iterate_func = NULL;
bin->pre_iterate_private = NULL;
bin->post_iterate_private = NULL;
bin->iterate_mutex = g_mutex_new ();
bin->iterate_cond = g_cond_new ();
}
/**
@ -218,11 +215,6 @@ gst_bin_set_element_sched (GstElement *element, GstScheduler *sched)
GList *children;
GstElement *child;
g_return_if_fail (element != NULL);
g_return_if_fail (GST_IS_ELEMENT (element));
g_return_if_fail (sched != NULL);
g_return_if_fail (GST_IS_SCHEDULER (sched));
GST_INFO (GST_CAT_SCHEDULING, "setting element \"%s\" sched to %p", GST_ELEMENT_NAME (element),
sched);
@ -249,7 +241,34 @@ gst_bin_set_element_sched (GstElement *element, GstScheduler *sched)
}
/* otherwise, if it's just a regular old element */
else {
GList *pads;
gst_scheduler_add_element (sched, element);
/* set the sched pointer in all the pads */
pads = element->pads;
while (pads) {
GstPad *pad;
pad = GST_PAD (pads->data);
pads = g_list_next (pads);
/* we only operate on real pads */
if (!GST_IS_REAL_PAD (pad))
continue;
/* if the peer element exists and is a candidate */
if (GST_PAD_PEER (pad)) {
if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, telling scheduler");
if (GST_PAD_IS_SRC (pad))
gst_scheduler_pad_connect (sched, pad, GST_PAD_PEER (pad));
else
gst_scheduler_pad_connect (sched, GST_PAD_PEER (pad), pad);
}
}
}
}
}
@ -260,9 +279,6 @@ gst_bin_unset_element_sched (GstElement *element, GstScheduler *sched)
GList *children;
GstElement *child;
g_return_if_fail (element != NULL);
g_return_if_fail (GST_IS_ELEMENT (element));
if (GST_ELEMENT_SCHED (element) == NULL) {
GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" has no scheduler",
GST_ELEMENT_NAME (element));
@ -296,6 +312,32 @@ gst_bin_unset_element_sched (GstElement *element, GstScheduler *sched)
}
/* otherwise, if it's just a regular old element */
else {
GList *pads;
/* set the sched pointer in all the pads */
pads = element->pads;
while (pads) {
GstPad *pad;
pad = GST_PAD (pads->data);
pads = g_list_next (pads);
/* we only operate on real pads */
if (!GST_IS_REAL_PAD (pad))
continue;
/* if the peer element exists and is a candidate */
if (GST_PAD_PEER (pad)) {
if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, telling scheduler");
if (GST_PAD_IS_SRC (pad))
gst_scheduler_pad_disconnect (sched, pad, GST_PAD_PEER (pad));
else
gst_scheduler_pad_disconnect (sched, GST_PAD_PEER (pad), pad);
}
}
}
gst_scheduler_remove_element (GST_ELEMENT_SCHED (element), element);
}
}
@ -489,6 +531,11 @@ gst_bin_child_state_change (GstBin *bin, GstElementState oldstate, GstElementSta
GST_STATE_PENDING (bin) = state;
GST_UNLOCK (bin);
gst_bin_change_state_norecurse (bin);
if (state != GST_STATE (bin)) {
g_warning ("%s: state change in cllback %d %d",
GST_ELEMENT_NAME (bin),
state, GST_STATE (bin));
}
return;
}
break;

View file

@ -74,9 +74,6 @@ struct _GstBin {
gint numchildren;
GList *children;
GMutex *iterate_mutex;
GCond *iterate_cond;
GstElementState child_states[GST_NUM_STATES];
gpointer sched_private;
@ -90,12 +87,16 @@ struct _GstBin {
struct _GstBinClass {
GstElementClass parent_class;
/* vtable */
void (*add_element) (GstBin *bin, GstElement);
void (*remove_element) (GstBin *bin, GstElement);
/* run a full iteration of operation */
gboolean (*iterate) (GstBin *bin);
/* signals */
void (*object_added) (GstObject *object, GstObject *child);
void (*object_removed) (GstObject *object, GstObject *child);
/* run a full iteration of operation */
gboolean (*iterate) (GstBin *bin);
};
GType gst_bin_get_type (void);

View file

@ -29,63 +29,248 @@
#include "gstlog.h"
#include "gstmemchunk.h"
enum {
ARG_0,
ARG_STATS,
};
#define CLASS(clock) GST_CLOCK_CLASS (G_OBJECT_GET_CLASS (clock))
static GstMemChunk *_gst_clock_entries_chunk;
static void gst_clock_class_init (GstClockClass *klass);
static void gst_clock_init (GstClock *clock);
static void gst_clock_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
static void gst_clock_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec * pspec);
static void gst_clock_update_stats (GstClock *clock);
static GstObjectClass *parent_class = NULL;
/* static guint gst_clock_signals[LAST_SIGNAL] = { 0 }; */
typedef struct _GstClockEntry GstClockEntry;
static GMutex *_gst_clock_mutex;
static GCond *_gst_clock_cond;
static void gst_clock_free_entry (GstClock *clock, GstClockEntry *entry);
typedef enum {
GST_ENTRY_OK,
GST_ENTRY_RESTART,
} GstEntryStatus;
struct _GstClockEntry {
GstClockTime time;
GstEntryStatus status;
GstClockCallback func;
gpointer user_data;
};
#define GST_CLOCK_ENTRY(entry) ((GstClockEntry *)(entry))
#define GST_CLOCK_ENTRY_TIME(entry) (((GstClockEntry *)(entry))->time)
static GstClockEntry*
gst_clock_entry_new (GstClockTime time,
GstClockCallback func, gpointer user_data)
static inline GstClockID
gst_clock_entry_new (GstClock *clock, GstClockTime time,
GstClockTime interval, GstClockEntryType type)
{
GstClockEntry *entry;
entry = gst_mem_chunk_alloc (_gst_clock_entries_chunk);
entry->clock = clock;
entry->time = time;
entry->func = func;
entry->user_data = user_data;
entry->interval = time;
entry->type = type;
entry->status = GST_CLOCK_ENTRY_OK;
return entry;
return (GstClockID) entry;
}
/*
static gint
clock_compare_func (gconstpointer a,
gconstpointer b)
/**
* gst_clock_new_single_shot_id
* @clock: The clockid to get a single shot notification from
* @time: the requested time
*
* Get an ID from the given clock to trigger a single shot
* notification at the requested time.
*
* Returns: An id that can be used to request the time notification.
*/
GstClockID
gst_clock_new_single_shot_id (GstClock *clock, GstClockTime time)
{
GstClockEntry *entry1 = (GstClockEntry *)a;
GstClockEntry *entry2 = (GstClockEntry *)b;
return (entry1->time - entry2->time);
return gst_clock_entry_new (clock,
time,
GST_CLOCK_TIME_NONE,
GST_CLOCK_ENTRY_SINGLE);
}
*/
/**
* gst_clock_new_periodic__id
* @clock: The clockid to get a periodic notification id from
* @start_time: the requested start time
* @interval: the requested interval
*
* Get an ID from the given clock to trigger a periodic notification.
* The periodeic notifications will be start at time start_time and
* will then be fired with the given interval.
*
* Returns: An id that can be used to request the time notification.
*/
GstClockID
gst_clock_new_periodic_id (GstClock *clock, GstClockTime start_time,
GstClockTime interval)
{
return gst_clock_entry_new (clock,
start_time,
interval,
GST_CLOCK_ENTRY_PERIODIC);
}
/**
* gst_clock_id_get_time
* @id: The clockid to query
*
* Get the time of the clock ID
*
* Returns: the time of the given clock id
*/
GstClockTime
gst_clock_id_get_time (GstClockID id)
{
g_return_val_if_fail (id != NULL, GST_CLOCK_TIME_NONE);
return GST_CLOCK_ENTRY_TIME ((GstClockEntry *)id);
}
/**
* gst_clock_id_wait
* @id: The clockid to wait on
* @jitter: A pointer that will contain the jitter
*
* Perform a blocking wait on the given ID. The jitter arg can be
* NULL
*
* Returns: the result of the blocking wait.
*/
GstClockReturn
gst_clock_id_wait (GstClockID id, GstClockTimeDiff *jitter)
{
GstClockEntry *entry;
GstClock *clock;
GstClockReturn res = GST_CLOCK_UNSUPPORTED;
GstClockTime requested;
g_return_val_if_fail (id != NULL, GST_CLOCK_ERROR);
entry = (GstClockEntry *) id;
clock = GST_CLOCK_ENTRY_CLOCK (entry);
requested = GST_CLOCK_ENTRY_TIME (entry);
if (CLASS (clock)->wait) {
GstClockTime now;
do {
res = CLASS (clock)->wait (clock, entry);
}
while (res == GST_CLOCK_ENTRY_RESTART);
if (jitter) {
now = gst_clock_get_time (clock);
*jitter = now - requested;
}
if (clock->stats) {
gst_clock_update_stats (clock);
}
}
if (entry->type == GST_CLOCK_ENTRY_SINGLE) {
gst_clock_id_free (id);
}
return res;
}
/**
* gst_clock_wait_async
* @clock: a #GstClock to wait on
* @time: The #GstClockTime to wait for
* @func: The callback function
* @user_data: User data passed in the calback
*
* Register a callback on the given clock that will be triggered
* when the clock has reached the given time. A ClockID is returned
* that can be used to cancel the request.
*
* Returns: the clock id or NULL when async notification is not supported.
*/
GstClockReturn
gst_clock_id_wait_async (GstClockID id,
GstClockCallback func, gpointer user_data)
{
GstClockEntry *entry;
GstClock *clock;
GstClockReturn res = GST_CLOCK_UNSUPPORTED;
g_return_val_if_fail (id != NULL, GST_CLOCK_ERROR);
entry = (GstClockEntry *) id;
clock = entry->clock;
if (CLASS (clock)->wait_async) {
res = CLASS (clock)->wait_async (clock, entry, func, user_data);
}
return res;
}
/**
* gst_clock_remove_id
* @clock: The clock to cancel the request on
* @id: The id to cancel
*
* Cancel an outstanding async notification request with the given ID.
* This can be an ID generated with gst_clock_wait_async() or
* gst_clock_notify_async().
*/
void
gst_clock_id_unschedule (GstClockID id)
{
GstClockEntry *entry;
GstClock *clock;
g_return_if_fail (id != NULL);
entry = (GstClockEntry *) id;
clock = entry->clock;
if (CLASS (clock)->unschedule)
CLASS (clock)->unschedule (clock, entry);
}
/**
* gst_clock_id_free
* @id: The clockid to free
*
* Free the resources held by the given id
*/
void
gst_clock_id_free (GstClockID id)
{
gst_mem_chunk_free (_gst_clock_entries_chunk, id);
}
/**
* gst_clock_unlock_id
* @id: The clockid to unlock
*
* Unlock the givan ClockID.
*/
void
gst_clock_id_unlock (GstClockID id)
{
GstClockEntry *entry;
GstClock *clock;
g_return_if_fail (id != NULL);
entry = (GstClockEntry *) id;
clock = entry->clock;
if (CLASS (clock)->unlock)
CLASS (clock)->unlock (clock, entry);
}
/**
* GstClock abstract base class implementation
*/
GType
gst_clock_get_type (void)
{
@ -127,6 +312,16 @@ gst_clock_class_init (GstClockClass *klass)
_gst_clock_entries_chunk = gst_mem_chunk_new ("GstClockEntries",
sizeof (GstClockEntry), sizeof (GstClockEntry) * 32,
G_ALLOC_AND_FREE);
_gst_clock_mutex = g_mutex_new ();
_gst_clock_cond = g_cond_new ();
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_clock_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_clock_get_property);
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_STATS,
g_param_spec_boolean ("stats", "Stats", "Enable clock stats",
FALSE, G_PARAM_READWRITE));
}
static void
@ -137,28 +332,13 @@ gst_clock_init (GstClock *clock)
clock->start_time = 0;
clock->last_time = 0;
clock->entries = NULL;
clock->async_supported = FALSE;
clock->flags = 0;
clock->stats = FALSE;
clock->active_mutex = g_mutex_new ();
clock->active_cond = g_cond_new ();
}
/**
* gst_clock_async_supported
* @clock: a #GstClock to query
*
* Checks if this clock can support asynchronous notification.
*
* Returns: TRUE if async notification is supported.
*/
gboolean
gst_clock_async_supported (GstClock *clock)
{
g_return_val_if_fail (GST_IS_CLOCK (clock), FALSE);
return clock->async_supported;
}
/**
* gst_clock_set_speed
* @clock: a #GstClock to modify
@ -166,13 +346,18 @@ gst_clock_async_supported (GstClock *clock)
*
* Sets the speed on the given clock. 1.0 is the default
* speed.
*
* Returns: the new speed of the clock.
*/
void
gdouble
gst_clock_set_speed (GstClock *clock, gdouble speed)
{
g_return_if_fail (GST_IS_CLOCK (clock));
g_return_val_if_fail (GST_IS_CLOCK (clock), 0.0);
clock->speed = speed;
if (CLASS (clock)->change_speed)
clock->speed = CLASS (clock)->change_speed (clock, clock->speed, speed);
return clock->speed;
}
/**
@ -191,29 +376,43 @@ gst_clock_get_speed (GstClock *clock)
return clock->speed;
}
/**
* gst_clock_set_resolution
* @clock: The clock set the resolution on
* @resolution: The resolution to set
*
* Set the accuracy of the clock.
*
* Returns: the new resolution of the clock.
*/
guint64
gst_clock_set_resolution (GstClock *clock, guint64 resolution)
{
g_return_val_if_fail (GST_IS_CLOCK (clock), 0LL);
if (CLASS (clock)->change_resolution)
clock->resolution = CLASS (clock)->change_resolution (clock, clock->resolution, resolution);
return clock->resolution;
}
/**
* gst_clock_reset
* @clock: a #GstClock to reset
* gst_clock_get_resolution
* @clock: The clock get the resolution of
*
* Reset the clock to time 0.
* Get the accuracy of the clock.
*
* Returns: the resolution of the clock in microseconds.
*/
void
gst_clock_reset (GstClock *clock)
guint64
gst_clock_get_resolution (GstClock *clock)
{
GstClockTime time = 0LL;
g_return_val_if_fail (GST_IS_CLOCK (clock), 0LL);
g_return_if_fail (GST_IS_CLOCK (clock));
if (CLASS (clock)->get_resolution)
return CLASS (clock)->get_resolution (clock);
if (CLASS (clock)->get_internal_time) {
time = CLASS (clock)->get_internal_time (clock);
}
GST_LOCK (clock);
clock->active = FALSE;
clock->start_time = time;
clock->last_time = 0LL;
GST_UNLOCK (clock);
return 1LL;
}
/**
@ -269,6 +468,30 @@ gst_clock_is_active (GstClock *clock)
return clock->active;
}
/**
* gst_clock_reset
* @clock: a #GstClock to reset
*
* Reset the clock to time 0.
*/
void
gst_clock_reset (GstClock *clock)
{
GstClockTime time = 0LL;
g_return_if_fail (GST_IS_CLOCK (clock));
if (CLASS (clock)->get_internal_time) {
time = CLASS (clock)->get_internal_time (clock);
}
GST_LOCK (clock);
clock->active = FALSE;
clock->start_time = time;
clock->last_time = 0LL;
GST_UNLOCK (clock);
}
/**
* gst_clock_handle_discont
* @clock: a #GstClock to notify of the discontinuity
@ -349,181 +572,6 @@ gst_clock_get_time (GstClock *clock)
return ret;
}
static GstClockID
gst_clock_wait_async_func (GstClock *clock, GstClockTime time,
GstClockCallback func, gpointer user_data)
{
GstClockEntry *entry = NULL;
g_return_val_if_fail (GST_IS_CLOCK (clock), NULL);
if (!clock->active) {
GST_DEBUG (GST_CAT_CLOCK, "blocking on clock");
g_mutex_lock (clock->active_mutex);
g_cond_wait (clock->active_cond, clock->active_mutex);
g_mutex_unlock (clock->active_mutex);
}
entry = gst_clock_entry_new (time, func, user_data);
return entry;
}
/**
* gst_clock_wait
* @clock: a #GstClock to wait on
* @time: The #GstClockTime to wait for
* @jitter: The jitter
*
* Wait and block till the clock reaches the specified time.
* The jitter value contains the difference between the requested time and
* the actual time, negative values indicate that the requested time
* was allready passed when this call was made.
*
* Returns: the #GstClockReturn result of the operation.
*/
GstClockReturn
gst_clock_wait (GstClock *clock, GstClockTime time, GstClockTimeDiff *jitter)
{
GstClockID id;
GstClockReturn res;
g_return_val_if_fail (GST_IS_CLOCK (clock), GST_CLOCK_STOPPED);
id = gst_clock_wait_async_func (clock, time, NULL, NULL);
res = gst_clock_wait_id (clock, id, jitter);
return res;
}
/**
* gst_clock_wait_async
* @clock: a #GstClock to wait on
* @time: The #GstClockTime to wait for
* @func: The callback function
* @user_data: User data passed in the calback
*
* Register a callback on the given clock that will be triggered
* when the clock has reached the given time. A ClockID is returned
* that can be used to cancel the request.
*
* Returns: the clock id or NULL when async notification is not supported.
*/
GstClockID
gst_clock_wait_async (GstClock *clock, GstClockTime time,
GstClockCallback func, gpointer user_data)
{
g_return_val_if_fail (GST_IS_CLOCK (clock), NULL);
if (clock->async_supported) {
return gst_clock_wait_async_func (clock, time, func, user_data);
}
return NULL;
}
/**
* gst_clock_cancel_wait_async
* @clock: The clock to cancel the request on
* @id: The id to cancel
*
* Cancel an outstanding async notification request with the given ID.
*/
void
gst_clock_cancel_wait_async (GstClock *clock, GstClockID id)
{
g_warning ("not supported");
}
/**
* gst_clock_notify_async
* @clock: The clock to wait on
* @interval: The interval between notifications
* @func: The callback function
* @user_data: User data passed in the calback
*
* Register a callback on the given clock that will be periodically
* triggered with the specified interval. A ClockID is returned
* that can be used to cancel the request.
*
* Returns: the clock id or NULL when async notification is not supported.
*/
GstClockID
gst_clock_notify_async (GstClock *clock, GstClockTime interval,
GstClockCallback func, gpointer user_data)
{
g_warning ("not supported");
return NULL;
}
/**
* gst_clock_remove_notify_async
* @clock: The clock to cancel the request on
* @id: The id to cancel
*
* Cancel an outstanding async notification request with the given ID.
*/
void
gst_clock_remove_notify_async (GstClock *clock, GstClockID id)
{
g_warning ("not supported");
}
static void
gst_clock_unlock_func (GstClock *clock, GstClockTime time, GstClockID id, gpointer user_data)
{
}
/**
* gst_clock_wait_id
* @clock: The clock to wait on
* @id: The clock id to wait on
* @jitter: The jitter
*
* Wait and block on the clockid obtained with gst_clock_wait_async.
* The jitter value is described in gst_clock_wait().
*
* Returns: result of the operation.
*/
GstClockReturn
gst_clock_wait_id (GstClock *clock, GstClockID id, GstClockTimeDiff *jitter)
{
GstClockReturn res = GST_CLOCK_TIMEOUT;
GstClockEntry *entry = (GstClockEntry *) id;
GstClockTime current, target;
GstClockTimeDiff this_jitter;
g_return_val_if_fail (GST_IS_CLOCK (clock), GST_CLOCK_ERROR);
g_return_val_if_fail (entry, GST_CLOCK_ERROR);
current = gst_clock_get_time (clock);
entry->func = gst_clock_unlock_func;
target = GST_CLOCK_ENTRY_TIME (entry) - current;
GST_DEBUG (GST_CAT_CLOCK, "real_target %llu, target %llu, now %llu",
target, GST_CLOCK_ENTRY_TIME (entry), current);
if (((gint64)target) > 0) {
struct timeval tv;
GST_TIME_TO_TIMEVAL (target, tv);
select (0, NULL, NULL, NULL, &tv);
current = gst_clock_get_time (clock);
this_jitter = current - GST_CLOCK_ENTRY_TIME (entry);
}
else {
res = GST_CLOCK_EARLY;
this_jitter = target;
}
if (jitter)
*jitter = this_jitter;
gst_clock_free_entry (clock, entry);
return res;
}
/**
* gst_clock_get_next_id
* @clock: The clock to query
@ -545,76 +593,43 @@ gst_clock_get_next_id (GstClock *clock)
return (GstClockID *) entry;
}
/**
* gst_clock_id_get_time
* @id: The clockid to query
*
* Get the time of the clock ID
*
* Returns: the time of the given clock id
*/
GstClockTime
gst_clock_id_get_time (GstClockID id)
static void
gst_clock_update_stats (GstClock *clock)
{
return GST_CLOCK_ENTRY_TIME (id);
}
static void
gst_clock_free_entry (GstClock *clock, GstClockEntry *entry)
gst_clock_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec)
{
gst_mem_chunk_free (_gst_clock_entries_chunk, entry);
GstClock *clock;
clock = GST_CLOCK (object);
switch (prop_id) {
case ARG_STATS:
clock->stats = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/**
* gst_clock_unlock_id
* @clock: The clock that own the id
* @id: The clockid to unlock
*
* Unlock the ClockID.
*/
void
gst_clock_unlock_id (GstClock *clock, GstClockID id)
static void
gst_clock_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec * pspec)
{
GstClockEntry *entry = (GstClockEntry *) id;
GstClock *clock;
if (entry->func)
entry->func (clock, gst_clock_get_time (clock), id, entry->user_data);
clock = GST_CLOCK (object);
gst_clock_free_entry (clock, entry);
switch (prop_id) {
case ARG_STATS:
g_value_set_boolean (value, clock->stats);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/**
* gst_clock_set_resolution
* @clock: The clock set the resolution on
* @resolution: The resolution to set
*
* Set the accuracy of the clock.
*/
void
gst_clock_set_resolution (GstClock *clock, guint64 resolution)
{
g_return_if_fail (GST_IS_CLOCK (clock));
if (CLASS (clock)->set_resolution)
CLASS (clock)->set_resolution (clock, resolution);
}
/**
* gst_clock_get_resolution
* @clock: The clock get the resolution of
*
* Get the accuracy of the clock.
*
* Returns: the resolution of the clock in microseconds.
*/
guint64
gst_clock_get_resolution (GstClock *clock)
{
g_return_val_if_fail (GST_IS_CLOCK (clock), 0LL);
if (CLASS (clock)->get_resolution)
return CLASS (clock)->get_resolution (clock);
return 1LL;
}

View file

@ -20,7 +20,6 @@
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_CLOCK_H__
#define __GST_CLOCK_H__
@ -45,88 +44,156 @@ typedef gpointer GstClockID;
#define GST_CLOCK_TIME_NONE ((guint64)-1)
#define GST_SECOND ((guint64)G_USEC_PER_SEC * 1000LL)
#define GST_MSECOND ((guint64)GST_SECOND/1000LL)
#define GST_USECOND ((guint64)GST_SECOND/1000000LL)
#define GST_NSECOND ((guint64)GST_SECOND/1000000000LL)
#define GST_SECOND ((guint64) G_USEC_PER_SEC * 1000LL)
#define GST_MSECOND ((guint64) GST_SECOND / 1000LL)
#define GST_USECOND ((guint64) GST_SECOND / 1000000LL)
#define GST_NSECOND ((guint64) GST_SECOND / 1000000000LL)
#define GST_CLOCK_DIFF(s, e) (GstClockTimeDiff)((s)-(e))
#define GST_CLOCK_DIFF(s, e) (GstClockTimeDiff)((s) - (e))
#define GST_TIMEVAL_TO_TIME(tv) ((tv).tv_sec * GST_SECOND + (tv).tv_usec * GST_USECOND)
#define GST_TIME_TO_TIMEVAL(t,tv) \
G_STMT_START { \
(tv).tv_sec = (t) / GST_SECOND; \
(tv).tv_sec = (t) / GST_SECOND; \
(tv).tv_usec = ((t) / GST_USECOND) % GST_MSECOND; \
} G_STMT_END
typedef struct _GstClockEntry GstClockEntry;
typedef struct _GstClock GstClock;
typedef struct _GstClockClass GstClockClass;
typedef void (*GstClockCallback) (GstClock *clock, GstClockTime time, GstClockID id, gpointer user_data);
typedef gboolean (*GstClockCallback) (GstClock *clock, GstClockTime time, GstClockID id, gpointer user_data);
typedef enum {
/*< protected >*/
GST_CLOCK_ENTRY_OK,
GST_CLOCK_ENTRY_EARLY,
GST_CLOCK_ENTRY_RESTART,
} GstClockEntryStatus;
typedef enum {
/*< protected >*/
GST_CLOCK_ENTRY_SINGLE,
GST_CLOCK_ENTRY_PERIODIC,
} GstClockEntryType;
#define GST_CLOCK_ENTRY(entry) ((GstClockEntry *)(entry))
#define GST_CLOCK_ENTRY_CLOCK(entry) ((entry)->clock)
#define GST_CLOCK_ENTRY_TYPE(entry) ((entry)->type)
#define GST_CLOCK_ENTRY_TIME(entry) ((entry)->time)
#define GST_CLOCK_ENTRY_INTERVAL(entry) ((entry)->interval)
#define GST_CLOCK_ENTRY_STATUS(entry) ((entry)->status)
struct _GstClockEntry {
/*< protected >*/
GstClock *clock;
GstClockEntryType type;
GstClockTime time;
GstClockTime interval;
GstClockEntryStatus status;
GstClockCallback func;
gpointer user_data;
};
typedef enum
{
GST_CLOCK_STOPPED = 0,
GST_CLOCK_TIMEOUT = 1,
GST_CLOCK_EARLY = 2,
GST_CLOCK_ERROR = 3
GST_CLOCK_ERROR = 3,
GST_CLOCK_UNSUPPORTED = 4
} GstClockReturn;
typedef enum
{
GST_CLOCK_FLAG_CAN_DO_SINGLE_SYNC = (1 << 1),
GST_CLOCK_FLAG_CAN_DO_SINGLE_ASYNC = (1 << 2),
GST_CLOCK_FLAG_CAN_DO_PERIODIC_SYNC = (1 << 3),
GST_CLOCK_FLAG_CAN_DO_PERIODIC_ASYNC = (1 << 4),
GST_CLOCK_FLAG_CAN_SET_RESOLUTION = (1 << 5),
GST_CLOCK_FLAG_CAN_SET_SPEED = (1 << 6),
} GstClockFlags;
#define GST_CLOCK_FLAGS(clock) (GST_CLOCK(clock)->flags)
struct _GstClock {
GstObject object;
GstClockFlags flags;
/*< protected >*/
GstClockTime start_time;
GstClockTime last_time;
/*< private >*/
gboolean accept_discont;
gdouble speed;
guint64 resolution;
gboolean active;
GList *entries;
gboolean async_supported;
GMutex *active_mutex;
GCond *active_cond;
gboolean stats;
};
struct _GstClockClass {
GstObjectClass parent_class;
/* vtable */
gdouble (*change_speed) (GstClock *clock,
gdouble oldspeed, gdouble newspeed);
gdouble (*get_speed) (GstClock *clock);
guint64 (*change_resolution) (GstClock *clock, guint64 old_resolution,
guint64 new_resolution);
guint64 (*get_resolution) (GstClock *clock);
GstClockTime (*get_internal_time) (GstClock *clock);
void (*set_resolution) (GstClock *clock, guint64 resolution);
guint64 (*get_resolution) (GstClock *clock);
/* waiting on an ID */
GstClockEntryStatus (*wait) (GstClock *clock, GstClockEntry *entry);
GstClockEntryStatus (*wait_async) (GstClock *clock, GstClockEntry *entry,
GstClockCallback func, gpointer user_data);
void (*unschedule) (GstClock *clock, GstClockEntry *entry);
void (*unlock) (GstClock *clock, GstClockEntry *entry);
/* signals */
void (*object_sync) (GstClock *clock, GstObject *object,
GstClockID id);
};
GType gst_clock_get_type (void);
void gst_clock_set_speed (GstClock *clock, gdouble speed);
gdouble gst_clock_set_speed (GstClock *clock, gdouble speed);
gdouble gst_clock_get_speed (GstClock *clock);
guint64 gst_clock_set_resolution (GstClock *clock, guint64 resolution);
guint64 gst_clock_get_resolution (GstClock *clock);
void gst_clock_set_active (GstClock *clock, gboolean active);
gboolean gst_clock_is_active (GstClock *clock);
void gst_clock_reset (GstClock *clock);
gboolean gst_clock_handle_discont (GstClock *clock, guint64 time);
gboolean gst_clock_async_supported (GstClock *clock);
GstClockTime gst_clock_get_time (GstClock *clock);
GstClockReturn gst_clock_wait (GstClock *clock, GstClockTime time, GstClockTimeDiff *jitter);
GstClockID gst_clock_wait_async (GstClock *clock, GstClockTime time,
GstClockCallback func, gpointer user_data);
void gst_clock_cancel_wait_async (GstClock *clock, GstClockID id);
GstClockID gst_clock_notify_async (GstClock *clock, GstClockTime interval,
GstClockCallback func, gpointer user_data);
void gst_clock_remove_notify_async (GstClock *clock, GstClockID id);
GstClockReturn gst_clock_wait_id (GstClock *clock, GstClockID id, GstClockTimeDiff *jitter);
GstClockID gst_clock_get_next_id (GstClock *clock);
void gst_clock_unlock_id (GstClock *clock, GstClockID id);
/* creating IDs that can be used to get notifications */
GstClockID gst_clock_new_single_shot_id (GstClock *clock,
GstClockTime time);
GstClockID gst_clock_new_periodic_id (GstClock *clock,
GstClockTime start_time,
GstClockTime interval);
/* operations on IDs */
GstClockTime gst_clock_id_get_time (GstClockID id);
void gst_clock_set_resolution (GstClock *clock, guint64 resolution);
guint64 gst_clock_get_resolution (GstClock *clock);
GstClockReturn gst_clock_id_wait (GstClockID id,
GstClockTimeDiff *jitter);
GstClockReturn gst_clock_id_wait_async (GstClockID id,
GstClockCallback func,
gpointer user_data);
void gst_clock_id_unschedule (GstClockID id);
void gst_clock_id_unlock (GstClockID id);
void gst_clock_id_free (GstClockID id);
G_END_DECLS

View file

@ -566,6 +566,7 @@ gst_scheduler_set_clock (GstScheduler *sched, GstClock *clock)
GST_DEBUG (GST_CAT_CLOCK, "scheduler setting clock %p (%s) on element %s", clock,
(clock ? GST_OBJECT_NAME (clock) : "nil"), GST_ELEMENT_NAME (element));
gst_element_set_clock (element, clock);
receivers = g_list_next (receivers);
}
@ -618,7 +619,11 @@ gst_scheduler_clock_wait (GstScheduler *sched, GstElement *element, GstClock *cl
if (CLASS (sched)->clock_wait)
return CLASS (sched)->clock_wait (sched, element, clock, time, jitter);
else
return gst_clock_wait (clock, time, jitter);
{
GstClockID id = gst_clock_new_single_shot_id (clock, time);
return gst_clock_id_wait (id, jitter);
}
return GST_CLOCK_TIMEOUT;
}

View file

@ -31,12 +31,15 @@
static GstClock *_the_system_clock = NULL;
static void gst_system_clock_class_init (GstSystemClockClass *klass);
static void gst_system_clock_init (GstSystemClock *clock);
static void gst_system_clock_class_init (GstSystemClockClass *klass);
static void gst_system_clock_init (GstSystemClock *clock);
static GstClockTime gst_system_clock_get_internal_time (GstClock *clock);
static guint64 gst_system_clock_get_resolution (GstClock *clock);
static GstClockTime gst_system_clock_get_internal_time (GstClock *clock);
static guint64 gst_system_clock_get_resolution (GstClock *clock);
static GstClockEntryStatus gst_system_clock_wait (GstClock *clock, GstClockEntry *entry);
static GCond *_gst_sysclock_cond = NULL;
static GMutex *_gst_sysclock_mutex = NULL;
static GstClockClass *parent_class = NULL;
/* static guint gst_system_clock_signals[LAST_SIGNAL] = { 0 }; */
@ -78,8 +81,12 @@ gst_system_clock_class_init (GstSystemClockClass *klass)
parent_class = g_type_class_ref (GST_TYPE_CLOCK);
gstclock_class->get_internal_time = gst_system_clock_get_internal_time;
gstclock_class->get_resolution = gst_system_clock_get_resolution;
gstclock_class->get_internal_time = gst_system_clock_get_internal_time;
gstclock_class->get_resolution = gst_system_clock_get_resolution;
gstclock_class->wait = gst_system_clock_wait;
_gst_sysclock_cond = g_cond_new ();
_gst_sysclock_mutex = g_mutex_new ();
}
static void
@ -99,6 +106,7 @@ gst_system_clock_obtain (void)
{
if (_the_system_clock == NULL) {
_the_system_clock = GST_CLOCK (g_object_new (GST_TYPE_SYSTEM_CLOCK, NULL));
gst_object_set_name (GST_OBJECT (_the_system_clock), "GstSystemClock");
}
return _the_system_clock;
@ -120,4 +128,30 @@ gst_system_clock_get_resolution (GstClock *clock)
return 1 * GST_USECOND;
}
static GstClockEntryStatus
gst_system_clock_wait (GstClock *clock, GstClockEntry *entry)
{
GstClockEntryStatus res = GST_CLOCK_ENTRY_OK;
GstClockTime current, target;
current = gst_clock_get_time (clock);
target = gst_system_clock_get_internal_time (clock) +
GST_CLOCK_ENTRY_TIME (entry) - current;
GST_DEBUG (GST_CAT_CLOCK, "real_target %llu, target %llu, now %llu",
target, GST_CLOCK_ENTRY_TIME (entry), current);
if (((gint64)target) > 0) {
GTimeVal tv;
GST_TIME_TO_TIMEVAL (target, tv);
g_mutex_lock (_gst_sysclock_mutex);
g_cond_timed_wait (_gst_sysclock_cond, _gst_sysclock_mutex, &tv);
g_mutex_unlock (_gst_sysclock_mutex);
}
else {
res = GST_CLOCK_ENTRY_EARLY;
}
return res;
}

View file

@ -5,7 +5,8 @@ plugin_LTLIBRARIES = \
libgstbasicwingoscheduler.la \
libgstfastomegascheduler.la \
libgstfastwingoscheduler.la \
libgstoptomegascheduler.la
libgstoptomegascheduler.la \
libgstoptwingoscheduler.la
libgstbasicomegascheduler_la_SOURCES = gstbasicscheduler.c
libgstbasicomegascheduler_la_CFLAGS = $(GST_CFLAGS) -D_COTHREADS_OMEGA
@ -40,6 +41,13 @@ libgstoptomegascheduler_la_CFLAGS = $(GST_CFLAGS) -D_COTHREADS_OMEGA
libgstoptomegascheduler_la_LIBADD = ../libcothreads.la
libgstoptomegascheduler_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgstoptwingoscheduler_la_SOURCES = gstoptimalscheduler.c
libgstoptwingoscheduler_la_CFLAGS = $(GST_CFLAGS) -D_COTHREADS_WINGO
libgstoptwingoscheduler_la_CFLAGS += -I$(top_builddir)/libs/ext/cothreads
libgstoptwingoscheduler_la_CFLAGS += -I$(top_srcdir)/libs/ext/cothreads
libgstoptwingoscheduler_la_LIBADD = $(top_builddir)/libs/ext/cothreads/cothreads/libcothreads-gthreads.la
libgstoptwingoscheduler_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
## this is a REALLY evil hack
## but we need to keep it as long as we have libs/gst and libs/ext
$(top_builddir)/libs/ext/cothreads/cothreads/libcothreads-gthreads.la:

View file

@ -1012,9 +1012,6 @@ gst_basic_scheduler_reset (GstScheduler *sched)
static void
gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
{
GList *pads;
GstPad *pad;
GstElement *peerelement;
GstSchedulerChain *chain;
GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
@ -1032,27 +1029,6 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
/* create a chain to hold it, and add */
chain = gst_basic_scheduler_chain_new (bsched);
gst_basic_scheduler_chain_add_element (chain, element);
/* set the sched pointer in all the pads */
pads = element->pads;
while (pads) {
pad = GST_PAD (pads->data);
pads = g_list_next (pads);
/* we only operate on real pads */
if (!GST_IS_REAL_PAD (pad))
continue;
/* if the peer element exists and is a candidate */
if (GST_PAD_PEER (pad)) {
peerelement = GST_PAD_PARENT (GST_PAD_PEER (pad));
if (GST_ELEMENT_SCHED (element) == GST_ELEMENT_SCHED (peerelement)) {
GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, chaining together");
/* make sure that the two elements are in the same chain */
gst_basic_scheduler_chain_elements (bsched, element, peerelement);
}
}
}
}
static void
@ -1296,7 +1272,11 @@ static GstClockReturn
gst_basic_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
GstClock *clock, GstClockTime time, GstClockTimeDiff *jitter)
{
return gst_clock_wait (clock, time, jitter);
GstClockID id;
id = gst_clock_new_single_shot_id (clock, time);
return gst_clock_id_wait (id, jitter);
}
static GstSchedulerState

View file

@ -137,8 +137,10 @@ struct _GstOptSchedulerGroup {
gint num_enabled;
GstElement *entry; /* the group's entry point */
cothread *cothread; /* the cothread of this group */
GSList *providers; /* other groups that provide data
for this group */
cothread *cothread; /* the cothread of this group */
GroupScheduleFunction schedulefunc;
int argc;
char **argv;
@ -803,11 +805,11 @@ setup_group_scheduler (GstOptScheduler *osched, GstOptSchedulerGroup *group)
if (osched->use_cothreads) {
if (!(group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)) {
do_cothread_create (group->cothread, osched->context,
wrapper, 0, (char **) group);
(cothread_func) wrapper, 0, (char **) group);
}
else {
do_cothread_setfunc (group->cothread, osched->context,
wrapper, 0, (char **) group);
(cothread_func) wrapper, 0, (char **) group);
}
}
else {
@ -879,6 +881,18 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
return res;
}
static void
get_group (GstElement *element, GstOptSchedulerGroup **group)
{
GstOptSchedulerCtx *ctx;
ctx = GST_ELEMENT_SCHED_CONTEXT (element);
if (ctx)
*group = ctx->group;
else
*group = NULL;
}
/*
* the idea is to put the two elements into the same group.
* - When no element is inside a group, we create a new group and add
@ -891,15 +905,10 @@ gst_opt_scheduler_state_transition (GstScheduler *sched, GstElement *element, gi
static GstOptSchedulerGroup*
group_elements (GstOptScheduler *osched, GstElement *element1, GstElement *element2)
{
GstOptSchedulerCtx *ctx1, *ctx2;
GstOptSchedulerGroup *group1 = NULL, *group2 = NULL, *group = NULL;
GstOptSchedulerGroup *group1, *group2, *group = NULL;
ctx1 = GST_ELEMENT_SCHED_CONTEXT (element1);
if (ctx1)
group1 = ctx1->group;
ctx2 = GST_ELEMENT_SCHED_CONTEXT (element2);
if (ctx2)
group2 = ctx2->group;
get_group (element1, &group1);
get_group (element2, &group2);
/* none of the elements is added to a group, create a new group
* and chain to add the elements to */
@ -1008,14 +1017,29 @@ gst_opt_scheduler_add_element (GstScheduler *sched, GstElement *element)
static void
gst_opt_scheduler_remove_element (GstScheduler *sched, GstElement *element)
{
//GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
GstOptSchedulerGroup *group;
GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler", GST_ELEMENT_NAME (element));
/* decoupled elements are not added to the scheduler lists and should therefor
* no be removed */
if (GST_ELEMENT_IS_DECOUPLED (element))
return;
/* the element is guaranteed to live in it's own group/chain now */
get_group (element, &group);
if (group) {
if (group->chain) {
remove_from_chain (group->chain, group);
delete_chain (osched, group->chain);
}
delete_group (group);
}
g_free (GST_ELEMENT_SCHED_CONTEXT (element));
GST_ELEMENT_SCHED_CONTEXT (element) = NULL;
g_warning ("remove implement me");
}
static void
@ -1080,14 +1104,34 @@ gst_opt_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sink
type = GST_OPT_LOOP_TO_CHAIN;
}
else if (element2->loopfunc) {
if (GST_RPAD_GETFUNC (srcpad))
if (GST_RPAD_GETFUNC (srcpad)) {
type = GST_OPT_GET_TO_LOOP;
/* this could be tricky, the get based source could
* already be part of a loop based group in another pad,
* we assert on that for now */
if (GST_ELEMENT_SCHED_CONTEXT (element1) &&
GST_ELEMENT_SCHED_GROUP (element1) != NULL)
{
g_warning ("internal error: cannot schedule get to loop with get in group");
return;
}
}
else
type = GST_OPT_CHAIN_TO_LOOP;
}
else {
if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad))
if (GST_RPAD_GETFUNC (srcpad) && GST_RPAD_CHAINFUNC (sinkpad)) {
type = GST_OPT_GET_TO_CHAIN;
/* the get based source could already be part of a loop
* based group in another pad,
* we assert on that for now */
if (GST_ELEMENT_SCHED_CONTEXT (element1) &&
GST_ELEMENT_SCHED_GROUP (element1) != NULL)
{
g_warning ("internal error: cannot schedule get to loop with get in group");
return;
}
}
else
type = GST_OPT_CHAIN_TO_CHAIN;
}
@ -1182,14 +1226,78 @@ gst_opt_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sink
}
}
static gboolean
element_has_connection_with_group (GstElement *element, GstOptSchedulerGroup *group)
{
gboolean connected = FALSE;
const GList *pads;
/* see if the element has no more connections to the peer group */
pads = gst_element_get_pad_list (element);
while (pads && !connected) {
GstPad *pad = GST_PAD_CAST (pads->data);
pads = g_list_next (pads);
/* we only operate on real pads */
if (!GST_IS_REAL_PAD (pad))
continue;
if (GST_PAD_PEER (pad)) {
}
}
return connected;
}
static void
gst_opt_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
{
//GstOptScheduler *osched = GST_OPT_SCHEDULER_CAST (sched);
GstElement *element1, *element2;
GstOptSchedulerGroup *group1, *group2;
gboolean still_connect;
GST_INFO (GST_CAT_SCHEDULING, "pad disconnect between \"%s:%s\" and \"%s:%s\"",
GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
g_warning ("pad disconnect, implement me");
element1 = GST_PAD_PARENT (srcpad);
element2 = GST_PAD_PARENT (sinkpad);
get_group (element1, &group1);
get_group (element2, &group2);
/* having no groups is pretty bad, this means that two decoupled
* elements were connected or something */
if (!group1 && !group2) {
g_warning ("internal error: cannot disconnect pads");
return;
}
/* see if the group has to be broken up */
if (group1)
still_connect = element_has_connection_with_group (element2, group1);
else
still_connect = element_has_connection_with_group (element1, group2);
/* if there is still a connection, we don't need to break this group */
if (still_connect)
return;
/* if they are equal, they both are non zero */
if (group1 == group2) {
g_warning ("pad disconnect: implement me");
}
else if (group1) {
g_warning ("pad disconnect: implement me");
}
else {
/* there was no group for element1, see if the element
* was an entry point for group2 */
if (group2) {
if (group2->entry == element1) {
group2->entry = NULL;
}
}
}
}
static GstPad*
@ -1206,7 +1314,11 @@ static GstClockReturn
gst_opt_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
GstClock *clock, GstClockTime time, GstClockTimeDiff *jitter)
{
return gst_clock_wait (clock, time, jitter);
GstClockID id;
id = gst_clock_new_single_shot_id (clock, time);
return gst_clock_id_wait (id, jitter);
}
/* a scheduler iteration is done by looping and scheduling the active chains */