diff --git a/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c b/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c index eb5590451c..3d9406809f 100644 --- a/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c +++ b/subprojects/gst-plugins-bad/ext/sctp/gstsctpenc.c @@ -103,6 +103,7 @@ struct _GstSctpEncPad GMutex lock; GCond cond; gboolean flushing; + gboolean clear_to_send; }; G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD); @@ -132,6 +133,7 @@ gst_sctp_enc_pad_init (GstSctpEncPad * self) g_mutex_init (&self->lock); g_cond_init (&self->cond); self->flushing = FALSE; + self->clear_to_send = FALSE; } static void gst_sctp_enc_finalize (GObject * object); @@ -563,6 +565,7 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstSctpEnc *self = GST_SCTP_ENC (parent); GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad); + GstSctpEncPad *sctpenc_pad_next = NULL; GstMapInfo map; guint32 ppid; gboolean ordered; @@ -574,6 +577,7 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GstFlowReturn flow_ret = GST_FLOW_ERROR; const guint8 *data; guint32 length; + gboolean clear_to_send; GST_OBJECT_LOCK (self); if (self->src_ret != GST_FLOW_OK) { @@ -629,7 +633,21 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) data = map.data; length = map.size; + GST_OBJECT_LOCK (self); + clear_to_send = g_queue_is_empty (&self->pending_pads); + g_queue_push_tail (&self->pending_pads, sctpenc_pad); + GST_OBJECT_UNLOCK (self); + g_mutex_lock (&sctpenc_pad->lock); + + if (clear_to_send) { + sctpenc_pad->clear_to_send = TRUE; + } + + while (!sctpenc_pad->flushing && !sctpenc_pad->clear_to_send) { + g_cond_wait (&sctpenc_pad->cond, &sctpenc_pad->lock); + } + while (!sctpenc_pad->flushing) { guint32 bytes_sent; @@ -658,15 +676,8 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) length -= bytes_sent; /* The buffer was probably full. Retry in a while */ - GST_OBJECT_LOCK (self); - g_queue_push_tail (&self->pending_pads, sctpenc_pad); - GST_OBJECT_UNLOCK (self); - g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time); - GST_OBJECT_LOCK (self); - g_queue_remove (&self->pending_pads, sctpenc_pad); - GST_OBJECT_UNLOCK (self); } else if (bytes_sent == length) { GST_DEBUG_OBJECT (pad, "Successfully sent buffer"); sctpenc_pad->bytes_sent += bytes_sent; @@ -676,8 +687,21 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK; out: + sctpenc_pad->clear_to_send = FALSE; g_mutex_unlock (&sctpenc_pad->lock); + GST_OBJECT_LOCK (self); + g_queue_remove (&self->pending_pads, sctpenc_pad); + sctpenc_pad_next = g_queue_peek_head (&self->pending_pads); + GST_OBJECT_UNLOCK (self); + + if (sctpenc_pad_next) { + g_mutex_lock (&sctpenc_pad_next->lock); + sctpenc_pad_next->clear_to_send = TRUE; + g_cond_signal (&sctpenc_pad_next->cond); + g_mutex_unlock (&sctpenc_pad_next->lock); + } + gst_buffer_unmap (buffer, &map); error: gst_buffer_unref (buffer); @@ -890,7 +914,6 @@ on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf, GstSctpEnc *self = user_data; GstBuffer *gstbuf; GstDataQueueItem *item; - GList *pending_pads, *l; GstSctpEncPad *sctpenc_pad; GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT, @@ -909,21 +932,22 @@ on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf, GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing"); } - /* Wake up pads in the order they waited, oldest pad first */ + /* Wake up the oldest pad which is the one that needs to finish first */ GST_OBJECT_LOCK (self); - pending_pads = NULL; - while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) { - pending_pads = g_list_prepend (pending_pads, sctpenc_pad); - } - GST_OBJECT_UNLOCK (self); + sctpenc_pad = g_queue_peek_head (&self->pending_pads); + if (sctpenc_pad) { + gst_object_ref (sctpenc_pad); + + GST_OBJECT_UNLOCK (self); - for (l = pending_pads; l; l = l->next) { - sctpenc_pad = l->data; g_mutex_lock (&sctpenc_pad->lock); g_cond_signal (&sctpenc_pad->cond); g_mutex_unlock (&sctpenc_pad->lock); + + gst_object_unref (sctpenc_pad); + } else { + GST_OBJECT_UNLOCK (self); } - g_list_free (pending_pads); } static void