sctpenc: Prohibit sending of interleaved message parts

Apparently we cannot start sending messages from another datachannel
until the previous message has been completely sent. Otherwise
usrsctplib will complain about being locked on another stream id and
set errno=EINVAL.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2454>
This commit is contained in:
Johan Sternerup 2020-12-10 16:25:26 +01:00 committed by GStreamer Marge Bot
parent 82f63b0d64
commit 44eea7bd8a

View file

@ -103,6 +103,7 @@ struct _GstSctpEncPad
GMutex lock; GMutex lock;
GCond cond; GCond cond;
gboolean flushing; gboolean flushing;
gboolean clear_to_send;
}; };
G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD); G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
@ -132,6 +133,7 @@ gst_sctp_enc_pad_init (GstSctpEncPad * self)
g_mutex_init (&self->lock); g_mutex_init (&self->lock);
g_cond_init (&self->cond); g_cond_init (&self->cond);
self->flushing = FALSE; self->flushing = FALSE;
self->clear_to_send = FALSE;
} }
static void gst_sctp_enc_finalize (GObject * object); static void gst_sctp_enc_finalize (GObject * object);
@ -563,6 +565,7 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{ {
GstSctpEnc *self = GST_SCTP_ENC (parent); GstSctpEnc *self = GST_SCTP_ENC (parent);
GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad); GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
GstSctpEncPad *sctpenc_pad_next = NULL;
GstMapInfo map; GstMapInfo map;
guint32 ppid; guint32 ppid;
gboolean ordered; gboolean ordered;
@ -574,6 +577,7 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GstFlowReturn flow_ret = GST_FLOW_ERROR; GstFlowReturn flow_ret = GST_FLOW_ERROR;
const guint8 *data; const guint8 *data;
guint32 length; guint32 length;
gboolean clear_to_send;
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
if (self->src_ret != GST_FLOW_OK) { if (self->src_ret != GST_FLOW_OK) {
@ -629,7 +633,21 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
data = map.data; data = map.data;
length = map.size; length = map.size;
GST_OBJECT_LOCK (self);
clear_to_send = g_queue_is_empty (&self->pending_pads);
g_queue_push_tail (&self->pending_pads, sctpenc_pad);
GST_OBJECT_UNLOCK (self);
g_mutex_lock (&sctpenc_pad->lock); g_mutex_lock (&sctpenc_pad->lock);
if (clear_to_send) {
sctpenc_pad->clear_to_send = TRUE;
}
while (!sctpenc_pad->flushing && !sctpenc_pad->clear_to_send) {
g_cond_wait (&sctpenc_pad->cond, &sctpenc_pad->lock);
}
while (!sctpenc_pad->flushing) { while (!sctpenc_pad->flushing) {
guint32 bytes_sent; guint32 bytes_sent;
@ -658,15 +676,8 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
length -= bytes_sent; length -= bytes_sent;
/* The buffer was probably full. Retry in a while */ /* The buffer was probably full. Retry in a while */
GST_OBJECT_LOCK (self);
g_queue_push_tail (&self->pending_pads, sctpenc_pad);
GST_OBJECT_UNLOCK (self);
g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time); g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
GST_OBJECT_LOCK (self);
g_queue_remove (&self->pending_pads, sctpenc_pad);
GST_OBJECT_UNLOCK (self);
} else if (bytes_sent == length) { } else if (bytes_sent == length) {
GST_DEBUG_OBJECT (pad, "Successfully sent buffer"); GST_DEBUG_OBJECT (pad, "Successfully sent buffer");
sctpenc_pad->bytes_sent += bytes_sent; sctpenc_pad->bytes_sent += bytes_sent;
@ -676,8 +687,21 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK; flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
out: out:
sctpenc_pad->clear_to_send = FALSE;
g_mutex_unlock (&sctpenc_pad->lock); g_mutex_unlock (&sctpenc_pad->lock);
GST_OBJECT_LOCK (self);
g_queue_remove (&self->pending_pads, sctpenc_pad);
sctpenc_pad_next = g_queue_peek_head (&self->pending_pads);
GST_OBJECT_UNLOCK (self);
if (sctpenc_pad_next) {
g_mutex_lock (&sctpenc_pad_next->lock);
sctpenc_pad_next->clear_to_send = TRUE;
g_cond_signal (&sctpenc_pad_next->cond);
g_mutex_unlock (&sctpenc_pad_next->lock);
}
gst_buffer_unmap (buffer, &map); gst_buffer_unmap (buffer, &map);
error: error:
gst_buffer_unref (buffer); gst_buffer_unref (buffer);
@ -890,7 +914,6 @@ on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
GstSctpEnc *self = user_data; GstSctpEnc *self = user_data;
GstBuffer *gstbuf; GstBuffer *gstbuf;
GstDataQueueItem *item; GstDataQueueItem *item;
GList *pending_pads, *l;
GstSctpEncPad *sctpenc_pad; GstSctpEncPad *sctpenc_pad;
GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT, GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT,
@ -909,21 +932,22 @@ on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing"); GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
} }
/* Wake up pads in the order they waited, oldest pad first */ /* Wake up the oldest pad which is the one that needs to finish first */
GST_OBJECT_LOCK (self); GST_OBJECT_LOCK (self);
pending_pads = NULL; sctpenc_pad = g_queue_peek_head (&self->pending_pads);
while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) { if (sctpenc_pad) {
pending_pads = g_list_prepend (pending_pads, sctpenc_pad); gst_object_ref (sctpenc_pad);
}
GST_OBJECT_UNLOCK (self); GST_OBJECT_UNLOCK (self);
for (l = pending_pads; l; l = l->next) {
sctpenc_pad = l->data;
g_mutex_lock (&sctpenc_pad->lock); g_mutex_lock (&sctpenc_pad->lock);
g_cond_signal (&sctpenc_pad->cond); g_cond_signal (&sctpenc_pad->cond);
g_mutex_unlock (&sctpenc_pad->lock); g_mutex_unlock (&sctpenc_pad->lock);
gst_object_unref (sctpenc_pad);
} else {
GST_OBJECT_UNLOCK (self);
} }
g_list_free (pending_pads);
} }
static void static void