ipcpipeline: introduce new plugin for inter-process pipelines

These elements allow splitting a pipeline across several processes,
with communication done by the ipcpipelinesink and ipcpipelinesrc
elements. The main use case is to split a playback pipeline into
a process that runs networking, parser & demuxer and another process
that runs the decoder & sink, for security reasons.

https://bugzilla.gnome.org/show_bug.cgi?id=752214
This commit is contained in:
George Kiagiadakis 2017-07-05 16:50:22 +03:00
parent b89c94b37e
commit 3089d142b0
12 changed files with 4643 additions and 0 deletions

View file

@ -469,6 +469,7 @@ AG_GST_CHECK_PLUGIN(gdp)
AG_GST_CHECK_PLUGIN(id3tag)
AG_GST_CHECK_PLUGIN(inter)
AG_GST_CHECK_PLUGIN(interlace)
AG_GST_CHECK_PLUGIN(ipcpipeline)
AG_GST_CHECK_PLUGIN(ivfparse)
AG_GST_CHECK_PLUGIN(ivtc)
AG_GST_CHECK_PLUGIN(jp2kdecimator)
@ -3587,6 +3588,7 @@ gst/gdp/Makefile
gst/id3tag/Makefile
gst/inter/Makefile
gst/interlace/Makefile
gst/ipcpipeline/Makefile
gst/ivfparse/Makefile
gst/ivtc/Makefile
gst/jp2kdecimator/Makefile

View file

@ -0,0 +1,28 @@
plugin_LTLIBRARIES = libgstipcpipeline.la
libgstipcpipeline_la_SOURCES = \
gstipcpipeline.c \
gstipcpipelinecomm.c \
gstipcpipelinesink.c \
gstipcpipelinesrc.c \
gstipcslavepipeline.c
noinst_HEADERS = \
gstipcpipelinecomm.h \
gstipcpipelinesink.h \
gstipcpipelinesrc.h \
gstipcslavepipeline.h
libgstipcpipeline_la_CFLAGS = \
$(GST_PLUGINS_BAD_CFLAGS) \
$(GST_PLUGINS_BASE_CFLAGS) \
$(GST_BASE_CFLAGS) \
$(GST_CFLAGS)
libgstipcpipeline_la_LIBADD = \
$(GST_PLUGINS_BASE_LIBS) \
$(GST_BASE_LIBS) \
$(GST_LIBS) \
$(LIBM)
libgstipcpipeline_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)

View file

@ -0,0 +1,48 @@
/* GStreamer
* Copyright (C) 2017 YouView TV Ltd
* Author: George Kiagiadakis <george.Kiagiadakis@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
* Boston, MA 02110-1335, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstipcpipelinecomm.h"
#include "gstipcpipelinesink.h"
#include "gstipcpipelinesrc.h"
#include "gstipcslavepipeline.h"
static gboolean
plugin_init (GstPlugin * plugin)
{
gst_ipc_pipeline_comm_plugin_init ();
gst_element_register (plugin, "ipcpipelinesrc", GST_RANK_NONE,
GST_TYPE_IPC_PIPELINE_SRC);
gst_element_register (plugin, "ipcpipelinesink", GST_RANK_NONE,
GST_TYPE_IPC_PIPELINE_SINK);
gst_element_register (plugin, "ipcslavepipeline", GST_RANK_NONE,
GST_TYPE_IPC_SLAVE_PIPELINE);
return TRUE;
}
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
GST_VERSION_MINOR,
ipcpipeline,
"plugin for inter-process pipeline communication",
plugin_init, VERSION, "LGPL", PACKAGE_NAME, GST_PACKAGE_ORIGIN)

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,132 @@
/* GStreamer
* Copyright (C) 2015-2017 YouView TV Ltd
* Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
*
* gstipcpipelinecomm.h:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_IPC_PIPELINE_COMM_H__
#define __GST_IPC_PIPELINE_COMM_H__
#include <glib.h>
#include <gst/gst.h>
#include <gst/base/gstadapter.h>
G_BEGIN_DECLS
#define GST_FLOW_COMM_ERROR GST_FLOW_CUSTOM_ERROR_1
extern GQuark QUARK_ID;
typedef enum {
GST_IPC_PIPELINE_COMM_STATE_TYPE = 0,
/* for the rest of the states we use directly the data type enums below */
} GstIpcPipelineCommState;
typedef enum {
/* reply types */
GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK = 1,
GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT,
/* data send types */
GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER,
GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT,
GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT,
GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY,
GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE,
GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST,
GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE,
GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE,
} GstIpcPipelineCommDataType;
typedef struct
{
GstElement *element;
GMutex mutex;
int fdin;
int fdout;
GHashTable *waiting_ids;
GThread *reader_thread;
gboolean reader_thread_stopping;
volatile gint thread_running;
int reader_thread_stopping_pipe[2];
GstAdapter *adapter;
guint8 state;
guint32 send_id;
guint32 payload_length;
guint32 id;
guint read_chunk_size;
GstClockTime ack_time;
void (*on_buffer) (guint32, GstBuffer *, gpointer);
void (*on_event) (guint32, GstEvent *, gboolean, gpointer);
void (*on_query) (guint32, GstQuery *, gboolean, gpointer);
void (*on_state_change) (guint32, GstStateChange, gpointer);
void (*on_state_lost) (gpointer);
void (*on_message) (guint32, GstMessage *, gpointer);
gpointer user_data;
} GstIpcPipelineComm;
void gst_ipc_pipeline_comm_plugin_init (void);
void gst_ipc_pipeline_comm_init (GstIpcPipelineComm *comm, GstElement *e);
void gst_ipc_pipeline_comm_clear (GstIpcPipelineComm *comm);
void gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm,
gboolean flushing);
void gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
guint32 id, GstFlowReturn ret);
void gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
guint32 id, gboolean ret);
void gst_ipc_pipeline_comm_write_state_change_ack_to_fd (
GstIpcPipelineComm * comm, guint32 id, GstStateChangeReturn ret);
void gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
guint32 id, gboolean result, GstQuery *query);
GstFlowReturn gst_ipc_pipeline_comm_write_buffer_to_fd (
GstIpcPipelineComm * comm, GstBuffer * buffer);
gboolean gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
gboolean upstream, GstEvent * event);
gboolean gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
gboolean upstream, GstQuery * query);
GstStateChangeReturn gst_ipc_pipeline_comm_write_state_change_to_fd (
GstIpcPipelineComm * comm, GstStateChange transition);
void gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm);
gboolean gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
GstMessage *message);
gboolean gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
void (*on_buffer) (guint32, GstBuffer *, gpointer),
void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
void (*on_state_change) (guint32, GstStateChange, gpointer),
void (*on_state_lost) (gpointer),
void (*on_message) (guint32, GstMessage *, gpointer),
gpointer user_data);
void gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm);
G_END_DECLS
#endif

View file

@ -0,0 +1,722 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2005 Wim Taymans <wim@fluendo.com>
* 2006 Thomas Vander Stichele <thomas at apestaart dot org>
* 2014 Tim-Philipp Müller <tim centricular com>
* 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
*
* gstipcpipelinesink.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-ipcpipelinesink
* @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline
*
* Communicates with an ipcpipelinesrc element in another process via a socket.
*
* This element, together with ipcpipelinesrc and ipcslavepipeline form a
* mechanism that allows splitting a single pipeline in different processes.
* The main use-case for it is a playback pipeline split in two parts, where the
* first part contains the networking, parsing and demuxing and the second part
* contains the decoding and display. The intention of this split is to improve
* security of an application, by letting the networking, parsing and demuxing
* parts run in a less privileged process than the process that accesses the
* decoder and display.
*
* Once the pipelines in those different processes have been created, the
* playback can be controlled entirely from the first pipeline, which is the
* one that contains ipcpipelinesink. We call this pipeline the master.
* All relevant events and queries sent from the application are sent to
* the master pipeline and messages to the application are sent from the master
* pipeline. The second pipeline, in the other process, is transparently slaved.
*
* ipcpipelinesink can work only in push mode and does not synchronize buffers
* to the clock. Synchronization is meant to happen either at the real sink at
* the end of the remote slave pipeline, or not to happen at all, if the
* pipeline is live.
*
* A master pipeline may contain more than one ipcpipelinesink elements, which
* can be connected either to the same slave pipeline or to different ones.
*
* Communication with ipcpipelinesrc on the slave happens via a socket, using a
* custom protocol. Each buffer, event, query, message or state change is
* serialized in a "packet" and sent over the socket. The sender then
* performs a blocking wait for a reply, if a return code is needed.
*
* All objects that contan a GstStructure (messages, queries, events) are
* serialized by serializing the GstStructure to a string
* (gst_structure_to_string). This implies some limitations, of course.
* All fields of this structures that are not serializable to strings (ex.
* object pointers) are ignored, except for some cases where custom
* serialization may occur (ex error/warning/info messages that contain a
* GError are serialized differently).
*
* Buffers are transported by writing their content directly on the socket.
* More efficient ways for memory sharing could be implemented in the future.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <unistd.h>
#include "gstipcpipelinesink.h"
#include <string.h>
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug);
#define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug
enum
{
SIGNAL_DISCONNECT,
/* FILL ME */
LAST_SIGNAL
};
static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 };
enum
{
PROP_0,
PROP_FDIN,
PROP_FDOUT,
PROP_READ_CHUNK_SIZE,
PROP_ACK_TIME,
};
#define DEFAULT_READ_CHUNK_SIZE 4096
#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
#define _do_init \
GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element");
#define gst_ipc_pipeline_sink_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink,
GST_TYPE_ELEMENT, _do_init);
static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_ipc_pipeline_sink_dispose (GObject * obj);
static void gst_ipc_pipeline_sink_finalize (GObject * obj);
static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink *
sink);
static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink *
sink);
static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement *
element, GstStateChange transition);
static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad,
GstObject * parent, GstBuffer * buffer);
static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element,
GstQuery * query);
static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element,
GstEvent * event);
static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active);
static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink);
static void pusher (gpointer data, gpointer user_data);
static void
gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = G_OBJECT_CLASS (klass);
gstelement_class = GST_ELEMENT_CLASS (klass);
gobject_class->set_property = gst_ipc_pipeline_sink_set_property;
gobject_class->get_property = gst_ipc_pipeline_sink_get_property;
gobject_class->dispose = gst_ipc_pipeline_sink_dispose;
gobject_class->finalize = gst_ipc_pipeline_sink_finalize;
g_object_class_install_property (gobject_class, PROP_FDIN,
g_param_spec_int ("fdin", "Input file descriptor",
"File descriptor to received data from",
-1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_FDOUT,
g_param_spec_int ("fdout", "Output file descriptor",
"File descriptor to send data through",
-1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
g_param_spec_uint ("read-chunk-size", "Read chunk size",
"Read chunk size",
1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_ACK_TIME,
g_param_spec_uint64 ("ack-time", "Ack time",
"Maximum time to wait for a response to a message",
0, G_MAXUINT64, DEFAULT_ACK_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] =
g_signal_new ("disconnect",
G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect),
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
gst_element_class_set_static_metadata (gstelement_class,
"Inter-process Pipeline Sink",
"Sink",
"Allows splitting and continuing a pipeline in another process",
"Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state);
gstelement_class->query =
GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query);
gstelement_class->send_event =
GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event);
klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect);
}
static void
gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink)
{
GstPadTemplate *pad_template;
GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink));
sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
sink->comm.ack_time = DEFAULT_ACK_TIME;
sink->comm.fdin = -1;
sink->comm.fdout = -1;
sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL);
gst_ipc_pipeline_sink_start_reader_thread (sink);
pad_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink");
g_return_if_fail (pad_template != NULL);
sink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
gst_pad_set_activatemode_function (sink->sinkpad,
gst_ipc_pipeline_sink_pad_activate_mode);
gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query);
gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event);
gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain);
gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad);
}
static void
gst_ipc_pipeline_sink_dispose (GObject * obj)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
gst_ipc_pipeline_sink_stop_reader_thread (sink);
gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE);
G_OBJECT_CLASS (parent_class)->dispose (obj);
}
static void
gst_ipc_pipeline_sink_finalize (GObject * obj)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
gst_ipc_pipeline_comm_clear (&sink->comm);
g_thread_pool_free (sink->threads, TRUE, TRUE);
G_OBJECT_CLASS (parent_class)->finalize (obj);
}
static void
gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
switch (prop_id) {
case PROP_FDIN:
sink->comm.fdin = g_value_get_int (value);
break;
case PROP_FDOUT:
sink->comm.fdout = g_value_get_int (value);
break;
case PROP_READ_CHUNK_SIZE:
sink->comm.read_chunk_size = g_value_get_uint (value);
break;
case PROP_ACK_TIME:
sink->comm.ack_time = g_value_get_uint64 (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
switch (prop_id) {
case PROP_FDIN:
g_value_set_int (value, sink->comm.fdin);
break;
case PROP_FDOUT:
g_value_set_int (value, sink->comm.fdout);
break;
case PROP_READ_CHUNK_SIZE:
g_value_set_uint (value, sink->comm.read_chunk_size);
break;
case PROP_ACK_TIME:
g_value_set_uint64 (value, sink->comm.ack_time);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static gboolean
gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
gboolean ret;
GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)",
event, gst_event_type_get_name (event->type), event->type);
ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event);
gst_event_unref (event);
return ret;
}
static GstFlowReturn
gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
GstFlowReturn ret;
GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer);
ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer);
if (ret != GST_FLOW_OK)
GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret));
gst_buffer_unref (buffer);
return ret;
}
static gboolean
gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
gboolean ret;
GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT,
GST_QUERY_TYPE_NAME (query), query);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_ALLOCATION:
GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query");
return FALSE;
case GST_QUERY_CAPS:
{
/* caps queries occur even while linking the pipeline.
* It is possible that the ipcpipelinesrc may not be connected at this
* point, so let's avoid a couple of errors... */
GstState state;
GST_OBJECT_LOCK (sink);
state = GST_STATE (sink);
GST_OBJECT_UNLOCK (sink);
if (state == GST_STATE_NULL)
return FALSE;
}
default:
break;
}
ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query);
return ret;
}
static gboolean
gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
gboolean ret;
GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT,
GST_QUERY_TYPE_NAME (query), query);
ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query);
GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query);
return ret;
}
static gboolean
gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
gboolean ret;
GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT,
GST_EVENT_TYPE_NAME (event), event);
ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event);
GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event);
gst_event_unref (event);
return ret;
}
static gboolean
gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active)
{
if (mode == GST_PAD_MODE_PULL)
return FALSE;
return TRUE;
}
static void
on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
GST_ERROR_OBJECT (sink,
"Got buffer id %u! I never knew buffers could go upstream...", id);
gst_buffer_unref (buffer);
}
static void
pusher (gpointer data, gpointer user_data)
{
GstIpcPipelineSink *sink = user_data;
gboolean ret;
guint32 id;
id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data),
QUARK_ID));
if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT (data);
GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event);
ret = gst_pad_push_event (sink->sinkpad, event);
GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret);
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret);
} else if (GST_IS_QUERY (data)) {
GstQuery *query = GST_QUERY (data);
GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query);
ret = gst_pad_peer_query (sink->sinkpad, query);
GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret);
gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret,
query);
gst_query_unref (query);
} else {
GST_ERROR_OBJECT (sink, "Unsupported object type");
}
gst_object_unref (sink);
}
static void
on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
if (!upstream) {
GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...",
id);
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE);
gst_event_unref (event);
return;
}
GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event);
gst_object_ref (sink);
g_thread_pool_push (sink->threads, event, NULL);
}
static void
on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
if (!upstream) {
GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...",
id);
gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE,
query);
gst_query_unref (query);
return;
}
GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query);
gst_object_ref (sink);
g_thread_pool_push (sink->threads, query, NULL);
}
static void
on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id);
}
static void
on_state_lost (gpointer user_data)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state");
GST_OBJECT_LOCK (sink);
sink->pass_next_async_done = TRUE;
GST_OBJECT_UNLOCK (sink);
gst_element_lost_state (GST_ELEMENT (sink));
}
static void
do_async_done (GstElement * element, gpointer user_data)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
GstMessage *message = user_data;
GST_STATE_LOCK (sink);
GST_OBJECT_LOCK (sink);
if (sink->pass_next_async_done) {
sink->pass_next_async_done = FALSE;
GST_OBJECT_UNLOCK (sink);
gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
GST_STATE_UNLOCK (sink);
gst_element_post_message (element, gst_message_ref (message));
} else {
GST_OBJECT_UNLOCK (sink);
GST_STATE_UNLOCK (sink);
}
}
static void
on_message (guint32 id, GstMessage * message, gpointer user_data)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ASYNC_DONE:
GST_OBJECT_LOCK (sink);
if (sink->pass_next_async_done) {
GST_OBJECT_UNLOCK (sink);
gst_element_call_async (GST_ELEMENT (sink), do_async_done,
message, (GDestroyNotify) gst_message_unref);
} else {
GST_OBJECT_UNLOCK (sink);
gst_message_unref (message);
}
return;
default:
break;
}
gst_element_post_message (GST_ELEMENT (sink), message);
}
static gboolean
gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink)
{
if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer,
on_event, on_query, on_state_change, on_state_lost, on_message,
sink)) {
GST_ERROR_OBJECT (sink, "Failed to start reader thread");
return FALSE;
}
return TRUE;
}
static void
gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink)
{
gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm);
}
static void
gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink)
{
GST_DEBUG_OBJECT (sink, "Disconnecting");
gst_ipc_pipeline_sink_stop_reader_thread (sink);
sink->comm.fdin = -1;
sink->comm.fdout = -1;
gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE);
gst_ipc_pipeline_sink_start_reader_thread (sink);
}
static GstStateChangeReturn
gst_ipc_pipeline_sink_change_state (GstElement * element,
GstStateChange transition)
{
GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS;
gboolean async = FALSE;
gboolean down = FALSE;
GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s",
gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
if (sink->comm.fdin < 0) {
GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin);
return GST_STATE_CHANGE_FAILURE;
}
if (sink->comm.fdout < 0) {
GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout);
return GST_STATE_CHANGE_FAILURE;
}
if (!sink->comm.reader_thread) {
GST_ERROR_OBJECT (element, "Failed to start reader thread");
return GST_STATE_CHANGE_FAILURE;
}
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
/* In these transitions, it is possible that the peer returns ASYNC.
* We don't know that in advance, but we post async-start anyway because
* it needs to be delivered *before* async-done, and async-done may
* arrive at any point in time after we've set the state of the peer.
* In case the peer doesn't return ASYNC, we just post async-done
* ourselves and the parent GstBin takes care of matching and deleting
* them, so the app never gets any of these. */
async = TRUE;
break;
default:
break;
}
/* downwards state change */
down = (GST_STATE_TRANSITION_CURRENT (transition) >=
GST_STATE_TRANSITION_NEXT (transition));
if (async) {
GST_DEBUG_OBJECT (sink,
"Posting async-start for %s, will need state-change-done",
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
gst_element_post_message (GST_ELEMENT (sink),
gst_message_new_async_start (GST_OBJECT (sink)));
GST_OBJECT_LOCK (sink);
sink->pass_next_async_done = TRUE;
GST_OBJECT_UNLOCK (sink);
}
/* change the state of the peer first */
/* If the fd out is -1, we do not actually call the peer. This will happen
when we explicitely disconnected, and in that case we want to be able
to bring the element down to NULL, so it can be restarted with a new
slave pipeline. */
if (sink->comm.fdout >= 0) {
GST_DEBUG_OBJECT (sink, "Calling peer with state change");
peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
transition);
if (ret == GST_STATE_CHANGE_FAILURE && down) {
GST_WARNING_OBJECT (sink, "Peer returned state change failure, "
"but ignoring because we are going down");
peer_ret = GST_STATE_CHANGE_SUCCESS;
}
} else {
if (down) {
GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)",
sink->comm.fdout);
peer_ret = GST_STATE_CHANGE_SUCCESS;
} else {
GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing",
sink->comm.fdout);
peer_ret = GST_STATE_CHANGE_FAILURE;
}
}
/* chain up to the parent class to change our state, if the peer succeeded */
if (peer_ret != GST_STATE_CHANGE_FAILURE) {
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) {
GST_WARNING_OBJECT (sink, "Parent returned state change failure, "
"but ignoring because we are going down");
ret = GST_STATE_CHANGE_SUCCESS;
}
}
GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s",
gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)),
gst_element_state_change_return_get_name (peer_ret),
gst_element_state_change_return_get_name (ret));
/* now interpret the return codes */
if (async && peer_ret != GST_STATE_CHANGE_ASYNC) {
GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC",
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
GST_OBJECT_LOCK (sink);
sink->pass_next_async_done = FALSE;
GST_OBJECT_UNLOCK (sink);
gst_element_post_message (GST_ELEMENT (sink),
gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE));
} else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) {
GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC");
peer_ret = GST_STATE_CHANGE_SUCCESS;
}
if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) {
if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) {
/* only the parent's ret was FAILURE - revert remote changes */
GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent "
"returned failure");
gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition),
GST_STATE_TRANSITION_CURRENT (transition)));
}
return GST_STATE_CHANGE_FAILURE;
}
/* the parent's (GstElement) state change func won't return ASYNC or
* NO_PREROLL, so unless it has returned FAILURE, which we have catched above,
* we are not interested in its return code... just return the peer's */
return peer_ret;
}

View file

@ -0,0 +1,67 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
* 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
*
* gstipcpipelinesink.h:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_IPC_PIPELINE_SINK_H__
#define __GST_IPC_PIPELINE_SINK_H__
#include <gst/gst.h>
#include "gstipcpipelinecomm.h"
G_BEGIN_DECLS
#define GST_TYPE_IPC_PIPELINE_SINK \
(gst_ipc_pipeline_sink_get_type())
#define GST_IPC_PIPELINE_SINK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSink))
#define GST_IPC_PIPELINE_SINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SINK,GstIpcPipelineSinkClass))
#define GST_IS_IPC_PIPELINE_SINK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SINK))
#define GST_IS_IPC_PIPELINE_SINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SINK))
#define GST_IPC_PIPELINE_SINK_CAST(obj) ((GstIpcPipelineSink *)obj)
typedef struct _GstIpcPipelineSink GstIpcPipelineSink;
typedef struct _GstIpcPipelineSinkClass GstIpcPipelineSinkClass;
struct _GstIpcPipelineSink {
GstElement element;
GstIpcPipelineComm comm;
GThreadPool *threads;
gboolean pass_next_async_done;
GstPad *sinkpad;
};
struct _GstIpcPipelineSinkClass {
GstElementClass parent_class;
void (*disconnect) (GstIpcPipelineSink * sink);
};
G_GNUC_INTERNAL GType gst_ipc_pipeline_sink_get_type (void);
G_END_DECLS
#endif /* __GST_IPC_PIPELINE_SINK_H__ */

View file

@ -0,0 +1,954 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wim@fluendo.com>
* 2006 Thomas Vander Stichele <thomas at apestaart dot org>
* 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
*
* gstipcpipelinesrc.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-ipcpipelinesrc
* @see_also: #GstIpcPipelineSink, #GstIpcSlavePipeline
*
* Communicates with an ipcpipelinesink element in another process via a socket.
*
* The ipcpipelinesrc element allows 2-way communication with an ipcpipelinesink
* element on another process/pipeline. It is meant to run inside an
* interslavepipeline and when paired with an ipcpipelinesink, it will slave its
* whole parent pipeline to the "master" one, which contains the ipcpipelinesink.
*
* For more details about this mechanism and its uses, see the documentation
* of the ipcpipelinesink element.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include "gstipcpipelinesrc.h"
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_src_debug);
#define GST_CAT_DEFAULT gst_ipc_pipeline_src_debug
enum
{
/* FILL ME */
SIGNAL_FORWARD_MESSAGE,
SIGNAL_DISCONNECT,
LAST_SIGNAL
};
static guint gst_ipc_pipeline_src_signals[LAST_SIGNAL] = { 0 };
enum
{
PROP_0,
PROP_FDIN,
PROP_FDOUT,
PROP_READ_CHUNK_SIZE,
PROP_ACK_TIME,
PROP_LAST,
};
static GQuark QUARK_UPSTREAM;
#define DEFAULT_READ_CHUNK_SIZE 65536
#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
#define _do_init \
GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_src_debug, "ipcpipelinesrc", 0, "ipcpipelinesrc element");
#define gst_ipc_pipeline_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSrc, gst_ipc_pipeline_src,
GST_TYPE_ELEMENT, _do_init);
static void gst_ipc_pipeline_src_finalize (GObject * object);
static void gst_ipc_pipeline_src_dispose (GObject * object);
static void gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src);
static gboolean gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc *
src);
static void gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src);
static gboolean gst_ipc_pipeline_src_activate_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active);
static gboolean gst_ipc_pipeline_src_srcpad_event (GstPad * pad,
GstObject * parent, GstEvent * event);
static gboolean gst_ipc_pipeline_src_srcpad_query (GstPad * pad,
GstObject * parent, GstQuery * query);
static void gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src);
static gboolean gst_ipc_pipeline_src_send_event (GstElement * element,
GstEvent * event);
static gboolean gst_ipc_pipeline_src_query (GstElement * element,
GstQuery * query);
static GstStateChangeReturn gst_ipc_pipeline_src_change_state (GstElement *
element, GstStateChange transition);
static gboolean gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src,
GstMessage * msg);
static void gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src);
static void
gst_ipc_pipeline_src_class_init (GstIpcPipelineSrcClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
QUARK_UPSTREAM = g_quark_from_static_string ("ipcpipeline-upstream");
gobject_class->dispose = gst_ipc_pipeline_src_dispose;
gobject_class->finalize = gst_ipc_pipeline_src_finalize;
gobject_class->set_property = gst_ipc_pipeline_src_set_property;
gobject_class->get_property = gst_ipc_pipeline_src_get_property;
gstelement_class->send_event =
GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_send_event);
gstelement_class->query = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_query);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_change_state);
klass->forward_message =
GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_forward_message);
klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_disconnect);
GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_activate_mode);
GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_event);
GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_query);
g_object_class_install_property (gobject_class, PROP_FDIN,
g_param_spec_int ("fdin", "Input file descriptor",
"File descriptor to read data from",
-1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_FDOUT,
g_param_spec_int ("fdout", "Output file descriptor",
"File descriptor to write data through",
-1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
g_param_spec_uint ("read-chunk-size", "Read chunk size",
"Read chunk size",
1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_ACK_TIME,
g_param_spec_uint64 ("ack-time", "Ack time",
"Maximum time to wait for a response to a message",
0, G_MAXUINT64, DEFAULT_ACK_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_ipc_pipeline_src_signals[SIGNAL_FORWARD_MESSAGE] =
g_signal_new ("forward-message", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstIpcPipelineSrcClass, forward_message), NULL, NULL,
g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, GST_TYPE_MESSAGE);
gst_ipc_pipeline_src_signals[SIGNAL_DISCONNECT] =
g_signal_new ("disconnect", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstIpcPipelineSrcClass, disconnect),
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
gst_element_class_set_static_metadata (gstelement_class,
"Inter-process Pipeline Source",
"Source",
"Continues a split pipeline from another process",
"Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&srctemplate));
}
static void
gst_ipc_pipeline_src_init (GstIpcPipelineSrc * src)
{
GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE);
gst_ipc_pipeline_comm_init (&src->comm, GST_ELEMENT (src));
src->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
src->comm.ack_time = DEFAULT_ACK_TIME;
src->flushing = TRUE;
src->last_ret = GST_FLOW_FLUSHING;
src->queued = NULL;
g_cond_init (&src->create_cond);
src->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
gst_pad_set_activatemode_function (src->srcpad,
gst_ipc_pipeline_src_activate_mode);
gst_pad_set_event_function (src->srcpad, gst_ipc_pipeline_src_srcpad_event);
gst_pad_set_query_function (src->srcpad, gst_ipc_pipeline_src_srcpad_query);
gst_element_add_pad (GST_ELEMENT (src), src->srcpad);
gst_ipc_pipeline_src_start_reader_thread (src);
}
static void
gst_ipc_pipeline_src_dispose (GObject * object)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
gst_ipc_pipeline_src_stop_reader_thread (src);
gst_ipc_pipeline_src_cancel_queued (src);
gst_ipc_pipeline_comm_cancel (&src->comm, TRUE);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_ipc_pipeline_src_finalize (GObject * object)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
gst_ipc_pipeline_comm_clear (&src->comm);
g_cond_clear (&src->create_cond);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
switch (prop_id) {
case PROP_FDIN:
src->comm.fdin = g_value_get_int (value);
break;
case PROP_FDOUT:
src->comm.fdout = g_value_get_int (value);
break;
case PROP_READ_CHUNK_SIZE:
src->comm.read_chunk_size = g_value_get_uint (value);
break;
case PROP_ACK_TIME:
src->comm.ack_time = g_value_get_uint64 (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
g_return_if_fail (GST_IS_IPC_PIPELINE_SRC (object));
switch (prop_id) {
case PROP_FDIN:
g_value_set_int (value, src->comm.fdin);
break;
case PROP_FDOUT:
g_value_set_int (value, src->comm.fdout);
break;
case PROP_READ_CHUNK_SIZE:
g_value_set_uint (value, src->comm.read_chunk_size);
break;
case PROP_ACK_TIME:
g_value_set_uint64 (value, src->comm.ack_time);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_ipc_pipeline_src_log_queue (GstIpcPipelineSrc * src)
{
GList *queued;
guint n;
queued = src->queued;
n = 0;
GST_LOG_OBJECT (src, "There are %u objects in the queue",
g_list_length (queued));
while (queued) {
void *object = queued->data;
if (GST_IS_EVENT (object)) {
GST_LOG_OBJECT (src, " #%u: %s event", n, GST_EVENT_TYPE_NAME (object));
} else if (GST_IS_QUERY (object)) {
GST_LOG_OBJECT (src, " #%u: %s query", n, GST_QUERY_TYPE_NAME (object));
} else if (GST_IS_BUFFER (object)) {
GST_LOG_OBJECT (src, " #%u: %zu bytes buffer", n,
(size_t) gst_buffer_get_size (object));
} else {
GST_LOG_OBJECT (src, " #%u: unknown item in queue", n);
}
queued = queued->next;
++n;
}
}
static void
gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src)
{
GList *queued;
guint32 id;
g_mutex_lock (&src->comm.mutex);
queued = src->queued;
src->queued = NULL;
g_cond_broadcast (&src->create_cond);
g_mutex_unlock (&src->comm.mutex);
while (queued) {
void *object = queued->data;
id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
QUARK_ID));
queued = g_list_delete_link (queued, queued);
if (GST_IS_EVENT (object)) {
GstEvent *event = GST_EVENT (object);
GST_DEBUG_OBJECT (src, "Cancelling queued event: %" GST_PTR_FORMAT,
event);
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
gst_event_unref (event);
} else if (GST_IS_BUFFER (object)) {
GstBuffer *buffer = GST_BUFFER (object);
GST_DEBUG_OBJECT (src, "Cancelling queued buffer: %" GST_PTR_FORMAT,
buffer);
gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
GST_FLOW_FLUSHING);
gst_buffer_unref (buffer);
} else if (GST_IS_QUERY (object)) {
GstQuery *query = GST_QUERY (object);
GST_DEBUG_OBJECT (src, "Cancelling queued query: %" GST_PTR_FORMAT,
query);
gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, FALSE,
query);
gst_query_unref (query);
}
}
}
static void
gst_ipc_pipeline_src_start_loop (GstIpcPipelineSrc * src)
{
g_mutex_lock (&src->comm.mutex);
src->flushing = FALSE;
src->last_ret = GST_FLOW_OK;
g_mutex_unlock (&src->comm.mutex);
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_ipc_pipeline_src_loop,
src, NULL);
}
static void
gst_ipc_pipeline_src_stop_loop (GstIpcPipelineSrc * src, gboolean stop)
{
g_mutex_lock (&src->comm.mutex);
src->flushing = TRUE;
g_cond_broadcast (&src->create_cond);
g_mutex_unlock (&src->comm.mutex);
if (stop)
gst_pad_stop_task (src->srcpad);
}
static gboolean
gst_ipc_pipeline_src_activate_mode (GstPad * pad, GstObject * parent,
GstPadMode mode, gboolean active)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
switch (mode) {
case GST_PAD_MODE_PUSH:
GST_DEBUG_OBJECT (pad, "%s in push mode", active ? "activating" :
"deactivating");
if (active) {
gst_ipc_pipeline_src_start_loop (src);
} else {
gst_ipc_pipeline_src_stop_loop (src, TRUE);
gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
}
return TRUE;
default:
GST_DEBUG_OBJECT (pad, "unsupported activation mode");
return FALSE;
}
}
static gboolean
gst_ipc_pipeline_src_srcpad_event (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
gboolean ret;
GST_DEBUG_OBJECT (src, "Got upstream event %s", GST_EVENT_TYPE_NAME (event));
ret = gst_ipc_pipeline_comm_write_event_to_fd (&src->comm, TRUE, event);
gst_event_unref (event);
GST_DEBUG_OBJECT (src, "Returning event result: %d", ret);
return ret;
}
static gboolean
gst_ipc_pipeline_src_srcpad_query (GstPad * pad, GstObject * parent,
GstQuery * query)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
gboolean ret;
/* answer some queries that do not make sense to be forwarded */
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_LATENCY:
return TRUE;
case GST_QUERY_CONTEXT:
return FALSE;
case GST_QUERY_CAPS:
{
/* caps queries occur even while linking the pipeline.
* It is possible that the ipcpipelinesink may not be connected at this
* point, so let's avoid a couple of errors... */
GstState state;
GST_OBJECT_LOCK (src);
state = GST_STATE (src);
GST_OBJECT_UNLOCK (src);
if (state == GST_STATE_NULL)
return FALSE;
}
default:
break;
}
GST_DEBUG_OBJECT (src, "Got upstream query %s: %" GST_PTR_FORMAT,
GST_QUERY_TYPE_NAME (query), query);
ret = gst_ipc_pipeline_comm_write_query_to_fd (&src->comm, TRUE, query);
GST_DEBUG_OBJECT (src, "Returning query result: %d, %" GST_PTR_FORMAT,
ret, query);
return ret;
}
static void
gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src)
{
gpointer object;
guint32 id;
gboolean ok;
GstFlowReturn ret = GST_FLOW_OK;
g_mutex_lock (&src->comm.mutex);
while (!src->queued && !src->flushing)
g_cond_wait (&src->create_cond, &src->comm.mutex);
if (src->flushing)
goto out;
object = src->queued->data;
src->queued = g_list_delete_link (src->queued, src->queued);
g_mutex_unlock (&src->comm.mutex);
id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
QUARK_ID));
if (GST_IS_BUFFER (object)) {
GstBuffer *buf = GST_BUFFER (object);
GST_DEBUG_OBJECT (src, "Pushing queued buffer: %" GST_PTR_FORMAT, buf);
ret = gst_pad_push (src->srcpad, buf);
GST_DEBUG_OBJECT (src, "pushed id %u, ret: %s", id,
gst_flow_get_name (ret));
gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, ret);
} else if (GST_IS_EVENT (object)) {
GstEvent *event = GST_EVENT (object);
GST_DEBUG_OBJECT (src, "Pushing queued event: %" GST_PTR_FORMAT, event);
ok = gst_pad_push_event (src->srcpad, event);
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
} else if (GST_IS_QUERY (object)) {
GstQuery *query = GST_QUERY (object);
GST_DEBUG_OBJECT (src, "Pushing queued query: %" GST_PTR_FORMAT, query);
ok = gst_pad_peer_query (src->srcpad, query);
gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ok, query);
gst_query_unref (query);
} else {
GST_WARNING_OBJECT (src, "Unknown data type queued");
}
g_mutex_lock (&src->comm.mutex);
if (!src->queued)
g_cond_broadcast (&src->create_cond);
out:
if (src->flushing)
ret = GST_FLOW_FLUSHING;
if (ret != GST_FLOW_OK)
src->last_ret = ret;
g_mutex_unlock (&src->comm.mutex);
if (ret == GST_FLOW_FLUSHING) {
gst_ipc_pipeline_src_cancel_queued (src);
gst_pad_pause_task (src->srcpad);
}
}
static gboolean
gst_ipc_pipeline_src_send_event (GstElement * element, GstEvent * event)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
return gst_pad_push_event (src->srcpad, event);
}
static gboolean
gst_ipc_pipeline_src_query (GstElement * element, GstQuery * query)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
return gst_pad_query (src->srcpad, query);
}
static GstElement *
find_pipeline (GstElement * element)
{
GstElement *pipeline = element;
while (GST_ELEMENT_PARENT (pipeline)) {
pipeline = GST_ELEMENT_PARENT (pipeline);
if (GST_IS_PIPELINE (pipeline))
break;
}
if (!pipeline || !GST_IS_PIPELINE (pipeline)) {
pipeline = NULL;
}
return pipeline;
}
static gboolean
gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, GstMessage * msg)
{
gboolean skip = FALSE;
GST_DEBUG_OBJECT (src, "Message to forward: %" GST_PTR_FORMAT, msg);
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_STATE_CHANGED:
{
GstState old, new, pending;
GstElement *pipeline = find_pipeline (GST_ELEMENT (src));
gst_message_parse_state_changed (msg, &old, &new, &pending);
if (GST_MESSAGE_SRC (msg) == GST_OBJECT (pipeline) &&
old == new && new == pending) {
GST_DEBUG_OBJECT (src, "Detected lost state, notifying master");
gst_ipc_pipeline_comm_write_state_lost_to_fd (&src->comm);
}
/* fall through & skip */
}
case GST_MESSAGE_ASYNC_START:
case GST_MESSAGE_CLOCK_PROVIDE:
case GST_MESSAGE_CLOCK_LOST:
case GST_MESSAGE_NEW_CLOCK:
case GST_MESSAGE_STREAM_STATUS:
case GST_MESSAGE_NEED_CONTEXT:
case GST_MESSAGE_HAVE_CONTEXT:
case GST_MESSAGE_STRUCTURE_CHANGE:
skip = TRUE;
break;
case GST_MESSAGE_RESET_TIME:
{
GQuark ipcpipelinesrc_posted = g_quark_from_static_string
("gstinterslavepipeline-message-already-posted");
skip = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (msg),
ipcpipelinesrc_posted));
if (!skip) {
gst_mini_object_set_qdata (GST_MINI_OBJECT (msg), ipcpipelinesrc_posted,
GUINT_TO_POINTER (1), NULL);
}
break;
}
case GST_MESSAGE_ERROR:
{
GError *error = NULL;
/* skip forwarding a RESOURCE/WRITE error message that originated from
* ipcpipelinesrc; we post this error when writing to the comm fd fails,
* so if we try to forward it here, we will likely get another one posted
* immediately and end up doing an endless loop */
gst_message_parse_error (msg, &error, NULL);
skip = (GST_MESSAGE_SRC (msg) == GST_OBJECT_CAST (src)
&& error->domain == gst_resource_error_quark ()
&& error->code == GST_RESOURCE_ERROR_WRITE);
g_error_free (error);
break;
}
default:
break;
}
if (skip) {
GST_DEBUG_OBJECT (src, "message will not be forwarded");
return TRUE;
}
return gst_ipc_pipeline_comm_write_message_to_fd (&src->comm, msg);
}
static void
on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
GST_DEBUG_OBJECT (src, "Got buffer id %u, queueing: %" GST_PTR_FORMAT, id,
buffer);
g_mutex_lock (&src->comm.mutex);
if (!GST_PAD_IS_ACTIVE (src->srcpad) || src->flushing) {
g_mutex_unlock (&src->comm.mutex);
GST_INFO_OBJECT (src, "We're not started or flushing, buffer ignored");
gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
GST_FLOW_FLUSHING);
gst_buffer_unref (buffer);
return;
}
if (src->last_ret != GST_FLOW_OK) {
GstFlowReturn last_ret = src->last_ret;
g_mutex_unlock (&src->comm.mutex);
GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting buffer",
gst_flow_get_name (last_ret));
gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, last_ret);
gst_buffer_unref (buffer);
return;
}
src->queued = g_list_append (src->queued, buffer); /* keep the ref */
gst_ipc_pipeline_src_log_queue (src);
g_cond_broadcast (&src->create_cond);
g_mutex_unlock (&src->comm.mutex);
}
static void
do_oob_event (GstElement * element, gpointer user_data)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
GstEvent *event = user_data;
gboolean ret, upstream;
guint32 id;
id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
(event), QUARK_ID));
upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
(event), QUARK_UPSTREAM));
if (upstream) {
GstElement *pipeline;
gboolean ok = FALSE;
if (!(pipeline = find_pipeline (element))) {
GST_ERROR_OBJECT (src, "No pipeline found");
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
} else {
GST_DEBUG_OBJECT (src, "Posting upstream event on pipeline: %"
GST_PTR_FORMAT, event);
ok = gst_element_send_event (pipeline, gst_event_ref (event));
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
}
} else {
GST_DEBUG_OBJECT (src, "Pushing event async: %" GST_PTR_FORMAT, event);
ret = gst_element_send_event (element, gst_event_ref (event));
GST_DEBUG_OBJECT (src, "Event pushed, return %d", ret);
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ret);
}
}
static void
on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
GstFlowReturn last_ret = GST_FLOW_OK;
GST_DEBUG_OBJECT (src, "Got event id %u, queueing: %" GST_PTR_FORMAT, id,
event);
gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_UPSTREAM,
GINT_TO_POINTER (upstream), NULL);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
gst_ipc_pipeline_src_stop_loop (src, FALSE);
break;
case GST_EVENT_FLUSH_STOP:
gst_ipc_pipeline_src_start_loop (src);
break;
default:
g_mutex_lock (&src->comm.mutex);
last_ret = src->last_ret;
g_mutex_unlock (&src->comm.mutex);
break;
}
if (GST_EVENT_IS_SERIALIZED (event) && !upstream) {
if (last_ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
gst_flow_get_name (last_ret));
gst_event_unref (event);
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
} else {
GST_DEBUG_OBJECT (src, "This is a serialized event, adding to queue %p",
src->queued);
g_mutex_lock (&src->comm.mutex);
src->queued = g_list_append (src->queued, event); /* keep the ref */
gst_ipc_pipeline_src_log_queue (src);
g_cond_broadcast (&src->create_cond);
g_mutex_unlock (&src->comm.mutex);
}
} else {
if (last_ret != GST_FLOW_OK && !upstream) {
GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
gst_flow_get_name (last_ret));
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
gst_event_unref (event);
} else {
GST_DEBUG_OBJECT (src,
"This is not a serialized event, pushing in a thread");
gst_element_call_async (GST_ELEMENT (src), do_oob_event, event,
(GDestroyNotify) gst_event_unref);
}
}
}
static void
do_oob_query (GstElement * element, gpointer user_data)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
GstQuery *query = GST_QUERY (user_data);
guint32 id;
gboolean upstream;
gboolean ret;
id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
(query), QUARK_ID));
upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
(query), QUARK_UPSTREAM));
if (upstream) {
GstElement *pipeline;
if (!(pipeline = find_pipeline (element))) {
GST_ERROR_OBJECT (src, "No pipeline found");
ret = FALSE;
} else {
GST_DEBUG_OBJECT (src, "Posting query on pipeline: %" GST_PTR_FORMAT,
query);
ret = gst_element_query (pipeline, query);
}
} else {
GST_DEBUG_OBJECT (src, "Pushing query async: %" GST_PTR_FORMAT, query);
ret = gst_pad_peer_query (src->srcpad, query);
GST_DEBUG_OBJECT (src, "Query pushed, return %d", ret);
}
gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ret, query);
}
static void
on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
GST_DEBUG_OBJECT (src, "Got query id %u, queueing: %" GST_PTR_FORMAT, id,
query);
if (GST_QUERY_IS_SERIALIZED (query) && !upstream) {
g_mutex_lock (&src->comm.mutex);
src->queued = g_list_append (src->queued, query); /* keep the ref */
gst_ipc_pipeline_src_log_queue (src);
g_cond_broadcast (&src->create_cond);
g_mutex_unlock (&src->comm.mutex);
} else {
gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_UPSTREAM,
GINT_TO_POINTER (upstream), NULL);
gst_element_call_async (GST_ELEMENT (src), do_oob_query, query,
(GDestroyNotify) gst_query_unref);
}
}
struct StateChangeData
{
guint32 id;
GstStateChange transition;
};
static void
do_state_change (GstElement * element, gpointer data)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
GstElement *pipeline;
GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
GstState state, pending, effective;
struct StateChangeData *d = data;
guint32 id = d->id;
GstStateChange transition = d->transition;
gboolean down;
GST_DEBUG_OBJECT (src, "Doing state change id %u, %s -> %s", id,
gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
if (!(pipeline = find_pipeline (element))) {
GST_ERROR_OBJECT (src, "No pipeline found");
ret = GST_STATE_CHANGE_FAILURE;
goto done_nolock;
}
down = (GST_STATE_TRANSITION_CURRENT (transition) >=
GST_STATE_TRANSITION_NEXT (transition));
GST_STATE_LOCK (pipeline);
ret = gst_element_get_state (pipeline, &state, &pending, 0);
/* if we are pending a state change, count the pending state as
* the current one */
effective = pending == GST_STATE_VOID_PENDING ? state : pending;
GST_DEBUG_OBJECT (src, "Current element state: ret:%s state:%s pending:%s "
"effective:%s", gst_element_state_change_return_get_name (ret),
gst_element_state_get_name (state),
gst_element_state_get_name (pending),
gst_element_state_get_name (effective));
if ((GST_STATE_TRANSITION_NEXT (transition) <= effective && !down) ||
(GST_STATE_TRANSITION_NEXT (transition) > effective && down)) {
/* if the request was to transition to a state that we have already
* transitioned to in the same direction, then we just silently return */
GST_DEBUG_OBJECT (src, "State transition to %s is unnecessary",
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
/* make sure we return SUCCESS if the transition is to NULL or READY,
* even if our current ret is ASYNC for example; also, make sure not
* to return FAILURE, since our state is already committed */
if (GST_STATE_TRANSITION_NEXT (transition) <= GST_STATE_READY ||
ret == GST_STATE_CHANGE_FAILURE) {
ret = GST_STATE_CHANGE_SUCCESS;
}
} else if (ret != GST_STATE_CHANGE_FAILURE || down) {
/* if the request was to transition to a state that we haven't already
* transitioned to in the same direction, then we need to request a state
* change in the pipeline, *unless* we are going upwards and the last ret
* was FAILURE, in which case we should just return FAILURE and stop.
* We don't stop a downwards state change though in case of FAILURE, since
* we need to be able to bring the pipeline down to NULL. Note that
* GST_MESSAGE_ERROR will cause ret to be GST_STATE_CHANGE_FAILURE */
ret = gst_element_set_state (pipeline,
GST_STATE_TRANSITION_NEXT (transition));
GST_DEBUG_OBJECT (src, "gst_element_set_state returned %s",
gst_element_state_change_return_get_name (ret));
}
GST_STATE_UNLOCK (pipeline);
done_nolock:
GST_DEBUG_OBJECT (src, "sending state change ack, ret = %s",
gst_element_state_change_return_get_name (ret));
gst_ipc_pipeline_comm_write_state_change_ack_to_fd (&src->comm, id, ret);
}
static void
on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
{
struct StateChangeData *d;
GstElement *ipcpipelinesrc = GST_ELEMENT (user_data);
GST_DEBUG_OBJECT (ipcpipelinesrc, "Got state change id %u, %s -> %s", id,
gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
d = g_new (struct StateChangeData, 1);
d->id = id;
d->transition = transition;
gst_element_call_async (ipcpipelinesrc, do_state_change, d, g_free);
}
static void
on_message (guint32 id, GstMessage * message, gpointer user_data)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
GST_ERROR_OBJECT (src, "Got message id %u, not supposed to: %" GST_PTR_FORMAT,
id, message);
gst_message_unref (message);
}
static gboolean
gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * src)
{
if (!gst_ipc_pipeline_comm_start_reader_thread (&src->comm, on_buffer,
on_event, on_query, on_state_change, NULL, on_message, src)) {
GST_ERROR_OBJECT (src, "Failed to start reader thread");
return FALSE;
}
return TRUE;
}
static void
gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src)
{
gst_ipc_pipeline_comm_stop_reader_thread (&src->comm);
}
static void
gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src)
{
GST_DEBUG_OBJECT (src, "Disconnecting");
gst_ipc_pipeline_src_stop_reader_thread (src);
src->comm.fdin = -1;
src->comm.fdout = -1;
gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
gst_ipc_pipeline_src_start_reader_thread (src);
}
static GstStateChangeReturn
gst_ipc_pipeline_src_change_state (GstElement * element,
GstStateChange transition)
{
GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
if (src->comm.fdin < 0) {
GST_ERROR_OBJECT (element, "Invalid fdin: %d", src->comm.fdin);
return GST_STATE_CHANGE_FAILURE;
}
if (src->comm.fdout < 0) {
GST_ERROR_OBJECT (element, "Invalid fdout: %d", src->comm.fdout);
return GST_STATE_CHANGE_FAILURE;
}
if (!src->comm.reader_thread) {
GST_ERROR_OBJECT (element, "Failed to start reader thread");
return GST_STATE_CHANGE_FAILURE;
}
break;
default:
break;
}
return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
}

View file

@ -0,0 +1,76 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
* 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
*
* gstipcpipelinesrc.h:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_IPC_PIPELINE_SRC_H__
#define __GST_IPC_PIPELINE_SRC_H__
#include <gst/gst.h>
#include "gstipcpipelinecomm.h"
G_BEGIN_DECLS
#define GST_TYPE_IPC_PIPELINE_SRC \
(gst_ipc_pipeline_src_get_type())
#define GST_IPC_PIPELINE_SRC(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrc))
#define GST_IPC_PIPELINE_SRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_PIPELINE_SRC,GstIpcPipelineSrcClass))
#define GST_IS_IPC_PIPELINE_SRC(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_PIPELINE_SRC))
#define GST_IS_IPC_PIPELINE_SRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_PIPELINE_SRC))
typedef struct _GstIpcPipelineSrc GstIpcPipelineSrc;
typedef struct _GstIpcPipelineSrcClass GstIpcPipelineSrcClass;
/**
* GstIpcPipelineSrc:
*
* Opaque #GstIpcPipelineSrc data structure.
*/
struct _GstIpcPipelineSrc {
GstElement element;
GstIpcPipelineComm comm;
GstPad *srcpad;
gboolean flushing;
GList *queued;
GstFlowReturn last_ret;
GCond create_cond;
};
struct _GstIpcPipelineSrcClass {
GstElementClass parent_class;
gboolean (*forward_message) (GstIpcPipelineSrc *, GstMessage *);
void (*disconnect) (GstIpcPipelineSrc * src);
};
G_GNUC_INTERNAL GType gst_ipc_pipeline_src_get_type (void);
G_END_DECLS
#endif /* __GST_IPC_PIPELINE_SRC_H__ */

View file

@ -0,0 +1,122 @@
/* GStreamer
* Copyright (C) 2015-2017 YouView TV Ltd
* Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
*
* gstipcslavepipeline.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-ipcslavepipeline
* @see_also: #GstIpcPipelineSink, #GstIpcPipelineSrc
*
* This is a GstPipeline subclass meant to embed one ore more ipcpipelinesrc
* elements, and be slaved transparently to the master pipeline, using one ore
* more ipcpipelinesink elements on the master.
*
* The actual pipeline slaving logic happens in ipcpipelinesrc. The only thing
* that this class actually does is that it watches the pipeline bus for
* messages and forwards them to the master pipeline through the ipcpipelinesrc
* elements that it contains.
*
* For more details about this mechanism and its uses, see the documentation
* of the ipcpipelinesink element.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>
#include "gstipcpipelinesrc.h"
#include "gstipcslavepipeline.h"
GST_DEBUG_CATEGORY_STATIC (gst_ipcslavepipeline_debug);
#define GST_CAT_DEFAULT gst_ipcslavepipeline_debug
#define _do_init \
GST_DEBUG_CATEGORY_INIT (gst_ipcslavepipeline_debug, "ipcslavepipeline", 0, "ipcslavepipeline element");
#define gst_ipc_slave_pipeline_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstIpcSlavePipeline, gst_ipc_slave_pipeline,
GST_TYPE_PIPELINE, _do_init);
static gboolean gst_ipc_slave_pipeline_post_message (GstElement * element,
GstMessage * message);
static void
gst_ipc_slave_pipeline_class_init (GstIpcSlavePipelineClass * klass)
{
GstElementClass *element_class;
element_class = GST_ELEMENT_CLASS (klass);
element_class->post_message = gst_ipc_slave_pipeline_post_message;
gst_element_class_set_static_metadata (element_class,
"Inter-process slave pipeline",
"Generic/Bin/Slave",
"Contains the slave part of an inter-process pipeline",
"Vincent Penquerc'h <vincent.penquerch@collabora.co.uk");
}
static void
gst_ipc_slave_pipeline_init (GstIpcSlavePipeline * isp)
{
}
static gboolean
send_message_if_ipcpipelinesrc (const GValue * v, GValue * r,
gpointer user_data)
{
GstElement *e;
GType et;
gboolean ret;
GstMessage *message = user_data;
e = g_value_get_object (v);
et = gst_element_factory_get_element_type (gst_element_get_factory (e));
if (et == GST_TYPE_IPC_PIPELINE_SRC) {
g_signal_emit_by_name (G_OBJECT (e), "forward-message", message, &ret);
/* if we succesfully sent this to the master and it's not ASYNC_DONE or EOS,
* we can skip sending it again through the other ipcpipelinesrcs */
if (ret && GST_MESSAGE_TYPE (message) != GST_MESSAGE_ASYNC_DONE &&
GST_MESSAGE_TYPE (message) != GST_MESSAGE_EOS)
return FALSE;
}
return TRUE;
}
static void
gst_ipc_slave_pipeline_forward_message (GstIpcSlavePipeline * pipeline,
GstMessage * message)
{
GstIterator *it;
it = gst_bin_iterate_sources (GST_BIN (pipeline));
gst_iterator_fold (it, send_message_if_ipcpipelinesrc, NULL, message);
gst_iterator_free (it);
}
static gboolean
gst_ipc_slave_pipeline_post_message (GstElement * element, GstMessage * message)
{
gst_ipc_slave_pipeline_forward_message (GST_IPC_SLAVE_PIPELINE
(element), message);
return GST_ELEMENT_CLASS (parent_class)->post_message (element, message);
}

View file

@ -0,0 +1,59 @@
/* GStreamer
* Copyright (C) 2015-2017 YouView TV Ltd
* Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
*
* gstipcslavepipeline.h:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef _GST_IPC_SLAVE_PIPELINE_H_
#define _GST_IPC_SLAVE_PIPELINE_H_
#include <gst/gst.h>
G_BEGIN_DECLS
#define GST_TYPE_IPC_SLAVE_PIPELINE \
(gst_ipc_slave_pipeline_get_type())
#define GST_IPC_SLAVE_PIPELINE(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipeline))
#define GST_IPC_SLAVE_PIPELINE_CAST(obj) \
((GstIpcSlavePipeline *) obj)
#define GST_IPC_SLAVE_PIPELINE_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_IPC_SLAVE_PIPELINE,GstIpcSlavePipelineClass))
#define GST_IS_IPC_SLAVE_PIPELINE(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_IPC_SLAVE_PIPELINE))
#define GST_IS_IPC_SLAVE_PIPELINE_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_IPC_SLAVE_PIPELINE))
typedef struct _GstIpcSlavePipeline GstIpcSlavePipeline;
typedef struct _GstIpcSlavePipelineClass GstIpcSlavePipelineClass;
struct _GstIpcSlavePipeline
{
GstPipeline pipeline;
};
struct _GstIpcSlavePipelineClass
{
GstPipelineClass pipeline_class;
};
G_GNUC_INTERNAL GType gst_ipc_slave_pipeline_get_type (void);
G_END_DECLS
#endif

View file

@ -0,0 +1,92 @@
This documents the protocol used to pass data over fds between ipcpipelinesrc
and ipcpipelinesink.
The protocol is used in both directions. However, some combinations do
not make sense (eg, a buffer going from ipcpipelinesrc to ipcpipelinesink).
The protocol consists of an arbitrary number of variable sized chunks
with a type. Each chunk has a request ID which can be used to match a
request with its reply (ack / query result).
Each chunk consists of:
- a type (byte):
1: ack
2: query result
3: buffer
4: event
5: sink message event
6: query
7: state change
8: state lost
9: message
10: error/warning/info message
- a request ID, 4 bytes, little endian
- the payload size, 4 bytes, little endian
- N bytes payload
Depending on the type, the payload can contain:
- 1: ack
result: 4 bytes, little endian
interpreted as GstFlowReturn for buffers, boolean for events and
GstStateChangeReturn for state changes
- 2: query result
result boolean: 1 byte
query type: 4 bytes, little endian
returned query string representation, NUL terminated
- 3: buffer:
pts: 8 bytes, little endian
dts: 8 bytes, little endian
duration: 8 bytes, little endian
offset: 8 bytes, little endian
offset end: 8 bytes, little endian
flags: 8 bytes, little endian
buffer size: 4 bytes, little endian
data: contents of the buffer data, size specified in "buffer size"
number of GstMeta: 4 bytes, little endian
For each GstMeta:
bytes: 4 bytes, little endian
this is the number of bytes before the string representation
at the end of this block, including the 4 bytes of itself
flags: 4 bytes, little endian
length of the GstMetaInfo::api name: 4 bytes, little endian
GstMetaInfo::api name: string, NUL terminated
GstMetaInfo::size: 8 bytes, little endian
length of the string representation: 4 bytes, little endian
string representation, NUL terminated
- 4: event
event type: 4 bytes, little endian
sequence number: 4 bytes, little endian
direction: 1 byte
whether the event is going upstream (1) or downstream (0)
string representation, NUL terminated
- 5: sink message event
message type: 4 bytes, little endian
event sequence number: 4 bytes, little endian
message sequence number: 4 bytes, little endian
length: 4 bytes, little endian
event structure name: length bytes, NUL terminated
message structure string representation: remaining bytes, NUL terminated
- 6: query
query type: 4 bytes, little endian
direction: 1 byte
whether the query is going upstream (1) or downstream (0)
string representation, NUL terminated
- 7: state change
GstStateChange: 4 bytes, little endian
- 8: state lost
no payload
- 9: message
message type: 4 bytes, little endian
string representation, NUL terminated
- 10: error/warning/info message
message type (2 = error, 1 = warning, 0 = info): 1 byte
error domain string length: 4 bytes, little endian
string representation of the error domain, NUL terminated
error code: 4 bytes, little endian
length: 4 bytes, little endian
if zero: no error message
if non zero: As many bytes as this length: the error message, NUL terminated
length: 4 bytes, little endian
if zero: no extra message
if non zero: As many bytes as this length: the error extra debug message, NUL terminated