mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-26 19:51:11 +00:00
unixfd: Serialize buffer metas
Serialize every GstMeta that supports serialization into the NEW_BUFFER payload. This is especially important for GstVideoMeta in the case of multiplanar buffers, or if stride!=width. Sponsored-by: Netflix Inc. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5355>
This commit is contained in:
parent
06d9d934f9
commit
f076a9bf2f
5 changed files with 121 additions and 38 deletions
|
@ -49,7 +49,7 @@ G_STATIC_ASSERT (sizeof (ReleaseBufferPayload) == 8);
|
||||||
|
|
||||||
gboolean
|
gboolean
|
||||||
gst_unix_fd_send_command (GSocket * socket, CommandType type, GUnixFDList * fds,
|
gst_unix_fd_send_command (GSocket * socket, CommandType type, GUnixFDList * fds,
|
||||||
const gchar * payload, gsize payload_size, GError ** error)
|
const guint8 * payload, gsize payload_size, GError ** error)
|
||||||
{
|
{
|
||||||
Command command = { type, payload_size };
|
Command command = { type, payload_size };
|
||||||
GOutputVector vect[] = {
|
GOutputVector vect[] = {
|
||||||
|
@ -76,7 +76,7 @@ gst_unix_fd_send_command (GSocket * socket, CommandType type, GUnixFDList * fds,
|
||||||
|
|
||||||
gboolean
|
gboolean
|
||||||
gst_unix_fd_receive_command (GSocket * socket, GCancellable * cancellable,
|
gst_unix_fd_receive_command (GSocket * socket, GCancellable * cancellable,
|
||||||
CommandType * type, GUnixFDList ** fds, gchar ** payload,
|
CommandType * type, GUnixFDList ** fds, guint8 ** payload,
|
||||||
gsize * payload_size, GError ** error)
|
gsize * payload_size, GError ** error)
|
||||||
{
|
{
|
||||||
Command command;
|
Command command;
|
||||||
|
@ -98,8 +98,8 @@ gst_unix_fd_receive_command (GSocket * socket, GCancellable * cancellable,
|
||||||
if (command.payload_size > 0) {
|
if (command.payload_size > 0) {
|
||||||
*payload = g_malloc (command.payload_size);
|
*payload = g_malloc (command.payload_size);
|
||||||
*payload_size = command.payload_size;
|
*payload_size = command.payload_size;
|
||||||
if (g_socket_receive (socket, *payload, command.payload_size, cancellable,
|
if (g_socket_receive (socket, (gchar *) * payload, command.payload_size,
|
||||||
error) < (gssize) command.payload_size) {
|
cancellable, error) < (gssize) command.payload_size) {
|
||||||
g_clear_pointer (payload, g_free);
|
g_clear_pointer (payload, g_free);
|
||||||
ret = FALSE;
|
ret = FALSE;
|
||||||
goto out;
|
goto out;
|
||||||
|
@ -133,8 +133,8 @@ out:
|
||||||
}
|
}
|
||||||
|
|
||||||
gboolean
|
gboolean
|
||||||
gst_unix_fd_parse_new_buffer (gchar * payload, gsize payload_size,
|
gst_unix_fd_parse_new_buffer (guint8 * payload, gsize payload_size,
|
||||||
NewBufferPayload ** new_buffer)
|
NewBufferPayload ** new_buffer, guint32 * consumed)
|
||||||
{
|
{
|
||||||
if (payload == NULL || payload_size < sizeof (NewBufferPayload))
|
if (payload == NULL || payload_size < sizeof (NewBufferPayload))
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
@ -146,11 +146,13 @@ gst_unix_fd_parse_new_buffer (gchar * payload, gsize payload_size,
|
||||||
if (payload_size < struct_size)
|
if (payload_size < struct_size)
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
|
||||||
|
*consumed = struct_size;
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
gboolean
|
gboolean
|
||||||
gst_unix_fd_parse_release_buffer (gchar * payload, gsize payload_size,
|
gst_unix_fd_parse_release_buffer (guint8 * payload, gsize payload_size,
|
||||||
ReleaseBufferPayload ** release_buffer)
|
ReleaseBufferPayload ** release_buffer)
|
||||||
{
|
{
|
||||||
if (payload == NULL || payload_size < sizeof (ReleaseBufferPayload))
|
if (payload == NULL || payload_size < sizeof (ReleaseBufferPayload))
|
||||||
|
@ -162,12 +164,12 @@ gst_unix_fd_parse_release_buffer (gchar * payload, gsize payload_size,
|
||||||
}
|
}
|
||||||
|
|
||||||
gboolean
|
gboolean
|
||||||
gst_unix_fd_parse_caps (gchar * payload, gsize payload_size, gchar ** caps_str)
|
gst_unix_fd_parse_caps (guint8 * payload, gsize payload_size, gchar ** caps_str)
|
||||||
{
|
{
|
||||||
if (payload == NULL || payload_size < 1 || payload[payload_size - 1] != '\0')
|
if (payload == NULL || payload_size < 1 || payload[payload_size - 1] != '\0')
|
||||||
return FALSE;
|
return FALSE;
|
||||||
|
|
||||||
*caps_str = payload;
|
*caps_str = (gchar *) payload;
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ typedef struct {
|
||||||
guint32 flags;
|
guint32 flags;
|
||||||
guint8 type;
|
guint8 type;
|
||||||
guint8 n_memory;
|
guint8 n_memory;
|
||||||
guint16 padding;
|
guint16 n_meta;
|
||||||
MemoryPayload memories[];
|
MemoryPayload memories[];
|
||||||
} NewBufferPayload;
|
} NewBufferPayload;
|
||||||
|
|
||||||
|
@ -66,17 +66,17 @@ typedef struct {
|
||||||
} ReleaseBufferPayload;
|
} ReleaseBufferPayload;
|
||||||
|
|
||||||
gboolean gst_unix_fd_send_command(GSocket * socket, CommandType type,
|
gboolean gst_unix_fd_send_command(GSocket * socket, CommandType type,
|
||||||
GUnixFDList * fds, const gchar * payload, gsize payload_size,
|
GUnixFDList * fds, const guint8 * payload, gsize payload_size,
|
||||||
GError ** error);
|
GError ** error);
|
||||||
gboolean gst_unix_fd_receive_command(GSocket * socket,
|
gboolean gst_unix_fd_receive_command (GSocket *socket,
|
||||||
GCancellable * cancellable, CommandType *type, GUnixFDList ** fds,
|
GCancellable *cancellable, CommandType *type, GUnixFDList **fds,
|
||||||
gchar ** payload, gsize *payload_size, GError ** error);
|
guint8 **payload, gsize *payload_size, GError **error);
|
||||||
|
|
||||||
gboolean gst_unix_fd_parse_new_buffer(gchar *payload, gsize payload_size,
|
gboolean gst_unix_fd_parse_new_buffer (guint8 *payload, gsize payload_size,
|
||||||
NewBufferPayload **new_buffer);
|
NewBufferPayload **new_buffer, guint32 *consumed);
|
||||||
gboolean gst_unix_fd_parse_release_buffer(gchar *payload, gsize payload_size,
|
gboolean gst_unix_fd_parse_release_buffer(guint8 *payload, gsize payload_size,
|
||||||
ReleaseBufferPayload **release_buffer);
|
ReleaseBufferPayload **release_buffer);
|
||||||
gboolean gst_unix_fd_parse_caps(gchar *payload, gsize payload_size,
|
gboolean gst_unix_fd_parse_caps (guint8 *payload, gsize payload_size,
|
||||||
gchar **caps_str);
|
gchar **caps_str);
|
||||||
|
|
||||||
GSocket *gst_unix_fd_socket_new(const gchar *socket_path,
|
GSocket *gst_unix_fd_socket_new(const gchar *socket_path,
|
||||||
|
|
|
@ -86,6 +86,7 @@ struct _GstUnixFdSink
|
||||||
GHashTable *clients;
|
GHashTable *clients;
|
||||||
GstCaps *caps;
|
GstCaps *caps;
|
||||||
gboolean uses_monotonic_clock;
|
gboolean uses_monotonic_clock;
|
||||||
|
GByteArray *payload;
|
||||||
};
|
};
|
||||||
|
|
||||||
G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK);
|
G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK);
|
||||||
|
@ -199,7 +200,7 @@ incoming_command_cb (GSocket * socket, GIOCondition cond, gpointer user_data)
|
||||||
GstUnixFdSink *self = user_data;
|
GstUnixFdSink *self = user_data;
|
||||||
Client *client;
|
Client *client;
|
||||||
CommandType command;
|
CommandType command;
|
||||||
gchar *payload = NULL;
|
guint8 *payload = NULL;
|
||||||
gsize payload_size;
|
gsize payload_size;
|
||||||
GError *error = NULL;
|
GError *error = NULL;
|
||||||
|
|
||||||
|
@ -264,12 +265,12 @@ on_error:
|
||||||
return G_SOURCE_REMOVE;
|
return G_SOURCE_REMOVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static gchar *
|
static guint8 *
|
||||||
caps_to_payload (GstCaps * caps, gsize * payload_size)
|
caps_to_payload (GstCaps * caps, gsize * payload_size)
|
||||||
{
|
{
|
||||||
gchar *payload = gst_caps_to_string (caps);
|
gchar *payload = gst_caps_to_string (caps);
|
||||||
*payload_size = strlen (payload) + 1;
|
*payload_size = strlen (payload) + 1;
|
||||||
return payload;
|
return (guint8 *) payload;
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
|
@ -303,7 +304,7 @@ new_client_cb (GSocket * socket, GIOCondition cond, gpointer user_data)
|
||||||
* we don't want this client to miss a caps event or receive a buffer while we
|
* we don't want this client to miss a caps event or receive a buffer while we
|
||||||
* send initial caps. */
|
* send initial caps. */
|
||||||
gsize payload_size;
|
gsize payload_size;
|
||||||
gchar *payload = caps_to_payload (self->caps, &payload_size);
|
guint8 *payload = caps_to_payload (self->caps, &payload_size);
|
||||||
if (!gst_unix_fd_send_command (client_socket, COMMAND_TYPE_CAPS, NULL,
|
if (!gst_unix_fd_send_command (client_socket, COMMAND_TYPE_CAPS, NULL,
|
||||||
payload, payload_size, &error)) {
|
payload, payload_size, &error)) {
|
||||||
GST_ERROR_OBJECT (self, "Failed to send caps to new client %p: %s", client,
|
GST_ERROR_OBJECT (self, "Failed to send caps to new client %p: %s", client,
|
||||||
|
@ -365,6 +366,13 @@ gst_unix_fd_sink_start (GstBaseSink * bsink)
|
||||||
|
|
||||||
self->thread = g_thread_new ("unixfdsink", thread_cb, self);
|
self->thread = g_thread_new ("unixfdsink", thread_cb, self);
|
||||||
|
|
||||||
|
/* Preallocate the minimum payload size for a buffer with a single memory and
|
||||||
|
* no metas. Chances are that every buffer will require roughly the same
|
||||||
|
* payload size, by reusing the same GByteArray we avoid reallocations. */
|
||||||
|
self->payload =
|
||||||
|
g_byte_array_sized_new (sizeof (NewBufferPayload) +
|
||||||
|
sizeof (MemoryPayload));
|
||||||
|
|
||||||
out:
|
out:
|
||||||
GST_OBJECT_UNLOCK (self);
|
GST_OBJECT_UNLOCK (self);
|
||||||
g_clear_error (&error);
|
g_clear_error (&error);
|
||||||
|
@ -385,6 +393,7 @@ gst_unix_fd_sink_stop (GstBaseSink * bsink)
|
||||||
g_clear_object (&self->socket);
|
g_clear_object (&self->socket);
|
||||||
gst_clear_caps (&self->caps);
|
gst_clear_caps (&self->caps);
|
||||||
g_hash_table_remove_all (self->clients);
|
g_hash_table_remove_all (self->clients);
|
||||||
|
g_clear_pointer (&self->payload, g_byte_array_unref);
|
||||||
|
|
||||||
if (self->socket_type == G_UNIX_SOCKET_ADDRESS_PATH)
|
if (self->socket_type == G_UNIX_SOCKET_ADDRESS_PATH)
|
||||||
g_unlink (self->socket_path);
|
g_unlink (self->socket_path);
|
||||||
|
@ -394,7 +403,7 @@ gst_unix_fd_sink_stop (GstBaseSink * bsink)
|
||||||
|
|
||||||
static void
|
static void
|
||||||
send_command_to_all (GstUnixFdSink * self, CommandType type, GUnixFDList * fds,
|
send_command_to_all (GstUnixFdSink * self, CommandType type, GUnixFDList * fds,
|
||||||
const gchar * payload, gsize payload_size, GstBuffer * buffer)
|
const guint8 * payload, gsize payload_size, GstBuffer * buffer)
|
||||||
{
|
{
|
||||||
GHashTableIter iter;
|
GHashTableIter iter;
|
||||||
GSocket *socket;
|
GSocket *socket;
|
||||||
|
@ -435,6 +444,21 @@ calculate_timestamp (GstClockTime timestamp, GstClockTime base_time,
|
||||||
return timestamp;
|
return timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static guint16
|
||||||
|
serialize_metas (GstBuffer * buffer, GByteArray * payload)
|
||||||
|
{
|
||||||
|
gpointer state = NULL;
|
||||||
|
GstMeta *meta;
|
||||||
|
guint16 n_meta = 0;
|
||||||
|
|
||||||
|
while ((meta = gst_buffer_iterate_meta (buffer, &state)) != NULL) {
|
||||||
|
if (gst_meta_serialize_simple (meta, payload))
|
||||||
|
n_meta++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return n_meta;
|
||||||
|
}
|
||||||
|
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
|
gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
|
||||||
{
|
{
|
||||||
|
@ -442,11 +466,11 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
|
||||||
GstFlowReturn ret = GST_FLOW_OK;
|
GstFlowReturn ret = GST_FLOW_OK;
|
||||||
GError *error = NULL;
|
GError *error = NULL;
|
||||||
|
|
||||||
/* Allocate payload */
|
|
||||||
guint n_memory = gst_buffer_n_memory (buffer);
|
guint n_memory = gst_buffer_n_memory (buffer);
|
||||||
gsize payload_size =
|
gsize struct_size =
|
||||||
sizeof (NewBufferPayload) + sizeof (MemoryPayload) * n_memory;
|
sizeof (NewBufferPayload) + sizeof (MemoryPayload) * n_memory;
|
||||||
gchar *payload = g_malloc0 (payload_size);
|
g_byte_array_set_size (self->payload, struct_size);
|
||||||
|
guint32 n_meta = serialize_metas (buffer, self->payload);
|
||||||
|
|
||||||
GstClockTime latency = gst_base_sink_get_latency (GST_BASE_SINK_CAST (self));
|
GstClockTime latency = gst_base_sink_get_latency (GST_BASE_SINK_CAST (self));
|
||||||
GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT_CAST (self));
|
GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT_CAST (self));
|
||||||
|
@ -456,7 +480,7 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
|
||||||
gst_clock_get_time (GST_ELEMENT_CLOCK (self)));
|
gst_clock_get_time (GST_ELEMENT_CLOCK (self)));
|
||||||
}
|
}
|
||||||
|
|
||||||
NewBufferPayload *new_buffer = (NewBufferPayload *) payload;
|
NewBufferPayload *new_buffer = (NewBufferPayload *) self->payload->data;
|
||||||
/* Cast buffer pointer to guint64 identifier. Client will send us back that
|
/* Cast buffer pointer to guint64 identifier. Client will send us back that
|
||||||
* id so we know which buffer to unref. */
|
* id so we know which buffer to unref. */
|
||||||
new_buffer->id = (guint64) buffer;
|
new_buffer->id = (guint64) buffer;
|
||||||
|
@ -472,6 +496,7 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
|
||||||
new_buffer->flags = GST_BUFFER_FLAGS (buffer);
|
new_buffer->flags = GST_BUFFER_FLAGS (buffer);
|
||||||
new_buffer->type = MEMORY_TYPE_DEFAULT;
|
new_buffer->type = MEMORY_TYPE_DEFAULT;
|
||||||
new_buffer->n_memory = n_memory;
|
new_buffer->n_memory = n_memory;
|
||||||
|
new_buffer->n_meta = n_meta;
|
||||||
|
|
||||||
gboolean dmabuf_count = 0;
|
gboolean dmabuf_count = 0;
|
||||||
GUnixFDList *fds = g_unix_fd_list_new ();
|
GUnixFDList *fds = g_unix_fd_list_new ();
|
||||||
|
@ -507,14 +532,13 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
|
||||||
new_buffer->type = MEMORY_TYPE_DMABUF;
|
new_buffer->type = MEMORY_TYPE_DMABUF;
|
||||||
|
|
||||||
GST_OBJECT_LOCK (self);
|
GST_OBJECT_LOCK (self);
|
||||||
send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds, payload,
|
send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds,
|
||||||
payload_size, buffer);
|
self->payload->data, self->payload->len, buffer);
|
||||||
GST_OBJECT_UNLOCK (self);
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
out:
|
out:
|
||||||
g_clear_object (&fds);
|
g_clear_object (&fds);
|
||||||
g_clear_error (&error);
|
g_clear_error (&error);
|
||||||
g_free (payload);
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -532,7 +556,7 @@ gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event)
|
||||||
GST_DEBUG_OBJECT (self, "Send new caps to all clients: %" GST_PTR_FORMAT,
|
GST_DEBUG_OBJECT (self, "Send new caps to all clients: %" GST_PTR_FORMAT,
|
||||||
self->caps);
|
self->caps);
|
||||||
gsize payload_size;
|
gsize payload_size;
|
||||||
gchar *payload = caps_to_payload (self->caps, &payload_size);
|
guint8 *payload = caps_to_payload (self->caps, &payload_size);
|
||||||
send_command_to_all (self, COMMAND_TYPE_CAPS, NULL, payload, payload_size,
|
send_command_to_all (self, COMMAND_TYPE_CAPS, NULL, payload, payload_size,
|
||||||
NULL);
|
NULL);
|
||||||
g_free (payload);
|
g_free (payload);
|
||||||
|
|
|
@ -103,7 +103,7 @@ memory_weak_ref_cb (GstUnixFdSrc * self, GstMemory * mem)
|
||||||
ReleaseBufferPayload payload = { ctx->id };
|
ReleaseBufferPayload payload = { ctx->id };
|
||||||
GError *error = NULL;
|
GError *error = NULL;
|
||||||
if (!gst_unix_fd_send_command (self->socket, COMMAND_TYPE_RELEASE_BUFFER,
|
if (!gst_unix_fd_send_command (self->socket, COMMAND_TYPE_RELEASE_BUFFER,
|
||||||
NULL, (const gchar *) &payload, sizeof (payload), &error)) {
|
NULL, (guint8 *) & payload, sizeof (payload), &error)) {
|
||||||
GST_WARNING_OBJECT (self, "Failed to send release-buffer command: %s",
|
GST_WARNING_OBJECT (self, "Failed to send release-buffer command: %s",
|
||||||
error->message);
|
error->message);
|
||||||
g_clear_error (&error);
|
g_clear_error (&error);
|
||||||
|
@ -301,7 +301,7 @@ gst_unix_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
||||||
GstUnixFdSrc *self = GST_UNIX_FD_SRC (psrc);
|
GstUnixFdSrc *self = GST_UNIX_FD_SRC (psrc);
|
||||||
CommandType command;
|
CommandType command;
|
||||||
GUnixFDList *fds = NULL;
|
GUnixFDList *fds = NULL;
|
||||||
gchar *payload = NULL;
|
guint8 *payload = NULL;
|
||||||
gsize payload_size;
|
gsize payload_size;
|
||||||
GError *error = NULL;
|
GError *error = NULL;
|
||||||
GstFlowReturn ret = GST_FLOW_OK;
|
GstFlowReturn ret = GST_FLOW_OK;
|
||||||
|
@ -327,7 +327,9 @@ again:
|
||||||
goto on_error;
|
goto on_error;
|
||||||
case COMMAND_TYPE_NEW_BUFFER:{
|
case COMMAND_TYPE_NEW_BUFFER:{
|
||||||
NewBufferPayload *new_buffer;
|
NewBufferPayload *new_buffer;
|
||||||
if (!gst_unix_fd_parse_new_buffer (payload, payload_size, &new_buffer)) {
|
guint32 payload_off = 0;
|
||||||
|
if (!gst_unix_fd_parse_new_buffer (payload, payload_size, &new_buffer,
|
||||||
|
&payload_off)) {
|
||||||
GST_ERROR_OBJECT (self, "Received new-buffer with wrong payload size");
|
GST_ERROR_OBJECT (self, "Received new-buffer with wrong payload size");
|
||||||
ret = GST_FLOW_ERROR;
|
ret = GST_FLOW_ERROR;
|
||||||
goto on_error;
|
goto on_error;
|
||||||
|
@ -341,8 +343,8 @@ again:
|
||||||
|
|
||||||
if (g_unix_fd_list_get_length (fds) != new_buffer->n_memory) {
|
if (g_unix_fd_list_get_length (fds) != new_buffer->n_memory) {
|
||||||
GST_ERROR_OBJECT (self,
|
GST_ERROR_OBJECT (self,
|
||||||
"Received new buffer command with %d file descriptors instead of %d",
|
"Received new buffer command with %d file descriptors instead of "
|
||||||
g_unix_fd_list_get_length (fds), new_buffer->n_memory);
|
"%d", g_unix_fd_list_get_length (fds), new_buffer->n_memory);
|
||||||
ret = GST_FLOW_ERROR;
|
ret = GST_FLOW_ERROR;
|
||||||
goto on_error;
|
goto on_error;
|
||||||
}
|
}
|
||||||
|
@ -379,6 +381,18 @@ again:
|
||||||
GST_BUFFER_OFFSET_END (*outbuf) = new_buffer->offset_end;
|
GST_BUFFER_OFFSET_END (*outbuf) = new_buffer->offset_end;
|
||||||
GST_BUFFER_FLAGS (*outbuf) = new_buffer->flags;
|
GST_BUFFER_FLAGS (*outbuf) = new_buffer->flags;
|
||||||
|
|
||||||
|
for (int i = 0; i < new_buffer->n_meta; i++) {
|
||||||
|
guint32 consumed = 0;
|
||||||
|
gst_meta_deserialize (*outbuf, (guint8 *) payload + payload_off,
|
||||||
|
payload_size - payload_off, &consumed);
|
||||||
|
if (consumed == 0) {
|
||||||
|
GST_ERROR_OBJECT (self, "Malformed meta serialization");
|
||||||
|
ret = GST_FLOW_ERROR;
|
||||||
|
goto on_error;
|
||||||
|
}
|
||||||
|
payload_off += consumed;
|
||||||
|
}
|
||||||
|
|
||||||
GST_OBJECT_LOCK (self);
|
GST_OBJECT_LOCK (self);
|
||||||
for (int i = 0; i < new_buffer->n_memory; i++) {
|
for (int i = 0; i < new_buffer->n_memory; i++) {
|
||||||
GstMemory *mem = gst_fd_allocator_alloc (allocator, fds_arr[i],
|
GstMemory *mem = gst_fd_allocator_alloc (allocator, fds_arr[i],
|
||||||
|
|
|
@ -37,10 +37,26 @@ wait_preroll (GstElement * element)
|
||||||
fail_unless (state_res == GST_STATE_CHANGE_SUCCESS);
|
fail_unless (state_res == GST_STATE_CHANGE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static GstPadProbeReturn
|
||||||
|
buffer_pad_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
|
||||||
|
{
|
||||||
|
GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
|
||||||
|
if (buffer != NULL) {
|
||||||
|
GstCustomMeta *cmeta =
|
||||||
|
gst_buffer_add_custom_meta (buffer, "unix-fd-custom-meta");
|
||||||
|
GstStructure *s = gst_custom_meta_get_structure (cmeta);
|
||||||
|
gst_structure_set (s, "field", G_TYPE_INT, 42, NULL);
|
||||||
|
}
|
||||||
|
return GST_PAD_PROBE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
GST_START_TEST (test_unixfd_videotestsrc)
|
GST_START_TEST (test_unixfd_videotestsrc)
|
||||||
{
|
{
|
||||||
GError *error = NULL;
|
GError *error = NULL;
|
||||||
|
|
||||||
|
const gchar *tags[] = { NULL };
|
||||||
|
gst_meta_register_custom ("unix-fd-custom-meta", tags, NULL, NULL, NULL);
|
||||||
|
|
||||||
/* Ensure we don't have socket from previous failed test */
|
/* Ensure we don't have socket from previous failed test */
|
||||||
gchar *socket_path =
|
gchar *socket_path =
|
||||||
g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ());
|
g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ());
|
||||||
|
@ -50,15 +66,26 @@ GST_START_TEST (test_unixfd_videotestsrc)
|
||||||
|
|
||||||
/* Setup source */
|
/* Setup source */
|
||||||
gchar *pipeline_str =
|
gchar *pipeline_str =
|
||||||
g_strdup_printf ("videotestsrc ! unixfdsink socket-path=%s", socket_path);
|
g_strdup_printf ("videotestsrc name=src ! unixfdsink socket-path=%s",
|
||||||
|
socket_path);
|
||||||
GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error);
|
GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error);
|
||||||
g_assert_no_error (error);
|
g_assert_no_error (error);
|
||||||
g_free (pipeline_str);
|
g_free (pipeline_str);
|
||||||
|
|
||||||
|
/* Add a custom meta on each buffer */
|
||||||
|
GstElement *src = gst_bin_get_by_name (GST_BIN (pipeline_service), "src");
|
||||||
|
GstPad *pad = gst_element_get_static_pad (src, "src");
|
||||||
|
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER, buffer_pad_probe_cb, NULL,
|
||||||
|
NULL);
|
||||||
|
gst_object_unref (src);
|
||||||
|
gst_object_unref (pad);
|
||||||
|
|
||||||
wait_preroll (pipeline_service);
|
wait_preroll (pipeline_service);
|
||||||
|
|
||||||
/* Setup sink */
|
/* Setup sink */
|
||||||
pipeline_str =
|
pipeline_str =
|
||||||
g_strdup_printf ("unixfdsrc socket-path=%s ! fakesink", socket_path);
|
g_strdup_printf ("unixfdsrc socket-path=%s ! fakesink name=sink",
|
||||||
|
socket_path);
|
||||||
GstElement *pipeline_client_1 = gst_parse_launch (pipeline_str, &error);
|
GstElement *pipeline_client_1 = gst_parse_launch (pipeline_str, &error);
|
||||||
g_assert_no_error (error);
|
g_assert_no_error (error);
|
||||||
wait_preroll (pipeline_client_1);
|
wait_preroll (pipeline_client_1);
|
||||||
|
@ -73,6 +100,22 @@ GST_START_TEST (test_unixfd_videotestsrc)
|
||||||
g_assert_no_error (error);
|
g_assert_no_error (error);
|
||||||
wait_preroll (pipeline_client_2);
|
wait_preroll (pipeline_client_2);
|
||||||
|
|
||||||
|
/* Check we received our custom meta */
|
||||||
|
GstSample *sample;
|
||||||
|
GstElement *sink = gst_bin_get_by_name (GST_BIN (pipeline_client_2), "sink");
|
||||||
|
g_object_get (sink, "last-sample", &sample, NULL);
|
||||||
|
fail_unless (sample);
|
||||||
|
GstBuffer *buffer = gst_sample_get_buffer (sample);
|
||||||
|
GstCustomMeta *cmeta =
|
||||||
|
gst_buffer_get_custom_meta (buffer, "unix-fd-custom-meta");
|
||||||
|
fail_unless (cmeta);
|
||||||
|
GstStructure *s = gst_custom_meta_get_structure (cmeta);
|
||||||
|
gint value;
|
||||||
|
fail_unless (gst_structure_get_int (s, "field", &value));
|
||||||
|
fail_unless_equals_int (value, 42);
|
||||||
|
gst_object_unref (sink);
|
||||||
|
gst_sample_unref (sample);
|
||||||
|
|
||||||
/* Teardown */
|
/* Teardown */
|
||||||
fail_unless (gst_element_set_state (pipeline_client_1,
|
fail_unless (gst_element_set_state (pipeline_client_1,
|
||||||
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
|
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
|
||||||
|
|
Loading…
Reference in a new issue