gstreamer/subprojects/gst-plugins-bad/sys/nvcodec/gstcudaipcserver.cpp
2024-01-30 14:30:41 +00:00

706 lines
20 KiB
C++

/* GStreamer
* Copyright (C) 2023 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstcudaipcserver.h"
#include <unordered_map>
#include <mutex>
#include <atomic>
#include <gst/cuda/gstcuda-private.h>
GST_DEBUG_CATEGORY (cuda_ipc_server_debug);
#define GST_CAT_DEFAULT cuda_ipc_server_debug
GType
gst_cuda_ipc_mode_get_type (void)
{
static GType type = 0;
static const GEnumValue ipc_modes[] = {
{GST_CUDA_IPC_LEGACY, "Legacy IPC mode", "legacy"},
{GST_CUDA_IPC_MMAP, "Memory Map", "mmap"},
{0, nullptr, nullptr}
};
GST_CUDA_CALL_ONCE_BEGIN {
type = g_enum_register_static ("GstCudaIpcMode", ipc_modes);
} GST_CUDA_CALL_ONCE_END;
return type;
}
struct GstCudaIpcServerPrivate
{
GstCudaIpcServerPrivate ()
{
shutdown = false;
aborted = false;
}
std::mutex lock;
guint64 seq_num = 0;
guint next_conn_id = 0;
std::unordered_map < guint,
std::shared_ptr < GstCudaIpcServerConn >> conn_map;
GThread *loop_thread = nullptr;
std::atomic < bool >shutdown;
std::atomic < bool >aborted;
std::shared_ptr < GstCudaIpcServerData > data;
};
static void gst_cuda_ipc_server_dispose (GObject * object);
static void gst_cuda_ipc_server_finalize (GObject * object);
#define gst_cuda_ipc_server_parent_class parent_class
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstCudaIpcServer,
gst_cuda_ipc_server, GST_TYPE_OBJECT,
GST_DEBUG_CATEGORY_INIT (cuda_ipc_server_debug, "cudaipcserver",
0, "cudaipcserver"));
static void
gst_cuda_ipc_server_class_init (GstCudaIpcServerClass * klass)
{
GObjectClass *object_class = G_OBJECT_CLASS (klass);
object_class->finalize = gst_cuda_ipc_server_finalize;
object_class->dispose = gst_cuda_ipc_server_dispose;
}
static void
gst_cuda_ipc_server_init (GstCudaIpcServer * self)
{
self->priv = new GstCudaIpcServerPrivate ();
}
static void
gst_cuda_ipc_server_dispose (GObject * object)
{
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
GstCudaIpcServerPrivate *priv = self->priv;
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
GST_DEBUG_OBJECT (self, "dispose");
g_assert (klass->terminate);
klass->terminate (self);
g_clear_pointer (&priv->loop_thread, g_thread_join);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_cuda_ipc_server_finalize (GObject * object)
{
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
GST_DEBUG_OBJECT (self, "finalize");
gst_clear_object (&self->context);
delete self->priv;
G_OBJECT_CLASS (parent_class)->finalize (object);
}
GstFlowReturn
gst_cuda_ipc_server_send_data (GstCudaIpcServer * server, GstSample * sample,
const GstVideoInfo & info, const CUipcMemHandle & handle, GstClockTime pts,
GByteArray * meta)
{
GstCudaIpcServerPrivate *priv;
GstCudaIpcServerClass *klass;
g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
if (server->ipc_mode != GST_CUDA_IPC_LEGACY) {
GST_ERROR_OBJECT (server, "Invalid call");
return GST_FLOW_ERROR;
}
priv = server->priv;
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
GST_LOG_OBJECT (server, "Sending data");
std::unique_lock < std::mutex > lk (priv->lock);
if (priv->aborted) {
GST_DEBUG_OBJECT (server, "Was aborted");
return GST_FLOW_ERROR;
}
auto data = std::make_shared < GstCudaIpcServerData > ();
data->sample = gst_sample_ref (sample);
data->info = info;
data->handle = handle;
data->pts = pts;
data->seq_num = priv->seq_num;
if (meta && meta->len) {
data->meta.resize (meta->len);
memcpy (data->meta.data (), meta->data, meta->len);
}
priv->seq_num++;
priv->data = data;
lk.unlock ();
klass->invoke (server);
return GST_FLOW_OK;
}
GstFlowReturn
gst_cuda_ipc_server_send_mmap_data (GstCudaIpcServer * server,
GstSample * sample, const GstVideoInfo & info, GstCudaSharableHandle handle,
GstClockTime pts, GByteArray * meta)
{
GstCudaIpcServerPrivate *priv;
GstCudaIpcServerClass *klass;
g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
if (server->ipc_mode != GST_CUDA_IPC_MMAP) {
GST_ERROR_OBJECT (server, "Invalid call");
return GST_FLOW_ERROR;
}
priv = server->priv;
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
GST_LOG_OBJECT (server, "Sending data");
std::unique_lock < std::mutex > lk (priv->lock);
if (priv->aborted) {
GST_DEBUG_OBJECT (server, "Was aborted");
return GST_FLOW_ERROR;
}
auto data = std::make_shared < GstCudaIpcServerData > ();
data->sample = gst_sample_ref (sample);
data->info = info;
data->os_handle = handle;
data->pts = pts;
data->seq_num = priv->seq_num;
if (meta && meta->len) {
data->meta.resize (meta->len);
memcpy (data->meta.data (), meta->data, meta->len);
}
priv->seq_num++;
priv->data = data;
lk.unlock ();
klass->invoke (server);
return GST_FLOW_OK;
}
static gpointer
gst_cuda_ipc_server_loop_thread_func (GstCudaIpcServer * self)
{
GstCudaIpcServerPrivate *priv = self->priv;
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
g_assert (klass->loop);
GST_DEBUG_OBJECT (self, "Start loop thread");
klass->loop (self);
priv->conn_map.clear ();
GST_DEBUG_OBJECT (self, "Exit loop thread");
return nullptr;
}
void
gst_cuda_ipc_server_run (GstCudaIpcServer * server)
{
GstCudaIpcServerPrivate *priv;
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
priv = server->priv;
GST_DEBUG_OBJECT (server, "Running");
std::unique_lock < std::mutex > lk (priv->lock);
if (priv->loop_thread) {
GST_DEBUG_OBJECT (server, "Already running");
return;
}
GST_DEBUG_OBJECT (server, "Spawning thread");
priv->loop_thread = g_thread_new ("cuda-ipc-server",
(GThreadFunc) gst_cuda_ipc_server_loop_thread_func, server);
}
void
gst_cuda_ipc_server_stop (GstCudaIpcServer * server)
{
GstCudaIpcServerPrivate *priv;
GstCudaIpcServerClass *klass;
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
priv = server->priv;
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
GST_DEBUG_OBJECT (server, "Stopping");
priv->shutdown = true;
klass->invoke (server);
g_clear_pointer (&priv->loop_thread, g_thread_join);
GST_DEBUG_OBJECT (server, "Stopped");
}
static void
gst_cuda_ipc_server_close_connection (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCudaIpcServerPrivate *priv = self->priv;
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
GST_DEBUG_OBJECT (self, "Closing conn-id %u", conn->id);
priv->conn_map.erase (conn->id);
if (priv->shutdown && priv->conn_map.empty ()) {
GST_DEBUG_OBJECT (self, "All connection were closed");
klass->terminate (self);
}
}
static void
gst_cuda_ipc_server_send_msg (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
if (!klass->send_msg (self, conn)) {
GST_WARNING_OBJECT (self, "Send msg failed");
gst_cuda_ipc_server_close_connection (self, conn);
}
}
static void
gst_cuda_ipc_server_config_data (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCaps *caps = gst_sample_get_caps (conn->data->sample);
gst_caps_replace (&conn->caps, caps);
gst_cuda_ipc_pkt_build_config (conn->server_msg, self->pid,
self->ipc_mode == GST_CUDA_IPC_MMAP, conn->caps);
conn->type = GstCudaIpcPktType::CONFIG;
GST_LOG_OBJECT (self, "Sending CONFIG, conn-id %u", conn->id);
gst_cuda_ipc_server_send_msg (self, conn);
}
void
gst_cuda_ipc_server_on_incoming_connection (GstCudaIpcServer * server,
std::shared_ptr < GstCudaIpcServerConn > conn)
{
GstCudaIpcServerPrivate *priv = server->priv;
priv->lock.lock ();
conn->server = server;
conn->id = priv->next_conn_id;
conn->context = (GstCudaContext *) gst_object_ref (server->context);
conn->data = priv->data;
priv->next_conn_id++;
priv->lock.unlock ();
/* *INDENT-OFF* */
priv->conn_map.insert ({conn->id, conn});
/* *INDENT-ON* */
if (conn->data) {
conn->configured = true;
gst_cuda_ipc_server_config_data (server, conn.get ());
} else {
GST_DEBUG_OBJECT (server, "Have no config data yet, waiting for data");
}
}
static void
gst_cuda_ipc_server_have_data (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
GstCaps *caps;
GstBuffer *buffer;
GstCudaMemory *cmem;
if (!conn->data) {
GST_ERROR_OBJECT (self, "Have no data to send, conn-id: %u", conn->id);
gst_cuda_ipc_server_close_connection (self, conn);
return;
}
conn->pending_have_data = false;
conn->seq_num = conn->data->seq_num + 1;
caps = gst_sample_get_caps (conn->data->sample);
if (!conn->caps || !gst_caps_is_equal (conn->caps, caps)) {
GST_DEBUG_OBJECT (self, "Sending caps %" GST_PTR_FORMAT " to conn-id %u",
caps, conn->id);
gst_caps_replace (&conn->caps, caps);
} else {
caps = nullptr;
}
buffer = gst_sample_get_buffer (conn->data->sample);
cmem = (GstCudaMemory *) gst_buffer_peek_memory (buffer, 0);
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (conn->data->handle);
GST_LOG_OBJECT (self, "Sending HAVE-DATA with handle %s, conn-id: %u",
handle_dump.c_str (), conn->id);
if (!gst_cuda_ipc_pkt_build_have_data (conn->server_msg, conn->data->pts,
conn->data->info, conn->data->handle, caps, conn->data->meta)) {
GST_ERROR_OBJECT (self, "Couldn't build HAVE-DATA pkt, conn-id: %u",
conn->id);
gst_cuda_ipc_server_close_connection (self, conn);
return;
}
conn->type = GstCudaIpcPktType::HAVE_DATA;
} else {
guint max_size = cmem->mem.maxsize;
GST_LOG_OBJECT (self, "Sending HAVE-MMAP-DATA with handle %"
GST_CUDA_OS_HANDLE_FORMAT ", conn-id: %u", conn->data->os_handle,
conn->id);
if (!gst_cuda_ipc_pkt_build_have_mmap_data (conn->server_msg,
conn->data->pts, conn->data->info, max_size, conn->data->os_handle,
caps, conn->data->meta)) {
GST_ERROR_OBJECT (self, "Couldn't build HAVE-MMAP-DATA pkt, conn-id: %u",
conn->id);
gst_cuda_ipc_server_close_connection (self, conn);
return;
}
conn->type = GstCudaIpcPktType::HAVE_MMAP_DATA;
if (klass->send_mmap_msg) {
if (!klass->send_mmap_msg (self, conn, conn->data->os_handle)) {
GST_WARNING_OBJECT (self, "Send msg failed");
gst_cuda_ipc_server_close_connection (self, conn);
}
return;
}
}
gst_cuda_ipc_server_send_msg (self, conn);
}
static void
gst_cuda_ipc_server_wait_msg (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
if (!klass->wait_msg (self, conn)) {
GST_WARNING_OBJECT (self, "Wait msg failed, conn-id: %u", conn->id);
gst_cuda_ipc_server_close_connection (self, conn);
}
}
static bool
gst_cuda_ipc_server_on_release_data (GstCudaIpcServer * self,
GstCudaIpcServerConn * conn)
{
bool found = false;
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
CUipcMemHandle handle;
if (!gst_cuda_ipc_pkt_parse_release_data (conn->client_msg, handle)) {
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-DATA, conn-id: %u",
conn->id);
return false;
}
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (handle);
GST_LOG_OBJECT (self, "RELEASE-DATA %s, conn-id: %u",
handle_dump.c_str (), conn->id);
for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end ();
it++) {
CUipcMemHandle & tmp = (*it)->handle;
if (gst_cuda_ipc_handle_is_equal (tmp, handle)) {
found = true;
conn->peer_handles.erase (it);
break;
}
}
if (!found) {
GST_WARNING_OBJECT (self,
"Unexpected memory handle to remove %s, conn-id: %u",
handle_dump.c_str (), conn->id);
return false;
}
} else {
GstCudaSharableHandle handle;
if (!gst_cuda_ipc_pkt_parse_release_mmap_data (conn->client_msg, &handle)) {
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-MMAP-DATA, conn-id: %u",
conn->id);
return false;
}
GST_LOG_OBJECT (self, "RELEASE-MMAP-DATA %" GST_CUDA_OS_HANDLE_FORMAT
", conn-id %u", handle, conn->id);
for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end ();
it++) {
if ((*it)->os_handle == handle) {
found = true;
conn->peer_handles.erase (it);
break;
}
}
if (!found) {
GST_WARNING_OBJECT (self,
"Unexpected memory handle to remove %" GST_CUDA_OS_HANDLE_FORMAT
", conn-id: %u", handle, conn->id);
return false;
}
}
GST_LOG_OBJECT (self, "Client is holding %" G_GSIZE_FORMAT " handles",
conn->peer_handles.size ());
return true;
}
void
gst_cuda_ipc_server_wait_msg_finish (GstCudaIpcServer * server,
GstCudaIpcServerConn * conn, bool result)
{
GstCudaIpcPacketHeader header;
if (!result) {
GST_WARNING_OBJECT (server, "Wait msg failed, conn->id: %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
return;
}
if (!gst_cuda_ipc_pkt_identify (conn->client_msg, header)) {
GST_ERROR_OBJECT (server, "Broken header, conn-id: %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
return;
}
switch (header.type) {
case GstCudaIpcPktType::NEED_DATA:
GST_LOG_OBJECT (server, "NEED-DATA, conn-id: %u", conn->id);
if (!conn->data) {
GST_LOG_OBJECT (server, "Wait for available data, conn-id: %u",
conn->id);
conn->pending_have_data = true;
gst_cuda_ipc_server_on_idle (server);
return;
}
gst_cuda_ipc_server_have_data (server, conn);
break;
case GstCudaIpcPktType::READ_DONE:
GST_LOG_OBJECT (server, "READ-DONE, conn-id: %u", conn->id);
if (!conn->data) {
GST_ERROR_OBJECT (server, "Unexpected READ-DATA, conn-id: %u",
conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
return;
}
conn->peer_handles.push_back (conn->data);
conn->data = nullptr;
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::RELEASE_DATA:
case GstCudaIpcPktType::RELEASE_MMAP_DATA:
GST_LOG_OBJECT (server, "RELEASE-DATA, conn-id: %u", conn->id);
if (!gst_cuda_ipc_server_on_release_data (server, conn))
gst_cuda_ipc_server_close_connection (server, conn);
else
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::FIN:
GST_DEBUG_OBJECT (server, "FIN, conn-id %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
break;
default:
GST_ERROR_OBJECT (server, "Unexpected packet, conn-id: %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
break;
}
}
void
gst_cuda_ipc_server_send_msg_finish (GstCudaIpcServer * server,
GstCudaIpcServerConn * conn, bool result)
{
if (!result) {
GST_WARNING_OBJECT (server, "Send msg failed, conn-id %u", conn->id);
gst_cuda_ipc_server_close_connection (server, conn);
return;
}
switch (conn->type) {
case GstCudaIpcPktType::CONFIG:
GST_DEBUG_OBJECT (server, "Sent CONFIG-DATA, conn-id %u", conn->id);
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::HAVE_DATA:
GST_LOG_OBJECT (server, "Sent HAVE-DATA, conn-id %u", conn->id);
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::HAVE_MMAP_DATA:
GST_LOG_OBJECT (server, "Sent HAVE-MMAP-DATA, conn-id %u", conn->id);
gst_cuda_ipc_server_wait_msg (server, conn);
break;
case GstCudaIpcPktType::EOS:
GST_DEBUG_OBJECT (server, "Sent EOS, conn-id %u", conn->id);
gst_cuda_ipc_server_wait_msg (server, conn);
break;
default:
GST_ERROR_OBJECT (server, "Unexpected msg type %d", (gint) conn->type);
gst_cuda_ipc_server_close_connection (server, conn);
break;
}
}
static void
gst_cuda_ipc_server_eos (GstCudaIpcServer * self, GstCudaIpcServerConn * conn)
{
gst_cuda_ipc_pkt_build_eos (conn->server_msg);
conn->eos = true;
conn->type = GstCudaIpcPktType::EOS;
gst_cuda_ipc_server_send_msg (self, conn);
}
void
gst_cuda_ipc_server_on_idle (GstCudaIpcServer * server)
{
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
GstCudaIpcServerPrivate *priv = server->priv;
GST_LOG_OBJECT (server, "idle");
if (priv->shutdown) {
GST_DEBUG_OBJECT (server, "We are stopping");
if (priv->conn_map.empty ()) {
GST_DEBUG_OBJECT (server, "All connections were closed");
klass->terminate (server);
return;
}
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_eos;
/* *INDENT-OFF* */
for (auto it : priv->conn_map) {
auto conn = it.second;
if (conn->eos || !conn->pending_have_data)
continue;
to_send_eos.push_back (conn);
}
for (auto it : to_send_eos) {
GST_DEBUG_OBJECT (server, "Sending EOS to conn-id: %u", it->id);
gst_cuda_ipc_server_eos (server, it.get ());
}
GST_DEBUG_OBJECT (server, "Have %" G_GSIZE_FORMAT " alive connections",
priv->conn_map.size());
size_t num_closed = 0;
for (auto it : priv->conn_map) {
auto conn = it.second;
GST_DEBUG_OBJECT (server, "conn-id %u"
" peer handle size %" G_GSIZE_FORMAT, conn->id,
conn->peer_handles.size ());
/* Cannot erase conn since it's still referenced.
* Manually close connection */
if (conn->peer_handles.empty ()) {
conn->CloseConn ();
num_closed++;
}
}
/* *INDENT-ON* */
if (priv->conn_map.size () == num_closed) {
GST_DEBUG_OBJECT (server, "All connections were closed");
klass->terminate (server);
}
return;
}
if (priv->conn_map.empty ()) {
GST_LOG_OBJECT (server, "Have no connection");
return;
}
std::unique_lock < std::mutex > lk (priv->lock);
if (!priv->data)
return;
/* *INDENT-OFF* */
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_config_data;
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_have_data;
for (auto it : priv->conn_map) {
auto conn = it.second;
if (!conn->configured) {
conn->configured = true;
conn->data = priv->data;
to_config_data.push_back (conn);
} else if (conn->pending_have_data && conn->seq_num <= priv->data->seq_num) {
conn->data = priv->data;
to_send_have_data.push_back (conn);
}
}
lk.unlock ();
for (auto it: to_config_data)
gst_cuda_ipc_server_config_data (server, it.get ());
for (auto it: to_send_have_data)
gst_cuda_ipc_server_have_data (server, it.get ());
/* *INDENT-ON* */
}
void
gst_cuda_ipc_server_abort (GstCudaIpcServer * server)
{
GstCudaIpcServerPrivate *priv = server->priv;
priv->aborted = true;
}