reworked internal parameters a bit, added leaky-queue semantics

Original commit message from CVS:
reworked internal parameters a bit, added leaky-queue semantics
This commit is contained in:
Erik Walthinsen 2001-05-23 19:09:28 +00:00
parent fe0a39cf60
commit d7e533b5ea
4 changed files with 208 additions and 102 deletions

View file

@ -48,15 +48,22 @@ GstElementDetails gst_queue_details = {
/* Queue signals and args */
enum {
/* FILL ME */
LOW_WATERMARK,
HIGH_WATERMARK,
LAST_SIGNAL
};
enum {
ARG_0,
ARG_LEVEL_BUFFERS,
ARG_LEVEL_BYTES,
ARG_LEVEL_TIME,
ARG_SIZE_BUFFERS,
ARG_SIZE_BYTES,
ARG_SIZE_TIME,
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
// ARG_BLOCK,
};
@ -78,6 +85,23 @@ static void gst_queue_flush (GstQueue *queue);
static GstElementStateReturn gst_queue_change_state (GstElement *element);
static GtkType
queue_leaky_get_type(void) {
static GtkType queue_leaky_type = 0;
static GtkEnumValue queue_leaky[] = {
{ GST_QUEUE_NO_LEAK, "0", "Not Leaky" },
{ GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream" },
{ GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream" },
{ 0, NULL, NULL },
};
if (!queue_leaky_type) {
queue_leaky_type = gtk_type_register_enum("GstQueueLeaky", queue_leaky);
}
return queue_leaky_type;
}
#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
static GstElementClass *parent_class = NULL;
//static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
@ -112,12 +136,12 @@ gst_queue_class_init (GstQueueClass *klass)
parent_class = gtk_type_class (GST_TYPE_ELEMENT);
gtk_object_add_arg_type ("GstQueue::leaky", GST_TYPE_QUEUE_LEAKY,
GTK_ARG_READWRITE, ARG_LEAKY);
gtk_object_add_arg_type ("GstQueue::level", GTK_TYPE_INT,
GTK_ARG_READABLE, ARG_LEVEL);
gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
GTK_ARG_READWRITE, ARG_MAX_LEVEL);
// gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL,
// GTK_ARG_READWRITE, ARG_BLOCK);
gtkobject_class->set_arg = gst_queue_set_arg;
gtkobject_class->get_arg = gst_queue_get_arg;
@ -145,11 +169,11 @@ gst_queue_init (GstQueue *queue)
queue->queue = NULL;
queue->level_buffers = 0;
queue->max_buffers = 100;
// queue->block = TRUE;
queue->level_bytes = 0;
queue->size_buffers = 0;
queue->size_bytes = 0;
queue->level_time = 0LL;
queue->size_buffers = 100; // 100 buffers
queue->size_bytes = 100 * 1024; // 100KB
queue->size_time = 1000000000LL; // 1sec
queue->emptycond = g_cond_new ();
queue->fullcond = g_cond_new ();
@ -259,18 +283,43 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
/* we have to lock the queue since we span threads */
GST_DEBUG (GST_CAT_DATAFLOW,"try have queue lock\n");
// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
GST_LOCK (queue);
GST_DEBUG (GST_CAT_DATAFLOW,"%s adding buffer %p %ld\n", name, buf, pthread_self ());
GST_DEBUG (GST_CAT_DATAFLOW,"have queue lock\n");
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
gst_queue_flush (queue);
}
GST_DEBUG (GST_CAT_DATAFLOW,"%s: chain %d %p\n", name, queue->level_buffers, buf);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
while (queue->level_buffers >= queue->max_buffers) {
if (queue->level_buffers >= queue->size_buffers) {
// if this is a leaky queue...
if (queue->leaky) {
// if we leak on the upstream side, drop the current buffer
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
gst_buffer_unref(buf);
// now we have to clean up and exit right away
GST_UNLOCK (queue);
return;
}
// otherwise we have to push a buffer off the other end
else {
GSList *front;
GstBuffer *leakbuf;
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
}
}
while (queue->level_buffers >= queue->size_buffers) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
@ -280,29 +329,26 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
cothread_switch(cothread_current_main());
}
GST_DEBUG (GST_CAT_DATAFLOW, "%s waiting %d\n", name, queue->level_buffers);
GST_DEBUG (GST_CAT_DATAFLOW, "%s: O\n", name);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
g_cond_signal (queue->emptycond);
g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
STATUS("%s: O+\n");
GST_DEBUG (GST_CAT_DATAFLOW, "%s waiting done %d\n", name, queue->level_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
}
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
/* if we were empty, but aren't any more, signal a condition */
queue->level_buffers++;
// if (queue->level_buffers >= 0)
if (queue->level_buffers == 1)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
g_cond_signal (queue->emptycond);
}
GST_DEBUG (GST_CAT_DATAFLOW,"%s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
GST_UNLOCK (queue);
}
@ -328,13 +374,6 @@ gst_queue_get (GstPad *pad)
GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
// we bail if there's nothing there
// g_assert(queue->block);
// if (!queue->level_buffers && !queue->block) {
// GST_UNLOCK(queue);
// return NULL;
// }
while (!queue->level_buffers) {
if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
@ -363,14 +402,15 @@ gst_queue_get (GstPad *pad)
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
// if (queue->level_buffers < queue->max_buffers)
if (queue->level_buffers == queue->max_buffers)
// if (queue->level_buffers < queue->size_buffers)
if (queue->level_buffers == queue->size_buffers)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
g_cond_signal (queue->fullcond);
}
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
GST_UNLOCK(queue);
@ -450,12 +490,12 @@ gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
queue = GST_QUEUE (object);
switch(id) {
case ARG_MAX_LEVEL:
queue->max_buffers = GTK_VALUE_INT (*arg);
case ARG_LEAKY:
queue->leaky = GTK_VALUE_INT (*arg);
break;
case ARG_MAX_LEVEL:
queue->size_buffers = GTK_VALUE_INT (*arg);
break;
// case ARG_BLOCK:
// queue->block = GTK_VALUE_BOOL (*arg);
// break;
default:
break;
}
@ -472,15 +512,15 @@ gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
queue = GST_QUEUE (object);
switch (id) {
case ARG_LEAKY:
GTK_VALUE_INT (*arg) = queue->leaky;
break;
case ARG_LEVEL:
GTK_VALUE_INT (*arg) = queue->level_buffers;
break;
case ARG_MAX_LEVEL:
GTK_VALUE_INT (*arg) = queue->max_buffers;
GTK_VALUE_INT (*arg) = queue->size_buffers;
break;
// case ARG_BLOCK:
// GTK_VALUE_BOOL (*arg) = queue->block;
// break;
default:
arg->type = GTK_TYPE_INVALID;
break;

View file

@ -47,6 +47,12 @@ GstElementDetails gst_queue_details;
#define GST_IS_QUEUE_CLASS(obj) \
(GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE))
enum {
GST_QUEUE_NO_LEAK = 0,
GST_QUEUE_LEAK_UPSTREAM = 1,
GST_QUEUE_LEAK_DOWNSTREAM = 2
};
typedef struct _GstQueue GstQueue;
typedef struct _GstQueueClass GstQueueClass;
@ -60,11 +66,14 @@ struct _GstQueue {
GSList *queue;
gint level_buffers; /* number of buffers queued here */
gint max_buffers; /* maximum number of buffers queued here */
// gboolean block; /* if set to FALSE, _get returns NULL if queue empty */
gint level_bytes; /* number of bytes queued here */
guint64 level_time; /* amount of time queued here */
gint size_buffers; /* size of queue in buffers */
gint size_bytes; /* size of queue in bytes */
guint64 size_time; /* size of queue in time */
gint leaky; /* whether the queue is leaky, and if so at which end */
GCond *emptycond;
GCond *fullcond;
@ -74,6 +83,10 @@ struct _GstQueue {
struct _GstQueueClass {
GstElementClass parent_class;
/* signal callbacks */
void (*low_watermark) (GstQueue *queue, gint level);
void (*high_watermark) (GstQueue *queue, gint level);
};
GtkType gst_queue_get_type (void);

View file

@ -48,15 +48,22 @@ GstElementDetails gst_queue_details = {
/* Queue signals and args */
enum {
/* FILL ME */
LOW_WATERMARK,
HIGH_WATERMARK,
LAST_SIGNAL
};
enum {
ARG_0,
ARG_LEVEL_BUFFERS,
ARG_LEVEL_BYTES,
ARG_LEVEL_TIME,
ARG_SIZE_BUFFERS,
ARG_SIZE_BYTES,
ARG_SIZE_TIME,
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
// ARG_BLOCK,
};
@ -78,6 +85,23 @@ static void gst_queue_flush (GstQueue *queue);
static GstElementStateReturn gst_queue_change_state (GstElement *element);
static GtkType
queue_leaky_get_type(void) {
static GtkType queue_leaky_type = 0;
static GtkEnumValue queue_leaky[] = {
{ GST_QUEUE_NO_LEAK, "0", "Not Leaky" },
{ GST_QUEUE_LEAK_UPSTREAM, "1", "Leaky on Upstream" },
{ GST_QUEUE_LEAK_DOWNSTREAM, "2", "Leaky on Downstream" },
{ 0, NULL, NULL },
};
if (!queue_leaky_type) {
queue_leaky_type = gtk_type_register_enum("GstQueueLeaky", queue_leaky);
}
return queue_leaky_type;
}
#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
static GstElementClass *parent_class = NULL;
//static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
@ -112,12 +136,12 @@ gst_queue_class_init (GstQueueClass *klass)
parent_class = gtk_type_class (GST_TYPE_ELEMENT);
gtk_object_add_arg_type ("GstQueue::leaky", GST_TYPE_QUEUE_LEAKY,
GTK_ARG_READWRITE, ARG_LEAKY);
gtk_object_add_arg_type ("GstQueue::level", GTK_TYPE_INT,
GTK_ARG_READABLE, ARG_LEVEL);
gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
GTK_ARG_READWRITE, ARG_MAX_LEVEL);
// gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL,
// GTK_ARG_READWRITE, ARG_BLOCK);
gtkobject_class->set_arg = gst_queue_set_arg;
gtkobject_class->get_arg = gst_queue_get_arg;
@ -145,11 +169,11 @@ gst_queue_init (GstQueue *queue)
queue->queue = NULL;
queue->level_buffers = 0;
queue->max_buffers = 100;
// queue->block = TRUE;
queue->level_bytes = 0;
queue->size_buffers = 0;
queue->size_bytes = 0;
queue->level_time = 0LL;
queue->size_buffers = 100; // 100 buffers
queue->size_bytes = 100 * 1024; // 100KB
queue->size_time = 1000000000LL; // 1sec
queue->emptycond = g_cond_new ();
queue->fullcond = g_cond_new ();
@ -259,18 +283,43 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
/* we have to lock the queue since we span threads */
GST_DEBUG (GST_CAT_DATAFLOW,"try have queue lock\n");
// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
GST_LOCK (queue);
GST_DEBUG (GST_CAT_DATAFLOW,"%s adding buffer %p %ld\n", name, buf, pthread_self ());
GST_DEBUG (GST_CAT_DATAFLOW,"have queue lock\n");
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
gst_queue_flush (queue);
}
GST_DEBUG (GST_CAT_DATAFLOW,"%s: chain %d %p\n", name, queue->level_buffers, buf);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
while (queue->level_buffers >= queue->max_buffers) {
if (queue->level_buffers >= queue->size_buffers) {
// if this is a leaky queue...
if (queue->leaky) {
// if we leak on the upstream side, drop the current buffer
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
gst_buffer_unref(buf);
// now we have to clean up and exit right away
GST_UNLOCK (queue);
return;
}
// otherwise we have to push a buffer off the other end
else {
GSList *front;
GstBuffer *leakbuf;
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
}
}
while (queue->level_buffers >= queue->size_buffers) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
@ -280,29 +329,26 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
cothread_switch(cothread_current_main());
}
GST_DEBUG (GST_CAT_DATAFLOW, "%s waiting %d\n", name, queue->level_buffers);
GST_DEBUG (GST_CAT_DATAFLOW, "%s: O\n", name);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
g_cond_signal (queue->emptycond);
g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
STATUS("%s: O+\n");
GST_DEBUG (GST_CAT_DATAFLOW, "%s waiting done %d\n", name, queue->level_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
}
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
/* if we were empty, but aren't any more, signal a condition */
queue->level_buffers++;
// if (queue->level_buffers >= 0)
if (queue->level_buffers == 1)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
g_cond_signal (queue->emptycond);
}
GST_DEBUG (GST_CAT_DATAFLOW,"%s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond);
GST_UNLOCK (queue);
}
@ -328,13 +374,6 @@ gst_queue_get (GstPad *pad)
GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
// we bail if there's nothing there
// g_assert(queue->block);
// if (!queue->level_buffers && !queue->block) {
// GST_UNLOCK(queue);
// return NULL;
// }
while (!queue->level_buffers) {
if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
@ -363,14 +402,15 @@ gst_queue_get (GstPad *pad)
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
// if (queue->level_buffers < queue->max_buffers)
if (queue->level_buffers == queue->max_buffers)
// if (queue->level_buffers < queue->size_buffers)
if (queue->level_buffers == queue->size_buffers)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
g_cond_signal (queue->fullcond);
}
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
GST_UNLOCK(queue);
@ -450,12 +490,12 @@ gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
queue = GST_QUEUE (object);
switch(id) {
case ARG_MAX_LEVEL:
queue->max_buffers = GTK_VALUE_INT (*arg);
case ARG_LEAKY:
queue->leaky = GTK_VALUE_INT (*arg);
break;
case ARG_MAX_LEVEL:
queue->size_buffers = GTK_VALUE_INT (*arg);
break;
// case ARG_BLOCK:
// queue->block = GTK_VALUE_BOOL (*arg);
// break;
default:
break;
}
@ -472,15 +512,15 @@ gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
queue = GST_QUEUE (object);
switch (id) {
case ARG_LEAKY:
GTK_VALUE_INT (*arg) = queue->leaky;
break;
case ARG_LEVEL:
GTK_VALUE_INT (*arg) = queue->level_buffers;
break;
case ARG_MAX_LEVEL:
GTK_VALUE_INT (*arg) = queue->max_buffers;
GTK_VALUE_INT (*arg) = queue->size_buffers;
break;
// case ARG_BLOCK:
// GTK_VALUE_BOOL (*arg) = queue->block;
// break;
default:
arg->type = GTK_TYPE_INVALID;
break;

View file

@ -47,6 +47,12 @@ GstElementDetails gst_queue_details;
#define GST_IS_QUEUE_CLASS(obj) \
(GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE))
enum {
GST_QUEUE_NO_LEAK = 0,
GST_QUEUE_LEAK_UPSTREAM = 1,
GST_QUEUE_LEAK_DOWNSTREAM = 2
};
typedef struct _GstQueue GstQueue;
typedef struct _GstQueueClass GstQueueClass;
@ -60,11 +66,14 @@ struct _GstQueue {
GSList *queue;
gint level_buffers; /* number of buffers queued here */
gint max_buffers; /* maximum number of buffers queued here */
// gboolean block; /* if set to FALSE, _get returns NULL if queue empty */
gint level_bytes; /* number of bytes queued here */
guint64 level_time; /* amount of time queued here */
gint size_buffers; /* size of queue in buffers */
gint size_bytes; /* size of queue in bytes */
guint64 size_time; /* size of queue in time */
gint leaky; /* whether the queue is leaky, and if so at which end */
GCond *emptycond;
GCond *fullcond;
@ -74,6 +83,10 @@ struct _GstQueue {
struct _GstQueueClass {
GstElementClass parent_class;
/* signal callbacks */
void (*low_watermark) (GstQueue *queue, gint level);
void (*high_watermark) (GstQueue *queue, gint level);
};
GtkType gst_queue_get_type (void);