- Removed bufferpool code and move that to gstbuffer.c

Original commit message from CVS:
- Removed bufferpool code and move that to gstbuffer.c
- implemented refcounting on GstData
- implemented new buffer code based on Company's work in the EVENTS2 branch
- added boxed types for GstData/GstEvent/GstBuffer/GstBufferPool
- added refcounting to bufferpools and events
- use lockfree allocation for buffers
- simplified the clock, use lockfree allocation
- use GQueue in GstQueue for faster access to the tail element
- update core plugins to the new event API
This commit is contained in:
Wim Taymans 2002-07-08 19:22:02 +00:00
parent 74b6b732f1
commit fcb10a6c85
26 changed files with 761 additions and 1307 deletions

View file

@ -55,15 +55,16 @@ libgstreamer_la_SOURCES = \
$(GST_AUTOPLUG_SRC) \
gstbin.c \
gstbuffer.c \
gstbufferpool.c \
gstcaps.c \
gstclock.c \
gstcpu.c \
gstdata.c \
gstelement.c \
gstelementfactory.c \
gstevent.c \
gstextratypes.c \
gstinfo.c \
gstmemchunk.c \
gstpad.c \
gstpipeline.c \
gstplugin.c \
@ -125,6 +126,7 @@ distclean-local:
libgstreamerincludedir = $(includedir)/gstreamer-@VERSION@/gst
libgstreamerinclude_HEADERS = \
gst.h \
gstatomic.h \
gstconfig.h \
gstmarshal.h \
gstenumtypes.h \
@ -133,7 +135,6 @@ libgstreamerinclude_HEADERS = \
gstautoplug.h \
gstbin.h \
gstbuffer.h \
gstbufferpool.h \
gstcaps.h \
gstclock.h \
gstcpu.h \
@ -144,6 +145,7 @@ libgstreamerinclude_HEADERS = \
gstformat.h \
gstinfo.h \
gstlog.h \
gstmemchunk.h \
gstpad.h \
gstpipeline.h \
gstplugin.h \

View file

@ -266,8 +266,6 @@ gst_fakesink_chain (GstPad *pad, GstBuffer *buf)
gst_pad_event_default (pad, event);
break;
}
gst_event_free (event);
return;
}

View file

@ -323,7 +323,6 @@ gst_fakesrc_event_handler (GstPad *pad, GstEvent *event)
src->buffer_count = GST_EVENT_SEEK_OFFSET (event);
if (!GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
gst_event_free (event);
break;
}
/* else we do a flush too */
@ -333,6 +332,7 @@ gst_fakesrc_event_handler (GstPad *pad, GstEvent *event)
default:
break;
}
gst_event_unref (event);
return TRUE;
}
@ -680,8 +680,8 @@ gst_fakesrc_get(GstPad *pad)
if (src->last_message)
g_free (src->last_message);
src->last_message = g_strdup_printf ("get ******* (%s:%s)> (%d bytes, %llu)",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
src->last_message = g_strdup_printf ("get ******* (%s:%s)> (%d bytes, %llu) %p",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf), buf);
g_object_notify (G_OBJECT (src), "last_message");
}

View file

@ -304,7 +304,7 @@ gst_filesink_handle_event (GstPad *pad, GstEvent *event)
if (gst_event_discont_get_value (event, GST_FORMAT_BYTES, &offset))
fseek(filesink->file, offset, SEEK_SET);
gst_event_free (event);
gst_event_unref (event);
break;
}
case GST_EVENT_NEW_MEDIA:

View file

@ -337,6 +337,10 @@ gst_filesrc_free_parent_mmap (GstBuffer *buf)
#endif
/* now unmap the memory */
munmap(GST_BUFFER_DATA(buf),GST_BUFFER_MAXSIZE(buf));
GST_BUFFER_DATA (buf) = NULL;
_gst_buffer_free (buf);
}
static GstBuffer *
@ -371,7 +375,7 @@ gst_filesrc_map_region (GstFileSrc *src, off_t offset, size_t size)
GST_BUFFER_OFFSET(buf) = offset;
GST_BUFFER_TIMESTAMP(buf) = -1LL;
GST_BUFFER_POOL_PRIVATE(buf) = src;
GST_BUFFER_FREE_FUNC(buf) = gst_filesrc_free_parent_mmap;
GST_BUFFER_FREE_FUNC(buf) = (GstDataFreeFunction) gst_filesrc_free_parent_mmap;
g_mutex_lock(src->map_regions_lock);
g_tree_insert(src->map_regions,buf,buf);
@ -716,7 +720,7 @@ gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event)
guint64 offset;
if (GST_EVENT_SEEK_FORMAT (event) != GST_FORMAT_BYTES) {
return FALSE;
goto error;
}
offset = GST_EVENT_SEEK_OFFSET (event);
@ -724,24 +728,24 @@ gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event)
switch (GST_EVENT_SEEK_METHOD (event)) {
case GST_SEEK_METHOD_SET:
if (offset > src->filelen)
return FALSE;
goto error;
src->curoffset = offset;
GST_DEBUG(0, "seek set pending to %lld", src->curoffset);
break;
case GST_SEEK_METHOD_CUR:
if (offset + src->curoffset > src->filelen)
return FALSE;
goto error;
src->curoffset += offset;
GST_DEBUG(0, "seek cur pending to %lld", src->curoffset);
break;
case GST_SEEK_METHOD_END:
if (ABS (offset) > src->filelen)
return FALSE;
goto error;
src->curoffset = src->filelen - ABS (offset);
GST_DEBUG(0, "seek end pending to %lld", src->curoffset);
break;
default:
return FALSE;
goto error;
break;
}
g_object_notify (G_OBJECT (src), "offset");
@ -751,7 +755,7 @@ gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event)
}
case GST_EVENT_SIZE:
if (GST_EVENT_SIZE_FORMAT (event) != GST_FORMAT_BYTES) {
return FALSE;
goto error;
}
src->block_size = GST_EVENT_SIZE_VALUE (event);
g_object_notify (G_OBJECT (src), "blocksize");
@ -760,9 +764,13 @@ gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event)
src->need_flush = TRUE;
break;
default:
return FALSE;
goto error;
break;
}
gst_event_unref (event);
return TRUE;
error:
gst_event_unref (event);
return FALSE;
}

View file

@ -377,7 +377,6 @@ init_post (void)
_gst_plugin_initialize ();
_gst_event_initialize ();
_gst_buffer_initialize ();
_gst_buffer_pool_initialize ();
if (!_gst_registry_fixed) {
/* don't override command-line options */

View file

@ -24,44 +24,35 @@
#define GST_DEBUG_FORCE_DISABLE
#include "gst_private.h"
#include "gstdata_private.h"
#include "gstbuffer.h"
/* #define MEMPROF */
#include "gstmemchunk.h"
#include "gstlog.h"
GType _gst_buffer_type;
GType _gst_buffer_pool_type;
static GMemChunk *_gst_buffer_chunk;
static GMutex *_gst_buffer_chunk_lock;
static gint _gst_buffer_live;
static gint _gst_buffer_pool_live;
static GstMemChunk *chunk;
void
_gst_buffer_initialize (void)
{
int buffersize = sizeof(GstBuffer);
static const GTypeInfo buffer_info = {
0, /* sizeof(class), */
NULL,
NULL,
NULL,
NULL,
NULL,
0, /* sizeof(object), */
0,
NULL,
NULL,
};
_gst_buffer_type = g_boxed_type_register_static ("GstBuffer",
(GBoxedCopyFunc) gst_data_ref,
(GBoxedFreeFunc) gst_data_unref);
/* round up to the nearest 32 bytes for cache-line and other efficiencies */
buffersize = (((buffersize-1) / 32) + 1) * 32;
_gst_buffer_chunk = g_mem_chunk_new ("GstBuffer", buffersize,
buffersize * 32, G_ALLOC_AND_FREE);
_gst_buffer_chunk_lock = g_mutex_new ();
_gst_buffer_type = g_type_register_static (G_TYPE_INT, "GstBuffer", &buffer_info, 0);
_gst_buffer_pool_type = g_boxed_type_register_static ("GstBufferPool",
(GBoxedCopyFunc) gst_data_ref,
(GBoxedFreeFunc) gst_data_unref);
_gst_buffer_live = 0;
chunk = gst_mem_chunk_new ("GstBufferChunk", sizeof (GstBuffer), sizeof (GstBuffer) * 200, 0);
GST_INFO (GST_CAT_BUFFER, "Buffers are initialized now");
}
/**
@ -74,51 +65,140 @@ gst_buffer_print_stats (void)
{
g_log (g_log_domain_gstreamer, G_LOG_LEVEL_INFO,
"%d live buffer(s)", _gst_buffer_live);
g_log (g_log_domain_gstreamer, G_LOG_LEVEL_INFO,
"%d live bufferpool(s)", _gst_buffer_pool_live);
}
static void
_gst_buffer_free_to_pool (GstBuffer *buffer)
{
GstBufferPool *pool = buffer->pool;
buffer->pool->buffer_free (buffer->pool, buffer, buffer->pool->user_data);
gst_data_unref (GST_DATA (pool));
}
static void
_gst_buffer_sub_free (GstBuffer *buffer)
{
gst_data_unref (GST_DATA (buffer->pool_private));
GST_BUFFER_DATA (buffer) = NULL;
GST_BUFFER_SIZE (buffer) = 0;
_GST_DATA_DISPOSE (GST_DATA (buffer));
gst_mem_chunk_free (chunk, GST_DATA (buffer));
_gst_buffer_live--;
}
/**
* _gst_buffer_free:
* @buffer: a #GstBuffer to free
*
* Free the momory associated with the buffer including the buffer data,
* unless the GST_BUFFER_DONTFREE flags was set or the buffer data is NULL.
* This function is used by bufferpools.
*
* Returns: new #GstBuffer
*/
void
_gst_buffer_free (GstBuffer *buffer)
{
/* free our data */
if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DONTFREE) && GST_BUFFER_DATA (buffer))
g_free (GST_BUFFER_DATA (buffer));
/* set to safe values */
GST_BUFFER_DATA (buffer) = NULL;
GST_BUFFER_SIZE (buffer) = 0;
_GST_DATA_DISPOSE (GST_DATA (buffer));
gst_mem_chunk_free (chunk, GST_DATA (buffer));
_gst_buffer_live--;
}
/**
* _gst_buffer_copy:
* @buffer: a #GstBuffer to make a copy of
*
* Make a full newly allocated copy of the given buffer, data and all.
* This function is used by bufferpools.
*
* Returns: new #GstBuffer
*/
GstBuffer*
_gst_buffer_copy (GstBuffer *buffer)
{
GstBuffer *copy;
/* create a fresh new buffer */
copy = gst_buffer_new ();
/* we simply copy everything from our parent */
GST_BUFFER_DATA (copy) = g_memdup (GST_BUFFER_DATA (buffer), GST_BUFFER_SIZE (buffer));
GST_BUFFER_SIZE (copy) = GST_BUFFER_SIZE (buffer);
GST_BUFFER_MAXSIZE (copy) = GST_BUFFER_MAXSIZE (buffer);
GST_BUFFER_TIMESTAMP (copy) = GST_BUFFER_TIMESTAMP (buffer);
GST_BUFFER_OFFSET (copy) = GST_BUFFER_OFFSET (buffer);
return copy;
}
static GstBuffer*
_gst_buffer_copy_to_pool (GstBuffer *buffer)
{
return buffer->pool->buffer_copy (buffer->pool, buffer, buffer->pool->user_data);
}
/**
* gst_buffer_new:
*
* Creates a newly allocated buffer.
* Creates a newly allocated buffer without any data.
*
* Returns: new #GstBuffer
*/
GstBuffer*
gst_buffer_new (void)
{
GstBuffer *buffer;
GstBuffer *new;
g_mutex_lock (_gst_buffer_chunk_lock);
#ifdef MEMPROF
buffer = g_new0 (GstBuffer, 1);
#else
buffer = g_mem_chunk_alloc (_gst_buffer_chunk);
#endif
new = gst_mem_chunk_alloc0 (chunk);
_gst_buffer_live++;
g_mutex_unlock (_gst_buffer_chunk_lock);
GST_INFO (GST_CAT_BUFFER,"creating new buffer %p",buffer);
GST_DATA_TYPE(buffer) = _gst_buffer_type;
_GST_DATA_INIT (GST_DATA (new),
_gst_buffer_type,
0,
(GstDataFreeFunction) _gst_buffer_free,
(GstDataCopyFunction) _gst_buffer_copy);
buffer->lock = g_mutex_new ();
#ifdef HAVE_ATOMIC_H
atomic_set (&buffer->refcount, 1);
#else
buffer->refcount = 1;
#endif
buffer->flags = 0;
buffer->data = NULL;
buffer->size = 0;
buffer->maxsize = 0;
buffer->offset = -1;
buffer->timestamp = 0;
buffer->parent = NULL;
buffer->pool = NULL;
buffer->pool_private = NULL;
buffer->free = NULL;
buffer->copy = NULL;
GST_BUFFER_BUFFERPOOL (new) = NULL;
GST_BUFFER_POOL_PRIVATE (new) = NULL;
return buffer;
return new;
}
/**
* gst_buffer_new_and_alloc:
*
* Creates a newly allocated buffer with data of the given size.
*
* Returns: new #GstBuffer
*/
GstBuffer*
gst_buffer_new_and_alloc (guint size)
{
GstBuffer *new;
new = gst_buffer_new ();
GST_BUFFER_DATA (new) = g_malloc (size);
GST_BUFFER_SIZE (new) = size;
return new;
}
/**
@ -132,20 +212,22 @@ gst_buffer_new (void)
* Returns: new #GstBuffer
*/
GstBuffer*
gst_buffer_new_from_pool (GstBufferPool *pool, guint32 offset, guint32 size)
gst_buffer_new_from_pool (GstBufferPool *pool, guint64 offset, guint size)
{
GstBuffer *buffer;
g_return_val_if_fail (pool != NULL, NULL);
g_return_val_if_fail (pool->buffer_new != NULL, NULL);
buffer = pool->buffer_new (pool, offset, size, pool->user_data);
buffer->pool = pool;
buffer->free = pool->buffer_free;
buffer->copy = pool->buffer_copy;
if (!buffer)
return NULL;
GST_INFO (GST_CAT_BUFFER,"creating new buffer %p from pool %p (size %x, offset %x)",
buffer, pool, size, offset);
GST_BUFFER_BUFFERPOOL (buffer) = pool;
gst_data_ref (GST_DATA (pool));
/* override the buffer refcount functions with those from the pool (if any) */
if (pool->buffer_free)
GST_DATA (buffer)->free = (GstDataFreeFunction) _gst_buffer_free_to_pool;
if (pool->buffer_copy)
GST_DATA (buffer)->copy = (GstDataCopyFunction) _gst_buffer_copy_to_pool;
return buffer;
}
@ -162,295 +244,77 @@ gst_buffer_new_from_pool (GstBufferPool *pool, guint32 offset, guint32 size)
* Returns: a new #GstBuffer
*/
GstBuffer*
gst_buffer_create_sub (GstBuffer *parent,
guint32 offset,
guint32 size)
gst_buffer_create_sub (GstBuffer *parent, guint offset, guint size)
{
GstBuffer *buffer;
gpointer buffer_data;
guint64 parent_offset;
g_return_val_if_fail (parent != NULL, NULL);
g_return_val_if_fail (GST_BUFFER_REFCOUNT(parent) > 0, NULL);
g_return_val_if_fail (GST_BUFFER_REFCOUNT_VALUE (parent) > 0, NULL);
g_return_val_if_fail (size > 0, NULL);
g_return_val_if_fail ((offset+size) <= parent->size, NULL);
g_return_val_if_fail (parent->size >= offset + size, NULL);
g_mutex_lock (_gst_buffer_chunk_lock);
#ifdef MEMPROF
buffer = g_new0 (GstBuffer, 1);
#else
buffer = g_mem_chunk_alloc (_gst_buffer_chunk);
#endif
/* remember the data for the new buffer */
buffer_data = parent->data + offset;
parent_offset = GST_BUFFER_OFFSET (parent);
/* make sure we're child not child from a child buffer */
while (GST_BUFFER_FLAG_IS_SET (parent, GST_BUFFER_SUBBUFFER)) {
parent = GST_BUFFER (parent->pool_private);
}
/* ref the real parent */
gst_data_ref (GST_DATA (parent));
/* make sure nobody overwrites data in the parent */
if (!GST_DATA_IS_READONLY (parent))
GST_DATA_FLAG_SET(parent, GST_DATA_READONLY);
/* create the new buffer */
buffer = gst_mem_chunk_alloc0 (chunk);
_gst_buffer_live++;
g_mutex_unlock (_gst_buffer_chunk_lock);
GST_INFO (GST_CAT_BUFFER,"creating new subbuffer %p from parent %p (size %u, offset %u)",
buffer, parent, size, offset);
GST_DATA_TYPE(buffer) = _gst_buffer_type;
buffer->lock = g_mutex_new ();
#ifdef HAVE_ATOMIC_H
atomic_set (&buffer->refcount, 1);
#else
buffer->refcount = 1;
#endif
/* make sure nobody overwrites data in the new buffer by setting the READONLY flag */
_GST_DATA_INIT (GST_DATA (buffer),
_gst_buffer_type,
GST_DATA_FLAG_SHIFT (GST_BUFFER_SUBBUFFER) |
GST_DATA_FLAG_SHIFT (GST_DATA_READONLY),
(GstDataFreeFunction) _gst_buffer_sub_free,
(GstDataCopyFunction) _gst_buffer_copy);
/* copy flags and type from parent, for lack of better */
buffer->flags = parent->flags;
GST_BUFFER_OFFSET (buffer) = parent_offset + offset;
GST_BUFFER_TIMESTAMP (buffer) = -1;
GST_BUFFER_BUFFERPOOL (buffer) = NULL;
GST_BUFFER_POOL_PRIVATE (buffer) = parent;
/* set the data pointer, size, offset, and maxsize */
buffer->data = parent->data + offset;
/* set the right values in the child */
buffer->data = buffer_data;
buffer->size = size;
buffer->maxsize = parent->size - offset;
/* deal with bogus/unknown offsets */
if (parent->offset != (guint32)-1)
buffer->offset = parent->offset + offset;
else
buffer->offset = (guint32)-1;
/* again, for lack of better, copy parent's timestamp */
buffer->timestamp = parent->timestamp;
buffer->maxage = parent->maxage;
/* if the parent buffer is a subbuffer itself, use its parent, a real buffer */
if (parent->parent != NULL)
parent = parent->parent;
/* set parentage and reference the parent */
buffer->parent = parent;
gst_buffer_ref (parent);
buffer->pool = NULL;
return buffer;
}
/* FIXME FIXME: how does this overlap with the newly-added gst_buffer_span() ??? */
/**
* gst_buffer_append:
* @first: #GstBuffer to append to
* @second: #GstBuffer to append
* gst_buffer_merge:
* @buf1: first source #GstBuffer to merge
* @buf2: second source #GstBuffer to merge
*
* Creates a new buffer by appending the data of second to the
* existing data of first. This will grow first if first is unused elsewhere,
* or create a newly allocated buffer if it is in use.
* second will not be changed.
* Create a new buffer that is the concatenation of the two source
* buffers. The original source buffers will not be modified or
* unref'd.
*
* Returns: a new #GstBuffer
* Internally is nothing more than a specialized gst_buffer_span,
* so the same optimizations can occur.
*
* Returns: a new #GstBuffer that's the concatenation of the source buffers
*/
GstBuffer*
gst_buffer_append (GstBuffer *first,
GstBuffer *second)
gst_buffer_merge (GstBuffer *buf1, GstBuffer *buf2)
{
guint size;
GstBuffer *newbuf = NULL;
GstBuffer *buffer = NULL;
GstBuffer *result;
/* we're just a specific case of the more general gst_buffer_span() */
result = gst_buffer_span (buf1, 0, buf2, buf1->size + buf2->size);
g_return_val_if_fail (first != NULL, NULL);
g_return_val_if_fail (second != NULL, NULL);
g_return_val_if_fail (first->pool == NULL, NULL);
g_return_val_if_fail (GST_BUFFER_REFCOUNT (first) > 0, NULL);
g_return_val_if_fail (GST_BUFFER_REFCOUNT (second) > 0, NULL);
GST_INFO (GST_CAT_BUFFER,"appending buffers %p and %p",first, second);
GST_BUFFER_LOCK (first);
/* is the buffer used by anyone else ? */
if (GST_BUFFER_REFCOUNT (first) == 1 && first->parent == NULL
&& !GST_BUFFER_FLAG_IS_SET (first, GST_BUFFER_DONTFREE)) {
/* it's not, so we can realloc and expand the first buffer,
* filling it with the second's data */
size = first->size;
first->size += second->size;
first->data = g_realloc (first->data, first->size);
memcpy(first->data + size, second->data, second->size);
GST_BUFFER_UNLOCK (first);
buffer = first;
}
else {
/* the buffer is used, create a new one */
newbuf = gst_buffer_new ();
newbuf->size = first->size + second->size;
newbuf->data = g_malloc (newbuf->size);
memcpy (newbuf->data, first->data, first->size);
memcpy (newbuf->data + first->size, second->data, second->size);
GST_BUFFER_TIMESTAMP (newbuf) = GST_BUFFER_TIMESTAMP (first);
GST_BUFFER_UNLOCK (first);
gst_buffer_unref (first);
buffer = newbuf;
}
return buffer;
}
/**
* gst_buffer_destroy:
* @buffer: #GstBuffer to destroy
*
* Destroys the buffer. Actual data will be retained if DONTFREE is set.
*/
void
gst_buffer_destroy (GstBuffer *buffer)
{
g_return_if_fail (buffer != NULL);
GST_INFO (GST_CAT_BUFFER, "freeing %sbuffer %p",
(buffer->parent?"sub":""),
buffer);
/* free the data only if there is some, DONTFREE isn't set, and not sub */
if (GST_BUFFER_DATA (buffer) &&
!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DONTFREE) &&
(buffer->parent == NULL)) {
/* if there's a free function, use it */
if (buffer->free != NULL) {
(buffer->free)(buffer);
} else {
g_free (GST_BUFFER_DATA (buffer));
}
}
/* unreference the parent if there is one */
if (buffer->parent != NULL)
gst_buffer_unref (buffer->parent);
g_mutex_free (buffer->lock);
/* g_print("freed mutex\n"); */
#ifdef GST_DEBUG_ENABLED
/* make it hard to reuse by mistake */
memset (buffer, 0, sizeof (GstBuffer));
#endif
/* remove it entirely from memory */
g_mutex_lock (_gst_buffer_chunk_lock);
#ifdef MEMPROF
g_free (buffer);
#else
g_mem_chunk_free (_gst_buffer_chunk,buffer);
#endif
_gst_buffer_live--;
g_mutex_unlock (_gst_buffer_chunk_lock);
}
/**
* gst_buffer_ref:
* @buffer: a #GstBuffer to reference
*
* Increments the reference count of this buffer.
*/
void
gst_buffer_ref (GstBuffer *buffer)
{
g_return_if_fail (buffer != NULL);
GST_INFO (GST_CAT_BUFFER, "ref buffer %p, current count is %d", buffer,GST_BUFFER_REFCOUNT(buffer));
g_return_if_fail (GST_BUFFER_REFCOUNT(buffer) > 0);
#ifdef HAVE_ATOMIC_H
atomic_inc (&(buffer->refcount));
#else
GST_BUFFER_LOCK (buffer);
buffer->refcount++;
GST_BUFFER_UNLOCK (buffer);
#endif
}
/**
* gst_buffer_ref_by_count:
* @buffer: a #GstBuffer to reference
* @count: the number to increment the reference count by
*
* Increments the reference count of this buffer by the given number.
*/
void
gst_buffer_ref_by_count (GstBuffer *buffer, gint count)
{
g_return_if_fail (buffer != NULL);
g_return_if_fail (count >= 0);
#ifdef HAVE_ATOMIC_H
g_return_if_fail (atomic_read (&(buffer->refcount)) > 0);
atomic_add (count, &(buffer->refcount));
#else
g_return_if_fail (buffer->refcount > 0);
GST_BUFFER_LOCK (buffer);
buffer->refcount += count;
GST_BUFFER_UNLOCK (buffer);
#endif
}
/**
* gst_buffer_unref:
* @buffer: a #GstBuffer to unreference
*
* Decrements the refcount of this buffer. If the refcount is
* zero, the buffer will be destroyed.
*/
void
gst_buffer_unref (GstBuffer *buffer)
{
gint zero;
g_return_if_fail (buffer != NULL);
GST_INFO (GST_CAT_BUFFER, "unref buffer %p, current count is %d", buffer,GST_BUFFER_REFCOUNT(buffer));
g_return_if_fail (GST_BUFFER_REFCOUNT(buffer) > 0);
#ifdef HAVE_ATOMIC_H
zero = atomic_dec_and_test (&(buffer->refcount));
#else
GST_BUFFER_LOCK (buffer);
buffer->refcount--;
zero = (buffer->refcount == 0);
GST_BUFFER_UNLOCK (buffer);
#endif
/* if we ended up with the refcount at zero, destroy the buffer */
if (zero) {
gst_buffer_destroy (buffer);
}
}
/**
* gst_buffer_copy:
* @buffer: a #GstBuffer to make a copy of
*
* Make a full newly allocated copy of the given buffer, data and all.
*
* Returns: new #GstBuffer
*/
GstBuffer *
gst_buffer_copy (GstBuffer *buffer)
{
GstBuffer *newbuf;
g_return_val_if_fail (GST_BUFFER_REFCOUNT(buffer) > 0, NULL);
/* if a copy function exists, use it, else copy the bytes */
if (buffer->copy != NULL) {
newbuf = (buffer->copy)(buffer);
} else {
/* allocate a new buffer */
newbuf = gst_buffer_new();
/* copy the absolute size */
newbuf->size = buffer->size;
/* allocate space for the copy */
newbuf->data = (guchar *)g_malloc (buffer->size);
/* copy the data straight across */
memcpy(newbuf->data,buffer->data,buffer->size);
/* the new maxsize is the same as the size, since we just malloc'd it */
newbuf->maxsize = newbuf->size;
}
newbuf->offset = buffer->offset;
newbuf->timestamp = buffer->timestamp;
newbuf->maxage = buffer->maxage;
/* since we just created a new buffer, so we have no ties to old stuff */
newbuf->parent = NULL;
newbuf->pool = NULL;
return newbuf;
return result;
}
/**
@ -466,15 +330,16 @@ gst_buffer_copy (GstBuffer *buffer)
gboolean
gst_buffer_is_span_fast (GstBuffer *buf1, GstBuffer *buf2)
{
g_return_val_if_fail (GST_BUFFER_REFCOUNT(buf1) > 0, FALSE);
g_return_val_if_fail (GST_BUFFER_REFCOUNT(buf2) > 0, FALSE);
g_return_val_if_fail (GST_BUFFER_REFCOUNT_VALUE (buf1) > 0, FALSE);
g_return_val_if_fail (GST_BUFFER_REFCOUNT_VALUE (buf2) > 0, FALSE);
return (buf1->parent && buf2->parent &&
(buf1->parent == buf2->parent) &&
/* it's only fast if we have subbuffers of the same parent */
return ((GST_BUFFER_FLAG_IS_SET (buf1, GST_BUFFER_SUBBUFFER)) &&
(GST_BUFFER_FLAG_IS_SET (buf2, GST_BUFFER_SUBBUFFER)) &&
(buf1->pool_private == buf2->pool_private) &&
((buf1->data + buf1->size) == buf2->data));
}
/**
* gst_buffer_span:
* @buf1: first source #GstBuffer to merge
@ -493,76 +358,149 @@ gst_buffer_is_span_fast (GstBuffer *buf1, GstBuffer *buf2)
*
* Returns: a new #GstBuffer that spans the two source buffers
*/
/* FIXME need to think about CoW and such... */
GstBuffer *
GstBuffer*
gst_buffer_span (GstBuffer *buf1, guint32 offset, GstBuffer *buf2, guint32 len)
{
GstBuffer *newbuf;
g_return_val_if_fail (GST_BUFFER_REFCOUNT(buf1) > 0, NULL);
g_return_val_if_fail (GST_BUFFER_REFCOUNT(buf2) > 0, NULL);
/* make sure buf1 has a lower address than buf2 */
if (buf1->data > buf2->data) {
GstBuffer *tmp = buf1;
/* g_print ("swapping buffers\n"); */
buf1 = buf2;
buf2 = tmp;
}
g_return_val_if_fail (GST_BUFFER_REFCOUNT_VALUE (buf1) > 0, NULL);
g_return_val_if_fail (GST_BUFFER_REFCOUNT_VALUE (buf2) > 0, NULL);
g_return_val_if_fail (len > 0, NULL);
/* if the two buffers have the same parent and are adjacent */
if (gst_buffer_is_span_fast(buf1,buf2)) {
GstBuffer *parent = GST_BUFFER (buf1->pool_private);
/* we simply create a subbuffer of the common parent */
newbuf = gst_buffer_create_sub (buf1->parent, buf1->data - (buf1->parent->data) + offset, len);
newbuf = gst_buffer_create_sub (parent, buf1->data - parent->data + offset, len);
}
else {
/* g_print ("slow path taken in buffer_span\n"); */
GST_DEBUG (GST_CAT_BUFFER,"slow path taken while spanning buffers %p and %p", buffer, append);
/* otherwise we simply have to brute-force copy the buffers */
newbuf = gst_buffer_new ();
newbuf = gst_buffer_new_and_alloc (len);
/* copy relevant stuff from data struct of buffer1 */
GST_BUFFER_OFFSET (newbuf) = GST_BUFFER_OFFSET (buf1) + offset;
/* put in new size */
newbuf->size = len;
/* allocate space for the copy */
newbuf->data = (guchar *)g_malloc(len);
/* copy the first buffer's data across */
memcpy(newbuf->data, buf1->data + offset, buf1->size - offset);
memcpy (newbuf->data, buf1->data + offset, buf1->size - offset);
/* copy the second buffer's data across */
memcpy(newbuf->data + (buf1->size - offset), buf2->data, len - (buf1->size - offset));
if (newbuf->offset != (guint32)-1)
newbuf->offset = buf1->offset + offset;
newbuf->timestamp = buf1->timestamp;
if (buf2->maxage > buf1->maxage) newbuf->maxage = buf2->maxage;
else newbuf->maxage = buf1->maxage;
memcpy (newbuf->data + (buf1->size - offset), buf2->data, len - (buf1->size - offset));
}
return newbuf;
}
static void
_gst_buffer_pool_free (GstBufferPool *pool)
{
_gst_data_free (GST_DATA (pool));
_gst_buffer_pool_live--;
}
/**
* gst_buffer_merge:
* @buf1: first source #GstBuffer to merge
* @buf2: second source #GstBuffer to merge
* gst_buffer_pool_new:
* @free: The function to free the bufferpool
* @copy: The function to copy the bufferpool
* @buffer_new: the function to create a new buffer from this pool
* @buffer_copy: the function to copy a buffer from this pool
* @buffer_free: the function to free a buffer in this pool
* @user_data: user data passed to buffer_* functions
*
* Create a new buffer that is the concatenation of the two source
* buffers. The original source buffers will not be modified or
* unref'd.
* Create a new bufferpool with the given functions.
*
* Internally is nothing more than a specialized gst_buffer_span,
* so the same optimizations can occur.
*
* Returns: a new #GstBuffer that's the concatenation of the source buffers
* Returns: a new GstBufferPool or NULL on error.
*/
GstBuffer *
gst_buffer_merge (GstBuffer *buf1, GstBuffer *buf2)
GstBufferPool*
gst_buffer_pool_new (GstDataFreeFunction free,
GstDataCopyFunction copy,
GstBufferPoolBufferNewFunction buffer_new,
GstBufferPoolBufferCopyFunction buffer_copy,
GstBufferPoolBufferFreeFunction buffer_free,
gpointer user_data)
{
GstBuffer *result;
/* we're just a specific case of the more general gst_buffer_span() */
result = gst_buffer_span (buf1, 0, buf2, buf1->size + buf2->size);
GstBufferPool *pool;
GST_BUFFER_TIMESTAMP (result) = GST_BUFFER_TIMESTAMP (buf1);
/* we need at least a buffer_new function */
g_return_val_if_fail (buffer_new != NULL, NULL);
return result;
pool = g_new0 (GstBufferPool, 1);
_gst_buffer_pool_live++;
GST_DEBUG (GST_CAT_BUFFER,"allocating new buffer pool %p\n", pool);
/* init data struct */
_GST_DATA_INIT (GST_DATA (pool),
_gst_buffer_pool_type,
0,
(free ? free : (GstDataFreeFunction) _gst_buffer_pool_free),
copy);
/* set functions */
pool->buffer_new = buffer_new;
pool->buffer_copy = buffer_copy;
pool->buffer_free = buffer_free;
pool->user_data = user_data;
return pool;
}
/**
* gst_buffer_pool_is_active:
* @pool: the pool to query
*
* Query if the geven bufferpool is active.
*
* Returns: TRUE if the pool is active.
*/
gboolean
gst_buffer_pool_is_active (GstBufferPool *pool)
{
return pool->active;
}
/**
* gst_buffer_pool_set_active:
* @pool: the pool to activate
* @active: new status of the pool
*
* Set the given pool to the active or inactive state depending on the
* activate parameter
*/
void
gst_buffer_pool_set_active (GstBufferPool *pool, gboolean active)
{
pool->active = active;
}
/**
* gst_buffer_pool_set_user_data:
* @pool: the pool set the user data for
* @user_data: the user_data to set
*
* Set the given user data to the bufferpool
*/
void
gst_buffer_pool_set_user_data (GstBufferPool *pool, gpointer user_data)
{
pool->user_data = user_data;
}
/**
* gst_buffer_pool_get_user_data:
* @pool: the pool get the user data for
*
* Get the user data of the bufferpool
*
* Returns: the user data associated with this bufferpool
*/
gpointer
gst_buffer_pool_get_user_data (GstBufferPool *pool)
{
return pool->user_data;
}
/* FIXME */
GstBufferPool*
gst_buffer_pool_get_default (guint size, guint numbuffers)
{
return NULL;
}

View file

@ -24,153 +24,150 @@
#ifndef __GST_BUFFER_H__
#define __GST_BUFFER_H__
/*
* Define this to add file:line info to each GstBuffer showing
* the location in the source code where the buffer was created.
*
* #define GST_BUFFER_WHERE
*
* Then in gdb, you can `call gst_buffer_print_live()' to get a list
* of allocated GstBuffers and also the file:line where they were
* allocated.
*/
#include <gst/gstdata.h>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
G_BEGIN_DECLS
#ifdef HAVE_ATOMIC_H
#include <asm/atomic.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
typedef struct _GstBuffer GstBuffer;
typedef struct _GstBufferPool GstBufferPool;
extern GType _gst_buffer_type;
extern GType _gst_buffer_pool_type;
#define GST_TYPE_BUFFER (_gst_buffer_type)
#define GST_TYPE_BUFFER_POOL (_gst_buffer_pool_type)
#define GST_BUFFER(buf) ((GstBuffer *)(buf))
#define GST_BUFFER_POOL(pool) ((GstBufferPool *)(pool))
#define GST_IS_BUFFER(buf) (GST_DATA_TYPE(buf) == GST_TYPE_BUFFER)
#define GST_IS_BUFFER_POOL(buf) (GST_DATA_TYPE(buf) == GST_TYPE_BUFFER_POOL)
#define GST_BUFFER_FLAGS(buf) (GST_BUFFER(buf)->flags)
#define GST_BUFFER_FLAG_IS_SET(buf,flag) (GST_BUFFER_FLAGS(buf) & (1<<(flag)))
#define GST_BUFFER_FLAG_SET(buf,flag) G_STMT_START{ (GST_BUFFER_FLAGS(buf) |= (1<<(flag))); }G_STMT_END
#define GST_BUFFER_FLAG_UNSET(buf,flag) G_STMT_START{ (GST_BUFFER_FLAGS(buf) &= ~(1<<(flag))); }G_STMT_END
#define GST_BUFFER_REFCOUNT(buf) GST_DATA_REFCOUNT(buf)
#define GST_BUFFER_REFCOUNT_VALUE(buf) GST_DATA_REFCOUNT_VALUE(buf)
#define GST_BUFFER_COPY_FUNC(buf) GST_DATA_COPY_FUNC(buf)
#define GST_BUFFER_FREE_FUNC(buf) GST_DATA_FREE_FUNC(buf)
#define GST_BUFFER_FLAGS(buf) GST_DATA_FLAGS(buf)
#define GST_BUFFER_FLAG_IS_SET(buf,flag) GST_DATA_FLAG_IS_SET (buf, flag)
#define GST_BUFFER_FLAG_SET(buf,flag) GST_DATA_FLAG_SET (buf, flag)
#define GST_BUFFER_FLAG_UNSET(buf,flag) GST_DATA_FLAG_UNSET (buf, flag)
#define GST_BUFFER_DATA(buf) (GST_BUFFER(buf)->data)
#define GST_BUFFER_SIZE(buf) (GST_BUFFER(buf)->size)
#define GST_BUFFER_OFFSET(buf) (GST_BUFFER(buf)->offset)
#define GST_BUFFER_MAXSIZE(buf) (GST_BUFFER(buf)->maxsize)
#define GST_BUFFER_TIMESTAMP(buf) (GST_BUFFER(buf)->timestamp)
#define GST_BUFFER_MAXAGE(buf) (GST_BUFFER(buf)->maxage)
#define GST_BUFFER_OFFSET(buf) (GST_BUFFER(buf)->offset)
#define GST_BUFFER_BUFFERPOOL(buf) (GST_BUFFER(buf)->pool)
#define GST_BUFFER_PARENT(buf) (GST_BUFFER(buf)->parent)
#define GST_BUFFER_POOL_PRIVATE(buf) (GST_BUFFER(buf)->pool_private)
#define GST_BUFFER_COPY_FUNC(buf) (GST_BUFFER(buf)->copy)
#define GST_BUFFER_FREE_FUNC(buf) (GST_BUFFER(buf)->free)
#define GST_BUFFER_LOCK(buf) (g_mutex_lock(GST_BUFFER(buf)->lock))
#define GST_BUFFER_TRYLOCK(buf) (g_mutex_trylock(GST_BUFFER(buf)->lock))
#define GST_BUFFER_UNLOCK(buf) (g_mutex_unlock(GST_BUFFER(buf)->lock))
typedef enum {
GST_BUFFER_READONLY,
enum {
GST_BUFFER_READONLY = GST_DATA_FLAG_LAST,
GST_BUFFER_SUBBUFFER,
GST_BUFFER_ORIGINAL,
GST_BUFFER_DONTFREE,
GST_BUFFER_DISCONTINOUS,
GST_BUFFER_KEY_UNIT,
GST_BUFFER_PREROLL,
} GstBufferFlag;
typedef struct _GstBuffer GstBuffer;
typedef void (*GstBufferFreeFunc) (GstBuffer *buf);
typedef GstBuffer *(*GstBufferCopyFunc) (GstBuffer *srcbuf);
#include <gst/gstbufferpool.h>
GST_BUFFER_FLAG_LAST = GST_DATA_FLAG_LAST + 8,
};
struct _GstBuffer {
GstData data_type;
/* locking */
GMutex *lock;
/* pointer to data and its size */
guint8 *data; /* pointer to buffer data */
guint size; /* size of buffer data */
guint64 maxsize; /* max size of this buffer */
/* refcounting */
#ifdef HAVE_ATOMIC_H
atomic_t refcount;
#define GST_BUFFER_REFCOUNT(buf) (atomic_read(&(GST_BUFFER((buf))->refcount)))
#else
int refcount;
#define GST_BUFFER_REFCOUNT(buf) (GST_BUFFER(buf)->refcount)
#endif
/* flags */
guint16 flags; /* boolean properties of buffer */
/* pointer to data, its size, and offset in original source if known */
guchar *data;
guint32 size;
guint32 maxsize;
guint32 offset;
/* timestamp */
gint64 timestamp; /* nanoseconds since zero */
gint64 maxage; /* FIXME: not used yet */
/* subbuffer support, who's my parent? */
GstBuffer *parent;
guint64 timestamp;
guint64 offset;
/* this is a pointer to the buffer pool (if any) */
GstBufferPool *pool;
/* pointer to pool private data of parent buffer in case of a subbuffer */
gpointer pool_private;
/* utility function pointers, can override default */
GstBufferFreeFunc free; /* free the data associated with the buffer */
GstBufferCopyFunc copy; /* copy the data from one buffer to another */
};
/* initialisation */
/* bufferpools */
typedef GstBuffer* (*GstBufferPoolBufferNewFunction) (GstBufferPool *pool, guint64 offset, guint size, gpointer user_data);
typedef GstBuffer* (*GstBufferPoolBufferCopyFunction) (GstBufferPool *pool, const GstBuffer *buffer, gpointer user_data);
typedef void (*GstBufferPoolBufferFreeFunction) (GstBufferPool *pool, GstBuffer *buffer, gpointer user_data);
struct _GstBufferPool {
GstData data;
gboolean active;
GstBufferPoolBufferNewFunction buffer_new;
GstBufferPoolBufferCopyFunction buffer_copy;
GstBufferPoolBufferFreeFunction buffer_free;
gpointer user_data;
};
/*< private >*/
void _gst_buffer_initialize (void);
/* creating a new buffer from scratch */
GstBuffer* gst_buffer_new (void);
GstBuffer* gst_buffer_new_from_pool (GstBufferPool *pool, guint32 offset, guint32 size);
/* creating a subbuffer */
GstBuffer* gst_buffer_create_sub (GstBuffer *parent, guint32 offset, guint32 size);
/* refcounting */
void gst_buffer_ref (GstBuffer *buffer);
void gst_buffer_ref_by_count (GstBuffer *buffer, gint count);
void gst_buffer_unref (GstBuffer *buffer);
/* destroying the buffer */
void gst_buffer_destroy (GstBuffer *buffer);
/* copy buffer */
GstBuffer* gst_buffer_copy (GstBuffer *buffer);
/* merge, span, or append two buffers, intelligently */
GstBuffer* gst_buffer_merge (GstBuffer *buf1, GstBuffer *buf2);
GstBuffer* gst_buffer_span (GstBuffer *buf1, guint32 offset, GstBuffer *buf2, guint32 len);
GstBuffer* gst_buffer_append (GstBuffer *buffer, GstBuffer *append);
gboolean gst_buffer_is_span_fast (GstBuffer *buf1, GstBuffer *buf2);
void _gst_buffer_free (GstBuffer *buf);
GstBuffer* _gst_buffer_copy (GstBuffer *buf);
void gst_buffer_print_stats (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
/* refcounting */
#define gst_buffer_ref(buf) GST_BUFFER (gst_data_ref (GST_DATA (buf)))
#define gst_buffer_ref_by_count(buf,c) GST_BUFFER (gst_data_ref_by_count (GST_DATA (buf), c))
#define gst_buffer_unref(buf) gst_data_unref (GST_DATA (buf))
/* copy buffer */
#define gst_buffer_copy(buffer) GST_BUFFER (gst_data_copy (GST_DATA (buffer)))
#define gst_buffer_copy_on_write(buffer) GST_BUFFER (gst_data_copy_on_write (GST_DATA (buffer)))
#define gst_buffer_free(buffer) gst_data_free (GST_DATA (buffer))
/* allocation */
GstBuffer* gst_buffer_new (void);
GstBuffer* gst_buffer_new_and_alloc (guint size);
/* creating a new buffer from a pool */
GstBuffer* gst_buffer_new_from_pool (GstBufferPool *pool, guint64 offset, guint size);
/* creating a subbuffer */
GstBuffer* gst_buffer_create_sub (GstBuffer *parent, guint offset, guint size);
/* merge, span, or append two buffers, intelligently */
GstBuffer* gst_buffer_merge (GstBuffer *buf1, GstBuffer *buf2);
gboolean gst_buffer_is_span_fast (GstBuffer *buf1, GstBuffer *buf2);
GstBuffer* gst_buffer_span (GstBuffer *buf1, guint32 offset, GstBuffer *buf2, guint32 len);
/* creating a new buffer pools */
GstBufferPool* gst_buffer_pool_new (GstDataFreeFunction free,
GstDataCopyFunction copy,
GstBufferPoolBufferNewFunction buffer_create,
GstBufferPoolBufferCopyFunction buffer_copy,
GstBufferPoolBufferFreeFunction buffer_free,
gpointer user_data);
gboolean gst_buffer_pool_is_active (GstBufferPool *pool);
void gst_buffer_pool_set_active (GstBufferPool *pool, gboolean active);
GstBufferPool* gst_buffer_pool_get_default (guint size, guint numbuffers);
#define gst_buffer_pool_ref(buf) GST_BUFFER_POOL (gst_data_ref (GST_DATA (buf)))
#define gst_buffer_pool_ref_by_count(buf,c) GST_BUFFER_POOL (gst_data_ref_by_count (GST_DATA (buf), c))
#define gst_buffer_pool_unref(buf) gst_data_unref (GST_DATA (buf))
/* bufferpool operations */
#define gst_buffer_pool_copy(pool) GST_BUFFER_POOL (gst_data_copy (GST_DATA (pool)))
#define gst_buffer_pool_copy_on_write(pool) GST_BUFFER_POOL (gst_data_copy_on_write (GST_DATA (pool)))
#define gst_buffer_pool_free(pool) gst_data_free (GST_DATA (pool))
void gst_buffer_pool_set_user_data (GstBufferPool *pool, gpointer user_data);
gpointer gst_buffer_pool_get_user_data (GstBufferPool *pool);
G_END_DECLS
#endif /* __GST_BUFFER_H__ */

View file

@ -1,390 +0,0 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
*
* gstbufferpool.c: Buffer-pool operations
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "gst_private.h"
#include "gstbuffer.h"
static GMutex *_default_pool_lock;
static GHashTable *_default_pools;
static GstBuffer* gst_buffer_pool_default_buffer_new (GstBufferPool *pool, gint64 location, gint size, gpointer user_data);
static void gst_buffer_pool_default_buffer_free (GstBuffer *buffer);
static void gst_buffer_pool_default_destroy_hook (GstBufferPool *pool, gpointer user_data);
typedef struct _GstBufferPoolDefault GstBufferPoolDefault;
struct _GstBufferPoolDefault {
GMemChunk *mem_chunk;
guint size;
};
void
_gst_buffer_pool_initialize (void)
{
_default_pools = g_hash_table_new(NULL,NULL);
_default_pool_lock = g_mutex_new ();
}
/**
* gst_buffer_pool_new:
*
* Create a new buffer pool.
*
* Returns: new buffer pool
*/
GstBufferPool*
gst_buffer_pool_new (void)
{
GstBufferPool *pool;
pool = g_new0 (GstBufferPool, 1);
GST_DEBUG (GST_CAT_BUFFER,"allocating new buffer pool %p", pool);
/* all hooks and user data set to NULL or 0 by g_new0 */
pool->lock = g_mutex_new ();
#ifdef HAVE_ATOMIC_H
atomic_set (&pool->refcount, 1);
#else
pool->refcount = 1;
#endif
return pool;
}
/**
* gst_buffer_pool_ref:
* @pool: the GstBufferPool to reference
*
* Increment the refcount of this buffer pool.
*/
void
gst_buffer_pool_ref (GstBufferPool *pool)
{
g_return_if_fail (pool != NULL);
GST_DEBUG(GST_CAT_BUFFER,"referencing buffer pool %p from %d", pool, GST_BUFFER_POOL_REFCOUNT(pool));
#ifdef HAVE_ATOMIC_H
atomic_inc (&(pool->refcount));
#else
g_return_if_fail (pool->refcount > 0);
GST_BUFFER_POOL_LOCK (pool);
pool->refcount++;
GST_BUFFER_POOL_UNLOCK (pool);
#endif
}
/**
* gst_buffer_pool_ref_by_count:
* @pool: the GstBufferPool to reference
* @count: a number
*
* Increment the refcount of this buffer pool by the given number.
*/
void
gst_buffer_pool_ref_by_count (GstBufferPool *pool, int count)
{
g_return_if_fail (pool != NULL);
g_return_if_fail (count >= 0);
#ifdef HAVE_ATOMIC_H
g_return_if_fail (atomic_read (&(pool->refcount)) > 0);
atomic_add (count, &(pool->refcount));
#else
g_return_if_fail (pool->refcount > 0);
GST_BUFFER_POOL_LOCK (pool);
pool->refcount += count;
GST_BUFFER_POOL_UNLOCK (pool);
#endif
}
/**
* gst_buffer_pool_unref:
* @pool: the GstBufferPool to unref
*
* Decrement the refcount of this buffer pool. If the refcount is
* zero and the pool is a default implementation,
* the buffer pool will be destroyed.
*/
void
gst_buffer_pool_unref (GstBufferPool *pool)
{
gint zero;
g_return_if_fail (pool != NULL);
GST_DEBUG(GST_CAT_BUFFER, "unreferencing buffer pool %p from %d", pool, GST_BUFFER_POOL_REFCOUNT(pool));
#ifdef HAVE_ATOMIC_H
g_return_if_fail (atomic_read (&(pool->refcount)) > 0);
zero = atomic_dec_and_test (&(pool->refcount));
#else
g_return_if_fail (pool->refcount > 0);
GST_BUFFER_POOL_LOCK (pool);
pool->refcount--;
zero = (pool->refcount == 0);
GST_BUFFER_POOL_UNLOCK (pool);
#endif
/* if we ended up with the refcount at zero, destroy the buffer pool*/
if (zero) {
gst_buffer_pool_destroy (pool);
}
}
/**
* gst_buffer_pool_set_buffer_new_function:
* @pool: the pool to set the buffer create function for
* @create: the create function
*
* Sets the function that will be called when a buffer is created
* from this pool.
*/
void
gst_buffer_pool_set_buffer_new_function (GstBufferPool *pool,
GstBufferPoolBufferNewFunction create)
{
g_return_if_fail (pool != NULL);
pool->buffer_new = create;
}
/**
* gst_buffer_pool_set_buffer_free_function:
* @pool: the pool to set the buffer free function for
* @destroy: the free function
*
* Sets the function that will be called when a buffer is freed
* from this pool.
*/
void
gst_buffer_pool_set_buffer_free_function (GstBufferPool *pool,
GstBufferFreeFunc destroy)
{
g_return_if_fail (pool != NULL);
pool->buffer_free = destroy;
}
/**
* gst_buffer_pool_set_buffer_copy_function:
* @pool: the pool to set the buffer copy function for
* @copy: the copy function
*
* Sets the function that will be called when a buffer is copied.
*
* You may use the default GstBuffer implementation (gst_buffer_copy).
*/
void
gst_buffer_pool_set_buffer_copy_function (GstBufferPool *pool,
GstBufferCopyFunc copy)
{
g_return_if_fail (pool != NULL);
pool->buffer_copy = copy;
}
/**
* gst_buffer_pool_set_destroy_hook:
* @pool: the pool to set the destroy hook for
* @destroy: the destroy function
*
* Sets the function that will be called before a bufferpool is destroyed.
* You can take care of you private_data here.
*/
void
gst_buffer_pool_set_destroy_hook (GstBufferPool *pool,
GstBufferPoolDestroyHook destroy)
{
g_return_if_fail (pool != NULL);
pool->destroy_hook = destroy;
}
/**
* gst_buffer_pool_set_user_data:
* @pool: the pool to set the user data for
* @user_data: any user data to be passed to the create/destroy buffer functions
* and the destroy hook
*
* You can put your private data here.
*/
void
gst_buffer_pool_set_user_data (GstBufferPool *pool,
gpointer user_data)
{
g_return_if_fail (pool != NULL);
pool->user_data = user_data;
}
/**
* gst_buffer_pool_get_user_data:
* @pool: the pool to get the user data from
*
* gets user data
*
* Returns: The user data of this bufferpool
*/
gpointer
gst_buffer_pool_get_user_data (GstBufferPool *pool)
{
g_return_val_if_fail (pool != NULL, NULL);
return pool->user_data;
}
/**
* gst_buffer_pool_destroy:
* @pool: the pool to destroy
*
* Frees the memory for this bufferpool, calls the destroy hook.
*/
void
gst_buffer_pool_destroy (GstBufferPool *pool)
{
g_return_if_fail (pool != NULL);
if (pool->destroy_hook)
pool->destroy_hook (pool, pool->user_data);
g_free(pool);
}
/*
* This is so we don't get messed up by GST_BUFFER_WHERE.
*/
static GstBuffer *
_pool_gst_buffer_copy (GstBuffer *buffer)
{ return gst_buffer_copy (buffer); }
/**
* gst_buffer_pool_get_default:
* @buffer_size: the number of bytes this buffer will store
* @pool_size: the default number of buffers to be preallocated
*
* Returns an instance of a buffer pool using the default
* implementation. If a buffer pool instance with the same buffer_size
* already exists this will be returned, otherwise a new instance will
* be created.
*
* Returns: an instance of GstBufferPool
*/
GstBufferPool*
gst_buffer_pool_get_default (guint buffer_size, guint pool_size)
{
GstBufferPool *pool;
GMemChunk *data_chunk;
guint real_buffer_size;
GstBufferPoolDefault *def;
/* round up to the nearest 32 bytes for cache-line and other efficiencies */
real_buffer_size = (((buffer_size-1) / 32) + 1) * 32;
/* check for an existing GstBufferPool with the same real_buffer_size */
/* (we won't worry about the pool_size) */
g_mutex_lock (_default_pool_lock);
pool = (GstBufferPool*)g_hash_table_lookup(_default_pools,GINT_TO_POINTER(real_buffer_size));
g_mutex_unlock (_default_pool_lock);
if (pool != NULL){
gst_buffer_pool_ref(pool);
return pool;
}
data_chunk = g_mem_chunk_new ("GstBufferPoolDefault", real_buffer_size,
real_buffer_size * pool_size, G_ALLOC_AND_FREE);
pool = gst_buffer_pool_new();
gst_buffer_pool_set_buffer_new_function (pool, gst_buffer_pool_default_buffer_new);
gst_buffer_pool_set_buffer_free_function (pool, gst_buffer_pool_default_buffer_free);
gst_buffer_pool_set_buffer_copy_function (pool, _pool_gst_buffer_copy);
gst_buffer_pool_set_destroy_hook (pool, gst_buffer_pool_default_destroy_hook);
def = g_new0 (GstBufferPoolDefault, 1);
def->size = buffer_size;
def->mem_chunk = data_chunk;
pool->user_data = def;
g_mutex_lock (_default_pool_lock);
g_hash_table_insert(_default_pools,GINT_TO_POINTER(real_buffer_size),pool);
g_mutex_unlock (_default_pool_lock);
GST_DEBUG(GST_CAT_BUFFER,"new buffer pool %p bytes:%d size:%d", pool, real_buffer_size, pool_size);
return pool;
}
static GstBuffer*
gst_buffer_pool_default_buffer_new (GstBufferPool *pool, gint64 location /*unused*/,
gint size /*unused*/, gpointer user_data)
{
GstBuffer *buffer;
GstBufferPoolDefault *def = (GstBufferPoolDefault*) user_data;
GMemChunk *data_chunk = def->mem_chunk;
gst_buffer_pool_ref(pool);
buffer = gst_buffer_new();
GST_INFO (GST_CAT_BUFFER,"creating new buffer %p from pool %p",buffer,pool);
g_mutex_lock (pool->lock);
GST_BUFFER_DATA(buffer) = g_mem_chunk_alloc(data_chunk);
g_mutex_unlock (pool->lock);
GST_BUFFER_SIZE(buffer) = def->size;
GST_BUFFER_MAXSIZE(buffer) = def->size;
return buffer;
}
static void
gst_buffer_pool_default_buffer_free (GstBuffer *buffer)
{
GstBufferPool *pool = buffer->pool;
GstBufferPoolDefault *def = (GstBufferPoolDefault*) pool->user_data;
GMemChunk *data_chunk = def->mem_chunk;
gpointer data = GST_BUFFER_DATA(buffer);
g_mutex_lock (pool->lock);
g_mem_chunk_free (data_chunk,data);
g_mutex_unlock (pool->lock);
buffer->pool = NULL;
gst_buffer_pool_unref(pool);
}
static void
gst_buffer_pool_default_destroy_hook (GstBufferPool *pool, gpointer user_data)
{
GstBufferPoolDefault *def = (GstBufferPoolDefault*) user_data;
GMemChunk *data_chunk = def->mem_chunk;
GST_DEBUG(GST_CAT_BUFFER,"destroying default buffer pool %p", pool);
g_mutex_free (pool->lock);
g_mem_chunk_reset(data_chunk);
g_free(data_chunk);
g_free(def);
}

View file

@ -1,99 +0,0 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
*
* gstbufferpool.h: Header for buffer-pool management
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_BUFFER_POOL_H__
#define __GST_BUFFER_POOL_H__
#include <gst/gstbuffer.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#define GST_BUFFER_POOL(pool) \
((GstBufferPool *)(pool))
#define GST_BUFFER_POOL_LOCK(pool) (g_mutex_lock(GST_BUFFER_POOL(pool)->lock))
#define GST_BUFFER_POOL_UNLOCK(pool) (g_mutex_unlock(GST_BUFFER_POOL(pool)->lock))
typedef struct _GstBufferPool GstBufferPool;
typedef GstBuffer* (*GstBufferPoolBufferNewFunction) (GstBufferPool *pool, gint64 location, gint size, gpointer user_data);
typedef void (*GstBufferPoolDestroyHook) (GstBufferPool *pool, gpointer user_data);
struct _GstBufferPool {
/* locking */
GMutex *lock;
/* refcounting */
#ifdef HAVE_ATOMIC_H
atomic_t refcount;
#define GST_BUFFER_POOL_REFCOUNT(pool) (atomic_read(&(GST_BUFFER_POOL((pool))->refcount)))
#else
int refcount;
#define GST_BUFFER_POOL_REFCOUNT(pool) (GST_BUFFER_POOL(pool)->refcount)
#endif
GstBufferPoolBufferNewFunction buffer_new;
GstBufferFreeFunc buffer_free;
GstBufferCopyFunc buffer_copy;
GstBufferPoolDestroyHook destroy_hook;
gpointer user_data;
};
void _gst_buffer_pool_initialize (void);
/* creating a new buffer pool from scratch */
GstBufferPool* gst_buffer_pool_new (void);
/* refcounting */
void gst_buffer_pool_ref (GstBufferPool *pool);
void gst_buffer_pool_ref_by_count (GstBufferPool *pool, int count);
void gst_buffer_pool_unref (GstBufferPool *pool);
/* setting create and destroy functions */
void gst_buffer_pool_set_buffer_new_function (GstBufferPool *pool,
GstBufferPoolBufferNewFunction create);
void gst_buffer_pool_set_buffer_free_function (GstBufferPool *pool,
GstBufferFreeFunc destroy);
void gst_buffer_pool_set_buffer_copy_function (GstBufferPool *pool,
GstBufferCopyFunc copy);
void gst_buffer_pool_set_destroy_hook (GstBufferPool *pool,
GstBufferPoolDestroyHook destroy);
void gst_buffer_pool_set_user_data (GstBufferPool *pool,
gpointer user_data);
gpointer gst_buffer_pool_get_user_data (GstBufferPool *pool);
/* destroying the buffer pool */
void gst_buffer_pool_destroy (GstBufferPool *pool);
/* a default buffer pool implementation */
GstBufferPool* gst_buffer_pool_get_default (guint buffer_size, guint pool_size);
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_BUFFER_POOL_H__ */

View file

@ -26,12 +26,12 @@
#include "gst_private.h"
#include "gstclock.h"
#include "gstlog.h"
#include "gstmemchunk.h"
#define CLASS(clock) GST_CLOCK_CLASS (G_OBJECT_GET_CLASS (clock))
static GMemChunk *_gst_clock_entries_chunk;
static GMutex *_gst_clock_entries_chunk_lock;
static GList *_gst_clock_entries_pool;
static GstMemChunk *_gst_clock_entries_chunk;
static void gst_clock_class_init (GstClockClass *klass);
static void gst_clock_init (GstClock *clock);
@ -54,17 +54,10 @@ struct _GstClockEntry {
GstEntryStatus status;
GstClockCallback func;
gpointer user_data;
GMutex *lock;
GCond *cond;
};
#define GST_CLOCK_ENTRY(entry) ((GstClockEntry *)(entry))
#define GST_CLOCK_ENTRY_TIME(entry) (((GstClockEntry *)(entry))->time)
#define GST_CLOCK_ENTRY_LOCK(entry) (g_mutex_lock ((entry)->lock))
#define GST_CLOCK_ENTRY_UNLOCK(entry) (g_mutex_unlock ((entry)->lock))
#define GST_CLOCK_ENTRY_SIGNAL(entry) (g_cond_signal ((entry)->cond))
#define GST_CLOCK_ENTRY_WAIT(entry) (g_cond_wait (entry->cond, entry->lock))
#define GST_CLOCK_ENTRY_TIMED_WAIT(entry, time) (g_cond_timed_wait (entry->cond, entry->lock, (time)))
static GstClockEntry*
gst_clock_entry_new (GstClockTime time,
@ -72,20 +65,8 @@ gst_clock_entry_new (GstClockTime time,
{
GstClockEntry *entry;
g_mutex_lock (_gst_clock_entries_chunk_lock);
if (_gst_clock_entries_pool) {
entry = GST_CLOCK_ENTRY (_gst_clock_entries_pool->data);
entry = gst_mem_chunk_alloc (_gst_clock_entries_chunk);
_gst_clock_entries_pool = g_list_remove (_gst_clock_entries_pool, entry);
g_mutex_unlock (_gst_clock_entries_chunk_lock);
}
else {
entry = g_mem_chunk_alloc (_gst_clock_entries_chunk);
g_mutex_unlock (_gst_clock_entries_chunk_lock);
entry->lock = g_mutex_new ();
entry->cond = g_cond_new ();
}
entry->time = time;
entry->func = func;
entry->user_data = user_data;
@ -93,6 +74,7 @@ gst_clock_entry_new (GstClockTime time,
return entry;
}
/*
static gint
clock_compare_func (gconstpointer a,
gconstpointer b)
@ -102,6 +84,7 @@ clock_compare_func (gconstpointer a,
return (entry1->time - entry2->time);
}
*/
GType
gst_clock_get_type (void)
@ -141,11 +124,9 @@ gst_clock_class_init (GstClockClass *klass)
if (!g_thread_supported ())
g_thread_init (NULL);
_gst_clock_entries_chunk = g_mem_chunk_new ("GstClockEntries",
_gst_clock_entries_chunk = gst_mem_chunk_new ("GstClockEntries",
sizeof (GstClockEntry), sizeof (GstClockEntry) * 32,
G_ALLOC_AND_FREE);
_gst_clock_entries_chunk_lock = g_mutex_new ();
_gst_clock_entries_pool = NULL;
}
static void
@ -384,10 +365,6 @@ gst_clock_wait_async_func (GstClock *clock, GstClockTime time,
entry = gst_clock_entry_new (time, func, user_data);
GST_LOCK (clock);
clock->entries = g_list_insert_sorted (clock->entries, entry, clock_compare_func);
GST_UNLOCK (clock);
return entry;
}
@ -493,11 +470,6 @@ gst_clock_remove_notify_async (GstClock *clock, GstClockID id)
static void
gst_clock_unlock_func (GstClock *clock, GstClockTime time, GstClockID id, gpointer user_data)
{
GstClockEntry *entry = (GstClockEntry *) id;
GST_CLOCK_ENTRY_LOCK (entry);
GST_CLOCK_ENTRY_SIGNAL (entry);
GST_CLOCK_ENTRY_UNLOCK (entry);
}
/**
@ -516,8 +488,7 @@ gst_clock_wait_id (GstClock *clock, GstClockID id, GstClockTimeDiff *jitter)
{
GstClockReturn res = GST_CLOCK_TIMEOUT;
GstClockEntry *entry = (GstClockEntry *) id;
GstClockTime current_real, current, target;
GTimeVal timeval;
GstClockTime current, target;
GstClockTimeDiff this_jitter;
g_return_val_if_fail (GST_IS_CLOCK (clock), GST_CLOCK_ERROR);
@ -525,27 +496,25 @@ gst_clock_wait_id (GstClock *clock, GstClockID id, GstClockTimeDiff *jitter)
current = gst_clock_get_time (clock);
g_get_current_time (&timeval);
current_real = GST_TIMEVAL_TO_TIME (timeval);
GST_CLOCK_ENTRY_LOCK (entry);
entry->func = gst_clock_unlock_func;
target = GST_CLOCK_ENTRY_TIME (entry) - current + current_real;
target = GST_CLOCK_ENTRY_TIME (entry) - current;
GST_DEBUG (GST_CAT_CLOCK, "real_target %llu, current_real %llu, target %llu, now %llu",
target, current_real, GST_CLOCK_ENTRY_TIME (entry), current);
GST_DEBUG (GST_CAT_CLOCK, "real_target %llu, target %llu, now %llu",
target, GST_CLOCK_ENTRY_TIME (entry), current);
if (((gint64)target) > 0) {
struct timeval tv;
GST_TIME_TO_TIMEVAL (target, tv);
select (0, NULL, NULL, NULL, &tv);
if (target > current_real) {
GST_TIME_TO_TIMEVAL (target, timeval);
GST_CLOCK_ENTRY_TIMED_WAIT (entry, &timeval);
current = gst_clock_get_time (clock);
this_jitter = current - GST_CLOCK_ENTRY_TIME (entry);
}
else {
res = GST_CLOCK_EARLY;
this_jitter = target - current_real;
this_jitter = target;
}
GST_CLOCK_ENTRY_UNLOCK (entry);
if (jitter)
*jitter = this_jitter;
@ -593,13 +562,7 @@ gst_clock_id_get_time (GstClockID id)
static void
gst_clock_free_entry (GstClock *clock, GstClockEntry *entry)
{
GST_LOCK (clock);
clock->entries = g_list_remove (clock->entries, entry);
GST_UNLOCK (clock);
g_mutex_lock (_gst_clock_entries_chunk_lock);
_gst_clock_entries_pool = g_list_prepend (_gst_clock_entries_pool, entry);
g_mutex_unlock (_gst_clock_entries_chunk_lock);
gst_mem_chunk_free (_gst_clock_entries_chunk, entry);
}
/**

View file

@ -24,23 +24,77 @@
#ifndef __GST_DATA_H__
#define __GST_DATA_H__
#include <gst/gstobject.h>
#include <glib-object.h>
#include <gst/gstatomic.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
#define GST_DATA(data) (GstData*)(data)
#define GST_DATA_TYPE(data) (((GstData*)(data))->type)
/* type */
#define GST_DATA(data) ((GstData*)(data))
#define GST_DATA_TYPE(data) (GST_DATA(data)->type)
/* flags */
#define GST_DATA_FLAGS(data) (GST_DATA(data)->flags)
#define GST_DATA_FLAG_SHIFT(flag) (1<<(flag))
#define GST_DATA_FLAG_IS_SET(data,flag) (GST_DATA_FLAGS(data) & (1<<(flag)))
#define GST_DATA_FLAG_SET(data,flag) G_STMT_START{ (GST_DATA_FLAGS(data) |= (1<<(flag))); }G_STMT_END
#define GST_DATA_FLAG_UNSET(data,flag) G_STMT_START{ (GST_DATA_FLAGS(data) &= ~(1<<(flag))); }G_STMT_END
typedef struct _GstData GstData;
typedef void (*GstDataFreeFunction) (GstData *data);
typedef GstData* (*GstDataCopyFunction) (const GstData *data);
typedef enum
{
GST_DATA_READONLY = 1,
/* insert more */
GST_DATA_FLAG_LAST = 8,
} GstDataFlags;
#define GST_DATA_IS_READONLY(data) (GST_DATA_FLAG_IS_SET((data), GST_DATA_READONLY))
/* refcount */
#define GST_DATA_REFCOUNT(data) ((GST_DATA(data))->refcount)
#define GST_DATA_REFCOUNT_VALUE(data) (GST_ATOMIC_INT_VALUE((&GST_DATA_REFCOUNT (data))))
#define GST_DATA_REFCOUNT_READ(data,value) (GST_ATOMIC_INT_READ(&(GST_DATA_REFCOUNT (data)),value)
/* copy/free functions */
#define GST_DATA_COPY_FUNC(data) (GST_DATA(data)->copy)
#define GST_DATA_FREE_FUNC(data) (GST_DATA(data)->free)
struct _GstData {
GType type;
/* refcounting */
GstAtomicInt refcount;
guint16 flags;
/* utility function pointers, can override default */
GstDataFreeFunction free; /* free the data */
GstDataCopyFunction copy; /* free the data */
};
#ifdef __cplusplus
}
#endif /* __cplusplus */
/* function used by subclasses only */
void _gst_data_init (GstData *data, GType type, guint16 flags,
GstDataFreeFunction free,
GstDataCopyFunction copy);
void _gst_data_free (GstData *data);
GstData* _gst_data_copy (const GstData *data);
/* basic operations on data */
GstData* gst_data_copy (const GstData *data);
GstData* gst_data_copy_on_write (const GstData *data);
void gst_data_free (GstData *data);
/* reference counting */
GstData* gst_data_ref (GstData* data);
GstData* gst_data_ref_by_count (GstData* data, gint count);
void gst_data_unref (GstData* data);
G_END_DECLS
#endif /* __GST_DATA_H__ */

View file

@ -32,6 +32,7 @@
#include "gstscheduler.h"
#include "gstevent.h"
#include "gstutils.h"
#include "gstlog.h"
/* Element signals and args */
enum {

View file

@ -20,44 +20,72 @@
* Boston, MA 02111-1307, USA.
*/
#include "gst/gstinfo.h"
#include "gst/gstevent.h"
#include <string.h> /* memcpy */
#include "gstinfo.h"
#include "gstdata_private.h"
#include "gstevent.h"
#include "gstlog.h"
/* #define MEMPROF */
GType _gst_event_type;
static GMemChunk *_gst_event_chunk;
static GMutex *_gst_event_chunk_lock;
static gint _gst_event_live;
void
_gst_event_initialize (void)
{
gint eventsize = sizeof(GstEvent);
static const GTypeInfo event_info = {
0,
NULL,
NULL,
NULL,
NULL,
NULL,
0,
0,
NULL,
NULL,
};
/* round up to the nearest 32 bytes for cache-line and other efficiencies */
eventsize = (((eventsize-1) / 32) + 1) * 32;
_gst_event_chunk = g_mem_chunk_new ("GstEvent", eventsize,
eventsize * 32, G_ALLOC_AND_FREE);
_gst_event_chunk_lock = g_mutex_new ();
/* register the type */
_gst_event_type = g_type_register_static (G_TYPE_INT, "GstEvent", &event_info, 0);
_gst_event_type = g_boxed_type_register_static ("GstEvent",
(GBoxedCopyFunc) gst_data_ref,
(GBoxedFreeFunc) gst_data_unref);
_gst_event_live = 0;
}
/**
* gst_buffer_print_stats:
*
* Logs statistics about live buffers (using g_log).
*/
void
gst_event_print_stats (void)
{
g_log (g_log_domain_gstreamer, G_LOG_LEVEL_INFO,
"%d live event(s)", _gst_event_live);
}
static GstEvent*
_gst_event_copy (GstEvent *event)
{
GstEvent *copy;
copy = g_new0(GstEvent, 1);
_gst_event_live++;
memcpy (copy, event, sizeof (GstEvent));
/* FIXME copy/ref additional fields */
return copy;
}
static void
_gst_event_free (GstEvent* event)
{
GST_INFO (GST_CAT_EVENT, "freeing event %p", event);
if (GST_EVENT_SRC (event)) {
gst_object_unref (GST_EVENT_SRC (event));
}
switch (GST_EVENT_TYPE (event)) {
default:
break;
}
_GST_DATA_DISPOSE (GST_DATA (event));
_gst_event_live--;
g_free (event);
}
/**
@ -73,16 +101,16 @@ gst_event_new (GstEventType type)
{
GstEvent *event;
#ifndef MEMPROF
g_mutex_lock (_gst_event_chunk_lock);
event = g_mem_chunk_alloc (_gst_event_chunk);
g_mutex_unlock (_gst_event_chunk_lock);
#else
event = g_new0(GstEvent, 1);
#endif
_gst_event_live++;
GST_INFO (GST_CAT_EVENT, "creating new event %p", event);
GST_DATA_TYPE (event) = _gst_event_type;
_GST_DATA_INIT (GST_DATA (event),
_gst_event_type,
0,
(GstDataFreeFunction) _gst_event_free,
(GstDataCopyFunction) _gst_event_copy);
GST_EVENT_TYPE (event) = type;
GST_EVENT_TIMESTAMP (event) = 0LL;
GST_EVENT_SRC (event) = NULL;
@ -90,61 +118,6 @@ gst_event_new (GstEventType type)
return event;
}
/**
* gst_event_copy:
* @event: The event to copy
*
* Copy the event
*
* Returns: A copy of the event.
*/
GstEvent*
gst_event_copy (GstEvent *event)
{
GstEvent *copy;
#ifndef MEMPROF
g_mutex_lock (_gst_event_chunk_lock);
copy = g_mem_chunk_alloc (_gst_event_chunk);
g_mutex_unlock (_gst_event_chunk_lock);
#else
copy = g_new0(GstEvent, 1);
#endif
memcpy (copy, event, sizeof (GstEvent));
/* FIXME copy/ref additional fields */
return copy;
}
/**
* gst_event_free:
* @event: The event to free
*
* Free the given element.
*/
void
gst_event_free (GstEvent* event)
{
GST_INFO (GST_CAT_EVENT, "freeing event %p", event);
g_mutex_lock (_gst_event_chunk_lock);
if (GST_EVENT_SRC (event)) {
gst_object_unref (GST_EVENT_SRC (event));
}
switch (GST_EVENT_TYPE (event)) {
default:
break;
}
#ifndef MEMPROF
g_mem_chunk_free (_gst_event_chunk, event);
#else
g_free (event);
#endif
g_mutex_unlock (_gst_event_chunk_lock);
}
/**
* gst_event_new_seek:
* @type: The type of the seek event
@ -160,6 +133,7 @@ gst_event_new_seek (GstSeekType type, gint64 offset)
GstEvent *event;
event = gst_event_new (GST_EVENT_SEEK);
GST_EVENT_SEEK_TYPE (event) = type;
GST_EVENT_SEEK_OFFSET (event) = offset;
@ -235,12 +209,22 @@ gst_event_discont_get_value (GstEvent *event, GstFormat format, gint64 *value)
}
/**
* gst_event_new_size:
* @format: The format of the size value
* @value: The value of the size event
*
* Create a new size event with the given values.
*
* Returns: The new size event.
*/
GstEvent*
gst_event_new_size (GstFormat format, gint64 value)
{
GstEvent *event;
event = gst_event_new (GST_EVENT_SIZE);
GST_EVENT_SIZE_FORMAT (event) = format;
GST_EVENT_SIZE_VALUE (event) = value;

View file

@ -26,8 +26,8 @@
#include <gst/gsttypes.h>
#include <gst/gstdata.h>
#include <gst/gstcaps.h>
#include <gst/gstformat.h>
#include <gst/gstobject.h>
G_BEGIN_DECLS
@ -124,9 +124,16 @@ struct _GstEvent {
void _gst_event_initialize (void);
void gst_event_print_stats (void);
GstEvent* gst_event_new (GstEventType type);
GstEvent* gst_event_copy (GstEvent *event);
void gst_event_free (GstEvent *event);
/* refcounting */
#define gst_event_ref(ev) gst_data_ref (GST_DATA (ev))
#define gst_event_ref_by_count(ev,c) gst_data_ref_by_count (GST_DATA (ev), c)
#define gst_event_unref(ev) gst_data_unref (GST_DATA (ev))
/* copy buffer */
#define gst_event_copy(ev) GST_EVENT (gst_data_copy (GST_DATA (ev)))
/* seek event */
GstEvent* gst_event_new_seek (GstSeekType type, gint64 offset);

View file

@ -30,6 +30,7 @@
#include "gstbin.h"
#include "gstscheduler.h"
#include "gstevent.h"
#include "gstlog.h"
enum {
TEMPL_PAD_CREATED,
@ -1045,7 +1046,6 @@ gst_pad_try_set_caps_func (GstRealPad *pad, GstCaps *caps, gboolean notify)
GstPadTemplate *template;
GstElement *parent = GST_PAD_PARENT (pad);
/* thomas: FIXME: is this the right result to return ? */
g_return_val_if_fail (pad != NULL, GST_PAD_CONNECT_REFUSED);
g_return_val_if_fail (GST_IS_PAD (pad), GST_PAD_CONNECT_REFUSED);
@ -1856,10 +1856,7 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
}
/* clean up the mess here */
if (buf != NULL) {
if (GST_IS_BUFFER (buf))
gst_buffer_unref (buf);
else
gst_event_free (GST_EVENT (buf));
gst_data_unref (GST_DATA (buf));
}
}
#endif
@ -2341,7 +2338,9 @@ gst_pad_event_default_dispatch (GstPad *pad, GstElement *element, GstEvent *even
/* for all pads in the opposite direction that are connected */
if (GST_PAD_DIRECTION (eventpad) != GST_PAD_DIRECTION (pad) && GST_PAD_IS_CONNECTED (eventpad)) {
if (GST_PAD_DIRECTION (eventpad) == GST_PAD_SRC) {
gst_pad_push (eventpad, GST_BUFFER (gst_event_copy (event)));
/* increase the refcount */
gst_event_ref (event);
gst_pad_push (eventpad, GST_BUFFER (event));
}
else {
GstPad *peerpad = GST_PAD_CAST (GST_RPAD_PEER (eventpad));
@ -2352,6 +2351,7 @@ gst_pad_event_default_dispatch (GstPad *pad, GstElement *element, GstEvent *even
}
}
}
gst_event_unref (event);
return TRUE;
}
@ -2398,17 +2398,17 @@ gst_pad_event_default (GstPad *pad, GstEvent *event)
/**
* gst_pad_dispatcher:
* @pad: the pad to dispatch
* @dispatch: the GstDispatcherFunc to call
* @dispatch: the GstDispatcherFunction to call
* @data: data passed to the dispatcher function.
*
* Invoke the given dispatcher function on all internally connected
* pads of the given pad. The GstPadDispatcherFunc should return
* pads of the given pad. The GstPadDispatcherFunction should return
* TRUE when no further pads need to be preocessed.
*
* Returns: TRUE if one of the dispatcher functions returned TRUE.
*/
gboolean
gst_pad_dispatcher (GstPad *pad, GstPadDispatcherFunc dispatch, gpointer data)
gst_pad_dispatcher (GstPad *pad, GstPadDispatcherFunction dispatch, gpointer data)
{
gboolean res = FALSE;
GList *int_pads, *orig;
@ -2464,6 +2464,7 @@ gst_pad_send_event (GstPad *pad, GstEvent *event)
success = GST_RPAD_EVENTFUNC (pad) (pad, event);
else {
GST_DEBUG(GST_CAT_EVENT, "there's no event function for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
gst_event_unref (event);
}
return success;
@ -2514,7 +2515,7 @@ gst_pad_convert_default (GstPad *pad,
data.dest_format = dest_format;
data.dest_value = dest_value;
return gst_pad_dispatcher (pad, (GstPadDispatcherFunc) gst_pad_convert_dispatcher, &data);
return gst_pad_dispatcher (pad, (GstPadDispatcherFunction) gst_pad_convert_dispatcher, &data);
}
/**
@ -2594,7 +2595,7 @@ gst_pad_query_default (GstPad *pad, GstPadQueryType type,
data.format = format;
data.value = value;
return gst_pad_dispatcher (pad, (GstPadDispatcherFunc) gst_pad_query_dispatcher, &data);
return gst_pad_dispatcher (pad, (GstPadDispatcherFunction) gst_pad_query_dispatcher, &data);
}
/**

View file

@ -32,9 +32,7 @@
#include <gst/gstevent.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
extern GType _gst_pad_type;
extern GType _gst_real_pad_type;
@ -142,7 +140,7 @@ typedef GstPadConnectReturn (*GstPadConnectFunction) (GstPad *pad, GstCaps *cap
typedef GstCaps* (*GstPadGetCapsFunction) (GstPad *pad, GstCaps *caps);
typedef GstBufferPool* (*GstPadBufferPoolFunction) (GstPad *pad);
typedef gboolean (*GstPadDispatcherFunc) (GstPad *pad, gpointer data);
typedef gboolean (*GstPadDispatcherFunction) (GstPad *pad, gpointer data);
typedef enum {
GST_PAD_UNKNOWN,
@ -436,7 +434,7 @@ gboolean gst_pad_query_default (GstPad *pad, GstPadQueryType type,
GList* gst_pad_get_internal_connections (GstPad *pad);
GList* gst_pad_get_internal_connections_default (GstPad *pad);
gboolean gst_pad_dispatcher (GstPad *pad, GstPadDispatcherFunc dispatch,
gboolean gst_pad_dispatcher (GstPad *pad, GstPadDispatcherFunction dispatch,
gpointer data);
@ -467,9 +465,7 @@ xmlNodePtr gst_pad_ghost_save_thyself (GstPad *pad,
GstElement *bin,
xmlNodePtr parent);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GST_PAD_H__ */

View file

@ -22,6 +22,7 @@
/* #define DEBUG_ENABLED */
/* #define STATUS_ENABLED */
#ifdef STATUS_ENABLED
#define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue))
#else
@ -36,6 +37,7 @@
#include "gstqueue.h"
#include "gstscheduler.h"
#include "gstevent.h"
#include "gstlog.h"
GstElementDetails gst_queue_details = {
"Queue",
@ -239,6 +241,7 @@ gst_queue_init (GstQueue *queue)
queue->not_empty = g_cond_new ();
queue->not_full = g_cond_new ();
queue->events = g_async_queue_new();
queue->queue = g_queue_new ();
GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions");
}
@ -250,6 +253,8 @@ gst_queue_dispose (GObject *object)
g_mutex_free (queue->qlock);
g_cond_free (queue->not_empty);
g_cond_free (queue->not_full);
gst_queue_locked_flush (queue);
g_queue_free (queue->queue);
g_async_queue_unref(queue->events);
@ -267,28 +272,25 @@ gst_queue_get_bufferpool (GstPad *pad)
}
static void
gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
gst_queue_cleanup_data (gpointer data, const gpointer user_data)
{
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p", data);
if (GST_IS_BUFFER (data)) {
gst_buffer_unref (GST_BUFFER (data));
}
else {
gst_event_free (GST_EVENT (data));
}
gst_data_unref (GST_DATA (data));
}
static void
gst_queue_locked_flush (GstQueue *queue)
{
g_list_foreach (queue->queue, gst_queue_cleanup_buffers,
(gpointer) queue);
g_list_free (queue->queue);
gpointer data;
queue->queue = NULL;
queue->level_buffers = 0;
while ((data = g_queue_pop_head (queue->queue))) {
gst_queue_cleanup_data (data, (gpointer) queue);
}
queue->timeval = NULL;
queue->level_buffers = 0;
queue->level_bytes = 0;
queue->level_time = 0LL;
/* make sure any pending buffers to be added are flushed too */
queue->flush = TRUE;
}
@ -364,25 +366,28 @@ restart:
}
/* otherwise we have to push a buffer off the other end */
else {
GList *front;
gpointer front;
GstBuffer *leakbuf;
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
if (GST_IS_EVENT (leakbuf))
front = g_queue_pop_head (queue->queue);
leakbuf = (GstBuffer *)(front);
if (GST_IS_EVENT (leakbuf)) {
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(leakbuf)));
}
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
queue->queue = g_list_remove_link (queue->queue, front);
g_list_free (front);
gst_data_unref (GST_DATA (leakbuf));
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d",
queue->level_buffers, queue->size_buffers);
while (queue->level_buffers == queue->size_buffers) {
/* if there's a pending state change for this queue or its manager, switch */
/* back to iterator so bottom half of state change executes */
@ -404,8 +409,7 @@ restart:
/* this means the other end is shut down */
/* try to signal to resolve the error */
if (!queue->may_deadlock) {
if (GST_IS_BUFFER (buf)) gst_buffer_unref (buf);
else gst_event_free (GST_EVENT (buf));
gst_data_unref (GST_DATA (buf));
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
return;
@ -429,17 +433,18 @@ restart:
}
/* put the buffer on the tail of the list */
queue->queue = g_list_append (queue->queue, buf);
g_queue_push_tail (queue->queue, buf);
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
/* this assertion _has_ to hold */
g_assert (queue->queue->length == queue->level_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
/* this assertion _has_ to hold */
/* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
/* reader waiting on an empty queue */
reader = queue->reader;
@ -457,7 +462,7 @@ gst_queue_get (GstPad *pad)
{
GstQueue *queue;
GstBuffer *buf = NULL;
GList *front;
gpointer front;
gboolean writer;
g_assert(pad != NULL);
@ -467,8 +472,6 @@ gst_queue_get (GstPad *pad)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
restart:
/* have to lock for thread-safety */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ());
@ -505,12 +508,14 @@ restart:
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!");
queue->reader = TRUE;
if (queue->block_timeout > -1){
//if (queue->block_timeout > -1){
if (FALSE) {
GTimeVal timeout;
g_get_current_time(&timeout);
g_time_val_add(&timeout, queue->block_timeout);
if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){
g_mutex_unlock (queue->qlock);
g_warning ("filler");
return GST_BUFFER(gst_event_new_filler());
}
}
@ -522,11 +527,9 @@ restart:
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
front = queue->queue;
buf = (GstBuffer *)(front->data);
front = g_queue_pop_head (queue->queue);
buf = (GstBuffer *)(front);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue", buf);
queue->queue = g_list_remove_link (queue->queue, front);
g_list_free (front);
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
@ -536,7 +539,7 @@ restart:
queue->level_buffers, queue->size_buffers);
/* this assertion _has_ to hold */
/* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
g_assert (queue->queue->length == queue->level_buffers);
/* writer waiting on a full queue */
writer = queue->writer;
@ -640,11 +643,7 @@ gst_queue_change_state (GstElement *element)
new_state = GST_STATE_PENDING (element);
if (new_state == GST_STATE_PAUSED) {
/*g_cond_signal (queue->not_full); */
/*g_cond_signal (queue->not_empty); */
}
else if (new_state == GST_STATE_READY) {
if (new_state == GST_STATE_READY) {
gst_queue_locked_flush (queue);
}
else if (new_state == GST_STATE_PLAYING) {

View file

@ -28,10 +28,7 @@
#include <gst/gstelement.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
extern GstElementDetails gst_queue_details;
@ -63,7 +60,7 @@ struct _GstQueue {
GstPad *srcpad;
/* the queue of buffers we're keeping our grubby hands on */
GList *queue;
GQueue *queue;
guint level_buffers; /* number of buffers queued here */
guint level_bytes; /* number of bytes queued here */
@ -101,9 +98,7 @@ struct _GstQueueClass {
GType gst_queue_get_type (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GST_QUEUE_H__ */

View file

@ -26,6 +26,7 @@
#include "gst_private.h"
#include "gstutils.h"
#include "gstlog.h"
#include "gstextratypes.h"
@ -218,14 +219,14 @@ gst_util_dump_mem (guchar * mem, guint size)
guint k;
for (k = i - 16; k < i; k++) {
if (isprint (mem[k]))
if (mem[k]>'a' && mem[k] < 'Z')
g_print ("%c", mem[k]);
else
g_print (".");
}
g_print ("\n");
}
g_print ("%08x : ", i);
g_print ("%08x (%p): ", i, mem+i);
j = 15;
}
else {

View file

@ -266,8 +266,6 @@ gst_fakesink_chain (GstPad *pad, GstBuffer *buf)
gst_pad_event_default (pad, event);
break;
}
gst_event_free (event);
return;
}

View file

@ -323,7 +323,6 @@ gst_fakesrc_event_handler (GstPad *pad, GstEvent *event)
src->buffer_count = GST_EVENT_SEEK_OFFSET (event);
if (!GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
gst_event_free (event);
break;
}
/* else we do a flush too */
@ -333,6 +332,7 @@ gst_fakesrc_event_handler (GstPad *pad, GstEvent *event)
default:
break;
}
gst_event_unref (event);
return TRUE;
}
@ -680,8 +680,8 @@ gst_fakesrc_get(GstPad *pad)
if (src->last_message)
g_free (src->last_message);
src->last_message = g_strdup_printf ("get ******* (%s:%s)> (%d bytes, %llu)",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
src->last_message = g_strdup_printf ("get ******* (%s:%s)> (%d bytes, %llu) %p",
GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf), buf);
g_object_notify (G_OBJECT (src), "last_message");
}

View file

@ -304,7 +304,7 @@ gst_filesink_handle_event (GstPad *pad, GstEvent *event)
if (gst_event_discont_get_value (event, GST_FORMAT_BYTES, &offset))
fseek(filesink->file, offset, SEEK_SET);
gst_event_free (event);
gst_event_unref (event);
break;
}
case GST_EVENT_NEW_MEDIA:

View file

@ -337,6 +337,10 @@ gst_filesrc_free_parent_mmap (GstBuffer *buf)
#endif
/* now unmap the memory */
munmap(GST_BUFFER_DATA(buf),GST_BUFFER_MAXSIZE(buf));
GST_BUFFER_DATA (buf) = NULL;
_gst_buffer_free (buf);
}
static GstBuffer *
@ -371,7 +375,7 @@ gst_filesrc_map_region (GstFileSrc *src, off_t offset, size_t size)
GST_BUFFER_OFFSET(buf) = offset;
GST_BUFFER_TIMESTAMP(buf) = -1LL;
GST_BUFFER_POOL_PRIVATE(buf) = src;
GST_BUFFER_FREE_FUNC(buf) = gst_filesrc_free_parent_mmap;
GST_BUFFER_FREE_FUNC(buf) = (GstDataFreeFunction) gst_filesrc_free_parent_mmap;
g_mutex_lock(src->map_regions_lock);
g_tree_insert(src->map_regions,buf,buf);
@ -716,7 +720,7 @@ gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event)
guint64 offset;
if (GST_EVENT_SEEK_FORMAT (event) != GST_FORMAT_BYTES) {
return FALSE;
goto error;
}
offset = GST_EVENT_SEEK_OFFSET (event);
@ -724,24 +728,24 @@ gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event)
switch (GST_EVENT_SEEK_METHOD (event)) {
case GST_SEEK_METHOD_SET:
if (offset > src->filelen)
return FALSE;
goto error;
src->curoffset = offset;
GST_DEBUG(0, "seek set pending to %lld", src->curoffset);
break;
case GST_SEEK_METHOD_CUR:
if (offset + src->curoffset > src->filelen)
return FALSE;
goto error;
src->curoffset += offset;
GST_DEBUG(0, "seek cur pending to %lld", src->curoffset);
break;
case GST_SEEK_METHOD_END:
if (ABS (offset) > src->filelen)
return FALSE;
goto error;
src->curoffset = src->filelen - ABS (offset);
GST_DEBUG(0, "seek end pending to %lld", src->curoffset);
break;
default:
return FALSE;
goto error;
break;
}
g_object_notify (G_OBJECT (src), "offset");
@ -751,7 +755,7 @@ gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event)
}
case GST_EVENT_SIZE:
if (GST_EVENT_SIZE_FORMAT (event) != GST_FORMAT_BYTES) {
return FALSE;
goto error;
}
src->block_size = GST_EVENT_SIZE_VALUE (event);
g_object_notify (G_OBJECT (src), "blocksize");
@ -760,9 +764,13 @@ gst_filesrc_srcpad_event (GstPad *pad, GstEvent *event)
src->need_flush = TRUE;
break;
default:
return FALSE;
goto error;
break;
}
gst_event_unref (event);
return TRUE;
error:
gst_event_unref (event);
return FALSE;
}

View file

@ -22,6 +22,7 @@
/* #define DEBUG_ENABLED */
/* #define STATUS_ENABLED */
#ifdef STATUS_ENABLED
#define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue))
#else
@ -36,6 +37,7 @@
#include "gstqueue.h"
#include "gstscheduler.h"
#include "gstevent.h"
#include "gstlog.h"
GstElementDetails gst_queue_details = {
"Queue",
@ -239,6 +241,7 @@ gst_queue_init (GstQueue *queue)
queue->not_empty = g_cond_new ();
queue->not_full = g_cond_new ();
queue->events = g_async_queue_new();
queue->queue = g_queue_new ();
GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions");
}
@ -250,6 +253,8 @@ gst_queue_dispose (GObject *object)
g_mutex_free (queue->qlock);
g_cond_free (queue->not_empty);
g_cond_free (queue->not_full);
gst_queue_locked_flush (queue);
g_queue_free (queue->queue);
g_async_queue_unref(queue->events);
@ -267,28 +272,25 @@ gst_queue_get_bufferpool (GstPad *pad)
}
static void
gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
gst_queue_cleanup_data (gpointer data, const gpointer user_data)
{
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p", data);
if (GST_IS_BUFFER (data)) {
gst_buffer_unref (GST_BUFFER (data));
}
else {
gst_event_free (GST_EVENT (data));
}
gst_data_unref (GST_DATA (data));
}
static void
gst_queue_locked_flush (GstQueue *queue)
{
g_list_foreach (queue->queue, gst_queue_cleanup_buffers,
(gpointer) queue);
g_list_free (queue->queue);
gpointer data;
queue->queue = NULL;
queue->level_buffers = 0;
while ((data = g_queue_pop_head (queue->queue))) {
gst_queue_cleanup_data (data, (gpointer) queue);
}
queue->timeval = NULL;
queue->level_buffers = 0;
queue->level_bytes = 0;
queue->level_time = 0LL;
/* make sure any pending buffers to be added are flushed too */
queue->flush = TRUE;
}
@ -364,25 +366,28 @@ restart:
}
/* otherwise we have to push a buffer off the other end */
else {
GList *front;
gpointer front;
GstBuffer *leakbuf;
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end");
front = queue->queue;
leakbuf = (GstBuffer *)(front->data);
if (GST_IS_EVENT (leakbuf))
front = g_queue_pop_head (queue->queue);
leakbuf = (GstBuffer *)(front);
if (GST_IS_EVENT (leakbuf)) {
fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
GST_ELEMENT_NAME(GST_ELEMENT(queue)),
GST_EVENT_TYPE(GST_EVENT(leakbuf)));
}
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
gst_buffer_unref(leakbuf);
queue->queue = g_list_remove_link (queue->queue, front);
g_list_free (front);
gst_data_unref (GST_DATA (leakbuf));
}
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d",
queue->level_buffers, queue->size_buffers);
while (queue->level_buffers == queue->size_buffers) {
/* if there's a pending state change for this queue or its manager, switch */
/* back to iterator so bottom half of state change executes */
@ -404,8 +409,7 @@ restart:
/* this means the other end is shut down */
/* try to signal to resolve the error */
if (!queue->may_deadlock) {
if (GST_IS_BUFFER (buf)) gst_buffer_unref (buf);
else gst_event_free (GST_EVENT (buf));
gst_data_unref (GST_DATA (buf));
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
return;
@ -429,17 +433,18 @@ restart:
}
/* put the buffer on the tail of the list */
queue->queue = g_list_append (queue->queue, buf);
g_queue_push_tail (queue->queue, buf);
queue->level_buffers++;
queue->level_bytes += GST_BUFFER_SIZE(buf);
/* this assertion _has_ to hold */
g_assert (queue->queue->length == queue->level_buffers);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d",
GST_DEBUG_PAD_NAME(pad),
queue->level_buffers, queue->size_buffers);
/* this assertion _has_ to hold */
/* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
/* reader waiting on an empty queue */
reader = queue->reader;
@ -457,7 +462,7 @@ gst_queue_get (GstPad *pad)
{
GstQueue *queue;
GstBuffer *buf = NULL;
GList *front;
gpointer front;
gboolean writer;
g_assert(pad != NULL);
@ -467,8 +472,6 @@ gst_queue_get (GstPad *pad)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
restart:
/* have to lock for thread-safety */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ());
@ -505,12 +508,14 @@ restart:
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!");
queue->reader = TRUE;
if (queue->block_timeout > -1){
//if (queue->block_timeout > -1){
if (FALSE) {
GTimeVal timeout;
g_get_current_time(&timeout);
g_time_val_add(&timeout, queue->block_timeout);
if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){
g_mutex_unlock (queue->qlock);
g_warning ("filler");
return GST_BUFFER(gst_event_new_filler());
}
}
@ -522,11 +527,9 @@ restart:
}
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
front = queue->queue;
buf = (GstBuffer *)(front->data);
front = g_queue_pop_head (queue->queue);
buf = (GstBuffer *)(front);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue", buf);
queue->queue = g_list_remove_link (queue->queue, front);
g_list_free (front);
queue->level_buffers--;
queue->level_bytes -= GST_BUFFER_SIZE(buf);
@ -536,7 +539,7 @@ restart:
queue->level_buffers, queue->size_buffers);
/* this assertion _has_ to hold */
/* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
g_assert (queue->queue->length == queue->level_buffers);
/* writer waiting on a full queue */
writer = queue->writer;
@ -640,11 +643,7 @@ gst_queue_change_state (GstElement *element)
new_state = GST_STATE_PENDING (element);
if (new_state == GST_STATE_PAUSED) {
/*g_cond_signal (queue->not_full); */
/*g_cond_signal (queue->not_empty); */
}
else if (new_state == GST_STATE_READY) {
if (new_state == GST_STATE_READY) {
gst_queue_locked_flush (queue);
}
else if (new_state == GST_STATE_PLAYING) {

View file

@ -28,10 +28,7 @@
#include <gst/gstelement.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
G_BEGIN_DECLS
extern GstElementDetails gst_queue_details;
@ -63,7 +60,7 @@ struct _GstQueue {
GstPad *srcpad;
/* the queue of buffers we're keeping our grubby hands on */
GList *queue;
GQueue *queue;
guint level_buffers; /* number of buffers queued here */
guint level_bytes; /* number of bytes queued here */
@ -101,9 +98,7 @@ struct _GstQueueClass {
GType gst_queue_get_type (void);
#ifdef __cplusplus
}
#endif /* __cplusplus */
G_END_DECLS
#endif /* __GST_QUEUE_H__ */