gstreamer/subprojects/gst-plugins-bad/sys/nvcodec/gstcudaipcserver.cpp
Seungha Yang 7b6023d9cf nvcodec: Add support for CUDA IPC
Adding cudaipc{src,sink} element for CUDA IPC support.

Implementation note:
* For the communication between end points, Win32 named-pipe
and unix domain socket will be used on Windows and Linux respectively.

* cudaipcsink behaves as a server, and all GPU resources will be owned by
the server process and exported for other processes, then cudaipcsrc
(client) will import each exported handle.

* User can select IPC mode via "ipc-mode" property of cudaipcsink.
There are two IPC mode, one is "legacy" which uses legacy CUDA IPC
method and the other is "mmap" which uses CUDA virtual memory API
with OS's resource handle sharing method such as DuplicateHandle()
on Windows. The "mmap" mode might be better than "legacy" in terms
of stability since it relies on OS's resource management but
it would consume more GPU memory than "legacy" mode.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4510>
2023-08-14 13:41:01 +00:00

684 lines
19 KiB
C++

/* GStreamer
* Copyright (C) 2023 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstcudaipcserver.h"
#include <unordered_map>
#include <mutex>
#include <atomic>
#include <gst/cuda/gstcuda-private.h>
GST_DEBUG_CATEGORY (cuda_ipc_server_debug);
#define GST_CAT_DEFAULT cuda_ipc_server_debug
GType
gst_cuda_ipc_mode_get_type (void)
{
static GType type = 0;
static const GEnumValue ipc_modes[] = {
{GST_CUDA_IPC_LEGACY, "Legacy IPC mode", "legacy"},
{GST_CUDA_IPC_MMAP, "Memory Map", "mmap"},
{0, nullptr, nullptr}
};
GST_CUDA_CALL_ONCE_BEGIN {
type = g_enum_register_static ("GstCudaIpcMode", ipc_modes);
} GST_CUDA_CALL_ONCE_END;
return type;
}
struct GstCudaIpcServerPrivate
{
GstCudaIpcServerPrivate ()
{
shutdown = false;
aborted = false;
}
std::mutex lock;
guint64 seq_num = 0;
guint next_conn_id = 0;
std::unordered_map < guint,
std::shared_ptr < GstCudaIpcServerConn >> conn_map;
GThread *loop_thread = nullptr;
std::atomic < bool >shutdown;
std::atomic < bool >aborted;
std::shared_ptr < GstCudaIpcServerData > data;
};
static void gst_cuda_ipc_server_dispose (GObject * object);
static void gst_cuda_ipc_server_finalize (GObject * object);
#define gst_cuda_ipc_server_parent_class parent_class
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstCudaIpcServer,
gst_cuda_ipc_server, GST_TYPE_OBJECT,
GST_DEBUG_CATEGORY_INIT (cuda_ipc_server_debug, "cudaipcserver",
0, "cudaipcserver"));
static void
gst_cuda_ipc_server_class_init (GstCudaIpcServerClass * klass)
{
GObjectClass *object_class = G_OBJECT_CLASS (klass);
object_class->finalize = gst_cuda_ipc_server_finalize;
object_class->dispose = gst_cuda_ipc_server_dispose;
}
static void
gst_cuda_ipc_server_init (GstCudaIpcServer * self)
{
self->priv = new GstCudaIpcServerPrivate ();
}
static void
gst_cuda_ipc_server_dispose (GObject * object)
{
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
GstCudaIpcServerPrivate *priv = self->priv;
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
GST_DEBUG_OBJECT (self, "dispose");
g_assert (klass->terminate);
klass->terminate (self);
g_clear_pointer (&priv->loop_thread, g_thread_join);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_cuda_ipc_server_finalize (GObject * object)
{
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
GST_DEBUG_OBJECT (self, "finalize");
gst_clear_object (&self->context);
delete self->priv;
G_OBJECT_CLASS (parent_class)->finalize (object);
}
GstFlowReturn
gst_cuda_ipc_server_send_data (GstCudaIpcServer * server, GstSample * sample,
const GstVideoInfo & info, const CUipcMemHandle & handle, GstClockTime pts)
{
GstCudaIpcServerPrivate *priv;
GstCudaIpcServerClass *klass;
g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
if (server->ipc_mode != GST_CUDA_IPC_LEGACY) {
GST_ERROR_OBJECT (server, "Invalid call");
return GST_FLOW_ERROR;
}
priv = server->priv;
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
GST_LOG_OBJECT (server, "Sending data");
std::unique_lock < std::mutex > lk (priv->lock);
if (priv->aborted) {
GST_DEBUG_OBJECT (server, "Was aborted");
return GST_FLOW_ERROR;
}
auto data = std::make_shared < GstCudaIpcServerData > ();
data->sample = gst_sample_ref (sample);
data->info = info;
data->handle = handle;
data->pts = pts;
data->seq_num = priv->seq_num;
priv->seq_num++;
priv->data = data;
lk.unlock ();
klass->invoke (server);
return GST_FLOW_OK;
}
GstFlowReturn
gst_cuda_ipc_server_send_mmap_data (GstCudaIpcServer * server,
GstSample * sample, const GstVideoInfo & info, GstCudaSharableHandle handle,
GstClockTime pts)
{
GstCudaIpcServerPrivate *priv;
GstCudaIpcServerClass *klass;
g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
if (server->ipc_mode != GST_CUDA_IPC_MMAP) {
GST_ERROR_OBJECT (server, "Invalid call");
return GST_FLOW_ERROR;
}
priv = server->priv;
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
GST_LOG_OBJECT (server, "Sending data");
std::unique_lock < std::mutex > lk (priv->lock);
if (priv->aborted) {
GST_DEBUG_OBJECT (server, "Was aborted");
return GST_FLOW_ERROR;
}
auto data = std::make_shared < GstCudaIpcServerData > ();
data->sample = gst_sample_ref (sample);
data->info = info;
data->os_handle = handle;
data->pts = pts;
data->seq_num = priv->seq_num;
priv->seq_num++;
priv->data = data;
lk.unlock ();
klass->invoke (server);
return GST_FLOW_OK;
}
static gpointer
gst_cuda_ipc_server_loop_thread_func (GstCudaIpcServer * self)
{
GstCudaIpcServerPrivate *priv = self->priv;
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
g_assert (klass->loop);
GST_DEBUG_OBJECT (self, "Start loop thread");
klass->loop (self);
priv->conn_map.clear ();
GST_DEBUG_OBJECT (self, "Exit loop thread");
return nullptr;
}
void
gst_cuda_ipc_server_run (GstCudaIpcServer * server)
{
GstCudaIpcServerPrivate *priv;
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
priv = server->priv;
GST_DEBUG_OBJECT (server, "Running");
std::unique_lock < std::mutex > lk (priv->lock);
if (priv->loop_thread) {
GST_DEBUG_OBJECT (server, "Already running");
return;
}
GST_DEBUG_OBJECT (server, "Spawning thread");
priv->loop_thread = g_thread_new ("cuda-ipc-server",
(GThreadFunc) gst_cuda_ipc_server_loop_thread_func, server);
}
void
gst_cuda_ipc_server_stop (GstCudaIpcServer * server)
{
GstCudaIpcServerPrivate *priv;
GstCudaIpcServerClass *klass;
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
priv = server->priv;
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
GST_DEBUG_OBJECT (server, "Stopping");
priv->shutdown = true;
klass->invoke (server);
g_clear_pointer (&priv->loop_thread, g_thread_join);
GST_DEBUG_OBJECT (server, "Stopped");
}
static void
gst_cuda_ipc_server_close_connection (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCudaIpcServerPrivate *priv = self->priv;
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
GST_DEBUG_OBJECT (self, "Closing conn-id %u", conn->id);
priv->conn_map.erase (conn->id);
if (priv->shutdown && priv->conn_map.empty ()) {
GST_DEBUG_OBJECT (self, "All connection were closed");
klass->terminate (self);
}
}
static void
gst_cuda_ipc_server_send_msg (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
if (!klass->send_msg (self, conn)) {
GST_WARNING_OBJECT (self, "Send msg failed");
gst_cuda_ipc_server_close_connection (self, conn);
}
}
static void
gst_cuda_ipc_server_config_data (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCaps *caps = gst_sample_get_caps (conn->data->sample);
gst_caps_replace (&conn->caps, caps);
gst_cuda_ipc_pkt_build_config (conn->server_msg, self->pid,
self->ipc_mode == GST_CUDA_IPC_MMAP, conn->caps);
conn->type = GstCudaIpcPktType::CONFIG;
GST_LOG_OBJECT (self, "Sending CONFIG, conn-id %u", conn->id);
gst_cuda_ipc_server_send_msg (self, conn);
}
void
gst_cuda_ipc_server_on_incoming_connection (GstCudaIpcServer * server,
std::shared_ptr < GstCudaIpcServerConn > conn)
{
GstCudaIpcServerPrivate *priv = server->priv;
priv->lock.lock ();
conn->server = server;
conn->id = priv->next_conn_id;
conn->context = (GstCudaContext *) gst_object_ref (server->context);
conn->data = priv->data;
priv->next_conn_id++;
priv->lock.unlock ();
/* *INDENT-OFF* */
priv->conn_map.insert ({conn->id, conn});
/* *INDENT-ON* */
if (conn->data) {
conn->configured = true;
gst_cuda_ipc_server_config_data (server, conn.get ());
} else {
GST_DEBUG_OBJECT (server, "Have no config data yet, waiting for data");
}
}
static void
gst_cuda_ipc_server_have_data (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
GstCaps *caps;
GstBuffer *buffer;
GstCudaMemory *cmem;
if (!conn->data) {
GST_ERROR_OBJECT (self, "Have no data to send, conn-id: %u", conn->id);
gst_cuda_ipc_server_close_connection (self, conn);
return;
}
conn->pending_have_data = false;
conn->seq_num = conn->data->seq_num + 1;
caps = gst_sample_get_caps (conn->data->sample);
if (!conn->caps || !gst_caps_is_equal (conn->caps, caps)) {
GST_DEBUG_OBJECT (self, "Sending caps %" GST_PTR_FORMAT " to conn-id %u",
caps, conn->id);
gst_caps_replace (&conn->caps, caps);
} else {
caps = nullptr;
}
buffer = gst_sample_get_buffer (conn->data->sample);
cmem = (GstCudaMemory *) gst_buffer_peek_memory (buffer, 0);
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (conn->data->handle);
GST_LOG_OBJECT (self, "Sending HAVE-DATA with handle %s, conn-id: %u",
handle_dump.c_str (), conn->id);
if (!gst_cuda_ipc_pkt_build_have_data (conn->server_msg, conn->data->pts,
conn->data->info, conn->data->handle, caps)) {
GST_ERROR_OBJECT (self, "Couldn't build HAVE-DATA pkt, conn-id: %u",
conn->id);
gst_cuda_ipc_server_close_connection (self, conn);
return;
}
conn->type = GstCudaIpcPktType::HAVE_DATA;
} else {
guint max_size = cmem->mem.maxsize;
GST_LOG_OBJECT (self, "Sending HAVE-MMAP-DATA with handle %"
GST_CUDA_OS_HANDLE_FORMAT ", conn-id: %u", conn->data->os_handle,
conn->id);
if (!gst_cuda_ipc_pkt_build_have_mmap_data (conn->server_msg,
conn->data->pts, conn->data->info, max_size, conn->data->os_handle,
caps)) {
GST_ERROR_OBJECT (self, "Couldn't build HAVE-MMAP-DATA pkt, conn-id: %u",
conn->id);
gst_cuda_ipc_server_close_connection (self, conn);
return;
}
conn->type = GstCudaIpcPktType::HAVE_MMAP_DATA;
if (klass->send_mmap_msg) {
if (!klass->send_mmap_msg (self, conn, conn->data->os_handle)) {
GST_WARNING_OBJECT (self, "Send msg failed");
gst_cuda_ipc_server_close_connection (self, conn);
}
return;
}
}
gst_cuda_ipc_server_send_msg (self, conn);
}
static void
gst_cuda_ipc_server_wait_msg (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
if (!klass->wait_msg (self, conn)) {
GST_WARNING_OBJECT (self, "Wait msg failed, conn-id: %u", conn->id);
gst_cuda_ipc_server_close_connection (self, conn);
}
}
static bool
gst_cuda_ipc_server_on_release_data (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
bool found = false;
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
CUipcMemHandle handle;
if (!gst_cuda_ipc_pkt_parse_release_data (conn->client_msg, handle)) {
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-DATA, conn-id: %u",
conn->id);
return false;
}
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (handle);
GST_LOG_OBJECT (self, "RELEASE-DATA %s, conn-id: %u",
handle_dump.c_str (), conn->id);
for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end ();
it++) {
CUipcMemHandle & tmp = (*it)->handle;
if (gst_cuda_ipc_handle_is_equal (tmp, handle)) {
found = true;
conn->peer_handles.erase (it);
break;
}
}
if (!found) {
GST_WARNING_OBJECT (self,
"Unexpected memory handle to remove %s, conn-id: %u",
handle_dump.c_str (), conn->id);
return false;
}
} else {
GstCudaSharableHandle handle;
if (!gst_cuda_ipc_pkt_parse_release_mmap_data (conn->client_msg, &handle)) {
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-MMAP-DATA, conn-id: %u",
conn->id);
return false;
}
GST_LOG_OBJECT (self, "RELEASE-MMAP-DATA %" GST_CUDA_OS_HANDLE_FORMAT
", conn-id %u", handle, conn->id);
for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end ();
it++) {
if ((*it)->os_handle == handle) {
found = true;
conn->peer_handles.erase (it);
break;
}
}
if (!found) {
GST_WARNING_OBJECT (self,
"Unexpected memory handle to remove %" GST_CUDA_OS_HANDLE_FORMAT
", conn-id: %u", handle, conn->id);
return false;
}
}
GST_LOG_OBJECT (self, "Client is holding %" G_GSIZE_FORMAT " handles",
conn->peer_handles.size ());
return true;
}
void
gst_cuda_ipc_server_wait_msg_finish (GstCudaIpcServer * server,
GstCudaIpcServerConn * conn, bool result)
{
GstCudaIpcPacketHeader header;
if (!result) {
GST_WARNING_OBJECT (server, "Wait msg failed, conn->id: %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
return;
}
if (!gst_cuda_ipc_pkt_identify (conn->client_msg, header)) {
GST_ERROR_OBJECT (server, "Broken header, conn-id: %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
return;
}
switch (header.type) {
case GstCudaIpcPktType::NEED_DATA:
GST_LOG_OBJECT (server, "NEED-DATA, conn-id: %u", conn->id);
if (!conn->data) {
GST_LOG_OBJECT (server, "Wait for available data, conn-id: %u",
conn->id);
conn->pending_have_data = true;
gst_cuda_ipc_server_on_idle (server);
return;
}
gst_cuda_ipc_server_have_data (server, conn);
break;
case GstCudaIpcPktType::READ_DONE:
GST_LOG_OBJECT (server, "READ-DONE, conn-id: %u", conn->id);
if (!conn->data) {
GST_ERROR_OBJECT (server, "Unexpected READ-DATA, conn-id: %u",
conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
return;
}
conn->peer_handles.push_back (conn->data);
conn->data = nullptr;
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::RELEASE_DATA:
case GstCudaIpcPktType::RELEASE_MMAP_DATA:
GST_LOG_OBJECT (server, "RELEASE-DATA, conn-id: %u", conn->id);
if (!gst_cuda_ipc_server_on_release_data (server, conn))
gst_cuda_ipc_server_close_connection (server, conn);
else
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::FIN:
GST_DEBUG_OBJECT (server, "FIN, conn-id %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
break;
default:
GST_ERROR_OBJECT (server, "Unexpected packet, conn-id: %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
break;
}
}
void
gst_cuda_ipc_server_send_msg_finish (GstCudaIpcServer * server,
GstCudaIpcServerConn * conn, bool result)
{
if (!result) {
GST_WARNING_OBJECT (server, "Send msg failed, conn-id %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
return;
}
switch (conn->type) {
case GstCudaIpcPktType::CONFIG:
GST_DEBUG_OBJECT (server, "Sent CONFIG-DATA, conn-id %u", conn->id);
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::HAVE_DATA:
GST_LOG_OBJECT (server, "Sent HAVE-DATA, conn-id %u", conn->id);
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::HAVE_MMAP_DATA:
GST_LOG_OBJECT (server, "Sent HAVE-MMAP-DATA, conn-id %u", conn->id);
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::EOS:
GST_DEBUG_OBJECT (server, "Sent EOS, conn-id %u", conn->id);
gst_cuda_ipc_server_wait_msg (server, conn);
break;
default:
GST_ERROR_OBJECT (server, "Unexpected msg type %d", (gint) conn->type);
gst_cuda_ipc_server_close_connection (server, conn);
break;
}
}
static void
gst_cuda_ipc_server_eos (GstCudaIpcServer * self, GstCudaIpcServerConn * conn)
{
gst_cuda_ipc_pkt_build_eos (conn->server_msg);
conn->eos = true;
conn->type = GstCudaIpcPktType::EOS;
gst_cuda_ipc_server_send_msg (self, conn);
}
void
gst_cuda_ipc_server_on_idle (GstCudaIpcServer * server)
{
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
GstCudaIpcServerPrivate *priv = server->priv;
GST_LOG_OBJECT (server, "idle");
if (priv->shutdown) {
GST_DEBUG_OBJECT (server, "We are stopping");
if (priv->conn_map.empty ()) {
GST_DEBUG_OBJECT (server, "All connections were closed");
klass->terminate (server);
return;
}
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_eos;
/* *INDENT-OFF* */
for (auto it : priv->conn_map) {
auto conn = it.second;
if (conn->eos || !conn->pending_have_data)
continue;
to_send_eos.push_back (conn);
}
for (auto it : to_send_eos) {
GST_DEBUG_OBJECT (server, "Sending EOS to conn-id: %u", it->id);
gst_cuda_ipc_server_eos (server, it.get ());
}
GST_DEBUG_OBJECT (server, "Have %" G_GSIZE_FORMAT " alive connections",
priv->conn_map.size());
for (auto it : priv->conn_map) {
auto conn = it.second;
GST_DEBUG_OBJECT (server, "conn-id %u"
" peer handle size %" G_GSIZE_FORMAT, conn->id,
conn->peer_handles.size ());
}
/* *INDENT-ON* */
return;
}
if (priv->conn_map.empty ()) {
GST_LOG_OBJECT (server, "Have no connection");
return;
}
std::unique_lock < std::mutex > lk (priv->lock);
if (!priv->data)
return;
/* *INDENT-OFF* */
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_config_data;
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_have_data;
for (auto it : priv->conn_map) {
auto conn = it.second;
if (!conn->configured) {
conn->configured = true;
conn->data = priv->data;
to_config_data.push_back (conn);
} else if (conn->pending_have_data && conn->seq_num <= priv->data->seq_num) {
conn->data = priv->data;
to_send_have_data.push_back (conn);
}
}
lk.unlock ();
for (auto it: to_config_data)
gst_cuda_ipc_server_config_data (server, it.get ());
for (auto it: to_send_have_data)
gst_cuda_ipc_server_have_data (server, it.get ());
/* *INDENT-ON* */
}
void
gst_cuda_ipc_server_abort (GstCudaIpcServer * server)
{
GstCudaIpcServerPrivate *priv = server->priv;
priv->aborted = true;
}