win32ipc: Add WIN32 shared memory videosrc/sink elements

Windows supports various IPC methods but that's completely
different form that of *nix from implementation point of view.
So, instead of adding shared memory functionality to existing
shm plugin, new WIN32 shared memory source/sink elements
are implemented in this commit.

Each videosink (server) and videosrc (client) pair will communicate
using WIN32 named pipe and thus user should configure unique/proper
pipe name to them (e.g., \\.\pipe\MyPipeName).
Once connection is established, videosink will create named shared memory
object per frame and client will be able to consume the object
(i.e., memory mapped file handle) without additional copy operation.

Note that implementations under "protocol" directory are almost
pure C/C++ with WIN32 APIs except for a few defines and debug functions.
So, applications can take only the protocol part so that the application
can send/receive shared-memory object from/to the other end
even if it's not an actual GStreamer element.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3441>
This commit is contained in:
Seungha Yang 2022-11-19 02:56:27 +09:00 committed by GStreamer Marge Bot
parent ca7f66f9b5
commit 4f846540cb
21 changed files with 3308 additions and 0 deletions

View file

@ -237685,6 +237685,102 @@
"tracers": {}, "tracers": {},
"url": "Unknown package origin" "url": "Unknown package origin"
}, },
"win32ipc": {
"description": "Windows IPC plugin",
"elements": {
"win32ipcvideosink": {
"author": "Seungha Yang <seungha@centricular.com>",
"description": "Send video frames to win32ipcvideosrc elements",
"hierarchy": [
"GstWin32IpcVideoSink",
"GstBaseSink",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Sink/Video",
"pad-templates": {
"sink": {
"caps": "video/x-raw:\n format: { ABGR64_LE, BGRA64_LE, AYUV64, ARGB64_LE, ARGB64, RGBA64_LE, ABGR64_BE, BGRA64_BE, ARGB64_BE, RGBA64_BE, GBRA_12LE, GBRA_12BE, Y412_LE, Y412_BE, A444_10LE, GBRA_10LE, A444_10BE, GBRA_10BE, A422_10LE, A422_10BE, A420_10LE, A420_10BE, RGB10A2_LE, BGR10A2_LE, Y410, GBRA, ABGR, VUYA, BGRA, AYUV, ARGB, RGBA, A420, AV12, Y444_16LE, Y444_16BE, v216, P016_LE, P016_BE, Y444_12LE, GBR_12LE, Y444_12BE, GBR_12BE, I422_12LE, I422_12BE, Y212_LE, Y212_BE, I420_12LE, I420_12BE, P012_LE, P012_BE, Y444_10LE, GBR_10LE, Y444_10BE, GBR_10BE, r210, I422_10LE, I422_10BE, NV16_10LE32, Y210, v210, UYVP, I420_10LE, I420_10BE, P010_10LE, NV12_10LE32, NV12_10LE40, P010_10BE, NV12_10BE_8L128, Y444, RGBP, GBR, BGRP, NV24, xBGR, BGRx, xRGB, RGBx, BGR, IYU2, v308, RGB, Y42B, NV61, NV16, VYUY, UYVY, YVYU, YUY2, I420, YV12, NV21, NV12, NV12_8L128, NV12_64Z32, NV12_4L4, NV12_32L32, NV12_16L32S, Y41B, IYU1, YVU9, YUV9, RGB16, BGR16, RGB15, BGR15, RGB8P, GRAY16_LE, GRAY16_BE, GRAY10_LE32, GRAY8 }\n width: [ 1, 2147483647 ]\n height: [ 1, 2147483647 ]\n framerate: [ 0/1, 2147483647/1 ]\n",
"direction": "sink",
"presence": "always"
}
},
"properties": {
"pipe-name": {
"blurb": "The name of Win32 named pipe to communicate with clients. Validation of the pipe name is caller's responsibility",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "\\\\.\\pipe\\gst.win32.ipc.video",
"mutable": "ready",
"readable": true,
"type": "gchararray",
"writable": true
}
},
"rank": "none"
},
"win32ipcvideosrc": {
"author": "Seungha Yang <seungha@centricular.com>",
"description": "Receive video frames from the win32ipcvideosink",
"hierarchy": [
"GstWin32IpcVideoSrc",
"GstBaseSrc",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Source/Video",
"pad-templates": {
"src": {
"caps": "video/x-raw:\n format: { ABGR64_LE, BGRA64_LE, AYUV64, ARGB64_LE, ARGB64, RGBA64_LE, ABGR64_BE, BGRA64_BE, ARGB64_BE, RGBA64_BE, GBRA_12LE, GBRA_12BE, Y412_LE, Y412_BE, A444_10LE, GBRA_10LE, A444_10BE, GBRA_10BE, A422_10LE, A422_10BE, A420_10LE, A420_10BE, RGB10A2_LE, BGR10A2_LE, Y410, GBRA, ABGR, VUYA, BGRA, AYUV, ARGB, RGBA, A420, AV12, Y444_16LE, Y444_16BE, v216, P016_LE, P016_BE, Y444_12LE, GBR_12LE, Y444_12BE, GBR_12BE, I422_12LE, I422_12BE, Y212_LE, Y212_BE, I420_12LE, I420_12BE, P012_LE, P012_BE, Y444_10LE, GBR_10LE, Y444_10BE, GBR_10BE, r210, I422_10LE, I422_10BE, NV16_10LE32, Y210, v210, UYVP, I420_10LE, I420_10BE, P010_10LE, NV12_10LE32, NV12_10LE40, P010_10BE, NV12_10BE_8L128, Y444, RGBP, GBR, BGRP, NV24, xBGR, BGRx, xRGB, RGBx, BGR, IYU2, v308, RGB, Y42B, NV61, NV16, VYUY, UYVY, YVYU, YUY2, I420, YV12, NV21, NV12, NV12_8L128, NV12_64Z32, NV12_4L4, NV12_32L32, NV12_16L32S, Y41B, IYU1, YVU9, YUV9, RGB16, BGR16, RGB15, BGR15, RGB8P, GRAY16_LE, GRAY16_BE, GRAY10_LE32, GRAY8 }\n width: [ 1, 2147483647 ]\n height: [ 1, 2147483647 ]\n framerate: [ 0/1, 2147483647/1 ]\n",
"direction": "src",
"presence": "always"
}
},
"properties": {
"pipe-name": {
"blurb": "The name of Win32 named pipe to communicate with server. Validation of the pipe name is caller's responsibility",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "\\\\.\\pipe\\gst.win32.ipc.video",
"mutable": "ready",
"readable": true,
"type": "gchararray",
"writable": true
},
"processing-deadline": {
"blurb": "Maximum processing time for a buffer in nanoseconds",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "20000000",
"max": "18446744073709551615",
"min": "0",
"mutable": "playing",
"readable": true,
"type": "guint64",
"writable": true
}
},
"rank": "none"
}
},
"filename": "gstwin32ipc",
"license": "LGPL",
"other-types": {},
"package": "GStreamer Bad Plug-ins",
"source": "gst-plugins-bad",
"tracers": {},
"url": "Unknown package origin"
},
"winks": { "winks": {
"description": "Windows kernel streaming plugin", "description": "Windows kernel streaming plugin",
"elements": { "elements": {

View file

@ -174,6 +174,7 @@ option('webrtc', type : 'feature', value : 'auto', description : 'WebRTC audio/v
option('webrtcdsp', type : 'feature', value : 'auto', description : 'Plugin with various audio filters provided by the WebRTC audio processing library') option('webrtcdsp', type : 'feature', value : 'auto', description : 'Plugin with various audio filters provided by the WebRTC audio processing library')
option('wildmidi', type : 'feature', value : 'auto', description : 'WildMidi midi soft synth plugin') option('wildmidi', type : 'feature', value : 'auto', description : 'WildMidi midi soft synth plugin')
option('wic', type : 'feature', value : 'auto', description : 'Windows Imaging Component plugin') option('wic', type : 'feature', value : 'auto', description : 'Windows Imaging Component plugin')
option('win32ipc', type : 'feature', value : 'auto', description : 'Windows IPC plugin')
option('winks', type : 'feature', value : 'auto', description : 'Windows Kernel Streaming video source plugin') option('winks', type : 'feature', value : 'auto', description : 'Windows Kernel Streaming video source plugin')
option('winscreencap', type : 'feature', value : 'auto', description : 'Windows Screen Capture video source plugin') option('winscreencap', type : 'feature', value : 'auto', description : 'Windows Screen Capture video source plugin')
option('x265', type : 'feature', value : 'auto', description : 'HEVC/H.265 video encoder plugin (GPL - only built if gpl option is also enabled!)') option('x265', type : 'feature', value : 'auto', description : 'HEVC/H.265 video encoder plugin (GPL - only built if gpl option is also enabled!)')

View file

@ -26,5 +26,6 @@ subdir('va')
subdir('wasapi') subdir('wasapi')
subdir('wasapi2') subdir('wasapi2')
subdir('wic') subdir('wic')
subdir('win32ipc')
subdir('winks') subdir('winks')
subdir('winscreencap') subdir('winscreencap')

View file

@ -0,0 +1,71 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "gstwin32ipcutils.h"
#include <windows.h>
#include <string>
#include <mutex>
static ULONG global_index = 0;
static DWORD
gst_win32_ipc_get_pid (void)
{
static std::once_flag once_flag;
static DWORD pid = 0;
std::call_once (once_flag,[&]() {
pid = GetCurrentProcessId ();
});
return pid;
}
/* Create unique prefix for named shared memory */
gchar *
gst_win32_ipc_get_mmf_prefix (void)
{
std::string prefix = "Local\\gst.win32.ipc." +
std::to_string (gst_win32_ipc_get_pid ()) + std::string (".") +
std::to_string (InterlockedIncrement (&global_index)) + std::string (".");
return g_strdup (prefix.c_str ());
}
gboolean
gst_win32_ipc_clock_is_qpc (GstClock * clock)
{
GstClockType clock_type = GST_CLOCK_TYPE_MONOTONIC;
GstClock *mclock;
if (G_OBJECT_TYPE (clock) != GST_TYPE_SYSTEM_CLOCK)
return FALSE;
g_object_get (clock, "clock-type", &clock_type, nullptr);
if (clock_type != GST_CLOCK_TYPE_MONOTONIC)
return FALSE;
mclock = gst_clock_get_master (clock);
if (!mclock)
return TRUE;
gst_object_unref (mclock);
return FALSE;
}

View file

@ -0,0 +1,30 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#pragma once
#include <gst/gst.h>
G_BEGIN_DECLS
gchar * gst_win32_ipc_get_mmf_prefix (void);
gboolean gst_win32_ipc_clock_is_qpc (GstClock * clock);
G_END_DECLS

View file

@ -0,0 +1,463 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-win32ipcvideosink
* @title: win32ipcvideosink
* @short_description: Windows shared memory video sink
*
* win32ipcvideosink provides raw video memory to connected win32ipcvideossrc
* elements
*
* ## Example launch line
* ```
* gst-launch-1.0 videotestsrc ! queue ! win32ipcvideosink
* ```
*
* Since: 1.22
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstwin32ipcvideosink.h"
#include "gstwin32ipcutils.h"
#include "protocol/win32ipcpipeserver.h"
#include <string>
#include <string.h>
GST_DEBUG_CATEGORY_STATIC (gst_win32_ipc_video_sink_debug);
#define GST_CAT_DEFAULT gst_win32_ipc_video_sink_debug
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS (GST_VIDEO_CAPS_MAKE (GST_VIDEO_FORMATS_ALL)));
enum
{
PROP_0,
PROP_PIPE_NAME,
};
#define DEFAULT_PIPE_NAME "\\\\.\\pipe\\gst.win32.ipc.video"
struct _GstWin32IpcVideoSink
{
GstBaseSink parent;
GstVideoInfo info;
Win32IpcPipeServer *pipe;
gchar *mmf_prefix;
guint64 seq_num;
LARGE_INTEGER frequency;
Win32IpcMmf *mmf;
Win32IpcVideoInfo minfo;
/* properties */
gchar *pipe_name;
};
static void gst_win32_ipc_video_sink_finalize (GObject * object);
static void gst_win32_ipc_video_sink_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_win32_video_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstClock *gst_win32_ipc_video_sink_provide_clock (GstElement * elem);
static gboolean gst_win32_ipc_video_sink_start (GstBaseSink * sink);
static gboolean gst_win32_ipc_video_sink_stop (GstBaseSink * sink);
static gboolean gst_win32_ipc_video_sink_unlock_stop (GstBaseSink * sink);
static gboolean gst_win32_ipc_video_sink_set_caps (GstBaseSink * sink,
GstCaps * caps);
static void gst_win32_ipc_video_sink_get_time (GstBaseSink * sink,
GstBuffer * buf, GstClockTime * start, GstClockTime * end);
static gboolean gst_win32_ipc_video_sink_propose_allocation (GstBaseSink * sink,
GstQuery * query);
static GstFlowReturn gst_win32_ipc_video_sink_prepare (GstBaseSink * sink,
GstBuffer * buf);
static GstFlowReturn gst_win32_ipc_video_sink_render (GstBaseSink * sink,
GstBuffer * buf);
#define gst_win32_ipc_video_sink_parent_class parent_class
G_DEFINE_TYPE (GstWin32IpcVideoSink, gst_win32_ipc_video_sink,
GST_TYPE_BASE_SINK);
static void
gst_win32_ipc_video_sink_class_init (GstWin32IpcVideoSinkClass * klass)
{
GObjectClass *object_class = G_OBJECT_CLASS (klass);
GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
GstBaseSinkClass *sink_class = GST_BASE_SINK_CLASS (klass);
object_class->finalize = gst_win32_ipc_video_sink_finalize;
object_class->set_property = gst_win32_ipc_video_sink_set_property;
object_class->get_property = gst_win32_video_sink_get_property;
g_object_class_install_property (object_class, PROP_PIPE_NAME,
g_param_spec_string ("pipe-name", "Pipe Name",
"The name of Win32 named pipe to communicate with clients. "
"Validation of the pipe name is caller's responsibility",
DEFAULT_PIPE_NAME, (GParamFlags) (G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_READY)));
gst_element_class_set_static_metadata (element_class,
"Win32 IPC Video Sink", "Sink/Video",
"Send video frames to win32ipcvideosrc elements",
"Seungha Yang <seungha@centricular.com>");
gst_element_class_add_static_pad_template (element_class, &sink_template);
element_class->provide_clock =
GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_provide_clock);
sink_class->start = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_start);
sink_class->stop = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_stop);
sink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_unlock_stop);
sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_set_caps);
sink_class->propose_allocation =
GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_propose_allocation);
sink_class->get_times = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_get_time);
sink_class->prepare = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_prepare);
sink_class->render = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_render);
GST_DEBUG_CATEGORY_INIT (gst_win32_ipc_video_sink_debug, "win32ipcvideosink",
0, "win32ipcvideosink");
}
static void
gst_win32_ipc_video_sink_init (GstWin32IpcVideoSink * self)
{
self->pipe_name = g_strdup (DEFAULT_PIPE_NAME);
QueryPerformanceFrequency (&self->frequency);
GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_REQUIRE_CLOCK);
}
static void
gst_win32_ipc_video_sink_finalize (GObject * object)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (object);
g_free (self->pipe_name);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_win32_ipc_video_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (object);
switch (prop_id) {
case PROP_PIPE_NAME:
GST_OBJECT_LOCK (self);
g_free (self->pipe_name);
self->pipe_name = g_value_dup_string (value);
if (!self->pipe_name)
self->pipe_name = g_strdup (DEFAULT_PIPE_NAME);
GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_win32_video_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (object);
switch (prop_id) {
case PROP_PIPE_NAME:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->pipe_name);
GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static GstClock *
gst_win32_ipc_video_sink_provide_clock (GstElement * elem)
{
return gst_system_clock_obtain ();
}
static gboolean
gst_win32_ipc_video_sink_start (GstBaseSink * sink)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink);
GST_DEBUG_OBJECT (self, "Start");
self->pipe = win32_ipc_pipe_server_new (self->pipe_name);
if (!self->pipe) {
GST_ERROR_OBJECT (self, "Couldn't create pipe server");
return FALSE;
}
self->mmf_prefix = gst_win32_ipc_get_mmf_prefix ();
self->seq_num = 0;
return TRUE;
}
static gboolean
gst_win32_ipc_video_sink_stop (GstBaseSink * sink)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink);
GST_DEBUG_OBJECT (self, "Stop");
g_clear_pointer (&self->pipe, win32_ipc_pipe_server_unref);
g_clear_pointer (&self->mmf_prefix, g_free);
g_clear_pointer (&self->mmf, win32_ipc_mmf_unref);
return TRUE;
}
static gboolean
gst_win32_ipc_video_sink_unlock_stop (GstBaseSink * sink)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink);
g_clear_pointer (&self->mmf, win32_ipc_mmf_unref);
return TRUE;
}
static void
gst_win32_ipc_video_sink_get_time (GstBaseSink * sink, GstBuffer * buf,
GstClockTime * start, GstClockTime * end)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink);
GstClockTime timestamp;
timestamp = GST_BUFFER_PTS (buf);
if (!GST_CLOCK_TIME_IS_VALID (timestamp))
timestamp = GST_BUFFER_DTS (buf);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
*start = timestamp;
if (GST_BUFFER_DURATION_IS_VALID (buf)) {
*end = timestamp + GST_BUFFER_DURATION (buf);
} else if (self->info.fps_n > 0) {
*end = timestamp +
gst_util_uint64_scale_int (GST_SECOND, self->info.fps_d,
self->info.fps_n);
} else if (sink->segment.rate < 0) {
*end = timestamp;
}
}
}
static gboolean
gst_win32_ipc_video_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink);
if (!gst_video_info_from_caps (&self->info, caps)) {
GST_WARNING_OBJECT (self, "Invalid caps");
return FALSE;
}
memset (&self->minfo, 0, sizeof (Win32IpcVideoInfo));
self->minfo.format =
(Win32IpcVideoFormat) GST_VIDEO_INFO_FORMAT (&self->info);
self->minfo.width = GST_VIDEO_INFO_WIDTH (&self->info);
self->minfo.height = GST_VIDEO_INFO_HEIGHT (&self->info);
self->minfo.fps_n = self->info.fps_n;
self->minfo.fps_d = self->info.fps_d;
self->minfo.par_n = self->info.par_n;
self->minfo.par_d = self->info.par_d;
return TRUE;
}
static gboolean
gst_win32_ipc_video_sink_propose_allocation (GstBaseSink * sink,
GstQuery * query)
{
GstCaps *caps;
GstBufferPool *pool = nullptr;
GstVideoInfo info;
guint size;
gboolean need_pool;
gst_query_parse_allocation (query, &caps, &need_pool);
if (!caps) {
GST_WARNING_OBJECT (sink, "No caps specified");
return FALSE;
}
if (!gst_video_info_from_caps (&info, caps)) {
GST_WARNING_OBJECT (sink, "Invalid caps %" GST_PTR_FORMAT, caps);
return FALSE;
}
/* the normal size of a frame */
size = info.size;
if (need_pool) {
GstStructure *config;
pool = gst_video_buffer_pool_new ();
config = gst_buffer_pool_get_config (pool);
gst_buffer_pool_config_add_option (config,
GST_BUFFER_POOL_OPTION_VIDEO_META);
size = GST_VIDEO_INFO_SIZE (&info);
gst_buffer_pool_config_set_params (config, caps, (guint) size, 0, 0);
if (!gst_buffer_pool_set_config (pool, config)) {
GST_ERROR_OBJECT (pool, "Couldn't set config");
gst_object_unref (pool);
return FALSE;
}
}
gst_query_add_allocation_pool (query, pool, size, 0, 0);
gst_clear_object (&pool);
gst_query_add_allocation_meta (query, GST_VIDEO_META_API_TYPE, NULL);
return TRUE;
}
static GstFlowReturn
gst_win32_ipc_video_sink_prepare (GstBaseSink * sink, GstBuffer * buf)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink);
std::string mmf_name;
GstVideoFrame frame;
GstMapInfo info;
g_clear_pointer (&self->mmf, win32_ipc_mmf_unref);
if (!gst_video_frame_map (&frame, &self->info, buf, GST_MAP_READ)) {
GST_ERROR_OBJECT (self, "Couldn't map frame");
return GST_FLOW_ERROR;
}
mmf_name = std::string (self->mmf_prefix) + std::to_string (self->seq_num);
self->seq_num++;
self->mmf = win32_ipc_mmf_alloc (GST_VIDEO_FRAME_SIZE (&frame),
mmf_name.c_str ());
if (!self->mmf) {
GST_ERROR_OBJECT (self, "Couldn't create memory with name %s",
mmf_name.c_str ());
gst_video_frame_unmap (&frame);
return GST_FLOW_ERROR;
}
self->minfo.size = GST_VIDEO_FRAME_SIZE (&frame);
for (guint i = 0; i < GST_VIDEO_FRAME_N_PLANES (&frame); i++) {
self->minfo.offset[i] = GST_VIDEO_FRAME_PLANE_OFFSET (&frame, i);
self->minfo.stride[i] = GST_VIDEO_FRAME_PLANE_STRIDE (&frame, i);
}
gst_video_frame_unmap (&frame);
gst_buffer_map (buf, &info, GST_MAP_READ);
memcpy (win32_ipc_mmf_get_raw (self->mmf), info.data, self->minfo.size);
gst_buffer_unmap (buf, &info);
return GST_FLOW_OK;
}
static GstFlowReturn
gst_win32_ipc_video_sink_render (GstBaseSink * sink, GstBuffer * buf)
{
GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink);
LARGE_INTEGER cur_time;
GstClockTime pts;
GstClockTime now_qpc;
GstClockTime buf_pts;
GstClockTime buffer_clock = GST_CLOCK_TIME_NONE;
QueryPerformanceCounter (&cur_time);
pts = now_qpc = gst_util_uint64_scale (cur_time.QuadPart, GST_SECOND,
self->frequency.QuadPart);
buf_pts = GST_BUFFER_PTS (buf);
if (!GST_CLOCK_TIME_IS_VALID (buf_pts))
buf_pts = GST_BUFFER_DTS (buf);
if (GST_CLOCK_TIME_IS_VALID (buf_pts)) {
buffer_clock = gst_segment_to_running_time (&sink->segment,
GST_FORMAT_TIME, buf_pts) +
GST_ELEMENT_CAST (sink)->base_time + gst_base_sink_get_latency (sink);
}
if (GST_CLOCK_TIME_IS_VALID (buffer_clock)) {
GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (sink));
gboolean is_qpc = TRUE;
is_qpc = gst_win32_ipc_clock_is_qpc (clock);
if (!is_qpc) {
GstClockTime now_gst = gst_clock_get_time (clock);
GstClockTimeDiff converted = buffer_clock;
GST_LOG_OBJECT (self, "Clock is not QPC");
converted -= now_gst;
converted += now_qpc;
if (converted < 0) {
/* Shouldn't happen */
GST_WARNING_OBJECT (self, "Negative buffer clock");
pts = 0;
} else {
pts = converted;
}
} else {
GST_LOG_OBJECT (self, "Clock is QPC already");
/* buffer clock is already QPC time */
pts = buffer_clock;
}
gst_object_unref (clock);
}
self->minfo.qpc = pts;
if (!self->pipe) {
GST_ERROR_OBJECT (self, "Pipe server was not configured");
return GST_FLOW_ERROR;
}
/* win32_ipc_pipe_server_send_mmf() takes ownership of mmf */
if (!win32_ipc_pipe_server_send_mmf (self->pipe,
(Win32IpcMmf *) g_steal_pointer (&self->mmf), &self->minfo)) {
GST_ERROR_OBJECT (self, "Couldn't send buffer");
return GST_FLOW_ERROR;
}
return GST_FLOW_OK;
}

View file

@ -0,0 +1,32 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#pragma once
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include <gst/video/video.h>
G_BEGIN_DECLS
#define GST_TYPE_WIN32_IPC_VIDEO_SINK (gst_win32_ipc_video_sink_get_type())
G_DECLARE_FINAL_TYPE (GstWin32IpcVideoSink, gst_win32_ipc_video_sink,
GST, WIN32_IPC_VIDEO_SINK, GstBaseSink);
G_END_DECLS

View file

@ -0,0 +1,535 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-win32ipcvideosrc
* @title: win32ipcvideosrc
* @short_description: Windows shared memory video source
*
* win32ipcvideosrc receives raw video frames from win32ipcvideosink
* and outputs the received video frames
*
* ## Example launch line
* ```
* gst-launch-1.0 win32ipcvideosrc ! queue ! videoconvert ! d3d11videosink
* ```
*
* Since: 1.22
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstwin32ipcvideosrc.h"
#include "gstwin32ipcutils.h"
#include "protocol/win32ipcpipeclient.h"
#include <string>
GST_DEBUG_CATEGORY_STATIC (gst_win32_ipc_video_src_debug);
#define GST_CAT_DEFAULT gst_win32_ipc_video_src_debug
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS (GST_VIDEO_CAPS_MAKE (GST_VIDEO_FORMATS_ALL)));
enum
{
PROP_0,
PROP_PIPE_NAME,
PROP_PROCESSING_DELAY,
};
#define DEFAULT_PIPE_NAME "\\\\.\\pipe\\gst.win32.ipc.video"
#define DEFAULT_PROCESSING_DEADLINE (20 * GST_MSECOND)
struct _GstWin32IpcVideoSrc
{
GstBaseSrc parent;
GstVideoInfo info;
Win32IpcPipeClient *pipe;
GstCaps *caps;
gboolean flushing;
SRWLOCK lock;
gboolean have_video_meta;
gsize offset[GST_VIDEO_MAX_PLANES];
gint stride[GST_VIDEO_MAX_PLANES];
LARGE_INTEGER frequency;
GstBufferPool *pool;
/* properties */
gchar *pipe_name;
GstClockTime processing_deadline;
};
static void gst_win32_ipc_video_src_finalize (GObject * object);
static void gst_win32_ipc_video_src_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_win32_video_src_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstClock *gst_win32_video_src_provide_clock (GstElement * elem);
static gboolean gst_win32_ipc_video_src_start (GstBaseSrc * src);
static gboolean gst_win32_ipc_video_src_stop (GstBaseSrc * src);
static gboolean gst_win32_ipc_video_src_unlock (GstBaseSrc * src);
static gboolean gst_win32_ipc_video_src_unlock_stop (GstBaseSrc * src);
static gboolean gst_win32_ipc_video_src_query (GstBaseSrc * src,
GstQuery * query);
static gboolean gst_win32_ipc_video_src_decide_allocation (GstBaseSrc * src,
GstQuery * query);
static GstFlowReturn gst_win32_ipc_video_src_create (GstBaseSrc * src,
guint64 offset, guint size, GstBuffer ** buf);
#define gst_win32_ipc_video_src_parent_class parent_class
G_DEFINE_TYPE (GstWin32IpcVideoSrc, gst_win32_ipc_video_src, GST_TYPE_BASE_SRC);
static void
gst_win32_ipc_video_src_class_init (GstWin32IpcVideoSrcClass * klass)
{
GObjectClass *object_class = G_OBJECT_CLASS (klass);
GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
GstBaseSrcClass *src_class = GST_BASE_SRC_CLASS (klass);
object_class->finalize = gst_win32_ipc_video_src_finalize;
object_class->set_property = gst_win32_ipc_video_src_set_property;
object_class->get_property = gst_win32_video_src_get_property;
g_object_class_install_property (object_class, PROP_PIPE_NAME,
g_param_spec_string ("pipe-name", "Pipe Name",
"The name of Win32 named pipe to communicate with server. "
"Validation of the pipe name is caller's responsibility",
DEFAULT_PIPE_NAME, (GParamFlags) (G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_READY)));
g_object_class_install_property (object_class, PROP_PROCESSING_DELAY,
g_param_spec_uint64 ("processing-deadline", "Processing deadline",
"Maximum processing time for a buffer in nanoseconds", 0, G_MAXUINT64,
DEFAULT_PROCESSING_DEADLINE, (GParamFlags) (G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_PLAYING)));
gst_element_class_set_static_metadata (element_class,
"Win32 IPC Video Source", "Source/Video",
"Receive video frames from the win32ipcvideosink",
"Seungha Yang <seungha@centricular.com>");
gst_element_class_add_static_pad_template (element_class, &src_template);
element_class->provide_clock =
GST_DEBUG_FUNCPTR (gst_win32_video_src_provide_clock);
src_class->start = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_start);
src_class->stop = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_stop);
src_class->unlock = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_unlock);
src_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_unlock_stop);
src_class->query = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_query);
src_class->decide_allocation =
GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_decide_allocation);
src_class->create = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_create);
GST_DEBUG_CATEGORY_INIT (gst_win32_ipc_video_src_debug, "win32ipcvideosrc",
0, "win32ipcvideosrc");
}
static void
gst_win32_ipc_video_src_init (GstWin32IpcVideoSrc * self)
{
gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
self->pipe_name = g_strdup (DEFAULT_PIPE_NAME);
self->processing_deadline = DEFAULT_PROCESSING_DEADLINE;
QueryPerformanceFrequency (&self->frequency);
GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_REQUIRE_CLOCK);
}
static void
gst_win32_ipc_video_src_finalize (GObject * object)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (object);
g_free (self->pipe_name);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_win32_ipc_video_src_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (object);
switch (prop_id) {
case PROP_PIPE_NAME:
GST_OBJECT_LOCK (self);
g_free (self->pipe_name);
self->pipe_name = g_value_dup_string (value);
if (!self->pipe_name)
self->pipe_name = g_strdup (DEFAULT_PIPE_NAME);
GST_OBJECT_UNLOCK (self);
break;
case PROP_PROCESSING_DELAY:
{
GstClockTime prev_val, new_val;
GST_OBJECT_LOCK (self);
prev_val = self->processing_deadline;
new_val = g_value_get_uint64 (value);
self->processing_deadline = new_val;
GST_OBJECT_UNLOCK (self);
if (prev_val != new_val) {
GST_DEBUG_OBJECT (self, "Posting latency message");
gst_element_post_message (GST_ELEMENT_CAST (self),
gst_message_new_latency (GST_OBJECT_CAST (self)));
}
break;
}
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_win32_video_src_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (object);
switch (prop_id) {
case PROP_PIPE_NAME:
GST_OBJECT_LOCK (self);
g_value_set_string (value, self->pipe_name);
GST_OBJECT_UNLOCK (self);
break;
case PROP_PROCESSING_DELAY:
GST_OBJECT_LOCK (self);
g_value_set_uint64 (value, self->processing_deadline);
GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static GstClock *
gst_win32_video_src_provide_clock (GstElement * elem)
{
return gst_system_clock_obtain ();
}
static gboolean
gst_win32_ipc_video_src_start (GstBaseSrc * src)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src);
GST_DEBUG_OBJECT (self, "Start");
gst_video_info_init (&self->info);
return TRUE;
}
static gboolean
gst_win32_ipc_video_src_stop (GstBaseSrc * src)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src);
GST_DEBUG_OBJECT (self, "Stop");
g_clear_pointer (&self->pipe, win32_ipc_pipe_client_unref);
gst_clear_caps (&self->caps);
if (self->pool) {
gst_buffer_pool_set_active (self->pool, FALSE);
gst_clear_object (&self->pool);
}
return TRUE;
}
static gboolean
gst_win32_ipc_video_src_unlock (GstBaseSrc * src)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src);
GST_DEBUG_OBJECT (self, "Unlock");
AcquireSRWLockExclusive (&self->lock);
self->flushing = TRUE;
if (self->pipe)
win32_ipc_pipe_client_shutdown (self->pipe);
ReleaseSRWLockExclusive (&self->lock);
return TRUE;
}
static gboolean
gst_win32_ipc_video_src_unlock_stop (GstBaseSrc * src)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src);
GST_DEBUG_OBJECT (self, "Unlock");
AcquireSRWLockExclusive (&self->lock);
g_clear_pointer (&self->pipe, win32_ipc_pipe_client_unref);
gst_clear_caps (&self->caps);
self->flushing = FALSE;
ReleaseSRWLockExclusive (&self->lock);
return TRUE;
}
static gboolean
gst_win32_ipc_video_src_query (GstBaseSrc * src, GstQuery * query)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_LATENCY:
{
GST_OBJECT_LOCK (self);
if (GST_CLOCK_TIME_IS_VALID (self->processing_deadline)) {
gst_query_set_latency (query, TRUE, self->processing_deadline,
/* pipe server can hold up to 5 memory objects */
5 * self->processing_deadline);
} else {
gst_query_set_latency (query, TRUE, 0, 0);
}
GST_OBJECT_UNLOCK (self);
return TRUE;
}
default:
break;
}
return GST_BASE_SRC_CLASS (parent_class)->query (src, query);
}
static gboolean
gst_win32_ipc_video_src_decide_allocation (GstBaseSrc * src, GstQuery * query)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src);
gboolean ret;
ret = GST_BASE_SRC_CLASS (parent_class)->decide_allocation (src, query);
if (!ret)
return ret;
self->have_video_meta = gst_query_find_allocation_meta (query,
GST_VIDEO_META_API_TYPE, nullptr);
GST_DEBUG_OBJECT (self, "Downstream supports video meta: %d",
self->have_video_meta);
return TRUE;
}
static GstCaps *
gst_win32_ipc_video_src_update_info_and_get_caps (GstWin32IpcVideoSrc * self,
const Win32IpcVideoInfo * info)
{
GstVideoInfo vinfo;
gst_video_info_set_format (&vinfo, (GstVideoFormat) info->format,
info->width, info->height);
vinfo.fps_n = info->fps_n;
vinfo.fps_d = info->fps_d;
vinfo.par_n = info->par_n;
vinfo.par_d = info->par_d;
if (!self->caps || !gst_video_info_is_equal (&self->info, &vinfo)) {
self->info = vinfo;
return gst_video_info_to_caps (&vinfo);
}
return nullptr;
}
static gboolean
gst_win32_ipc_ensure_fallback_pool (GstWin32IpcVideoSrc * self)
{
GstStructure *config;
if (self->pool)
return TRUE;
self->pool = gst_video_buffer_pool_new ();
config = gst_buffer_pool_get_config (self->pool);
gst_buffer_pool_config_set_params (config, self->caps,
GST_VIDEO_INFO_SIZE (&self->info), 0, 0);
if (!gst_buffer_pool_set_config (self->pool, config)) {
GST_ERROR_OBJECT (self, "Couldn't set config");
goto error;
}
if (!gst_buffer_pool_set_active (self->pool, TRUE)) {
GST_ERROR_OBJECT (self, "Couldn't set active");
goto error;
}
return TRUE;
error:
gst_clear_object (&self->pool);
return FALSE;
}
static GstFlowReturn
gst_win32_ipc_video_src_create (GstBaseSrc * src, guint64 offset, guint size,
GstBuffer ** buf)
{
GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src);
GstCaps *caps;
Win32IpcMmf *mmf;
Win32IpcVideoInfo info;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buffer;
GstClock *clock;
GstClockTime pts;
GstClockTime base_time;
GstClockTime now_qpc;
GstClockTime now_gst;
LARGE_INTEGER cur_time;
gboolean is_qpc = TRUE;
gboolean need_video_meta = FALSE;
if (!self->pipe) {
self->pipe = win32_ipc_pipe_client_new (self->pipe_name);
if (!self->pipe) {
GST_ERROR_OBJECT (self, "Couldn't create pipe");
return GST_FLOW_ERROR;
}
}
if (!win32_ipc_pipe_client_get_mmf (self->pipe, &mmf, &info)) {
AcquireSRWLockExclusive (&self->lock);
if (self->flushing) {
ret = GST_FLOW_FLUSHING;
GST_DEBUG_OBJECT (self, "Flushing");
} else {
ret = GST_FLOW_EOS;
GST_WARNING_OBJECT (self, "Couldn't get buffer from server");
}
ReleaseSRWLockExclusive (&self->lock);
return ret;
}
caps = gst_win32_ipc_video_src_update_info_and_get_caps (self, &info);
for (guint i = 0; i < GST_VIDEO_INFO_N_PLANES (&self->info); i++) {
self->offset[i] = (gsize) info.offset[i];
self->stride[i] = (gint) info.stride[i];
if (self->offset[i] != self->info.offset[i] ||
self->stride[i] != self->info.stride[i]) {
need_video_meta = TRUE;
}
}
if (caps) {
if (self->pool) {
gst_buffer_pool_set_active (self->pool, FALSE);
gst_clear_object (&self->pool);
}
gst_caps_replace (&self->caps, caps);
GST_DEBUG_OBJECT (self, "Setting caps %" GST_PTR_FORMAT, caps);
gst_pad_set_caps (GST_BASE_SRC_PAD (src), caps);
gst_caps_unref (caps);
}
if (self->have_video_meta || !need_video_meta) {
buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
win32_ipc_mmf_get_raw (mmf), win32_ipc_mmf_get_size (mmf),
0, win32_ipc_mmf_get_size (mmf), mmf,
(GDestroyNotify) win32_ipc_mmf_unref);
if (self->have_video_meta) {
gst_buffer_add_video_meta_full (buffer,
GST_VIDEO_FRAME_FLAG_NONE, GST_VIDEO_INFO_FORMAT (&self->info),
GST_VIDEO_INFO_WIDTH (&self->info),
GST_VIDEO_INFO_HEIGHT (&self->info),
GST_VIDEO_INFO_N_PLANES (&self->info), self->offset, self->stride);
}
} else {
GstVideoFrame mmf_frame, frame;
if (!gst_win32_ipc_ensure_fallback_pool (self)) {
win32_ipc_mmf_unref (mmf);
return GST_FLOW_ERROR;
}
ret = gst_buffer_pool_acquire_buffer (self->pool, &buffer, nullptr);
if (ret != GST_FLOW_OK) {
GST_ERROR_OBJECT (self, "Couldn't acquire buffer");
win32_ipc_mmf_unref (mmf);
return GST_FLOW_ERROR;
}
gst_video_frame_map (&frame, &self->info, buffer, GST_MAP_WRITE);
mmf_frame.info = self->info;
for (guint i = 0; i < GST_VIDEO_FRAME_N_PLANES (&frame); i++) {
mmf_frame.info.offset[i] = self->offset[i];
mmf_frame.info.stride[i] = self->stride[i];
mmf_frame.data[i] = (guint8 *) win32_ipc_mmf_get_raw (mmf) +
self->offset[i];
}
gst_video_frame_copy (&frame, &mmf_frame);
gst_video_frame_unmap (&frame);
}
QueryPerformanceCounter (&cur_time);
now_qpc = gst_util_uint64_scale (cur_time.QuadPart, GST_SECOND,
self->frequency.QuadPart);
clock = gst_element_get_clock (GST_ELEMENT_CAST (self));
now_gst = gst_clock_get_time (clock);
base_time = GST_ELEMENT_CAST (self)->base_time;
is_qpc = gst_win32_ipc_clock_is_qpc (clock);
gst_object_unref (clock);
if (!is_qpc) {
GstClockTimeDiff now_pts = now_gst - base_time + info.qpc - now_qpc;
if (now_pts >= 0)
pts = now_pts;
else
pts = 0;
} else {
if (info.qpc >= base_time) {
/* Our base_time is also QPC */
pts = info.qpc - base_time;
} else {
GST_WARNING_OBJECT (self, "Server QPC is smaller than our QPC base time");
pts = 0;
}
}
GST_BUFFER_PTS (buffer) = pts;
GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
*buf = buffer;
return GST_FLOW_OK;
}

View file

@ -0,0 +1,32 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#pragma once
#include <gst/gst.h>
#include <gst/base/gstbasesrc.h>
#include <gst/video/video.h>
G_BEGIN_DECLS
#define GST_TYPE_WIN32_IPC_VIDEO_SRC (gst_win32_ipc_video_src_get_type())
G_DECLARE_FINAL_TYPE (GstWin32IpcVideoSrc, gst_win32_ipc_video_src,
GST, WIN32_IPC_VIDEO_SRC, GstBaseSrc);
G_END_DECLS

View file

@ -0,0 +1,39 @@
win32ipc_sources = [
'protocol/win32ipcmmf.cpp',
'protocol/win32ipcpipeclient.cpp',
'protocol/win32ipcpipeserver.cpp',
'protocol/win32ipcprotocol.cpp',
'protocol/win32ipcutils.cpp',
'gstwin32ipcutils.cpp',
'gstwin32ipcvideosink.cpp',
'gstwin32ipcvideosrc.cpp',
'plugin.cpp',
]
if host_system != 'windows' or get_option('win32ipc').disabled()
subdir_done()
endif
code = '''
#include <windows.h>
#if !(WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_APP) && !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP))
#error "Not building for UWP"
#endif'''
if cc.compiles(code, name : 'building for UWP')
if get_option('win32ipc').enabled()
error('win32ipc plugin does not support UWP')
else
subdir_done()
endif
endif
gstwin32ipc = library('gstwin32ipc',
win32ipc_sources,
c_args : gst_plugins_bad_args,
cpp_args: gst_plugins_bad_args,
include_directories : [configinc],
dependencies : [gstbase_dep, gstvideo_dep],
install : true,
install_dir : plugins_install_dir,
)
plugins += [gstwin32ipc]

View file

@ -0,0 +1,56 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* plugin-win32ipc:
*
* Windows IPC plugin
*
* Since: 1.22
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include "gstwin32ipcvideosink.h"
#include "gstwin32ipcvideosrc.h"
GST_DEBUG_CATEGORY (gst_win32_ipc_debug);
#define GST_CAT_DEFAULT gst_win32_ipc_debug
static gboolean
plugin_init (GstPlugin * plugin)
{
GST_DEBUG_CATEGORY_INIT (gst_win32_ipc_debug, "win32ipc", 0, "win32ipc");
gst_element_register (plugin,
"win32ipcvideosink", GST_RANK_NONE, GST_TYPE_WIN32_IPC_VIDEO_SINK);
gst_element_register (plugin,
"win32ipcvideosrc", GST_RANK_NONE, GST_TYPE_WIN32_IPC_VIDEO_SRC);
return TRUE;
}
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
GST_VERSION_MINOR,
win32ipc,
"Windows IPC plugin",
plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)

View file

@ -0,0 +1,241 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#include "win32ipcmmf.h"
#include "win32ipcutils.h"
#include <string>
GST_DEBUG_CATEGORY_EXTERN (gst_win32_ipc_debug);
#define GST_CAT_DEFAULT gst_win32_ipc_debug
struct Win32IpcMmf
{
explicit Win32IpcMmf (HANDLE f, void * b, UINT32 s, const std::string & n)
: file (f), buffer (b), size (s), name (n), ref_count (1)
{
}
~Win32IpcMmf ()
{
GST_TRACE ("Freeing %p (%s)", this, name.c_str ());
if (buffer)
UnmapViewOfFile (buffer);
if (file)
CloseHandle (file);
}
HANDLE file;
void *buffer;
UINT32 size;
std::string name;
ULONG ref_count;
};
static Win32IpcMmf *
win32_pic_mmf_new (HANDLE file, UINT32 size, const char * name)
{
Win32IpcMmf *self;
void *buffer;
std::string msg;
UINT err_code;
buffer = MapViewOfFile (file, FILE_MAP_ALL_ACCESS, 0, 0, size);
if (!buffer) {
err_code = GetLastError ();
msg = win32_ipc_error_message (err_code);
GST_ERROR ("MapViewOfFile failed with 0x%x (%s)",
err_code, msg.c_str ());
CloseHandle (file);
return nullptr;
}
self = new Win32IpcMmf (file, buffer, size, name);
return self;
}
/**
* win32_ipc_mmf_alloc:
* @size: Size of memory to allocate
* @name: The name of Memory Mapped File
*
* Creates named shared memory
*
* Returns: a new Win32IpcMmf object
*/
Win32IpcMmf *
win32_ipc_mmf_alloc (UINT32 size, const char * name)
{
HANDLE file;
std::string msg;
UINT err_code;
if (!size) {
GST_ERROR ("Zero size is not allowed");
return nullptr;
}
if (!name) {
GST_ERROR ("Name must be specified");
return nullptr;
}
file = CreateFileMappingA (INVALID_HANDLE_VALUE, nullptr,
PAGE_READWRITE | SEC_COMMIT, 0, size, name);
if (!file) {
err_code = GetLastError ();
msg = win32_ipc_error_message (err_code);
GST_ERROR ("CreateFileMappingA failed with 0x%x (%s)",
err_code, msg.c_str ());
return nullptr;
}
/* The name is already occupied, it's caller's fault... */
if (GetLastError () == ERROR_ALREADY_EXISTS) {
GST_ERROR ("File already exists");
CloseHandle (file);
return nullptr;
}
return win32_pic_mmf_new (file, size, name);
}
/**
* win32_ipc_mmf_open:
* @size: Size of memory to allocate
* @name: The name of Memory Mapped File
*
* Opens named shared memory
*
* Returns: a new Win32IpcMmf object
*/
Win32IpcMmf *
win32_ipc_mmf_open (UINT32 size, const char * name)
{
HANDLE file;
std::string msg;
UINT err_code;
if (!size) {
GST_ERROR ("Zero size is not allowed");
return nullptr;
}
if (!name) {
GST_ERROR ("Name must be specified");
return nullptr;
}
file = OpenFileMappingA (FILE_MAP_ALL_ACCESS, FALSE, name);
if (!file) {
err_code = GetLastError ();
msg = win32_ipc_error_message (err_code);
GST_ERROR ("OpenFileMappingA failed with 0x%x (%s)",
err_code, msg.c_str ());
return nullptr;
}
return win32_pic_mmf_new (file, size, name);
}
/**
* win32_ipc_mmf_get_name:
* @mmf: a Win32IpcMmf object
*
* Returns: the name of @mmf
*/
const char *
win32_ipc_mmf_get_name (Win32IpcMmf * mmf)
{
if (!mmf)
return nullptr;
return mmf->name.c_str ();
}
/**
* win32_ipc_mmf_get_size:
* @mmf: a Win32IpcMmf object
*
* Returns: the size of allocated memory
*/
UINT32
win32_ipc_mmf_get_size (Win32IpcMmf * mmf)
{
if (!mmf)
return 0;
return mmf->size;
}
/**
* win32_ipc_mmf_get_raw:
* @mmf: a Win32IpcMmf object
*
* Returns: the address of allocated memory
*/
void *
win32_ipc_mmf_get_raw (Win32IpcMmf * mmf)
{
if (!mmf)
return nullptr;
return mmf->buffer;
}
/**
* win32_ipc_mmf_ref:
* @mmf: a Win32IpcMmf object
*
* Increase ref count
*/
Win32IpcMmf *
win32_ipc_mmf_ref (Win32IpcMmf * mmf)
{
if (!mmf)
return nullptr;
InterlockedIncrement (&mmf->ref_count);
return mmf;
}
/**
* win32_ipc_mmf_unref:
* @mmf: a Win32IpcMmf object
*
* Decrease ref count
*/
void
win32_ipc_mmf_unref (Win32IpcMmf * mmf)
{
ULONG count;
if (!mmf)
return;
count = InterlockedDecrement (&mmf->ref_count);
if (count == 0)
delete mmf;
}

View file

@ -0,0 +1,50 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#pragma once
#include <gst/gst.h>
#include <windows.h>
G_BEGIN_DECLS
struct Win32IpcMmf;
Win32IpcMmf * win32_ipc_mmf_alloc (UINT32 size,
const char * name);
Win32IpcMmf * win32_ipc_mmf_open (UINT32 size,
const char * name);
const char * win32_ipc_mmf_get_name (Win32IpcMmf * mmf);
UINT32 win32_ipc_mmf_get_size (Win32IpcMmf * mmf);
void * win32_ipc_mmf_get_raw (Win32IpcMmf * mmf);
Win32IpcMmf * win32_ipc_mmf_ref (Win32IpcMmf * mmf);
void win32_ipc_mmf_unref (Win32IpcMmf * mmf);
G_END_DECLS

View file

@ -0,0 +1,448 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#include "win32ipcpipeclient.h"
#include "win32ipcutils.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include <thread>
#include <queue>
#include <string>
GST_DEBUG_CATEGORY_EXTERN (gst_win32_ipc_debug);
#define GST_CAT_DEFAULT gst_win32_ipc_debug
#define CONN_BUFFER_SIZE 1024
struct MmfInfo
{
Win32IpcMmf *mmf;
Win32IpcVideoInfo info;
};
struct ClientConnection : public OVERLAPPED
{
ClientConnection () : pipe (INVALID_HANDLE_VALUE), to_read (0), to_write (0),
seq_num (0)
{
OVERLAPPED *parent = dynamic_cast<OVERLAPPED *> (this);
parent->Internal = 0;
parent->InternalHigh = 0;
parent->Offset = 0;
parent->OffsetHigh = 0;
}
Win32IpcPipeClient *self;
HANDLE pipe;
UINT8 client_msg[CONN_BUFFER_SIZE];
UINT32 to_read;
UINT8 server_msg[CONN_BUFFER_SIZE];
UINT32 to_write;
UINT64 seq_num;
};
struct Win32IpcPipeClient
{
explicit Win32IpcPipeClient (const std::string & n)
: name (n), ref_count(1), last_err (ERROR_SUCCESS)
{
cancellable = CreateEventA (nullptr, TRUE, FALSE, nullptr);
conn.pipe = INVALID_HANDLE_VALUE;
conn.self = this;
}
~Win32IpcPipeClient ()
{
GST_DEBUG ("Free client %p", this);
win32_ipc_pipe_client_shutdown (this);
CloseHandle (cancellable);
}
std::mutex lock;
std::condition_variable cond;
std::unique_ptr<std::thread> thread;
std::queue<MmfInfo> queue;
std::string name;
ULONG ref_count;
HANDLE cancellable;
UINT last_err;
ClientConnection conn;
};
static DWORD
win32_ipc_pipe_client_send_need_data_async (Win32IpcPipeClient * self);
static VOID WINAPI
win32_ipc_pipe_client_send_read_done_finish (DWORD error_code, DWORD n_bytes,
LPOVERLAPPED overlapped)
{
ClientConnection *conn = (ClientConnection *) overlapped;
Win32IpcPipeClient *self = conn->self;
if (error_code != ERROR_SUCCESS) {
std::string msg = win32_ipc_error_message (error_code);
self->last_err = error_code;
GST_WARNING ("READ-DONE failed with 0x%x (%s)",
self->last_err, msg.c_str ());
goto error;
}
GST_TRACE ("READ-DONE sent");
self->last_err = win32_ipc_pipe_client_send_need_data_async (self);
if (self->last_err != ERROR_SUCCESS)
goto error;
/* All done, back to need-data state */
return;
error:
SetEvent (self->cancellable);
}
static DWORD
win32_ipc_pipe_client_send_read_done_async (Win32IpcPipeClient * self)
{
ClientConnection *conn = &self->conn;
conn->to_write = win32_ipc_pkt_build_read_done (conn->client_msg,
CONN_BUFFER_SIZE, conn->seq_num);
if (conn->to_write == 0) {
GST_ERROR ("Couldn't build READ-DONE pkt");
return ERROR_BAD_FORMAT;
}
GST_TRACE ("Sending READ-DONE");
if (!WriteFileEx (conn->pipe, conn->client_msg, conn->to_write,
(OVERLAPPED *) conn, win32_ipc_pipe_client_send_read_done_finish)) {
UINT last_err = GetLastError ();
std::string msg = win32_ipc_error_message (last_err);
GST_WARNING ("WriteFileEx failed with 0x%x (%s)", last_err, msg.c_str ());
return last_err;
}
return ERROR_SUCCESS;
}
static VOID WINAPI
win32_ipc_pipe_client_receive_have_data_finish (DWORD error_code, DWORD n_bytes,
LPOVERLAPPED overlapped)
{
ClientConnection *conn = (ClientConnection *) overlapped;
Win32IpcPipeClient *self = conn->self;
char mmf_name[1024] = { '\0', };
Win32IpcVideoInfo info;
Win32IpcMmf *mmf;
MmfInfo minfo;
if (error_code != ERROR_SUCCESS) {
std::string msg = win32_ipc_error_message (error_code);
self->last_err = error_code;
GST_WARNING ("HAVE-DATA failed with 0x%x (%s)",
self->last_err, msg.c_str ());
goto error;
}
if (!win32_ipc_pkt_parse_have_data (conn->server_msg, n_bytes,
&conn->seq_num, mmf_name, &info)) {
self->last_err = ERROR_BAD_FORMAT;
GST_WARNING ("Couldn't parse HAVE-DATA pkg");
goto error;
}
mmf = win32_ipc_mmf_open (info.size, mmf_name);
if (!mmf) {
GST_ERROR ("Couldn't open file %s", mmf_name);
self->last_err = ERROR_BAD_FORMAT;
goto error;
}
GST_TRACE ("Got HAVE-DATA %s", mmf_name);
minfo.mmf = mmf;
minfo.info = info;
{
std::lock_guard<std::mutex> lk (self->lock);
/* Drops too old data */
while (self->queue.size () > 5) {
MmfInfo info = self->queue.front ();
self->queue.pop ();
win32_ipc_mmf_unref (info.mmf);
}
self->queue.push (minfo);
self->cond.notify_all ();
}
self->last_err = win32_ipc_pipe_client_send_read_done_async (self);
if (self->last_err != ERROR_SUCCESS)
goto error;
return;
error:
SetEvent (self->cancellable);
}
static DWORD
win32_ipc_pipe_client_receive_have_data_async (Win32IpcPipeClient * self)
{
ClientConnection *conn = &self->conn;
GST_TRACE ("Waiting HAVE-DATA");
if (!ReadFileEx (conn->pipe, conn->server_msg, CONN_BUFFER_SIZE,
(OVERLAPPED *) conn, win32_ipc_pipe_client_receive_have_data_finish)) {
UINT last_err = GetLastError ();
std::string msg = win32_ipc_error_message (last_err);
GST_WARNING ("ReadFileEx failed with 0x%x (%s)", last_err, msg.c_str ());
return last_err;
}
return ERROR_SUCCESS;
}
static VOID WINAPI
pipe_clinet_send_need_data_finish (DWORD error_code, DWORD n_bytes,
LPOVERLAPPED overlapped)
{
ClientConnection *conn = (ClientConnection *) overlapped;
Win32IpcPipeClient *self = conn->self;
if (error_code != ERROR_SUCCESS) {
std::string msg = win32_ipc_error_message (error_code);
self->last_err = error_code;
GST_WARNING ("NEED-DATA failed with 0x%x (%s)",
self->last_err, msg.c_str ());
goto error;
}
self->last_err = win32_ipc_pipe_client_receive_have_data_async (self);
if (self->last_err != ERROR_SUCCESS)
goto error;
return;
error:
SetEvent (self->cancellable);
}
static DWORD
win32_ipc_pipe_client_send_need_data_async (Win32IpcPipeClient * self)
{
ClientConnection *conn = &self->conn;
conn->to_write = win32_ipc_pkt_build_need_data (conn->client_msg,
CONN_BUFFER_SIZE, conn->seq_num);
if (conn->to_write == 0) {
GST_ERROR ("Couldn't build NEED-DATA pkt");
return ERROR_BAD_FORMAT;
}
GST_TRACE ("Sending NEED-DATA");
if (!WriteFileEx (conn->pipe, conn->client_msg, conn->to_write,
(OVERLAPPED *) conn, pipe_clinet_send_need_data_finish)) {
UINT last_err = GetLastError ();
std::string msg = win32_ipc_error_message (last_err);
GST_WARNING ("WriteFileEx failed with 0x%x (%s)", last_err, msg.c_str ());
return last_err;
}
return ERROR_SUCCESS;
}
static VOID
win32_ipc_pipe_client_loop (Win32IpcPipeClient * self)
{
DWORD mode = PIPE_READMODE_MESSAGE;
std::unique_lock<std::mutex> lk (self->lock);
ClientConnection *conn = &self->conn;
conn->pipe = CreateFileA (self->name.c_str (),
GENERIC_READ | GENERIC_WRITE, 0, nullptr, OPEN_EXISTING,
FILE_FLAG_OVERLAPPED, nullptr);
self->last_err = GetLastError ();
if (conn->pipe == INVALID_HANDLE_VALUE) {
std::string msg = win32_ipc_error_message (self->last_err);
GST_WARNING ("CreateFileA failed with 0x%x (%s)", self->last_err,
msg.c_str ());
self->cond.notify_all ();
return;
}
if (!SetNamedPipeHandleState (conn->pipe, &mode, nullptr, nullptr)) {
self->last_err = GetLastError ();
std::string msg = win32_ipc_error_message (self->last_err);
GST_WARNING ("SetNamedPipeHandleState failed with 0x%x (%s)",
self->last_err, msg.c_str ());
CloseHandle (conn->pipe);
conn->pipe = INVALID_HANDLE_VALUE;
self->cond.notify_all ();
return;
}
self->last_err = ERROR_SUCCESS;
self->cond.notify_all ();
lk.unlock ();
/* Once connection is established, send NEED-DATA message to server,
* and then it will loop NEED-DATA -> HAVE-DATA -> READ-DONE */
self->last_err = win32_ipc_pipe_client_send_need_data_async (self);
if (self->last_err != ERROR_SUCCESS)
goto out;
do {
/* Enters alertable thread state and wait for I/O completion event
* or cancellable event */
DWORD ret = WaitForSingleObjectEx (self->cancellable, INFINITE, TRUE);
if (ret == WAIT_OBJECT_0) {
GST_DEBUG ("Operation cancelled");
CancelIoEx (conn->pipe, (OVERLAPPED *) &conn);
break;
} else if (ret != WAIT_IO_COMPLETION) {
GST_WARNING ("Unexpected wait return 0x%x", (UINT) ret);
CancelIoEx (conn->pipe, (OVERLAPPED *) &conn);
break;
}
} while (true);
out:
if (conn->pipe != INVALID_HANDLE_VALUE)
CloseHandle (conn->pipe);
lk.lock ();
self->last_err = ERROR_OPERATION_ABORTED;
conn->pipe = INVALID_HANDLE_VALUE;
self->cond.notify_all ();
}
static BOOL
win32_ipc_pipe_client_run (Win32IpcPipeClient * self)
{
std::unique_lock<std::mutex> lk (self->lock);
self->thread = std::make_unique<std::thread>
(std::thread (win32_ipc_pipe_client_loop, self));
self->cond.wait (lk);
if (self->last_err != ERROR_SUCCESS) {
self->thread->join ();
self->thread = nullptr;
return FALSE;
}
return TRUE;
}
Win32IpcPipeClient *
win32_ipc_pipe_client_new (const char * pipe_name)
{
Win32IpcPipeClient *self;
if (!pipe_name) {
GST_ERROR ("Pipe name must be specified");
return nullptr;
}
self = new Win32IpcPipeClient (pipe_name);
if (!win32_ipc_pipe_client_run (self)) {
win32_ipc_pipe_client_unref (self);
return nullptr;
}
return self;
}
Win32IpcPipeClient *
win32_ipc_pipe_client_ref (Win32IpcPipeClient * client)
{
InterlockedIncrement (&client->ref_count);
return client;
}
void
win32_ipc_pipe_client_unref (Win32IpcPipeClient * client)
{
ULONG ref_count;
ref_count = InterlockedDecrement (&client->ref_count);
if (ref_count == 0)
delete client;
}
void
win32_ipc_pipe_client_shutdown (Win32IpcPipeClient * client)
{
GST_DEBUG ("Shutting down %p", client);
SetEvent (client->cancellable);
if (client->thread) {
client->thread->join ();
client->thread = nullptr;
}
std::lock_guard<std::mutex> lk (client->lock);
client->last_err = ERROR_OPERATION_ABORTED;
while (!client->queue.empty ()) {
MmfInfo info = client->queue.front ();
client->queue.pop ();
win32_ipc_mmf_unref (info.mmf);
}
client->cond.notify_all ();
}
BOOL
win32_ipc_pipe_client_get_mmf (Win32IpcPipeClient * client, Win32IpcMmf ** mmf,
Win32IpcVideoInfo * info)
{
std::unique_lock<std::mutex> lk (client->lock);
if (client->last_err != ERROR_SUCCESS) {
GST_WARNING ("Last error code was 0x%x", client->last_err);
return FALSE;
}
while (client->queue.empty () && client->last_err == ERROR_SUCCESS)
client->cond.wait (lk);
if (client->last_err != ERROR_SUCCESS || client->queue.empty ())
return FALSE;
MmfInfo mmf_info = client->queue.front ();
client->queue.pop ();
*mmf = mmf_info.mmf;
*info = mmf_info.info;
return TRUE;
}

View file

@ -0,0 +1,49 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#pragma once
#include <windows.h>
#include <string.h>
#include "win32ipcmmf.h"
#include "win32ipcprotocol.h"
#include <gst/gst.h>
G_BEGIN_DECLS
struct Win32IpcPipeClient;
Win32IpcPipeClient * win32_ipc_pipe_client_new (const char * pipe_name);
Win32IpcPipeClient * win32_ipc_pipe_client_ref (Win32IpcPipeClient * client);
void win32_ipc_pipe_client_unref (Win32IpcPipeClient * client);
void win32_ipc_pipe_client_shutdown (Win32IpcPipeClient * client);
BOOL win32_ipc_pipe_client_get_mmf (Win32IpcPipeClient * client,
Win32IpcMmf ** mmf,
Win32IpcVideoInfo * info);
G_END_DECLS

View file

@ -0,0 +1,550 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#include "win32ipcpipeserver.h"
#include "win32ipcutils.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include <thread>
#include <queue>
#include <vector>
#include <string>
#include <algorithm>
#include <assert.h>
GST_DEBUG_CATEGORY_EXTERN (gst_win32_ipc_debug);
#define GST_CAT_DEFAULT gst_win32_ipc_debug
#define CONN_BUFFER_SIZE 1024
struct MmfInfo
{
explicit MmfInfo (Win32IpcMmf * m, const Win32IpcVideoInfo * i, UINT64 s)
{
mmf = m;
info = *i;
seq_num = s;
}
~MmfInfo()
{
if (mmf)
win32_ipc_mmf_unref (mmf);
}
Win32IpcMmf *mmf = nullptr;
Win32IpcVideoInfo info;
UINT64 seq_num;
};
struct ServerConnection : public OVERLAPPED
{
ServerConnection(Win32IpcPipeServer * server, HANDLE p)
: self(server), pipe(p)
{
OVERLAPPED *parent = dynamic_cast<OVERLAPPED *> (this);
parent->Internal = 0;
parent->InternalHigh = 0;
parent->Offset = 0;
parent->OffsetHigh = 0;
}
Win32IpcPipeServer *self;
std::shared_ptr<MmfInfo> minfo;
HANDLE pipe = INVALID_HANDLE_VALUE;
UINT8 client_msg[CONN_BUFFER_SIZE];
UINT32 to_read = 0;
UINT8 server_msg[CONN_BUFFER_SIZE];
UINT32 to_write = 0;
UINT64 seq_num = 0;
BOOL pending_have_data = FALSE;
};
struct Win32IpcPipeServer
{
explicit Win32IpcPipeServer (const std::string & n)
: name (n), ref_count (1), last_err (ERROR_SUCCESS), seq_num (0)
{
enqueue_event = CreateEventA (nullptr, FALSE, FALSE, nullptr);
cancellable = CreateEventA (nullptr, TRUE, FALSE, nullptr);
}
~Win32IpcPipeServer ()
{
win32_ipc_pipe_server_shutdown (this);
CloseHandle (cancellable);
CloseHandle (enqueue_event);
}
std::mutex lock;
std::condition_variable cond;
std::unique_ptr<std::thread> thread;
std::shared_ptr<MmfInfo> minfo;
std::string name;
std::vector<ServerConnection *> conn;
ULONG ref_count;
HANDLE enqueue_event;
HANDLE cancellable;
UINT last_err;
UINT64 seq_num;
};
static void
win32_ipc_pipe_server_receive_need_data_async (ServerConnection * conn);
static void
win32_ipc_pipe_server_close_connection (ServerConnection * conn,
BOOL remove_from_list)
{
Win32IpcPipeServer *self = conn->self;
GST_DEBUG ("Closing connection %p", conn);
if (remove_from_list) {
self->conn.erase (std::remove (self->conn.begin (), self->conn.end (),
conn), self->conn.end ());
}
if (!DisconnectNamedPipe (conn->pipe)) {
UINT last_err = GetLastError ();
std::string msg = win32_ipc_error_message (last_err);
GST_WARNING ("DisconnectNamedPipe failed with 0x%x (%s)",
last_err, msg.c_str ());
}
CloseHandle (conn->pipe);
delete conn;
}
static void WINAPI
win32_ipc_pipe_server_receive_read_done_finish (DWORD error_code, DWORD n_bytes,
LPOVERLAPPED overlapped)
{
ServerConnection *conn = (ServerConnection *) overlapped;
if (error_code != ERROR_SUCCESS) {
std::string msg = win32_ipc_error_message (error_code);
GST_WARNING ("READ-DONE failed with 0x%x (%s)",
(UINT) error_code, msg.c_str ());
win32_ipc_pipe_server_close_connection (conn, TRUE);
return;
}
GST_TRACE ("Got READ-DONE %p", conn);
conn->minfo = nullptr;
/* All done, wait for need-data again */
win32_ipc_pipe_server_receive_need_data_async (conn);
}
static void
win32_ipc_pipe_server_receive_read_done_async (ServerConnection * conn)
{
GST_TRACE ("Waiting READ-DONE %p", conn);
if (!ReadFileEx (conn->pipe, conn->client_msg, CONN_BUFFER_SIZE,
(OVERLAPPED *) conn, win32_ipc_pipe_server_receive_read_done_finish)) {
UINT last_err = GetLastError ();
std::string msg = win32_ipc_error_message (last_err);
GST_WARNING ("ReadFileEx failed with 0x%x (%s)", last_err, msg.c_str ());
win32_ipc_pipe_server_close_connection (conn, TRUE);
}
}
static void WINAPI
win32_ipc_pipe_server_send_have_data_finish (DWORD error_code, DWORD n_bytes,
LPOVERLAPPED overlapped)
{
ServerConnection *conn = (ServerConnection *) overlapped;
if (error_code != ERROR_SUCCESS) {
std::string msg = win32_ipc_error_message (error_code);
GST_WARNING ("HAVE-DATA failed with 0x%x (%s)",
(UINT) error_code, msg.c_str ());
win32_ipc_pipe_server_close_connection (conn, TRUE);
return;
}
GST_TRACE ("HAVE-DATA done with %s",
win32_ipc_mmf_get_name (conn->minfo->mmf));
win32_ipc_pipe_server_receive_read_done_async (conn);
}
static void
win32_ipc_pipe_server_send_have_data_async (ServerConnection * conn)
{
assert (conn->minfo != nullptr);
conn->pending_have_data = FALSE;
conn->seq_num = conn->minfo->seq_num;
conn->to_write = win32_ipc_pkt_build_have_data (conn->server_msg,
CONN_BUFFER_SIZE, conn->seq_num,
win32_ipc_mmf_get_name (conn->minfo->mmf), &conn->minfo->info);
if (conn->to_write == 0) {
GST_ERROR ("Couldn't build HAVE-DATA pkt");
win32_ipc_pipe_server_close_connection (conn, TRUE);
return;
}
conn->seq_num++;
GST_TRACE ("Sending HAVE-DATA");
if (!WriteFileEx (conn->pipe, conn->server_msg, conn->to_write,
(OVERLAPPED *) conn, win32_ipc_pipe_server_send_have_data_finish)) {
UINT last_err = GetLastError ();
std::string msg = win32_ipc_error_message (last_err);
GST_WARNING ("WriteFileEx failed with 0x%x (%s)", last_err, msg.c_str ());
win32_ipc_pipe_server_close_connection (conn, TRUE);
}
}
static void WINAPI
win32_ipc_pipe_server_receive_need_data_finish (DWORD error_code, DWORD n_bytes,
LPOVERLAPPED overlapped)
{
ServerConnection *conn = (ServerConnection *) overlapped;
UINT64 seq_num;
if (error_code != ERROR_SUCCESS) {
std::string msg = win32_ipc_error_message (error_code);
GST_WARNING ("NEED-DATA failed with 0x%x (%s)",
(UINT) error_code, msg.c_str ());
win32_ipc_pipe_server_close_connection (conn, TRUE);
return;
}
if (!win32_ipc_pkt_parse_need_data (conn->client_msg, CONN_BUFFER_SIZE,
&seq_num)) {
GST_ERROR ("Couldn't parse NEED-DATA message");
win32_ipc_pipe_server_close_connection (conn, TRUE);
return;
}
GST_TRACE ("Got NEED-DATA");
/* Will response later once data is available */
if (!conn->minfo) {
GST_LOG ("No data available, waiting");
conn->pending_have_data = TRUE;
return;
}
win32_ipc_pipe_server_send_have_data_async (conn);
}
static void
win32_ipc_pipe_server_receive_need_data_async (ServerConnection * conn)
{
GST_TRACE ("Waiting NEED-DATA");
if (!ReadFileEx (conn->pipe, conn->client_msg, CONN_BUFFER_SIZE,
(OVERLAPPED *) conn, win32_ipc_pipe_server_receive_need_data_finish)) {
UINT last_err = GetLastError ();
std::string msg = win32_ipc_error_message (last_err);
GST_WARNING ("ReadFileEx failed with 0x%x (%s)", last_err, msg.c_str ());
win32_ipc_pipe_server_close_connection (conn, TRUE);
}
}
static HANDLE
win32_ipc_pipe_server_create_pipe (Win32IpcPipeServer * self,
OVERLAPPED * overlap, BOOL * io_pending)
{
HANDLE pipe = CreateNamedPipeA (self->name.c_str (),
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
CONN_BUFFER_SIZE, CONN_BUFFER_SIZE, 5000, nullptr);
if (pipe == INVALID_HANDLE_VALUE) {
self->last_err = GetLastError ();
std::string msg = win32_ipc_error_message (self->last_err);
GST_WARNING ("CreateNamedPipeA failed with 0x%x (%s)",
self->last_err, msg.c_str ());
return INVALID_HANDLE_VALUE;
}
/* Async pipe should return FALSE */
if (ConnectNamedPipe (pipe, overlap)) {
self->last_err = GetLastError ();
std::string msg = win32_ipc_error_message (self->last_err);
GST_WARNING ("ConnectNamedPipe failed with 0x%x (%s)",
self->last_err, msg.c_str ());
CloseHandle (pipe);
return INVALID_HANDLE_VALUE;
}
*io_pending = FALSE;
self->last_err = GetLastError ();
switch (self->last_err) {
case ERROR_IO_PENDING:
*io_pending = TRUE;
break;
case ERROR_PIPE_CONNECTED:
SetEvent (overlap->hEvent);
break;
default:
{
std::string msg = win32_ipc_error_message (self->last_err);
GST_WARNING ("ConnectNamedPipe failed with 0x%x (%s)",
self->last_err, msg.c_str ());
CloseHandle (pipe);
return INVALID_HANDLE_VALUE;
}
}
self->last_err = ERROR_SUCCESS;
return pipe;
}
static void
win32_ipc_pipe_server_loop (Win32IpcPipeServer * self)
{
BOOL io_pending = FALSE;
DWORD n_bytes;
DWORD wait_ret;
HANDLE waitables[3];
HANDLE pipe;
OVERLAPPED overlap;
std::unique_lock<std::mutex> lk (self->lock);
overlap.hEvent = CreateEvent (nullptr, TRUE, TRUE, nullptr);
pipe = win32_ipc_pipe_server_create_pipe (self, &overlap, &io_pending);
if (pipe == INVALID_HANDLE_VALUE) {
CloseHandle (overlap.hEvent);
self->cond.notify_all ();
return;
}
self->last_err = ERROR_SUCCESS;
self->cond.notify_all ();
lk.unlock ();
do {
ServerConnection *conn;
waitables[0] = overlap.hEvent;
waitables[1] = self->enqueue_event;
waitables[2] = self->cancellable;
/* Enters alertable state and wait for
* 1) Client's connection request
* (similar to socket listen/accept in async manner)
* 2) Or, performs completion routines (finish APC)
* 3) Or, terminates if cancellable event was signalled
*/
wait_ret = WaitForMultipleObjectsEx (3, waitables, FALSE, INFINITE, TRUE);
if (wait_ret == WAIT_OBJECT_0 + 2) {
GST_DEBUG ("Operation cancelled");
goto out;
}
switch (wait_ret) {
case WAIT_OBJECT_0:
if (io_pending) {
BOOL ret = GetOverlappedResult (pipe, &overlap, &n_bytes, FALSE);
if (!ret) {
UINT last_err = GetLastError ();
std::string msg = win32_ipc_error_message (last_err);
GST_WARNING ("ConnectNamedPipe failed with 0x%x (%s)",
last_err, msg.c_str ());
CloseHandle (pipe);
break;
}
}
conn = new ServerConnection (self, pipe);
GST_DEBUG ("New connection is established %p", conn);
/* Stores current buffer if available */
lk.lock();
conn->minfo = self->minfo;
lk.unlock ();
pipe = INVALID_HANDLE_VALUE;
self->conn.push_back (conn);
win32_ipc_pipe_server_receive_need_data_async (conn);
pipe = win32_ipc_pipe_server_create_pipe (self, &overlap, &io_pending);
if (pipe == INVALID_HANDLE_VALUE)
goto out;
break;
case WAIT_OBJECT_0 + 1:
case WAIT_IO_COMPLETION:
{
std::vector<ServerConnection *> pending_conns;
std::shared_ptr<MmfInfo> minfo;
lk.lock();
minfo = self->minfo;
lk.unlock();
if (minfo) {
for (auto iter: self->conn) {
if (iter->pending_have_data && iter->seq_num <= minfo->seq_num) {
iter->minfo = minfo;
pending_conns.push_back (iter);
}
}
}
for (auto iter: pending_conns) {
GST_LOG ("Sending pending have data to %p", iter);
win32_ipc_pipe_server_send_have_data_async (iter);
}
break;
}
default:
GST_WARNING ("Unexpected WaitForMultipleObjectsEx return 0x%x",
(UINT) wait_ret);
goto out;
}
} while (true);
out:
/* Cancels all I/O event issued from this thread */
{
std::vector<HANDLE> pipes;
for (auto iter: self->conn) {
if (iter->pipe != INVALID_HANDLE_VALUE)
pipes.push_back (iter->pipe);
}
for (auto iter: pipes)
CancelIo (iter);
}
for (auto iter: self->conn)
win32_ipc_pipe_server_close_connection (iter, FALSE);
self->conn.clear ();
lk.lock ();
CloseHandle (overlap.hEvent);
self->last_err = ERROR_OPERATION_ABORTED;
self->cond.notify_all ();
}
static BOOL
win32_ipc_pipe_server_run (Win32IpcPipeServer * self)
{
std::unique_lock<std::mutex> lk (self->lock);
self->thread = std::make_unique<std::thread>
(std::thread (win32_ipc_pipe_server_loop, self));
self->cond.wait (lk);
if (self->last_err != ERROR_SUCCESS) {
self->thread->join ();
self->thread = nullptr;
return FALSE;
}
return TRUE;
}
Win32IpcPipeServer *
win32_ipc_pipe_server_new (const char * pipe_name)
{
Win32IpcPipeServer *self;
if (!pipe_name)
return nullptr;
self = new Win32IpcPipeServer (pipe_name);
if (!win32_ipc_pipe_server_run (self)) {
win32_ipc_pipe_server_unref (self);
return nullptr;
}
return self;
}
Win32IpcPipeServer *
win32_ipc_pipe_server_ref (Win32IpcPipeServer * server)
{
if (!server)
return nullptr;
InterlockedIncrement (&server->ref_count);
return server;
}
void
win32_ipc_pipe_server_unref (Win32IpcPipeServer * server)
{
ULONG ref_count;
if (!server)
return;
ref_count = InterlockedDecrement (&server->ref_count);
if (ref_count == 0)
delete server;
}
void
win32_ipc_pipe_server_shutdown (Win32IpcPipeServer * server)
{
GST_DEBUG ("Shutting down");
SetEvent (server->cancellable);
if (server->thread) {
server->thread->join ();
server->thread = nullptr;
}
std::lock_guard<std::mutex> lk (server->lock);
server->last_err = ERROR_OPERATION_ABORTED;
server->minfo = nullptr;
server->cond.notify_all ();
}
BOOL
win32_ipc_pipe_server_send_mmf (Win32IpcPipeServer * server, Win32IpcMmf * mmf,
const Win32IpcVideoInfo * info)
{
std::lock_guard<std::mutex> lk (server->lock);
server->minfo = std::make_shared<MmfInfo> (mmf, info, server->seq_num);
GST_LOG ("Enqueue mmf %s", win32_ipc_mmf_get_name (mmf));
server->seq_num++;
/* Wakeup event loop */
SetEvent (server->enqueue_event);
return TRUE;
}

View file

@ -0,0 +1,50 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#pragma once
#include <windows.h>
#include <string.h>
#include "win32ipcmmf.h"
#include "win32ipcprotocol.h"
#include <gst/gst.h>
G_BEGIN_DECLS
struct Win32IpcPipeServer;
Win32IpcPipeServer * win32_ipc_pipe_server_new (const char * pipe_name);
Win32IpcPipeServer * win32_ipc_pipe_server_ref (Win32IpcPipeServer * server);
void win32_ipc_pipe_server_unref (Win32IpcPipeServer * server);
void win32_ipc_pipe_server_shutdown (Win32IpcPipeServer * server);
BOOL win32_ipc_pipe_server_send_mmf (Win32IpcPipeServer * server,
Win32IpcMmf * mmf,
const Win32IpcVideoInfo * info);
G_END_DECLS

View file

@ -0,0 +1,237 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#include "win32ipcprotocol.h"
#include <string.h>
const char *
win32_ipc_pkt_type_to_string (Win32IpcPktType type)
{
switch (type) {
case WIN32_IPC_PKT_NEED_DATA:
return "NEED-DATA";
case WIN32_IPC_PKT_HAVE_DATA:
return "HAVE-DATA";
case WIN32_IPC_PKT_READ_DONE:
return "READ-DONE";
default:
break;
}
return "Unknown";
}
Win32IpcPktType
win32_ipc_pkt_type_from_raw (UINT8 type)
{
return (Win32IpcPktType) type;
}
UINT8
win32_ipc_pkt_type_to_raw (Win32IpcPktType type)
{
return (UINT8) type;
}
#define READ_UINT32(d,v) do { \
(*((UINT32 *) v)) = *((UINT32 *) d); \
(d) += sizeof (UINT32); \
} while (0)
#define WRITE_UINT32(d,v) do { \
*((UINT32 *) d) = v; \
(d) += sizeof (UINT32); \
} while (0)
#define READ_UINT64(d,v) do { \
(*((UINT64 *) v)) = *((UINT64 *) d); \
(d) += sizeof (UINT64); \
} while (0)
#define WRITE_UINT64(d,v) do { \
*((UINT64 *) d) = v; \
(d) += sizeof (UINT64); \
} while (0)
UINT32
win32_ipc_pkt_build_need_data (UINT8 * pkt, UINT32 pkt_len, UINT64 seq_num)
{
UINT8 *data = pkt;
if (!pkt || pkt_len < WIN32_IPC_PKT_NEED_DATA_SIZE)
return 0;
data[0] = win32_ipc_pkt_type_to_raw (WIN32_IPC_PKT_NEED_DATA);
data++;
WRITE_UINT64 (data, seq_num);
return WIN32_IPC_PKT_NEED_DATA_SIZE;
}
BOOL
win32_ipc_pkt_parse_need_data (UINT8 * pkt, UINT32 pkt_len, UINT64 * seq_num)
{
UINT8 *data = pkt;
if (!pkt || pkt_len < WIN32_IPC_PKT_NEED_DATA_SIZE)
return FALSE;
if (win32_ipc_pkt_type_from_raw (data[0]) != WIN32_IPC_PKT_NEED_DATA)
return FALSE;
data++;
READ_UINT64 (data, seq_num);
return TRUE;
}
UINT32
win32_ipc_pkt_build_have_data (UINT8 * pkt, UINT32 pkt_size, UINT64 seq_num,
const char * mmf_name, const Win32IpcVideoInfo * info)
{
UINT8 *data = pkt;
size_t len;
if (!pkt || !mmf_name || !info)
return 0;
len = strlen (mmf_name);
if (len == 0)
return 0;
len++;
if (pkt_size < WIN32_IPC_PKT_HAVE_DATA_SIZE + len)
return 0;
data[0] = win32_ipc_pkt_type_to_raw (WIN32_IPC_PKT_HAVE_DATA);
data++;
WRITE_UINT64 (data, seq_num);
strcpy ((char *) data, mmf_name);
data += len;
WRITE_UINT32 (data, info->format);
WRITE_UINT32 (data, info->width);
WRITE_UINT32 (data, info->height);
WRITE_UINT32 (data, info->fps_n);
WRITE_UINT32 (data, info->fps_d);
WRITE_UINT32 (data, info->par_n);
WRITE_UINT32 (data, info->par_d);
WRITE_UINT64 (data, info->size);
for (UINT i = 0; i < 4; i++)
WRITE_UINT64 (data, info->offset[i]);
for (UINT i = 0; i < 4; i++)
WRITE_UINT32 (data, info->stride[i]);
WRITE_UINT64 (data, info->qpc);
return data - pkt;
}
BOOL
win32_ipc_pkt_parse_have_data (UINT8 * pkt, UINT32 pkt_size, UINT64 * seq_num,
char * mmf_name, Win32IpcVideoInfo * info)
{
UINT8 *data = pkt;
size_t len;
if (!pkt || pkt_size < WIN32_IPC_PKT_HAVE_DATA_SIZE)
return FALSE;
if (win32_ipc_pkt_type_from_raw (pkt[0]) != WIN32_IPC_PKT_HAVE_DATA)
return FALSE;
data++;
READ_UINT64 (data, seq_num);
len = strnlen ((const char *) data, pkt_size - (data - pkt));
if (len == 0)
return FALSE;
len++;
if (pkt_size < WIN32_IPC_PKT_HAVE_DATA_SIZE + len)
return FALSE;
strcpy (mmf_name, (const char *) data);
data += len;
READ_UINT32 (data, &info->format);
READ_UINT32 (data, &info->width);
READ_UINT32 (data, &info->height);
READ_UINT32 (data, &info->fps_n);
READ_UINT32 (data, &info->fps_d);
READ_UINT32 (data, &info->par_n);
READ_UINT32 (data, &info->par_d);
READ_UINT64 (data, &info->size);
for (UINT i = 0; i < 4; i++)
READ_UINT64 (data, &info->offset[i]);
for (UINT i = 0; i < 4; i++)
READ_UINT32 (data, &info->stride[i]);
READ_UINT64 (data, &info->qpc);
return TRUE;
}
UINT32
win32_ipc_pkt_build_read_done (UINT8 * pkt, UINT32 pkt_len, UINT64 seq_num)
{
UINT8 *data = pkt;
if (!pkt || pkt_len < WIN32_IPC_PKT_READ_DONE_SIZE)
return 0;
data[0] = win32_ipc_pkt_type_to_raw (WIN32_IPC_PKT_READ_DONE);
data++;
WRITE_UINT64 (data, seq_num);
return WIN32_IPC_PKT_READ_DONE_SIZE;
}
BOOL
win32_ipc_pkt_parse_read_done (UINT8 * pkt, UINT32 pkt_len, UINT64 * seq_num)
{
UINT8 *data = pkt;
if (!pkt || pkt_len < WIN32_IPC_PKT_READ_DONE_SIZE)
return FALSE;
if (win32_ipc_pkt_type_from_raw (data[0]) != WIN32_IPC_PKT_READ_DONE)
return FALSE;
data++;
READ_UINT64 (data, seq_num);
return TRUE;
}

View file

@ -0,0 +1,243 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#pragma once
#include <windows.h>
#include <gst/gst.h>
G_BEGIN_DECLS
/*
* Communication Sequence
*
* +--------+ +--------+
* | client | | server |
* +--------+ +--------+
* | |
* +--------- NEED-DATA ---------->|
* | +-------+
* | | prepare named
* | | shared-memory
* | +<------+
* +<-- HAVE-DATA (w/ shm name) ---|
* +--------+ |
* Open named | |
* shared-memory | |
* +------->+ |
* |--------- READ-DONE ---------->|
*/
typedef enum
{
WIN32_IPC_PKT_UNKNOWN,
WIN32_IPC_PKT_NEED_DATA,
WIN32_IPC_PKT_HAVE_DATA,
WIN32_IPC_PKT_READ_DONE,
} Win32IpcPktType;
/* Same as GstVideoFormat */
typedef enum
{
WIN32_IPC_VIDEO_FORMAT_UNKNOWN,
WIN32_IPC_VIDEO_FORMAT_ENCODED,
WIN32_IPC_VIDEO_FORMAT_I420,
WIN32_IPC_VIDEO_FORMAT_YV12,
WIN32_IPC_VIDEO_FORMAT_YUY2,
WIN32_IPC_VIDEO_FORMAT_UYVY,
WIN32_IPC_VIDEO_FORMAT_AYUV,
WIN32_IPC_VIDEO_FORMAT_RGBx,
WIN32_IPC_VIDEO_FORMAT_BGRx,
WIN32_IPC_VIDEO_FORMAT_xRGB,
WIN32_IPC_VIDEO_FORMAT_xBGR,
WIN32_IPC_VIDEO_FORMAT_RGBA,
WIN32_IPC_VIDEO_FORMAT_BGRA,
WIN32_IPC_VIDEO_FORMAT_ARGB,
WIN32_IPC_VIDEO_FORMAT_ABGR,
WIN32_IPC_VIDEO_FORMAT_RGB,
WIN32_IPC_VIDEO_FORMAT_BGR,
WIN32_IPC_VIDEO_FORMAT_Y41B,
WIN32_IPC_VIDEO_FORMAT_Y42B,
WIN32_IPC_VIDEO_FORMAT_YVYU,
WIN32_IPC_VIDEO_FORMAT_Y444,
WIN32_IPC_VIDEO_FORMAT_v210,
WIN32_IPC_VIDEO_FORMAT_v216,
WIN32_IPC_VIDEO_FORMAT_NV12,
WIN32_IPC_VIDEO_FORMAT_NV21,
WIN32_IPC_VIDEO_FORMAT_GRAY8,
WIN32_IPC_VIDEO_FORMAT_GRAY16_BE,
WIN32_IPC_VIDEO_FORMAT_GRAY16_LE,
WIN32_IPC_VIDEO_FORMAT_v308,
WIN32_IPC_VIDEO_FORMAT_RGB16,
WIN32_IPC_VIDEO_FORMAT_BGR16,
WIN32_IPC_VIDEO_FORMAT_RGB15,
WIN32_IPC_VIDEO_FORMAT_BGR15,
WIN32_IPC_VIDEO_FORMAT_UYVP,
WIN32_IPC_VIDEO_FORMAT_A420,
WIN32_IPC_VIDEO_FORMAT_RGB8P,
WIN32_IPC_VIDEO_FORMAT_YUV9,
WIN32_IPC_VIDEO_FORMAT_YVU9,
WIN32_IPC_VIDEO_FORMAT_IYU1,
WIN32_IPC_VIDEO_FORMAT_ARGB64,
WIN32_IPC_VIDEO_FORMAT_AYUV64,
WIN32_IPC_VIDEO_FORMAT_r210,
WIN32_IPC_VIDEO_FORMAT_I420_10BE,
WIN32_IPC_VIDEO_FORMAT_I420_10LE,
WIN32_IPC_VIDEO_FORMAT_I422_10BE,
WIN32_IPC_VIDEO_FORMAT_I422_10LE,
WIN32_IPC_VIDEO_FORMAT_Y444_10BE,
WIN32_IPC_VIDEO_FORMAT_Y444_10LE,
WIN32_IPC_VIDEO_FORMAT_GBR,
WIN32_IPC_VIDEO_FORMAT_GBR_10BE,
WIN32_IPC_VIDEO_FORMAT_GBR_10LE,
WIN32_IPC_VIDEO_FORMAT_NV16,
WIN32_IPC_VIDEO_FORMAT_NV24,
WIN32_IPC_VIDEO_FORMAT_NV12_64Z32,
WIN32_IPC_VIDEO_FORMAT_A420_10BE,
WIN32_IPC_VIDEO_FORMAT_A420_10LE,
WIN32_IPC_VIDEO_FORMAT_A422_10BE,
WIN32_IPC_VIDEO_FORMAT_A422_10LE,
WIN32_IPC_VIDEO_FORMAT_A444_10BE,
WIN32_IPC_VIDEO_FORMAT_A444_10LE,
WIN32_IPC_VIDEO_FORMAT_NV61,
WIN32_IPC_VIDEO_FORMAT_P010_10BE,
WIN32_IPC_VIDEO_FORMAT_P010_10LE,
WIN32_IPC_VIDEO_FORMAT_IYU2,
WIN32_IPC_VIDEO_FORMAT_VYUY,
WIN32_IPC_VIDEO_FORMAT_GBRA,
WIN32_IPC_VIDEO_FORMAT_GBRA_10BE,
WIN32_IPC_VIDEO_FORMAT_GBRA_10LE,
WIN32_IPC_VIDEO_FORMAT_GBR_12BE,
WIN32_IPC_VIDEO_FORMAT_GBR_12LE,
WIN32_IPC_VIDEO_FORMAT_GBRA_12BE,
WIN32_IPC_VIDEO_FORMAT_GBRA_12LE,
WIN32_IPC_VIDEO_FORMAT_I420_12BE,
WIN32_IPC_VIDEO_FORMAT_I420_12LE,
WIN32_IPC_VIDEO_FORMAT_I422_12BE,
WIN32_IPC_VIDEO_FORMAT_I422_12LE,
WIN32_IPC_VIDEO_FORMAT_Y444_12BE,
WIN32_IPC_VIDEO_FORMAT_Y444_12LE,
WIN32_IPC_VIDEO_FORMAT_GRAY10_LE32,
WIN32_IPC_VIDEO_FORMAT_NV12_10LE32,
WIN32_IPC_VIDEO_FORMAT_NV16_10LE32,
WIN32_IPC_VIDEO_FORMAT_NV12_10LE40,
WIN32_IPC_VIDEO_FORMAT_Y210,
WIN32_IPC_VIDEO_FORMAT_Y410,
WIN32_IPC_VIDEO_FORMAT_VUYA,
WIN32_IPC_VIDEO_FORMAT_BGR10A2_LE,
WIN32_IPC_VIDEO_FORMAT_RGB10A2_LE,
WIN32_IPC_VIDEO_FORMAT_Y444_16BE,
WIN32_IPC_VIDEO_FORMAT_Y444_16LE,
WIN32_IPC_VIDEO_FORMAT_P016_BE,
WIN32_IPC_VIDEO_FORMAT_P016_LE,
WIN32_IPC_VIDEO_FORMAT_P012_BE,
WIN32_IPC_VIDEO_FORMAT_P012_LE,
WIN32_IPC_VIDEO_FORMAT_Y212_BE,
WIN32_IPC_VIDEO_FORMAT_Y212_LE,
WIN32_IPC_VIDEO_FORMAT_Y412_BE,
WIN32_IPC_VIDEO_FORMAT_Y412_LE,
WIN32_IPC_VIDEO_FORMAT_NV12_4L4,
WIN32_IPC_VIDEO_FORMAT_NV12_32L32,
WIN32_IPC_VIDEO_FORMAT_RGBP,
WIN32_IPC_VIDEO_FORMAT_BGRP,
WIN32_IPC_VIDEO_FORMAT_AV12,
WIN32_IPC_VIDEO_FORMAT_ARGB64_LE,
WIN32_IPC_VIDEO_FORMAT_ARGB64_BE,
WIN32_IPC_VIDEO_FORMAT_RGBA64_LE,
WIN32_IPC_VIDEO_FORMAT_RGBA64_BE,
WIN32_IPC_VIDEO_FORMAT_BGRA64_LE,
WIN32_IPC_VIDEO_FORMAT_BGRA64_BE,
WIN32_IPC_VIDEO_FORMAT_ABGR64_LE,
WIN32_IPC_VIDEO_FORMAT_ABGR64_BE,
WIN32_IPC_VIDEO_FORMAT_NV12_16L32S,
WIN32_IPC_VIDEO_FORMAT_NV12_8L128,
WIN32_IPC_VIDEO_FORMAT_NV12_10BE_8L128,
} Win32IpcVideoFormat;
typedef struct
{
Win32IpcVideoFormat format;
UINT32 width;
UINT32 height;
UINT32 fps_n;
UINT32 fps_d;
UINT32 par_n;
UINT32 par_d;
/* the size of memory */
UINT64 size;
/* plane offsets */
UINT64 offset[4];
/* stride of each plane */
UINT32 stride[4];
/* QPC time */
UINT64 qpc;
} Win32IpcVideoInfo;
/* 1 byte (type) + 8 byte (seq-num) */
#define WIN32_IPC_PKT_NEED_DATA_SIZE 9
/* 1 byte (type) + 8 byte (seq-num) + N bytes (name) + 4 (format) +
* 4 (width) + 4 (height) + 4 (fps_n) + 4 (fps_d) + 4 (par_n) + 4 (par_d) +
* 8 (size) + 8 * 4 (offset) + 4 * 4 (stride) + 8 (timestamp) */
#define WIN32_IPC_PKT_HAVE_DATA_SIZE 101
/* 1 byte (type) + 8 byte (seq-num) */
#define WIN32_IPC_PKT_READ_DONE_SIZE 5
const char * win32_ipc_pkt_type_to_string (Win32IpcPktType type);
Win32IpcPktType win32_ipc_pkt_type_from_raw (UINT8 type);
UINT8 win32_ipc_pkt_type_to_raw (Win32IpcPktType type);
UINT32 win32_ipc_pkt_build_need_data (UINT8 * pkt,
UINT32 pkt_size,
UINT64 seq_num);
BOOL win32_ipc_pkt_parse_need_data (UINT8 * pkt,
UINT32 pkt_size,
UINT64 * seq_num);
UINT32 win32_ipc_pkt_build_have_data (UINT8 * pkt,
UINT32 pkt_size,
UINT64 seq_num,
const char * mmf_name,
const Win32IpcVideoInfo * info);
BOOL win32_ipc_pkt_parse_have_data (UINT8 * pkt,
UINT32 pkt_size,
UINT64 * seq_num,
char * mmf_name,
Win32IpcVideoInfo * info);
UINT32 win32_ipc_pkt_build_read_done (UINT8 * pkt,
UINT32 pkt_size,
UINT64 seq_num);
BOOL win32_ipc_pkt_parse_read_done (UINT8 * pkt,
UINT32 pkt_size,
UINT64 * seq_num);
G_END_DECLS

View file

@ -0,0 +1,54 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#include "win32ipcutils.h"
#include <string>
#include <locale>
#include <codecvt>
#include <algorithm>
static inline void rtrim(std::string &s) {
s.erase (std::find_if (s.rbegin(), s.rend(),
[](unsigned char ch) {
return !std::isspace (ch);
}).base (), s.end ());
}
std::string
win32_ipc_error_message (DWORD error_code)
{
WCHAR buffer[1024];
if (!FormatMessageW (FORMAT_MESSAGE_IGNORE_INSERTS |
FORMAT_MESSAGE_FROM_SYSTEM, nullptr, error_code, 0, buffer,
1024, nullptr)) {
return std::string ("");
}
std::wstring_convert<std::codecvt_utf8<wchar_t>, wchar_t> converter;
std::string ret = converter.to_bytes (buffer);
rtrim (ret);
return ret;
}

View file

@ -0,0 +1,30 @@
/* GStreamer
* Copyright (C) 2022 Seungha Yang <seungha@centricular.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* SPDX-License-Identifier: MIT
*/
#pragma once
#include <windows.h>
#include <string>
std::string win32_ipc_error_message (DWORD error_code);