mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-02-12 17:25:36 +00:00
Otherwise it can happen that, e.g., an attempt is made to send the stream-start event as part of pushing the first buffer. Downstream might not be in PAUSED/PLAYING yet, so the event is rejected with GST_FLOW_FLUSHING and, because it is an event, it does not cause the blocking pad probe to trigger first. This would then return GST_FLOW_FLUSHING for the buffer and shut down all of upstream. To solve this we return GST_PAD_PROBE_DROP for all events. Sticky events are resent later, once we have unblocked after blocking on the buffer, and everything works fine. Don't handle events specially in sink pad blocking pad probes, as there downstream is not linked yet and we are actually waiting for the following CAPS event before unblocking can happen. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/1172
472 lines
16 KiB
C
472 lines
16 KiB
C
/* GStreamer
|
|
* Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
# include "config.h"
|
|
#endif
|
|
|
|
#include "transportreceivebin.h"
|
|
#include "utils.h"
|
|
|
|
/*
|
|
* ,----------------------------transport_receive_%u---------------------------,
|
|
* ; (rtp/data) ;
|
|
* ; ,-nicesrc-, ,-capsfilter-, ,--queue--, ,-dtlssrtpdec-, ,-funnel-, ;
|
|
* ; ; src o-o sink src o-osink srco-osink rtp_srco-------o sink_0 ; ;
|
|
* ; '---------' '------------' '---------' ; ; ; src o--o rtp_src
|
|
* ; ; rtcp_srco---, ,-o sink_1 ; ;
|
|
* ; ; ; ; ; '--------' ;
|
|
* ; ; data_srco-, ; ; ,-funnel-, ;
|
|
* ; (rtcp) '-------------' ; '-+-o sink_0 ; ;
|
|
* ; ,-nicesrc-, ,-capsfilter-, ,--queue--, ,-dtlssrtpdec-, ; ,-' ; src o--o rtcp_src
|
|
* ; ; src o-o sink src o-osink srco-osink rtp_srco-+-' ,-o sink_1 ; ;
|
|
* ; '---------' '------------' '---------' ; ; ; ; '--------' ;
|
|
* ; ; rtcp_srco-+---' ,-funnel-, ;
|
|
* ; ; ; '-----o sink_0 ; ;
|
|
* ; ; data_srco-, ; src o--o data_src
|
|
* ; '-------------' '-----o sink_1 ; ;
|
|
* ; '--------' ;
|
|
* '---------------------------------------------------------------------------'
|
|
*
|
|
* Do we really want to be *that* permissive in what we accept?
|
|
*
|
|
* FIXME: When and how do we want to clear the possibly stored buffers?
|
|
*/
|
|
|
|
#define GST_CAT_DEFAULT gst_webrtc_transport_receive_bin_debug
|
|
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
|
|
|
|
#define transport_receive_bin_parent_class parent_class
|
|
G_DEFINE_TYPE_WITH_CODE (TransportReceiveBin, transport_receive_bin,
|
|
GST_TYPE_BIN,
|
|
GST_DEBUG_CATEGORY_INIT (gst_webrtc_transport_receive_bin_debug,
|
|
"webrtctransportreceivebin", 0, "webrtctransportreceivebin");
|
|
);
|
|
|
|
static GstStaticPadTemplate rtp_sink_template =
|
|
GST_STATIC_PAD_TEMPLATE ("rtp_src",
|
|
GST_PAD_SINK,
|
|
GST_PAD_ALWAYS,
|
|
GST_STATIC_CAPS ("application/x-rtp"));
|
|
|
|
static GstStaticPadTemplate rtcp_sink_template =
|
|
GST_STATIC_PAD_TEMPLATE ("rtcp_src",
|
|
GST_PAD_SINK,
|
|
GST_PAD_ALWAYS,
|
|
GST_STATIC_CAPS ("application/x-rtp"));
|
|
|
|
static GstStaticPadTemplate data_sink_template =
|
|
GST_STATIC_PAD_TEMPLATE ("data_src",
|
|
GST_PAD_SINK,
|
|
GST_PAD_ALWAYS,
|
|
GST_STATIC_CAPS_ANY);
|
|
|
|
/* Property identifiers for this class.  PROP_0 is only a placeholder so that
 * the real property ids start at 1. */
enum
{
  PROP_0 = 0,
  PROP_STREAM = 1,
};
|
|
|
|
static const gchar *
|
|
_receive_state_to_string (ReceiveState state)
|
|
{
|
|
switch (state) {
|
|
case RECEIVE_STATE_BLOCK:
|
|
return "block";
|
|
case RECEIVE_STATE_DROP:
|
|
return "drop";
|
|
case RECEIVE_STATE_PASS:
|
|
return "pass";
|
|
default:
|
|
return "Unknown";
|
|
}
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
pad_block (GstPad * pad, GstPadProbeInfo * info, TransportReceiveBin * receive)
|
|
{
|
|
/* Drop all events: we don't care about them and don't want to block on
|
|
* them. Sticky events would be forwarded again later once we unblock
|
|
* and we don't want to forward them here already because that might
|
|
* cause a spurious GST_FLOW_FLUSHING */
|
|
if (GST_IS_EVENT (info->data))
|
|
return GST_PAD_PROBE_DROP;
|
|
|
|
/* But block on any actual data-flow so we don't accidentally send that
|
|
* to a pad that is not ready yet, causing GST_FLOW_FLUSHING and everything
|
|
* to silently stop.
|
|
*/
|
|
GST_LOG_OBJECT (pad, "blocking pad with data %" GST_PTR_FORMAT, info->data);
|
|
|
|
return GST_PAD_PROBE_OK;
|
|
}
|
|
|
|
static GstPadProbeReturn
|
|
src_probe_cb (GstPad * pad, GstPadProbeInfo * info,
|
|
TransportReceiveBin * receive)
|
|
{
|
|
GstPadProbeReturn ret;
|
|
|
|
g_mutex_lock (&receive->pad_block_lock);
|
|
|
|
g_assert (receive->receive_state != RECEIVE_STATE_BLOCK);
|
|
|
|
ret =
|
|
receive->receive_state ==
|
|
RECEIVE_STATE_DROP ? GST_PAD_PROBE_DROP : GST_PAD_PROBE_OK;
|
|
|
|
g_mutex_unlock (&receive->pad_block_lock);
|
|
|
|
return ret;
|
|
}
|
|
|
|
void
|
|
transport_receive_bin_set_receive_state (TransportReceiveBin * receive,
|
|
ReceiveState state)
|
|
{
|
|
/* We currently don't support going into BLOCK again and the current code
|
|
* can't possible do that */
|
|
g_assert (state != RECEIVE_STATE_BLOCK);
|
|
|
|
g_mutex_lock (&receive->pad_block_lock);
|
|
receive->receive_state = state;
|
|
GST_DEBUG_OBJECT (receive, "changing receive state to %s",
|
|
_receive_state_to_string (state));
|
|
|
|
if (receive->rtp_block)
|
|
_free_pad_block (receive->rtp_block);
|
|
receive->rtp_block = NULL;
|
|
|
|
if (receive->rtcp_block)
|
|
_free_pad_block (receive->rtcp_block);
|
|
receive->rtcp_block = NULL;
|
|
|
|
g_mutex_unlock (&receive->pad_block_lock);
|
|
}
|
|
|
|
static void
|
|
transport_receive_bin_set_property (GObject * object, guint prop_id,
|
|
const GValue * value, GParamSpec * pspec)
|
|
{
|
|
TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (object);
|
|
|
|
GST_OBJECT_LOCK (receive);
|
|
switch (prop_id) {
|
|
case PROP_STREAM:
|
|
/* XXX: weak-ref this? */
|
|
receive->stream = TRANSPORT_STREAM (g_value_get_object (value));
|
|
break;
|
|
default:
|
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
|
break;
|
|
}
|
|
GST_OBJECT_UNLOCK (receive);
|
|
}
|
|
|
|
static void
|
|
transport_receive_bin_get_property (GObject * object, guint prop_id,
|
|
GValue * value, GParamSpec * pspec)
|
|
{
|
|
TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (object);
|
|
|
|
GST_OBJECT_LOCK (receive);
|
|
switch (prop_id) {
|
|
case PROP_STREAM:
|
|
g_value_set_object (value, receive->stream);
|
|
break;
|
|
default:
|
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
|
break;
|
|
}
|
|
GST_OBJECT_UNLOCK (receive);
|
|
}
|
|
|
|
static void
|
|
transport_receive_bin_finalize (GObject * object)
|
|
{
|
|
TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (object);
|
|
|
|
g_mutex_clear (&receive->pad_block_lock);
|
|
|
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
|
}
|
|
|
|
static GstStateChangeReturn
|
|
transport_receive_bin_change_state (GstElement * element,
|
|
GstStateChange transition)
|
|
{
|
|
TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (element);
|
|
GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
|
|
|
|
GST_DEBUG ("changing state: %s => %s",
|
|
gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
|
|
gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
|
|
|
|
switch (transition) {
|
|
case GST_STATE_CHANGE_NULL_TO_READY:{
|
|
GstWebRTCDTLSTransport *transport;
|
|
GstElement *elem, *dtlssrtpdec;
|
|
GstPad *pad;
|
|
|
|
transport = receive->stream->transport;
|
|
dtlssrtpdec = transport->dtlssrtpdec;
|
|
pad = gst_element_get_static_pad (dtlssrtpdec, "sink");
|
|
receive->rtp_block =
|
|
_create_pad_block (GST_ELEMENT (receive), pad, 0, NULL, NULL);
|
|
receive->rtp_block->block_id =
|
|
gst_pad_add_probe (pad,
|
|
GST_PAD_PROBE_TYPE_BLOCK |
|
|
GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
|
|
(GstPadProbeCallback) pad_block, receive, NULL);
|
|
gst_object_unref (pad);
|
|
|
|
receive->rtp_src_probe_id = gst_pad_add_probe (receive->rtp_src,
|
|
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
|
|
(GstPadProbeCallback) src_probe_cb, receive, NULL);
|
|
|
|
transport = receive->stream->rtcp_transport;
|
|
dtlssrtpdec = transport->dtlssrtpdec;
|
|
pad = gst_element_get_static_pad (dtlssrtpdec, "sink");
|
|
receive->rtcp_block =
|
|
_create_pad_block (GST_ELEMENT (receive), pad, 0, NULL, NULL);
|
|
receive->rtcp_block->block_id =
|
|
gst_pad_add_probe (pad,
|
|
GST_PAD_PROBE_TYPE_BLOCK |
|
|
GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
|
|
(GstPadProbeCallback) pad_block, receive, NULL);
|
|
gst_object_unref (pad);
|
|
|
|
receive->rtcp_src_probe_id = gst_pad_add_probe (receive->rtcp_src,
|
|
GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
|
|
(GstPadProbeCallback) src_probe_cb, receive, NULL);
|
|
|
|
/* XXX: because nice needs the nicesrc internal main loop running in order
|
|
* correctly STUN... */
|
|
/* FIXME: this races with the pad exposure later and may get not-linked */
|
|
elem = receive->stream->transport->transport->src;
|
|
gst_element_set_locked_state (elem, TRUE);
|
|
gst_element_set_state (elem, GST_STATE_PLAYING);
|
|
elem = receive->stream->rtcp_transport->transport->src;
|
|
gst_element_set_locked_state (elem, TRUE);
|
|
gst_element_set_state (elem, GST_STATE_PLAYING);
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
|
|
if (ret == GST_STATE_CHANGE_FAILURE)
|
|
return ret;
|
|
|
|
switch (transition) {
|
|
case GST_STATE_CHANGE_READY_TO_NULL:{
|
|
GstElement *elem;
|
|
|
|
elem = receive->stream->transport->transport->src;
|
|
gst_element_set_locked_state (elem, FALSE);
|
|
gst_element_set_state (elem, GST_STATE_NULL);
|
|
elem = receive->stream->rtcp_transport->transport->src;
|
|
gst_element_set_locked_state (elem, FALSE);
|
|
gst_element_set_state (elem, GST_STATE_NULL);
|
|
|
|
if (receive->rtp_block)
|
|
_free_pad_block (receive->rtp_block);
|
|
receive->rtp_block = NULL;
|
|
|
|
if (receive->rtp_src_probe_id)
|
|
gst_pad_remove_probe (receive->rtp_src, receive->rtp_src_probe_id);
|
|
receive->rtp_src_probe_id = 0;
|
|
|
|
if (receive->rtcp_block)
|
|
_free_pad_block (receive->rtcp_block);
|
|
receive->rtcp_block = NULL;
|
|
|
|
if (receive->rtcp_src_probe_id)
|
|
gst_pad_remove_probe (receive->rtcp_src, receive->rtcp_src_probe_id);
|
|
receive->rtcp_src_probe_id = 0;
|
|
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static void
|
|
rtp_queue_overrun (GstElement * queue, TransportReceiveBin * receive)
|
|
{
|
|
GST_WARNING_OBJECT (receive, "Internal receive queue overrun. Dropping data");
|
|
}
|
|
|
|
static void
|
|
transport_receive_bin_constructed (GObject * object)
|
|
{
|
|
TransportReceiveBin *receive = TRANSPORT_RECEIVE_BIN (object);
|
|
GstWebRTCDTLSTransport *transport;
|
|
GstPad *ghost, *pad;
|
|
GstElement *capsfilter, *funnel, *queue;
|
|
GstCaps *caps;
|
|
|
|
g_return_if_fail (receive->stream);
|
|
|
|
/* link ice src, dtlsrtp together for rtp */
|
|
transport = receive->stream->transport;
|
|
gst_bin_add (GST_BIN (receive), GST_ELEMENT (transport->dtlssrtpdec));
|
|
|
|
capsfilter = gst_element_factory_make ("capsfilter", NULL);
|
|
caps = gst_caps_new_empty_simple ("application/x-rtp");
|
|
g_object_set (capsfilter, "caps", caps, NULL);
|
|
gst_caps_unref (caps);
|
|
|
|
queue = gst_element_factory_make ("queue", NULL);
|
|
/* FIXME: make this configurable? */
|
|
g_object_set (queue, "leaky", 2, "max-size-time", (guint64) 0,
|
|
"max-size-buffers", 0, "max-size-bytes", 5 * 1024 * 1024, NULL);
|
|
g_signal_connect (queue, "overrun", G_CALLBACK (rtp_queue_overrun), receive);
|
|
|
|
gst_bin_add (GST_BIN (receive), GST_ELEMENT (queue));
|
|
gst_bin_add (GST_BIN (receive), GST_ELEMENT (capsfilter));
|
|
if (!gst_element_link_pads (capsfilter, "src", queue, "sink"))
|
|
g_warn_if_reached ();
|
|
|
|
if (!gst_element_link_pads (queue, "src", transport->dtlssrtpdec, "sink"))
|
|
g_warn_if_reached ();
|
|
|
|
gst_bin_add (GST_BIN (receive), GST_ELEMENT (transport->transport->src));
|
|
if (!gst_element_link_pads (GST_ELEMENT (transport->transport->src), "src",
|
|
GST_ELEMENT (capsfilter), "sink"))
|
|
g_warn_if_reached ();
|
|
|
|
/* link ice src, dtlsrtp together for rtcp */
|
|
transport = receive->stream->rtcp_transport;
|
|
gst_bin_add (GST_BIN (receive), GST_ELEMENT (transport->dtlssrtpdec));
|
|
|
|
capsfilter = gst_element_factory_make ("capsfilter", NULL);
|
|
caps = gst_caps_new_empty_simple ("application/x-rtcp");
|
|
g_object_set (capsfilter, "caps", caps, NULL);
|
|
gst_caps_unref (caps);
|
|
|
|
queue = gst_element_factory_make ("queue", NULL);
|
|
/* FIXME: make this configurable? */
|
|
g_object_set (queue, "leaky", 2, "max-size-time", (guint64) 0,
|
|
"max-size-buffers", 0, "max-size-bytes", 5 * 1024 * 1024, NULL);
|
|
g_signal_connect (queue, "overrun", G_CALLBACK (rtp_queue_overrun), receive);
|
|
|
|
gst_bin_add (GST_BIN (receive), queue);
|
|
gst_bin_add (GST_BIN (receive), GST_ELEMENT (capsfilter));
|
|
if (!gst_element_link_pads (capsfilter, "src", queue, "sink"))
|
|
g_warn_if_reached ();
|
|
|
|
if (!gst_element_link_pads (queue, "src", transport->dtlssrtpdec, "sink"))
|
|
g_warn_if_reached ();
|
|
|
|
gst_bin_add (GST_BIN (receive), GST_ELEMENT (transport->transport->src));
|
|
if (!gst_element_link_pads (GST_ELEMENT (transport->transport->src), "src",
|
|
GST_ELEMENT (capsfilter), "sink"))
|
|
g_warn_if_reached ();
|
|
|
|
/* create funnel for rtp_src */
|
|
funnel = gst_element_factory_make ("funnel", NULL);
|
|
gst_bin_add (GST_BIN (receive), funnel);
|
|
if (!gst_element_link_pads (receive->stream->transport->dtlssrtpdec,
|
|
"rtp_src", funnel, "sink_0"))
|
|
g_warn_if_reached ();
|
|
if (!gst_element_link_pads (receive->stream->rtcp_transport->dtlssrtpdec,
|
|
"rtp_src", funnel, "sink_1"))
|
|
g_warn_if_reached ();
|
|
|
|
pad = gst_element_get_static_pad (funnel, "src");
|
|
receive->rtp_src = gst_ghost_pad_new ("rtp_src", pad);
|
|
|
|
gst_element_add_pad (GST_ELEMENT (receive), receive->rtp_src);
|
|
gst_object_unref (pad);
|
|
|
|
/* create funnel for rtcp_src */
|
|
funnel = gst_element_factory_make ("funnel", NULL);
|
|
gst_bin_add (GST_BIN (receive), funnel);
|
|
if (!gst_element_link_pads (receive->stream->transport->dtlssrtpdec,
|
|
"rtcp_src", funnel, "sink_0"))
|
|
g_warn_if_reached ();
|
|
if (!gst_element_link_pads (receive->stream->rtcp_transport->dtlssrtpdec,
|
|
"rtcp_src", funnel, "sink_1"))
|
|
g_warn_if_reached ();
|
|
|
|
pad = gst_element_get_static_pad (funnel, "src");
|
|
receive->rtcp_src = gst_ghost_pad_new ("rtcp_src", pad);
|
|
gst_element_add_pad (GST_ELEMENT (receive), receive->rtcp_src);
|
|
gst_object_unref (pad);
|
|
|
|
/* create funnel for data_src */
|
|
funnel = gst_element_factory_make ("funnel", NULL);
|
|
gst_bin_add (GST_BIN (receive), funnel);
|
|
if (!gst_element_link_pads (receive->stream->transport->dtlssrtpdec,
|
|
"data_src", funnel, "sink_0"))
|
|
g_warn_if_reached ();
|
|
if (!gst_element_link_pads (receive->stream->rtcp_transport->dtlssrtpdec,
|
|
"data_src", funnel, "sink_1"))
|
|
g_warn_if_reached ();
|
|
|
|
pad = gst_element_get_static_pad (funnel, "src");
|
|
ghost = gst_ghost_pad_new ("data_src", pad);
|
|
gst_element_add_pad (GST_ELEMENT (receive), ghost);
|
|
gst_object_unref (pad);
|
|
|
|
G_OBJECT_CLASS (parent_class)->constructed (object);
|
|
}
|
|
|
|
static void
|
|
transport_receive_bin_class_init (TransportReceiveBinClass * klass)
|
|
{
|
|
GObjectClass *gobject_class = (GObjectClass *) klass;
|
|
GstElementClass *element_class = (GstElementClass *) klass;
|
|
|
|
element_class->change_state = transport_receive_bin_change_state;
|
|
|
|
gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
|
|
gst_element_class_add_static_pad_template (element_class,
|
|
&rtcp_sink_template);
|
|
gst_element_class_add_static_pad_template (element_class,
|
|
&data_sink_template);
|
|
|
|
gst_element_class_set_metadata (element_class, "WebRTC Transport Receive Bin",
|
|
"Filter/Network/WebRTC", "A bin for webrtc connections",
|
|
"Matthew Waters <matthew@centricular.com>");
|
|
|
|
gobject_class->constructed = transport_receive_bin_constructed;
|
|
gobject_class->get_property = transport_receive_bin_get_property;
|
|
gobject_class->set_property = transport_receive_bin_set_property;
|
|
gobject_class->finalize = transport_receive_bin_finalize;
|
|
|
|
g_object_class_install_property (gobject_class,
|
|
PROP_STREAM,
|
|
g_param_spec_object ("stream", "Stream",
|
|
"The TransportStream for this receiving bin",
|
|
transport_stream_get_type (),
|
|
G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
|
|
}
|
|
|
|
static void
|
|
transport_receive_bin_init (TransportReceiveBin * receive)
|
|
{
|
|
receive->receive_state = RECEIVE_STATE_BLOCK;
|
|
g_mutex_init (&receive->pad_block_lock);
|
|
}
|