mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-27 02:30:35 +00:00
unixfd: New plugin with unixfdsink and unixfdsrc elements
This pair of elements, inspired from shmsink/shmsrc, send unix file descriptors (e.g. memfd, dmabuf) from one sink to multiple source elements in other processes. The unixfdsink proposes a memfd/shm allocator, which causes for example videotestsrc to write directly into memories that can be transfered to other processes without copying. Sponsored-by: Netflix Inc. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5328>
This commit is contained in:
parent
ceb686a32b
commit
f1a1901b8b
9 changed files with 1472 additions and 1 deletions
|
@ -11,7 +11,7 @@ foreach plugin : ['accurip', 'adpcmdec', 'adpcmenc', 'aiff', 'asfmux',
|
|||
'mxf', 'netsim', 'onvif', 'pcapparse', 'pnm', 'proxy',
|
||||
'rawparse', 'removesilence', 'rist', 'rtmp2', 'rtp', 'sdp',
|
||||
'segmentclip', 'siren', 'smooth', 'speed', 'subenc', 'switchbin',
|
||||
'timecode', 'transcode', 'videofilters',
|
||||
'timecode', 'transcode', 'unixfd', 'videofilters',
|
||||
'videoframe_audiolevel', 'videoparsers', 'videosignal',
|
||||
'vmnc', 'y4m']
|
||||
if not get_option(plugin).disabled()
|
||||
|
|
189
subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c
Normal file
189
subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c
Normal file
|
@ -0,0 +1,189 @@
|
|||
/* GStreamer unix file-descriptor source/sink
|
||||
*
|
||||
* Copyright (C) 2023 Netflix Inc.
|
||||
* Author: Xavier Claessens <xavier.claessens@collabora.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
||||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
/**
|
||||
* plugin-unixfd:
|
||||
*
|
||||
* Since: 1.24
|
||||
*/
|
||||
|
||||
#ifdef HAVE_CONFIG_H
|
||||
#include "config.h"
|
||||
#endif
|
||||
|
||||
#include "gstunixfd.h"
|
||||
|
||||
#include <gio/gunixfdmessage.h>
|
||||
|
||||
typedef struct
|
||||
{
|
||||
guint32 type;
|
||||
guint32 payload_size;
|
||||
} Command;
|
||||
|
||||
/* For backward compatibility, do not change the size of Command. It should have
|
||||
* same size on 32bits and 64bits arches. */
|
||||
G_STATIC_ASSERT (sizeof (Command) == 8);
|
||||
G_STATIC_ASSERT (sizeof (MemoryPayload) == 16);
|
||||
G_STATIC_ASSERT (sizeof (NewBufferPayload) == 56);
|
||||
G_STATIC_ASSERT (sizeof (ReleaseBufferPayload) == 8);
|
||||
|
||||
gboolean
|
||||
gst_unix_fd_send_command (GSocket * socket, CommandType type, GUnixFDList * fds,
|
||||
const gchar * payload, gsize payload_size, GError ** error)
|
||||
{
|
||||
Command command = { type, payload_size };
|
||||
GOutputVector vect[] = {
|
||||
{&command, sizeof (Command)},
|
||||
{payload, payload_size},
|
||||
};
|
||||
GSocketControlMessage *msg = NULL;
|
||||
gint num_msg = 0;
|
||||
gboolean ret = TRUE;
|
||||
|
||||
if (fds != NULL) {
|
||||
msg = g_unix_fd_message_new_with_fd_list (fds);
|
||||
num_msg++;
|
||||
}
|
||||
|
||||
if (g_socket_send_message (socket, NULL, vect, G_N_ELEMENTS (vect),
|
||||
&msg, num_msg, G_SOCKET_MSG_NONE, NULL, error) < 0) {
|
||||
ret = FALSE;
|
||||
}
|
||||
|
||||
g_clear_object (&msg);
|
||||
return ret;
|
||||
}
|
||||
|
||||
gboolean
|
||||
gst_unix_fd_receive_command (GSocket * socket, GCancellable * cancellable,
|
||||
CommandType * type, GUnixFDList ** fds, gchar ** payload,
|
||||
gsize * payload_size, GError ** error)
|
||||
{
|
||||
Command command;
|
||||
GInputVector vect = { &command, sizeof (Command) };
|
||||
GSocketControlMessage **msg = NULL;
|
||||
gint num_msg = 0;
|
||||
gint flags = G_SOCKET_MSG_NONE;
|
||||
gboolean ret = TRUE;
|
||||
|
||||
if (g_socket_receive_message (socket, NULL, &vect, 1, &msg, &num_msg, &flags,
|
||||
cancellable, error) <= 0) {
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
*type = command.type;
|
||||
*payload = NULL;
|
||||
*payload_size = 0;
|
||||
|
||||
if (command.payload_size > 0) {
|
||||
*payload = g_malloc (command.payload_size);
|
||||
*payload_size = command.payload_size;
|
||||
if (g_socket_receive (socket, *payload, command.payload_size, cancellable,
|
||||
error) < (gssize) command.payload_size) {
|
||||
g_clear_pointer (payload, g_free);
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
if (fds != NULL) {
|
||||
*fds = NULL;
|
||||
for (int i = 0; i < num_msg; i++) {
|
||||
if (G_IS_UNIX_FD_MESSAGE (msg[i])) {
|
||||
if (*fds != NULL) {
|
||||
g_set_error (error, GST_STREAM_ERROR, GST_STREAM_ERROR_FAILED,
|
||||
"Received more than one fd message");
|
||||
g_clear_pointer (payload, g_free);
|
||||
g_clear_object (fds);
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
*fds = g_object_ref (g_unix_fd_message_get_fd_list ((GUnixFDMessage *)
|
||||
msg[i]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
out:
|
||||
for (int i = 0; i < num_msg; i++)
|
||||
g_object_unref (msg[i]);
|
||||
g_free (msg);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
gboolean
|
||||
gst_unix_fd_parse_new_buffer (gchar * payload, gsize payload_size,
|
||||
NewBufferPayload ** new_buffer)
|
||||
{
|
||||
if (payload == NULL || payload_size < sizeof (NewBufferPayload))
|
||||
return FALSE;
|
||||
|
||||
*new_buffer = (NewBufferPayload *) payload;
|
||||
gsize struct_size =
|
||||
sizeof (NewBufferPayload) +
|
||||
sizeof (MemoryPayload) * (*new_buffer)->n_memory;
|
||||
if (payload_size < struct_size)
|
||||
return FALSE;
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
gboolean
|
||||
gst_unix_fd_parse_release_buffer (gchar * payload, gsize payload_size,
|
||||
ReleaseBufferPayload ** release_buffer)
|
||||
{
|
||||
if (payload == NULL || payload_size < sizeof (ReleaseBufferPayload))
|
||||
return FALSE;
|
||||
|
||||
*release_buffer = (ReleaseBufferPayload *) payload;
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
gboolean
|
||||
gst_unix_fd_parse_caps (gchar * payload, gsize payload_size, gchar ** caps_str)
|
||||
{
|
||||
if (payload == NULL || payload_size < 1 || payload[payload_size - 1] != '\0')
|
||||
return FALSE;
|
||||
|
||||
*caps_str = payload;
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
plugin_init (GstPlugin * plugin)
|
||||
{
|
||||
gboolean ret = FALSE;
|
||||
|
||||
ret |= GST_ELEMENT_REGISTER (unixfdsrc, plugin);
|
||||
ret |= GST_ELEMENT_REGISTER (unixfdsink, plugin);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
|
||||
GST_VERSION_MINOR,
|
||||
unixfd,
|
||||
"Unix file descriptor sink and source",
|
||||
plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
|
78
subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h
Normal file
78
subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h
Normal file
|
@ -0,0 +1,78 @@
|
|||
/* GStreamer unix file-descriptor source/sink
|
||||
*
|
||||
* Copyright (C) 2023 Netflix Inc.
|
||||
* Author: Xavier Claessens <xavier.claessens@collabora.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
||||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <gst/gst.h>
|
||||
#include <gio/gio.h>
|
||||
#include <gio/gunixfdlist.h>
|
||||
|
||||
G_BEGIN_DECLS
|
||||
|
||||
typedef enum
|
||||
{
|
||||
COMMAND_TYPE_NEW_BUFFER = 0,
|
||||
COMMAND_TYPE_RELEASE_BUFFER = 1,
|
||||
COMMAND_TYPE_CAPS = 2,
|
||||
COMMAND_TYPE_EOS = 3,
|
||||
} CommandType;
|
||||
|
||||
typedef struct {
|
||||
guint64 size;
|
||||
guint64 offset;
|
||||
} MemoryPayload;
|
||||
|
||||
typedef struct {
|
||||
guint64 id;
|
||||
guint64 pts;
|
||||
guint64 dts;
|
||||
guint64 duration;
|
||||
guint64 offset;
|
||||
guint64 offset_end;
|
||||
guint32 flags;
|
||||
guint8 n_memory;
|
||||
guint8 padding8;
|
||||
guint16 padding16;
|
||||
MemoryPayload memories[];
|
||||
} NewBufferPayload;
|
||||
|
||||
typedef struct {
|
||||
guint64 id;
|
||||
} ReleaseBufferPayload;
|
||||
|
||||
gboolean gst_unix_fd_send_command(GSocket * socket, CommandType type,
|
||||
GUnixFDList * fds, const gchar * payload, gsize payload_size,
|
||||
GError ** error);
|
||||
gboolean gst_unix_fd_receive_command(GSocket * socket,
|
||||
GCancellable * cancellable, CommandType *type, GUnixFDList ** fds,
|
||||
gchar ** payload, gsize *payload_size, GError ** error);
|
||||
|
||||
gboolean gst_unix_fd_parse_new_buffer(gchar *payload, gsize payload_size,
|
||||
NewBufferPayload **new_buffer);
|
||||
gboolean gst_unix_fd_parse_release_buffer(gchar *payload, gsize payload_size,
|
||||
ReleaseBufferPayload **release_buffer);
|
||||
gboolean gst_unix_fd_parse_caps(gchar *payload, gsize payload_size,
|
||||
gchar **caps_str);
|
||||
|
||||
GST_ELEMENT_REGISTER_DECLARE (unixfdsrc);
|
||||
GST_ELEMENT_REGISTER_DECLARE (unixfdsink);
|
||||
|
||||
G_END_DECLS
|
596
subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c
Normal file
596
subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c
Normal file
|
@ -0,0 +1,596 @@
|
|||
/* GStreamer unix file-descriptor source/sink
|
||||
*
|
||||
* Copyright (C) 2023 Netflix Inc.
|
||||
* Author: Xavier Claessens <xavier.claessens@collabora.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
||||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
/**
|
||||
* SECTION:element-unixfdsink
|
||||
* @title: unixfdsink
|
||||
*
|
||||
* Send file-descriptor backed buffers (e.g. memfd, dmabuf) over unix socket to
|
||||
* matching unixfdsrc. There can be any number of clients, if none are connected
|
||||
* buffers are dropped.
|
||||
*
|
||||
* Buffers can have any number of #GstMemory, but it is an error if any one of
|
||||
* them lacks a file-descriptor.
|
||||
*
|
||||
* #GstShmAllocator is added into the allocation proposition, which makes
|
||||
* most sources write their data into shared memory automatically.
|
||||
*
|
||||
* ## Example launch lines
|
||||
* |[
|
||||
* gst-launch-1.0 -v videotestsrc ! unixfdsink socket-path=/tmp/blah
|
||||
* gst-launch-1.0 -v unixfdsrc socket-path=/tmp/blah ! autovideosink
|
||||
* ]|
|
||||
*
|
||||
* Since: 1.24
|
||||
*/
|
||||
|
||||
#include "gstunixfd.h"
|
||||
|
||||
#include <gst/base/base.h>
|
||||
#include <gst/allocators/allocators.h>
|
||||
|
||||
#include <glib/gstdio.h>
|
||||
#include <gio/gio.h>
|
||||
#include <gio/gunixsocketaddress.h>
|
||||
|
||||
GST_DEBUG_CATEGORY (unixfdsink_debug);
|
||||
#define GST_CAT_DEFAULT (unixfdsink_debug)
|
||||
|
||||
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
|
||||
GST_PAD_SINK,
|
||||
GST_PAD_ALWAYS,
|
||||
GST_STATIC_CAPS_ANY);
|
||||
|
||||
#define GST_TYPE_UNIX_FD_SINK gst_unix_fd_sink_get_type()
|
||||
G_DECLARE_FINAL_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST, UNIX_FD_SINK,
|
||||
GstBaseSink);
|
||||
|
||||
typedef struct
|
||||
{
|
||||
GHashTable *buffers;
|
||||
GSource *source;
|
||||
} Client;
|
||||
|
||||
struct _GstUnixFdSink
|
||||
{
|
||||
GstBaseSink parent;
|
||||
|
||||
GThread *thread;
|
||||
GMainContext *context;
|
||||
GMainLoop *loop;
|
||||
|
||||
gchar *socket_path;
|
||||
GSocket *socket;
|
||||
GSource *source;
|
||||
|
||||
/* GSocket -> Client */
|
||||
GHashTable *clients;
|
||||
GstCaps *caps;
|
||||
gboolean uses_monotonic_clock;
|
||||
};
|
||||
|
||||
G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK);
|
||||
GST_ELEMENT_REGISTER_DEFINE (unixfdsink, "unixfdsink", GST_RANK_NONE,
|
||||
GST_TYPE_UNIX_FD_SINK);
|
||||
|
||||
enum
|
||||
{
|
||||
PROP_0,
|
||||
PROP_SOCKET_PATH,
|
||||
};
|
||||
|
||||
|
||||
static void
|
||||
client_free (Client * client)
|
||||
{
|
||||
g_hash_table_unref (client->buffers);
|
||||
g_source_destroy (client->source);
|
||||
g_source_unref (client->source);
|
||||
g_free (client);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_sink_init (GstUnixFdSink * self)
|
||||
{
|
||||
g_return_if_fail (GST_IS_UNIX_FD_SINK (self));
|
||||
|
||||
self->context = g_main_context_new ();
|
||||
self->loop = g_main_loop_new (self->context, FALSE);
|
||||
self->clients =
|
||||
g_hash_table_new_full (NULL, NULL, g_object_unref,
|
||||
(GDestroyNotify) client_free);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_sink_finalize (GObject * object)
|
||||
{
|
||||
GstUnixFdSink *self = GST_UNIX_FD_SINK (object);
|
||||
|
||||
g_free (self->socket_path);
|
||||
g_main_context_unref (self->context);
|
||||
g_main_loop_unref (self->loop);
|
||||
g_hash_table_unref (self->clients);
|
||||
|
||||
G_OBJECT_CLASS (gst_unix_fd_sink_parent_class)->finalize (object);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_sink_set_property (GObject * object, guint prop_id,
|
||||
const GValue * value, GParamSpec * pspec)
|
||||
{
|
||||
GstUnixFdSink *self = GST_UNIX_FD_SINK (object);
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
switch (prop_id) {
|
||||
case PROP_SOCKET_PATH:
|
||||
if (self->socket) {
|
||||
GST_WARNING_OBJECT (self,
|
||||
"Can only change socket path in NULL or READY state");
|
||||
break;
|
||||
}
|
||||
g_free (self->socket_path);
|
||||
self->socket_path = g_value_dup_string (value);
|
||||
break;
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_sink_get_property (GObject * object, guint prop_id,
|
||||
GValue * value, GParamSpec * pspec)
|
||||
{
|
||||
GstUnixFdSink *self = GST_UNIX_FD_SINK (object);
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
switch (prop_id) {
|
||||
case PROP_SOCKET_PATH:
|
||||
g_value_set_string (value, self->socket_path);
|
||||
break;
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
incoming_command_cb (GSocket * socket, GIOCondition cond, gpointer user_data)
|
||||
{
|
||||
GstUnixFdSink *self = user_data;
|
||||
Client *client;
|
||||
CommandType command;
|
||||
gchar *payload = NULL;
|
||||
gsize payload_size;
|
||||
GError *error = NULL;
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
client = g_hash_table_lookup (self->clients, socket);
|
||||
|
||||
if (client == NULL) {
|
||||
GST_ERROR_OBJECT (self, "Received data from unknown client");
|
||||
goto on_error;
|
||||
}
|
||||
|
||||
if (!gst_unix_fd_receive_command (socket, NULL, &command, NULL, &payload,
|
||||
&payload_size, &error)) {
|
||||
GST_DEBUG_OBJECT (self, "Failed to receive message from client %p: %s",
|
||||
client, error != NULL ? error->message : "Connection closed by peer");
|
||||
goto on_error;
|
||||
}
|
||||
|
||||
switch (command) {
|
||||
case COMMAND_TYPE_NEW_BUFFER:
|
||||
case COMMAND_TYPE_CAPS:
|
||||
GST_ERROR_OBJECT (self, "Received wrong command %d from client %p",
|
||||
command, client);
|
||||
goto on_error;
|
||||
case COMMAND_TYPE_RELEASE_BUFFER:{
|
||||
ReleaseBufferPayload *release_buffer;
|
||||
if (!gst_unix_fd_parse_release_buffer (payload, payload_size,
|
||||
&release_buffer)) {
|
||||
GST_ERROR_OBJECT (self,
|
||||
"Received release-buffer with wrong payload size from client %p",
|
||||
client);
|
||||
goto on_error;
|
||||
}
|
||||
/* id is actually the GstBuffer pointer casted to guint64.
|
||||
* We can now drop its reference kept for this client. */
|
||||
if (!g_hash_table_remove (client->buffers, (gpointer) release_buffer->id)) {
|
||||
GST_ERROR_OBJECT (self,
|
||||
"Received wrong id %" G_GUINT64_FORMAT
|
||||
" in release-buffer command from client %p", release_buffer->id,
|
||||
client);
|
||||
goto on_error;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
/* Protocol could have been extended with new command */
|
||||
GST_DEBUG_OBJECT (self, "Ignoring unknown command %d", command);
|
||||
break;
|
||||
}
|
||||
|
||||
g_free (payload);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
return G_SOURCE_CONTINUE;
|
||||
|
||||
on_error:
|
||||
g_hash_table_remove (self->clients, socket);
|
||||
g_clear_error (&error);
|
||||
g_free (payload);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
return G_SOURCE_REMOVE;
|
||||
}
|
||||
|
||||
static gchar *
|
||||
caps_to_payload (GstCaps * caps, gsize * payload_size)
|
||||
{
|
||||
gchar *payload = gst_caps_to_string (caps);
|
||||
*payload_size = strlen (payload) + 1;
|
||||
return payload;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
new_client_cb (GSocket * socket, GIOCondition cond, gpointer user_data)
|
||||
{
|
||||
GstUnixFdSink *self = user_data;
|
||||
Client *client;
|
||||
GError *error = NULL;
|
||||
|
||||
GSocket *client_socket = g_socket_accept (self->socket, NULL, &error);
|
||||
if (client_socket == NULL) {
|
||||
GST_ERROR_OBJECT (self, "Failed to accept connection: %s", error->message);
|
||||
return G_SOURCE_CONTINUE;
|
||||
}
|
||||
|
||||
client = g_new0 (Client, 1);
|
||||
client->buffers =
|
||||
g_hash_table_new_full (NULL, NULL, (GDestroyNotify) gst_buffer_unref,
|
||||
NULL);
|
||||
client->source = g_socket_create_source (client_socket, G_IO_IN, NULL);
|
||||
g_source_set_callback (client->source, (GSourceFunc) incoming_command_cb,
|
||||
self, NULL);
|
||||
g_source_attach (client->source, self->context);
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
GST_DEBUG_OBJECT (self, "New client %p", client);
|
||||
g_hash_table_insert (self->clients, client_socket, client);
|
||||
|
||||
/* Start by sending our current caps. Keep the lock while doing that because
|
||||
* we don't want this client to miss a caps event or receive a buffer while we
|
||||
* send initial caps. */
|
||||
gsize payload_size;
|
||||
gchar *payload = caps_to_payload (self->caps, &payload_size);
|
||||
if (!gst_unix_fd_send_command (client_socket, COMMAND_TYPE_CAPS, NULL,
|
||||
payload, payload_size, &error)) {
|
||||
GST_ERROR_OBJECT (self, "Failed to send caps to new client %p: %s", client,
|
||||
error->message);
|
||||
g_hash_table_remove (self->clients, client_socket);
|
||||
g_clear_error (&error);
|
||||
}
|
||||
g_free (payload);
|
||||
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
return G_SOURCE_CONTINUE;
|
||||
}
|
||||
|
||||
static gpointer
|
||||
thread_cb (gpointer user_data)
|
||||
{
|
||||
GstUnixFdSink *self = user_data;
|
||||
g_main_loop_run (self->loop);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_sink_start (GstBaseSink * bsink)
|
||||
{
|
||||
GstUnixFdSink *self = (GstUnixFdSink *) bsink;
|
||||
GSocketAddress *addr = NULL;
|
||||
GError *error = NULL;
|
||||
gboolean ret = TRUE;
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
if (self->socket_path == NULL) {
|
||||
GST_ERROR_OBJECT (self, "Socket path is NULL");
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
|
||||
addr = g_unix_socket_address_new (self->socket_path);
|
||||
|
||||
self->socket =
|
||||
g_socket_new (G_SOCKET_FAMILY_UNIX, G_SOCKET_TYPE_STREAM,
|
||||
G_SOCKET_PROTOCOL_DEFAULT, &error);
|
||||
if (self->socket == NULL) {
|
||||
GST_ERROR_OBJECT (self, "Failed to create UNIX socket: %s", error->message);
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (!g_socket_bind (self->socket, addr, TRUE, &error)) {
|
||||
GST_ERROR_OBJECT (self, "Failed to bind socket: %s", error->message);
|
||||
g_clear_object (&self->socket);
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (!g_socket_listen (self->socket, &error)) {
|
||||
GST_ERROR_OBJECT (self, "Failed to listen socket: %s", error->message);
|
||||
g_clear_object (&self->socket);
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
|
||||
self->source = g_socket_create_source (self->socket, G_IO_IN, NULL);
|
||||
g_source_set_callback (self->source, (GSourceFunc) new_client_cb, self, NULL);
|
||||
g_source_attach (self->source, self->context);
|
||||
|
||||
self->thread = g_thread_new ("unixfdsink", thread_cb, self);
|
||||
|
||||
out:
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
g_clear_error (&error);
|
||||
g_clear_object (&addr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_sink_stop (GstBaseSink * bsink)
|
||||
{
|
||||
GstUnixFdSink *self = (GstUnixFdSink *) bsink;
|
||||
|
||||
g_main_loop_quit (self->loop);
|
||||
g_thread_join (self->thread);
|
||||
|
||||
g_source_destroy (self->source);
|
||||
g_clear_pointer (&self->source, g_source_unref);
|
||||
g_clear_object (&self->socket);
|
||||
gst_clear_caps (&self->caps);
|
||||
g_hash_table_remove_all (self->clients);
|
||||
g_unlink (self->socket_path);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static void
|
||||
send_command_to_all (GstUnixFdSink * self, CommandType type, GUnixFDList * fds,
|
||||
const gchar * payload, gsize payload_size, GstBuffer * buffer)
|
||||
{
|
||||
GHashTableIter iter;
|
||||
GSocket *socket;
|
||||
Client *client;
|
||||
GError *error = NULL;
|
||||
|
||||
g_hash_table_iter_init (&iter, self->clients);
|
||||
while (g_hash_table_iter_next (&iter, (gpointer) & socket,
|
||||
(gpointer) & client)) {
|
||||
if (!gst_unix_fd_send_command (socket, type, fds, payload, payload_size,
|
||||
&error)) {
|
||||
GST_ERROR_OBJECT (self, "Failed to send command %d to client %p: %s",
|
||||
type, client, error->message);
|
||||
g_clear_error (&error);
|
||||
g_hash_table_iter_remove (&iter);
|
||||
continue;
|
||||
}
|
||||
/* Keep a ref on this buffer until all clients released it. */
|
||||
if (buffer != NULL)
|
||||
g_hash_table_add (client->buffers, gst_buffer_ref (buffer));
|
||||
}
|
||||
}
|
||||
|
||||
static GstClockTime
|
||||
calculate_timestamp (GstClockTime timestamp, GstClockTime base_time,
|
||||
GstClockTime latency, GstClockTimeDiff clock_diff)
|
||||
{
|
||||
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
|
||||
/* Convert running time to pipeline clock time */
|
||||
timestamp += base_time;
|
||||
if (GST_CLOCK_TIME_IS_VALID (latency))
|
||||
timestamp += latency;
|
||||
/* Convert to system monotonic clock time */
|
||||
if (clock_diff < 0 && -clock_diff > timestamp)
|
||||
return 0;
|
||||
timestamp += clock_diff;
|
||||
}
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
static GstFlowReturn
|
||||
gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
|
||||
{
|
||||
GstUnixFdSink *self = (GstUnixFdSink *) bsink;
|
||||
GstFlowReturn ret = GST_FLOW_OK;
|
||||
GError *error = NULL;
|
||||
|
||||
/* Allocate payload */
|
||||
guint n_memory = gst_buffer_n_memory (buffer);
|
||||
gsize payload_size =
|
||||
sizeof (NewBufferPayload) + sizeof (MemoryPayload) * n_memory;
|
||||
gchar *payload = g_malloc0 (payload_size);
|
||||
|
||||
GstClockTime latency = gst_base_sink_get_latency (GST_BASE_SINK_CAST (self));
|
||||
GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT_CAST (self));
|
||||
GstClockTimeDiff clock_diff = 0;
|
||||
if (!self->uses_monotonic_clock) {
|
||||
clock_diff = GST_CLOCK_DIFF (g_get_monotonic_time () * GST_USECOND,
|
||||
gst_clock_get_time (GST_ELEMENT_CLOCK (self)));
|
||||
}
|
||||
|
||||
NewBufferPayload *new_buffer = (NewBufferPayload *) payload;
|
||||
/* Cast buffer pointer to guint64 identifier. Client will send us back that
|
||||
* id so we know which buffer to unref. */
|
||||
new_buffer->id = (guint64) buffer;
|
||||
new_buffer->pts =
|
||||
calculate_timestamp (GST_BUFFER_PTS (buffer), base_time, latency,
|
||||
clock_diff);
|
||||
new_buffer->dts =
|
||||
calculate_timestamp (GST_BUFFER_DTS (buffer), base_time, latency,
|
||||
clock_diff);
|
||||
new_buffer->duration = GST_BUFFER_DURATION (buffer);
|
||||
new_buffer->offset = GST_BUFFER_OFFSET (buffer);
|
||||
new_buffer->offset_end = GST_BUFFER_OFFSET_END (buffer);
|
||||
new_buffer->flags = GST_BUFFER_FLAGS (buffer);
|
||||
new_buffer->n_memory = n_memory;
|
||||
|
||||
GUnixFDList *fds = g_unix_fd_list_new ();
|
||||
for (int i = 0; i < n_memory; i++) {
|
||||
GstMemory *mem = gst_buffer_peek_memory (buffer, i);
|
||||
if (!gst_is_fd_memory (mem)) {
|
||||
GST_ERROR_OBJECT (self, "Expecting buffers with FD memories");
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (g_unix_fd_list_append (fds, gst_fd_memory_get_fd (mem), &error) < 0) {
|
||||
GST_ERROR_OBJECT (self, "Failed to append FD: %s", error->message);
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto out;
|
||||
}
|
||||
|
||||
gsize offset;
|
||||
new_buffer->memories[i].size = gst_memory_get_sizes (mem, &offset, NULL);
|
||||
new_buffer->memories[i].offset = offset;
|
||||
}
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds, payload,
|
||||
payload_size, buffer);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
out:
|
||||
g_clear_object (&fds);
|
||||
g_clear_error (&error);
|
||||
g_free (payload);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event)
|
||||
{
|
||||
GstUnixFdSink *self = (GstUnixFdSink *) bsink;
|
||||
|
||||
switch (GST_EVENT_TYPE (event)) {
|
||||
case GST_EVENT_CAPS:{
|
||||
GST_OBJECT_LOCK (self);
|
||||
gst_clear_caps (&self->caps);
|
||||
gst_event_parse_caps (event, &self->caps);
|
||||
gst_caps_ref (self->caps);
|
||||
GST_DEBUG_OBJECT (self, "Send new caps to all clients: %" GST_PTR_FORMAT,
|
||||
self->caps);
|
||||
gsize payload_size;
|
||||
gchar *payload = caps_to_payload (self->caps, &payload_size);
|
||||
send_command_to_all (self, COMMAND_TYPE_CAPS, NULL, payload, payload_size,
|
||||
NULL);
|
||||
g_free (payload);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
}
|
||||
case GST_EVENT_EOS:{
|
||||
GST_OBJECT_LOCK (self);
|
||||
send_command_to_all (self, COMMAND_TYPE_EOS, NULL, NULL, 0, NULL);
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return GST_BASE_SINK_CLASS (gst_unix_fd_sink_parent_class)->event (bsink,
|
||||
event);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
|
||||
{
|
||||
GstAllocator *allocator = gst_shm_allocator_get ();
|
||||
gst_query_add_allocation_param (query, allocator, NULL);
|
||||
gst_object_unref (allocator);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_sink_set_clock (GstElement * element, GstClock * clock)
|
||||
{
|
||||
GstUnixFdSink *self = (GstUnixFdSink *) element;
|
||||
|
||||
self->uses_monotonic_clock = FALSE;
|
||||
if (clock != NULL && G_OBJECT_TYPE (clock) == GST_TYPE_SYSTEM_CLOCK) {
|
||||
GstClockType clock_type;
|
||||
g_object_get (clock, "clock-type", &clock_type, NULL);
|
||||
self->uses_monotonic_clock = clock_type == GST_CLOCK_TYPE_MONOTONIC;
|
||||
}
|
||||
|
||||
return GST_ELEMENT_CLASS (gst_unix_fd_sink_parent_class)->set_clock (element,
|
||||
clock);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
|
||||
{
|
||||
GObjectClass *gobject_class = (GObjectClass *) klass;
|
||||
GstElementClass *gstelement_class = (GstElementClass *) klass;
|
||||
GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
|
||||
|
||||
GST_DEBUG_CATEGORY_INIT (unixfdsink_debug, "unixfdsink", 0,
|
||||
"Unix file descriptor sink");
|
||||
gst_element_class_set_static_metadata (gstelement_class,
|
||||
"Unix file descriptor sink", "Sink", "Unix file descriptor sink",
|
||||
"Xavier Claessens <xavier.claessens@collabora.com>");
|
||||
gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
|
||||
|
||||
gst_shm_allocator_init_once ();
|
||||
|
||||
gobject_class->finalize = gst_unix_fd_sink_finalize;
|
||||
gobject_class->set_property = gst_unix_fd_sink_set_property;
|
||||
gobject_class->get_property = gst_unix_fd_sink_get_property;
|
||||
|
||||
gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_set_clock);
|
||||
|
||||
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_start);
|
||||
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_stop);
|
||||
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_render);
|
||||
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_event);
|
||||
gstbasesink_class->propose_allocation =
|
||||
GST_DEBUG_FUNCPTR (gst_unix_fd_sink_propose_allocation);
|
||||
|
||||
g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
|
||||
g_param_spec_string ("socket-path",
|
||||
"Path to the control socket",
|
||||
"The path to the control socket used to control the shared memory "
|
||||
"transport. This may be modified during the NULL->READY transition",
|
||||
NULL,
|
||||
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
|
||||
GST_PARAM_MUTABLE_READY));
|
||||
}
|
477
subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c
Normal file
477
subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c
Normal file
|
@ -0,0 +1,477 @@
|
|||
/* GStreamer unix file-descriptor source/sink
|
||||
*
|
||||
* Copyright (C) 2023 Netflix Inc.
|
||||
* Author: Xavier Claessens <xavier.claessens@collabora.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
||||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
/**
|
||||
* SECTION:element-unixfdsrc
|
||||
* @title: unixfdsrc
|
||||
*
|
||||
* Receive file-descriptor backed buffers (e.g. memfd, dmabuf) over unix socket
|
||||
* from matching unixfdsink.
|
||||
*
|
||||
* ## Example launch lines
|
||||
* |[
|
||||
* gst-launch-1.0 -v videotestsrc ! unixfdsink socket-path=/tmp/blah
|
||||
* gst-launch-1.0 -v unixfdsrc socket-path=/tmp/blah ! autovideosink
|
||||
* ]|
|
||||
*
|
||||
* Since: 1.24
|
||||
*/
|
||||
|
||||
#include "gstunixfd.h"
|
||||
|
||||
#include <gst/base/base.h>
|
||||
#include <gst/allocators/allocators.h>
|
||||
|
||||
#include <glib/gstdio.h>
|
||||
#include <gio/gio.h>
|
||||
#include <gio/gunixfdmessage.h>
|
||||
#include <gio/gunixsocketaddress.h>
|
||||
|
||||
GST_DEBUG_CATEGORY (unixfdsrc_debug);
|
||||
#define GST_CAT_DEFAULT (unixfdsrc_debug)
|
||||
|
||||
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
|
||||
GST_PAD_SRC,
|
||||
GST_PAD_ALWAYS,
|
||||
GST_STATIC_CAPS_ANY);
|
||||
|
||||
#define GST_TYPE_UNIX_FD_SRC gst_unix_fd_src_get_type()
|
||||
G_DECLARE_FINAL_TYPE (GstUnixFdSrc, gst_unix_fd_src, GST, UNIX_FD_SRC,
|
||||
GstPushSrc);
|
||||
|
||||
struct _GstUnixFdSrc
|
||||
{
|
||||
GstPushSrc parent;
|
||||
|
||||
gchar *socket_path;
|
||||
GSocket *socket;
|
||||
GCancellable *cancellable;
|
||||
|
||||
GstAllocator *allocator;
|
||||
GHashTable *memories;
|
||||
gboolean uses_monotonic_clock;
|
||||
};
|
||||
|
||||
G_DEFINE_TYPE (GstUnixFdSrc, gst_unix_fd_src, GST_TYPE_PUSH_SRC);
|
||||
GST_ELEMENT_REGISTER_DEFINE (unixfdsrc, "unixfdsrc", GST_RANK_NONE,
|
||||
GST_TYPE_UNIX_FD_SRC);
|
||||
|
||||
enum
|
||||
{
|
||||
PROP_0,
|
||||
PROP_SOCKET_PATH,
|
||||
};
|
||||
|
||||
typedef struct
|
||||
{
|
||||
guint64 id;
|
||||
guint n_memory;
|
||||
} BufferContext;
|
||||
|
||||
static void
|
||||
memory_weak_ref_cb (GstUnixFdSrc * self, GstMemory * mem)
|
||||
{
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
BufferContext *ctx = g_hash_table_lookup (self->memories, mem);
|
||||
if (ctx == NULL)
|
||||
goto out;
|
||||
|
||||
if (--ctx->n_memory == 0) {
|
||||
/* Notify that we are not using this buffer anymore */
|
||||
ReleaseBufferPayload payload = { ctx->id };
|
||||
GError *error = NULL;
|
||||
if (!gst_unix_fd_send_command (self->socket, COMMAND_TYPE_RELEASE_BUFFER,
|
||||
NULL, (const gchar *) &payload, sizeof (payload), &error)) {
|
||||
GST_WARNING_OBJECT (self, "Failed to send release-buffer command: %s",
|
||||
error->message);
|
||||
g_clear_error (&error);
|
||||
}
|
||||
g_free (ctx);
|
||||
}
|
||||
|
||||
g_hash_table_remove (self->memories, mem);
|
||||
|
||||
out:
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_src_init (GstUnixFdSrc * self)
|
||||
{
|
||||
g_return_if_fail (GST_IS_UNIX_FD_SRC (self));
|
||||
|
||||
self->cancellable = g_cancellable_new ();
|
||||
self->memories = g_hash_table_new (NULL, NULL);
|
||||
self->allocator = gst_fd_allocator_new ();
|
||||
gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_src_finalize (GObject * object)
|
||||
{
|
||||
GstUnixFdSrc *self = GST_UNIX_FD_SRC (object);
|
||||
|
||||
g_free (self->socket_path);
|
||||
g_object_unref (self->cancellable);
|
||||
g_hash_table_unref (self->memories);
|
||||
gst_object_unref (self->allocator);
|
||||
|
||||
G_OBJECT_CLASS (gst_unix_fd_src_parent_class)->finalize (object);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_src_set_property (GObject * object, guint prop_id,
|
||||
const GValue * value, GParamSpec * pspec)
|
||||
{
|
||||
GstUnixFdSrc *self = GST_UNIX_FD_SRC (object);
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
switch (prop_id) {
|
||||
case PROP_SOCKET_PATH:
|
||||
if (self->socket) {
|
||||
GST_WARNING_OBJECT (self,
|
||||
"Can only change socket path in NULL or READY state");
|
||||
break;
|
||||
}
|
||||
g_free (self->socket_path);
|
||||
self->socket_path = g_value_dup_string (value);
|
||||
break;
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_src_get_property (GObject * object, guint prop_id,
|
||||
GValue * value, GParamSpec * pspec)
|
||||
{
|
||||
GstUnixFdSrc *self = GST_UNIX_FD_SRC (object);
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
switch (prop_id) {
|
||||
case PROP_SOCKET_PATH:
|
||||
g_value_set_string (value, self->socket_path);
|
||||
break;
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
}
|
||||
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_src_start (GstBaseSrc * bsrc)
|
||||
{
|
||||
GstUnixFdSrc *self = (GstUnixFdSrc *) bsrc;
|
||||
GSocketAddress *addr = NULL;
|
||||
GError *error = NULL;
|
||||
gboolean ret = TRUE;
|
||||
|
||||
gst_base_src_set_format (bsrc, GST_FORMAT_TIME);
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
if (self->socket_path == NULL) {
|
||||
GST_ERROR_OBJECT (self, "Socket path is NULL");
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
|
||||
addr = g_unix_socket_address_new (self->socket_path);
|
||||
|
||||
self->socket =
|
||||
g_socket_new (G_SOCKET_FAMILY_UNIX, G_SOCKET_TYPE_STREAM,
|
||||
G_SOCKET_PROTOCOL_DEFAULT, &error);
|
||||
if (self->socket == NULL) {
|
||||
GST_ERROR_OBJECT (self, "Failed to create UNIX socket: %s", error->message);
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (!g_socket_connect (self->socket, addr, NULL, &error)) {
|
||||
GST_ERROR_OBJECT (self, "Failed to connect socket: %s", error->message);
|
||||
g_clear_object (&self->socket);
|
||||
ret = FALSE;
|
||||
goto out;
|
||||
}
|
||||
|
||||
out:
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
g_clear_error (&error);
|
||||
g_clear_object (&addr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_src_stop (GstBaseSrc * bsrc)
|
||||
{
|
||||
GstUnixFdSrc *self = (GstUnixFdSrc *) bsrc;
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
|
||||
/* Remove all weak refs we could still have to not be called back later.
|
||||
* Service side will cleanup pending buffers when socket gets closed. */
|
||||
GstMemory *mem;
|
||||
BufferContext *ctx;
|
||||
GHashTableIter iter;
|
||||
g_hash_table_iter_init (&iter, self->memories);
|
||||
while (g_hash_table_iter_next (&iter, (gpointer *) & mem, (gpointer *) & ctx)) {
|
||||
gst_mini_object_weak_unref (GST_MINI_OBJECT_CAST (mem),
|
||||
(GstMiniObjectNotify) memory_weak_ref_cb, self);
|
||||
if (--ctx->n_memory == 0)
|
||||
g_free (ctx);
|
||||
}
|
||||
g_hash_table_remove_all (self->memories);
|
||||
g_clear_object (&self->socket);
|
||||
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_src_unlock (GstBaseSrc * bsrc)
|
||||
{
|
||||
GstUnixFdSrc *self = GST_UNIX_FD_SRC (bsrc);
|
||||
g_cancellable_cancel (self->cancellable);
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_src_unlock_stop (GstBaseSrc * bsrc)
|
||||
{
|
||||
GstUnixFdSrc *self = GST_UNIX_FD_SRC (bsrc);
|
||||
g_cancellable_reset (self->cancellable);
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static GstClockTime
|
||||
calculate_timestamp (GstClockTime timestamp, GstClockTime base_time,
|
||||
GstClockTimeDiff clock_diff)
|
||||
{
|
||||
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
|
||||
/* Convert from system monotonic clock time to pipeline clock time */
|
||||
if (clock_diff > 0 && clock_diff > timestamp)
|
||||
return 0;
|
||||
timestamp -= clock_diff;
|
||||
/* Convert to running time */
|
||||
if (base_time > timestamp)
|
||||
return 0;
|
||||
timestamp -= base_time;
|
||||
}
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
static GstFlowReturn
|
||||
gst_unix_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
|
||||
{
|
||||
GstUnixFdSrc *self = GST_UNIX_FD_SRC (psrc);
|
||||
CommandType command;
|
||||
GUnixFDList *fds = NULL;
|
||||
gchar *payload = NULL;
|
||||
gsize payload_size;
|
||||
GError *error = NULL;
|
||||
GstFlowReturn ret = GST_FLOW_OK;
|
||||
|
||||
again:
|
||||
/* Block until we receive a command */
|
||||
if (!gst_unix_fd_receive_command (self->socket, self->cancellable, &command,
|
||||
&fds, &payload, &payload_size, &error)) {
|
||||
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
|
||||
ret = GST_FLOW_FLUSHING;
|
||||
goto on_error;
|
||||
}
|
||||
GST_ERROR_OBJECT (self, "Failed to read from sink element: %s",
|
||||
error != NULL ? error->message : "Connection closed by peer");
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto on_error;
|
||||
}
|
||||
|
||||
switch (command) {
|
||||
case COMMAND_TYPE_RELEASE_BUFFER:
|
||||
GST_ERROR_OBJECT (self, "Received wrong command %d", command);
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto on_error;
|
||||
case COMMAND_TYPE_NEW_BUFFER:{
|
||||
NewBufferPayload *new_buffer;
|
||||
if (!gst_unix_fd_parse_new_buffer (payload, payload_size, &new_buffer)) {
|
||||
GST_ERROR_OBJECT (self, "Received new-buffer with wrong payload size");
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto on_error;
|
||||
}
|
||||
|
||||
if (fds == NULL) {
|
||||
GST_ERROR_OBJECT (self,
|
||||
"Received new buffer command without file descriptors");
|
||||
return GST_FLOW_ERROR;
|
||||
}
|
||||
|
||||
if (g_unix_fd_list_get_length (fds) != new_buffer->n_memory) {
|
||||
GST_ERROR_OBJECT (self,
|
||||
"Received new buffer command with %d file descriptors instead of %d",
|
||||
g_unix_fd_list_get_length (fds), new_buffer->n_memory);
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto on_error;
|
||||
}
|
||||
|
||||
gint *fds_arr = g_unix_fd_list_steal_fds (fds, NULL);
|
||||
|
||||
BufferContext *ctx = g_new0 (BufferContext, 1);
|
||||
ctx->id = new_buffer->id;
|
||||
ctx->n_memory = new_buffer->n_memory;
|
||||
|
||||
*outbuf = gst_buffer_new ();
|
||||
|
||||
GstClockTime base_time =
|
||||
gst_element_get_base_time (GST_ELEMENT_CAST (self));
|
||||
GstClockTimeDiff clock_diff = 0;
|
||||
if (!self->uses_monotonic_clock) {
|
||||
clock_diff = GST_CLOCK_DIFF (g_get_monotonic_time () * GST_USECOND,
|
||||
gst_clock_get_time (GST_ELEMENT_CLOCK (self)));
|
||||
}
|
||||
|
||||
GST_BUFFER_PTS (*outbuf) =
|
||||
calculate_timestamp (new_buffer->pts, base_time, clock_diff);
|
||||
GST_BUFFER_DTS (*outbuf) =
|
||||
calculate_timestamp (new_buffer->dts, base_time, clock_diff);
|
||||
GST_BUFFER_DURATION (*outbuf) = new_buffer->duration;
|
||||
GST_BUFFER_OFFSET (*outbuf) = new_buffer->offset;
|
||||
GST_BUFFER_OFFSET_END (*outbuf) = new_buffer->offset_end;
|
||||
GST_BUFFER_FLAGS (*outbuf) = new_buffer->flags;
|
||||
|
||||
GST_OBJECT_LOCK (self);
|
||||
for (int i = 0; i < new_buffer->n_memory; i++) {
|
||||
GstMemory *mem = gst_fd_allocator_alloc (self->allocator, fds_arr[i],
|
||||
new_buffer->memories[i].size, GST_FD_MEMORY_FLAG_NONE);
|
||||
gst_memory_resize (mem, new_buffer->memories[i].offset,
|
||||
new_buffer->memories[i].size);
|
||||
GST_MINI_OBJECT_FLAG_SET (mem, GST_MEMORY_FLAG_READONLY);
|
||||
|
||||
g_hash_table_insert (self->memories, mem, ctx);
|
||||
gst_mini_object_weak_ref (GST_MINI_OBJECT_CAST (mem),
|
||||
(GstMiniObjectNotify) memory_weak_ref_cb, self);
|
||||
|
||||
gst_buffer_append_memory (*outbuf, mem);
|
||||
}
|
||||
GST_OBJECT_UNLOCK (self);
|
||||
|
||||
g_free (fds_arr);
|
||||
|
||||
break;
|
||||
}
|
||||
case COMMAND_TYPE_CAPS:{
|
||||
gchar *caps_str;
|
||||
if (!gst_unix_fd_parse_caps (payload, payload_size, &caps_str)) {
|
||||
GST_ERROR_OBJECT (self, "Received caps string is not nul-terminated");
|
||||
ret = GST_FLOW_ERROR;
|
||||
goto on_error;
|
||||
}
|
||||
GstCaps *caps = gst_caps_from_string (caps_str);
|
||||
GST_DEBUG_OBJECT (self, "Received caps %" GST_PTR_FORMAT, caps);
|
||||
gst_base_src_set_caps (GST_BASE_SRC_CAST (self), caps);
|
||||
gst_caps_unref (caps);
|
||||
break;
|
||||
}
|
||||
case COMMAND_TYPE_EOS:{
|
||||
GST_DEBUG_OBJECT (self, "Received EOS");
|
||||
ret = GST_FLOW_EOS;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
/* Protocol could have been extended with new command */
|
||||
GST_DEBUG_OBJECT (self, "Ignoring unknown command %d", command);
|
||||
break;
|
||||
}
|
||||
|
||||
if (*outbuf == NULL && ret == GST_FLOW_OK) {
|
||||
g_clear_object (&fds);
|
||||
g_clear_pointer (&payload, g_free);
|
||||
goto again;
|
||||
}
|
||||
|
||||
on_error:
|
||||
g_clear_error (&error);
|
||||
g_clear_object (&fds);
|
||||
g_free (payload);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_unix_fd_src_set_clock (GstElement * element, GstClock * clock)
|
||||
{
|
||||
GstUnixFdSrc *self = (GstUnixFdSrc *) element;
|
||||
|
||||
self->uses_monotonic_clock = FALSE;
|
||||
if (clock != NULL && G_OBJECT_TYPE (clock) == GST_TYPE_SYSTEM_CLOCK) {
|
||||
GstClockType clock_type;
|
||||
g_object_get (clock, "clock-type", &clock_type, NULL);
|
||||
self->uses_monotonic_clock = clock_type == GST_CLOCK_TYPE_MONOTONIC;
|
||||
}
|
||||
|
||||
return GST_ELEMENT_CLASS (gst_unix_fd_src_parent_class)->set_clock (element,
|
||||
clock);
|
||||
}
|
||||
|
||||
static void
|
||||
gst_unix_fd_src_class_init (GstUnixFdSrcClass * klass)
|
||||
{
|
||||
GObjectClass *gobject_class = (GObjectClass *) klass;
|
||||
GstElementClass *gstelement_class = (GstElementClass *) klass;
|
||||
GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass;
|
||||
GstPushSrcClass *gstpushsrc_class = (GstPushSrcClass *) klass;
|
||||
|
||||
GST_DEBUG_CATEGORY_INIT (unixfdsrc_debug, "unixfdsrc", 0,
|
||||
"Unix file descriptor source");
|
||||
gst_element_class_set_static_metadata (gstelement_class,
|
||||
"Unix file descriptor source", "Src", "Unix file descriptor source",
|
||||
"Xavier Claessens <xavier.claessens@collabora.com>");
|
||||
gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
|
||||
|
||||
gobject_class->finalize = gst_unix_fd_src_finalize;
|
||||
gobject_class->set_property = gst_unix_fd_src_set_property;
|
||||
gobject_class->get_property = gst_unix_fd_src_get_property;
|
||||
|
||||
gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_unix_fd_src_set_clock);
|
||||
|
||||
gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_unix_fd_src_start);
|
||||
gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_unix_fd_src_stop);
|
||||
gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_unix_fd_src_unlock);
|
||||
gstbasesrc_class->unlock_stop =
|
||||
GST_DEBUG_FUNCPTR (gst_unix_fd_src_unlock_stop);
|
||||
|
||||
gstpushsrc_class->create = gst_unix_fd_src_create;
|
||||
|
||||
g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
|
||||
g_param_spec_string ("socket-path",
|
||||
"Path to the control socket",
|
||||
"The path to the control socket used to control the shared memory "
|
||||
"transport. This may be modified during the NULL->READY transition",
|
||||
NULL,
|
||||
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
|
||||
GST_PARAM_MUTABLE_READY));
|
||||
}
|
23
subprojects/gst-plugins-bad/gst/unixfd/meson.build
Normal file
23
subprojects/gst-plugins-bad/gst/unixfd/meson.build
Normal file
|
@ -0,0 +1,23 @@
|
|||
sources = [
|
||||
'gstunixfd.c',
|
||||
'gstunixfdsink.c',
|
||||
'gstunixfdsrc.c',
|
||||
]
|
||||
|
||||
gstallocators_dep = dependency('gstreamer-allocators-1.0')
|
||||
|
||||
gio_unix_dep = dependency('gio-unix-2.0', required: get_option('unixfd'))
|
||||
if not gio_unix_dep.found()
|
||||
subdir_done()
|
||||
endif
|
||||
|
||||
gstunixfd_plugin = library('gstunixfd',
|
||||
sources,
|
||||
c_args: gst_plugins_bad_args,
|
||||
include_directories: [configinc],
|
||||
dependencies : [gstbase_dep, gstallocators_dep, gio_dep, gio_unix_dep],
|
||||
install : true,
|
||||
install_dir : plugins_install_dir,
|
||||
)
|
||||
|
||||
plugins += [gstunixfd_plugin]
|
|
@ -63,6 +63,7 @@ option('speed', type : 'feature', value : 'auto')
|
|||
option('subenc', type : 'feature', value : 'auto')
|
||||
option('switchbin', type : 'feature', value : 'auto')
|
||||
option('timecode', type : 'feature', value : 'auto')
|
||||
option('unixfd', type : 'feature', value : 'auto')
|
||||
option('videofilters', type : 'feature', value : 'auto')
|
||||
option('videoframe_audiolevel', type : 'feature', value : 'auto')
|
||||
option('videoparsers', type : 'feature', value : 'auto')
|
||||
|
|
106
subprojects/gst-plugins-bad/tests/check/elements/unixfd.c
Normal file
106
subprojects/gst-plugins-bad/tests/check/elements/unixfd.c
Normal file
|
@ -0,0 +1,106 @@
|
|||
/* GStreamer unix file-descriptor source/sink tests
|
||||
*
|
||||
* Copyright (C) 2023 Netflix Inc.
|
||||
* Author: Xavier Claessens <xavier.claessens@collabora.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
||||
* Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#ifdef HAVE_CONFIG_H
|
||||
#include "config.h"
|
||||
#endif
|
||||
|
||||
#include <gst/gst.h>
|
||||
#include <gst/check/gstcheck.h>
|
||||
#include <glib/gstdio.h>
|
||||
|
||||
static void
|
||||
wait_preroll (GstElement * element)
|
||||
{
|
||||
GstStateChangeReturn state_res =
|
||||
gst_element_set_state (element, GST_STATE_PLAYING);
|
||||
fail_unless (state_res != GST_STATE_CHANGE_FAILURE);
|
||||
state_res = gst_element_get_state (element, NULL, NULL, GST_CLOCK_TIME_NONE);
|
||||
fail_unless (state_res == GST_STATE_CHANGE_SUCCESS);
|
||||
}
|
||||
|
||||
GST_START_TEST (test_unixfd_videotestsrc)
|
||||
{
|
||||
GError *error = NULL;
|
||||
|
||||
/* Ensure we don't have socket from previous failed test */
|
||||
gchar *socket_path =
|
||||
g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ());
|
||||
if (g_file_test (socket_path, G_FILE_TEST_EXISTS)) {
|
||||
g_unlink (socket_path);
|
||||
}
|
||||
|
||||
/* Setup source */
|
||||
gchar *pipeline_str =
|
||||
g_strdup_printf ("videotestsrc ! unixfdsink socket-path=%s", socket_path);
|
||||
GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error);
|
||||
g_assert_no_error (error);
|
||||
g_free (pipeline_str);
|
||||
wait_preroll (pipeline_service);
|
||||
|
||||
/* Setup sink */
|
||||
pipeline_str =
|
||||
g_strdup_printf ("unixfdsrc socket-path=%s ! fakesink", socket_path);
|
||||
GstElement *pipeline_client_1 = gst_parse_launch (pipeline_str, &error);
|
||||
g_assert_no_error (error);
|
||||
wait_preroll (pipeline_client_1);
|
||||
|
||||
/* disconnect, reconnect */
|
||||
fail_unless (gst_element_set_state (pipeline_client_1,
|
||||
GST_STATE_READY) == GST_STATE_CHANGE_SUCCESS);
|
||||
wait_preroll (pipeline_client_1);
|
||||
|
||||
/* Connect 2nd sink */
|
||||
GstElement *pipeline_client_2 = gst_parse_launch (pipeline_str, &error);
|
||||
g_assert_no_error (error);
|
||||
wait_preroll (pipeline_client_2);
|
||||
|
||||
/* Teardown */
|
||||
fail_unless (gst_element_set_state (pipeline_client_1,
|
||||
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
|
||||
fail_unless (gst_element_set_state (pipeline_client_2,
|
||||
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
|
||||
fail_unless (gst_element_set_state (pipeline_service,
|
||||
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
|
||||
fail_if (g_file_test (socket_path, G_FILE_TEST_EXISTS));
|
||||
|
||||
gst_object_unref (pipeline_service);
|
||||
gst_object_unref (pipeline_client_1);
|
||||
gst_object_unref (pipeline_client_2);
|
||||
g_free (socket_path);
|
||||
g_free (pipeline_str);
|
||||
}
|
||||
|
||||
GST_END_TEST;
|
||||
|
||||
static Suite *
|
||||
unixfd_suite (void)
|
||||
{
|
||||
Suite *s = suite_create ("unixfd");
|
||||
TCase *tc = tcase_create ("unixfd");
|
||||
|
||||
suite_add_tcase (s, tc);
|
||||
tcase_add_test (tc, test_unixfd_videotestsrc);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
GST_CHECK_MAIN (unixfd);
|
|
@ -142,6 +142,7 @@ if host_machine.system() != 'windows'
|
|||
[['elements/jpegparse.c'], not cdata.has('HAVE_UNISTD_H')],
|
||||
[['elements/netsim.c']],
|
||||
[['elements/shm.c'], not shm_enabled, shm_deps],
|
||||
[['elements/unixfd.c'], not gio_unix_dep.found()],
|
||||
[['elements/voaacenc.c'],
|
||||
not voaac_dep.found() or not cdata.has('HAVE_UNISTD_H'), [voaac_dep]],
|
||||
[['elements/webrtcbin.c'], not libnice_dep.found(), [gstwebrtc_dep]],
|
||||
|
|
Loading…
Reference in a new issue