shmsrc: Port to ShmPipe

This commit is contained in:
Olivier Crête 2009-10-30 12:37:50 +00:00
parent 7e90514747
commit b6bc52f961
2 changed files with 110 additions and 186 deletions

View file

@ -24,17 +24,10 @@
#endif #endif
#include "gstshmsrc.h" #include "gstshmsrc.h"
#include "gstshm.h"
#include <gst/gst.h> #include <gst/gst.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h> #include <string.h>
#include <unistd.h>
#include <semaphore.h>
/* signals */ /* signals */
enum enum
@ -46,11 +39,18 @@ enum
enum enum
{ {
PROP_0, PROP_0,
PROP_SHM_NAME PROP_SOCKET_PATH
};
struct GstShmBuffer
{
char *buf;
GstShmSrc *src;
}; };
GST_DEBUG_CATEGORY_STATIC (shmsrc_debug); GST_DEBUG_CATEGORY_STATIC (shmsrc_debug);
#define GST_CAT_DEFAULT shmsrc_debug
static const GstElementDetails gst_shm_src_details = static const GstElementDetails gst_shm_src_details =
GST_ELEMENT_DETAILS ("Shared Memory Source", GST_ELEMENT_DETAILS ("Shared Memory Source",
@ -104,18 +104,18 @@ gst_shm_src_class_init (GstShmSrcClass * klass)
gobject_class->set_property = gst_shm_src_set_property; gobject_class->set_property = gst_shm_src_set_property;
gobject_class->get_property = gst_shm_src_get_property; gobject_class->get_property = gst_shm_src_get_property;
gstbasesrc_class->start = gst_shm_src_start; gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_shm_src_start);
gstbasesrc_class->stop = gst_shm_src_stop; gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_shm_src_stop);
gstbasesrc_class->unlock = gst_shm_src_unlock; gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_src_unlock);
gstbasesrc_class->unlock_stop = gst_shm_src_unlock_stop; gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_src_unlock_stop);
gstpush_src_class->create = gst_shm_src_create; gstpush_src_class->create = gst_shm_src_create;
g_object_class_install_property (gobject_class, PROP_SHM_NAME, g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
g_param_spec_string ("shm-name", g_param_spec_string ("socket-path",
"Name of the shared memory area", "Path to the control socket",
"The name of the shared memory area that the source can read from", "The path to the control socket used to control the shared memory"
NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); " transport", NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
GST_DEBUG_CATEGORY_INIT (shmsrc_debug, "shmsrc", 0, "Shared Memory Source"); GST_DEBUG_CATEGORY_INIT (shmsrc_debug, "shmsrc", 0, "Shared Memory Source");
} }
@ -123,13 +123,6 @@ gst_shm_src_class_init (GstShmSrcClass * klass)
static void static void
gst_shm_src_init (GstShmSrc * self, GstShmSrcClass * g_class) gst_shm_src_init (GstShmSrc * self, GstShmSrcClass * g_class)
{ {
gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
gst_base_src_set_do_timestamp (GST_BASE_SRC (self), TRUE);
gst_pad_use_fixed_caps (GST_BASE_SRC_PAD (self));
self->fd = -1;
self->shm_area = MAP_FAILED;
} }
@ -140,10 +133,15 @@ gst_shm_src_set_property (GObject * object, guint prop_id,
GstShmSrc *self = GST_SHM_SRC (object); GstShmSrc *self = GST_SHM_SRC (object);
switch (prop_id) { switch (prop_id) {
case PROP_SHM_NAME: case PROP_SOCKET_PATH:
GST_OBJECT_LOCK (object); GST_OBJECT_LOCK (object);
g_free (self->shm_name); if (self->pipe) {
self->shm_name = g_value_dup_string (value); GST_WARNING_OBJECT (object, "Can not modify socket path while the "
"element is playing");
} else {
g_free (self->socket_path);
self->socket_path = g_value_dup_string (value);
}
GST_OBJECT_UNLOCK (object); GST_OBJECT_UNLOCK (object);
break; break;
default: default:
@ -159,9 +157,9 @@ gst_shm_src_get_property (GObject * object, guint prop_id,
GstShmSrc *self = GST_SHM_SRC (object); GstShmSrc *self = GST_SHM_SRC (object);
switch (prop_id) { switch (prop_id) {
case PROP_SHM_NAME: case PROP_SOCKET_PATH:
GST_OBJECT_LOCK (object); GST_OBJECT_LOCK (object);
g_value_set_string (value, self->shm_name); g_value_set_string (value, self->socket_path);
GST_OBJECT_UNLOCK (object); GST_OBJECT_UNLOCK (object);
break; break;
default: default:
@ -175,45 +173,28 @@ gst_shm_src_start (GstBaseSrc * bsrc)
{ {
GstShmSrc *self = GST_SHM_SRC (bsrc); GstShmSrc *self = GST_SHM_SRC (bsrc);
g_return_val_if_fail (self->fd == -1, FALSE); if (!self->socket_path) {
GST_ELEMENT_ERROR (bsrc, RESOURCE, NOT_FOUND,
("No path specified for socket."), (NULL));
return FALSE;
}
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (!self->shm_name) { self->pipe = sp_client_open (self->socket_path);
GST_OBJECT_UNLOCK (self);
GST_ERROR_OBJECT (self, "Must set the name of the shm area first");
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
("One must specify the name of the shm area"),
("shm-name property not set"));
return FALSE;
}
self->fd = shm_open (self->shm_name, O_RDWR, 0);
if (self->fd < 0) {
GST_OBJECT_UNLOCK (self);
GST_ERROR_OBJECT (self, "Could not open shm area: %s", strerror (errno));
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
("Could not open the shm area"),
("shm_open failed (%d): %s", errno, strerror (errno)));
return FALSE;
}
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
if (!self->pipe) {
self->shm_area_len = sizeof (struct GstShmHeader); GST_ELEMENT_ERROR (bsrc, RESOURCE, OPEN_READ_WRITE,
("Could not open socket: %d %s", errno, strerror (errno)), (NULL));
self->shm_area = mmap (NULL, self->shm_area_len, PROT_READ | PROT_WRITE,
MAP_SHARED, self->fd, 0);
if (self->shm_area == MAP_FAILED) {
GST_ERROR_OBJECT (self, "Could not map shm area");
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
("Could not map memory area"),
("mmap failed (%d): %s", errno, strerror (errno)));
gst_shm_src_stop (bsrc);
return FALSE; return FALSE;
} }
self->poll = gst_poll_new (TRUE);
gst_poll_fd_init (&self->pollfd);
self->pollfd.fd = sp_get_fd (self->pipe);
gst_poll_add_fd (self->poll, &self->pollfd);
gst_poll_fd_ctl_read (self->poll, &self->pollfd, TRUE);
return TRUE; return TRUE;
} }
@ -222,151 +203,90 @@ gst_shm_src_stop (GstBaseSrc * bsrc)
{ {
GstShmSrc *self = GST_SHM_SRC (bsrc); GstShmSrc *self = GST_SHM_SRC (bsrc);
if (self->fd >= 0) GST_DEBUG_OBJECT (self, "Stopping %p", self);
close (self->fd);
self->fd = -1;
if (self->shm_area != MAP_FAILED) sp_close (self->pipe);
munmap (self->shm_area, self->shm_area_len); self->pipe = NULL;
self->shm_area_len = 0;
self->shm_area = MAP_FAILED; gst_poll_free (self->poll);
self->poll = NULL;
return TRUE; return TRUE;
} }
static gboolean static void
resize_area (GstShmSrc * self) free_buffer (gpointer data)
{ {
while ((sizeof (struct GstShmHeader) + self->shm_area->caps_size + struct GstShmBuffer *gsb = data;
self->shm_area->buffer_size) > self->shm_area_len) { g_return_if_fail (gsb->src->pipe != NULL);
size_t new_size = (sizeof (struct GstShmHeader) +
self->shm_area->caps_size + self->shm_area->buffer_size);
SHM_UNLOCK (self->shm_area); GST_LOG ("Freeing buffer %p", gsb->buf);
if (munmap (self->shm_area, self->shm_area_len)) {
GST_ERROR_OBJECT (self, "Could not unmap shared area");
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
("Could not unmap memory area"),
("munmap failed (%d): %s", errno, strerror (errno)));
return FALSE;
}
self->shm_area = mmap (NULL, new_size, PROT_READ | PROT_WRITE, sp_client_recv_finish (gsb->src->pipe, gsb->buf);
MAP_SHARED, self->fd, 0);
if (!self->shm_area) { gst_object_unref (gsb->src);
GST_ERROR_OBJECT (self, "Could not remap shared area"); g_slice_free (struct GstShmBuffer, gsb);
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
("Could not map memory area"),
("mmap failed (%d): %s", errno, strerror (errno)));
return FALSE;
}
self->shm_area_len = new_size;
SHM_LOCK (self->shm_area);
}
return TRUE;
} }
static GstFlowReturn static GstFlowReturn
gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{ {
GstShmSrc *self = GST_SHM_SRC (psrc); GstShmSrc *self = GST_SHM_SRC (psrc);
gchar *buf = NULL;
int rv = 0;
struct GstShmBuffer *gsb;
if (self->unlocked) do {
return GST_FLOW_WRONG_STATE; if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) {
if (errno == EBUSY)
g_return_val_if_fail (self->shm_area != MAP_FAILED, GST_FLOW_ERROR); return GST_FLOW_WRONG_STATE;
GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
SHM_LOCK (self->shm_area); ("Poll failed on fd: %s", strerror (errno)));
if (self->unlocked)
goto unlocked;
if (self->shm_area->eos)
goto eos;
while (self->buffer_gen == self->shm_area->buffer_gen) {
SHM_UNLOCK (self->shm_area);
if (self->unlocked)
return GST_FLOW_WRONG_STATE;
GST_LOG_OBJECT (self, "Waiting for next buffer");
sem_wait (&self->shm_area->notification);
if (self->unlocked)
return GST_FLOW_WRONG_STATE;
SHM_LOCK (self->shm_area);
}
if (self->unlocked)
goto eos;
if (!resize_area (self)) {
return GST_FLOW_ERROR;
}
if (self->caps_gen != self->shm_area->caps_gen) {
GstCaps *caps;
GST_DEBUG_OBJECT (self, "Got new caps: %s",
GST_SHM_CAPS_BUFFER (self->shm_area));
caps = gst_caps_from_string (GST_SHM_CAPS_BUFFER (self->shm_area));
self->caps_gen = self->shm_area->caps_gen;
if (!caps) {
SHM_UNLOCK (self->shm_area);
GST_ERROR_OBJECT (self, "Could not read caps");
return GST_FLOW_ERROR; return GST_FLOW_ERROR;
} }
if (!gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps)) { if (self->unlocked)
SHM_UNLOCK (self->shm_area); return GST_FLOW_WRONG_STATE;
return GST_FLOW_NOT_NEGOTIATED;
if (gst_poll_fd_has_closed (self->poll, &self->pollfd)) {
GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
("Control socket has closed"));
return GST_FLOW_ERROR;
} }
}
GST_LOG_OBJECT (self, "Create new buffer of size %u", if (gst_poll_fd_has_error (self->poll, &self->pollfd)) {
self->shm_area->buffer_size); GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
("Control socket has error"));
return GST_FLOW_ERROR;
}
*outbuf = gst_buffer_new_and_alloc (self->shm_area->buffer_size); if (gst_poll_fd_can_read (self->poll, &self->pollfd)) {
buf = NULL;
GST_LOG_OBJECT (self, "Reading from pipe");
rv = sp_client_recv (self->pipe, &buf);
if (rv < 0) {
GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
("Error reading control data: %d", rv));
return GST_FLOW_ERROR;
}
}
} while (buf == NULL);
memcpy (GST_BUFFER_DATA (*outbuf), GST_SHM_BUFFER (self->shm_area), GST_LOG_OBJECT (self, "Got buffer %p of size %d", buf, rv);
GST_BUFFER_SIZE (*outbuf));
// GST_BUFFER_TIMESTAMP (*outbuf) = self->shm_area->timestamp; gsb = g_slice_new0 (struct GstShmBuffer);
GST_BUFFER_DURATION (*outbuf) = self->shm_area->duration; gsb->buf = buf;
GST_BUFFER_OFFSET (*outbuf) = self->shm_area->offset; gsb->src = gst_object_ref (self);
GST_BUFFER_OFFSET_END (*outbuf) = self->shm_area->offset_end;
GST_BUFFER_FLAGS (*outbuf) = self->shm_area->flags;
if (self->buffer_gen + 1 != self->shm_area->buffer_gen) { *outbuf = gst_buffer_new ();
GST_WARNING_OBJECT (self, "Skipped %u buffers, setting DISCONT flag", GST_BUFFER_FLAG_SET (*outbuf, GST_BUFFER_FLAG_READONLY);
self->shm_area->buffer_gen - self->buffer_gen - 1); GST_BUFFER_DATA (*outbuf) = (guint8 *) buf;
GST_BUFFER_FLAG_SET (*outbuf, GST_BUFFER_FLAG_DISCONT); GST_BUFFER_SIZE (*outbuf) = rv;
} GST_BUFFER_MALLOCDATA (*outbuf) = (guint8 *) gsb;
GST_BUFFER_FREE_FUNC (*outbuf) = free_buffer;
self->buffer_gen = self->shm_area->buffer_gen;
SHM_UNLOCK (self->shm_area);
gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (psrc)));
return GST_FLOW_OK; return GST_FLOW_OK;
eos:
SHM_UNLOCK (self->shm_area);
return GST_FLOW_UNEXPECTED;
unlocked:
SHM_UNLOCK (self->shm_area);
return GST_FLOW_WRONG_STATE;
} }
static gboolean static gboolean
@ -376,8 +296,8 @@ gst_shm_src_unlock (GstBaseSrc * bsrc)
self->unlocked = TRUE; self->unlocked = TRUE;
if (self->shm_area != MAP_FAILED) if (self->poll)
sem_post (&self->shm_area->notification); gst_poll_set_flushing (self->poll, TRUE);
return TRUE; return TRUE;
} }
@ -389,5 +309,8 @@ gst_shm_src_unlock_stop (GstBaseSrc * bsrc)
self->unlocked = FALSE; self->unlocked = FALSE;
if (self->poll)
gst_poll_set_flushing (self->poll, FALSE);
return TRUE; return TRUE;
} }

View file

@ -26,6 +26,8 @@
#include <gst/base/gstpushsrc.h> #include <gst/base/gstpushsrc.h>
#include <gst/base/gstbasesrc.h> #include <gst/base/gstbasesrc.h>
#include "shmpipe.h"
G_BEGIN_DECLS G_BEGIN_DECLS
#define GST_TYPE_SHM_SRC \ #define GST_TYPE_SHM_SRC \
(gst_shm_src_get_type()) (gst_shm_src_get_type())
@ -44,15 +46,14 @@ struct _GstShmSrc
{ {
GstPushSrc element; GstPushSrc element;
gchar *shm_name; gchar *socket_path;
int fd; ShmPipe *pipe;
struct GstShmHeader *shm_area; GstPoll *poll;
size_t shm_area_len; GstPollFD pollfd;
guint caps_gen;
guint buffer_gen;
GstFlowReturn flow_return;
gboolean unlocked; gboolean unlocked;
}; };