/* GStreamer * Copyright (C) 2023 Seungha Yang * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "gstcudaipcserver.h" #include #include #include #include GST_DEBUG_CATEGORY (cuda_ipc_server_debug); #define GST_CAT_DEFAULT cuda_ipc_server_debug GType gst_cuda_ipc_mode_get_type (void) { static GType type = 0; static const GEnumValue ipc_modes[] = { {GST_CUDA_IPC_LEGACY, "Legacy IPC mode", "legacy"}, {GST_CUDA_IPC_MMAP, "Memory Map", "mmap"}, {0, nullptr, nullptr} }; GST_CUDA_CALL_ONCE_BEGIN { type = g_enum_register_static ("GstCudaIpcMode", ipc_modes); } GST_CUDA_CALL_ONCE_END; return type; } struct GstCudaIpcServerPrivate { GstCudaIpcServerPrivate () { shutdown = false; aborted = false; } std::mutex lock; guint64 seq_num = 0; guint next_conn_id = 0; std::unordered_map < guint, std::shared_ptr < GstCudaIpcServerConn >> conn_map; GThread *loop_thread = nullptr; std::atomic < bool >shutdown; std::atomic < bool >aborted; std::shared_ptr < GstCudaIpcServerData > data; }; static void gst_cuda_ipc_server_dispose (GObject * object); static void gst_cuda_ipc_server_finalize (GObject * object); #define gst_cuda_ipc_server_parent_class parent_class G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GstCudaIpcServer, gst_cuda_ipc_server, GST_TYPE_OBJECT, GST_DEBUG_CATEGORY_INIT (cuda_ipc_server_debug, "cudaipcserver", 0, "cudaipcserver")); static void gst_cuda_ipc_server_class_init (GstCudaIpcServerClass * klass) { GObjectClass *object_class = G_OBJECT_CLASS (klass); object_class->finalize = gst_cuda_ipc_server_finalize; object_class->dispose = gst_cuda_ipc_server_dispose; } static void gst_cuda_ipc_server_init (GstCudaIpcServer * self) { self->priv = new GstCudaIpcServerPrivate (); } static void gst_cuda_ipc_server_dispose (GObject * object) { GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object); GstCudaIpcServerPrivate *priv = self->priv; GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self); GST_DEBUG_OBJECT (self, "dispose"); g_assert (klass->terminate); klass->terminate (self); g_clear_pointer (&priv->loop_thread, g_thread_join); G_OBJECT_CLASS (parent_class)->dispose (object); } static void gst_cuda_ipc_server_finalize (GObject * object) { GstCudaIpcServer *self = GST_CUDA_IPC_SERVER (object); GST_DEBUG_OBJECT (self, "finalize"); gst_clear_object (&self->context); delete self->priv; G_OBJECT_CLASS (parent_class)->finalize (object); } GstFlowReturn gst_cuda_ipc_server_send_data (GstCudaIpcServer * server, GstSample * sample, const GstVideoInfo & info, const CUipcMemHandle & handle, GstClockTime pts, GByteArray * meta) { GstCudaIpcServerPrivate *priv; GstCudaIpcServerClass *klass; g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR); if (server->ipc_mode != GST_CUDA_IPC_LEGACY) { GST_ERROR_OBJECT (server, "Invalid call"); return GST_FLOW_ERROR; } priv = server->priv; klass = GST_CUDA_IPC_SERVER_GET_CLASS (server); GST_LOG_OBJECT (server, "Sending data"); std::unique_lock < std::mutex > lk (priv->lock); if (priv->aborted) { GST_DEBUG_OBJECT (server, "Was aborted"); return GST_FLOW_ERROR; } auto data = std::make_shared < GstCudaIpcServerData > (); data->sample = gst_sample_ref (sample); data->info = info; data->handle = handle; data->pts = pts; data->seq_num = priv->seq_num; if (meta && meta->len) { data->meta.resize (meta->len); memcpy (data->meta.data (), meta->data, meta->len); } priv->seq_num++; priv->data = data; lk.unlock (); klass->invoke (server); return GST_FLOW_OK; } GstFlowReturn gst_cuda_ipc_server_send_mmap_data (GstCudaIpcServer * server, GstSample * sample, const GstVideoInfo & info, GstCudaSharableHandle handle, GstClockTime pts, GByteArray * meta) { GstCudaIpcServerPrivate *priv; GstCudaIpcServerClass *klass; g_return_val_if_fail (GST_IS_CUDA_IPC_SERVER (server), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR); if (server->ipc_mode != GST_CUDA_IPC_MMAP) { GST_ERROR_OBJECT (server, "Invalid call"); return GST_FLOW_ERROR; } priv = server->priv; klass = GST_CUDA_IPC_SERVER_GET_CLASS (server); GST_LOG_OBJECT (server, "Sending data"); std::unique_lock < std::mutex > lk (priv->lock); if (priv->aborted) { GST_DEBUG_OBJECT (server, "Was aborted"); return GST_FLOW_ERROR; } auto data = std::make_shared < GstCudaIpcServerData > (); data->sample = gst_sample_ref (sample); data->info = info; data->os_handle = handle; data->pts = pts; data->seq_num = priv->seq_num; if (meta && meta->len) { data->meta.resize (meta->len); memcpy (data->meta.data (), meta->data, meta->len); } priv->seq_num++; priv->data = data; lk.unlock (); klass->invoke (server); return GST_FLOW_OK; } static gpointer gst_cuda_ipc_server_loop_thread_func (GstCudaIpcServer * self) { GstCudaIpcServerPrivate *priv = self->priv; GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self); g_assert (klass->loop); GST_DEBUG_OBJECT (self, "Start loop thread"); klass->loop (self); priv->conn_map.clear (); GST_DEBUG_OBJECT (self, "Exit loop thread"); return nullptr; } void gst_cuda_ipc_server_run (GstCudaIpcServer * server) { GstCudaIpcServerPrivate *priv; g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server)); priv = server->priv; GST_DEBUG_OBJECT (server, "Running"); std::unique_lock < std::mutex > lk (priv->lock); if (priv->loop_thread) { GST_DEBUG_OBJECT (server, "Already running"); return; } GST_DEBUG_OBJECT (server, "Spawning thread"); priv->loop_thread = g_thread_new ("cuda-ipc-server", (GThreadFunc) gst_cuda_ipc_server_loop_thread_func, server); } void gst_cuda_ipc_server_stop (GstCudaIpcServer * server) { GstCudaIpcServerPrivate *priv; GstCudaIpcServerClass *klass; g_return_if_fail (GST_IS_CUDA_IPC_SERVER (server)); priv = server->priv; klass = GST_CUDA_IPC_SERVER_GET_CLASS (server); GST_DEBUG_OBJECT (server, "Stopping"); priv->shutdown = true; klass->invoke (server); g_clear_pointer (&priv->loop_thread, g_thread_join); GST_DEBUG_OBJECT (server, "Stopped"); } static void gst_cuda_ipc_server_close_connection (GstCudaIpcServer * self, GstCudaIpcServerConn * conn) { GstCudaIpcServerPrivate *priv = self->priv; GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self); GST_DEBUG_OBJECT (self, "Closing conn-id %u", conn->id); priv->conn_map.erase (conn->id); if (priv->shutdown && priv->conn_map.empty ()) { GST_DEBUG_OBJECT (self, "All connection were closed"); klass->terminate (self); } } static void gst_cuda_ipc_server_send_msg (GstCudaIpcServer * self, GstCudaIpcServerConn * conn) { GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self); if (!klass->send_msg (self, conn)) { GST_WARNING_OBJECT (self, "Send msg failed"); gst_cuda_ipc_server_close_connection (self, conn); } } static void gst_cuda_ipc_server_config_data (GstCudaIpcServer * self, GstCudaIpcServerConn * conn) { GstCaps *caps = gst_sample_get_caps (conn->data->sample); gst_caps_replace (&conn->caps, caps); gst_cuda_ipc_pkt_build_config (conn->server_msg, self->pid, self->ipc_mode == GST_CUDA_IPC_MMAP, conn->caps); conn->type = GstCudaIpcPktType::CONFIG; GST_LOG_OBJECT (self, "Sending CONFIG, conn-id %u", conn->id); gst_cuda_ipc_server_send_msg (self, conn); } void gst_cuda_ipc_server_on_incoming_connection (GstCudaIpcServer * server, std::shared_ptr < GstCudaIpcServerConn > conn) { GstCudaIpcServerPrivate *priv = server->priv; priv->lock.lock (); conn->server = server; conn->id = priv->next_conn_id; conn->context = (GstCudaContext *) gst_object_ref (server->context); conn->data = priv->data; priv->next_conn_id++; priv->lock.unlock (); /* *INDENT-OFF* */ priv->conn_map.insert ({conn->id, conn}); /* *INDENT-ON* */ if (conn->data) { conn->configured = true; gst_cuda_ipc_server_config_data (server, conn.get ()); } else { GST_DEBUG_OBJECT (server, "Have no config data yet, waiting for data"); } } static void gst_cuda_ipc_server_have_data (GstCudaIpcServer * self, GstCudaIpcServerConn * conn) { GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self); GstCaps *caps; GstBuffer *buffer; GstCudaMemory *cmem; if (!conn->data) { GST_ERROR_OBJECT (self, "Have no data to send, conn-id: %u", conn->id); gst_cuda_ipc_server_close_connection (self, conn); return; } conn->pending_have_data = false; conn->seq_num = conn->data->seq_num + 1; caps = gst_sample_get_caps (conn->data->sample); if (!conn->caps || !gst_caps_is_equal (conn->caps, caps)) { GST_DEBUG_OBJECT (self, "Sending caps %" GST_PTR_FORMAT " to conn-id %u", caps, conn->id); gst_caps_replace (&conn->caps, caps); } else { caps = nullptr; } buffer = gst_sample_get_buffer (conn->data->sample); cmem = (GstCudaMemory *) gst_buffer_peek_memory (buffer, 0); if (self->ipc_mode == GST_CUDA_IPC_LEGACY) { auto handle_dump = gst_cuda_ipc_mem_handle_to_string (conn->data->handle); GST_LOG_OBJECT (self, "Sending HAVE-DATA with handle %s, conn-id: %u", handle_dump.c_str (), conn->id); if (!gst_cuda_ipc_pkt_build_have_data (conn->server_msg, conn->data->pts, conn->data->info, conn->data->handle, caps, conn->data->meta)) { GST_ERROR_OBJECT (self, "Couldn't build HAVE-DATA pkt, conn-id: %u", conn->id); gst_cuda_ipc_server_close_connection (self, conn); return; } conn->type = GstCudaIpcPktType::HAVE_DATA; } else { guint max_size = cmem->mem.maxsize; GST_LOG_OBJECT (self, "Sending HAVE-MMAP-DATA with handle %" GST_CUDA_OS_HANDLE_FORMAT ", conn-id: %u", conn->data->os_handle, conn->id); if (!gst_cuda_ipc_pkt_build_have_mmap_data (conn->server_msg, conn->data->pts, conn->data->info, max_size, conn->data->os_handle, caps, conn->data->meta)) { GST_ERROR_OBJECT (self, "Couldn't build HAVE-MMAP-DATA pkt, conn-id: %u", conn->id); gst_cuda_ipc_server_close_connection (self, conn); return; } conn->type = GstCudaIpcPktType::HAVE_MMAP_DATA; if (klass->send_mmap_msg) { if (!klass->send_mmap_msg (self, conn, conn->data->os_handle)) { GST_WARNING_OBJECT (self, "Send msg failed"); gst_cuda_ipc_server_close_connection (self, conn); } return; } } gst_cuda_ipc_server_send_msg (self, conn); } static void gst_cuda_ipc_server_wait_msg (GstCudaIpcServer * self, GstCudaIpcServerConn * conn) { GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (self); if (!klass->wait_msg (self, conn)) { GST_WARNING_OBJECT (self, "Wait msg failed, conn-id: %u", conn->id); gst_cuda_ipc_server_close_connection (self, conn); } } static bool gst_cuda_ipc_server_on_release_data (GstCudaIpcServer * self, GstCudaIpcServerConn * conn) { bool found = false; if (self->ipc_mode == GST_CUDA_IPC_LEGACY) { CUipcMemHandle handle; if (!gst_cuda_ipc_pkt_parse_release_data (conn->client_msg, handle)) { GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-DATA, conn-id: %u", conn->id); return false; } auto handle_dump = gst_cuda_ipc_mem_handle_to_string (handle); GST_LOG_OBJECT (self, "RELEASE-DATA %s, conn-id: %u", handle_dump.c_str (), conn->id); for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end (); it++) { CUipcMemHandle & tmp = (*it)->handle; if (gst_cuda_ipc_handle_is_equal (tmp, handle)) { found = true; conn->peer_handles.erase (it); break; } } if (!found) { GST_WARNING_OBJECT (self, "Unexpected memory handle to remove %s, conn-id: %u", handle_dump.c_str (), conn->id); return false; } } else { GstCudaSharableHandle handle; if (!gst_cuda_ipc_pkt_parse_release_mmap_data (conn->client_msg, &handle)) { GST_ERROR_OBJECT (self, "Couldn't parse RELEASE-MMAP-DATA, conn-id: %u", conn->id); return false; } GST_LOG_OBJECT (self, "RELEASE-MMAP-DATA %" GST_CUDA_OS_HANDLE_FORMAT ", conn-id %u", handle, conn->id); for (auto it = conn->peer_handles.begin (); it != conn->peer_handles.end (); it++) { if ((*it)->os_handle == handle) { found = true; conn->peer_handles.erase (it); break; } } if (!found) { GST_WARNING_OBJECT (self, "Unexpected memory handle to remove %" GST_CUDA_OS_HANDLE_FORMAT ", conn-id: %u", handle, conn->id); return false; } } GST_LOG_OBJECT (self, "Client is holding %" G_GSIZE_FORMAT " handles", conn->peer_handles.size ()); return true; } void gst_cuda_ipc_server_wait_msg_finish (GstCudaIpcServer * server, GstCudaIpcServerConn * conn, bool result) { GstCudaIpcPacketHeader header; if (!result) { GST_WARNING_OBJECT (server, "Wait msg failed, conn->id: %u", conn->id); gst_cuda_ipc_server_close_connection (server, conn); return; } if (!gst_cuda_ipc_pkt_identify (conn->client_msg, header)) { GST_ERROR_OBJECT (server, "Broken header, conn-id: %u", conn->id); gst_cuda_ipc_server_close_connection (server, conn); return; } switch (header.type) { case GstCudaIpcPktType::NEED_DATA: GST_LOG_OBJECT (server, "NEED-DATA, conn-id: %u", conn->id); if (!conn->data) { GST_LOG_OBJECT (server, "Wait for available data, conn-id: %u", conn->id); conn->pending_have_data = true; gst_cuda_ipc_server_on_idle (server); return; } gst_cuda_ipc_server_have_data (server, conn); break; case GstCudaIpcPktType::READ_DONE: GST_LOG_OBJECT (server, "READ-DONE, conn-id: %u", conn->id); if (!conn->data) { GST_ERROR_OBJECT (server, "Unexpected READ-DATA, conn-id: %u", conn->id); gst_cuda_ipc_server_close_connection (server, conn); return; } conn->peer_handles.push_back (conn->data); conn->data = nullptr; gst_cuda_ipc_server_wait_msg (server, conn); break; case GstCudaIpcPktType::RELEASE_DATA: case GstCudaIpcPktType::RELEASE_MMAP_DATA: GST_LOG_OBJECT (server, "RELEASE-DATA, conn-id: %u", conn->id); if (!gst_cuda_ipc_server_on_release_data (server, conn)) gst_cuda_ipc_server_close_connection (server, conn); else gst_cuda_ipc_server_wait_msg (server, conn); break; case GstCudaIpcPktType::FIN: GST_DEBUG_OBJECT (server, "FIN, conn-id %u", conn->id); gst_cuda_ipc_server_close_connection (server, conn); break; default: GST_ERROR_OBJECT (server, "Unexpected packet, conn-id: %u", conn->id); gst_cuda_ipc_server_close_connection (server, conn); break; } } void gst_cuda_ipc_server_send_msg_finish (GstCudaIpcServer * server, GstCudaIpcServerConn * conn, bool result) { if (!result) { GST_WARNING_OBJECT (server, "Send msg failed, conn-id %u", conn->id); gst_cuda_ipc_server_close_connection (server, conn); return; } switch (conn->type) { case GstCudaIpcPktType::CONFIG: GST_DEBUG_OBJECT (server, "Sent CONFIG-DATA, conn-id %u", conn->id); gst_cuda_ipc_server_wait_msg (server, conn); break; case GstCudaIpcPktType::HAVE_DATA: GST_LOG_OBJECT (server, "Sent HAVE-DATA, conn-id %u", conn->id); gst_cuda_ipc_server_wait_msg (server, conn); break; case GstCudaIpcPktType::HAVE_MMAP_DATA: GST_LOG_OBJECT (server, "Sent HAVE-MMAP-DATA, conn-id %u", conn->id); gst_cuda_ipc_server_wait_msg (server, conn); break; case GstCudaIpcPktType::EOS: GST_DEBUG_OBJECT (server, "Sent EOS, conn-id %u", conn->id); gst_cuda_ipc_server_wait_msg (server, conn); break; default: GST_ERROR_OBJECT (server, "Unexpected msg type %d", (gint) conn->type); gst_cuda_ipc_server_close_connection (server, conn); break; } } static void gst_cuda_ipc_server_eos (GstCudaIpcServer * self, GstCudaIpcServerConn * conn) { gst_cuda_ipc_pkt_build_eos (conn->server_msg); conn->eos = true; conn->type = GstCudaIpcPktType::EOS; gst_cuda_ipc_server_send_msg (self, conn); } void gst_cuda_ipc_server_on_idle (GstCudaIpcServer * server) { GstCudaIpcServerClass *klass = GST_CUDA_IPC_SERVER_GET_CLASS (server); GstCudaIpcServerPrivate *priv = server->priv; GST_LOG_OBJECT (server, "idle"); if (priv->shutdown) { GST_DEBUG_OBJECT (server, "We are stopping"); if (priv->conn_map.empty ()) { GST_DEBUG_OBJECT (server, "All connections were closed"); klass->terminate (server); return; } std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_eos; /* *INDENT-OFF* */ for (auto it : priv->conn_map) { auto conn = it.second; if (conn->eos || !conn->pending_have_data) continue; to_send_eos.push_back (conn); } for (auto it : to_send_eos) { GST_DEBUG_OBJECT (server, "Sending EOS to conn-id: %u", it->id); gst_cuda_ipc_server_eos (server, it.get ()); } GST_DEBUG_OBJECT (server, "Have %" G_GSIZE_FORMAT " alive connections", priv->conn_map.size()); size_t num_closed = 0; for (auto it : priv->conn_map) { auto conn = it.second; GST_DEBUG_OBJECT (server, "conn-id %u" " peer handle size %" G_GSIZE_FORMAT, conn->id, conn->peer_handles.size ()); /* Cannot erase conn since it's still referenced. * Manually close connection */ if (conn->peer_handles.empty ()) { conn->CloseConn (); num_closed++; } } /* *INDENT-ON* */ if (priv->conn_map.size () == num_closed) { GST_DEBUG_OBJECT (server, "All connections were closed"); klass->terminate (server); } return; } if (priv->conn_map.empty ()) { GST_LOG_OBJECT (server, "Have no connection"); return; } std::unique_lock < std::mutex > lk (priv->lock); if (!priv->data) return; /* *INDENT-OFF* */ std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_config_data; std::vector < std::shared_ptr < GstCudaIpcServerConn >> to_send_have_data; for (auto it : priv->conn_map) { auto conn = it.second; if (!conn->configured) { conn->configured = true; conn->data = priv->data; to_config_data.push_back (conn); } else if (conn->pending_have_data && conn->seq_num <= priv->data->seq_num) { conn->data = priv->data; to_send_have_data.push_back (conn); } } lk.unlock (); for (auto it: to_config_data) gst_cuda_ipc_server_config_data (server, it.get ()); for (auto it: to_send_have_data) gst_cuda_ipc_server_have_data (server, it.get ()); /* *INDENT-ON* */ } void gst_cuda_ipc_server_abort (GstCudaIpcServer * server) { GstCudaIpcServerPrivate *priv = server->priv; priv->aborted = true; }