gstreamer/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeclient.cpp
Seungha Yang c6a6c56cdf win32ipc: Add support for zero-copy rendering
* Extend protocol so that client can notify of releasing shared memory
* Server will hold shared memory object until it's released by client
* Add allocator/buffer pool to reuse shared memory objects and buffers

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3765>
2023-01-27 18:55:13 +00:00

570 lines
15 KiB
C++

/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#include "win32ipcpipeclient.h"
#include "win32ipcutils.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include <thread>
#include <queue>
#include <string>
/* Debug category declared extern here; the GST_* logging macros used in this
 * file log to it through GST_CAT_DEFAULT */
GST_DEBUG_CATEGORY_EXTERN (gst_win32_ipc_debug);
#define GST_CAT_DEFAULT gst_win32_ipc_debug
/* Size in bytes of each per-connection message buffer
 * (ClientConnection::client_msg and ClientConnection::server_msg) */
#define CONN_BUFFER_SIZE 1024
/* One received frame: the opened memory-mapped file together with the video
 * info parsed from the same HAVE-DATA packet. Instances are stored in
 * Win32IpcPipeClient::queue until handed out by
 * win32_ipc_pipe_client_get_mmf() */
struct MmfInfo
{
/* Memory-mapped file object; whoever removes the entry from the queue is
 * responsible for the reference */
Win32IpcMmf *mmf;
/* Video info delivered along with the memory-mapped file name */
Win32IpcVideoInfo info;
};
struct ClientConnection : public OVERLAPPED
{
ClientConnection () : pipe (INVALID_HANDLE_VALUE), to_read (0), to_write (0),
seq_num (0)
{
OVERLAPPED *parent = dynamic_cast<OVERLAPPED *> (this);
parent->Internal = 0;
parent->InternalHigh = 0;
parent->Offset = 0;
parent->OffsetHigh = 0;
}
Win32IpcPipeClient *self;
HANDLE pipe;
UINT8 client_msg[CONN_BUFFER_SIZE];
UINT32 to_read;
UINT8 server_msg[CONN_BUFFER_SIZE];
UINT32 to_write;
UINT64 seq_num;
};
struct Win32IpcPipeClient
{
explicit Win32IpcPipeClient (const std::string & n)
: name (n), ref_count(1), last_err (ERROR_SUCCESS), flushing (FALSE)
, stopped (FALSE), io_pending (FALSE)
{
release_event = CreateEventA (nullptr, FALSE, FALSE, nullptr);
cancellable = CreateEventA (nullptr, TRUE, FALSE, nullptr);
conn.pipe = INVALID_HANDLE_VALUE;
conn.self = this;
}
~Win32IpcPipeClient ()
{
GST_DEBUG ("Free client %p", this);
SetEvent (cancellable);
if (thread) {
thread->join ();
thread = nullptr;
}
last_err = ERROR_OPERATION_ABORTED;
while (!queue.empty ()) {
MmfInfo info = queue.front ();
queue.pop ();
win32_ipc_mmf_unref (info.mmf);
}
CloseHandle (release_event);
CloseHandle (cancellable);
}
std::mutex lock;
std::condition_variable cond;
std::unique_ptr<std::thread> thread;
std::queue<MmfInfo> queue;
std::queue<std::string> unused_mmf;
std::string name;
ULONG ref_count;
HANDLE release_event;
HANDLE cancellable;
UINT last_err;
BOOL flushing;
BOOL stopped;
BOOL io_pending;
ClientConnection conn;
};
/* Forward declarations: the send-completion callback defined right below
 * calls both of these before their definitions appear in the file */
static DWORD
win32_ipc_pipe_client_send_need_data_async (Win32IpcPipeClient * self);
static DWORD
win32_ipc_pipe_client_send_release_data_async (Win32IpcPipeClient * self,
const char * mmf_name);
/* Completion routine for READ-DONE and RELEASE-DATA writes.
 *
 * On success the next step is chosen in this order:
 *   1. if a released memory-mapped file name is queued, send RELEASE-DATA
 *   2. else if the client is stopped, mark the I/O chain as idle
 *   3. else request the next frame with NEED-DATA
 * Any failure records the error code and signals the cancellable event. */
static VOID WINAPI
win32_ipc_pipe_client_send_finish (DWORD error_code, DWORD n_bytes,
    LPOVERLAPPED overlapped)
{
  ClientConnection *conn = static_cast<ClientConnection *> (overlapped);
  Win32IpcPipeClient *self = conn->self;

  if (error_code != ERROR_SUCCESS) {
    std::string msg = win32_ipc_error_message (error_code);
    self->last_err = error_code;
    GST_WARNING ("Failed with 0x%x (%s)", self->last_err, msg.c_str ());
    SetEvent (self->cancellable);
    return;
  }

  /* Take at most one pending release request while holding the lock */
  std::string pending_release;
  {
    std::lock_guard<std::mutex> guard (self->lock);
    if (!self->unused_mmf.empty ()) {
      pending_release = self->unused_mmf.front ();
      self->unused_mmf.pop ();
    }
  }

  if (!pending_release.empty ()) {
    self->last_err = win32_ipc_pipe_client_send_release_data_async (self,
        pending_release.c_str ());
    if (self->last_err != ERROR_SUCCESS)
      SetEvent (self->cancellable);
    return;
  }

  /* Don't request data anymore if we are stopped, but keep connection
   * to send release data message later */
  if (self->stopped) {
    GST_DEBUG ("We are stopped");
    self->io_pending = FALSE;
    return;
  }

  /* All done, back to need-data state */
  self->last_err = win32_ipc_pipe_client_send_need_data_async (self);
  if (self->last_err != ERROR_SUCCESS)
    SetEvent (self->cancellable);
}
/* Builds a RELEASE-DATA packet for mmf_name in the connection's outgoing
 * buffer and starts an asynchronous write of it.
 *
 * Returns ERROR_SUCCESS when the write was queued, ERROR_BAD_FORMAT when the
 * packet could not be built, or the GetLastError() code of WriteFileEx(). */
static DWORD
win32_ipc_pipe_client_send_release_data_async (Win32IpcPipeClient * self,
    const char * mmf_name)
{
  ClientConnection *c = &self->conn;

  c->to_write = win32_ipc_pkt_build_release_data (c->client_msg,
      CONN_BUFFER_SIZE, c->seq_num, mmf_name);
  if (!c->to_write) {
    GST_ERROR ("Couldn't build RELEASE-DATA pkt");
    return ERROR_BAD_FORMAT;
  }

  GST_TRACE ("Sending RELEASE-DATA");

  if (WriteFileEx (c->pipe, c->client_msg, c->to_write,
          static_cast<OVERLAPPED *> (c), win32_ipc_pipe_client_send_finish))
    return ERROR_SUCCESS;

  UINT err = GetLastError ();
  std::string text = win32_ipc_error_message (err);
  GST_WARNING ("WriteFileEx failed with 0x%x (%s)", err, text.c_str ());

  return err;
}
/* Builds a READ-DONE packet in the connection's outgoing buffer and starts
 * an asynchronous write of it.
 *
 * Returns ERROR_SUCCESS when the write was queued, ERROR_BAD_FORMAT when the
 * packet could not be built, or the GetLastError() code of WriteFileEx(). */
static DWORD
win32_ipc_pipe_client_send_read_done_async (Win32IpcPipeClient * self)
{
  ClientConnection *c = &self->conn;

  c->to_write = win32_ipc_pkt_build_read_done (c->client_msg,
      CONN_BUFFER_SIZE, c->seq_num);
  if (!c->to_write) {
    GST_ERROR ("Couldn't build READ-DONE pkt");
    return ERROR_BAD_FORMAT;
  }

  GST_TRACE ("Sending READ-DONE");

  if (WriteFileEx (c->pipe, c->client_msg, c->to_write,
          static_cast<OVERLAPPED *> (c), win32_ipc_pipe_client_send_finish))
    return ERROR_SUCCESS;

  UINT err = GetLastError ();
  std::string text = win32_ipc_error_message (err);
  GST_WARNING ("WriteFileEx failed with 0x%x (%s)", err, text.c_str ());

  return err;
}
/* Completion routine for the HAVE-DATA read.
 *
 * Parses the packet, opens the memory-mapped file it names, appends the
 * frame to the client's queue (waking any waiter) and answers the server
 * with READ-DONE. Any failure records an error code in last_err and signals
 * the cancellable event. */
static VOID WINAPI
win32_ipc_pipe_client_receive_have_data_finish (DWORD error_code, DWORD n_bytes,
    LPOVERLAPPED overlapped)
{
  ClientConnection *conn = static_cast<ClientConnection *> (overlapped);
  Win32IpcPipeClient *self = conn->self;
  char mmf_name[1024] = { '\0', };
  Win32IpcVideoInfo info;

  if (error_code != ERROR_SUCCESS) {
    std::string msg = win32_ipc_error_message (error_code);
    self->last_err = error_code;
    GST_WARNING ("HAVE-DATA failed with 0x%x (%s)",
        self->last_err, msg.c_str ());
    SetEvent (self->cancellable);
    return;
  }

  if (!win32_ipc_pkt_parse_have_data (conn->server_msg, n_bytes,
          &conn->seq_num, mmf_name, &info)) {
    self->last_err = ERROR_BAD_FORMAT;
    GST_WARNING ("Couldn't parse HAVE-DATA pkt");
    SetEvent (self->cancellable);
    return;
  }

  Win32IpcMmf *mmf = win32_ipc_mmf_open (info.size, mmf_name);
  if (!mmf) {
    GST_ERROR ("Couldn't open file %s", mmf_name);
    self->last_err = ERROR_BAD_FORMAT;
    SetEvent (self->cancellable);
    return;
  }

  GST_TRACE ("Got HAVE-DATA %s", mmf_name);

  MmfInfo frame;
  frame.mmf = mmf;
  frame.info = info;

  {
    std::lock_guard<std::mutex> guard (self->lock);

    /* Drops too old data.
     * NOTE(review): dropped frames are only unref'ed here; no RELEASE-DATA
     * is queued for them the way win32_ipc_pipe_client_release_mmf() does.
     * Confirm against the server whether it keeps holding such frames. */
    while (self->queue.size () > 5) {
      MmfInfo stale = self->queue.front ();
      self->queue.pop ();
      win32_ipc_mmf_unref (stale.mmf);
    }

    self->queue.push (frame);
    self->cond.notify_all ();
  }

  self->last_err = win32_ipc_pipe_client_send_read_done_async (self);
  if (self->last_err != ERROR_SUCCESS)
    SetEvent (self->cancellable);
}
/* Starts an asynchronous read of the next server packet into the
 * connection's incoming buffer; completion is delivered to
 * win32_ipc_pipe_client_receive_have_data_finish().
 *
 * Returns ERROR_SUCCESS when the read was queued, otherwise the
 * GetLastError() code of ReadFileEx(). */
static DWORD
win32_ipc_pipe_client_receive_have_data_async (Win32IpcPipeClient * self)
{
  ClientConnection *c = &self->conn;

  GST_TRACE ("Waiting HAVE-DATA");

  if (ReadFileEx (c->pipe, c->server_msg, CONN_BUFFER_SIZE,
          static_cast<OVERLAPPED *> (c),
          win32_ipc_pipe_client_receive_have_data_finish))
    return ERROR_SUCCESS;

  UINT err = GetLastError ();
  std::string text = win32_ipc_error_message (err);
  GST_WARNING ("ReadFileEx failed with 0x%x (%s)", err, text.c_str ());

  return err;
}
/* Completion routine for the NEED-DATA write: once the request is on the
 * wire, start waiting for the server's HAVE-DATA answer. Any failure records
 * the error code and signals the cancellable event. */
static VOID WINAPI
pipe_clinet_send_need_data_finish (DWORD error_code, DWORD n_bytes,
    LPOVERLAPPED overlapped)
{
  ClientConnection *conn = static_cast<ClientConnection *> (overlapped);
  Win32IpcPipeClient *self = conn->self;

  if (error_code != ERROR_SUCCESS) {
    std::string msg = win32_ipc_error_message (error_code);
    self->last_err = error_code;
    GST_WARNING ("NEED-DATA failed with 0x%x (%s)",
        self->last_err, msg.c_str ());
    SetEvent (self->cancellable);
    return;
  }

  self->last_err = win32_ipc_pipe_client_receive_have_data_async (self);
  if (self->last_err != ERROR_SUCCESS)
    SetEvent (self->cancellable);
}
/* Builds a NEED-DATA packet in the connection's outgoing buffer and starts
 * an asynchronous write of it.
 *
 * Returns ERROR_SUCCESS when the write was queued, ERROR_BAD_FORMAT when the
 * packet could not be built, or the GetLastError() code of WriteFileEx(). */
static DWORD
win32_ipc_pipe_client_send_need_data_async (Win32IpcPipeClient * self)
{
  ClientConnection *c = &self->conn;

  c->to_write = win32_ipc_pkt_build_need_data (c->client_msg,
      CONN_BUFFER_SIZE, c->seq_num);
  if (!c->to_write) {
    GST_ERROR ("Couldn't build NEED-DATA pkt");
    return ERROR_BAD_FORMAT;
  }

  GST_TRACE ("Sending NEED-DATA");

  if (WriteFileEx (c->pipe, c->client_msg, c->to_write,
          static_cast<OVERLAPPED *> (c), pipe_clinet_send_need_data_finish))
    return ERROR_SUCCESS;

  UINT err = GetLastError ();
  std::string text = win32_ipc_error_message (err);
  GST_WARNING ("WriteFileEx failed with 0x%x (%s)", err, text.c_str ());

  return err;
}
/* Worker thread body.
 *
 * Connects to the named pipe, publishes the outcome through last_err and the
 * condition variable, then stays in an alertable wait so that the
 * ReadFileEx()/WriteFileEx() completion routines run on this thread.
 *
 * The thread leaves the loop when the cancellable event is signalled, when
 * queueing an I/O request fails, or on an unexpected wait result. On exit
 * any pending I/O is cancelled, the pipe is closed and last_err is set to
 * ERROR_OPERATION_ABORTED. */
static VOID
win32_ipc_pipe_client_loop (Win32IpcPipeClient * self)
{
  DWORD mode = PIPE_READMODE_MESSAGE;
  std::unique_lock<std::mutex> lk (self->lock);
  ClientConnection *conn = &self->conn;
  HANDLE waitables[2];
  DWORD wait_ret;

  conn->pipe = CreateFileA (self->name.c_str (),
      GENERIC_READ | GENERIC_WRITE, 0, nullptr, OPEN_EXISTING,
      FILE_FLAG_OVERLAPPED, nullptr);
  self->last_err = GetLastError ();
  if (conn->pipe == INVALID_HANDLE_VALUE) {
    std::string msg = win32_ipc_error_message (self->last_err);
    GST_WARNING ("CreateFileA failed with 0x%x (%s)", self->last_err,
        msg.c_str ());
    self->cond.notify_all ();
    return;
  }

  if (!SetNamedPipeHandleState (conn->pipe, &mode, nullptr, nullptr)) {
    self->last_err = GetLastError ();
    std::string msg = win32_ipc_error_message (self->last_err);
    GST_WARNING ("SetNamedPipeHandleState failed with 0x%x (%s)",
        self->last_err, msg.c_str ());
    CloseHandle (conn->pipe);
    conn->pipe = INVALID_HANDLE_VALUE;
    self->cond.notify_all ();
    return;
  }

  /* Connection established: report success to the thread waiting in
   * win32_ipc_pipe_client_run() */
  self->last_err = ERROR_SUCCESS;
  self->cond.notify_all ();
  lk.unlock ();

  /* Once connection is established, send NEED-DATA message to server,
   * and then it will loop NEED-DATA -> HAVE-DATA -> READ-DONE */
  self->last_err = win32_ipc_pipe_client_send_need_data_async (self);
  if (self->last_err != ERROR_SUCCESS)
    goto out;

  self->io_pending = TRUE;
  waitables[0] = self->cancellable;
  waitables[1] = self->release_event;

  do {
    /* Enters alertable thread state and wait for I/O completion event
     * or cancellable event */
    wait_ret = WaitForMultipleObjectsEx (2, waitables, FALSE, INFINITE, TRUE);
    if (wait_ret == WAIT_OBJECT_0) {
      GST_DEBUG ("Operation cancelled");
      goto out;
    }

    switch (wait_ret) {
      case WAIT_OBJECT_0 + 1:
      case WAIT_IO_COMPLETION:
      {
        std::string unused_mmf;

        /* If I/O chain is stopped, send release data message here.
         * While a chain is in flight, the send completion routine picks
         * up queued release requests itself */
        if (!self->io_pending) {
          lk.lock ();
          if (!self->unused_mmf.empty ()) {
            unused_mmf = self->unused_mmf.front ();
            self->unused_mmf.pop ();
          }
          lk.unlock ();
        }

        if (unused_mmf.size () > 0) {
          GST_DEBUG ("Sending release data for %s", unused_mmf.c_str ());
          self->io_pending = TRUE;
          self->last_err = win32_ipc_pipe_client_send_release_data_async (self,
              unused_mmf.c_str ());
          if (self->last_err != ERROR_SUCCESS)
            goto out;
        }
        break;
      }
      default:
        GST_WARNING ("Unexpected wait return 0x%x", (UINT) wait_ret);
        goto out;
    }
  } while (true);

out:
  if (conn->pipe != INVALID_HANDLE_VALUE) {
    /* The connection object itself is the OVERLAPPED that was handed to
     * ReadFileEx()/WriteFileEx(), so it is what must be passed here; the
     * address of the local pointer variable would never match a pending
     * request */
    CancelIoEx (conn->pipe, static_cast<OVERLAPPED *> (conn));
    CloseHandle (conn->pipe);
  }

  lk.lock ();
  self->last_err = ERROR_OPERATION_ABORTED;
  conn->pipe = INVALID_HANDLE_VALUE;
  self->io_pending = FALSE;
  self->cond.notify_all ();
}
/* Spawns the worker thread and blocks until it has reported the outcome of
 * the connection attempt.
 *
 * Returns TRUE when the pipe was opened, FALSE otherwise (the worker thread
 * has been joined in that case). */
static BOOL
win32_ipc_pipe_client_run (Win32IpcPipeClient * self)
{
  std::unique_lock<std::mutex> lk (self->lock);

  self->thread = std::make_unique<std::thread>
      (win32_ipc_pipe_client_loop, self);

  /* Wait with a predicate so that a spurious wakeup is not mistaken for the
   * worker's report: the worker either leaves an error in last_err or a
   * valid pipe handle behind before it releases the lock */
  self->cond.wait (lk, [self] {
    return self->last_err != ERROR_SUCCESS ||
        self->conn.pipe != INVALID_HANDLE_VALUE;
  });

  if (self->last_err != ERROR_SUCCESS) {
    /* The worker's exit path re-acquires the lock, so joining while still
     * holding it could deadlock */
    lk.unlock ();
    self->thread->join ();
    self->thread = nullptr;
    return FALSE;
  }

  return TRUE;
}
/* Creates a client and connects it to the pipe called pipe_name.
 *
 * Returns a new client holding one reference, or nullptr when pipe_name is
 * null or the connection could not be established. */
Win32IpcPipeClient *
win32_ipc_pipe_client_new (const char * pipe_name)
{
  if (pipe_name == nullptr) {
    GST_ERROR ("Pipe name must be specified");
    return nullptr;
  }

  Win32IpcPipeClient *client = new Win32IpcPipeClient (pipe_name);
  if (win32_ipc_pipe_client_run (client))
    return client;

  /* Connection failed, drop the only reference */
  win32_ipc_pipe_client_unref (client);
  return nullptr;
}
/* Atomically increments the reference count and returns the same client
 * pointer. */
Win32IpcPipeClient *
win32_ipc_pipe_client_ref (Win32IpcPipeClient * client)
{
  (void) InterlockedIncrement (&client->ref_count);

  return client;
}
/* Atomically decrements the reference count and destroys the client when it
 * reaches zero. */
void
win32_ipc_pipe_client_unref (Win32IpcPipeClient * client)
{
  if (InterlockedDecrement (&client->ref_count) != 0)
    return;

  delete client;
}
/* Updates the flushing flag under the client lock and wakes every thread
 * waiting on the condition variable so that it re-evaluates its wait
 * condition. */
void
win32_ipc_pipe_client_set_flushing (Win32IpcPipeClient * client, BOOL flushing)
{
  std::unique_lock<std::mutex> guard (client->lock);

  client->flushing = flushing;
  client->cond.notify_all ();
}
/* Fetches the oldest queued frame, blocking until one is available.
 *
 * The wait ends early when an error is recorded, or when the client is set
 * flushing or stopped. On success *mmf and *info are filled in from the
 * dequeued entry and TRUE is returned. FALSE is returned when an error was
 * already recorded on entry or when the queue is still empty after the
 * wait. */
BOOL
win32_ipc_pipe_client_get_mmf (Win32IpcPipeClient * client, Win32IpcMmf ** mmf,
    Win32IpcVideoInfo * info)
{
  std::unique_lock<std::mutex> lk (client->lock);

  if (client->last_err != ERROR_SUCCESS) {
    GST_WARNING ("Last error code was 0x%x", client->last_err);
    return FALSE;
  }

  client->cond.wait (lk, [client] {
    return !client->queue.empty () || client->last_err != ERROR_SUCCESS ||
        client->flushing || client->stopped;
  });

  if (client->queue.empty ())
    return FALSE;

  MmfInfo next = client->queue.front ();
  client->queue.pop ();

  *mmf = next.mmf;
  *info = next.info;

  return TRUE;
}
/* Drops the caller's reference on mmf and, unless an error is recorded on
 * the client, queues the file's name for a RELEASE-DATA message and signals
 * the worker thread's release event. */
void
win32_ipc_pipe_client_release_mmf (Win32IpcPipeClient * client,
    Win32IpcMmf * mmf)
{
  /* Copy the name before giving up the reference */
  std::string mmf_name = win32_ipc_mmf_get_name (mmf);
  win32_ipc_mmf_unref (mmf);

  std::lock_guard<std::mutex> guard (client->lock);
  if (client->last_err == ERROR_SUCCESS) {
    GST_LOG ("Enqueue release data %s", mmf_name.c_str ());
    client->unused_mmf.push (mmf_name);
    SetEvent (client->release_event);
  }
}
/* Marks the client as stopped under the client lock and wakes every thread
 * waiting on the condition variable. */
void
win32_ipc_pipe_client_stop (Win32IpcPipeClient * client)
{
  GST_DEBUG ("Stopping %p", client);

  std::unique_lock<std::mutex> guard (client->lock);

  client->stopped = TRUE;
  client->cond.notify_all ();
}