From 44eea7bd8a8f9a183955d769a9eb3312031b581c Mon Sep 17 00:00:00 2001 From: Johan Sternerup Date: Thu, 10 Dec 2020 16:25:26 +0100 Subject: [PATCH] sctpenc: Prohibit sending of interleaved message parts Apparently we cannot start sending messages from another datachannel before the previous message was completely sent. usrsctplib will complain about being locked on another stream id and set errno=EINVAL. Part-of: --- .../gst-plugins-bad/ext/sctp/gstsctpenc.c | 58 +++++++++++++------ 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c b/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c index eb5590451c..3d9406809f 100644 --- a/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c +++ b/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c @@ -103,6 +103,7 @@ struct _GstSctpEncPad GMutex lock; GCond cond; gboolean flushing; + gboolean clear_to_send; }; G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD); @@ -132,6 +133,7 @@ gst_sctp_enc_pad_init (GstSctpEncPad * self) g_mutex_init (&self->lock); g_cond_init (&self->cond); self->flushing = FALSE; + self->clear_to_send = FALSE; } static void gst_sctp_enc_finalize (GObject * object); @@ -563,6 +565,7 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstSctpEnc *self = GST_SCTP_ENC (parent); GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad); + GstSctpEncPad *sctpenc_pad_next = NULL; GstMapInfo map; guint32 ppid; gboolean ordered; @@ -574,6 +577,7 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GstFlowReturn flow_ret = GST_FLOW_ERROR; const guint8 *data; guint32 length; + gboolean clear_to_send; GST_OBJECT_LOCK (self); if (self->src_ret != GST_FLOW_OK) { @@ -629,7 +633,21 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) data = map.data; length = map.size; + GST_OBJECT_LOCK (self); + clear_to_send = g_queue_is_empty (&self->pending_pads); + g_queue_push_tail (&self->pending_pads, sctpenc_pad); + GST_OBJECT_UNLOCK (self); + g_mutex_lock (&sctpenc_pad->lock); + + if (clear_to_send) { + sctpenc_pad->clear_to_send = TRUE; + } + + while (!sctpenc_pad->flushing && !sctpenc_pad->clear_to_send) { + g_cond_wait (&sctpenc_pad->cond, &sctpenc_pad->lock); + } + while (!sctpenc_pad->flushing) { guint32 bytes_sent; @@ -658,15 +676,8 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) length -= bytes_sent; /* The buffer was probably full. Retry in a while */ - GST_OBJECT_LOCK (self); - g_queue_push_tail (&self->pending_pads, sctpenc_pad); - GST_OBJECT_UNLOCK (self); - g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time); - GST_OBJECT_LOCK (self); - g_queue_remove (&self->pending_pads, sctpenc_pad); - GST_OBJECT_UNLOCK (self); } else if (bytes_sent == length) { GST_DEBUG_OBJECT (pad, "Successfully sent buffer"); sctpenc_pad->bytes_sent += bytes_sent; @@ -676,8 +687,21 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK; out: + sctpenc_pad->clear_to_send = FALSE; g_mutex_unlock (&sctpenc_pad->lock); + GST_OBJECT_LOCK (self); + g_queue_remove (&self->pending_pads, sctpenc_pad); + sctpenc_pad_next = g_queue_peek_head (&self->pending_pads); + GST_OBJECT_UNLOCK (self); + + if (sctpenc_pad_next) { + g_mutex_lock (&sctpenc_pad_next->lock); + sctpenc_pad_next->clear_to_send = TRUE; + g_cond_signal (&sctpenc_pad_next->cond); + g_mutex_unlock (&sctpenc_pad_next->lock); + } + gst_buffer_unmap (buffer, &map); error: gst_buffer_unref (buffer); @@ -890,7 +914,6 @@ on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf, GstSctpEnc *self = user_data; GstBuffer *gstbuf; GstDataQueueItem *item; - GList *pending_pads, *l; GstSctpEncPad *sctpenc_pad; GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT, @@ -909,21 +932,22 @@ on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf, GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing"); } - /* Wake up pads in the order they waited, oldest pad first */ + /* Wake up the oldest pad which is the one that needs to finish first */ GST_OBJECT_LOCK (self); - pending_pads = NULL; - while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) { - pending_pads = g_list_prepend (pending_pads, sctpenc_pad); - } - GST_OBJECT_UNLOCK (self); + sctpenc_pad = g_queue_peek_head (&self->pending_pads); + if (sctpenc_pad) { + gst_object_ref (sctpenc_pad); + + GST_OBJECT_UNLOCK (self); - for (l = pending_pads; l; l = l->next) { - sctpenc_pad = l->data; g_mutex_lock (&sctpenc_pad->lock); g_cond_signal (&sctpenc_pad->cond); g_mutex_unlock (&sctpenc_pad->lock); + + gst_object_unref (sctpenc_pad); + } else { + GST_OBJECT_UNLOCK (self); } - g_list_free (pending_pads); } static void