atomicqueue: add refcounting and docs

This commit is contained in:
Wim Taymans 2011-02-16 16:19:46 +01:00
parent ebb37b48d8
commit 905fea098f
4 changed files with 125 additions and 3 deletions

View file

@ -57,6 +57,7 @@ Windows. It is released under the GNU Library General Public License
</para> </para>
<xi:include href="xml/gst.xml" /> <xi:include href="xml/gst.xml" />
<xi:include href="xml/gstatomicqueue.xml" />
<xi:include href="xml/gstbin.xml" /> <xi:include href="xml/gstbin.xml" />
<xi:include href="xml/gstbuffer.xml" /> <xi:include href="xml/gstbuffer.xml" />
<xi:include href="xml/gstbufferlist.xml" /> <xi:include href="xml/gstbufferlist.xml" />

View file

@ -38,6 +38,22 @@ GstPluginLoader
GstPluginLoaderFuncs GstPluginLoaderFuncs
</SECTION> </SECTION>
<SECTION>
<FILE>gstatomicqueue</FILE>
<TITLE>GstAtomicQueue</TITLE>
GstAtomicQueue
gst_atomic_queue_new
gst_atomic_queue_ref
gst_atomic_queue_unref
gst_atomic_queue_push
gst_atomic_queue_peek
gst_atomic_queue_pop
gst_atomic_queue_length
</SECTION>
<SECTION> <SECTION>
<FILE>gstbin</FILE> <FILE>gstbin</FILE>
<TITLE>GstBin</TITLE> <TITLE>GstBin</TITLE>

View file

@ -21,9 +21,23 @@
*/ */
#include <string.h> #include <string.h>
#include "gst_private.h"
#include <gst/gst.h> #include <gst/gst.h>
#include "gstatomicqueue.h" #include "gstatomicqueue.h"
/**
* SECTION:gstatomicqueue
* @title: GstAtomicQueue
* @short_description: An atomic queue implementation
*
* The #GstAtomicQueue object implements a queue that can be used from multiple
* threads without performing any blocking operations.
*
* Since: 0.10.33
*/
/* By default the queue uses 2 * sizeof(gpointer) * clp2 (max_items) of /* By default the queue uses 2 * sizeof(gpointer) * clp2 (max_items) of
* memory. clp2(x) is the next power of two >= than x. * memory. clp2(x) is the next power of two >= than x.
* *
@ -84,6 +98,7 @@ free_queue_mem (GstAQueueMem * mem)
struct _GstAtomicQueue struct _GstAtomicQueue
{ {
volatile gint refcount;
#ifdef LOW_MEM #ifdef LOW_MEM
gint num_readers; gint num_readers;
#endif #endif
@ -123,6 +138,17 @@ clear_free_list (GstAtomicQueue * queue)
} }
} }
/**
* gst_atomic_queue_new:
* @initial_size: initial queue size
*
* Create a new atomic queue instance. @initial_size will be rounded up to the
* nearest power of 2 and used as the initial size of the queue.
*
* Returns: a new #GstAtomicQueue
*
* Since: 0.10.33
*/
GstAtomicQueue * GstAtomicQueue *
gst_atomic_queue_new (guint initial_size) gst_atomic_queue_new (guint initial_size)
{ {
@ -130,6 +156,7 @@ gst_atomic_queue_new (guint initial_size)
queue = g_new (GstAtomicQueue, 1); queue = g_new (GstAtomicQueue, 1);
queue->refcount = 1;
#ifdef LOW_MEM #ifdef LOW_MEM
queue->num_readers = 0; queue->num_readers = 0;
#endif #endif
@ -139,7 +166,21 @@ gst_atomic_queue_new (guint initial_size)
return queue; return queue;
} }
/**
* gst_atomic_queue_ref:
* @queue: a #GstAtomicQueue
*
* Increase the refcount of @queue.
*/
void void
gst_atomic_queue_ref (GstAtomicQueue * queue)
{
g_return_if_fail (queue != NULL);
g_atomic_int_inc (&queue->refcount);
}
static void
gst_atomic_queue_free (GstAtomicQueue * queue) gst_atomic_queue_free (GstAtomicQueue * queue)
{ {
free_queue_mem (queue->head_mem); free_queue_mem (queue->head_mem);
@ -149,12 +190,37 @@ gst_atomic_queue_free (GstAtomicQueue * queue)
g_free (queue); g_free (queue);
} }
/**
* gst_atomic_queue_unref:
* @queue: a #GstAtomicQueue
*
* Unref @queue and free the memory when the refcount reaches 0.
*/
void
gst_atomic_queue_unref (GstAtomicQueue * queue)
{
g_return_if_fail (queue != NULL);
if (g_atomic_int_dec_and_test (&queue->refcount))
gst_atomic_queue_free (queue);
}
/**
* gst_atomic_queue_peek:
* @queue: a #GstAtomicQueue
*
* Peek the head element of the queue without removing it from the queue.
*
* Returns: the head element of @queue or NULL when the queue is empty.
*/
gpointer gpointer
gst_atomic_queue_peek (GstAtomicQueue * queue) gst_atomic_queue_peek (GstAtomicQueue * queue)
{ {
GstAQueueMem *head_mem; GstAQueueMem *head_mem;
gint head, tail, size; gint head, tail, size;
g_return_val_if_fail (queue != NULL, NULL);
while (TRUE) { while (TRUE) {
GstAQueueMem *next; GstAQueueMem *next;
@ -188,6 +254,14 @@ gst_atomic_queue_peek (GstAtomicQueue * queue)
return head_mem->array[head & size]; return head_mem->array[head & size];
} }
/**
* gst_atomic_queue_pop:
* @queue: a #GstAtomicQueue
*
* Get the head element of the queue.
*
* Returns: the head element of @queue or NULL when the queue is empty.
*/
gpointer gpointer
gst_atomic_queue_pop (GstAtomicQueue * queue) gst_atomic_queue_pop (GstAtomicQueue * queue)
{ {
@ -195,6 +269,8 @@ gst_atomic_queue_pop (GstAtomicQueue * queue)
GstAQueueMem *head_mem; GstAQueueMem *head_mem;
gint head, tail, size; gint head, tail, size;
g_return_val_if_fail (queue != NULL, NULL);
#ifdef LOW_MEM #ifdef LOW_MEM
g_atomic_int_inc (&queue->num_readers); g_atomic_int_inc (&queue->num_readers);
#endif #endif
@ -244,12 +320,21 @@ gst_atomic_queue_pop (GstAtomicQueue * queue)
return ret; return ret;
} }
/**
* gst_atomic_queue_push:
* @queue: a #GstAtomicQueue
* @data: the data
*
* Append @data to the tail of the queue.
*/
void void
gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data) gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
{ {
GstAQueueMem *tail_mem; GstAQueueMem *tail_mem;
gint head, tail, size; gint head, tail, size;
g_return_if_fail (queue != NULL);
do { do {
while (TRUE) { while (TRUE) {
GstAQueueMem *mem; GstAQueueMem *mem;
@ -286,12 +371,23 @@ gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
tail_mem->array[tail & size] = data; tail_mem->array[tail & size] = data;
} }
/**
* gst_atomic_queue_length:
* @queue: a #GstAtomicQueue
* @data: the data
*
* Get the amount of items in the queue.
*
* Returns: the number of elements in the queue.
*/
guint guint
gst_atomic_queue_length (GstAtomicQueue * queue) gst_atomic_queue_length (GstAtomicQueue * queue)
{ {
GstAQueueMem *head_mem, *tail_mem; GstAQueueMem *head_mem, *tail_mem;
gint head, tail; gint head, tail;
g_return_val_if_fail (queue != NULL, 0);
#ifdef LOW_MEM #ifdef LOW_MEM
g_atomic_int_inc (&queue->num_readers); g_atomic_int_inc (&queue->num_readers);
#endif #endif

View file

@ -25,12 +25,21 @@
#ifndef __GST_ATOMIC_QUEUE_H__ #ifndef __GST_ATOMIC_QUEUE_H__
#define __GST_ATOMIC_QUEUE_H__ #define __GST_ATOMIC_QUEUE_H__
typedef struct _GstAtomicQueue GstAtomicQueue;
G_BEGIN_DECLS G_BEGIN_DECLS
/**
* GstAtomicQueue:
* Opaque atomic data queue.
*
* Use the acessor functions to get the stored values.
*/
typedef struct _GstAtomicQueue GstAtomicQueue;
GstAtomicQueue * gst_atomic_queue_new (guint initial_size); GstAtomicQueue * gst_atomic_queue_new (guint initial_size);
void gst_atomic_queue_free (GstAtomicQueue * queue);
void gst_atomic_queue_ref (GstAtomicQueue * queue);
void gst_atomic_queue_unref (GstAtomicQueue * queue);
void gst_atomic_queue_push (GstAtomicQueue* queue, gpointer data); void gst_atomic_queue_push (GstAtomicQueue* queue, gpointer data);
gpointer gst_atomic_queue_pop (GstAtomicQueue* queue); gpointer gst_atomic_queue_pop (GstAtomicQueue* queue);