webrtc/datachannel: handle error messages from appsrc/sink

Fixes a possible race where closing a data channel may produce e.g.
not-linked errors.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3381>
This commit is contained in:
Matthew Waters 2022-11-10 14:31:43 +11:00 committed by GStreamer Marge Bot
parent a34e380e2e
commit 5ca3988420
3 changed files with 164 additions and 73 deletions

View file

@ -2407,11 +2407,11 @@ _on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad,
g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL],
0, channel, FALSE);
gst_bin_add (GST_BIN (webrtc), channel->appsrc);
gst_bin_add (GST_BIN (webrtc), channel->appsink);
gst_bin_add (GST_BIN (webrtc), channel->src_bin);
gst_bin_add (GST_BIN (webrtc), channel->sink_bin);
gst_element_sync_state_with_parent (channel->appsrc);
gst_element_sync_state_with_parent (channel->appsink);
gst_element_sync_state_with_parent (channel->src_bin);
gst_element_sync_state_with_parent (channel->sink_bin);
webrtc_data_channel_link_to_sctp (channel, webrtc->priv->sctp_transport);
@ -2422,7 +2422,7 @@ _on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad,
g_signal_connect (channel, "notify::ready-state",
G_CALLBACK (_on_data_channel_ready_state), webrtc);
sink_pad = gst_element_get_static_pad (channel->appsink, "sink");
sink_pad = gst_element_get_static_pad (channel->sink_bin, "sink");
if (gst_pad_link (pad, sink_pad) != GST_PAD_LINK_OK)
GST_WARNING_OBJECT (channel, "Failed to link sctp pad %s with channel %"
GST_PTR_FORMAT, GST_PAD_NAME (pad), channel);
@ -6968,11 +6968,11 @@ gst_webrtc_bin_create_data_channel (GstWebRTCBin * webrtc, const gchar * label,
g_signal_emit (webrtc, gst_webrtc_bin_signals[PREPARE_DATA_CHANNEL_SIGNAL], 0,
ret, TRUE);
gst_bin_add (GST_BIN (webrtc), ret->appsrc);
gst_bin_add (GST_BIN (webrtc), ret->appsink);
gst_bin_add (GST_BIN (webrtc), ret->src_bin);
gst_bin_add (GST_BIN (webrtc), ret->sink_bin);
gst_element_sync_state_with_parent (ret->appsrc);
gst_element_sync_state_with_parent (ret->appsink);
gst_element_sync_state_with_parent (ret->src_bin);
gst_element_sync_state_with_parent (ret->sink_bin);
ret = gst_object_ref (ret);
ret->webrtcbin = webrtc;

View file

@ -44,6 +44,141 @@
#define GST_CAT_DEFAULT webrtc_data_channel_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
static void _close_procedure (WebRTCDataChannel * channel, gpointer user_data);
typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
gpointer user_data);
struct task
{
GstWebRTCDataChannel *channel;
ChannelTask func;
gpointer user_data;
GDestroyNotify notify;
};
static GstStructure *
_execute_task (GstWebRTCBin * webrtc, struct task *task)
{
if (task->func)
task->func (task->channel, task->user_data);
return NULL;
}
static void
_free_task (struct task *task)
{
gst_object_unref (task->channel);
if (task->notify)
task->notify (task->user_data);
g_free (task);
}
static void
_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
gpointer user_data, GDestroyNotify notify)
{
struct task *task = g_new0 (struct task, 1);
task->channel = gst_object_ref (channel);
task->func = func;
task->user_data = user_data;
task->notify = notify;
gst_webrtc_bin_enqueue_task (channel->webrtcbin,
(GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
NULL);
}
static void
_channel_store_error (WebRTCDataChannel * channel, GError * error)
{
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
if (error) {
GST_WARNING_OBJECT (channel, "Error: %s",
error ? error->message : "Unknown");
if (!channel->stored_error)
channel->stored_error = error;
else
g_clear_error (&error);
}
GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
}
struct _WebRTCErrorIgnoreBin
{
GstBin bin;
WebRTCDataChannel *data_channel;
};
G_DEFINE_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, GST_TYPE_BIN);
static void
webrtc_error_ignore_bin_handle_message (GstBin * bin, GstMessage * message)
{
WebRTCErrorIgnoreBin *self = WEBRTC_ERROR_IGNORE_BIN (bin);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR:{
GError *error = NULL;
gst_message_parse_error (message, &error, NULL);
GST_DEBUG_OBJECT (bin, "handling error message from internal element");
_channel_store_error (self->data_channel, error);
_channel_enqueue_task (self->data_channel, (ChannelTask) _close_procedure,
NULL, NULL);
break;
}
default:
GST_BIN_CLASS (webrtc_error_ignore_bin_parent_class)->handle_message (bin,
message);
break;
}
}
static void
webrtc_error_ignore_bin_class_init (WebRTCErrorIgnoreBinClass * klass)
{
GstBinClass *bin_class = (GstBinClass *) klass;
bin_class->handle_message = webrtc_error_ignore_bin_handle_message;
}
static void
webrtc_error_ignore_bin_init (WebRTCErrorIgnoreBin * bin)
{
}
static GstElement *
webrtc_error_ignore_bin_new (WebRTCDataChannel * data_channel,
GstElement * other)
{
WebRTCErrorIgnoreBin *self;
GstPad *pad;
self = g_object_new (webrtc_error_ignore_bin_get_type (), NULL);
self->data_channel = data_channel;
gst_bin_add (GST_BIN (self), other);
pad = gst_element_get_static_pad (other, "src");
if (pad) {
GstPad *ghost_pad = gst_ghost_pad_new ("src", pad);
gst_element_add_pad (GST_ELEMENT (self), ghost_pad);
gst_clear_object (&pad);
}
pad = gst_element_get_static_pad (other, "sink");
if (pad) {
GstPad *ghost_pad = gst_ghost_pad_new ("sink", pad);
gst_element_add_pad (GST_ELEMENT (self), ghost_pad);
gst_clear_object (&pad);
}
return (GstElement *) self;
}
#define webrtc_data_channel_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
GST_TYPE_WEBRTC_DATA_CHANNEL,
@ -213,67 +348,6 @@ construct_ack_packet (WebRTCDataChannel * channel)
return buf;
}
typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
gpointer user_data);
struct task
{
GstWebRTCDataChannel *channel;
ChannelTask func;
gpointer user_data;
GDestroyNotify notify;
};
static GstStructure *
_execute_task (GstWebRTCBin * webrtc, struct task *task)
{
if (task->func)
task->func (task->channel, task->user_data);
return NULL;
}
static void
_free_task (struct task *task)
{
gst_object_unref (task->channel);
if (task->notify)
task->notify (task->user_data);
g_free (task);
}
static void
_channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
gpointer user_data, GDestroyNotify notify)
{
struct task *task = g_new0 (struct task, 1);
task->channel = gst_object_ref (channel);
task->func = func;
task->user_data = user_data;
task->notify = notify;
gst_webrtc_bin_enqueue_task (channel->webrtcbin,
(GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
NULL);
}
static void
_channel_store_error (WebRTCDataChannel * channel, GError * error)
{
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
if (error) {
GST_WARNING_OBJECT (channel, "Error: %s",
error ? error->message : "Unknown");
if (!channel->stored_error)
channel->stored_error = error;
else
g_clear_error (&error);
}
GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
}
static void
_emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
{
@ -290,6 +364,10 @@ _transport_closed (WebRTCDataChannel * channel)
error = channel->stored_error;
channel->stored_error = NULL;
GST_TRACE_OBJECT (channel, "transport closed, peer closed %u error %p "
"buffered %" G_GUINT64_FORMAT, channel->peer_closed, error,
channel->parent.buffered_amount);
both_sides_closed =
channel->peer_closed && channel->parent.buffered_amount <= 0;
if (both_sides_closed || error) {
@ -314,7 +392,7 @@ _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"",
channel->parent.id, channel->parent.label);
pad = gst_element_get_static_pad (channel->appsrc, "src");
pad = gst_element_get_static_pad (channel->src_bin, "src");
peer = gst_pad_get_peer (pad);
gst_object_unref (pad);
@ -322,6 +400,7 @@ _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
GstElement *sctpenc = gst_pad_get_parent_element (peer);
if (sctpenc) {
GST_TRACE_OBJECT (channel, "removing sctpenc pad %" GST_PTR_FORMAT, peer);
gst_element_release_request_pad (sctpenc, peer);
gst_object_unref (sctpenc);
}
@ -484,6 +563,8 @@ _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
if (ret != GST_FLOW_OK) {
g_set_error (error, GST_WEBRTC_ERROR,
GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
GST_WARNING_OBJECT (channel, "push returned %i, %s", ret,
gst_flow_get_name (ret));
return ret;
}
@ -800,6 +881,8 @@ webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
} else {
g_set_error (error, GST_WEBRTC_ERROR,
GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
GST_WARNING_OBJECT (channel, "push returned %i, %s", ret,
gst_flow_get_name (ret));
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
channel->parent.buffered_amount -= size;
@ -1001,6 +1084,8 @@ gst_webrtc_data_channel_constructed (GObject * object)
channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
(GstPadProbeCallback) on_appsrc_data, channel, NULL);
channel->src_bin = webrtc_error_ignore_bin_new (channel, channel->appsrc);
channel->appsink = gst_element_factory_make ("appsink", NULL);
gst_object_ref_sink (channel->appsink);
g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
@ -1008,6 +1093,8 @@ gst_webrtc_data_channel_constructed (GObject * object)
gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
channel, NULL);
channel->sink_bin = webrtc_error_ignore_bin_new (channel, channel->appsink);
gst_object_unref (pad);
gst_caps_unref (caps);
}
@ -1078,7 +1165,7 @@ _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
if (channel->sctp_transport)
g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
GST_TRACE ("%p set sctp %p", channel, sctp);
GST_TRACE_OBJECT (channel, "set sctp %p", sctp);
gst_object_replace ((GstObject **) & channel->sctp_transport,
GST_OBJECT (sctp));
@ -1106,7 +1193,7 @@ webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
_data_channel_set_sctp_transport (channel, sctp_transport);
pad_name = g_strdup_printf ("sink_%u", id);
if (!gst_element_link_pads (channel->appsrc, "src",
if (!gst_element_link_pads (channel->src_bin, "src",
channel->sctp_transport->sctpenc, pad_name))
g_warn_if_reached ();
g_free (pad_name);

View file

@ -46,7 +46,9 @@ struct _WebRTCDataChannel
GstWebRTCDataChannel parent;
WebRTCSCTPTransport *sctp_transport;
GstElement *src_bin;
GstElement *appsrc;
GstElement *sink_bin;
GstElement *appsink;
GstWebRTCBin *webrtcbin;
@ -70,6 +72,8 @@ G_GNUC_INTERNAL
void webrtc_data_channel_link_to_sctp (WebRTCDataChannel *channel,
WebRTCSCTPTransport *sctp_transport);
G_DECLARE_FINAL_TYPE (WebRTCErrorIgnoreBin, webrtc_error_ignore_bin, WEBRTC, ERROR_IGNORE_BIN, GstBin);
G_END_DECLS
#endif /* __WEBRTC_DATA_CHANNEL_H__ */