mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-02-24 15:06:38 +00:00
As per the C++ standard, any usage of a std::set::iterator after it has been erased from the collection results in undefined behaviour. This has resulted in application crashes due to CUDA illegal address errors. This commit fixes the issue by copying and incrementing the iterator within any for-loops that also invoke std::set::erase. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/8472>
713 lines
20 KiB
C++
713 lines
20 KiB
C++
/* GStreamer
|
|
* Copyright (C) 2023 Seungha Yang <seungha@centricular.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include "gstcudaipcserver.h"
|
|
#include <unordered_map>
|
|
#include <mutex>
|
|
#include <atomic>
|
|
#include <gst/cuda/gstcuda-private.h>
|
|
|
|
/* Debug category for this file. It is initialized under the name
 * "cudaipcserver" by the type-definition macro further down. GST_CAT_DEFAULT
 * is presumably what the GST_*_OBJECT logging macros used below pick up --
 * confirm against the GStreamer debugging documentation. */
GST_DEBUG_CATEGORY (cuda_ipc_server_debug);
|
|
#define GST_CAT_DEFAULT cuda_ipc_server_debug
|
|
|
|
/**
|
|
* GstCudaIpcMode:
|
|
*
|
|
* Since: 1.24
|
|
*/
|
|
GType
gst_cuda_ipc_mode_get_type (void)
{
  /* Value/name/nick table handed to g_enum_register_static(); the last,
   * all-zero entry closes the table */
  static const GEnumValue mode_values[] = {
    {GST_CUDA_IPC_LEGACY, "Legacy IPC mode", "legacy"},
    {GST_CUDA_IPC_MMAP, "Memory Map", "mmap"},
    {0, nullptr, nullptr}
  };
  static GType mode_type = 0;

  /* NOTE(review): GST_CUDA_CALL_ONCE_* presumably runs the enclosed
   * registration only once per process -- confirm in gstcuda-private.h */
  GST_CUDA_CALL_ONCE_BEGIN {
    mode_type = g_enum_register_static ("GstCudaIpcMode", mode_values);
  } GST_CUDA_CALL_ONCE_END;

  return mode_type;
}
|
|
|
|
struct GstCudaIpcServerPrivate
|
|
{
|
|
GstCudaIpcServerPrivate ()
|
|
{
|
|
shutdown = false;
|
|
aborted = false;
|
|
}
|
|
|
|
std::mutex lock;
|
|
guint64 seq_num = 0;
|
|
guint next_conn_id = 0;
|
|
std::unordered_map < guint,
|
|
std::shared_ptr < GstCudaIpcServerConn >> conn_map;
|
|
GThread *loop_thread = nullptr;
|
|
std::atomic < bool >shutdown;
|
|
std::atomic < bool >aborted;
|
|
std::shared_ptr < GstCudaIpcServerData > data;
|
|
};
|
|
|
|
/* GObject virtual method implementations; defined below and installed in
 * gst_cuda_ipc_server_class_init() */
static void gst_cuda_ipc_server_dispose (GObject * object);
|
|
static void gst_cuda_ipc_server_finalize (GObject * object);
|
|
|
|
/* Alias so the chain-up calls in this file can use the name parent_class */
#define gst_cuda_ipc_server_parent_class parent_class
|
|
/* Defines the abstract GstCudaIpcServer type on top of GST_TYPE_OBJECT and
 * initializes the "cudaipcserver" debug category as part of the same macro */
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstCudaIpcServer,
|
|
gst_cuda_ipc_server, GST_TYPE_OBJECT,
|
|
GST_DEBUG_CATEGORY_INIT (cuda_ipc_server_debug, "cudaipcserver",
|
|
0, "cudaipcserver"));
|
|
|
|
static void
|
|
gst_cuda_ipc_server_class_init (GstCudaIpcServerClass * klass)
|
|
{
|
|
GObjectClass *object_class = G_OBJECT_CLASS (klass);
|
|
|
|
object_class->finalize = gst_cuda_ipc_server_finalize;
|
|
object_class->dispose = gst_cuda_ipc_server_dispose;
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_init (GstCudaIpcServer * self)
|
|
{
|
|
self->priv = new GstCudaIpcServerPrivate ();
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_dispose (GObject * object)
|
|
{
|
|
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
|
|
GstCudaIpcServerPrivate *priv = self->priv;
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
GST_DEBUG_OBJECT (self, "dispose");
|
|
|
|
g_assert (klass->terminate);
|
|
klass->terminate (self);
|
|
|
|
g_clear_pointer (&priv->loop_thread, g_thread_join);
|
|
|
|
G_OBJECT_CLASS (parent_class)->dispose (object);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_finalize (GObject * object)
|
|
{
|
|
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
|
|
|
|
GST_DEBUG_OBJECT (self, "finalize");
|
|
|
|
gst_clear_object (&self->context);
|
|
|
|
delete self->priv;
|
|
|
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
|
}
|
|
|
|
/* Publishes a new sample for servers running in GST_CUDA_IPC_LEGACY mode.
 *
 * @server: the server instance
 * @sample: sample to publish; a reference is taken
 * @info: video info stored with the sample
 * @handle: CUDA IPC memory handle stored with the sample
 * @pts: timestamp stored with the sample
 * @meta: (nullable) meta bytes; copied when non-empty
 *
 * The new data replaces the previously published one and gets the next
 * sequence number, then the subclass invoke() method is called.
 *
 * Returns: GST_FLOW_ERROR on invalid arguments, when the server is not in
 * legacy mode, or once the server was aborted; GST_FLOW_OK otherwise.
 */
GstFlowReturn
gst_cuda_ipc_server_send_data (GstCudaIpcServer * server, GstSample * sample,
    const GstVideoInfo & info, const CUipcMemHandle & handle, GstClockTime pts,
    GByteArray * meta)
{
  g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
  g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);

  if (server->ipc_mode != GST_CUDA_IPC_LEGACY) {
    GST_ERROR_OBJECT (server, "Invalid call");
    return GST_FLOW_ERROR;
  }

  GstCudaIpcServerPrivate *priv = server->priv;
  GstCudaIpcServerClass *server_class =
      GST_CUDA_IPC_SERVER_GET_CLASS (server);

  GST_LOG_OBJECT (server, "Sending data");

  std::unique_lock < std::mutex > guard (priv->lock);
  if (priv->aborted) {
    GST_DEBUG_OBJECT (server, "Was aborted");
    return GST_FLOW_ERROR;
  }

  auto payload = std::make_shared < GstCudaIpcServerData > ();
  payload->sample = gst_sample_ref (sample);
  payload->info = info;
  payload->handle = handle;
  payload->pts = pts;
  /* Stamp with the current sequence number, then advance the counter */
  payload->seq_num = priv->seq_num++;
  if (meta != nullptr && meta->len > 0) {
    payload->meta.resize (meta->len);
    memcpy (payload->meta.data (), meta->data, meta->len);
  }

  priv->data = payload;
  guard.unlock ();

  /* Notify the subclass with the lock already released */
  server_class->invoke (server);

  return GST_FLOW_OK;
}
|
|
|
|
/* Publishes a new sample for servers running in GST_CUDA_IPC_MMAP mode.
 *
 * @server: the server instance
 * @sample: sample to publish; a reference is taken
 * @info: video info stored with the sample
 * @handle: sharable OS handle stored with the sample
 * @pts: timestamp stored with the sample
 * @meta: (nullable) meta bytes; copied when non-empty
 *
 * The new data replaces the previously published one and gets the next
 * sequence number, then the subclass invoke() method is called.
 *
 * Returns: GST_FLOW_ERROR on invalid arguments, when the server is not in
 * mmap mode, or once the server was aborted; GST_FLOW_OK otherwise.
 */
GstFlowReturn
gst_cuda_ipc_server_send_mmap_data (GstCudaIpcServer * server,
    GstSample * sample, const GstVideoInfo & info, GstCudaSharableHandle handle,
    GstClockTime pts, GByteArray * meta)
{
  g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
  g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);

  if (server->ipc_mode != GST_CUDA_IPC_MMAP) {
    GST_ERROR_OBJECT (server, "Invalid call");
    return GST_FLOW_ERROR;
  }

  GstCudaIpcServerPrivate *priv = server->priv;
  GstCudaIpcServerClass *server_class =
      GST_CUDA_IPC_SERVER_GET_CLASS (server);

  GST_LOG_OBJECT (server, "Sending data");

  std::unique_lock < std::mutex > guard (priv->lock);
  if (priv->aborted) {
    GST_DEBUG_OBJECT (server, "Was aborted");
    return GST_FLOW_ERROR;
  }

  auto payload = std::make_shared < GstCudaIpcServerData > ();
  payload->sample = gst_sample_ref (sample);
  payload->info = info;
  payload->os_handle = handle;
  payload->pts = pts;
  /* Stamp with the current sequence number, then advance the counter */
  payload->seq_num = priv->seq_num++;
  if (meta != nullptr && meta->len > 0) {
    payload->meta.resize (meta->len);
    memcpy (payload->meta.data (), meta->data, meta->len);
  }

  priv->data = payload;
  guard.unlock ();

  /* Notify the subclass with the lock already released */
  server_class->invoke (server);

  return GST_FLOW_OK;
}
|
|
|
|
/* Entry point of the thread spawned by gst_cuda_ipc_server_run(): runs the
 * subclass loop() and drops every remaining connection once it returns.
 * Always returns nullptr. */
static gpointer
gst_cuda_ipc_server_loop_thread_func (GstCudaIpcServer * self)
{
  GstCudaIpcServerClass *server_class = GST_CUDA_IPC_SERVER_GET_CLASS (self);
  GstCudaIpcServerPrivate *priv = self->priv;

  /* Every subclass is required to implement loop() */
  g_assert (server_class->loop);

  GST_DEBUG_OBJECT (self, "Start loop thread");

  server_class->loop (self);

  /* The loop is over; release whatever connections are still registered */
  priv->conn_map.clear ();

  GST_DEBUG_OBJECT (self, "Exit loop thread");

  return nullptr;
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_run (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
|
|
|
|
priv = server->priv;
|
|
|
|
GST_DEBUG_OBJECT (server, "Running");
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
if (priv->loop_thread) {
|
|
GST_DEBUG_OBJECT (server, "Already running");
|
|
return;
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (server, "Spawning thread");
|
|
priv->loop_thread = g_thread_new ("cuda-ipc-server",
|
|
(GThreadFunc) gst_cuda_ipc_server_loop_thread_func, server);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_stop (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv;
|
|
GstCudaIpcServerClass *klass;
|
|
|
|
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
|
|
|
|
priv = server->priv;
|
|
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
|
|
|
|
GST_DEBUG_OBJECT (server, "Stopping");
|
|
priv->shutdown = true;
|
|
klass->invoke (server);
|
|
|
|
g_clear_pointer (&priv->loop_thread, g_thread_join);
|
|
|
|
GST_DEBUG_OBJECT (server, "Stopped");
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_close_connection (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = self->priv;
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
GST_DEBUG_OBJECT (self, "Closing conn-id %u", conn->id);
|
|
|
|
priv->conn_map.erase (conn->id);
|
|
|
|
if (priv->shutdown && priv->conn_map.empty ()) {
|
|
GST_DEBUG_OBJECT (self, "All connection were closed");
|
|
klass->terminate (self);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_send_msg (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
if (!klass->send_msg (self, conn)) {
|
|
GST_WARNING_OBJECT (self, "Send msg failed");
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_config_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCaps *caps = gst_sample_get_caps (conn->data->sample);
|
|
|
|
gst_caps_replace (&conn->caps, caps);
|
|
|
|
gst_cuda_ipc_pkt_build_config (conn->server_msg, self->pid,
|
|
self->ipc_mode == GST_CUDA_IPC_MMAP, conn->caps);
|
|
conn->type = GstCudaIpcPktType::CONFIG;
|
|
|
|
GST_LOG_OBJECT (self, "Sending CONFIG, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_on_incoming_connection (GstCudaIpcServer * server,
|
|
std::shared_ptr < GstCudaIpcServerConn > conn)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
priv->lock.lock ();
|
|
conn->server = server;
|
|
conn->id = priv->next_conn_id;
|
|
conn->context = (GstCudaContext *) gst_object_ref (server->context);
|
|
conn->data = priv->data;
|
|
priv->next_conn_id++;
|
|
priv->lock.unlock ();
|
|
|
|
/* *INDENT-OFF* */
|
|
priv->conn_map.insert ({conn->id, conn});
|
|
/* *INDENT-ON* */
|
|
|
|
if (conn->data) {
|
|
conn->configured = true;
|
|
gst_cuda_ipc_server_config_data (server, conn.get ());
|
|
} else {
|
|
GST_DEBUG_OBJECT (server, "Have no config data yet, waiting for data");
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_have_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
GstCaps *caps;
|
|
GstBuffer *buffer;
|
|
GstCudaMemory *cmem;
|
|
|
|
if (!conn->data) {
|
|
GST_ERROR_OBJECT (self, "Have no data to send, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->pending_have_data = false;
|
|
conn->seq_num = conn->data->seq_num + 1;
|
|
|
|
caps = gst_sample_get_caps (conn->data->sample);
|
|
if (!conn->caps || !gst_caps_is_equal (conn->caps, caps)) {
|
|
GST_DEBUG_OBJECT (self, "Sending caps %" GST_PTR_FORMAT " to conn-id %u",
|
|
caps, conn->id);
|
|
gst_caps_replace (&conn->caps, caps);
|
|
} else {
|
|
caps = nullptr;
|
|
}
|
|
|
|
buffer = gst_sample_get_buffer (conn->data->sample);
|
|
cmem = (GstCudaMemory *) gst_buffer_peek_memory (buffer, 0);
|
|
|
|
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
|
|
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (conn->data->handle);
|
|
GST_LOG_OBJECT (self, "Sending HAVE-DATA with handle %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
|
|
if (!gst_cuda_ipc_pkt_build_have_data (conn->server_msg, conn->data->pts,
|
|
conn->data->info, conn->data->handle, caps, conn->data->meta)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't build HAVE-DATA pkt, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->type = GstCudaIpcPktType::HAVE_DATA;
|
|
} else {
|
|
guint max_size = cmem->mem.maxsize;
|
|
GST_LOG_OBJECT (self, "Sending HAVE-MMAP-DATA with handle %"
|
|
GST_CUDA_OS_HANDLE_FORMAT ", conn-id: %u", conn->data->os_handle,
|
|
conn->id);
|
|
if (!gst_cuda_ipc_pkt_build_have_mmap_data (conn->server_msg,
|
|
conn->data->pts, conn->data->info, max_size, conn->data->os_handle,
|
|
caps, conn->data->meta)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't build HAVE-MMAP-DATA pkt, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->type = GstCudaIpcPktType::HAVE_MMAP_DATA;
|
|
if (klass->send_mmap_msg) {
|
|
if (!klass->send_mmap_msg (self, conn, conn->data->os_handle)) {
|
|
GST_WARNING_OBJECT (self, "Send msg failed");
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_wait_msg (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
if (!klass->wait_msg (self, conn)) {
|
|
GST_WARNING_OBJECT (self, "Wait msg failed, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
}
|
|
|
|
static bool
|
|
gst_cuda_ipc_server_on_release_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
bool found = false;
|
|
|
|
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
|
|
CUipcMemHandle handle;
|
|
if (!gst_cuda_ipc_pkt_parse_release_data (conn->client_msg, handle)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-DATA, conn-id: %u",
|
|
conn->id);
|
|
return false;
|
|
}
|
|
|
|
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (handle);
|
|
GST_LOG_OBJECT (self, "RELEASE-DATA %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
|
|
for (auto it = conn->peer_handles.begin ();
|
|
it != conn->peer_handles.end ();) {
|
|
auto current = it++;
|
|
CUipcMemHandle & tmp = (*current)->handle;
|
|
if (gst_cuda_ipc_handle_is_equal (tmp, handle)) {
|
|
found = true;
|
|
conn->peer_handles.erase (current);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!found) {
|
|
GST_WARNING_OBJECT (self,
|
|
"Unexpected memory handle to remove %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
return false;
|
|
}
|
|
} else {
|
|
GstCudaSharableHandle handle;
|
|
if (!gst_cuda_ipc_pkt_parse_release_mmap_data (conn->client_msg, &handle)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-MMAP-DATA, conn-id: %u",
|
|
conn->id);
|
|
return false;
|
|
}
|
|
|
|
GST_LOG_OBJECT (self, "RELEASE-MMAP-DATA %" GST_CUDA_OS_HANDLE_FORMAT
|
|
", conn-id %u", handle, conn->id);
|
|
|
|
for (auto it = conn->peer_handles.begin ();
|
|
it != conn->peer_handles.end ();) {
|
|
auto current = it++;
|
|
if ((*current)->os_handle == handle) {
|
|
found = true;
|
|
conn->peer_handles.erase (current);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!found) {
|
|
GST_WARNING_OBJECT (self,
|
|
"Unexpected memory handle to remove %" GST_CUDA_OS_HANDLE_FORMAT
|
|
", conn-id: %u", handle, conn->id);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
GST_LOG_OBJECT (self, "Client is holding %" G_GSIZE_FORMAT " handles",
|
|
conn->peer_handles.size ());
|
|
|
|
return true;
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_wait_msg_finish (GstCudaIpcServer * server,
|
|
GstCudaIpcServerConn * conn, bool result)
|
|
{
|
|
GstCudaIpcPacketHeader header;
|
|
|
|
if (!result) {
|
|
GST_WARNING_OBJECT (server, "Wait msg failed, conn->id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
if (!gst_cuda_ipc_pkt_identify (conn->client_msg, header)) {
|
|
GST_ERROR_OBJECT (server, "Broken header, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
switch (header.type) {
|
|
case GstCudaIpcPktType::NEED_DATA:
|
|
GST_LOG_OBJECT (server, "NEED-DATA, conn-id: %u", conn->id);
|
|
if (!conn->data) {
|
|
GST_LOG_OBJECT (server, "Wait for available data, conn-id: %u",
|
|
conn->id);
|
|
conn->pending_have_data = true;
|
|
gst_cuda_ipc_server_on_idle (server);
|
|
return;
|
|
}
|
|
gst_cuda_ipc_server_have_data (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::READ_DONE:
|
|
GST_LOG_OBJECT (server, "READ-DONE, conn-id: %u", conn->id);
|
|
|
|
if (!conn->data) {
|
|
GST_ERROR_OBJECT (server, "Unexpected READ-DATA, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
conn->peer_handles.push_back (conn->data);
|
|
conn->data = nullptr;
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::RELEASE_DATA:
|
|
case GstCudaIpcPktType::RELEASE_MMAP_DATA:
|
|
GST_LOG_OBJECT (server, "RELEASE-DATA, conn-id: %u", conn->id);
|
|
if (!gst_cuda_ipc_server_on_release_data (server, conn))
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
else
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::FIN:
|
|
GST_DEBUG_OBJECT (server, "FIN, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
default:
|
|
GST_ERROR_OBJECT (server, "Unexpected packet, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
}
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_send_msg_finish (GstCudaIpcServer * server,
|
|
GstCudaIpcServerConn * conn, bool result)
|
|
{
|
|
if (!result) {
|
|
GST_WARNING_OBJECT (server, "Send msg failed, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
switch (conn->type) {
|
|
case GstCudaIpcPktType::CONFIG:
|
|
GST_DEBUG_OBJECT (server, "Sent CONFIG-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::HAVE_DATA:
|
|
GST_LOG_OBJECT (server, "Sent HAVE-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::HAVE_MMAP_DATA:
|
|
GST_LOG_OBJECT (server, "Sent HAVE-MMAP-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::EOS:
|
|
GST_DEBUG_OBJECT (server, "Sent EOS, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
default:
|
|
GST_ERROR_OBJECT (server, "Unexpected msg type %d", (gint) conn->type);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_eos (GstCudaIpcServer * self, GstCudaIpcServerConn * conn)
|
|
{
|
|
gst_cuda_ipc_pkt_build_eos (conn->server_msg);
|
|
conn->eos = true;
|
|
conn->type = GstCudaIpcPktType::EOS;
|
|
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_on_idle (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
GST_LOG_OBJECT (server, "idle");
|
|
|
|
if (priv->shutdown) {
|
|
GST_DEBUG_OBJECT (server, "We are stopping");
|
|
|
|
if (priv->conn_map.empty ()) {
|
|
GST_DEBUG_OBJECT (server, "All connections were closed");
|
|
klass->terminate (server);
|
|
return;
|
|
}
|
|
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_eos;
|
|
/* *INDENT-OFF* */
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
if (conn->eos || !conn->pending_have_data)
|
|
continue;
|
|
|
|
to_send_eos.push_back (conn);
|
|
}
|
|
|
|
for (auto it : to_send_eos) {
|
|
GST_DEBUG_OBJECT (server, "Sending EOS to conn-id: %u", it->id);
|
|
gst_cuda_ipc_server_eos (server, it.get ());
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (server, "Have %" G_GSIZE_FORMAT " alive connections",
|
|
priv->conn_map.size());
|
|
|
|
size_t num_closed = 0;
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
GST_DEBUG_OBJECT (server, "conn-id %u"
|
|
" peer handle size %" G_GSIZE_FORMAT, conn->id,
|
|
conn->peer_handles.size ());
|
|
|
|
/* Cannot erase conn since it's still referenced.
|
|
* Manually close connection */
|
|
if (conn->peer_handles.empty ()) {
|
|
conn->CloseConn ();
|
|
num_closed++;
|
|
}
|
|
}
|
|
/* *INDENT-ON* */
|
|
|
|
if (priv->conn_map.size () == num_closed) {
|
|
GST_DEBUG_OBJECT (server, "All connections were closed");
|
|
klass->terminate (server);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
if (priv->conn_map.empty ()) {
|
|
GST_LOG_OBJECT (server, "Have no connection");
|
|
return;
|
|
}
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
if (!priv->data)
|
|
return;
|
|
|
|
/* *INDENT-OFF* */
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_config_data;
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_have_data;
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
if (!conn->configured) {
|
|
conn->configured = true;
|
|
conn->data = priv->data;
|
|
to_config_data.push_back (conn);
|
|
} else if (conn->pending_have_data && conn->seq_num <= priv->data->seq_num) {
|
|
conn->data = priv->data;
|
|
to_send_have_data.push_back (conn);
|
|
}
|
|
}
|
|
lk.unlock ();
|
|
|
|
for (auto it: to_config_data)
|
|
gst_cuda_ipc_server_config_data (server, it.get ());
|
|
|
|
for (auto it: to_send_have_data)
|
|
gst_cuda_ipc_server_have_data (server, it.get ());
|
|
/* *INDENT-ON* */
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_abort (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
priv->aborted = true;
|
|
}
|