gstreamer/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c
Johan Sternerup a1f0727186 sctpenc: Fix potential shutdown deadlock
When transitioning from state PAUSED to READY, the sctpenc element
could previously be stuck in an endless loop trying to resend data
in case the underlying sctp stream was in the process of
resetting. usrsctp_sendv() would repeatedly return EAGAIN with the
result that 0 bytes were sent and then sctpenc would retry forever.

To bring sctpenc out of the resend loop we just need to inform the
sink pad that it is flushing, which is already done for the associated
data queue, but we also need to set the bools associated with the
sinkpads that are used as the loop criterion.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4601>
2023-05-15 06:57:07 +00:00

1064 lines
32 KiB
C

/*
* Copyright (c) 2015, Collabora Ltd.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice, this
* list of conditions and the following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
* OF SUCH DAMAGE.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstsctpenc.h"
#include <gst/sctp/sctpsendmeta.h>
#include <stdio.h>
GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
#define GST_CAT_DEFAULT gst_sctp_enc_debug_category
#define gst_sctp_enc_parent_class parent_class
G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
GST_ELEMENT_REGISTER_DEFINE (sctpenc, "sctpenc", GST_RANK_NONE,
GST_TYPE_SCTP_ENC);
static GstStaticPadTemplate sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate src_template =
GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
enum
{
SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,
SIGNAL_GET_STREAM_BYTES_SENT,
NUM_SIGNALS
};
static guint signals[NUM_SIGNALS];
enum
{
PROP_0,
PROP_GST_SCTP_ASSOCIATION_ID,
PROP_REMOTE_SCTP_PORT,
PROP_USE_SOCK_STREAM,
NUM_PROPERTIES
};
static GParamSpec *properties[NUM_PROPERTIES];
#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
#define DEFAULT_REMOTE_SCTP_PORT 0
#define DEFAULT_GST_SCTP_ORDERED TRUE
#define DEFAULT_SCTP_PPID 1
#define DEFAULT_USE_SOCK_STREAM FALSE
#define BUFFER_FULL_SLEEP_TIME 100000
GType gst_sctp_enc_pad_get_type (void);
#define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
#define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
#define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
#define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
#define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))
typedef struct _GstSctpEncPad GstSctpEncPad;
typedef GstPadClass GstSctpEncPadClass;
struct _GstSctpEncPad
{
GstPad parent;
guint16 stream_id;
gboolean ordered;
guint32 ppid;
GstSctpAssociationPartialReliability reliability;
guint32 reliability_param;
guint64 bytes_sent;
GMutex lock;
GCond cond;
gboolean flushing;
gboolean clear_to_send;
};
G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
static void
gst_sctp_enc_pad_finalize (GObject * object)
{
GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);
g_cond_clear (&self->cond);
g_mutex_clear (&self->lock);
G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
}
static void
gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
gobject_class->finalize = gst_sctp_enc_pad_finalize;
}
static void
gst_sctp_enc_pad_init (GstSctpEncPad * self)
{
g_mutex_init (&self->lock);
g_cond_init (&self->cond);
self->flushing = FALSE;
self->clear_to_send = FALSE;
}
static void gst_sctp_enc_finalize (GObject * object);
static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
GstStateChange transition);
static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
GstPadTemplate * template, const gchar * name, const GstCaps * caps);
static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
static void gst_sctp_enc_srcpad_loop (GstPad * pad);
static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
GstEvent * event);
static void on_sctp_association_state_changed (GstSctpAssociation *
sctp_association, GParamSpec * pspec, GstSctpEnc * self);
static gboolean configure_association (GstSctpEnc * self);
static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
const guint8 * buf, gsize length, gpointer user_data);
static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
static void sctpenc_cleanup (GstSctpEnc * self);
static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
GstSctpAssociationPartialReliability * reliability,
guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);
static void
gst_sctp_enc_class_init (GstSctpEncClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *element_class;
gobject_class = (GObjectClass *) klass;
element_class = (GstElementClass *) klass;
GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
"sctpenc", 0, "debug category for sctpenc element");
gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
gst_static_pad_template_get (&src_template));
gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
gst_static_pad_template_get (&sink_template));
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);
element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
element_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);
properties[PROP_GST_SCTP_ASSOCIATION_ID] =
g_param_spec_uint ("sctp-association-id",
"SCTP Association ID",
"Every encoder/decoder pair should have the same, unique, sctp-association-id. "
"This value must be set before any pads are requested.",
0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_REMOTE_SCTP_PORT] =
g_param_spec_uint ("remote-sctp-port",
"Remote SCTP port",
"Sctp remote sctp port for the sctp association. The local port is configured via the "
"GstSctpDec element.",
0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_USE_SOCK_STREAM] =
g_param_spec_boolean ("use-sock-stream",
"Use sock-stream",
"When set to TRUE, a sequenced, reliable, connection-based connection is used."
"When TRUE the partial reliability parameters of the channel are ignored.",
DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
g_signal_new ("sctp-association-established",
G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
NULL, G_TYPE_UINT64, 1, G_TYPE_UINT);
klass->on_get_stream_bytes_sent =
GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);
gst_element_class_set_static_metadata (element_class,
"SCTP Encoder",
"Encoder/Network/SCTP",
"Encodes packets with SCTP",
"George Kiagiadakis <george.kiagiadakis@collabora.com>");
}
static gboolean
data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
guint64 time, gpointer user_data)
{
/* TODO: When are we considered full? */
return FALSE;
}
static void
data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
{
}
static void
data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
{
}
static void
gst_sctp_enc_init (GstSctpEnc * self)
{
self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;
self->sctp_association = NULL;
self->outbound_sctp_packet_queue =
gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
data_queue_empty_cb, NULL);
self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
gst_pad_set_event_function (self->src_pad,
GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
g_queue_init (&self->pending_pads);
self->src_ret = GST_FLOW_FLUSHING;
}
static void
gst_sctp_enc_finalize (GObject * object)
{
GstSctpEnc *self = GST_SCTP_ENC (object);
g_queue_clear (&self->pending_pads);
gst_object_unref (self->outbound_sctp_packet_queue);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_sctp_enc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstSctpEnc *self = GST_SCTP_ENC (object);
switch (prop_id) {
case PROP_GST_SCTP_ASSOCIATION_ID:
self->sctp_association_id = g_value_get_uint (value);
break;
case PROP_REMOTE_SCTP_PORT:
self->remote_sctp_port = g_value_get_uint (value);
break;
case PROP_USE_SOCK_STREAM:
self->use_sock_stream = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
break;
}
}
static void
gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstSctpEnc *self = GST_SCTP_ENC (object);
switch (prop_id) {
case PROP_GST_SCTP_ASSOCIATION_ID:
g_value_set_uint (value, self->sctp_association_id);
break;
case PROP_REMOTE_SCTP_PORT:
g_value_set_uint (value, self->remote_sctp_port);
break;
case PROP_USE_SOCK_STREAM:
g_value_set_boolean (value, self->use_sock_stream);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
break;
}
}
static void
flush_sinkpad (const GValue * item, gpointer user_data)
{
GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
gboolean flush = GPOINTER_TO_INT (user_data);
if (flush) {
g_mutex_lock (&sctpenc_pad->lock);
sctpenc_pad->flushing = TRUE;
g_cond_signal (&sctpenc_pad->cond);
g_mutex_unlock (&sctpenc_pad->lock);
} else {
sctpenc_pad->flushing = FALSE;
}
}
static void
flush_sinkpads (GstSctpEnc * self, gboolean state)
{
GstIterator *it;
it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
while (gst_iterator_foreach (it, flush_sinkpad,
GINT_TO_POINTER (state)) == GST_ITERATOR_RESYNC)
gst_iterator_resync (it);
gst_iterator_free (it);
}
static GstStateChangeReturn
gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
{
GstSctpEnc *self = GST_SCTP_ENC (element);
GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
gboolean res = TRUE;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
self->need_segment = self->need_stream_start_caps = TRUE;
self->src_ret = GST_FLOW_OK;
gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
res = configure_association (self);
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
stop_srcpad_task (self->src_pad, self);
flush_sinkpads (self, TRUE);
self->src_ret = GST_FLOW_FLUSHING;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
default:
break;
}
if (res)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
gst_pad_start_task (self->src_pad,
(GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
sctpenc_cleanup (self);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
default:
break;
}
return ret;
}
static GstPad *
gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
const gchar * new_pad_name, const GstCaps * caps)
{
GstSctpEnc *self = GST_SCTP_ENC (element);
GstPad *new_pad = NULL;
GstSctpEncPad *sctpenc_pad;
guint32 stream_id;
gint state;
guint32 new_ppid;
gboolean is_new_ppid;
g_object_get (self->sctp_association, "state", &state, NULL);
if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
GST_ERROR_OBJECT
(self,
"The SCTP association must be established before a new stream can be created");
goto invalid_state;
}
if (!template)
goto invalid_parameter;
/* 65535 is not a valid stream id */
if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
|| stream_id > 65534) {
GST_ERROR_OBJECT
(self, "Invalid sink pad name %s", GST_STR_NULL (new_pad_name));
goto invalid_parameter;
}
new_pad = gst_element_get_static_pad (element, new_pad_name);
if (new_pad) {
gst_object_unref (new_pad);
new_pad = NULL;
GST_ERROR_OBJECT (self, "Pad %s already exists", new_pad_name);
goto invalid_parameter;
}
GST_DEBUG_OBJECT (self, "Creating new pad %s", new_pad_name);
new_pad =
g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
template->direction, "template", template, NULL);
gst_pad_set_chain_function (new_pad,
GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
gst_pad_set_event_function (new_pad,
GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));
sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
sctpenc_pad->stream_id = stream_id;
sctpenc_pad->ppid = DEFAULT_SCTP_PPID;
if (caps) {
GST_DEBUG_OBJECT (self, "Pad %s requested with caps %" GST_PTR_FORMAT,
new_pad_name, caps);
get_config_from_caps (caps, &sctpenc_pad->ordered,
&sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
&is_new_ppid);
if (is_new_ppid)
sctpenc_pad->ppid = new_ppid;
}
sctpenc_pad->flushing = FALSE;
if (!gst_pad_set_active (new_pad, TRUE))
goto error_cleanup;
if (!gst_element_add_pad (element, new_pad))
goto error_add_pad;
invalid_state:
invalid_parameter:
return new_pad;
error_add_pad:
gst_pad_set_active (new_pad, FALSE);
error_cleanup:
gst_object_unref (new_pad);
return NULL;
}
static void
gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
{
GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
GstSctpEnc *self;
guint stream_id = 0;
self = GST_SCTP_ENC (element);
g_mutex_lock (&sctpenc_pad->lock);
sctpenc_pad->flushing = TRUE;
g_cond_signal (&sctpenc_pad->cond);
g_mutex_unlock (&sctpenc_pad->lock);
stream_id = sctpenc_pad->stream_id;
gst_pad_set_active (pad, FALSE);
if (self->sctp_association)
gst_sctp_association_reset_stream (self->sctp_association, stream_id);
GST_PAD_STREAM_LOCK (pad);
if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (element)))
gst_element_remove_pad (element, pad);
GST_PAD_STREAM_UNLOCK (pad);
}
static void
gst_sctp_enc_srcpad_loop (GstPad * pad)
{
GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
GstFlowReturn flow_ret;
GstDataQueueItem *item;
if (self->need_stream_start_caps) {
gchar s_id[32];
GstCaps *caps;
g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));
caps = gst_caps_new_empty_simple ("application/x-sctp");
gst_pad_set_caps (self->src_pad, caps);
gst_caps_unref (caps);
self->need_stream_start_caps = FALSE;
}
if (self->need_segment) {
GstSegment segment;
gst_segment_init (&segment, GST_FORMAT_BYTES);
gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));
self->need_segment = FALSE;
}
if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
GstBuffer *buffer = GST_BUFFER (item->object);
GST_DEBUG_OBJECT (self, "Forwarding buffer %" GST_PTR_FORMAT, buffer);
flow_ret = gst_pad_push (self->src_pad, buffer);
item->object = NULL;
GST_OBJECT_LOCK (self);
self->src_ret = flow_ret;
GST_OBJECT_UNLOCK (self);
if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
|| flow_ret == GST_FLOW_NOT_LINKED)) {
GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
gst_flow_get_name (flow_ret));
} else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
gst_flow_get_name (flow_ret));
}
if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
gst_data_queue_flush (self->outbound_sctp_packet_queue);
gst_pad_pause_task (pad);
}
item->destroy (item);
} else {
GST_OBJECT_LOCK (self);
self->src_ret = GST_FLOW_FLUSHING;
GST_OBJECT_UNLOCK (self);
GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
gst_pad_pause_task (pad);
}
}
static GstFlowReturn
gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstSctpEnc *self = GST_SCTP_ENC (parent);
GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
GstSctpEncPad *sctpenc_pad_next = NULL;
GstMapInfo map;
guint32 ppid;
gboolean ordered;
GstSctpAssociationPartialReliability pr;
guint32 pr_param;
gpointer state = NULL;
GstMeta *meta;
const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
GstFlowReturn flow_ret = GST_FLOW_ERROR;
const guint8 *data;
guint32 length;
gboolean clear_to_send;
GST_OBJECT_LOCK (self);
if (self->src_ret != GST_FLOW_OK) {
GST_ERROR_OBJECT (pad, "Pushing on source pad failed before: %s",
gst_flow_get_name (self->src_ret));
flow_ret = self->src_ret;
GST_OBJECT_UNLOCK (self);
gst_buffer_unref (buffer);
return flow_ret;
}
GST_OBJECT_UNLOCK (self);
ppid = sctpenc_pad->ppid;
ordered = sctpenc_pad->ordered;
pr = sctpenc_pad->reliability;
pr_param = sctpenc_pad->reliability_param;
while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
if (meta->info->api == meta_info->api) {
GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;
ppid = sctp_send_meta->ppid;
ordered = sctp_send_meta->ordered;
pr_param = sctp_send_meta->pr_param;
switch (sctp_send_meta->pr) {
case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
break;
case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
break;
case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
break;
case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
break;
}
break;
}
}
GST_DEBUG_OBJECT (pad,
"Sending buffer %" GST_PTR_FORMAT
" with ppid %u ordered %d pr %d pr_param %u", buffer, ppid, ordered, pr,
pr_param);
if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
GST_ERROR_OBJECT (pad, "Could not map GstBuffer");
goto error;
}
data = map.data;
length = map.size;
GST_OBJECT_LOCK (self);
clear_to_send = g_queue_is_empty (&self->pending_pads);
g_queue_push_tail (&self->pending_pads, sctpenc_pad);
GST_OBJECT_UNLOCK (self);
g_mutex_lock (&sctpenc_pad->lock);
if (clear_to_send) {
sctpenc_pad->clear_to_send = TRUE;
}
while (!sctpenc_pad->flushing && !sctpenc_pad->clear_to_send) {
g_cond_wait (&sctpenc_pad->cond, &sctpenc_pad->lock);
}
while (!sctpenc_pad->flushing) {
guint32 bytes_sent;
g_mutex_unlock (&sctpenc_pad->lock);
flow_ret =
gst_sctp_association_send_data (self->sctp_association, data,
length, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param,
&bytes_sent);
g_mutex_lock (&sctpenc_pad->lock);
if (flow_ret != GST_FLOW_OK) {
if (flow_ret != GST_FLOW_EOS) {
GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
("Failed to send data"));
}
goto out;
} else if (bytes_sent < length && !sctpenc_pad->flushing) {
gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
GST_TRACE_OBJECT (pad, "Sent only %u of %u remaining bytes, waiting",
bytes_sent, length);
sctpenc_pad->bytes_sent += bytes_sent;
data += bytes_sent;
length -= bytes_sent;
/* The buffer was probably full. Retry in a while */
g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
} else if (bytes_sent == length) {
GST_DEBUG_OBJECT (pad, "Successfully sent buffer");
sctpenc_pad->bytes_sent += bytes_sent;
break;
}
}
flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
out:
sctpenc_pad->clear_to_send = FALSE;
g_mutex_unlock (&sctpenc_pad->lock);
GST_OBJECT_LOCK (self);
g_queue_remove (&self->pending_pads, sctpenc_pad);
sctpenc_pad_next = g_queue_peek_head (&self->pending_pads);
GST_OBJECT_UNLOCK (self);
if (sctpenc_pad_next) {
g_mutex_lock (&sctpenc_pad_next->lock);
sctpenc_pad_next->clear_to_send = TRUE;
g_cond_signal (&sctpenc_pad_next->cond);
g_mutex_unlock (&sctpenc_pad_next->lock);
}
gst_buffer_unmap (buffer, &map);
error:
gst_buffer_unref (buffer);
return flow_ret;
}
static gboolean
gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstSctpEnc *self = GST_SCTP_ENC (parent);
GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
gboolean ret, is_new_ppid;
guint32 new_ppid;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:{
GstCaps *caps;
gst_event_parse_caps (event, &caps);
GST_DEBUG_OBJECT (pad, "Received new caps %" GST_PTR_FORMAT, caps);
get_config_from_caps (caps, &sctpenc_pad->ordered,
&sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
&is_new_ppid);
if (is_new_ppid)
sctpenc_pad->ppid = new_ppid;
gst_event_unref (event);
ret = TRUE;
break;
}
case GST_EVENT_STREAM_START:
case GST_EVENT_SEGMENT:
/* Drop these, we create our own */
ret = TRUE;
gst_event_unref (event);
break;
case GST_EVENT_EOS:
/* Drop this, we're never EOS until shut down */
ret = TRUE;
gst_event_unref (event);
break;
case GST_EVENT_FLUSH_START:
g_mutex_lock (&sctpenc_pad->lock);
sctpenc_pad->flushing = TRUE;
g_cond_signal (&sctpenc_pad->cond);
g_mutex_unlock (&sctpenc_pad->lock);
ret = gst_pad_event_default (pad, parent, event);
break;
case GST_EVENT_FLUSH_STOP:
sctpenc_pad->flushing = FALSE;
GST_OBJECT_LOCK (self);
self->src_ret = GST_FLOW_OK;
GST_OBJECT_UNLOCK (self);
ret = gst_pad_event_default (pad, parent, event);
break;
default:
ret = gst_pad_event_default (pad, parent, event);
break;
}
return ret;
}
static gboolean
gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstSctpEnc *self = GST_SCTP_ENC (parent);
gboolean ret;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:{
gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
gst_data_queue_flush (self->outbound_sctp_packet_queue);
flush_sinkpads (self, TRUE);
ret = gst_pad_event_default (pad, parent, event);
break;
}
case GST_EVENT_RECONFIGURE:
case GST_EVENT_FLUSH_STOP:{
flush_sinkpads (self, FALSE);
gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
self->need_segment = TRUE;
GST_OBJECT_LOCK (self);
self->src_ret = GST_FLOW_OK;
GST_OBJECT_UNLOCK (self);
gst_pad_start_task (self->src_pad,
(GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
ret = gst_pad_event_default (pad, parent, event);
break;
}
default:
ret = gst_pad_event_default (pad, parent, event);
break;
}
return ret;
}
static gboolean
configure_association (GstSctpEnc * self)
{
gint state;
self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
g_object_get (self->sctp_association, "state", &state, NULL);
if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
GST_WARNING_OBJECT (self,
"Could not configure SCTP association. Association already in use!");
g_object_unref (self->sctp_association);
self->sctp_association = NULL;
goto error;
}
self->signal_handler_state_changed =
g_signal_connect_object (self->sctp_association, "notify::state",
G_CALLBACK (on_sctp_association_state_changed), self, 0);
g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
"remote-port", G_BINDING_SYNC_CREATE);
g_object_bind_property (self, "use-sock-stream", self->sctp_association,
"use-sock-stream", G_BINDING_SYNC_CREATE);
gst_sctp_association_set_on_packet_out (self->sctp_association,
on_sctp_packet_out, gst_object_ref (self), gst_object_unref);
return TRUE;
error:
return FALSE;
}
static void
on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
GParamSpec * pspec, GstSctpEnc * self)
{
gint state;
g_object_get (sctp_association, "state", &state, NULL);
GST_DEBUG_OBJECT (self, "Association state changed to %d", state);
switch (state) {
case GST_SCTP_ASSOCIATION_STATE_NEW:
break;
case GST_SCTP_ASSOCIATION_STATE_READY:
gst_sctp_association_start (sctp_association);
break;
case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
break;
case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
TRUE);
break;
case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
FALSE);
break;
case GST_SCTP_ASSOCIATION_STATE_ERROR:
GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
("SCTP association went into error state"));
break;
}
}
static void
data_queue_item_free (GstDataQueueItem * item)
{
if (item->object)
gst_mini_object_unref (item->object);
g_free (item);
}
static void
on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
gsize length, gpointer user_data)
{
GstSctpEnc *self = user_data;
GstBuffer *gstbuf;
GstDataQueueItem *item;
GstSctpEncPad *sctpenc_pad;
GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT,
length);
gstbuf = gst_buffer_new_memdup (buf, length);
item = g_new0 (GstDataQueueItem, 1);
item->object = GST_MINI_OBJECT (gstbuf);
item->size = length;
item->visible = TRUE;
item->destroy = (GDestroyNotify) data_queue_item_free;
if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
item->destroy (item);
GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
}
/* Wake up the oldest pad which is the one that needs to finish first */
GST_OBJECT_LOCK (self);
sctpenc_pad = g_queue_peek_head (&self->pending_pads);
if (sctpenc_pad) {
gst_object_ref (sctpenc_pad);
GST_OBJECT_UNLOCK (self);
g_mutex_lock (&sctpenc_pad->lock);
g_cond_signal (&sctpenc_pad->cond);
g_mutex_unlock (&sctpenc_pad->lock);
gst_object_unref (sctpenc_pad);
} else {
GST_OBJECT_UNLOCK (self);
}
}
static void
stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
{
gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
gst_data_queue_flush (self->outbound_sctp_packet_queue);
gst_pad_stop_task (pad);
}
static void
remove_sinkpad (const GValue * item, gpointer user_data)
{
GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
GstSctpEnc *self = user_data;
gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
}
static void
sctpenc_cleanup (GstSctpEnc * self)
{
GstIterator *it;
gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL,
NULL);
g_signal_handler_disconnect (self->sctp_association,
self->signal_handler_state_changed);
gst_sctp_association_force_close (self->sctp_association);
g_object_unref (self->sctp_association);
self->sctp_association = NULL;
it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
gst_iterator_resync (it);
gst_iterator_free (it);
g_queue_clear (&self->pending_pads);
}
static void
get_config_from_caps (const GstCaps * caps, gboolean * ordered,
GstSctpAssociationPartialReliability * reliability,
guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
{
GstStructure *s;
guint i, n;
*ordered = TRUE;
*reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
*reliability_param = 0;
*ppid_available = FALSE;
n = gst_caps_get_size (caps);
for (i = 0; i < n; i++) {
s = gst_caps_get_structure (caps, i);
if (gst_structure_has_field (s, "ordered")) {
const GValue *v = gst_structure_get_value (s, "ordered");
*ordered = g_value_get_boolean (v);
}
if (gst_structure_has_field (s, "partially-reliability")) {
const GValue *v = gst_structure_get_value (s, "partially-reliability");
const gchar *reliability_string = g_value_get_string (v);
if (!g_strcmp0 (reliability_string, "none"))
*reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
else if (!g_strcmp0 (reliability_string, "ttl"))
*reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
else if (!g_strcmp0 (reliability_string, "buf"))
*reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
else if (!g_strcmp0 (reliability_string, "rtx"))
*reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
}
if (gst_structure_has_field (s, "reliability-parameter")) {
const GValue *v = gst_structure_get_value (s, "reliability-parameter");
*reliability_param = g_value_get_uint (v);
}
if (gst_structure_has_field (s, "ppid")) {
const GValue *v = gst_structure_get_value (s, "ppid");
*ppid = g_value_get_uint (v);
*ppid_available = TRUE;
}
}
}
static guint64
on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
{
gchar *pad_name;
GstPad *pad;
GstSctpEncPad *sctpenc_pad;
guint64 bytes_sent;
pad_name = g_strdup_printf ("sink_%u", stream_id);
pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
g_free (pad_name);
if (!pad) {
GST_DEBUG_OBJECT (self,
"Buffered amount requested on a stream that does not exist!");
return 0;
}
sctpenc_pad = GST_SCTP_ENC_PAD (pad);
g_mutex_lock (&sctpenc_pad->lock);
bytes_sent = sctpenc_pad->bytes_sent;
g_mutex_unlock (&sctpenc_pad->lock);
gst_object_unref (sctpenc_pad);
return bytes_sent;
}