added taaz's threading patch, including queue events

Original commit message from CVS:
added taaz's threading patch, including queue events
Erik Walthinsen 2001-10-27 20:28:31 +00:00
parent c5f4648dd9
commit 42ec9085a2
11 changed files with 284 additions and 195 deletions
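
The core of the threading patch is in gstqueue.c/gstqueue.h: instead of waiting on the object lock with emptycond/fullcond, the queue gets its own qlock mutex plus not_empty/not_full conditions, remembers whether its single reader or writer is currently blocked, and signals the condition only after the mutex has been released. Events (FLUSH, EOS) now travel through the queue as buffers and are recognized in chain/get. A minimal, standalone sketch of that single-reader/single-writer pattern using the same GLib primitives (hypothetical ToyQueue type, not the real GstQueue code):

#include <glib.h>

/* Sketch only: bounded single-reader/single-writer queue guarded by its
 * own mutex, with the condition signalled after the lock is dropped,
 * mirroring the pattern introduced in gstqueue.c. */
typedef struct {
  GMutex   *qlock;       /* protects all fields below              */
  GSList   *items;       /* queued buffers/events                  */
  gint      level;       /* current number of items                */
  gint      size;        /* capacity                               */
  gboolean  reader;      /* TRUE while a reader waits on not_empty */
  gboolean  writer;      /* TRUE while a writer waits on not_full  */
  GCond    *not_empty;
  GCond    *not_full;
} ToyQueue;

static void
toy_queue_push (ToyQueue *q, gpointer item)
{
  gboolean reader;

  g_mutex_lock (q->qlock);
  while (q->level == q->size) {        /* full: wait for the reader */
    q->writer = TRUE;
    g_cond_wait (q->not_full, q->qlock);
    q->writer = FALSE;
  }
  q->items = g_slist_append (q->items, item);
  q->level++;
  reader = q->reader;                  /* note a waiting reader under the lock */
  g_mutex_unlock (q->qlock);

  if (reader)                          /* wake it up outside the lock */
    g_cond_signal (q->not_empty);
}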

View file

@@ -679,8 +679,10 @@ gst_fakesrc_get(GstPad *pad)
g_print("fakesrc: get ******* (%s:%s)> (%d bytes, %llu) \n",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "pre handoff emit\n");
g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "post handoff emit\n");
return buf;
}

View file

@@ -37,7 +37,6 @@ GstElementDetails gst_statistics_details = {
/* Statistics signals and args */
enum {
SIGNAL_UPDATE,
/* FILL ME */
LAST_SIGNAL
};
@@ -66,7 +65,7 @@ static void gst_statistics_reset (GstStatistics *statistics);
static void gst_statistics_print (GstStatistics *statistics);
static GstElementClass *parent_class = NULL;
static guint gst_statistics_signals[LAST_SIGNAL] = { 0 };
static guint gst_statistics_signals[LAST_SIGNAL] = { 0, };
static stats zero_stats = { 0, };
@@ -131,7 +130,7 @@ gst_statistics_class_init (GstStatisticsClass *klass)
gst_statistics_signals[SIGNAL_UPDATE] =
g_signal_new ("update", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstStatisticsClass, update), NULL, NULL,
g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 0);
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_statistics_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_statistics_get_property);
@@ -283,10 +282,12 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
if (GST_IS_EVENT(buf)) {
GstEvent *event = GST_EVENT (buf);
gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
statistics->stats.events += 1;
if (statistics->update_on_eos && (GST_EVENT_TYPE(event) == GST_EVENT_EOS)) {
update = TRUE;
if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
if (statistics->update_on_eos) {
update = TRUE;
}
}
if (statistics->update_freq.events) {
statistics->update_count.events += 1;
@@ -317,7 +318,9 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
if (update) {
if (statistics->update) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "pre update emit\n");
g_signal_emit (G_OBJECT (statistics), gst_statistics_signals[SIGNAL_UPDATE], 0);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "post update emit\n");
}
if (!statistics->silent) {
gst_statistics_print(statistics);
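
Because the "update" signal is now created with g_cclosure_marshal_VOID__VOID and zero parameters (it was VOID__POINTER before), a handler receives only the emitting element and the user-data pointer. A hedged sketch of hooking it up (the callback and helper names below are made up for illustration):

/* Illustrative only: connect to the now parameterless "update" signal. */
static void
on_stats_update (GstStatistics *stats, gpointer user_data)
{
  /* no extra arguments arrive here any more */
  g_print ("statistics element emitted update\n");
}

static void
hook_up_update_signal (GstStatistics *stats)
{
  g_signal_connect (G_OBJECT (stats), "update",
                    G_CALLBACK (on_stats_update), NULL);
}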

View file

@@ -257,7 +257,7 @@ struct _GstGhostPadClass {
#define GST_GPAD_REALPAD(pad) (((GstGhostPad *)(pad))->realpad)
/* Generic */
#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
#define GST_PAD_DIRECTION(pad) GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad))
#define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE(pad))
#define GST_PAD_PEER(pad) GST_RPAD_PEER(GST_PAD_REALIZE(pad))

View file

@@ -81,6 +81,7 @@ static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
static GstBuffer * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
static void gst_queue_locked_flush (GstQueue *queue);
static void gst_queue_flush (GstQueue *queue);
static GstElementStateReturn gst_queue_change_state (GstElement *element);
@@ -180,9 +181,12 @@ gst_queue_init (GstQueue *queue)
queue->size_bytes = 100 * 1024; // 100KB
queue->size_time = 1000000000LL; // 1sec
queue->emptycond = g_cond_new ();
queue->fullcond = g_cond_new ();
GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
queue->qlock = g_mutex_new ();
queue->reader = FALSE;
queue->writer = FALSE;
queue->not_empty = g_cond_new ();
queue->not_full = g_cond_new ();
GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
static GstBufferPool*
@@ -215,41 +219,18 @@ gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data)
return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
}
static gboolean
gst_queue_handle_event (GstPad *pad)
{
GstQueue *queue;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue));
GST_LOCK (queue);
GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
queue->level_buffers);
GST_FLAG_SET (pad, GST_PAD_EOS);
g_cond_signal (queue->emptycond);
GST_UNLOCK (queue);
return TRUE;
}
static void
gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
gst_buffer_unref (GST_BUFFER (data));
}
static void
gst_queue_flush (GstQueue *queue)
gst_queue_locked_flush (GstQueue *queue)
{
g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
(char *) GST_ELEMENT_NAME (queue));
(gpointer) queue);
g_slist_free (queue->queue);
queue->queue = NULL;
@@ -257,40 +238,63 @@ gst_queue_flush (GstQueue *queue)
queue->timeval = NULL;
}
static void
gst_queue_flush (GstQueue *queue)
{
g_mutex_lock (queue->qlock);
gst_queue_locked_flush (queue);
g_mutex_unlock (queue->qlock);
}
static void
gst_queue_chain (GstPad *pad, GstBuffer *buf)
{
GstQueue *queue;
const guchar *name;
gboolean reader;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
name = GST_ELEMENT_NAME (queue);
reader = FALSE;
/* we have to lock the queue since we span threads */
// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
GST_LOCK (queue);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
gst_queue_flush (queue);
if (GST_IS_EVENT(buf)) {
GstEvent *event = GST_EVENT(buf);
switch (GST_EVENT_TYPE(event)) {
case GST_EVENT_FLUSH:
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n");
gst_queue_locked_flush (queue);
break;
default:
break;
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
if (queue->level_buffers >= queue->size_buffers) {
if (queue->level_buffers == queue->size_buffers) {
// if this is a leaky queue...
if (queue->leaky) {
// FIXME don't want to leak events!
// if we leak on the upstream side, drop the current buffer
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
if (GST_IS_EVENT (buf))
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(buf)));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
gst_buffer_unref(buf);
// now we have to clean up and exit right away
GST_UNLOCK (queue);
g_mutex_unlock (queue->qlock);
return;
}
// otherwise we have to push a buffer off the other end
@@ -300,6 +304,10 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
if (GST_IS_EVENT (leakbuf))
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(leakbuf)));
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
@@ -308,7 +316,9 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
}
}
while (queue->level_buffers >= queue->size_buffers) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
queue->level_buffers, queue->size_buffers);
while (queue->level_buffers == queue->size_buffers) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -316,36 +326,45 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
GST_STATE_VOID_PENDING)
{
GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
GST_UNLOCK(queue);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
g_cond_signal (queue->emptycond);
g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->writer)
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
queue->writer = TRUE;
g_cond_wait (queue->not_full, queue->qlock);
queue->writer = FALSE;
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
queue->level_buffers, queue->size_buffers);
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
/* if we were empty, but aren't any more, signal a condition */
if (queue->level_buffers == 1)
/* reader waiting on an empty queue */
reader = queue->reader;
g_mutex_unlock (queue->qlock);
if (reader)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
g_cond_signal (queue->emptycond);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
g_cond_signal (queue->not_empty);
}
GST_UNLOCK (queue);
}
static GstBuffer *
@@ -354,7 +373,7 @@ gst_queue_get (GstPad *pad)
GstQueue *queue;
GstBuffer *buf = NULL;
GSList *front;
const guchar *name;
gboolean writer;
g_assert(pad != NULL);
g_assert(GST_IS_PAD(pad));
@@ -362,22 +381,16 @@ gst_queue_get (GstPad *pad)
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
name = GST_ELEMENT_NAME (queue);
writer = FALSE;
/* have to lock for thread-safety */
GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
GST_LOCK (queue);
GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
while (!queue->level_buffers) {
if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
GST_UNLOCK(queue);
// this return NULL shouldn't hurt anything...
return NULL;
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
while (queue->level_buffers == 0) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -385,37 +398,60 @@ gst_queue_get (GstPad *pad)
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
GST_STATE_VOID_PENDING)
{
GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
GST_UNLOCK(queue);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
g_cond_signal (queue->fullcond);
g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->reader)
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
queue->reader = TRUE;
g_cond_wait (queue->not_empty, queue->qlock);
queue->reader = FALSE;
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
front = queue->queue;
buf = (GstBuffer *)(front->data);
GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
// if (queue->level_buffers < queue->size_buffers)
if (queue->level_buffers == queue->size_buffers)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
g_cond_signal (queue->fullcond);
}
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
GST_UNLOCK(queue);
/* writer waiting on a full queue */
writer = queue->writer;
g_mutex_unlock (queue->qlock);
if (writer)
{
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
g_cond_signal (queue->not_full);
}
// FIXME where should this be? locked?
if (GST_IS_EVENT(buf)) {
GstEvent *event = GST_EVENT(buf);
switch (GST_EVENT_TYPE(event)) {
case GST_EVENT_EOS:
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n");
gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
break;
default:
break;
}
}
return buf;
}

View file

@@ -75,9 +75,12 @@ struct _GstQueue {
gint leaky; /* whether the queue is leaky, and if so at which end */
// GMutex *lock; (optimization?)
GCond *emptycond;
GCond *fullcond;
GMutex *qlock; /* lock for queue (vs object lock) */
/* we are single reader and single writer queue */
gboolean reader; /* reader waiting on empty queue */
gboolean writer; /* writer waiting on full queue */
GCond *not_empty; /* signals buffers now available for reading */
GCond *not_full; /* signals space now available for writing */
GTimeVal *timeval; /* the timeout for the queue locking */
};

View file

@@ -1490,6 +1490,7 @@ GST_DEBUG(GST_CAT_SCHEDULING,"there are %d elements in this chain\n",chain->num_
} else {
GST_INFO (GST_CAT_DATAFLOW,"NO ENTRY INTO CHAIN!");
gst_schedule_show(sched);
//eos = TRUE;
}
} else {

View file

@@ -382,10 +382,10 @@ gst_thread_change_state (GstElement *element)
//
//FIXME also make this more efficient by keeping list of managed queues
THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e));
GST_LOCK(e);
g_cond_signal((GST_QUEUE(e)->emptycond));
g_cond_signal((GST_QUEUE(e)->fullcond));
GST_UNLOCK(e);
//GST_LOCK(e);
g_cond_signal((GST_QUEUE(e)->not_empty));
g_cond_signal((GST_QUEUE(e)->not_full));
//GST_UNLOCK(e);
}
else
{
@@ -417,10 +417,10 @@ gst_thread_change_state (GstElement *element)
if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread))
{
THR_DEBUG(" element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e));
GST_LOCK(peerelement);
g_cond_signal(GST_QUEUE(peerelement)->emptycond);
g_cond_signal(GST_QUEUE(peerelement)->fullcond);
GST_UNLOCK(peerelement);
//GST_LOCK(peerelement);
g_cond_signal(GST_QUEUE(peerelement)->not_empty);
g_cond_signal(GST_QUEUE(peerelement)->not_full);
//GST_UNLOCK(peerelement);
}
}
}
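
Note that this hunk signals the queues' not_empty/not_full conditions with the surrounding GST_LOCK/GST_UNLOCK calls commented out, i.e. without holding any lock. A lock-protected wake-up, using the qlock introduced in gstqueue.h, might look like the sketch below; this is an assumption about one possible safer variant, not code from the patch, and the helper name is invented:

/* Hypothetical helper (not in the patch): wake a blocked reader or
 * writer while holding the queue's own mutex, so the signal cannot
 * race with a thread that is just about to call g_cond_wait(). */
static void
wake_queue_waiters (GstQueue *queue)
{
  g_mutex_lock (queue->qlock);
  g_cond_signal (queue->not_empty);
  g_cond_signal (queue->not_full);
  g_mutex_unlock (queue->qlock);
}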

View file

@@ -679,8 +679,10 @@ gst_fakesrc_get(GstPad *pad)
g_print("fakesrc: get ******* (%s:%s)> (%d bytes, %llu) \n",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "pre handoff emit\n");
g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, src, "post handoff emit\n");
return buf;
}

View file

@@ -81,6 +81,7 @@ static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
static GstBuffer * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
static void gst_queue_locked_flush (GstQueue *queue);
static void gst_queue_flush (GstQueue *queue);
static GstElementStateReturn gst_queue_change_state (GstElement *element);
@@ -180,9 +181,12 @@ gst_queue_init (GstQueue *queue)
queue->size_bytes = 100 * 1024; // 100KB
queue->size_time = 1000000000LL; // 1sec
queue->emptycond = g_cond_new ();
queue->fullcond = g_cond_new ();
GST_DEBUG(GST_CAT_THREAD, "initialized queue's emptycond and fullcond\n");
queue->qlock = g_mutex_new ();
queue->reader = FALSE;
queue->writer = FALSE;
queue->not_empty = g_cond_new ();
queue->not_full = g_cond_new ();
GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
static GstBufferPool*
@@ -215,41 +219,18 @@ gst_queue_handle_negotiate_sink (GstPad *pad, GstCaps **caps, gpointer *data)
return gst_pad_negotiate_proxy (pad, queue->srcpad, caps);
}
static gboolean
gst_queue_handle_event (GstPad *pad)
{
GstQueue *queue;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
GST_DEBUG (GST_CAT_DATAFLOW,"%s received event\n", GST_ELEMENT_NAME (queue));
GST_LOCK (queue);
GST_DEBUG (GST_CAT_DATAFLOW,"%s has %d buffers left\n", GST_ELEMENT_NAME (queue),
queue->level_buffers);
GST_FLAG_SET (pad, GST_PAD_EOS);
g_cond_signal (queue->emptycond);
GST_UNLOCK (queue);
return TRUE;
}
static void
gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s cleaning buffer %p\n", (gchar *)user_data, data);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
gst_buffer_unref (GST_BUFFER (data));
}
static void
gst_queue_flush (GstQueue *queue)
gst_queue_locked_flush (GstQueue *queue)
{
g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
(char *) GST_ELEMENT_NAME (queue));
(gpointer) queue);
g_slist_free (queue->queue);
queue->queue = NULL;
@@ -257,40 +238,63 @@ gst_queue_flush (GstQueue *queue)
queue->timeval = NULL;
}
static void
gst_queue_flush (GstQueue *queue)
{
g_mutex_lock (queue->qlock);
gst_queue_locked_flush (queue);
g_mutex_unlock (queue->qlock);
}
static void
gst_queue_chain (GstPad *pad, GstBuffer *buf)
{
GstQueue *queue;
const guchar *name;
gboolean reader;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (buf != NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
name = GST_ELEMENT_NAME (queue);
reader = FALSE;
/* we have to lock the queue since we span threads */
// GST_DEBUG (GST_CAT_DATAFLOW,"trying to get lock on queue \"%s\"\n",name);
GST_LOCK (queue);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "buffer has FLUSH bit set, flushing queue\n");
gst_queue_flush (queue);
if (GST_IS_EVENT(buf)) {
GstEvent *event = GST_EVENT(buf);
switch (GST_EVENT_TYPE(event)) {
case GST_EVENT_FLUSH:
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "flushing queue\n");
gst_queue_locked_flush (queue);
break;
default:
break;
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
if (queue->level_buffers >= queue->size_buffers) {
if (queue->level_buffers == queue->size_buffers) {
// if this is a leaky queue...
if (queue->leaky) {
// FIXME don't want to leak events!
// if we leak on the upstream side, drop the current buffer
if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
if (GST_IS_EVENT (buf))
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(buf)));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
gst_buffer_unref(buf);
// now we have to clean up and exit right away
GST_UNLOCK (queue);
g_mutex_unlock (queue->qlock);
return;
}
// otherwise we have to push a buffer off the other end
@@ -300,6 +304,10 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
if (GST_IS_EVENT (leakbuf))
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(leakbuf)));
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
@@ -308,7 +316,9 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
}
}
while (queue->level_buffers >= queue->size_buffers) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
queue->level_buffers, queue->size_buffers);
while (queue->level_buffers == queue->size_buffers) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -316,36 +326,45 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
GST_STATE_VOID_PENDING)
{
GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
GST_UNLOCK(queue);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for space, level is %d\n", queue->level_buffers);
g_cond_signal (queue->emptycond);
g_cond_wait (queue->fullcond, GST_OBJECT(queue)->lock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "done waiting, level is now %d\n", queue->level_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->writer)
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
queue->writer = TRUE;
g_cond_wait (queue->not_full, queue->qlock);
queue->writer = FALSE;
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
queue->level_buffers, queue->size_buffers);
}
/* put the buffer on the tail of the list */
queue->queue = g_slist_append (queue->queue, buf);
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
// GST_DEBUG (GST_CAT_DATAFLOW, "(%s:%s)+\n",GST_DEBUG_PAD_NAME(pad));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
/* if we were empty, but aren't any more, signal a condition */
if (queue->level_buffers == 1)
/* reader waiting on an empty queue */
reader = queue->reader;
g_mutex_unlock (queue->qlock);
if (reader)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling emptycond\n", name);
g_cond_signal (queue->emptycond);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
g_cond_signal (queue->not_empty);
}
GST_UNLOCK (queue);
}
static GstBuffer *
@@ -354,7 +373,7 @@ gst_queue_get (GstPad *pad)
GstQueue *queue;
GstBuffer *buf = NULL;
GSList *front;
const guchar *name;
gboolean writer;
g_assert(pad != NULL);
g_assert(GST_IS_PAD(pad));
@@ -362,22 +381,16 @@ gst_queue_get (GstPad *pad)
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
name = GST_ELEMENT_NAME (queue);
writer = FALSE;
/* have to lock for thread-safety */
GST_DEBUG (GST_CAT_DATAFLOW,"%s try have queue lock\n", name);
GST_LOCK (queue);
GST_DEBUG (GST_CAT_DATAFLOW,"%s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond);
GST_DEBUG (GST_CAT_DATAFLOW,"%s have queue lock\n", name);
while (!queue->level_buffers) {
if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
GST_DEBUG (GST_CAT_DATAFLOW, "%s U released lock\n", name);
GST_UNLOCK(queue);
// this return NULL shouldn't hurt anything...
return NULL;
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
g_mutex_lock (queue->qlock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
while (queue->level_buffers == 0) {
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING ||
@@ -385,37 +398,60 @@ gst_queue_get (GstPad *pad)
GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
GST_STATE_VOID_PENDING)
{
GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
if (GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(queue) != GST_STATE_VOID_PENDING)\n");
if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING)
GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
GST_UNLOCK(queue);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_VOID_PENDING\n");
g_mutex_unlock (queue->qlock);
cothread_switch(cothread_current_main());
}
g_cond_signal (queue->fullcond);
g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->reader)
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
queue->reader = TRUE;
g_cond_wait (queue->not_empty, queue->qlock);
queue->reader = FALSE;
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
front = queue->queue;
buf = (GstBuffer *)(front->data);
GST_DEBUG (GST_CAT_DATAFLOW,"retrieved buffer %p from queue\n",buf);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
queue->queue = g_slist_remove_link (queue->queue, front);
g_slist_free (front);
// if (queue->level_buffers < queue->size_buffers)
if (queue->level_buffers == queue->size_buffers)
{
GST_DEBUG (GST_CAT_DATAFLOW,"%s signalling fullcond\n", name);
g_cond_signal (queue->fullcond);
}
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
GST_DEBUG (GST_CAT_DATAFLOW,"(%s:%s)- ",GST_DEBUG_PAD_NAME(pad));
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
GST_UNLOCK(queue);
/* writer waiting on a full queue */
writer = queue->writer;
g_mutex_unlock (queue->qlock);
if (writer)
{
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
g_cond_signal (queue->not_full);
}
// FIXME where should this be? locked?
if (GST_IS_EVENT(buf)) {
GstEvent *event = GST_EVENT(buf);
switch (GST_EVENT_TYPE(event)) {
case GST_EVENT_EOS:
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue eos\n");
gst_element_set_state (GST_ELEMENT (queue), GST_STATE_PAUSED);
break;
default:
break;
}
}
return buf;
}

View file

@@ -75,9 +75,12 @@ struct _GstQueue {
gint leaky; /* whether the queue is leaky, and if so at which end */
// GMutex *lock; (optimization?)
GCond *emptycond;
GCond *fullcond;
GMutex *qlock; /* lock for queue (vs object lock) */
/* we are single reader and single writer queue */
gboolean reader; /* reader waiting on empty queue */
gboolean writer; /* writer waiting on full queue */
GCond *not_empty; /* signals buffers now available for reading */
GCond *not_full; /* signals space now available for writing */
GTimeVal *timeval; /* the timeout for the queue locking */
};

View file

@@ -37,7 +37,6 @@ GstElementDetails gst_statistics_details = {
/* Statistics signals and args */
enum {
SIGNAL_UPDATE,
/* FILL ME */
LAST_SIGNAL
};
@@ -66,7 +65,7 @@ static void gst_statistics_reset (GstStatistics *statistics);
static void gst_statistics_print (GstStatistics *statistics);
static GstElementClass *parent_class = NULL;
static guint gst_statistics_signals[LAST_SIGNAL] = { 0 };
static guint gst_statistics_signals[LAST_SIGNAL] = { 0, };
static stats zero_stats = { 0, };
@@ -131,7 +130,7 @@ gst_statistics_class_init (GstStatisticsClass *klass)
gst_statistics_signals[SIGNAL_UPDATE] =
g_signal_new ("update", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstStatisticsClass, update), NULL, NULL,
g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 0);
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_statistics_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_statistics_get_property);
@@ -283,10 +282,12 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
if (GST_IS_EVENT(buf)) {
GstEvent *event = GST_EVENT (buf);
gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
statistics->stats.events += 1;
if (statistics->update_on_eos && (GST_EVENT_TYPE(event) == GST_EVENT_EOS)) {
update = TRUE;
if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
gst_element_set_state (GST_ELEMENT (statistics), GST_STATE_PAUSED);
if (statistics->update_on_eos) {
update = TRUE;
}
}
if (statistics->update_freq.events) {
statistics->update_count.events += 1;
@@ -317,7 +318,9 @@ gst_statistics_chain (GstPad *pad, GstBuffer *buf)
if (update) {
if (statistics->update) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "pre update emit\n");
g_signal_emit (G_OBJECT (statistics), gst_statistics_signals[SIGNAL_UPDATE], 0);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, statistics, "post update emit\n");
}
if (!statistics->silent) {
gst_statistics_print(statistics);