From 5ca39884207042ef542835847486101fc2249612 Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Thu, 10 Nov 2022 14:31:43 +1100 Subject: [PATCH] webrtc/datachannel: handle error messages from appsrc/sink Fixes a possible race where closing a data channel may produce e.g. not-linked errors. Part-of: --- .../gst-plugins-bad/ext/webrtc/gstwebrtcbin.c | 18 +- .../ext/webrtc/webrtcdatachannel.c | 215 ++++++++++++------ .../ext/webrtc/webrtcdatachannel.h | 4 + 3 files changed, 164 insertions(+), 73 deletions(-) diff --git a/subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c b/subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c index dd16885238..5d59ee6959 100644 --- a/subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c +++ b/subprojects/gst-plugins-bad/ext/webrtc/gstwebrtcbin.c @@ -2407,11 +2407,11 @@ _on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad, g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL], 0, channel, FALSE); - gst_bin_add (GST_BIN (webrtc), channel->appsrc); - gst_bin_add (GST_BIN (webrtc), channel->appsink); + gst_bin_add (GST_BIN (webrtc), channel->src_bin); + gst_bin_add (GST_BIN (webrtc), channel->sink_bin); - gst_element_sync_state_with_parent (channel->appsrc); - gst_element_sync_state_with_parent (channel->appsink); + gst_element_sync_state_with_parent (channel->src_bin); + gst_element_sync_state_with_parent (channel->sink_bin); webrtc_data_channel_link_to_sctp (channel, webrtc->priv->sctp_transport); @@ -2422,7 +2422,7 @@ _on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad, g_signal_connect (channel, "notify::ready-state", G_CALLBACK (_on_data_channel_ready_state), webrtc); - sink_pad = gst_element_get_static_pad (channel->appsink, "sink"); + sink_pad = gst_element_get_static_pad (channel->sink_bin, "sink"); if (gst_pad_link (pad, sink_pad) != GST_PAD_LINK_OK) GST_WARNING_OBJECT (channel, "Failed to link sctp pad %s with channel %" GST_PTR_FORMAT, GST_PAD_NAME (pad), channel); @@ -6968,11 +6968,11 @@ gst_webrtc_bin_create_data_channel (GstWebRTCBin * webrtc, const gchar * label, g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL], 0, ret, TRUE); - gst_bin_add (GST_BIN (webrtc), ret->appsrc); - gst_bin_add (GST_BIN (webrtc), ret->appsink); + gst_bin_add (GST_BIN (webrtc), ret->src_bin); + gst_bin_add (GST_BIN (webrtc), ret->sink_bin); - gst_element_sync_state_with_parent (ret->appsrc); - gst_element_sync_state_with_parent (ret->appsink); + gst_element_sync_state_with_parent (ret->src_bin); + gst_element_sync_state_with_parent (ret->sink_bin); ret = gst_object_ref (ret); ret->webrtcbin = webrtc; diff --git a/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c b/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c index bb2b023618..0260c61721 100644 --- a/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c +++ b/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.c @@ -44,6 +44,141 @@ #define GST_CAT_DEFAULT webrtc_data_channel_debug GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); +static void _close_procedure (WebRTCDataChannel * channel, gpointer user_data); + +typedef void (*ChannelTask) (GstWebRTCDataChannel * channel, + gpointer user_data); + +struct task +{ + GstWebRTCDataChannel *channel; + ChannelTask func; + gpointer user_data; + GDestroyNotify notify; +}; + +static GstStructure * +_execute_task (GstWebRTCBin * webrtc, struct task *task) +{ + if (task->func) + task->func (task->channel, task->user_data); + + return NULL; +} + +static void +_free_task (struct task *task) +{ + gst_object_unref (task->channel); + + if (task->notify) + task->notify (task->user_data); + g_free (task); +} + +static void +_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func, + gpointer user_data, GDestroyNotify notify) +{ + struct task *task = g_new0 (struct task, 1); + + task->channel = gst_object_ref (channel); + task->func = func; + task->user_data = user_data; + task->notify = notify; + + gst_webrtc_bin_enqueue_task (channel->webrtcbin, + (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task, + NULL); +} + +static void +_channel_store_error (WebRTCDataChannel * channel, GError * error) +{ + GST_WEBRTC_DATA_CHANNEL_LOCK (channel); + if (error) { + GST_WARNING_OBJECT (channel, "Error: %s", + error ? error->message : "Unknown"); + if (!channel->stored_error) + channel->stored_error = error; + else + g_clear_error (&error); + } + GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel); +} + +struct _WebRTCErrorIgnoreBin +{ + GstBin bin; + + WebRTCDataChannel *data_channel; +}; + +G_DEFINE_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, GST_TYPE_BIN); + +static void +webrtc_error_ignore_bin_handle_message (GstBin * bin, GstMessage * message) +{ + WebRTCErrorIgnoreBin *self = WEBRTC_ERROR_IGNORE_BIN (bin); + + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ERROR:{ + GError *error = NULL; + gst_message_parse_error (message, &error, NULL); + GST_DEBUG_OBJECT (bin, "handling error message from internal element"); + _channel_store_error (self->data_channel, error); + _channel_enqueue_task (self->data_channel, (ChannelTask) _close_procedure, + NULL, NULL); + break; + } + default: + GST_BIN_CLASS (webrtc_error_ignore_bin_parent_class)->handle_message (bin, + message); + break; + } +} + +static void +webrtc_error_ignore_bin_class_init (WebRTCErrorIgnoreBinClass * klass) +{ + GstBinClass *bin_class = (GstBinClass *) klass; + + bin_class->handle_message = webrtc_error_ignore_bin_handle_message; +} + +static void +webrtc_error_ignore_bin_init (WebRTCErrorIgnoreBin * bin) +{ +} + +static GstElement * +webrtc_error_ignore_bin_new (WebRTCDataChannel * data_channel, + GstElement * other) +{ + WebRTCErrorIgnoreBin *self; + GstPad *pad; + + self = g_object_new (webrtc_error_ignore_bin_get_type (), NULL); + self->data_channel = data_channel; + + gst_bin_add (GST_BIN (self), other); + + pad = gst_element_get_static_pad (other, "src"); + if (pad) { + GstPad *ghost_pad = gst_ghost_pad_new ("src", pad); + gst_element_add_pad (GST_ELEMENT (self), ghost_pad); + gst_clear_object (&pad); + } + pad = gst_element_get_static_pad (other, "sink"); + if (pad) { + GstPad *ghost_pad = gst_ghost_pad_new ("sink", pad); + gst_element_add_pad (GST_ELEMENT (self), ghost_pad); + gst_clear_object (&pad); + } + + return (GstElement *) self; +} + #define webrtc_data_channel_parent_class parent_class G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel, GST_TYPE_WEBRTC_DATA_CHANNEL, @@ -213,67 +348,6 @@ construct_ack_packet (WebRTCDataChannel * channel) return buf; } -typedef void (*ChannelTask) (GstWebRTCDataChannel * channel, - gpointer user_data); - -struct task -{ - GstWebRTCDataChannel *channel; - ChannelTask func; - gpointer user_data; - GDestroyNotify notify; -}; - -static GstStructure * -_execute_task (GstWebRTCBin * webrtc, struct task *task) -{ - if (task->func) - task->func (task->channel, task->user_data); - - return NULL; -} - -static void -_free_task (struct task *task) -{ - gst_object_unref (task->channel); - - if (task->notify) - task->notify (task->user_data); - g_free (task); -} - -static void -_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func, - gpointer user_data, GDestroyNotify notify) -{ - struct task *task = g_new0 (struct task, 1); - - task->channel = gst_object_ref (channel); - task->func = func; - task->user_data = user_data; - task->notify = notify; - - gst_webrtc_bin_enqueue_task (channel->webrtcbin, - (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task, - NULL); -} - -static void -_channel_store_error (WebRTCDataChannel * channel, GError * error) -{ - GST_WEBRTC_DATA_CHANNEL_LOCK (channel); - if (error) { - GST_WARNING_OBJECT (channel, "Error: %s", - error ? error->message : "Unknown"); - if (!channel->stored_error) - channel->stored_error = error; - else - g_clear_error (&error); - } - GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel); -} - static void _emit_on_open (WebRTCDataChannel * channel, gpointer user_data) { @@ -290,6 +364,10 @@ _transport_closed (WebRTCDataChannel * channel) error = channel->stored_error; channel->stored_error = NULL; + GST_TRACE_OBJECT (channel, "transport closed, peer closed %u error %p " + "buffered %" G_GUINT64_FORMAT, channel->peer_closed, error, + channel->parent.buffered_amount); + both_sides_closed = channel->peer_closed && channel->parent.buffered_amount <= 0; if (both_sides_closed || error) { @@ -314,7 +392,7 @@ _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data) GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"", channel->parent.id, channel->parent.label); - pad = gst_element_get_static_pad (channel->appsrc, "src"); + pad = gst_element_get_static_pad (channel->src_bin, "src"); peer = gst_pad_get_peer (pad); gst_object_unref (pad); @@ -322,6 +400,7 @@ _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data) GstElement *sctpenc = gst_pad_get_parent_element (peer); if (sctpenc) { + GST_TRACE_OBJECT (channel, "removing sctpenc pad %" GST_PTR_FORMAT, peer); gst_element_release_request_pad (sctpenc, peer); gst_object_unref (sctpenc); } @@ -484,6 +563,8 @@ _parse_control_packet (WebRTCDataChannel * channel, guint8 * data, if (ret != GST_FLOW_OK) { g_set_error (error, GST_WEBRTC_ERROR, GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet"); + GST_WARNING_OBJECT (channel, "push returned %i, %s", ret, + gst_flow_get_name (ret)); return ret; } @@ -800,6 +881,8 @@ webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel, } else { g_set_error (error, GST_WEBRTC_ERROR, GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data"); + GST_WARNING_OBJECT (channel, "push returned %i, %s", ret, + gst_flow_get_name (ret)); GST_WEBRTC_DATA_CHANNEL_LOCK (channel); channel->parent.buffered_amount -= size; @@ -1001,6 +1084,8 @@ gst_webrtc_data_channel_constructed (GObject * object) channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH, (GstPadProbeCallback) on_appsrc_data, channel, NULL); + channel->src_bin = webrtc_error_ignore_bin_new (channel, channel->appsrc); + channel->appsink = gst_element_factory_make ("appsink", NULL); gst_object_ref_sink (channel->appsink); g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps, @@ -1008,6 +1093,8 @@ gst_webrtc_data_channel_constructed (GObject * object) gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks, channel, NULL); + channel->sink_bin = webrtc_error_ignore_bin_new (channel, channel->appsink); + gst_object_unref (pad); gst_caps_unref (caps); } @@ -1078,7 +1165,7 @@ _data_channel_set_sctp_transport (WebRTCDataChannel * channel, GST_WEBRTC_DATA_CHANNEL_LOCK (channel); if (channel->sctp_transport) g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel); - GST_TRACE ("%p set sctp %p", channel, sctp); + GST_TRACE_OBJECT (channel, "set sctp %p", sctp); gst_object_replace ((GstObject **) & channel->sctp_transport, GST_OBJECT (sctp)); @@ -1106,7 +1193,7 @@ webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel, _data_channel_set_sctp_transport (channel, sctp_transport); pad_name = g_strdup_printf ("sink_%u", id); - if (!gst_element_link_pads (channel->appsrc, "src", + if (!gst_element_link_pads (channel->src_bin, "src", channel->sctp_transport->sctpenc, pad_name)) g_warn_if_reached (); g_free (pad_name); diff --git a/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h b/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h index a0b38a7ad2..dd65a66ae3 100644 --- a/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h +++ b/subprojects/gst-plugins-bad/ext/webrtc/webrtcdatachannel.h @@ -46,7 +46,9 @@ struct _WebRTCDataChannel GstWebRTCDataChannel parent; WebRTCSCTPTransport *sctp_transport; + GstElement *src_bin; GstElement *appsrc; + GstElement *sink_bin; GstElement *appsink; GstWebRTCBin *webrtcbin; @@ -70,6 +72,8 @@ G_GNUC_INTERNAL void webrtc_data_channel_link_to_sctp (WebRTCDataChannel *channel, WebRTCSCTPTransport *sctp_transport); +G_DECLARE_FINAL_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, WEBRTC, ERROR_IGNORE_BIN, GstBin); + G_END_DECLS #endif /* __WEBRTC_DATA_CHANNEL_H__ */