mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-23 07:38:16 +00:00
7b6023d9cf
Adding cudaipc{src,sink} element for CUDA IPC support. Implementation note: * For the communication between end points, Win32 named-pipe and unix domain socket will be used on Windows and Linux respectively. * cudaipcsink behaves as a server, and all GPU resources will be owned by the server process and exported for other processes, then cudaipcsrc (client) will import each exported handle. * User can select IPC mode via "ipc-mode" property of cudaipcsink. There are two IPC mode, one is "legacy" which uses legacy CUDA IPC method and the other is "mmap" which uses CUDA virtual memory API with OS's resource handle sharing method such as DuplicateHandle() on Windows. The "mmap" mode might be better than "legacy" in terms of stability since it relies on OS's resource management but it would consume more GPU memory than "legacy" mode. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4510>
432 lines
12 KiB
C++
432 lines
12 KiB
C++
/* GStreamer
|
|
* Copyright (C) 2023 Seungha Yang <seungha@centricular.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include "gstcudaipcclient_unix.h"
|
|
#include <string>
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
#include <chrono>
|
|
|
|
#include <gio/gunixsocketaddress.h>
|
|
#include <gio/gunixconnection.h>
|
|
|
|
GST_DEBUG_CATEGORY_EXTERN (cuda_ipc_client_debug);
|
|
#define GST_CAT_DEFAULT cuda_ipc_client_debug
|
|
|
|
/* *INDENT-OFF* */
|
|
struct GstCudaIpcClientConnUnix : public GstCudaIpcClientConn
|
|
{
|
|
GstCudaIpcClientConnUnix (GSocketConnection * socket_conn,
|
|
GCancellable * cancel)
|
|
{
|
|
conn = socket_conn;
|
|
cancellable = (GCancellable *) g_object_ref (cancel);
|
|
istream = g_io_stream_get_input_stream (G_IO_STREAM (conn));
|
|
ostream = g_io_stream_get_output_stream (G_IO_STREAM (conn));
|
|
}
|
|
|
|
~GstCudaIpcClientConnUnix ()
|
|
{
|
|
g_cancellable_cancel (cancellable);
|
|
g_object_unref (conn);
|
|
g_object_unref (cancellable);
|
|
}
|
|
|
|
GSocketConnection *conn;
|
|
GInputStream *istream;
|
|
GOutputStream *ostream;
|
|
GCancellable *cancellable;
|
|
};
|
|
|
|
struct GstCudaIpcClientUnixPrivate
|
|
{
|
|
GstCudaIpcClientUnixPrivate ()
|
|
{
|
|
main_context = g_main_context_new ();
|
|
main_loop = g_main_loop_new (main_context, FALSE);
|
|
cancellable = g_cancellable_new ();
|
|
}
|
|
|
|
~GstCudaIpcClientUnixPrivate ()
|
|
{
|
|
g_main_loop_unref (main_loop);
|
|
g_main_context_unref (main_context);
|
|
}
|
|
|
|
std::string address;
|
|
GstClockTime timeout;
|
|
std::mutex lock;
|
|
std::condition_variable cond;
|
|
|
|
GMainLoop *main_loop;
|
|
GMainContext *main_context;
|
|
GCancellable *cancellable;
|
|
bool flushing = false;
|
|
};
|
|
/* *INDENT-ON* */
|
|
|
|
struct _GstCudaIpcClientUnix
|
|
{
|
|
GstCudaIpcClient parent;
|
|
|
|
GstCudaIpcClientUnixPrivate *priv;
|
|
};
|
|
|
|
static void gst_cuda_ipc_client_unix_finalize (GObject * object);
|
|
static bool gst_cuda_ipc_client_unix_send_msg (GstCudaIpcClient * client,
|
|
GstCudaIpcClientConn * conn);
|
|
static bool gst_cuda_ipc_client_unix_wait_msg (GstCudaIpcClient * client,
|
|
GstCudaIpcClientConn * conn);
|
|
static void gst_cuda_ipc_client_unix_terminate (GstCudaIpcClient * client);
|
|
static void gst_cuda_ipc_client_unix_invoke (GstCudaIpcClient * client);
|
|
static void gst_cuda_ipc_client_unix_set_flushing (GstCudaIpcClient * client,
|
|
bool flushing);
|
|
static void gst_cuda_ipc_client_unix_loop (GstCudaIpcClient * client);
|
|
|
|
#define gst_cuda_ipc_client_unix_parent_class parent_class
|
|
G_DEFINE_TYPE (GstCudaIpcClientUnix,
|
|
gst_cuda_ipc_client_unix, GST_TYPE_CUDA_IPC_CLIENT);
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_class_init (GstCudaIpcClientUnixClass * klass)
|
|
{
|
|
GObjectClass *object_class = G_OBJECT_CLASS (klass);
|
|
GstCudaIpcClientClass *client_class = GST_CUDA_IPC_CLIENT_CLASS (klass);
|
|
|
|
object_class->finalize = gst_cuda_ipc_client_unix_finalize;
|
|
|
|
client_class->send_msg =
|
|
GST_DEBUG_FUNCPTR (gst_cuda_ipc_client_unix_send_msg);
|
|
client_class->wait_msg =
|
|
GST_DEBUG_FUNCPTR (gst_cuda_ipc_client_unix_wait_msg);
|
|
client_class->terminate =
|
|
GST_DEBUG_FUNCPTR (gst_cuda_ipc_client_unix_terminate);
|
|
client_class->invoke = GST_DEBUG_FUNCPTR (gst_cuda_ipc_client_unix_invoke);
|
|
client_class->set_flushing =
|
|
GST_DEBUG_FUNCPTR (gst_cuda_ipc_client_unix_set_flushing);
|
|
client_class->loop = GST_DEBUG_FUNCPTR (gst_cuda_ipc_client_unix_loop);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_init (GstCudaIpcClientUnix * self)
|
|
{
|
|
self->priv = new GstCudaIpcClientUnixPrivate ();
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_finalize (GObject * object)
|
|
{
|
|
GstCudaIpcClientUnix *self = GST_CUDA_IPC_CLIENT_UNIX (object);
|
|
|
|
GST_DEBUG_OBJECT (self, "finalize");
|
|
|
|
delete self->priv;
|
|
|
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_send_msg_finish (GObject * source,
|
|
GAsyncResult * result, GstCudaIpcClientConnUnix * conn)
|
|
{
|
|
GstCudaIpcClient *client = conn->client;
|
|
gsize size;
|
|
GError *err = nullptr;
|
|
bool ret = true;
|
|
|
|
if (!g_output_stream_write_all_finish (conn->ostream, result, &size, &err)) {
|
|
GST_WARNING_OBJECT (client, "Write failed with %s", err->message);
|
|
g_clear_error (&err);
|
|
ret = false;
|
|
}
|
|
|
|
gst_cuda_ipc_client_send_msg_finish (client, ret);
|
|
}
|
|
|
|
static bool
|
|
gst_cuda_ipc_client_unix_send_msg (GstCudaIpcClient * client,
|
|
GstCudaIpcClientConn * conn)
|
|
{
|
|
GstCudaIpcClientConnUnix *unix_conn =
|
|
static_cast < GstCudaIpcClientConnUnix * >(conn);
|
|
|
|
g_output_stream_write_all_async (unix_conn->ostream, &conn->client_msg[0],
|
|
conn->client_msg.size (), G_PRIORITY_DEFAULT, unix_conn->cancellable,
|
|
(GAsyncReadyCallback) gst_cuda_ipc_client_unix_send_msg_finish,
|
|
unix_conn);
|
|
|
|
return true;
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_finish_have_mmap_data (GstCudaIpcClient * client,
|
|
GstCudaIpcClientConnUnix * conn)
|
|
{
|
|
GstClockTime pts;
|
|
GstCudaIpcMemLayout layout;
|
|
GstCudaSharableHandle server_handle = 0;
|
|
GstCudaSharableHandle client_handle = 0;
|
|
GstCaps *caps = nullptr;
|
|
GError *err = nullptr;
|
|
|
|
if (!gst_cuda_ipc_pkt_parse_have_mmap_data (conn->server_msg, pts,
|
|
layout, &server_handle, &caps)) {
|
|
GST_ERROR_OBJECT (client, "Couldn't parse MMAP-DATA");
|
|
goto error;
|
|
}
|
|
|
|
client_handle =
|
|
g_unix_connection_receive_fd (G_UNIX_CONNECTION (conn->conn),
|
|
conn->cancellable, &err);
|
|
if (err) {
|
|
GST_ERROR_OBJECT (client, "Couldn't get fd, %s", err->message);
|
|
goto error;
|
|
}
|
|
|
|
gst_cuda_ipc_client_have_mmap_data (client, pts, layout, caps,
|
|
server_handle, client_handle);
|
|
return;
|
|
|
|
error:
|
|
gst_cuda_ipc_client_wait_msg_finish (client, false);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_payload_finish (GObject * source,
|
|
GAsyncResult * result, GstCudaIpcClientConnUnix * conn)
|
|
{
|
|
GstCudaIpcClient *client = conn->client;
|
|
gsize size;
|
|
GError *err = nullptr;
|
|
bool ret = true;
|
|
GstCudaIpcPacketHeader header;
|
|
|
|
if (!g_input_stream_read_all_finish (conn->istream, result, &size, &err)) {
|
|
GST_WARNING_OBJECT (client, "Read failed with %s", err->message);
|
|
g_clear_error (&err);
|
|
ret = false;
|
|
} else if (!gst_cuda_ipc_pkt_identify (conn->server_msg, header)) {
|
|
GST_ERROR_OBJECT (client, "Broken header");
|
|
ret = false;
|
|
} else if (header.type == GstCudaIpcPktType::HAVE_MMAP_DATA) {
|
|
gst_cuda_ipc_client_unix_finish_have_mmap_data (client, conn);
|
|
return;
|
|
}
|
|
|
|
gst_cuda_ipc_client_wait_msg_finish (client, ret);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_wait_finish (GObject * source,
|
|
GAsyncResult * result, GstCudaIpcClientConnUnix * conn)
|
|
{
|
|
GstCudaIpcClient *client = conn->client;
|
|
gsize size;
|
|
GError *err = nullptr;
|
|
GstCudaIpcPacketHeader header;
|
|
|
|
if (!g_input_stream_read_all_finish (conn->istream, result, &size, &err)) {
|
|
GST_WARNING_OBJECT (client, "Read failed with %s", err->message);
|
|
g_clear_error (&err);
|
|
gst_cuda_ipc_client_wait_msg_finish (client, false);
|
|
return;
|
|
}
|
|
|
|
if (!gst_cuda_ipc_pkt_identify (conn->server_msg, header)) {
|
|
GST_ERROR_OBJECT (client, "Broken header");
|
|
gst_cuda_ipc_client_wait_msg_finish (client, false);
|
|
return;
|
|
}
|
|
|
|
if (header.payload_size == 0) {
|
|
gst_cuda_ipc_client_wait_msg_finish (client, true);
|
|
return;
|
|
}
|
|
|
|
GST_LOG_OBJECT (client, "Reading payload");
|
|
|
|
g_input_stream_read_all_async (conn->istream,
|
|
&conn->server_msg[0] + GST_CUDA_IPC_PKT_HEADER_SIZE, header.payload_size,
|
|
G_PRIORITY_DEFAULT, conn->cancellable,
|
|
(GAsyncReadyCallback) gst_cuda_ipc_client_unix_payload_finish, conn);
|
|
}
|
|
|
|
static bool
|
|
gst_cuda_ipc_client_unix_wait_msg (GstCudaIpcClient * client,
|
|
GstCudaIpcClientConn * conn)
|
|
{
|
|
GstCudaIpcClientConnUnix *unix_conn =
|
|
static_cast < GstCudaIpcClientConnUnix * >(conn);
|
|
|
|
g_input_stream_read_all_async (unix_conn->istream,
|
|
&conn->server_msg[0], GST_CUDA_IPC_PKT_HEADER_SIZE,
|
|
G_PRIORITY_DEFAULT, unix_conn->cancellable,
|
|
(GAsyncReadyCallback) gst_cuda_ipc_client_unix_wait_finish, unix_conn);
|
|
|
|
return true;
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_terminate (GstCudaIpcClient * client)
|
|
{
|
|
GstCudaIpcClientUnix *self = GST_CUDA_IPC_CLIENT_UNIX (client);
|
|
GstCudaIpcClientUnixPrivate *priv = self->priv;
|
|
|
|
g_main_loop_quit (priv->main_loop);
|
|
}
|
|
|
|
static gboolean
|
|
gst_cuda_ipc_client_unix_invoke_func (GstCudaIpcClient * client)
|
|
{
|
|
gst_cuda_ipc_client_on_idle (client);
|
|
|
|
return G_SOURCE_REMOVE;
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_invoke (GstCudaIpcClient * client)
|
|
{
|
|
GstCudaIpcClientUnix *self = GST_CUDA_IPC_CLIENT_UNIX (client);
|
|
GstCudaIpcClientUnixPrivate *priv = self->priv;
|
|
|
|
g_main_context_invoke (priv->main_context,
|
|
(GSourceFunc) gst_cuda_ipc_client_unix_invoke_func, client);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_set_flushing (GstCudaIpcClient * client, bool flushing)
|
|
{
|
|
GstCudaIpcClientUnix *self = GST_CUDA_IPC_CLIENT_UNIX (client);
|
|
GstCudaIpcClientUnixPrivate *priv = self->priv;
|
|
|
|
std::lock_guard < std::mutex > lk (priv->lock);
|
|
priv->flushing = flushing;
|
|
priv->cond.notify_all ();
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_client_unix_loop (GstCudaIpcClient * client)
|
|
{
|
|
GstCudaIpcClientUnix *self = GST_CUDA_IPC_CLIENT_UNIX (client);
|
|
GstCudaIpcClientUnixPrivate *priv = self->priv;
|
|
GSocketConnection *socket_conn;
|
|
GSocketClient *socket_client;
|
|
GSocketAddress *addr;
|
|
GError *err = nullptr;
|
|
GstClockTime start_time = gst_util_get_timestamp ();
|
|
|
|
g_main_context_push_thread_default (priv->main_context);
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
|
|
socket_client = g_socket_client_new ();
|
|
addr = g_unix_socket_address_new (priv->address.c_str ());
|
|
|
|
do {
|
|
GstClockTime diff;
|
|
|
|
if (priv->flushing) {
|
|
GST_DEBUG_OBJECT (self, "We are flushing");
|
|
gst_cuda_ipc_client_abort (client);
|
|
return;
|
|
}
|
|
|
|
socket_conn = g_socket_client_connect (socket_client,
|
|
G_SOCKET_CONNECTABLE (addr), priv->cancellable, &err);
|
|
if (socket_conn)
|
|
break;
|
|
|
|
if (err->code == G_IO_ERROR_CANCELLED) {
|
|
GST_DEBUG_OBJECT (self, "Operation cancelled");
|
|
g_clear_error (&err);
|
|
break;
|
|
} else {
|
|
GST_DEBUG_OBJECT (self, "Connection failed with error %s", err->message);
|
|
g_clear_error (&err);
|
|
}
|
|
|
|
if (priv->timeout > 0) {
|
|
diff = gst_util_get_timestamp () - start_time;
|
|
if (diff > priv->timeout) {
|
|
GST_WARNING_OBJECT (self, "Timeout");
|
|
break;
|
|
}
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (self, "Sleep for next retry");
|
|
priv->cond.wait_for (lk, std::chrono::milliseconds (100));
|
|
} while (true);
|
|
lk.unlock ();
|
|
|
|
g_object_unref (socket_client);
|
|
g_object_unref (addr);
|
|
|
|
if (socket_conn) {
|
|
GST_DEBUG_OBJECT (self, "Connection established");
|
|
|
|
auto conn = std::make_shared < GstCudaIpcClientConnUnix > (socket_conn,
|
|
priv->cancellable);
|
|
gst_cuda_ipc_client_new_connection (client, conn);
|
|
} else {
|
|
GST_WARNING_OBJECT (self, "Connection failed");
|
|
gst_cuda_ipc_client_abort (client);
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (self, "Starting loop");
|
|
|
|
g_main_loop_run (priv->main_loop);
|
|
|
|
GST_DEBUG_OBJECT (self, "Exit loop");
|
|
|
|
g_cancellable_cancel (priv->cancellable);
|
|
|
|
g_main_context_pop_thread_default (priv->main_context);
|
|
}
|
|
|
|
GstCudaIpcClient *
|
|
gst_cuda_ipc_client_new (const gchar * address, GstCudaContext * context,
|
|
GstCudaStream * stream, GstCudaIpcIOMode io_mode, guint timeout,
|
|
guint buffer_size)
|
|
{
|
|
GstCudaIpcClient *client;
|
|
GstCudaIpcClientUnix *self;
|
|
|
|
g_return_val_if_fail (address, nullptr);
|
|
g_return_val_if_fail (GST_IS_CUDA_CONTEXT (context), nullptr);
|
|
|
|
self = (GstCudaIpcClientUnix *)
|
|
g_object_new (GST_TYPE_CUDA_IPC_CLIENT_UNIX, nullptr);
|
|
gst_object_ref_sink (self);
|
|
|
|
self->priv->address = address;
|
|
self->priv->timeout = timeout * GST_SECOND;
|
|
|
|
client = GST_CUDA_IPC_CLIENT (self);
|
|
client->context = (GstCudaContext *) gst_object_ref (context);
|
|
if (stream)
|
|
client->stream = gst_cuda_stream_ref (stream);
|
|
client->io_mode = io_mode;
|
|
client->buffer_size = buffer_size;
|
|
|
|
return client;
|
|
}
|