From 7e90514747ee9a2af4597dbb521f04099f90a9d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Thu, 29 Oct 2009 19:18:25 +0000 Subject: [PATCH] shm: Add shmpipe implementation --- gst/shm/Makefile.am | 4 +- gst/shm/shmalloc.c | 153 +++++++++ gst/shm/shmalloc.h | 42 +++ gst/shm/shmpipe.c | 819 ++++++++++++++++++++++++++++++++++++++++++++ gst/shm/shmpipe.h | 68 ++++ 5 files changed, 1084 insertions(+), 2 deletions(-) create mode 100644 gst/shm/shmalloc.c create mode 100644 gst/shm/shmalloc.h create mode 100644 gst/shm/shmpipe.c create mode 100644 gst/shm/shmpipe.h diff --git a/gst/shm/Makefile.am b/gst/shm/Makefile.am index 84a26c020d..31375016e2 100644 --- a/gst/shm/Makefile.am +++ b/gst/shm/Makefile.am @@ -4,10 +4,10 @@ include $(top_srcdir)/common/glib-gen.mak plugin_LTLIBRARIES = libgstshm.la -libgstshm_la_SOURCES = gstshm.c gstshmsrc.c gstshmsink.c +libgstshm_la_SOURCES = shmpipe.c shmalloc.c gstshm.c gstshmsrc.c gstshmsink.c libgstshm_la_CFLAGS = $(GST_CFLAGS) libgstshm_la_LIBADD = libgstshm_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) libgstshm_la_LIBTOOLFLAGS = --tag=disable-static -noinst_HEADERS = gstshmsrc.h gstshmsink.h +noinst_HEADERS = gstshmsrc.h gstshmsink.h shmpipe.h shmalloc.h diff --git a/gst/shm/shmalloc.c b/gst/shm/shmalloc.c new file mode 100644 index 0000000000..75acfbcb23 --- /dev/null +++ b/gst/shm/shmalloc.c @@ -0,0 +1,153 @@ + + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "shmalloc.h" + +#include +#include +#include + +struct _ShmAllocSpace +{ + size_t size; + + ShmAllocBlock *blocks; +}; + +struct _ShmAllocBlock +{ + int use_count; + + ShmAllocSpace *space; + + unsigned long offset; + unsigned long size; + + ShmAllocBlock *next; +}; + + +ShmAllocSpace * +shm_alloc_space_new (size_t size) +{ + ShmAllocSpace *self = spalloc_new (ShmAllocSpace); + + memset (self, 0, sizeof (ShmAllocSpace)); + + self->size = size; + + return self; +} + +void +shm_alloc_space_free (ShmAllocSpace * self) +{ + assert (self && self->blocks == NULL); + spalloc_free (ShmAllocSpace, self); +} + + +ShmAllocBlock * +shm_alloc_space_alloc_block (ShmAllocSpace * self, unsigned long size) +{ + ShmAllocBlock *block; + ShmAllocBlock *item = NULL; + ShmAllocBlock *prev_item = NULL; + unsigned long prev_end_offset = 0; + + + for (item = self->blocks; item; item = item->next) { + unsigned long max_size = 0; + + max_size = item->offset - prev_end_offset; + + if (max_size >= size) + break; + + prev_end_offset = item->offset + item->size; + prev_item = item; + } + + /* Did not find space before an existing block */ + if (self->blocks && !item) { + /* Return NULL if there is no big enough space, otherwise, there is space + * at the end */ + if (self->size - prev_end_offset < size) + return NULL; + } + + block = spalloc_new (ShmAllocBlock); + memset (block, 0, sizeof (ShmAllocBlock)); + block->offset = prev_end_offset; + block->size = size; + block->use_count = 1; + block->space = self; + + if (prev_item) + prev_item->next = block; + else + self->blocks = block; + + block->next = item; + + return block; +} + +unsigned long +shm_alloc_space_alloc_block_get_offset (ShmAllocBlock * block) +{ + return block->offset; +} + +static void +shm_alloc_space_free_block (ShmAllocBlock * block) +{ + ShmAllocBlock *item = NULL; + ShmAllocBlock *prev_item = NULL; + ShmAllocSpace *self = block->space; + + for (item = self->blocks; item; item = item->next) { + if (item == block) { + if (prev_item) + prev_item->next = item->next; + else + self->blocks = item->next; + break; + } + prev_item = item; + } + + spalloc_free (ShmAllocBlock, block); +} + +ShmAllocBlock * +shm_alloc_space_block_get (ShmAllocSpace * self, unsigned long offset) +{ + ShmAllocBlock *block = NULL; + + for (block = self->blocks; block; block = block->next) { + if (block->offset <= offset && (block->offset + block->size) > offset) + return block; + } + + return NULL; +} + + +void +shm_alloc_space_block_inc (ShmAllocBlock * block) +{ + block->use_count++; +} + +void +shm_alloc_space_block_dec (ShmAllocBlock * block) +{ + block->use_count--; + + if (block->use_count <= 0) + shm_alloc_space_free_block (block); +} diff --git a/gst/shm/shmalloc.h b/gst/shm/shmalloc.h new file mode 100644 index 0000000000..442b0c4238 --- /dev/null +++ b/gst/shm/shmalloc.h @@ -0,0 +1,42 @@ + +#include + +#ifndef __SHMALLOC_H__ +#define __SHMALLOC_H__ + +#ifdef GST_PACKAGE_NAME +#include + +#define spalloc_new(type) g_slice_new (type) +#define spalloc_alloc(size) g_slice_alloc (size) + +#define spalloc_free(type, buf) g_slice_free (type, buf) +#define spalloc_free1(size, buf) g_slice_free1 (size, buf) + +#else + +#define spalloc_new0(type) malloc (sizeof (type)) +#define spalloc_alloc0(size) malloc (size) + +#define spalloc_free(type, buf) free (buf) +#define spalloc_free1(size, buf) free (buf) + +#endif + +typedef struct _ShmAllocSpace ShmAllocSpace; +typedef struct _ShmAllocBlock ShmAllocBlock; + +ShmAllocSpace *shm_alloc_space_new (size_t size); +void shm_alloc_space_free (ShmAllocSpace * self); + + +ShmAllocBlock *shm_alloc_space_alloc_block (ShmAllocSpace * self, + unsigned long size); +unsigned long shm_alloc_space_alloc_block_get_offset (ShmAllocBlock *block); + +void shm_alloc_space_block_inc (ShmAllocBlock * block); +void shm_alloc_space_block_dec (ShmAllocBlock * block); +ShmAllocBlock * shm_alloc_space_block_get (ShmAllocSpace * space, + unsigned long offset); + +#endif /* __SHMALLOC_H__ */ diff --git a/gst/shm/shmpipe.c b/gst/shm/shmpipe.c new file mode 100644 index 0000000000..40fef0c1f8 --- /dev/null +++ b/gst/shm/shmpipe.c @@ -0,0 +1,819 @@ + + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "shmpipe.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * The protocol over the pipe is in packets + * + * The defined types are: + * type 1: new shm area + * Area length + * Size of path (followed by path) + * + * type 2: Close shm area: + * No payload + * + * type 3: shm buffer + * offset + * bufsize + * + * type 4: ack buffer + * offset + * + * Type 4 goes from the client to the server + * The rest are from the server to the client + * The client should never write in the SHM + */ + + +#include "shmalloc.h" + +enum +{ + COMMAND_NEW_SHM_AREA = 1, + COMMAND_CLOSE_SHM_AREA = 2, + COMMAND_NEW_BUFFER = 3, + COMMAND_ACK_BUFFER = 4 +}; + +typedef struct _ShmArea ShmArea; +typedef struct _ShmBuffer ShmBuffer; + +struct _ShmArea +{ + int id; + + int use_count; + + int shm_fd; + + char *shm_area; + size_t shm_area_len; + + char *shm_area_name; + + ShmAllocSpace *allocspace; + + ShmArea *next; +}; + +struct _ShmBuffer +{ + int use_count; + + ShmArea *shm_area; + unsigned long offset; + size_t size; + + ShmAllocBlock *block; + + ShmBuffer *next; + + int num_clients; + int clients[0]; +}; + + +struct _ShmPipe +{ + int main_socket; + char *socket_path; + + ShmArea *shm_area; + + int next_area_id; + + ShmBuffer *buffers; + + int num_clients; + ShmClient *clients; + + mode_t perms; +}; + +struct _ShmClient +{ + int fd; + + ShmClient *next; +}; + +struct _ShmBlock +{ + ShmPipe *pipe; + ShmArea *area; + ShmAllocBlock *ablock; +}; + +struct CommandBuffer +{ + unsigned int type; + int area_id; + + union + { + struct + { + size_t size; + unsigned int path_size; + /* Followed by path */ + } new_shm_area; + struct + { + unsigned long offset; + unsigned long size; + } buffer; + struct + { + unsigned long offset; + } ack_buffer; + } payload; +}; + +static ShmArea *sp_open_shm (char *path, int id, int writer, mode_t perms, + size_t size); +static void sp_close_shm (ShmPipe * self, ShmArea * area); +static int sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, + ShmBuffer * prev_buf); +static void sp_shm_area_dec (ShmPipe * self, ShmArea * area); + + + +#define RETURN_ERROR(format, ...) do { \ + fprintf (stderr, format, __VA_ARGS__); \ + sp_close (self); \ + return NULL; } while (0) + +ShmPipe * +sp_writer_create (const char *path, size_t size, mode_t perms) +{ + ShmPipe *self = spalloc_new (ShmPipe); + int flags; + struct sockaddr_un sun; + + memset (self, 0, sizeof (ShmPipe)); + + self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0); + + if (!self->main_socket < 0) { + RETURN_ERROR ("Could not create socket (%d): %s\n", errno, + strerror (errno)); + } + + flags = fcntl (self->main_socket, F_GETFL, 0); + if (flags < 0) { + RETURN_ERROR ("fcntl(F_GETFL) failed (%d): %s\n", errno, strerror (errno)); + } + + if (fcntl (self->main_socket, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC) < 0) { + RETURN_ERROR ("fcntl(F_SETFL) failed (%d): %s\n", errno, strerror (errno)); + } + + sun.sun_family = AF_UNIX; + strncpy (sun.sun_path, path, sizeof (sun.sun_path)); + + if (bind (self->main_socket, (struct sockaddr *) &sun, + sizeof (struct sockaddr_un)) < 0) { + RETURN_ERROR ("bind() failed (%d): %s\n", errno, strerror (errno)); + } + + self->socket_path = strdup (path); + + if (listen (self->main_socket, 10) < 0) { + RETURN_ERROR ("listen() failed (%d): %s\n", errno, strerror (errno)); + } + + self->shm_area = sp_open_shm (NULL, ++self->next_area_id, 1, perms, size); + + self->perms = perms; + + if (!self->shm_area) { + sp_close (self); + return NULL; + } + + return self; +} + +#undef RETURN_ERROR + +#define RETURN_ERROR(format, ...) \ + fprintf (stderr, format, __VA_ARGS__); \ + sp_shm_area_dec (NULL, area); \ + return NULL; + +static ShmArea * +sp_open_shm (char *path, int id, int writer, mode_t perms, size_t size) +{ + ShmArea *area = spalloc_new (ShmArea); + char tmppath[PATH_MAX]; + int flags; + int has_path = (path != NULL) ? 1 : 0; + int prot; + + memset (area, 0, sizeof (ShmArea)); + + area->use_count = 1; + + area->shm_area_len = size; + + + if (writer) + flags = O_RDWR | O_CREAT | O_TRUNC; + else + flags = O_RDONLY; + + area->shm_fd = -1; + if (!path) + path = tmppath; + + do { + if (path == tmppath) { + path = tmppath; + snprintf (tmppath, PATH_MAX, "/%X%X%X%X%X.shmpipe", + rand (), rand (), rand (), rand (), rand ()); + } + area->shm_fd = shm_open (path, flags, perms); + } while (path == tmppath && area->shm_fd < 0 && errno == EEXIST); + + if (area->shm_fd < 0) { + RETURN_ERROR ("shm_open failed on %s (%d): %s\n", path, errno, + strerror (errno)); + } + + if (!has_path) + area->shm_area_name = strdup (path); + + if (writer) { + if (ftruncate (area->shm_fd, size)) { + RETURN_ERROR ("Could not resize memory area to header size," + " ftruncate failed (%d): %s\n", errno, strerror (errno)); + } + } + + if (writer) + prot = PROT_READ | PROT_WRITE; + else + prot = PROT_READ; + + area->shm_area = mmap (NULL, size, prot, MAP_SHARED, area->shm_fd, 0); + + if (area->shm_area == MAP_FAILED) { + RETURN_ERROR ("mmap failed (%d): %s\n", errno, strerror (errno)); + } + + area->id = id; + + if (writer) + area->allocspace = shm_alloc_space_new (area->shm_area_len); + + return area; +} + +#undef RETURN_ERROR + +static void +sp_close_shm (ShmPipe * self, ShmArea * area) +{ + ShmArea *item = NULL; + ShmArea *prev_item = NULL; + + assert (area->use_count == 0); + + if (area->allocspace) + shm_alloc_space_free (area->allocspace); + + + for (item = self->shm_area; item; item = item->next) { + if (item == area) { + if (prev_item) + prev_item->next = item->next; + else + self->shm_area = item->next; + break; + } + prev_item = item; + } + assert (item); + + if (area->shm_area != MAP_FAILED) + munmap (area->shm_area, area->shm_area_len); + + if (area->shm_fd >= 0) + close (area->shm_fd); + + if (area->shm_area_name) { + shm_unlink (area->shm_area_name); + free (area->shm_area_name); + } + + spalloc_free (ShmArea, area); +} + +static void +sp_shm_area_inc (ShmArea * area) +{ + area->use_count++; +} + +static void +sp_shm_area_dec (ShmPipe * self, ShmArea * area) +{ + assert (area->use_count > 0); + area->use_count--; + + if (area->use_count == 0) { + sp_close_shm (self, area); + } +} + +void +sp_close (ShmPipe * self) +{ + if (self->main_socket >= 0) + close (self->main_socket); + + if (self->socket_path) { + unlink (self->socket_path); + free (self->socket_path); + } + + while (self->clients) + sp_writer_close_client (self, self->clients); + + while (self->shm_area) { + sp_shm_area_dec (self, self->shm_area); + } + + spalloc_free (ShmPipe, self); +} + +void +sp_writer_setperms_shm (ShmPipe * self, mode_t perms) +{ + self->perms = perms; + fchmod (self->shm_area->shm_fd, perms); +} + +static int +send_command (int fd, struct CommandBuffer *cb, unsigned short int type, + int area_id) +{ + cb->type = type; + cb->area_id = area_id; + + if (send (fd, cb, sizeof (struct CommandBuffer), MSG_NOSIGNAL) != + sizeof (struct CommandBuffer)) + return 0; + + return 1; +} + +int +sp_writer_resize (ShmPipe * self, size_t size) +{ + ShmArea *newarea; + ShmArea *old_current; + ShmClient *client; + int c = 0; + int pathlen; + + if (self->shm_area->shm_area_len == size) + return 0; + + newarea = sp_open_shm (NULL, ++self->next_area_id, 1, self->perms, size); + + if (!newarea) + return -1; + + old_current = self->shm_area; + newarea->next = self->shm_area; + self->shm_area = newarea; + + pathlen = strlen (newarea->shm_area_name) + 1; + + for (client = self->clients; client; client = client->next) { + struct CommandBuffer cb; + + if (!send_command (client->fd, &cb, COMMAND_CLOSE_SHM_AREA, + old_current->id)) + continue; + + cb.payload.new_shm_area.size = newarea->shm_area_len; + cb.payload.new_shm_area.path_size = pathlen; + if (!send_command (client->fd, &cb, COMMAND_NEW_SHM_AREA, newarea->id)) + continue; + + if (send (client->fd, newarea->shm_area_name, pathlen, MSG_NOSIGNAL) != + pathlen) + continue; + c++; + } + + sp_shm_area_dec (self, old_current); + + + return c; +} + +ShmBlock * +sp_writer_alloc_block (ShmPipe * self, size_t size) +{ + ShmBlock *block; + ShmAllocBlock *ablock = + shm_alloc_space_alloc_block (self->shm_area->allocspace, size); + + if (!ablock) + return NULL; + + block = spalloc_new (ShmBlock); + sp_shm_area_inc (self->shm_area); + block->pipe = self; + block->area = self->shm_area; + block->ablock = ablock; + return block; +} + +char * +sp_writer_block_get_buf (ShmBlock * block) +{ + return block->area->shm_area + + shm_alloc_space_alloc_block_get_offset (block->ablock); +} + +void +sp_writer_free_block (ShmBlock * block) +{ + shm_alloc_space_block_dec (block->ablock); + sp_shm_area_dec (block->pipe, block->area); + spalloc_free (ShmBlock, block); +} + +/* Returns the number of client this has successfully been sent to */ + +int +sp_writer_send_buf (ShmPipe * self, gchar * buf, size_t size) +{ + ShmArea *area = NULL; + unsigned long offset = 0; + unsigned long bsize = size; + ShmBuffer *sb; + ShmClient *client = NULL; + ShmAllocBlock *block = NULL; + int i = 0; + int c = 0; + + if (self->num_clients == 0) + return 0; + + for (area = self->shm_area; area; area = area->next) { + if (buf >= area->shm_area && buf < (area->shm_area + area->shm_area_len)) { + offset = buf - area->shm_area; + block = shm_alloc_space_block_get (area->allocspace, offset); + assert (block); + break; + } + } + + if (!block) + return -1; + + sb = spalloc_alloc (sizeof (ShmBuffer) + sizeof (int) * self->num_clients); + memset (sb, 0, sizeof (ShmBuffer)); + memset (sb->clients, -1, sizeof (int) * self->num_clients); + sb->shm_area = area; + sb->offset = offset; + sb->size = size; + sb->num_clients = self->num_clients; + sb->block = block; + + for (client = self->clients; client; client = client->next) { + struct CommandBuffer cb; + cb.payload.buffer.offset = offset; + cb.payload.buffer.size = bsize; + if (!send_command (client->fd, &cb, COMMAND_NEW_BUFFER, self->shm_area->id)) + continue; + sb->clients[i++] = client->fd; + c++; + } + + if (c == 0) { + spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * self->num_clients, sb); + return 0; + } + + sp_shm_area_inc (area); + shm_alloc_space_block_inc (block); + + sb->use_count = c; + + sb->next = self->buffers; + self->buffers = sb; + + return c; +} + +static int +read_command (int fd, struct CommandBuffer *cb) +{ + int retval; + + retval = recv (fd, cb, sizeof (struct CommandBuffer), 0); + if (retval == sizeof (struct CommandBuffer)) { + return 1; + } else { + return 0; + } +} + +unsigned long +sp_client_recv (ShmPipe * self, char **buf) +{ + char *area_name = NULL; + ShmArea *newarea, *oldarea; + ShmArea *area; + struct CommandBuffer cb; + + if (!read_command (self->main_socket, &cb)) + return -1; + + switch (cb.type) { + case COMMAND_NEW_SHM_AREA: + assert (cb.payload.new_shm_area.path_size > 0); + assert (cb.payload.new_shm_area.size > 0); + + area_name = malloc (cb.payload.new_shm_area.path_size); + if (read (self->main_socket, area_name, cb.payload.new_shm_area.path_size) + != cb.payload.new_shm_area.path_size) { + free (area_name); + return -3; + } + + newarea = sp_open_shm (area_name, cb.area_id, 0, 0, + cb.payload.new_shm_area.size); + free (area_name); + if (!newarea) + return -4; + + oldarea = self->shm_area; + newarea->next = self->shm_area; + self->shm_area = newarea; + if (oldarea) + sp_shm_area_dec (self, oldarea); + break; + + case COMMAND_CLOSE_SHM_AREA: + for (area = self->shm_area; area; area = area->next) { + if (area->id == cb.area_id) { + sp_shm_area_dec (self, area); + break; + } + } + break; + + case COMMAND_NEW_BUFFER: + assert (buf); + for (area = self->shm_area; area; area = area->next) { + if (area->id == cb.area_id) { + *buf = area->shm_area + cb.payload.buffer.offset; + return cb.payload.buffer.size; + } + } + return -23; + + default: + return -99; + } + + return 0; +} + +int +sp_writer_recv (ShmPipe * self, ShmClient * client) +{ + ShmBuffer *buf = NULL, *prev_buf = NULL; + struct CommandBuffer cb; + + if (!read_command (client->fd, &cb)) + return -1; + + switch (cb.type) { + case COMMAND_ACK_BUFFER: + + for (buf = self->buffers; buf; buf = buf->next) { + if (buf->shm_area->id == cb.area_id && + buf->offset == cb.payload.ack_buffer.offset) { + sp_shmbuf_dec (self, buf, prev_buf); + break; + } + prev_buf = buf; + } + + if (!buf) + return -2; + + break; + default: + return -99; + } + + return 0; +} + +int +sp_client_recv_finish (ShmPipe * self, char *buf) +{ + ShmArea *shm_area = NULL; + unsigned long offset; + struct CommandBuffer cb; + + for (shm_area = self->shm_area; shm_area; shm_area = shm_area->next) { + if (buf >= shm_area->shm_area && + buf < shm_area->shm_area + shm_area->shm_area_len) + break; + } + + assert (shm_area); + + offset = buf - shm_area->shm_area; + + cb.payload.ack_buffer.offset = offset; + return send_command (self->main_socket, &cb, COMMAND_ACK_BUFFER, + self->shm_area->id); +} + +ShmPipe * +sp_client_open (const char *path) +{ + ShmPipe *self = spalloc_new (ShmPipe); + int flags; + struct sockaddr_un sun; + + memset (self, 0, sizeof (ShmPipe)); + + self->main_socket = socket (PF_UNIX, SOCK_STREAM, 0); + if (!self->main_socket < 0) { + sp_close (self); + return NULL; + } + + sun.sun_family = AF_UNIX; + strncpy (sun.sun_path, path, sizeof (sun.sun_path)); + + if (connect (self->main_socket, (struct sockaddr *) &sun, + sizeof (struct sockaddr_un)) < 0) + goto error; + + flags = fcntl (self->main_socket, F_GETFL, 0); + if (flags < 0) + goto error; + + flags |= O_NONBLOCK | FD_CLOEXEC; + if (fcntl (self->main_socket, F_SETFL, flags) < 0) + goto error; + + return self; + +error: + spalloc_free (ShmPipe, self); + return NULL; +} + + +ShmClient * +sp_writer_accept_client (ShmPipe * self) +{ + ShmClient *client = NULL; + int fd; + int flags; + struct CommandBuffer cb; + int pathlen = strlen (self->shm_area->shm_area_name) + 1; + + + fd = accept (self->main_socket, NULL, NULL); + + flags = fcntl (fd, F_GETFL, 0); + if (flags < 0) + goto error; + flags |= O_NONBLOCK | FD_CLOEXEC; + if (fcntl (fd, F_SETFL, flags) < 0) + goto error; + + cb.payload.new_shm_area.size = self->shm_area->shm_area_len; + cb.payload.new_shm_area.path_size = pathlen; + if (!send_command (fd, &cb, COMMAND_NEW_SHM_AREA, self->shm_area->id)) + goto error; + + if (send (fd, self->shm_area->shm_area_name, pathlen, MSG_NOSIGNAL) != + pathlen) + goto error; + + client = spalloc_new (ShmClient); + client->fd = fd; + + /* Prepend ot linked list */ + client->next = self->clients; + self->clients = client; + self->num_clients++; + + return client; + +error: + close (fd); + return NULL; +} + +static int +sp_shmbuf_dec (ShmPipe * self, ShmBuffer * buf, ShmBuffer * prev_buf) +{ + buf->use_count--; + + if (buf->use_count == 0) { + /* Remove from linked list */ + if (prev_buf) + prev_buf->next = buf->next; + else + self->buffers = buf->next; + + shm_alloc_space_block_dec (buf->block); + sp_shm_area_dec (self, buf->shm_area); + spalloc_free1 (sizeof (ShmBuffer) + sizeof (int) * buf->num_clients, buf); + return 0; + } + + return 1; +} + +void +sp_writer_close_client (ShmPipe * self, ShmClient * client) +{ + ShmBuffer *buffer = NULL, *prev_buf = NULL; + ShmClient *item = NULL, *prev_item = NULL; + + close (client->fd); + +again: + for (buffer = self->buffers; buffer; buffer = buffer->next) { + int i; + + for (i = 0; i < buffer->num_clients; i++) { + if (buffer->clients[i] == client->fd) { + buffer->clients[i] = -1; + if (!sp_shmbuf_dec (self, buffer, prev_buf)) + goto again; + break; + } + prev_buf = buffer; + } + } + + for (item = self->clients; item; item = item->next) { + if (item == client) + break; + prev_item = item; + } + assert (item); + + if (prev_item) + prev_item->next = client->next; + else + self->clients = client->next; + + self->num_clients--; + + spalloc_free (ShmClient, client); +} + +int +sp_get_fd (ShmPipe * self) +{ + return self->main_socket; +} + +int +sp_writer_get_client_fd (ShmClient * client) +{ + return client->fd; +} + +int +sp_writer_pending_writes (ShmPipe * self) +{ + return (self->buffers != NULL); +} diff --git a/gst/shm/shmpipe.h b/gst/shm/shmpipe.h new file mode 100644 index 0000000000..86b13630f5 --- /dev/null +++ b/gst/shm/shmpipe.h @@ -0,0 +1,68 @@ +/* + * + * First, create a writer with sp_writer_create() + * And selectes() on the socket from sp_get_fd() + * If the socket is closed or there are errors from any function, the app + * should call sp_close() and assume the writer is dead + * The server calls sp_writer_accept_client() when there is something to read + * from the server fd + * It then needs to select() on the socket from sp_writer_get_client_fd() + * If it gets an error on that socket, it call sp_writer_close_client(). + * If there is something to read, it calls sp_writer_recv(). + * + * The writer allocates buffers with sp_writer_alloc_block(), + * writes something in the buffer (retrieved with sp_writer_block_get_buf(), + * then calls sp_writer_send_buf() to send the buffer or a subsection to + * the other side. When it is done with the block, it calls + * sp_writer_free_block(). + * If alloc fails, then the server must wait for events from the clients before + * trying again. + * + * + * The clients connect with sp_client_open() + * And select() on the fd from sp_get_fd() until there is something to read. + * Then they must read using sp_client_recv() which will return > 0 if there + * is a valid buffer (which is read only). It will return 0 if it is an internal + * message and <0 if there was an error. If there was an error, one must close + * it with sp_close(). If was valid buffer was received, the client must release + * it with sp_client_recv_finish() when it is done reading from it. + */ + + +#ifndef __SHMPIPE_H__ +#define __SHMPIPE_H__ + +#include +#include +#include +#include + +typedef struct _ShmClient ShmClient; +typedef struct _ShmPipe ShmPipe; +typedef struct _ShmBlock ShmBlock; + +ShmPipe *sp_writer_create (const char *path, size_t size, mode_t perms); +void sp_close (ShmPipe * self); + +void sp_writer_setperms_shm (ShmPipe * self, mode_t perms); +int sp_writer_resize (ShmPipe * self, size_t size); + +int sp_get_fd (ShmPipe * self); +int sp_writer_get_client_fd (ShmClient * client); + +ShmBlock *sp_writer_alloc_block (ShmPipe * self, size_t size); +void sp_writer_free_block (ShmBlock *block); +int sp_writer_send_buf (ShmPipe * self, char *buf, size_t size); +char *sp_writer_block_get_buf (ShmBlock *block); + +ShmClient * sp_writer_accept_client (ShmPipe * self); +void sp_writer_close_client (ShmPipe *self, ShmClient * client); +int sp_writer_recv (ShmPipe * self, ShmClient * client); + +int sp_writer_pending_writes (ShmPipe * self); + +ShmPipe *sp_client_open (const char *path); +unsigned long sp_client_recv (ShmPipe * self, char **buf); +int sp_client_recv_finish (ShmPipe * self, char *buf); + +#endif /* __SHMPIPE_H__ */