gstreamer/plugins/elements/gsttee.c
Sebastian Dröge d25a88e3a1 tee: First deactivate the pad and then remove it when releasing pads
This reverts a96002bb28, which is not
necessary anymore. If we release the pad after removing it then none of
the deactivation code will actually be called because the pad has no
parent anymore, and we require a parent on the pad for deactivation to
happen.

This can then, among other things, cause a streaming thread to be still
stuck in a pad probe because the pad was never flushed, and waiting
there forever because now the pad will actually never be flushed anymore.
2019-10-24 22:49:41 +00:00

1242 lines
35 KiB
C

/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
* 2007 Wim Taymans <wim.taymans@gmail.com>
*
* gsttee.c: Tee element, one in N out
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-tee
* @title: tee
* @see_also: #GstIdentity
*
* Split data to multiple pads. Branching the data flow is useful when e.g.
* capturing a video where the video is shown on the screen and also encoded and
* written to a file. Another example is playing music and hooking up a
* visualisation module.
*
* One needs to use separate queue elements (or a multiqueue) in each branch to
* provide separate threads for each branch. Otherwise a blocked dataflow in one
* branch would stall the other branches.
*
* ## Example launch line
* |[
* gst-launch-1.0 filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! audioconvert ! audioresample ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink
* ]|
*
* Play song.ogg audio file which must be in the current working directory
* and render visualisations using the goom element (this can be easier done
* using the playbin element, this is just an example pipeline).
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "gsttee.h"
#include "gst/glib-compat-private.h"
#include <string.h>
#include <stdio.h>
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
#define GST_CAT_DEFAULT gst_tee_debug
#define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
static GType
gst_tee_pull_mode_get_type (void)
{
static GType type = 0;
static const GEnumValue data[] = {
{GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
{GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
"single"},
{0, NULL, NULL},
};
if (!type) {
type = g_enum_register_static ("GstTeePullMode", data);
}
return type;
}
#define DEFAULT_PROP_NUM_SRC_PADS 0
#define DEFAULT_PROP_HAS_CHAIN TRUE
#define DEFAULT_PROP_SILENT TRUE
#define DEFAULT_PROP_LAST_MESSAGE NULL
#define DEFAULT_PULL_MODE GST_TEE_PULL_MODE_NEVER
#define DEFAULT_PROP_ALLOW_NOT_LINKED FALSE
enum
{
PROP_0,
PROP_NUM_SRC_PADS,
PROP_HAS_CHAIN,
PROP_SILENT,
PROP_LAST_MESSAGE,
PROP_PULL_MODE,
PROP_ALLOC_PAD,
PROP_ALLOW_NOT_LINKED,
};
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
GST_PAD_SRC,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
#define _do_init \
GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element");
#define gst_tee_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
static GParamSpec *pspec_last_message = NULL;
static GParamSpec *pspec_alloc_pad = NULL;
GType gst_tee_pad_get_type (void);
#define GST_TYPE_TEE_PAD \
(gst_tee_pad_get_type())
#define GST_TEE_PAD(obj) \
(G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad))
#define GST_TEE_PAD_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass))
#define GST_IS_TEE_PAD(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD))
#define GST_IS_TEE_PAD_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD))
#define GST_TEE_PAD_CAST(obj) \
((GstTeePad *)(obj))
typedef struct _GstTeePad GstTeePad;
typedef struct _GstTeePadClass GstTeePadClass;
struct _GstTeePad
{
GstPad parent;
guint index;
gboolean pushed;
GstFlowReturn result;
gboolean removed;
};
struct _GstTeePadClass
{
GstPadClass parent;
};
G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD);
static void
gst_tee_pad_class_init (GstTeePadClass * klass)
{
}
static void
gst_tee_pad_reset (GstTeePad * pad)
{
pad->pushed = FALSE;
pad->result = GST_FLOW_NOT_LINKED;
pad->removed = FALSE;
}
static void
gst_tee_pad_init (GstTeePad * pad)
{
gst_tee_pad_reset (pad);
}
static GstPad *gst_tee_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
static void gst_tee_release_pad (GstElement * element, GstPad * pad);
static void gst_tee_finalize (GObject * object);
static void gst_tee_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tee_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static void gst_tee_dispose (GObject * object);
static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
GstBufferList * list);
static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
GstPadMode mode, gboolean active);
static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
GstQuery * query);
static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
GstPadMode mode, gboolean active);
static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
guint64 offset, guint length, GstBuffer ** buf);
static void
gst_tee_dispose (GObject * object)
{
GList *item;
restart:
for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
GstPad *pad = GST_PAD (item->data);
if (GST_PAD_IS_SRC (pad)) {
gst_element_release_request_pad (GST_ELEMENT (object), pad);
goto restart;
}
}
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_tee_finalize (GObject * object)
{
GstTee *tee;
tee = GST_TEE (object);
g_hash_table_unref (tee->pad_indexes);
g_free (tee->last_message);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_tee_class_init (GstTeeClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
gobject_class = G_OBJECT_CLASS (klass);
gstelement_class = GST_ELEMENT_CLASS (klass);
gobject_class->finalize = gst_tee_finalize;
gobject_class->set_property = gst_tee_set_property;
gobject_class->get_property = gst_tee_get_property;
gobject_class->dispose = gst_tee_dispose;
g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
g_param_spec_int ("num-src-pads", "Num Src Pads",
"The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
g_param_spec_boolean ("has-chain", "Has Chain",
"If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SILENT,
g_param_spec_boolean ("silent", "Silent",
"Don't produce last_message events", DEFAULT_PROP_SILENT,
G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
pspec_last_message = g_param_spec_string ("last-message", "Last Message",
"The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
pspec_last_message);
g_object_class_install_property (gobject_class, PROP_PULL_MODE,
g_param_spec_enum ("pull-mode", "Pull mode",
"Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
DEFAULT_PULL_MODE,
G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
"The pad ALLOCATION queries will be proxied to (DEPRECATED, has no effect)",
GST_TYPE_PAD,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED);
g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
pspec_alloc_pad);
/**
* GstTee:allow-not-linked
*
* This property makes sink pad return GST_FLOW_OK even if there are no
* source pads or any of them is linked.
*
* This is useful to avoid errors when you have a dynamic pipeline and during
* a reconnection you can have all the pads unlinked or removed.
*
* Since: 1.6
*/
g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED,
g_param_spec_boolean ("allow-not-linked", "Allow not linked",
"Return GST_FLOW_OK even if there are no source pads or they are "
"all unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED,
G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_set_static_metadata (gstelement_class,
"Tee pipe fitting",
"Generic",
"1-to-N pipe fitting",
"Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
gst_element_class_add_static_pad_template (gstelement_class, &src_template);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
}
static void
gst_tee_init (GstTee * tee)
{
tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
tee->sink_mode = GST_PAD_MODE_NONE;
gst_pad_set_event_function (tee->sinkpad,
GST_DEBUG_FUNCPTR (gst_tee_sink_event));
gst_pad_set_query_function (tee->sinkpad,
GST_DEBUG_FUNCPTR (gst_tee_sink_query));
gst_pad_set_activatemode_function (tee->sinkpad,
GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
gst_pad_set_chain_list_function (tee->sinkpad,
GST_DEBUG_FUNCPTR (gst_tee_chain_list));
GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
tee->pad_indexes = g_hash_table_new (NULL, NULL);
tee->last_message = NULL;
}
static void
gst_tee_notify_alloc_pad (GstTee * tee)
{
g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
}
static gboolean
forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
{
GstPad *srcpad = GST_PAD_CAST (user_data);
GstFlowReturn ret;
ret = gst_pad_store_sticky_event (srcpad, *event);
if (ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event,
GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret));
}
return TRUE;
}
static GstPad *
gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
const gchar * name_templ, const GstCaps * caps)
{
gchar *name;
GstPad *srcpad;
GstTee *tee;
GstPadMode mode;
gboolean res;
guint index = 0;
tee = GST_TEE (element);
GST_DEBUG_OBJECT (tee, "requesting pad");
GST_OBJECT_LOCK (tee);
if (name_templ && sscanf (name_templ, "src_%u", &index) == 1) {
GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index);
if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) {
GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ);
GST_OBJECT_UNLOCK (tee);
return NULL;
}
if (index >= tee->next_pad_index)
tee->next_pad_index = index + 1;
} else {
index = tee->next_pad_index;
while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index)))
index++;
tee->next_pad_index = index + 1;
}
g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL);
name = g_strdup_printf ("src_%u", index);
srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD,
"name", name, "direction", templ->direction, "template", templ,
NULL));
GST_TEE_PAD_CAST (srcpad)->index = index;
g_free (name);
mode = tee->sink_mode;
GST_OBJECT_UNLOCK (tee);
switch (mode) {
case GST_PAD_MODE_PULL:
/* we already have a src pad in pull mode, and our pull mode can only be
SINGLE, so fall through to activate this new pad in push mode */
case GST_PAD_MODE_PUSH:
res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
break;
default:
res = TRUE;
break;
}
if (!res)
goto activate_failed;
gst_pad_set_activatemode_function (srcpad,
GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
gst_pad_set_getrange_function (srcpad,
GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
/* Forward sticky events to the new srcpad */
gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
return srcpad;
/* ERRORS */
activate_failed:
{
gboolean changed = FALSE;
GST_OBJECT_LOCK (tee);
GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
if (tee->allocpad == srcpad) {
tee->allocpad = NULL;
changed = TRUE;
}
GST_OBJECT_UNLOCK (tee);
gst_object_unref (srcpad);
if (changed) {
gst_tee_notify_alloc_pad (tee);
}
return NULL;
}
}
static void
gst_tee_release_pad (GstElement * element, GstPad * pad)
{
GstTee *tee;
gboolean changed = FALSE;
guint index;
tee = GST_TEE (element);
GST_DEBUG_OBJECT (tee, "releasing pad");
GST_OBJECT_LOCK (tee);
index = GST_TEE_PAD_CAST (pad)->index;
/* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
GST_TEE_PAD_CAST (pad)->removed = TRUE;
if (tee->allocpad == pad) {
tee->allocpad = NULL;
changed = TRUE;
}
GST_OBJECT_UNLOCK (tee);
gst_pad_set_active (pad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
if (changed) {
gst_tee_notify_alloc_pad (tee);
}
GST_OBJECT_LOCK (tee);
g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index));
GST_OBJECT_UNLOCK (tee);
}
static void
gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
GParamSpec * pspec)
{
GstTee *tee = GST_TEE (object);
GST_OBJECT_LOCK (tee);
switch (prop_id) {
case PROP_HAS_CHAIN:
tee->has_chain = g_value_get_boolean (value);
break;
case PROP_SILENT:
tee->silent = g_value_get_boolean (value);
break;
case PROP_PULL_MODE:
tee->pull_mode = (GstTeePullMode) g_value_get_enum (value);
break;
case PROP_ALLOC_PAD:
{
GstPad *pad = g_value_get_object (value);
GST_OBJECT_LOCK (pad);
if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
tee->allocpad = pad;
else
GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
" is not my pad", GST_OBJECT_NAME (pad));
GST_OBJECT_UNLOCK (pad);
break;
}
case PROP_ALLOW_NOT_LINKED:
tee->allow_not_linked = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
GST_OBJECT_UNLOCK (tee);
}
static void
gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstTee *tee = GST_TEE (object);
GST_OBJECT_LOCK (tee);
switch (prop_id) {
case PROP_NUM_SRC_PADS:
g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
break;
case PROP_HAS_CHAIN:
g_value_set_boolean (value, tee->has_chain);
break;
case PROP_SILENT:
g_value_set_boolean (value, tee->silent);
break;
case PROP_LAST_MESSAGE:
g_value_set_string (value, tee->last_message);
break;
case PROP_PULL_MODE:
g_value_set_enum (value, tee->pull_mode);
break;
case PROP_ALLOC_PAD:
g_value_set_object (value, tee->allocpad);
break;
case PROP_ALLOW_NOT_LINKED:
g_value_set_boolean (value, tee->allow_not_linked);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
GST_OBJECT_UNLOCK (tee);
}
static gboolean
gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
gboolean res;
switch (GST_EVENT_TYPE (event)) {
default:
res = gst_pad_event_default (pad, parent, event);
break;
}
return res;
}
struct AllocQueryCtx
{
GstTee *tee;
GstQuery *query;
GstAllocationParams params;
guint size;
guint min_buffers;
gboolean first_query;
guint num_pads;
};
/* This function will aggregate some of the allocation query information with
* the strategy to force upstream allocation. Depending on downstream
* allocation would otherwise make dynamic pipelines much more complicated as
* application would need to now drain buffer in certain cases before getting
* rid of a tee branch. */
static gboolean
gst_tee_query_allocation (const GValue * item, GValue * ret, gpointer user_data)
{
struct AllocQueryCtx *ctx = user_data;
GstPad *src_pad = g_value_get_object (item);
GstPad *peer_pad;
GstCaps *caps;
GstQuery *query;
guint count, i, size, min;
GST_DEBUG_OBJECT (ctx->tee, "Aggregating allocation from pad %s:%s",
GST_DEBUG_PAD_NAME (src_pad));
peer_pad = gst_pad_get_peer (src_pad);
if (!peer_pad) {
if (ctx->tee->allow_not_linked) {
GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, but allowed.",
GST_DEBUG_PAD_NAME (src_pad));
return TRUE;
} else {
GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, ignoring allocation.",
GST_DEBUG_PAD_NAME (src_pad));
g_value_set_boolean (ret, FALSE);
return FALSE;
}
}
gst_query_parse_allocation (ctx->query, &caps, NULL);
query = gst_query_new_allocation (caps, FALSE);
if (!gst_pad_query (peer_pad, query)) {
GST_DEBUG_OBJECT (ctx->tee,
"Allocation query failed on pad %s, ignoring allocation",
GST_PAD_NAME (src_pad));
g_value_set_boolean (ret, FALSE);
gst_query_unref (query);
gst_object_unref (peer_pad);
return FALSE;
}
gst_object_unref (peer_pad);
/* Allocation Params:
* store the maximum alignment, prefix and padding, but ignore the
* allocators and the flags which are tied to downstream allocation */
count = gst_query_get_n_allocation_params (query);
for (i = 0; i < count; i++) {
GstAllocationParams params = { 0, };
gst_query_parse_nth_allocation_param (query, i, NULL, &params);
GST_DEBUG_OBJECT (ctx->tee, "Aggregating AllocationParams align=%"
G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
G_GSIZE_FORMAT, params.align, params.prefix, params.padding);
if (ctx->params.align < params.align)
ctx->params.align = params.align;
if (ctx->params.prefix < params.prefix)
ctx->params.prefix = params.prefix;
if (ctx->params.padding < params.padding)
ctx->params.padding = params.padding;
}
/* Allocation Pool:
* We want to keep the biggest size and biggest minimum number of buffers to
* make sure downstream requirement can be satisfied. We don't really care
* about the maximum, as this is a parameter of the downstream provided
* pool. We only read the first allocation pool as the minimum number of
* buffers is normally constant regardless of the pool being used. */
if (gst_query_get_n_allocation_pools (query) > 0) {
gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL);
GST_DEBUG_OBJECT (ctx->tee,
"Aggregating allocation pool size=%u min_buffers=%u", size, min);
if (ctx->size < size)
ctx->size = size;
if (ctx->min_buffers < min)
ctx->min_buffers = min;
}
/* Allocation Meta:
* For allocation meta, we'll need to aggregate the argument using the new
* GstMetaInfo::agggregate_func */
count = gst_query_get_n_allocation_metas (query);
for (i = 0; i < count; i++) {
guint ctx_index;
GType api;
const GstStructure *param;
api = gst_query_parse_nth_allocation_meta (query, i, &param);
/* For the first query, copy all metas */
if (ctx->first_query) {
gst_query_add_allocation_meta (ctx->query, api, param);
continue;
}
/* Afterward, aggregate the common params */
if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) {
const GstStructure *ctx_param;
gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param);
/* Keep meta which has no params */
if (ctx_param == NULL && param == NULL)
continue;
GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
g_type_name (api));
gst_query_remove_nth_allocation_meta (ctx->query, ctx_index);
}
}
/* Finally, cleanup metas from the stored query that aren't support on this
* pad. */
count = gst_query_get_n_allocation_metas (ctx->query);
for (i = 0; i < count;) {
GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL);
if (!gst_query_find_allocation_meta (query, api, NULL)) {
GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
g_type_name (api));
gst_query_remove_nth_allocation_meta (ctx->query, i);
count--;
continue;
}
i++;
}
ctx->first_query = FALSE;
ctx->num_pads++;
gst_query_unref (query);
return TRUE;
}
static void
gst_tee_clear_query_allocation_meta (GstQuery * query)
{
guint count = gst_query_get_n_allocation_metas (query);
guint i;
for (i = 1; i <= count; i++)
gst_query_remove_nth_allocation_meta (query, count - i);
}
static gboolean
gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstTee *tee = GST_TEE (parent);
gboolean res;
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_ALLOCATION:
{
GstIterator *iter;
GValue ret = G_VALUE_INIT;
struct AllocQueryCtx ctx = { tee, query, };
g_value_init (&ret, G_TYPE_BOOLEAN);
g_value_set_boolean (&ret, TRUE);
ctx.first_query = TRUE;
gst_allocation_params_init (&ctx.params);
iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
while (GST_ITERATOR_RESYNC ==
gst_iterator_fold (iter, gst_tee_query_allocation, &ret, &ctx)) {
gst_iterator_resync (iter);
ctx.first_query = TRUE;
gst_allocation_params_init (&ctx.params);
ctx.size = 0;
ctx.min_buffers = 0;
ctx.num_pads = 0;
gst_tee_clear_query_allocation_meta (query);
}
gst_iterator_free (iter);
res = g_value_get_boolean (&ret);
g_value_unset (&ret);
if (res) {
GST_DEBUG_OBJECT (tee, "Aggregated AllocationParams to align=%"
G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
G_GSIZE_FORMAT, ctx.params.align, ctx.params.prefix,
ctx.params.padding);
GST_DEBUG_OBJECT (tee,
"Aggregated allocation pools size=%u min_buffers=%u", ctx.size,
ctx.min_buffers);
#ifndef GST_DISABLE_GST_DEBUG
{
guint count = gst_query_get_n_allocation_metas (query);
guint i;
GST_DEBUG_OBJECT (tee, "Aggregated %u allocation meta:", count);
for (i = 0; i < count; i++)
GST_DEBUG_OBJECT (tee, " + aggregated allocation meta %s",
g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i,
NULL)));
}
#endif
/* Allocate one more buffers when multiplexing so we don't starve the
* downstream threads. */
if (ctx.num_pads > 1)
ctx.min_buffers++;
/* Check that we actually have parameters besides the defaults. */
if (ctx.params.align || ctx.params.prefix || ctx.params.padding) {
gst_query_add_allocation_param (ctx.query, NULL, &ctx.params);
}
/* When size == 0, buffers created from this pool would have no memory
* allocated. */
if (ctx.size) {
gst_query_add_allocation_pool (ctx.query, NULL, ctx.size,
ctx.min_buffers, 0);
}
} else {
gst_tee_clear_query_allocation_meta (query);
}
break;
}
default:
res = gst_pad_query_default (pad, parent, query);
break;
}
return res;
}
static void
gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
{
GST_OBJECT_LOCK (tee);
g_free (tee->last_message);
if (is_list) {
tee->last_message =
g_strdup_printf ("chain-list ******* (%s:%s)t %p",
GST_DEBUG_PAD_NAME (pad), data);
} else {
tee->last_message =
g_strdup_printf ("chain ******* (%s:%s)t (%" G_GSIZE_FORMAT
" bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
}
GST_OBJECT_UNLOCK (tee);
g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
}
static GstFlowReturn
gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
{
GstFlowReturn res;
/* Push */
if (pad == tee->pull_pad) {
/* don't push on the pad we're pulling from */
res = GST_FLOW_OK;
} else if (is_list) {
res =
gst_pad_push_list (pad,
gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
} else {
res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
}
return res;
}
static void
clear_pads (GstPad * pad, GstTee * tee)
{
GST_TEE_PAD_CAST (pad)->pushed = FALSE;
GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED;
}
static GstFlowReturn
gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
{
GList *pads;
guint32 cookie;
GstFlowReturn ret, cret;
if (G_UNLIKELY (!tee->silent))
gst_tee_do_message (tee, tee->sinkpad, data, is_list);
GST_OBJECT_LOCK (tee);
pads = GST_ELEMENT_CAST (tee)->srcpads;
/* special case for zero pads */
if (G_UNLIKELY (!pads))
goto no_pads;
/* special case for just one pad that avoids reffing the buffer */
if (!pads->next) {
GstPad *pad = GST_PAD_CAST (pads->data);
/* Keep another ref around, a pad probe
* might release and destroy the pad */
gst_object_ref (pad);
GST_OBJECT_UNLOCK (tee);
if (pad == tee->pull_pad) {
ret = GST_FLOW_OK;
} else if (is_list) {
ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
} else {
ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
}
GST_OBJECT_LOCK (tee);
if (GST_TEE_PAD_CAST (pad)->removed)
ret = GST_FLOW_NOT_LINKED;
if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) {
ret = GST_FLOW_OK;
}
GST_OBJECT_UNLOCK (tee);
gst_object_unref (pad);
return ret;
}
/* mark all pads as 'not pushed on yet' */
g_list_foreach (pads, (GFunc) clear_pads, tee);
restart:
if (tee->allow_not_linked) {
cret = GST_FLOW_OK;
} else {
cret = GST_FLOW_NOT_LINKED;
}
pads = GST_ELEMENT_CAST (tee)->srcpads;
cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
while (pads) {
GstPad *pad;
pad = GST_PAD_CAST (pads->data);
if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) {
/* not yet pushed, release lock and start pushing */
gst_object_ref (pad);
GST_OBJECT_UNLOCK (tee);
GST_LOG_OBJECT (pad, "Starting to push %s %p",
is_list ? "list" : "buffer", data);
ret = gst_tee_do_push (tee, pad, data, is_list);
GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data,
gst_flow_get_name (ret));
GST_OBJECT_LOCK (tee);
/* keep track of which pad we pushed and the result value */
if (GST_TEE_PAD_CAST (pad)->removed)
ret = GST_FLOW_NOT_LINKED;
GST_TEE_PAD_CAST (pad)->pushed = TRUE;
GST_TEE_PAD_CAST (pad)->result = ret;
gst_object_unref (pad);
pad = NULL;
} else {
/* already pushed, use previous return value */
ret = GST_TEE_PAD_CAST (pad)->result;
GST_LOG_OBJECT (pad, "pad already pushed with %s",
gst_flow_get_name (ret));
}
/* before we go combining the return value, check if the pad list is still
* the same. It could be possible that the pad we just pushed was removed
* and the return value it not valid anymore */
if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
GST_LOG_OBJECT (tee, "pad list changed");
/* the list of pads changed, restart iteration. Pads that we already
* pushed on and are still in the new list, will not be pushed on
* again. */
goto restart;
}
/* stop pushing more buffers when we have a fatal error */
if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
goto error;
/* keep all other return values, overwriting the previous one. */
if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
cret = ret;
}
pads = g_list_next (pads);
}
GST_OBJECT_UNLOCK (tee);
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
/* no need to unset gvalue */
return cret;
/* ERRORS */
no_pads:
{
if (tee->allow_not_linked) {
GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s",
is_list ? "buffer-list" : "buffer");
ret = GST_FLOW_OK;
} else {
GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
ret = GST_FLOW_NOT_LINKED;
}
goto end;
}
error:
{
GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
goto end;
}
end:
{
GST_OBJECT_UNLOCK (tee);
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
return ret;
}
}
static GstFlowReturn
gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstFlowReturn res;
GstTee *tee;
tee = GST_TEE_CAST (parent);
GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
res = gst_tee_handle_data (tee, buffer, FALSE);
GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
return res;
}
static GstFlowReturn
gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
{
GstFlowReturn res;
GstTee *tee;
tee = GST_TEE_CAST (parent);
GST_DEBUG_OBJECT (tee, "received list %p", list);
res = gst_tee_handle_data (tee, list, TRUE);
GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
return res;
}
static gboolean
gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
gboolean active)
{
gboolean res;
GstTee *tee;
tee = GST_TEE (parent);
switch (mode) {
case GST_PAD_MODE_PUSH:
{
GST_OBJECT_LOCK (tee);
tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
if (active && !tee->has_chain)
goto no_chain;
GST_OBJECT_UNLOCK (tee);
res = TRUE;
break;
}
default:
res = FALSE;
break;
}
return res;
/* ERRORS */
no_chain:
{
GST_OBJECT_UNLOCK (tee);
GST_INFO_OBJECT (tee,
"Tee cannot operate in push mode with has-chain==FALSE");
return FALSE;
}
}
static gboolean
gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
gboolean active)
{
GstTee *tee;
gboolean res;
GstPad *sinkpad;
tee = GST_TEE (parent);
switch (mode) {
case GST_PAD_MODE_PULL:
{
GST_OBJECT_LOCK (tee);
if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
goto cannot_pull;
if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
goto cannot_pull_multiple_srcs;
sinkpad = gst_object_ref (tee->sinkpad);
GST_OBJECT_UNLOCK (tee);
res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
gst_object_unref (sinkpad);
if (!res)
goto sink_activate_failed;
GST_OBJECT_LOCK (tee);
if (active) {
if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
tee->pull_pad = pad;
} else {
if (pad == tee->pull_pad)
tee->pull_pad = NULL;
}
tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE);
GST_OBJECT_UNLOCK (tee);
break;
}
default:
res = TRUE;
break;
}
return res;
/* ERRORS */
cannot_pull:
{
GST_OBJECT_UNLOCK (tee);
GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
"set to NEVER");
return FALSE;
}
cannot_pull_multiple_srcs:
{
GST_OBJECT_UNLOCK (tee);
GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
"pull-mode set to SINGLE");
return FALSE;
}
sink_activate_failed:
{
GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
active ? "" : "de");
return FALSE;
}
}
static gboolean
gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
{
GstTee *tee;
gboolean res;
GstPad *sinkpad;
tee = GST_TEE (parent);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_SCHEDULING:
{
gboolean pull_mode;
GST_OBJECT_LOCK (tee);
pull_mode = TRUE;
if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
"set to NEVER");
pull_mode = FALSE;
} else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
"pull-mode set to SINGLE");
pull_mode = FALSE;
}
sinkpad = gst_object_ref (tee->sinkpad);
GST_OBJECT_UNLOCK (tee);
if (pull_mode) {
/* ask peer if we can operate in pull mode */
res = gst_pad_peer_query (sinkpad, query);
} else {
res = TRUE;
}
gst_object_unref (sinkpad);
break;
}
default:
res = gst_pad_query_default (pad, parent, query);
break;
}
return res;
}
static void
gst_tee_push_eos (const GValue * vpad, GstTee * tee)
{
GstPad *pad = g_value_get_object (vpad);
if (pad != tee->pull_pad)
gst_pad_push_event (pad, gst_event_new_eos ());
}
static void
gst_tee_pull_eos (GstTee * tee)
{
GstIterator *iter;
iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
while (gst_iterator_foreach (iter,
(GstIteratorForeachFunction) gst_tee_push_eos,
tee) == GST_ITERATOR_RESYNC)
gst_iterator_resync (iter);
gst_iterator_free (iter);
}
static GstFlowReturn
gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
guint length, GstBuffer ** buf)
{
GstTee *tee;
GstFlowReturn ret;
tee = GST_TEE (parent);
ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
if (ret == GST_FLOW_OK)
ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
else if (ret == GST_FLOW_EOS)
gst_tee_pull_eos (tee);
return ret;
}