mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-24 01:00:37 +00:00
shmsink: Add custom allocator to allow for zero-copy shared memory use
This commit is contained in:
parent
df321edeaf
commit
48b9fa2c24
5 changed files with 407 additions and 123 deletions
|
@ -95,11 +95,251 @@ static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf);
|
|||
static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event);
|
||||
static gboolean gst_shm_sink_unlock (GstBaseSink * bsink);
|
||||
static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink);
|
||||
static gboolean gst_shm_sink_propose_allocation (GstBaseSink * sink,
|
||||
GstQuery * query);
|
||||
|
||||
static gpointer pollthread_func (gpointer data);
|
||||
|
||||
static guint signals[LAST_SIGNAL] = { 0 };
|
||||
|
||||
|
||||
|
||||
/********************
|
||||
* CUSTOM ALLOCATOR *
|
||||
********************/
|
||||
|
||||
#define GST_TYPE_SHM_SINK_ALLOCATOR \
|
||||
(gst_shm_sink_allocator_get_type())
|
||||
#define GST_SHM_SINK_ALLOCATOR(obj) \
|
||||
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SINK_ALLOCATOR, \
|
||||
GstShmSinkAllocator))
|
||||
#define GST_SHM_SINK_ALLOCATOR_CLASS(klass) \
|
||||
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SINK_ALLOCATOR, \
|
||||
GstShmSinkAllocatorClass))
|
||||
#define GST_IS_SHM_SINK_ALLOCATOR(obj) \
|
||||
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SINK_ALLOCATOR))
|
||||
#define GST_IS_SHM_SINK_ALLOCATOR_CLASS(klass) \
|
||||
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK_ALLOCATOR))
|
||||
|
||||
struct _GstShmSinkAllocator
|
||||
{
|
||||
GstAllocator parent;
|
||||
|
||||
GstShmSink *sink;
|
||||
};
|
||||
|
||||
typedef struct _GstShmSinkAllocatorClass
|
||||
{
|
||||
GstAllocatorClass parent;
|
||||
} GstShmSinkAllocatorClass;
|
||||
|
||||
typedef struct _GstShmSinkMemory
|
||||
{
|
||||
GstMemory mem;
|
||||
|
||||
gchar *data;
|
||||
GstShmSink *sink;
|
||||
ShmBlock *block;
|
||||
} GstShmSinkMemory;
|
||||
|
||||
GType gst_shm_sink_allocator_get_type (void);
|
||||
|
||||
G_DEFINE_TYPE (GstShmSinkAllocator, gst_shm_sink_allocator, GST_TYPE_ALLOCATOR);
|
||||
|
||||
static void
|
||||
gst_shm_sink_allocator_dispose (GObject * object)
|
||||
{
|
||||
GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (object);
|
||||
|
||||
if (self->sink)
|
||||
gst_object_unref (self->sink);
|
||||
self->sink = NULL;
|
||||
|
||||
G_OBJECT_CLASS (gst_shm_sink_allocator_parent_class)->dispose (object);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_shm_sink_allocator_free (GstAllocator * allocator, GstMemory * mem)
|
||||
{
|
||||
GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
|
||||
|
||||
if (mymem->block) {
|
||||
GST_OBJECT_LOCK (mymem->sink);
|
||||
sp_writer_free_block (mymem->block);
|
||||
GST_OBJECT_UNLOCK (mymem->sink);
|
||||
gst_object_unref (mymem->sink);
|
||||
}
|
||||
gst_object_unref (mem->allocator);
|
||||
|
||||
g_slice_free (GstShmSinkMemory, mymem);
|
||||
}
|
||||
|
||||
static gpointer
|
||||
gst_shm_sink_allocator_mem_map (GstMemory * mem, gsize maxsize,
|
||||
GstMapFlags flags)
|
||||
{
|
||||
GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
|
||||
|
||||
return mymem->data;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_shm_sink_allocator_mem_unmap (GstMemory * mem)
|
||||
{
|
||||
}
|
||||
|
||||
static GstMemory *
|
||||
gst_shm_sink_allocator_mem_share (GstMemory * mem, gssize offset, gssize size)
|
||||
{
|
||||
GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
|
||||
GstShmSinkMemory *mysub;
|
||||
GstMemory *parent;
|
||||
|
||||
/* find the real parent */
|
||||
if ((parent = mem->parent) == NULL)
|
||||
parent = mem;
|
||||
|
||||
if (size == -1)
|
||||
size = mem->size - offset;
|
||||
|
||||
mysub = g_slice_new0 (GstShmSinkMemory);
|
||||
/* the shared memory is always readonly */
|
||||
gst_memory_init (GST_MEMORY_CAST (mysub), GST_MINI_OBJECT_FLAGS (parent) |
|
||||
GST_MINI_OBJECT_FLAG_LOCK_READONLY, gst_object_ref (mem->allocator),
|
||||
parent, mem->maxsize, mem->align, mem->offset + offset, size);
|
||||
mysub->data = mymem->data;
|
||||
|
||||
return (GstMemory *) mysub;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_shm_sink_allocator_mem_is_span (GstMemory * mem1, GstMemory * mem2,
|
||||
gsize * offset)
|
||||
{
|
||||
GstShmSinkMemory *mymem1 = (GstShmSinkMemory *) mem1;
|
||||
GstShmSinkMemory *mymem2 = (GstShmSinkMemory *) mem2;
|
||||
|
||||
if (offset) {
|
||||
GstMemory *parent;
|
||||
|
||||
parent = mem1->parent;
|
||||
|
||||
*offset = mem1->offset - parent->offset;
|
||||
}
|
||||
|
||||
/* and memory is contiguous */
|
||||
return mymem1->data + mem1->offset + mem1->size ==
|
||||
mymem2->data + mem2->offset;
|
||||
}
|
||||
|
||||
static void
|
||||
gst_shm_sink_allocator_init (GstShmSinkAllocator * self)
|
||||
{
|
||||
GstAllocator *allocator = GST_ALLOCATOR (self);
|
||||
|
||||
allocator->mem_map = gst_shm_sink_allocator_mem_map;
|
||||
allocator->mem_unmap = gst_shm_sink_allocator_mem_unmap;
|
||||
allocator->mem_share = gst_shm_sink_allocator_mem_share;
|
||||
allocator->mem_is_span = gst_shm_sink_allocator_mem_is_span;
|
||||
}
|
||||
|
||||
|
||||
static GstMemory *
|
||||
gst_shm_sink_allocator_alloc_locked (GstShmSinkAllocator * self, gsize size,
|
||||
GstAllocationParams * params)
|
||||
{
|
||||
GstMemory *memory = NULL;
|
||||
ShmBlock *block = NULL;
|
||||
gsize maxsize = size + params->prefix + params->padding;
|
||||
gsize align = params->align;
|
||||
|
||||
/* ensure configured alignment */
|
||||
align |= gst_memory_alignment;
|
||||
/* allocate more to compensate for alignment */
|
||||
maxsize += align;
|
||||
|
||||
block = sp_writer_alloc_block (self->sink->pipe, size);
|
||||
if (block) {
|
||||
GstShmSinkMemory *mymem;
|
||||
gsize aoffset, padding;
|
||||
|
||||
GST_LOG_OBJECT (self, "Allocated block %p with %u bytes at %p",
|
||||
block, size, sp_writer_block_get_buf (block));
|
||||
|
||||
mymem = g_slice_new0 (GstShmSinkMemory);
|
||||
memory = GST_MEMORY_CAST (mymem);
|
||||
mymem->data = sp_writer_block_get_buf (block);
|
||||
mymem->sink = gst_object_ref (self->sink);
|
||||
mymem->block = block;
|
||||
|
||||
/* do alignment */
|
||||
if ((aoffset = ((guintptr) mymem->data & align))) {
|
||||
aoffset = (align + 1) - aoffset;
|
||||
mymem->data += aoffset;
|
||||
maxsize -= aoffset;
|
||||
}
|
||||
|
||||
if (params->prefix && (params->flags & GST_MEMORY_FLAG_ZERO_PREFIXED))
|
||||
memset (mymem->data, 0, params->prefix);
|
||||
|
||||
padding = maxsize - (params->prefix + size);
|
||||
if (padding && (params->flags & GST_MEMORY_FLAG_ZERO_PADDED))
|
||||
memset (mymem->data + params->prefix + size, 0, padding);
|
||||
|
||||
gst_memory_init (memory, params->flags, g_object_ref (self), NULL,
|
||||
maxsize, align, params->prefix, size);
|
||||
}
|
||||
|
||||
return memory;
|
||||
}
|
||||
|
||||
static GstMemory *
|
||||
gst_shm_sink_allocator_alloc (GstAllocator * allocator, gsize size,
|
||||
GstAllocationParams * params)
|
||||
{
|
||||
GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (allocator);
|
||||
GstMemory *memory = NULL;
|
||||
|
||||
GST_OBJECT_LOCK (self->sink);
|
||||
memory = gst_shm_sink_allocator_alloc_locked (self, size, params);
|
||||
GST_OBJECT_UNLOCK (self->sink);
|
||||
|
||||
if (!memory) {
|
||||
memory = gst_allocator_alloc (NULL, size, params);
|
||||
GST_LOG_OBJECT (self, "Not enough shared memory for GstMemory of %u bytes, "
|
||||
"allocating using standard allocator", size);
|
||||
}
|
||||
|
||||
return memory;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
gst_shm_sink_allocator_class_init (GstShmSinkAllocatorClass * klass)
|
||||
{
|
||||
GstAllocatorClass *allocator_class = GST_ALLOCATOR_CLASS (klass);
|
||||
GObjectClass *object_class = G_OBJECT_CLASS (klass);
|
||||
|
||||
allocator_class->alloc = gst_shm_sink_allocator_alloc;
|
||||
allocator_class->free = gst_shm_sink_allocator_free;
|
||||
object_class->dispose = gst_shm_sink_allocator_dispose;
|
||||
}
|
||||
|
||||
static GstShmSinkAllocator *
|
||||
gst_shm_sink_allocator_new (GstShmSink * sink)
|
||||
{
|
||||
GstShmSinkAllocator *self = g_object_new (GST_TYPE_SHM_SINK_ALLOCATOR, NULL);
|
||||
|
||||
self->sink = gst_object_ref (sink);
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
|
||||
/***************
|
||||
* MAIN OBJECT *
|
||||
***************/
|
||||
|
||||
static void
|
||||
gst_shm_sink_init (GstShmSink * self)
|
||||
{
|
||||
|
@ -107,6 +347,8 @@ gst_shm_sink_init (GstShmSink * self)
|
|||
self->size = DEFAULT_SIZE;
|
||||
self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION;
|
||||
self->perms = DEFAULT_PERMS;
|
||||
|
||||
gst_allocation_params_init (&self->params);
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -130,6 +372,8 @@ gst_shm_sink_class_init (GstShmSinkClass * klass)
|
|||
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event);
|
||||
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock);
|
||||
gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop);
|
||||
gstbasesink_class->propose_allocation =
|
||||
GST_DEBUG_FUNCPTR (gst_shm_sink_propose_allocation);
|
||||
|
||||
g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
|
||||
g_param_spec_string ("socket-path",
|
||||
|
@ -225,12 +469,18 @@ gst_shm_sink_set_property (GObject * object, guint prop_id,
|
|||
case PROP_SHM_SIZE:
|
||||
GST_OBJECT_LOCK (object);
|
||||
if (self->pipe) {
|
||||
if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0)
|
||||
if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) {
|
||||
/* Swap allocators, so we can know immediately if the memory is
|
||||
* ours */
|
||||
gst_object_unref (self->allocator);
|
||||
self->allocator = gst_shm_sink_allocator_new (self);
|
||||
|
||||
GST_DEBUG_OBJECT (self, "Resized shared memory area from %u to "
|
||||
"%u bytes", self->size, g_value_get_uint (value));
|
||||
else
|
||||
} else {
|
||||
GST_WARNING_OBJECT (self, "Could not resize shared memory area from"
|
||||
"%u to %u bytes", self->size, g_value_get_uint (value));
|
||||
}
|
||||
}
|
||||
self->size = g_value_get_uint (value);
|
||||
GST_OBJECT_UNLOCK (object);
|
||||
|
@ -329,11 +579,13 @@ gst_shm_sink_start (GstBaseSink * bsink)
|
|||
if (!self->pollthread)
|
||||
goto thread_error;
|
||||
|
||||
self->allocator = gst_shm_sink_allocator_new (self);
|
||||
|
||||
return TRUE;
|
||||
|
||||
thread_error:
|
||||
|
||||
sp_close (self->pipe);
|
||||
sp_writer_close (self->pipe, NULL, NULL);
|
||||
self->pipe = NULL;
|
||||
gst_poll_free (self->poll);
|
||||
|
||||
|
@ -352,6 +604,10 @@ gst_shm_sink_stop (GstBaseSink * bsink)
|
|||
self->stop = TRUE;
|
||||
gst_poll_set_flushing (self->poll, TRUE);
|
||||
|
||||
if (self->allocator)
|
||||
gst_object_unref (self->allocator);
|
||||
self->allocator = NULL;
|
||||
|
||||
g_thread_join (self->pollthread);
|
||||
self->pollthread = NULL;
|
||||
|
||||
|
@ -360,7 +616,8 @@ gst_shm_sink_stop (GstBaseSink * bsink)
|
|||
while (self->clients) {
|
||||
struct GstShmClient *client = self->clients->data;
|
||||
self->clients = g_list_remove (self->clients, client);
|
||||
sp_writer_close_client (self->pipe, client->client);
|
||||
sp_writer_close_client (self->pipe, client->client,
|
||||
(sp_buffer_free_callback) gst_buffer_unref, NULL);
|
||||
g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0,
|
||||
client->pollfd.fd);
|
||||
g_slice_free (struct GstShmClient, client);
|
||||
|
@ -369,7 +626,7 @@ gst_shm_sink_stop (GstBaseSink * bsink)
|
|||
gst_poll_free (self->poll);
|
||||
self->poll = NULL;
|
||||
|
||||
sp_close (self->pipe);
|
||||
sp_writer_close (self->pipe, NULL, NULL);
|
||||
self->pipe = NULL;
|
||||
|
||||
return TRUE;
|
||||
|
@ -385,8 +642,8 @@ gst_shm_sink_can_render (GstShmSink * self, GstClockTime time)
|
|||
|
||||
b = sp_writer_get_pending_buffers (self->pipe);
|
||||
for (; b != NULL; b = sp_writer_get_next_buffer (b)) {
|
||||
GstClockTime t = sp_writer_buf_get_tag (b);
|
||||
if (GST_CLOCK_DIFF (time, t) > self->buffer_time)
|
||||
GstBuffer *buf = sp_writer_buf_get_tag (b);
|
||||
if (GST_CLOCK_DIFF (time, GST_BUFFER_PTS (buf)) > self->buffer_time)
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
|
@ -397,38 +654,44 @@ static GstFlowReturn
|
|||
gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
|
||||
{
|
||||
GstShmSink *self = GST_SHM_SINK (bsink);
|
||||
int rv;
|
||||
int rv = 0;
|
||||
GstMapInfo map;
|
||||
gboolean need_new_memory = FALSE;
|
||||
GstFlowReturn ret = GST_FLOW_OK;
|
||||
GstMemory *memory = NULL;
|
||||
GstBuffer *sendbuf = NULL;
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
while (self->wait_for_connection && !self->clients) {
|
||||
g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
|
||||
if (self->unlock) {
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
return GST_FLOW_FLUSHING;
|
||||
}
|
||||
if (self->unlock)
|
||||
goto flushing;
|
||||
}
|
||||
|
||||
while (!gst_shm_sink_can_render (self, GST_BUFFER_TIMESTAMP (buf))) {
|
||||
g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
|
||||
if (self->unlock) {
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
return GST_FLOW_FLUSHING;
|
||||
if (self->unlock)
|
||||
goto flushing;
|
||||
}
|
||||
|
||||
|
||||
if (gst_buffer_n_memory (buf) > 1) {
|
||||
GST_LOG_OBJECT (self, "Buffer %p has %d GstMemory, we only support a single"
|
||||
" one, need to do a memcpy", buf, gst_buffer_n_memory (buf));
|
||||
need_new_memory = TRUE;
|
||||
} else {
|
||||
memory = gst_buffer_peek_memory (buf, 0);
|
||||
|
||||
if (memory->allocator != GST_ALLOCATOR (self->allocator)) {
|
||||
need_new_memory = TRUE;
|
||||
GST_LOG_OBJECT (self, "Memory in buffer %p was not allocated by "
|
||||
"%" GST_PTR_FORMAT ", will memcpy", buf, memory->allocator);
|
||||
}
|
||||
}
|
||||
|
||||
gst_buffer_map (buf, &map, GST_MAP_READ);
|
||||
rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size,
|
||||
GST_BUFFER_TIMESTAMP (buf));
|
||||
gst_buffer_unmap (buf, &map);
|
||||
|
||||
if (rv == -1) {
|
||||
ShmBlock *block = NULL;
|
||||
gchar *shmbuf = NULL;
|
||||
|
||||
if (need_new_memory) {
|
||||
if (gst_buffer_get_size (buf) > sp_writer_get_max_buf_size (self->pipe)) {
|
||||
gsize area_size = sp_writer_get_max_buf_size (self->pipe);
|
||||
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
GST_ELEMENT_ERROR (self, RESOURCE, NO_SPACE_LEFT,
|
||||
("Shared memory area is too small"),
|
||||
|
@ -438,96 +701,71 @@ gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
|
|||
return GST_FLOW_ERROR;
|
||||
}
|
||||
|
||||
while ((block = sp_writer_alloc_block (self->pipe,
|
||||
gst_buffer_get_size (buf))) == NULL) {
|
||||
while ((memory =
|
||||
gst_shm_sink_allocator_alloc_locked (self->allocator,
|
||||
gst_buffer_get_size (buf), &self->params)) == NULL) {
|
||||
g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
|
||||
if (self->unlock) {
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
return GST_FLOW_FLUSHING;
|
||||
}
|
||||
if (self->unlock)
|
||||
goto flushing;
|
||||
}
|
||||
|
||||
while (self->wait_for_connection && !self->clients) {
|
||||
g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
|
||||
if (self->unlock) {
|
||||
sp_writer_free_block (block);
|
||||
gst_memory_unref (memory);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
return GST_FLOW_FLUSHING;
|
||||
}
|
||||
}
|
||||
|
||||
shmbuf = sp_writer_block_get_buf (block);
|
||||
gst_buffer_extract (buf, 0, shmbuf, gst_buffer_get_size (buf));
|
||||
sp_writer_send_buf (self->pipe, shmbuf, gst_buffer_get_size (buf),
|
||||
GST_BUFFER_TIMESTAMP (buf));
|
||||
sp_writer_free_block (block);
|
||||
gst_memory_map (memory, &map, GST_MAP_WRITE);
|
||||
gst_buffer_extract (buf, 0, map.data, map.size);
|
||||
gst_memory_unmap (memory, &map);
|
||||
|
||||
sendbuf = gst_buffer_new ();
|
||||
gst_buffer_copy_into (sendbuf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
|
||||
gst_buffer_append_memory (sendbuf, memory);
|
||||
} else {
|
||||
sendbuf = gst_buffer_ref (buf);
|
||||
}
|
||||
|
||||
gst_buffer_map (sendbuf, &map, GST_MAP_READ);
|
||||
/* Make the memory readonly as of now as we've sent it to the other side
|
||||
* We know it's not mapped for writing anywhere as we just mapped it for
|
||||
* reading
|
||||
*/
|
||||
|
||||
rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf);
|
||||
|
||||
gst_buffer_unmap (sendbuf, &map);
|
||||
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
return GST_FLOW_OK;
|
||||
if (rv == -1) {
|
||||
GST_ELEMENT_ERROR (self, STREAM, FAILED, ("Invalid allocated buffer"),
|
||||
("The shmpipe library rejects our buffer, this is a bug"));
|
||||
ret = GST_FLOW_ERROR;
|
||||
}
|
||||
|
||||
/* If we allocated our own memory, then unmap it */
|
||||
|
||||
return ret;
|
||||
|
||||
flushing:
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
return GST_FLOW_FLUSHING;
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
/* FIXME 0.11 implement some bufferpool support */
|
||||
|
||||
static void
|
||||
gst_shm_sink_free_buffer (gpointer data)
|
||||
free_buffer_locked (GstBuffer * buffer, void *data)
|
||||
{
|
||||
ShmPipe *pipe;
|
||||
ShmBlock *block = data;
|
||||
GstShmSink *self;
|
||||
GSList **list = data;
|
||||
|
||||
pipe = sp_writer_block_get_pipe (block);
|
||||
self = sp_get_data (pipe);
|
||||
g_assert (buffer != NULL);
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
sp_writer_free_block (block);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
g_object_unref (self);
|
||||
*list = g_slist_prepend (*list, buffer);
|
||||
}
|
||||
|
||||
static GstFlowReturn
|
||||
gst_shm_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
|
||||
GstCaps * caps, GstBuffer ** out_buf)
|
||||
{
|
||||
GstShmSink *self = GST_SHM_SINK (sink);
|
||||
GstBuffer *buffer;
|
||||
ShmBlock *block = NULL;
|
||||
gpointer buf = NULL;
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
block = sp_writer_alloc_block (self->pipe, size);
|
||||
if (block) {
|
||||
buf = sp_writer_block_get_buf (block);
|
||||
g_object_ref (self);
|
||||
}
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
if (block) {
|
||||
buffer = gst_buffer_new ();
|
||||
GST_BUFFER_DATA (buffer) = buf;
|
||||
GST_BUFFER_MALLOCDATA (buffer) = (guint8 *) block;
|
||||
GST_BUFFER_FREE_FUNC (buffer) =
|
||||
GST_DEBUG_FUNCPTR (gst_shm_sink_free_buffer);
|
||||
GST_BUFFER_SIZE (buffer) = size;
|
||||
GST_LOG_OBJECT (self,
|
||||
"Allocated buffer of %u bytes from shared memory at %p", size, buf);
|
||||
} else {
|
||||
buffer = gst_buffer_new_and_alloc (size);
|
||||
GST_LOG_OBJECT (self, "Not enough shared memory for buffer of %u bytes, "
|
||||
"allocating using standard allocator", size);
|
||||
}
|
||||
|
||||
GST_BUFFER_OFFSET (buffer) = offset;
|
||||
gst_buffer_set_caps (buffer, caps);
|
||||
|
||||
*out_buf = buffer;
|
||||
|
||||
return GST_FLOW_OK;
|
||||
}
|
||||
#endif
|
||||
|
||||
static gpointer
|
||||
pollthread_func (gpointer data)
|
||||
{
|
||||
|
@ -605,9 +843,10 @@ pollthread_func (gpointer data)
|
|||
|
||||
if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) {
|
||||
int rv;
|
||||
gpointer tag = NULL;
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
rv = sp_writer_recv (self->pipe, gclient->client);
|
||||
rv = sp_writer_recv (self->pipe, gclient->client, &tag);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
if (rv < 0) {
|
||||
|
@ -615,12 +854,22 @@ pollthread_func (gpointer data)
|
|||
" closing (retval: %d errno: %d)", rv, errno);
|
||||
goto close_client;
|
||||
}
|
||||
|
||||
g_assert (rv == 0 || tag == NULL);
|
||||
|
||||
if (rv == 0)
|
||||
gst_buffer_unref (tag);
|
||||
}
|
||||
continue;
|
||||
close_client:
|
||||
GST_OBJECT_LOCK (self);
|
||||
sp_writer_close_client (self->pipe, gclient->client);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
{
|
||||
GSList *list = NULL;
|
||||
GST_OBJECT_LOCK (self);
|
||||
sp_writer_close_client (self->pipe, gclient->client,
|
||||
(sp_buffer_free_callback) free_buffer_locked, (void **) &list);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
g_slist_free_full (list, (GDestroyNotify) gst_buffer_unref);
|
||||
}
|
||||
|
||||
gst_poll_remove_fd (self->poll, &gclient->pollfd);
|
||||
self->clients = g_list_remove (self->clients, gclient);
|
||||
|
@ -683,3 +932,15 @@ gst_shm_sink_unlock_stop (GstBaseSink * bsink)
|
|||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_shm_sink_propose_allocation (GstBaseSink * sink, GstQuery * query)
|
||||
{
|
||||
GstShmSink *self = GST_SHM_SINK (sink);
|
||||
|
||||
if (self->allocator)
|
||||
gst_query_add_allocation_param (query, GST_ALLOCATOR (self->allocator),
|
||||
NULL);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ G_BEGIN_DECLS
|
|||
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK))
|
||||
typedef struct _GstShmSink GstShmSink;
|
||||
typedef struct _GstShmSinkClass GstShmSinkClass;
|
||||
typedef struct _GstShmSinkAllocator GstShmSinkAllocator;
|
||||
|
||||
struct _GstShmSink
|
||||
{
|
||||
|
@ -64,6 +65,10 @@ struct _GstShmSink
|
|||
GstClockTimeDiff buffer_time;
|
||||
|
||||
GCond cond;
|
||||
|
||||
GstShmSinkAllocator *allocator;
|
||||
|
||||
GstAllocationParams params;
|
||||
};
|
||||
|
||||
struct _GstShmSinkClass
|
||||
|
|
|
@ -357,10 +357,8 @@ gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
|||
gsb->pipe = self->pipe;
|
||||
gst_shm_pipe_inc (self->pipe);
|
||||
|
||||
*outbuf = gst_buffer_new ();
|
||||
gst_buffer_append_memory (*outbuf,
|
||||
gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
|
||||
buf, rv, 0, rv, gsb, free_buffer));
|
||||
*outbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
|
||||
buf, rv, 0, rv, gsb, free_buffer);
|
||||
|
||||
return GST_FLOW_OK;
|
||||
}
|
||||
|
@ -445,7 +443,7 @@ gst_shm_pipe_dec (GstShmPipe * pipe)
|
|||
}
|
||||
|
||||
if (pipe->pipe)
|
||||
sp_close (pipe->pipe);
|
||||
sp_client_close (pipe->pipe);
|
||||
GST_OBJECT_UNLOCK (pipe->src);
|
||||
|
||||
gst_object_unref (pipe->src);
|
||||
|
|
|
@ -118,7 +118,7 @@ struct _ShmBuffer
|
|||
int num_clients;
|
||||
int clients[0];
|
||||
|
||||
uint64_t tag;
|
||||
void *tag;
|
||||
};
|
||||
|
||||
|
||||
|
@ -183,14 +183,14 @@ struct CommandBuffer
|
|||
static ShmArea *sp_open_shm (char *path, int id, mode_t perms, size_t size);
|
||||
static void sp_close_shm (ShmArea * area);
|
||||
static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf,
|
||||
ShmBuffer * prev_buf, ShmClient * client);
|
||||
ShmBuffer * prev_buf, ShmClient * client, void **tag);
|
||||
static void sp_shm_area_dec (ShmPipe * self, ShmArea * area);
|
||||
|
||||
|
||||
|
||||
#define RETURN_ERROR(format, ...) do { \
|
||||
fprintf (stderr, format, __VA_ARGS__); \
|
||||
sp_close (self); \
|
||||
sp_writer_close (self, NULL, NULL); \
|
||||
return NULL; \
|
||||
} while (0)
|
||||
|
||||
|
@ -424,7 +424,8 @@ sp_dec (ShmPipe * self)
|
|||
}
|
||||
|
||||
void
|
||||
sp_close (ShmPipe * self)
|
||||
sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
|
||||
void *user_data)
|
||||
{
|
||||
if (self->main_socket >= 0)
|
||||
close (self->main_socket);
|
||||
|
@ -435,11 +436,18 @@ sp_close (ShmPipe * self)
|
|||
}
|
||||
|
||||
while (self->clients)
|
||||
sp_writer_close_client (self, self->clients);
|
||||
sp_writer_close_client (self, self->clients, callback, user_data);
|
||||
|
||||
sp_dec (self);
|
||||
}
|
||||
|
||||
void
|
||||
sp_client_close (ShmPipe * self)
|
||||
{
|
||||
sp_writer_close (self, NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
sp_writer_setperms_shm (ShmPipe * self, mode_t perms)
|
||||
{
|
||||
|
@ -560,7 +568,7 @@ sp_writer_free_block (ShmBlock * block)
|
|||
/* Returns the number of client this has successfully been sent to */
|
||||
|
||||
int
|
||||
sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag)
|
||||
sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void *tag)
|
||||
{
|
||||
ShmArea *area = NULL;
|
||||
unsigned long offset = 0;
|
||||
|
@ -699,7 +707,7 @@ sp_client_recv (ShmPipe * self, char **buf)
|
|||
}
|
||||
|
||||
int
|
||||
sp_writer_recv (ShmPipe * self, ShmClient * client)
|
||||
sp_writer_recv (ShmPipe * self, ShmClient * client, void **tag)
|
||||
{
|
||||
ShmBuffer *buf = NULL, *prev_buf = NULL;
|
||||
struct CommandBuffer cb;
|
||||
|
@ -713,7 +721,7 @@ sp_writer_recv (ShmPipe * self, ShmClient * client)
|
|||
for (buf = self->buffers; buf; buf = buf->next) {
|
||||
if (buf->shm_area->id == cb.area_id &&
|
||||
buf->offset == cb.payload.ack_buffer.offset) {
|
||||
sp_shmbuf_dec (self, buf, prev_buf, client);
|
||||
return sp_shmbuf_dec (self, buf, prev_buf, client, tag);
|
||||
break;
|
||||
}
|
||||
prev_buf = buf;
|
||||
|
@ -786,7 +794,7 @@ sp_client_open (const char *path)
|
|||
return self;
|
||||
|
||||
error:
|
||||
sp_close (self);
|
||||
sp_client_close (self);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -837,7 +845,7 @@ error:
|
|||
|
||||
static int
|
||||
sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
|
||||
ShmClient * client)
|
||||
ShmClient * client, void **tag)
|
||||
{
|
||||
int i;
|
||||
int had_client = 0;
|
||||
|
@ -866,6 +874,8 @@ sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
|
|||
else
|
||||
self->buffers = buf->next;
|
||||
|
||||
if (tag)
|
||||
*tag = buf->tag;
|
||||
shm_alloc_space_block_dec (buf->ablock);
|
||||
sp_shm_area_dec (self, buf->shm_area);
|
||||
spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf);
|
||||
|
@ -875,7 +885,8 @@ sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf,
|
|||
}
|
||||
|
||||
void
|
||||
sp_writer_close_client (ShmPipe * self, ShmClient * client)
|
||||
sp_writer_close_client (ShmPipe * self, ShmClient * client,
|
||||
sp_buffer_free_callback callback, void *user_data)
|
||||
{
|
||||
ShmBuffer *buffer = NULL, *prev_buf = NULL;
|
||||
ShmClient *item = NULL, *prev_item = NULL;
|
||||
|
@ -885,11 +896,15 @@ sp_writer_close_client (ShmPipe * self, ShmClient * client)
|
|||
again:
|
||||
for (buffer = self->buffers; buffer; buffer = buffer->next) {
|
||||
int i;
|
||||
void *tag = NULL;
|
||||
|
||||
for (i = 0; i < buffer->num_clients; i++) {
|
||||
if (buffer->clients[i] == client->fd) {
|
||||
if (!sp_shmbuf_dec (self, buffer, prev_buf, client))
|
||||
if (!sp_shmbuf_dec (self, buffer, prev_buf, client, &tag)) {
|
||||
if (callback)
|
||||
callback (tag, user_data);
|
||||
goto again;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -949,7 +964,7 @@ sp_writer_get_next_buffer (ShmBuffer * buffer)
|
|||
return buffer->next;
|
||||
}
|
||||
|
||||
uint64_t
|
||||
void *
|
||||
sp_writer_buf_get_tag (ShmBuffer * buffer)
|
||||
{
|
||||
return buffer->tag;
|
||||
|
|
|
@ -78,9 +78,12 @@ typedef struct _ShmPipe ShmPipe;
|
|||
typedef struct _ShmBlock ShmBlock;
|
||||
typedef struct _ShmBuffer ShmBuffer;
|
||||
|
||||
typedef void (*sp_buffer_free_callback) (void * tag, void * user_data);
|
||||
|
||||
ShmPipe *sp_writer_create (const char *path, size_t size, mode_t perms);
|
||||
const char *sp_writer_get_path (ShmPipe *pipe);
|
||||
void sp_close (ShmPipe * self);
|
||||
void sp_writer_close (ShmPipe * self, sp_buffer_free_callback callback,
|
||||
void * user_data);
|
||||
void *sp_get_data (ShmPipe * self);
|
||||
void sp_set_data (ShmPipe * self, void *data);
|
||||
|
||||
|
@ -92,24 +95,26 @@ int sp_writer_get_client_fd (ShmClient * client);
|
|||
|
||||
ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size);
|
||||
void sp_writer_free_block (ShmBlock *block);
|
||||
int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, uint64_t tag);
|
||||
int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size, void * tag);
|
||||
char *sp_writer_block_get_buf (ShmBlock *block);
|
||||
ShmPipe *sp_writer_block_get_pipe (ShmBlock *block);
|
||||
size_t sp_writer_get_max_buf_size (ShmPipe * self);
|
||||
|
||||
ShmClient * sp_writer_accept_client (ShmPipe * self);
|
||||
void sp_writer_close_client (ShmPipe *self, ShmClient * client);
|
||||
int sp_writer_recv (ShmPipe * self, ShmClient * client);
|
||||
void sp_writer_close_client (ShmPipe *self, ShmClient * client,
|
||||
sp_buffer_free_callback callback, void * user_data);
|
||||
int sp_writer_recv (ShmPipe * self, ShmClient * client, void ** tag);
|
||||
|
||||
int sp_writer_pending_writes (ShmPipe * self);
|
||||
|
||||
ShmBuffer *sp_writer_get_pending_buffers (ShmPipe * self);
|
||||
ShmBuffer *sp_writer_get_next_buffer (ShmBuffer * buffer);
|
||||
void *sp_writer_buf_get_tag (ShmBuffer * buffer);
|
||||
|
||||
ShmPipe *sp_client_open (const char *path);
|
||||
long int sp_client_recv (ShmPipe * self, char **buf);
|
||||
int sp_client_recv_finish (ShmPipe * self, char *buf);
|
||||
|
||||
ShmBuffer *sp_writer_get_pending_buffers (ShmPipe * self);
|
||||
ShmBuffer *sp_writer_get_next_buffer (ShmBuffer * buffer);
|
||||
uint64_t sp_writer_buf_get_tag (ShmBuffer * buffer);
|
||||
void sp_client_close (ShmPipe * self);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue