mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-19 13:55:41 +00:00
7b6023d9cf
Adding cudaipc{src,sink} elements for CUDA IPC support. Implementation notes: * For communication between endpoints, a Win32 named pipe and a Unix domain socket are used on Windows and Linux respectively. * cudaipcsink behaves as a server: all GPU resources are owned by the server process and exported to other processes, and cudaipcsrc (the client) imports each exported handle. * The user can select the IPC mode via the "ipc-mode" property of cudaipcsink. There are two IPC modes: "legacy", which uses the legacy CUDA IPC method, and "mmap", which uses the CUDA virtual memory API together with the OS's resource handle sharing method, such as DuplicateHandle() on Windows. The "mmap" mode might be better than "legacy" in terms of stability since it relies on the OS's resource management, but it would consume more GPU memory than "legacy" mode. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4510>
683 lines
19 KiB
C++
683 lines
19 KiB
C++
/* GStreamer
|
|
* Copyright (C) 2023 Seungha Yang <seungha@centricular.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include "gstcudaipcserver.h"
|
|
#include <unordered_map>
|
|
#include <mutex>
|
|
#include <atomic>
|
|
#include <gst/cuda/gstcuda-private.h>
|
|
|
|
/* Debug category of this file; GST_CAT_DEFAULT routes the GST_*_OBJECT
 * logging macros used below to it */
GST_DEBUG_CATEGORY (cuda_ipc_server_debug);
#define GST_CAT_DEFAULT cuda_ipc_server_debug
|
|
|
|
/**
 * gst_cuda_ipc_mode_get_type:
 *
 * Registers the "GstCudaIpcMode" enum (values "legacy" and "mmap") inside
 * the GST_CUDA_CALL_ONCE_BEGIN/END guard and returns the resulting type.
 *
 * Returns: the #GType stored by the registration
 */
GType
gst_cuda_ipc_mode_get_type (void)
{
  static GType mode_type = 0;
  static const GEnumValue mode_values[] = {
    {GST_CUDA_IPC_LEGACY, "Legacy IPC mode", "legacy"},
    {GST_CUDA_IPC_MMAP, "Memory Map", "mmap"},
    {0, nullptr, nullptr}
  };

  GST_CUDA_CALL_ONCE_BEGIN {
    mode_type = g_enum_register_static ("GstCudaIpcMode", mode_values);
  } GST_CUDA_CALL_ONCE_END;

  return mode_type;
}
|
|
|
|
struct GstCudaIpcServerPrivate
|
|
{
|
|
GstCudaIpcServerPrivate ()
|
|
{
|
|
shutdown = false;
|
|
aborted = false;
|
|
}
|
|
|
|
std::mutex lock;
|
|
guint64 seq_num = 0;
|
|
guint next_conn_id = 0;
|
|
std::unordered_map < guint,
|
|
std::shared_ptr < GstCudaIpcServerConn >> conn_map;
|
|
GThread *loop_thread = nullptr;
|
|
std::atomic < bool >shutdown;
|
|
std::atomic < bool >aborted;
|
|
std::shared_ptr < GstCudaIpcServerData > data;
|
|
};
|
|
|
|
static void gst_cuda_ipc_server_dispose (GObject * object);
|
|
static void gst_cuda_ipc_server_finalize (GObject * object);
|
|
|
|
#define gst_cuda_ipc_server_parent_class parent_class
|
|
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstCudaIpcServer,
|
|
gst_cuda_ipc_server, GST_TYPE_OBJECT,
|
|
GST_DEBUG_CATEGORY_INIT (cuda_ipc_server_debug, "cudaipcserver",
|
|
0, "cudaipcserver"));
|
|
|
|
static void
|
|
gst_cuda_ipc_server_class_init (GstCudaIpcServerClass * klass)
|
|
{
|
|
GObjectClass *object_class = G_OBJECT_CLASS (klass);
|
|
|
|
object_class->finalize = gst_cuda_ipc_server_finalize;
|
|
object_class->dispose = gst_cuda_ipc_server_dispose;
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_init (GstCudaIpcServer * self)
|
|
{
|
|
self->priv = new GstCudaIpcServerPrivate ();
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_dispose (GObject * object)
|
|
{
|
|
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
|
|
GstCudaIpcServerPrivate *priv = self->priv;
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
GST_DEBUG_OBJECT (self, "dispose");
|
|
|
|
g_assert (klass->terminate);
|
|
klass->terminate (self);
|
|
|
|
g_clear_pointer (&priv->loop_thread, g_thread_join);
|
|
|
|
G_OBJECT_CLASS (parent_class)->dispose (object);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_finalize (GObject * object)
|
|
{
|
|
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
|
|
|
|
GST_DEBUG_OBJECT (self, "finalize");
|
|
|
|
gst_clear_object (&self->context);
|
|
|
|
delete self->priv;
|
|
|
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
|
}
|
|
|
|
/**
 * gst_cuda_ipc_server_send_data:
 * @server: a #GstCudaIpcServer configured for GST_CUDA_IPC_LEGACY
 * @sample: sample to publish; a new reference is taken
 * @info: video info stored alongside the sample
 * @handle: CUDA IPC memory handle stored alongside the sample
 * @pts: timestamp stored alongside the sample
 *
 * Stores a new frame as the server's current data, stamps it with the next
 * sequence number and calls the subclass invoke vfunc.
 *
 * Returns: GST_FLOW_OK on success, GST_FLOW_ERROR on invalid arguments,
 * when the server is not in legacy mode, or after an abort
 */
GstFlowReturn
gst_cuda_ipc_server_send_data (GstCudaIpcServer * server, GstSample * sample,
    const GstVideoInfo & info, const CUipcMemHandle & handle, GstClockTime pts)
{
  g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
  g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);

  if (server->ipc_mode != GST_CUDA_IPC_LEGACY) {
    GST_ERROR_OBJECT (server, "Invalid call");
    return GST_FLOW_ERROR;
  }

  GstCudaIpcServerPrivate *priv = server->priv;
  GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);

  GST_LOG_OBJECT (server, "Sending data");

  std::shared_ptr < GstCudaIpcServerData > frame;
  {
    std::lock_guard < std::mutex > guard (priv->lock);

    if (priv->aborted) {
      GST_DEBUG_OBJECT (server, "Was aborted");
      return GST_FLOW_ERROR;
    }

    frame = std::make_shared < GstCudaIpcServerData > ();
    frame->sample = gst_sample_ref (sample);
    frame->info = info;
    frame->handle = handle;
    frame->pts = pts;
    frame->seq_num = priv->seq_num++;

    priv->data = frame;
  }

  /* The vfunc is called with the lock released */
  klass->invoke (server);

  return GST_FLOW_OK;
}
|
|
|
|
/**
 * gst_cuda_ipc_server_send_mmap_data:
 * @server: a #GstCudaIpcServer configured for GST_CUDA_IPC_MMAP
 * @sample: sample to publish; a new reference is taken
 * @info: video info stored alongside the sample
 * @handle: OS sharable handle stored alongside the sample
 * @pts: timestamp stored alongside the sample
 *
 * MMAP-mode counterpart of gst_cuda_ipc_server_send_data (): stores a new
 * frame as the server's current data, stamps it with the next sequence
 * number and calls the subclass invoke vfunc.
 *
 * Returns: GST_FLOW_OK on success, GST_FLOW_ERROR on invalid arguments,
 * when the server is not in mmap mode, or after an abort
 */
GstFlowReturn
gst_cuda_ipc_server_send_mmap_data (GstCudaIpcServer * server,
    GstSample * sample, const GstVideoInfo & info, GstCudaSharableHandle handle,
    GstClockTime pts)
{
  g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
  g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);

  if (server->ipc_mode != GST_CUDA_IPC_MMAP) {
    GST_ERROR_OBJECT (server, "Invalid call");
    return GST_FLOW_ERROR;
  }

  GstCudaIpcServerPrivate *priv = server->priv;
  GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);

  GST_LOG_OBJECT (server, "Sending data");

  std::shared_ptr < GstCudaIpcServerData > frame;
  {
    std::lock_guard < std::mutex > guard (priv->lock);

    if (priv->aborted) {
      GST_DEBUG_OBJECT (server, "Was aborted");
      return GST_FLOW_ERROR;
    }

    frame = std::make_shared < GstCudaIpcServerData > ();
    frame->sample = gst_sample_ref (sample);
    frame->info = info;
    frame->os_handle = handle;
    frame->pts = pts;
    frame->seq_num = priv->seq_num++;

    priv->data = frame;
  }

  /* The vfunc is called with the lock released */
  klass->invoke (server);

  return GST_FLOW_OK;
}
|
|
|
|
/* Entry point of the thread spawned by gst_cuda_ipc_server_run (): runs the
 * subclass loop vfunc and, once it returns, drops every connection */
static gpointer
gst_cuda_ipc_server_loop_thread_func (GstCudaIpcServer * self)
{
  GstCudaIpcServerClass *server_class = GST_CUDA_IPC_SERVER_GET_CLASS (self);

  /* loop is a mandatory vfunc */
  g_assert (server_class->loop);

  GST_DEBUG_OBJECT (self, "Start loop thread");
  server_class->loop (self);

  self->priv->conn_map.clear ();
  GST_DEBUG_OBJECT (self, "Exit loop thread");

  return nullptr;
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_run (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
|
|
|
|
priv = server->priv;
|
|
|
|
GST_DEBUG_OBJECT (server, "Running");
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
if (priv->loop_thread) {
|
|
GST_DEBUG_OBJECT (server, "Already running");
|
|
return;
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (server, "Spawning thread");
|
|
priv->loop_thread = g_thread_new ("cuda-ipc-server",
|
|
(GThreadFunc) gst_cuda_ipc_server_loop_thread_func, server);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_stop (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv;
|
|
GstCudaIpcServerClass *klass;
|
|
|
|
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
|
|
|
|
priv = server->priv;
|
|
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
|
|
|
|
GST_DEBUG_OBJECT (server, "Stopping");
|
|
priv->shutdown = true;
|
|
klass->invoke (server);
|
|
|
|
g_clear_pointer (&priv->loop_thread, g_thread_join);
|
|
|
|
GST_DEBUG_OBJECT (server, "Stopped");
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_close_connection (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = self->priv;
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
GST_DEBUG_OBJECT (self, "Closing conn-id %u", conn->id);
|
|
|
|
priv->conn_map.erase (conn->id);
|
|
|
|
if (priv->shutdown && priv->conn_map.empty ()) {
|
|
GST_DEBUG_OBJECT (self, "All connection were closed");
|
|
klass->terminate (self);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_send_msg (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
if (!klass->send_msg (self, conn)) {
|
|
GST_WARNING_OBJECT (self, "Send msg failed");
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_config_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCaps *caps = gst_sample_get_caps (conn->data->sample);
|
|
|
|
gst_caps_replace (&conn->caps, caps);
|
|
|
|
gst_cuda_ipc_pkt_build_config (conn->server_msg, self->pid,
|
|
self->ipc_mode == GST_CUDA_IPC_MMAP, conn->caps);
|
|
conn->type = GstCudaIpcPktType::CONFIG;
|
|
|
|
GST_LOG_OBJECT (self, "Sending CONFIG, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_on_incoming_connection (GstCudaIpcServer * server,
|
|
std::shared_ptr < GstCudaIpcServerConn > conn)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
priv->lock.lock ();
|
|
conn->server = server;
|
|
conn->id = priv->next_conn_id;
|
|
conn->context = (GstCudaContext *) gst_object_ref (server->context);
|
|
conn->data = priv->data;
|
|
priv->next_conn_id++;
|
|
priv->lock.unlock ();
|
|
|
|
/* *INDENT-OFF* */
|
|
priv->conn_map.insert ({conn->id, conn});
|
|
/* *INDENT-ON* */
|
|
|
|
if (conn->data) {
|
|
conn->configured = true;
|
|
gst_cuda_ipc_server_config_data (server, conn.get ());
|
|
} else {
|
|
GST_DEBUG_OBJECT (server, "Have no config data yet, waiting for data");
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_have_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
GstCaps *caps;
|
|
GstBuffer *buffer;
|
|
GstCudaMemory *cmem;
|
|
|
|
if (!conn->data) {
|
|
GST_ERROR_OBJECT (self, "Have no data to send, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->pending_have_data = false;
|
|
conn->seq_num = conn->data->seq_num + 1;
|
|
|
|
caps = gst_sample_get_caps (conn->data->sample);
|
|
if (!conn->caps || !gst_caps_is_equal (conn->caps, caps)) {
|
|
GST_DEBUG_OBJECT (self, "Sending caps %" GST_PTR_FORMAT " to conn-id %u",
|
|
caps, conn->id);
|
|
gst_caps_replace (&conn->caps, caps);
|
|
} else {
|
|
caps = nullptr;
|
|
}
|
|
|
|
buffer = gst_sample_get_buffer (conn->data->sample);
|
|
cmem = (GstCudaMemory *) gst_buffer_peek_memory (buffer, 0);
|
|
|
|
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
|
|
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (conn->data->handle);
|
|
GST_LOG_OBJECT (self, "Sending HAVE-DATA with handle %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
|
|
if (!gst_cuda_ipc_pkt_build_have_data (conn->server_msg, conn->data->pts,
|
|
conn->data->info, conn->data->handle, caps)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't build HAVE-DATA pkt, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->type = GstCudaIpcPktType::HAVE_DATA;
|
|
} else {
|
|
guint max_size = cmem->mem.maxsize;
|
|
GST_LOG_OBJECT (self, "Sending HAVE-MMAP-DATA with handle %"
|
|
GST_CUDA_OS_HANDLE_FORMAT ", conn-id: %u", conn->data->os_handle,
|
|
conn->id);
|
|
if (!gst_cuda_ipc_pkt_build_have_mmap_data (conn->server_msg,
|
|
conn->data->pts, conn->data->info, max_size, conn->data->os_handle,
|
|
caps)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't build HAVE-MMAP-DATA pkt, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->type = GstCudaIpcPktType::HAVE_MMAP_DATA;
|
|
if (klass->send_mmap_msg) {
|
|
if (!klass->send_mmap_msg (self, conn, conn->data->os_handle)) {
|
|
GST_WARNING_OBJECT (self, "Send msg failed");
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_wait_msg (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
if (!klass->wait_msg (self, conn)) {
|
|
GST_WARNING_OBJECT (self, "Wait msg failed, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
}
|
|
|
|
static bool
|
|
gst_cuda_ipc_server_on_release_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
bool found = false;
|
|
|
|
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
|
|
CUipcMemHandle handle;
|
|
if (!gst_cuda_ipc_pkt_parse_release_data (conn->client_msg, handle)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-DATA, conn-id: %u",
|
|
conn->id);
|
|
return false;
|
|
}
|
|
|
|
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (handle);
|
|
GST_LOG_OBJECT (self, "RELEASE-DATA %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
|
|
for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end ();
|
|
it++) {
|
|
CUipcMemHandle & tmp = (*it)->handle;
|
|
if (gst_cuda_ipc_handle_is_equal (tmp, handle)) {
|
|
found = true;
|
|
conn->peer_handles.erase (it);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!found) {
|
|
GST_WARNING_OBJECT (self,
|
|
"Unexpected memory handle to remove %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
return false;
|
|
}
|
|
} else {
|
|
GstCudaSharableHandle handle;
|
|
if (!gst_cuda_ipc_pkt_parse_release_mmap_data (conn->client_msg, &handle)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-MMAP-DATA, conn-id: %u",
|
|
conn->id);
|
|
return false;
|
|
}
|
|
|
|
GST_LOG_OBJECT (self, "RELEASE-MMAP-DATA %" GST_CUDA_OS_HANDLE_FORMAT
|
|
", conn-id %u", handle, conn->id);
|
|
|
|
for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end ();
|
|
it++) {
|
|
if ((*it)->os_handle == handle) {
|
|
found = true;
|
|
conn->peer_handles.erase (it);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!found) {
|
|
GST_WARNING_OBJECT (self,
|
|
"Unexpected memory handle to remove %" GST_CUDA_OS_HANDLE_FORMAT
|
|
", conn-id: %u", handle, conn->id);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
GST_LOG_OBJECT (self, "Client is holding %" G_GSIZE_FORMAT " handles",
|
|
conn->peer_handles.size ());
|
|
|
|
return true;
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_wait_msg_finish (GstCudaIpcServer * server,
|
|
GstCudaIpcServerConn * conn, bool result)
|
|
{
|
|
GstCudaIpcPacketHeader header;
|
|
|
|
if (!result) {
|
|
GST_WARNING_OBJECT (server, "Wait msg failed, conn->id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
if (!gst_cuda_ipc_pkt_identify (conn->client_msg, header)) {
|
|
GST_ERROR_OBJECT (server, "Broken header, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
switch (header.type) {
|
|
case GstCudaIpcPktType::NEED_DATA:
|
|
GST_LOG_OBJECT (server, "NEED-DATA, conn-id: %u", conn->id);
|
|
if (!conn->data) {
|
|
GST_LOG_OBJECT (server, "Wait for available data, conn-id: %u",
|
|
conn->id);
|
|
conn->pending_have_data = true;
|
|
gst_cuda_ipc_server_on_idle (server);
|
|
return;
|
|
}
|
|
gst_cuda_ipc_server_have_data (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::READ_DONE:
|
|
GST_LOG_OBJECT (server, "READ-DONE, conn-id: %u", conn->id);
|
|
|
|
if (!conn->data) {
|
|
GST_ERROR_OBJECT (server, "Unexpected READ-DATA, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
conn->peer_handles.push_back (conn->data);
|
|
conn->data = nullptr;
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::RELEASE_DATA:
|
|
case GstCudaIpcPktType::RELEASE_MMAP_DATA:
|
|
GST_LOG_OBJECT (server, "RELEASE-DATA, conn-id: %u", conn->id);
|
|
if (!gst_cuda_ipc_server_on_release_data (server, conn))
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
else
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::FIN:
|
|
GST_DEBUG_OBJECT (server, "FIN, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
default:
|
|
GST_ERROR_OBJECT (server, "Unexpected packet, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
}
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_send_msg_finish (GstCudaIpcServer * server,
|
|
GstCudaIpcServerConn * conn, bool result)
|
|
{
|
|
if (!result) {
|
|
GST_WARNING_OBJECT (server, "Send msg failed, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
switch (conn->type) {
|
|
case GstCudaIpcPktType::CONFIG:
|
|
GST_DEBUG_OBJECT (server, "Sent CONFIG-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::HAVE_DATA:
|
|
GST_LOG_OBJECT (server, "Sent HAVE-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::HAVE_MMAP_DATA:
|
|
GST_LOG_OBJECT (server, "Sent HAVE-MMAP-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::EOS:
|
|
GST_DEBUG_OBJECT (server, "Sent EOS, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
default:
|
|
GST_ERROR_OBJECT (server, "Unexpected msg type %d", (gint) conn->type);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_eos (GstCudaIpcServer * self, GstCudaIpcServerConn * conn)
|
|
{
|
|
gst_cuda_ipc_pkt_build_eos (conn->server_msg);
|
|
conn->eos = true;
|
|
conn->type = GstCudaIpcPktType::EOS;
|
|
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_on_idle (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
GST_LOG_OBJECT (server, "idle");
|
|
|
|
if (priv->shutdown) {
|
|
GST_DEBUG_OBJECT (server, "We are stopping");
|
|
|
|
if (priv->conn_map.empty ()) {
|
|
GST_DEBUG_OBJECT (server, "All connections were closed");
|
|
klass->terminate (server);
|
|
return;
|
|
}
|
|
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_eos;
|
|
/* *INDENT-OFF* */
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
if (conn->eos || !conn->pending_have_data)
|
|
continue;
|
|
|
|
to_send_eos.push_back (conn);
|
|
}
|
|
|
|
for (auto it : to_send_eos) {
|
|
GST_DEBUG_OBJECT (server, "Sending EOS to conn-id: %u", it->id);
|
|
gst_cuda_ipc_server_eos (server, it.get ());
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (server, "Have %" G_GSIZE_FORMAT " alive connections",
|
|
priv->conn_map.size());
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
GST_DEBUG_OBJECT (server, "conn-id %u"
|
|
" peer handle size %" G_GSIZE_FORMAT, conn->id,
|
|
conn->peer_handles.size ());
|
|
}
|
|
/* *INDENT-ON* */
|
|
|
|
return;
|
|
}
|
|
|
|
if (priv->conn_map.empty ()) {
|
|
GST_LOG_OBJECT (server, "Have no connection");
|
|
return;
|
|
}
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
if (!priv->data)
|
|
return;
|
|
|
|
/* *INDENT-OFF* */
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_config_data;
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_have_data;
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
if (!conn->configured) {
|
|
conn->configured = true;
|
|
conn->data = priv->data;
|
|
to_config_data.push_back (conn);
|
|
} else if (conn->pending_have_data && conn->seq_num <= priv->data->seq_num) {
|
|
conn->data = priv->data;
|
|
to_send_have_data.push_back (conn);
|
|
}
|
|
}
|
|
lk.unlock ();
|
|
|
|
for (auto it: to_config_data)
|
|
gst_cuda_ipc_server_config_data (server, it.get ());
|
|
|
|
for (auto it: to_send_have_data)
|
|
gst_cuda_ipc_server_have_data (server, it.get ());
|
|
/* *INDENT-ON* */
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_abort (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
priv->aborted = true;
|
|
}
|