mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-18 07:47:17 +00:00
07ba225183
Forward custom meta to peer ipcsrc elements Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/6006>
706 lines
20 KiB
C++
706 lines
20 KiB
C++
/* GStreamer
|
|
* Copyright (C) 2023 Seungha Yang <seungha@centricular.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include "gstcudaipcserver.h"
|
|
#include <unordered_map>
|
|
#include <mutex>
|
|
#include <atomic>
|
|
#include <gst/cuda/gstcuda-private.h>
|
|
|
|
GST_DEBUG_CATEGORY (cuda_ipc_server_debug);
|
|
#define GST_CAT_DEFAULT cuda_ipc_server_debug
|
|
|
|
GType
|
|
gst_cuda_ipc_mode_get_type (void)
|
|
{
|
|
static GType type = 0;
|
|
static const GEnumValue ipc_modes[] = {
|
|
{GST_CUDA_IPC_LEGACY, "Legacy IPC mode", "legacy"},
|
|
{GST_CUDA_IPC_MMAP, "Memory Map", "mmap"},
|
|
{0, nullptr, nullptr}
|
|
};
|
|
|
|
GST_CUDA_CALL_ONCE_BEGIN {
|
|
type = g_enum_register_static ("GstCudaIpcMode", ipc_modes);
|
|
} GST_CUDA_CALL_ONCE_END;
|
|
|
|
return type;
|
|
}
|
|
|
|
struct GstCudaIpcServerPrivate
|
|
{
|
|
GstCudaIpcServerPrivate ()
|
|
{
|
|
shutdown = false;
|
|
aborted = false;
|
|
}
|
|
|
|
std::mutex lock;
|
|
guint64 seq_num = 0;
|
|
guint next_conn_id = 0;
|
|
std::unordered_map < guint,
|
|
std::shared_ptr < GstCudaIpcServerConn >> conn_map;
|
|
GThread *loop_thread = nullptr;
|
|
std::atomic < bool >shutdown;
|
|
std::atomic < bool >aborted;
|
|
std::shared_ptr < GstCudaIpcServerData > data;
|
|
};
|
|
|
|
static void gst_cuda_ipc_server_dispose (GObject * object);
|
|
static void gst_cuda_ipc_server_finalize (GObject * object);
|
|
|
|
#define gst_cuda_ipc_server_parent_class parent_class
|
|
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstCudaIpcServer,
|
|
gst_cuda_ipc_server, GST_TYPE_OBJECT,
|
|
GST_DEBUG_CATEGORY_INIT (cuda_ipc_server_debug, "cudaipcserver",
|
|
0, "cudaipcserver"));
|
|
|
|
static void
|
|
gst_cuda_ipc_server_class_init (GstCudaIpcServerClass * klass)
|
|
{
|
|
GObjectClass *object_class = G_OBJECT_CLASS (klass);
|
|
|
|
object_class->finalize = gst_cuda_ipc_server_finalize;
|
|
object_class->dispose = gst_cuda_ipc_server_dispose;
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_init (GstCudaIpcServer * self)
|
|
{
|
|
self->priv = new GstCudaIpcServerPrivate ();
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_dispose (GObject * object)
|
|
{
|
|
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
|
|
GstCudaIpcServerPrivate *priv = self->priv;
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
GST_DEBUG_OBJECT (self, "dispose");
|
|
|
|
g_assert (klass->terminate);
|
|
klass->terminate (self);
|
|
|
|
g_clear_pointer (&priv->loop_thread, g_thread_join);
|
|
|
|
G_OBJECT_CLASS (parent_class)->dispose (object);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_finalize (GObject * object)
|
|
{
|
|
GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object);
|
|
|
|
GST_DEBUG_OBJECT (self, "finalize");
|
|
|
|
gst_clear_object (&self->context);
|
|
|
|
delete self->priv;
|
|
|
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
|
}
|
|
|
|
GstFlowReturn
|
|
gst_cuda_ipc_server_send_data (GstCudaIpcServer * server, GstSample * sample,
|
|
const GstVideoInfo & info, const CUipcMemHandle & handle, GstClockTime pts,
|
|
GByteArray * meta)
|
|
{
|
|
GstCudaIpcServerPrivate *priv;
|
|
GstCudaIpcServerClass *klass;
|
|
|
|
g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
|
|
g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
|
|
|
|
if (server->ipc_mode != GST_CUDA_IPC_LEGACY) {
|
|
GST_ERROR_OBJECT (server, "Invalid call");
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
priv = server->priv;
|
|
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
|
|
|
|
GST_LOG_OBJECT (server, "Sending data");
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
if (priv->aborted) {
|
|
GST_DEBUG_OBJECT (server, "Was aborted");
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
auto data = std::make_shared < GstCudaIpcServerData > ();
|
|
data->sample = gst_sample_ref (sample);
|
|
data->info = info;
|
|
data->handle = handle;
|
|
data->pts = pts;
|
|
data->seq_num = priv->seq_num;
|
|
if (meta && meta->len) {
|
|
data->meta.resize (meta->len);
|
|
memcpy (data->meta.data (), meta->data, meta->len);
|
|
}
|
|
|
|
priv->seq_num++;
|
|
priv->data = data;
|
|
lk.unlock ();
|
|
|
|
klass->invoke (server);
|
|
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
GstFlowReturn
|
|
gst_cuda_ipc_server_send_mmap_data (GstCudaIpcServer * server,
|
|
GstSample * sample, const GstVideoInfo & info, GstCudaSharableHandle handle,
|
|
GstClockTime pts, GByteArray * meta)
|
|
{
|
|
GstCudaIpcServerPrivate *priv;
|
|
GstCudaIpcServerClass *klass;
|
|
|
|
g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR);
|
|
g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
|
|
|
|
if (server->ipc_mode != GST_CUDA_IPC_MMAP) {
|
|
GST_ERROR_OBJECT (server, "Invalid call");
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
priv = server->priv;
|
|
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
|
|
|
|
GST_LOG_OBJECT (server, "Sending data");
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
if (priv->aborted) {
|
|
GST_DEBUG_OBJECT (server, "Was aborted");
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
auto data = std::make_shared < GstCudaIpcServerData > ();
|
|
data->sample = gst_sample_ref (sample);
|
|
data->info = info;
|
|
data->os_handle = handle;
|
|
data->pts = pts;
|
|
data->seq_num = priv->seq_num;
|
|
if (meta && meta->len) {
|
|
data->meta.resize (meta->len);
|
|
memcpy (data->meta.data (), meta->data, meta->len);
|
|
}
|
|
|
|
priv->seq_num++;
|
|
priv->data = data;
|
|
lk.unlock ();
|
|
|
|
klass->invoke (server);
|
|
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
static gpointer
|
|
gst_cuda_ipc_server_loop_thread_func (GstCudaIpcServer * self)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = self->priv;
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
g_assert (klass->loop);
|
|
|
|
GST_DEBUG_OBJECT (self, "Start loop thread");
|
|
|
|
klass->loop (self);
|
|
priv->conn_map.clear ();
|
|
|
|
GST_DEBUG_OBJECT (self, "Exit loop thread");
|
|
|
|
return nullptr;
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_run (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv;
|
|
|
|
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
|
|
|
|
priv = server->priv;
|
|
|
|
GST_DEBUG_OBJECT (server, "Running");
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
if (priv->loop_thread) {
|
|
GST_DEBUG_OBJECT (server, "Already running");
|
|
return;
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (server, "Spawning thread");
|
|
priv->loop_thread = g_thread_new ("cuda-ipc-server",
|
|
(GThreadFunc) gst_cuda_ipc_server_loop_thread_func, server);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_stop (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv;
|
|
GstCudaIpcServerClass *klass;
|
|
|
|
g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server));
|
|
|
|
priv = server->priv;
|
|
klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
|
|
|
|
GST_DEBUG_OBJECT (server, "Stopping");
|
|
priv->shutdown = true;
|
|
klass->invoke (server);
|
|
|
|
g_clear_pointer (&priv->loop_thread, g_thread_join);
|
|
|
|
GST_DEBUG_OBJECT (server, "Stopped");
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_close_connection (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = self->priv;
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
GST_DEBUG_OBJECT (self, "Closing conn-id %u", conn->id);
|
|
|
|
priv->conn_map.erase (conn->id);
|
|
|
|
if (priv->shutdown && priv->conn_map.empty ()) {
|
|
GST_DEBUG_OBJECT (self, "All connection were closed");
|
|
klass->terminate (self);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_send_msg (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
if (!klass->send_msg (self, conn)) {
|
|
GST_WARNING_OBJECT (self, "Send msg failed");
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_config_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCaps *caps = gst_sample_get_caps (conn->data->sample);
|
|
|
|
gst_caps_replace (&conn->caps, caps);
|
|
|
|
gst_cuda_ipc_pkt_build_config (conn->server_msg, self->pid,
|
|
self->ipc_mode == GST_CUDA_IPC_MMAP, conn->caps);
|
|
conn->type = GstCudaIpcPktType::CONFIG;
|
|
|
|
GST_LOG_OBJECT (self, "Sending CONFIG, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_on_incoming_connection (GstCudaIpcServer * server,
|
|
std::shared_ptr < GstCudaIpcServerConn > conn)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
priv->lock.lock ();
|
|
conn->server = server;
|
|
conn->id = priv->next_conn_id;
|
|
conn->context = (GstCudaContext *) gst_object_ref (server->context);
|
|
conn->data = priv->data;
|
|
priv->next_conn_id++;
|
|
priv->lock.unlock ();
|
|
|
|
/* *INDENT-OFF* */
|
|
priv->conn_map.insert ({conn->id, conn});
|
|
/* *INDENT-ON* */
|
|
|
|
if (conn->data) {
|
|
conn->configured = true;
|
|
gst_cuda_ipc_server_config_data (server, conn.get ());
|
|
} else {
|
|
GST_DEBUG_OBJECT (server, "Have no config data yet, waiting for data");
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_have_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
GstCaps *caps;
|
|
GstBuffer *buffer;
|
|
GstCudaMemory *cmem;
|
|
|
|
if (!conn->data) {
|
|
GST_ERROR_OBJECT (self, "Have no data to send, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->pending_have_data = false;
|
|
conn->seq_num = conn->data->seq_num + 1;
|
|
|
|
caps = gst_sample_get_caps (conn->data->sample);
|
|
if (!conn->caps || !gst_caps_is_equal (conn->caps, caps)) {
|
|
GST_DEBUG_OBJECT (self, "Sending caps %" GST_PTR_FORMAT " to conn-id %u",
|
|
caps, conn->id);
|
|
gst_caps_replace (&conn->caps, caps);
|
|
} else {
|
|
caps = nullptr;
|
|
}
|
|
|
|
buffer = gst_sample_get_buffer (conn->data->sample);
|
|
cmem = (GstCudaMemory *) gst_buffer_peek_memory (buffer, 0);
|
|
|
|
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
|
|
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (conn->data->handle);
|
|
GST_LOG_OBJECT (self, "Sending HAVE-DATA with handle %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
|
|
if (!gst_cuda_ipc_pkt_build_have_data (conn->server_msg, conn->data->pts,
|
|
conn->data->info, conn->data->handle, caps, conn->data->meta)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't build HAVE-DATA pkt, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->type = GstCudaIpcPktType::HAVE_DATA;
|
|
} else {
|
|
guint max_size = cmem->mem.maxsize;
|
|
GST_LOG_OBJECT (self, "Sending HAVE-MMAP-DATA with handle %"
|
|
GST_CUDA_OS_HANDLE_FORMAT ", conn-id: %u", conn->data->os_handle,
|
|
conn->id);
|
|
if (!gst_cuda_ipc_pkt_build_have_mmap_data (conn->server_msg,
|
|
conn->data->pts, conn->data->info, max_size, conn->data->os_handle,
|
|
caps, conn->data->meta)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't build HAVE-MMAP-DATA pkt, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
return;
|
|
}
|
|
|
|
conn->type = GstCudaIpcPktType::HAVE_MMAP_DATA;
|
|
if (klass->send_mmap_msg) {
|
|
if (!klass->send_mmap_msg (self, conn, conn->data->os_handle)) {
|
|
GST_WARNING_OBJECT (self, "Send msg failed");
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_wait_msg (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self);
|
|
|
|
if (!klass->wait_msg (self, conn)) {
|
|
GST_WARNING_OBJECT (self, "Wait msg failed, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (self, conn);
|
|
}
|
|
}
|
|
|
|
static bool
|
|
gst_cuda_ipc_server_on_release_data (GstCudaIpcServer * self,
|
|
GstCudaIpcServerConn * conn)
|
|
{
|
|
bool found = false;
|
|
|
|
if (self->ipc_mode == GST_CUDA_IPC_LEGACY) {
|
|
CUipcMemHandle handle;
|
|
if (!gst_cuda_ipc_pkt_parse_release_data (conn->client_msg, handle)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-DATA, conn-id: %u",
|
|
conn->id);
|
|
return false;
|
|
}
|
|
|
|
auto handle_dump = gst_cuda_ipc_mem_handle_to_string (handle);
|
|
GST_LOG_OBJECT (self, "RELEASE-DATA %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
|
|
for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end ();
|
|
it++) {
|
|
CUipcMemHandle & tmp = (*it)->handle;
|
|
if (gst_cuda_ipc_handle_is_equal (tmp, handle)) {
|
|
found = true;
|
|
conn->peer_handles.erase (it);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!found) {
|
|
GST_WARNING_OBJECT (self,
|
|
"Unexpected memory handle to remove %s, conn-id: %u",
|
|
handle_dump.c_str (), conn->id);
|
|
return false;
|
|
}
|
|
} else {
|
|
GstCudaSharableHandle handle;
|
|
if (!gst_cuda_ipc_pkt_parse_release_mmap_data (conn->client_msg, &handle)) {
|
|
GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-MMAP-DATA, conn-id: %u",
|
|
conn->id);
|
|
return false;
|
|
}
|
|
|
|
GST_LOG_OBJECT (self, "RELEASE-MMAP-DATA %" GST_CUDA_OS_HANDLE_FORMAT
|
|
", conn-id %u", handle, conn->id);
|
|
|
|
for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end ();
|
|
it++) {
|
|
if ((*it)->os_handle == handle) {
|
|
found = true;
|
|
conn->peer_handles.erase (it);
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!found) {
|
|
GST_WARNING_OBJECT (self,
|
|
"Unexpected memory handle to remove %" GST_CUDA_OS_HANDLE_FORMAT
|
|
", conn-id: %u", handle, conn->id);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
GST_LOG_OBJECT (self, "Client is holding %" G_GSIZE_FORMAT " handles",
|
|
conn->peer_handles.size ());
|
|
|
|
return true;
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_wait_msg_finish (GstCudaIpcServer * server,
|
|
GstCudaIpcServerConn * conn, bool result)
|
|
{
|
|
GstCudaIpcPacketHeader header;
|
|
|
|
if (!result) {
|
|
GST_WARNING_OBJECT (server, "Wait msg failed, conn->id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
if (!gst_cuda_ipc_pkt_identify (conn->client_msg, header)) {
|
|
GST_ERROR_OBJECT (server, "Broken header, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
switch (header.type) {
|
|
case GstCudaIpcPktType::NEED_DATA:
|
|
GST_LOG_OBJECT (server, "NEED-DATA, conn-id: %u", conn->id);
|
|
if (!conn->data) {
|
|
GST_LOG_OBJECT (server, "Wait for available data, conn-id: %u",
|
|
conn->id);
|
|
conn->pending_have_data = true;
|
|
gst_cuda_ipc_server_on_idle (server);
|
|
return;
|
|
}
|
|
gst_cuda_ipc_server_have_data (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::READ_DONE:
|
|
GST_LOG_OBJECT (server, "READ-DONE, conn-id: %u", conn->id);
|
|
|
|
if (!conn->data) {
|
|
GST_ERROR_OBJECT (server, "Unexpected READ-DATA, conn-id: %u",
|
|
conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
conn->peer_handles.push_back (conn->data);
|
|
conn->data = nullptr;
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::RELEASE_DATA:
|
|
case GstCudaIpcPktType::RELEASE_MMAP_DATA:
|
|
GST_LOG_OBJECT (server, "RELEASE-DATA, conn-id: %u", conn->id);
|
|
if (!gst_cuda_ipc_server_on_release_data (server, conn))
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
else
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::FIN:
|
|
GST_DEBUG_OBJECT (server, "FIN, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
default:
|
|
GST_ERROR_OBJECT (server, "Unexpected packet, conn-id: %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
}
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_send_msg_finish (GstCudaIpcServer * server,
|
|
GstCudaIpcServerConn * conn, bool result)
|
|
{
|
|
if (!result) {
|
|
GST_WARNING_OBJECT (server, "Send msg failed, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
return;
|
|
}
|
|
|
|
switch (conn->type) {
|
|
case GstCudaIpcPktType::CONFIG:
|
|
GST_DEBUG_OBJECT (server, "Sent CONFIG-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::HAVE_DATA:
|
|
GST_LOG_OBJECT (server, "Sent HAVE-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::HAVE_MMAP_DATA:
|
|
GST_LOG_OBJECT (server, "Sent HAVE-MMAP-DATA, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
case GstCudaIpcPktType::EOS:
|
|
GST_DEBUG_OBJECT (server, "Sent EOS, conn-id %u", conn->id);
|
|
gst_cuda_ipc_server_wait_msg (server, conn);
|
|
break;
|
|
default:
|
|
GST_ERROR_OBJECT (server, "Unexpected msg type %d", (gint) conn->type);
|
|
gst_cuda_ipc_server_close_connection (server, conn);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static void
|
|
gst_cuda_ipc_server_eos (GstCudaIpcServer * self, GstCudaIpcServerConn * conn)
|
|
{
|
|
gst_cuda_ipc_pkt_build_eos (conn->server_msg);
|
|
conn->eos = true;
|
|
conn->type = GstCudaIpcPktType::EOS;
|
|
|
|
gst_cuda_ipc_server_send_msg (self, conn);
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_on_idle (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (server);
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
GST_LOG_OBJECT (server, "idle");
|
|
|
|
if (priv->shutdown) {
|
|
GST_DEBUG_OBJECT (server, "We are stopping");
|
|
|
|
if (priv->conn_map.empty ()) {
|
|
GST_DEBUG_OBJECT (server, "All connections were closed");
|
|
klass->terminate (server);
|
|
return;
|
|
}
|
|
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_eos;
|
|
/* *INDENT-OFF* */
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
if (conn->eos || !conn->pending_have_data)
|
|
continue;
|
|
|
|
to_send_eos.push_back (conn);
|
|
}
|
|
|
|
for (auto it : to_send_eos) {
|
|
GST_DEBUG_OBJECT (server, "Sending EOS to conn-id: %u", it->id);
|
|
gst_cuda_ipc_server_eos (server, it.get ());
|
|
}
|
|
|
|
GST_DEBUG_OBJECT (server, "Have %" G_GSIZE_FORMAT " alive connections",
|
|
priv->conn_map.size());
|
|
|
|
size_t num_closed = 0;
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
GST_DEBUG_OBJECT (server, "conn-id %u"
|
|
" peer handle size %" G_GSIZE_FORMAT, conn->id,
|
|
conn->peer_handles.size ());
|
|
|
|
/* Cannot erase conn since it's still referenced.
|
|
* Manually close connection */
|
|
if (conn->peer_handles.empty ()) {
|
|
conn->CloseConn ();
|
|
num_closed++;
|
|
}
|
|
}
|
|
/* *INDENT-ON* */
|
|
|
|
if (priv->conn_map.size () == num_closed) {
|
|
GST_DEBUG_OBJECT (server, "All connections were closed");
|
|
klass->terminate (server);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
if (priv->conn_map.empty ()) {
|
|
GST_LOG_OBJECT (server, "Have no connection");
|
|
return;
|
|
}
|
|
|
|
std::unique_lock < std::mutex > lk (priv->lock);
|
|
if (!priv->data)
|
|
return;
|
|
|
|
/* *INDENT-OFF* */
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_config_data;
|
|
std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_have_data;
|
|
for (auto it : priv->conn_map) {
|
|
auto conn = it.second;
|
|
if (!conn->configured) {
|
|
conn->configured = true;
|
|
conn->data = priv->data;
|
|
to_config_data.push_back (conn);
|
|
} else if (conn->pending_have_data && conn->seq_num <= priv->data->seq_num) {
|
|
conn->data = priv->data;
|
|
to_send_have_data.push_back (conn);
|
|
}
|
|
}
|
|
lk.unlock ();
|
|
|
|
for (auto it: to_config_data)
|
|
gst_cuda_ipc_server_config_data (server, it.get ());
|
|
|
|
for (auto it: to_send_have_data)
|
|
gst_cuda_ipc_server_have_data (server, it.get ());
|
|
/* *INDENT-ON* */
|
|
}
|
|
|
|
void
|
|
gst_cuda_ipc_server_abort (GstCudaIpcServer * server)
|
|
{
|
|
GstCudaIpcServerPrivate *priv = server->priv;
|
|
|
|
priv->aborted = true;
|
|
}
|