From 07e9374eff7967713ab2dc47b1eb2843d0bfcaa0 Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Mon, 10 Sep 2018 23:52:05 +1000 Subject: [PATCH] webrtcbin: add support for data channels based on SCTP Mostly follows the W3C specification https://www.w3.org/TR/webrtc/#peer-to-peer-data-api With contributions from: Mathieu Duponchelle https://bugzilla.gnome.org/show_bug.cgi?id=794351 --- ext/webrtc/Makefile.am | 8 +- ext/webrtc/fwd.h | 4 + ext/webrtc/gstwebrtcbin.c | 1084 +++++++++++++++++++++---- ext/webrtc/gstwebrtcbin.h | 13 +- ext/webrtc/meson.build | 4 +- ext/webrtc/sctptransport.c | 270 +++++++ ext/webrtc/sctptransport.h | 65 ++ ext/webrtc/transportreceivebin.c | 58 +- ext/webrtc/transportsendbin.c | 26 +- ext/webrtc/webrtcdatachannel.c | 1296 ++++++++++++++++++++++++++++++ ext/webrtc/webrtcdatachannel.h | 83 ++ ext/webrtc/webrtcsdp.c | 35 + ext/webrtc/webrtcsdp.h | 4 + gst-libs/gst/webrtc/webrtc_fwd.h | 53 ++ tests/check/elements/webrtcbin.c | 500 +++++++++++- 15 files changed, 3299 insertions(+), 204 deletions(-) create mode 100644 ext/webrtc/sctptransport.c create mode 100644 ext/webrtc/sctptransport.h create mode 100644 ext/webrtc/webrtcdatachannel.c create mode 100644 ext/webrtc/webrtcdatachannel.h diff --git a/ext/webrtc/Makefile.am b/ext/webrtc/Makefile.am index 5f9a714882..778f112d00 100644 --- a/ext/webrtc/Makefile.am +++ b/ext/webrtc/Makefile.am @@ -7,11 +7,13 @@ noinst_HEADERS = \ gstwebrtcstats.h \ icestream.h \ nicetransport.h \ + sctptransport.h \ transportstream.h \ transportsendbin.h \ transportreceivebin.h \ utils.h \ webrtcsdp.h \ + webrtcdatachannel.h \ webrtctransceiver.h libgstwebrtc_la_SOURCES = \ @@ -21,11 +23,13 @@ libgstwebrtc_la_SOURCES = \ gstwebrtcstats.c \ icestream.c \ nicetransport.c \ + sctptransport.c \ transportstream.c \ transportsendbin.c \ transportreceivebin.c \ utils.c \ webrtcsdp.c \ + webrtcdatachannel.c \ webrtctransceiver.c libgstwebrtc_la_SOURCES += $(BUILT_SOURCES) @@ -40,12 +44,14 @@ libgstwebrtc_la_CFLAGS = \ $(GST_SDP_CFLAGS) \ $(NICE_CFLAGS) libgstwebrtc_la_LIBADD = \ + -lgstapp-@GST_API_VERSION@ \ $(GST_PLUGINS_BASE_LIBS) \ $(GST_BASE_LIBS) \ $(GST_LIBS) \ $(GST_SDP_LIBS) \ $(NICE_LIBS) \ - $(top_builddir)/gst-libs/gst/webrtc/libgstwebrtc-@GST_API_VERSION@.la + $(top_builddir)/gst-libs/gst/webrtc/libgstwebrtc-@GST_API_VERSION@.la \ + $(top_builddir)/gst-libs/gst/sctp/libgstsctp-@GST_API_VERSION@.la libgstwebrtc_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) libgstwebrtc_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) diff --git a/ext/webrtc/fwd.h b/ext/webrtc/fwd.h index 903145fbf9..aa26ec6dea 100644 --- a/ext/webrtc/fwd.h +++ b/ext/webrtc/fwd.h @@ -41,6 +41,10 @@ typedef struct _GstWebRTCNiceTransport GstWebRTCNiceTransport; typedef struct _GstWebRTCNiceTransportClass GstWebRTCNiceTransportClass; typedef struct _GstWebRTCNiceTransportPrivate GstWebRTCNiceTransportPrivate; +typedef struct _GstWebRTCSCTPTransport GstWebRTCSCTPTransport; +typedef struct _GstWebRTCSCTPTransportClass GstWebRTCSCTPTransportClass; +typedef struct _GstWebRTCSCTPTransportPrivate GstWebRTCSCTPTransportPrivate; + typedef struct _TransportStream TransportStream; typedef struct _TransportStreamClass TransportStreamClass; diff --git a/ext/webrtc/gstwebrtcbin.c b/ext/webrtc/gstwebrtcbin.c index 268432caaf..60614b829c 100644 --- a/ext/webrtc/gstwebrtcbin.c +++ b/ext/webrtc/gstwebrtcbin.c @@ -28,6 +28,8 @@ #include "utils.h" #include "webrtcsdp.h" #include "webrtctransceiver.h" +#include "webrtcdatachannel.h" +#include "sctptransport.h" #include #include @@ -50,8 +52,8 @@ /* * This webrtcbin implements the majority of the W3's peerconnection API and * implementation guide where possible. Generating offers, answers and setting - * local and remote SDP's are all supported. To start with, only the media - * interface has been implemented (no datachannel yet). + * local and remote SDP's are all supported. Both media descriptions and + * descriptions involving data channels are supported. * * Each input/output pad is equivalent to a Track in W3 parlance which are * added/removed from the bin. The number of requested sink pads is the number @@ -70,7 +72,6 @@ * LS groups * bundling * setting custom DTLS certificates - * data channel * * seperate session id's from mlineindex properly * how to deal with replacing a input/output track/stream @@ -107,6 +108,32 @@ _have_nice_elements (GstWebRTCBin * webrtc) return TRUE; } +static gboolean +_have_sctp_elements (GstWebRTCBin * webrtc) +{ + GstPluginFeature *feature; + + feature = gst_registry_lookup_feature (gst_registry_get (), "sctpdec"); + if (feature) { + gst_object_unref (feature); + } else { + GST_ELEMENT_ERROR (webrtc, CORE, MISSING_PLUGIN, NULL, + ("%s", "sctp elements are not available")); + return FALSE; + } + + feature = gst_registry_lookup_feature (gst_registry_get (), "sctpenc"); + if (feature) { + gst_object_unref (feature); + } else { + GST_ELEMENT_ERROR (webrtc, CORE, MISSING_PLUGIN, NULL, + ("%s", "sctp elements are not available")); + return FALSE; + } + + return TRUE; +} + static gboolean _have_dtls_elements (GstWebRTCBin * webrtc) { @@ -273,7 +300,8 @@ gst_webrtc_bin_pad_new (const gchar * name, GstPadDirection direction) G_DEFINE_TYPE_WITH_CODE (GstWebRTCBin, gst_webrtc_bin, GST_TYPE_BIN, G_ADD_PRIVATE (GstWebRTCBin) GST_DEBUG_CATEGORY_INIT (gst_webrtc_bin_debug, "webrtcbin", 0, - "webrtcbin element");); + "webrtcbin element"); + ); static GstPad *_connect_input_stream (GstWebRTCBin * webrtc, GstWebRTCBinPad * pad); @@ -303,6 +331,8 @@ enum ADD_TRANSCEIVER_SIGNAL, GET_TRANSCEIVERS_SIGNAL, ADD_TURN_SERVER_SIGNAL, + CREATE_DATA_CHANNEL_SIGNAL, + ON_DATA_CHANNEL_SIGNAL, LAST_SIGNAL, }; @@ -524,6 +554,47 @@ _find_pad (GstWebRTCBin * webrtc, gconstpointer data, FindPadFunc func) return NULL; } +typedef gboolean (*FindDataChannelFunc) (GstWebRTCDataChannel * p1, + gconstpointer data); + +static GstWebRTCDataChannel * +_find_data_channel (GstWebRTCBin * webrtc, gconstpointer data, + FindDataChannelFunc func) +{ + int i; + + for (i = 0; i < webrtc->priv->data_channels->len; i++) { + GstWebRTCDataChannel *channel = + g_array_index (webrtc->priv->data_channels, GstWebRTCDataChannel *, + i); + + if (func (channel, data)) + return channel; + } + + return NULL; +} + +static gboolean +data_channel_match_for_id (GstWebRTCDataChannel * channel, gint * id) +{ + return channel->id == *id; +} + +static GstWebRTCDataChannel * +_find_data_channel_for_id (GstWebRTCBin * webrtc, gint id) +{ + GstWebRTCDataChannel *channel; + + channel = _find_data_channel (webrtc, &id, + (FindDataChannelFunc) data_channel_match_for_id); + + GST_TRACE_OBJECT (webrtc, + "Found data channel %" GST_PTR_FORMAT " for id %i", channel, id); + + return channel; +} + static void _add_pad_to_list (GstWebRTCBin * webrtc, GstWebRTCBinPad * pad) { @@ -1415,7 +1486,6 @@ _create_transport_channel (GstWebRTCBin * webrtc, guint session_id) { GstWebRTCDTLSTransport *transport; TransportStream *ret; - gchar *pad_name; /* FIXME: how to parametrize the sender and the receiver */ ret = transport_stream_new (webrtc, session_id); @@ -1439,6 +1509,44 @@ _create_transport_channel (GstWebRTCBin * webrtc, guint session_id) G_CALLBACK (_on_dtls_transport_notify_state), webrtc); } + GST_TRACE_OBJECT (webrtc, + "Create transport %" GST_PTR_FORMAT " for session %u", ret, session_id); + + return ret; +} + +static gboolean +_message_media_is_datachannel (const GstSDPMessage * msg, guint media_id) +{ + const GstSDPMedia *media; + + if (!msg) + return FALSE; + + if (gst_sdp_message_medias_len (msg) <= media_id) + return FALSE; + + media = gst_sdp_message_get_media (msg, media_id); + + if (g_strcmp0 (gst_sdp_media_get_media (media), "application") != 0) + return FALSE; + + if (gst_sdp_media_formats_len (media) != 1) + return FALSE; + + if (g_strcmp0 (gst_sdp_media_get_format (media, 0), + "webrtc-datachannel") != 0) + return FALSE; + + return TRUE; +} + +static TransportStream * +_create_rtp_transport_channel (GstWebRTCBin * webrtc, guint session_id) +{ + TransportStream *ret = _create_transport_channel (webrtc, session_id); + gchar *pad_name; + gst_bin_add (GST_BIN (webrtc), GST_ELEMENT (ret->send_bin)); gst_bin_add (GST_BIN (webrtc), GST_ELEMENT (ret->receive_bin)); @@ -1456,15 +1564,217 @@ _create_transport_channel (GstWebRTCBin * webrtc, guint session_id) g_array_append_val (webrtc->priv->transports, ret); - GST_TRACE_OBJECT (webrtc, - "Create transport %" GST_PTR_FORMAT " for session %u", ret, session_id); - gst_element_sync_state_with_parent (GST_ELEMENT (ret->send_bin)); gst_element_sync_state_with_parent (GST_ELEMENT (ret->receive_bin)); return ret; } +/* this is called from the webrtc thread with the pc lock held */ +static void +_on_data_channel_ready_state (GstWebRTCDataChannel * channel, + GParamSpec * pspec, GstWebRTCBin * webrtc) +{ + GstWebRTCDataChannelState ready_state; + guint i; + + g_object_get (channel, "ready-state", &ready_state, NULL); + + if (ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) { + gboolean found = FALSE; + + for (i = 0; i < webrtc->priv->pending_data_channels->len; i++) { + GstWebRTCDataChannel *c; + + c = g_array_index (webrtc->priv->pending_data_channels, + GstWebRTCDataChannel *, i); + if (c == channel) { + found = TRUE; + g_array_remove_index (webrtc->priv->pending_data_channels, i); + break; + } + } + if (found == FALSE) { + GST_FIXME_OBJECT (webrtc, "Received open for unknown data channel"); + return; + } + + g_array_append_val (webrtc->priv->data_channels, channel); + + g_signal_emit (webrtc, gst_webrtc_bin_signals[ON_DATA_CHANNEL_SIGNAL], 0, + gst_object_ref (channel)); + } +} + +static void +_link_data_channel_to_sctp (GstWebRTCBin * webrtc, + GstWebRTCDataChannel * channel) +{ + if (webrtc->priv->sctp_transport && !channel->sctp_transport) { + gint id; + + g_object_get (channel, "id", &id, NULL); + + if (webrtc->priv->sctp_transport->association_established && id != -1) { + gchar *pad_name; + + gst_webrtc_data_channel_set_sctp_transport (channel, + webrtc->priv->sctp_transport); + pad_name = g_strdup_printf ("sink_%u", id); + if (!gst_element_link_pads (channel->appsrc, "src", + channel->sctp_transport->sctpenc, pad_name)) + g_warn_if_reached (); + g_free (pad_name); + } + } +} + +static void +_on_sctpdec_pad_added (GstElement * sctpdec, GstPad * pad, + GstWebRTCBin * webrtc) +{ + GstWebRTCDataChannel *channel; + guint stream_id; + GstPad *sink_pad; + + if (sscanf (GST_PAD_NAME (pad), "src_%u", &stream_id) != 1) + return; + + PC_LOCK (webrtc); + channel = _find_data_channel_for_id (webrtc, stream_id); + if (!channel) { + channel = g_object_new (GST_TYPE_WEBRTC_DATA_CHANNEL, NULL); + channel->id = stream_id; + channel->webrtcbin = webrtc; + + gst_bin_add (GST_BIN (webrtc), channel->appsrc); + gst_bin_add (GST_BIN (webrtc), channel->appsink); + + gst_element_sync_state_with_parent (channel->appsrc); + gst_element_sync_state_with_parent (channel->appsink); + + _link_data_channel_to_sctp (webrtc, channel); + + g_array_append_val (webrtc->priv->pending_data_channels, channel); + } + + g_signal_connect (channel, "notify::ready-state", + G_CALLBACK (_on_data_channel_ready_state), webrtc); + + sink_pad = gst_element_get_static_pad (channel->appsink, "sink"); + if (gst_pad_link (pad, sink_pad) != GST_PAD_LINK_OK) + GST_WARNING_OBJECT (channel, "Failed to link sctp pad %s with channel %" + GST_PTR_FORMAT, GST_PAD_NAME (pad), channel); + gst_object_unref (sink_pad); + PC_UNLOCK (webrtc); +} + +static void +_on_sctp_state_notify (GstWebRTCSCTPTransport * sctp, GParamSpec * pspec, + GstWebRTCBin * webrtc) +{ + GstWebRTCSCTPTransportState state; + + g_object_get (sctp, "state", &state, NULL); + + if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) { + int i; + + PC_LOCK (webrtc); + GST_DEBUG_OBJECT (webrtc, "SCTP association established"); + + for (i = 0; i < webrtc->priv->data_channels->len; i++) { + GstWebRTCDataChannel *channel; + + channel = + g_array_index (webrtc->priv->data_channels, GstWebRTCDataChannel *, + i); + + _link_data_channel_to_sctp (webrtc, channel); + + if (!channel->negotiated && !channel->opened) + gst_webrtc_data_channel_start_negotiation (channel); + } + PC_UNLOCK (webrtc); + } +} + +static TransportStream * +_create_data_channel_transports (GstWebRTCBin * webrtc, guint session_id) +{ + if (!webrtc->priv->data_channel_transport) { + TransportStream *stream = _create_transport_channel (webrtc, session_id); + GstWebRTCSCTPTransport *sctp_transport; + int i; + + webrtc->priv->data_channel_transport = stream; + + g_object_set (stream, "rtcp-mux", TRUE, NULL); + + gst_bin_add (GST_BIN (webrtc), GST_ELEMENT (stream->send_bin)); + gst_bin_add (GST_BIN (webrtc), GST_ELEMENT (stream->receive_bin)); + + if (!(sctp_transport = webrtc->priv->sctp_transport)) { + sctp_transport = gst_webrtc_sctp_transport_new (); + sctp_transport->transport = + g_object_ref (webrtc->priv->data_channel_transport->transport); + sctp_transport->webrtcbin = webrtc; + + gst_bin_add (GST_BIN (webrtc), sctp_transport->sctpdec); + gst_bin_add (GST_BIN (webrtc), sctp_transport->sctpenc); + } + + g_signal_connect (sctp_transport->sctpdec, "pad-added", + G_CALLBACK (_on_sctpdec_pad_added), webrtc); + g_signal_connect (sctp_transport, "notify::state", + G_CALLBACK (_on_sctp_state_notify), webrtc); + + if (!gst_element_link_pads (GST_ELEMENT (stream->receive_bin), "data_src", + GST_ELEMENT (sctp_transport->sctpdec), "sink")) + g_warn_if_reached (); + + if (!gst_element_link_pads (GST_ELEMENT (sctp_transport->sctpenc), "src", + GST_ELEMENT (stream->send_bin), "data_sink")) + g_warn_if_reached (); + + for (i = 0; i < webrtc->priv->data_channels->len; i++) { + GstWebRTCDataChannel *channel; + + channel = + g_array_index (webrtc->priv->data_channels, GstWebRTCDataChannel *, + i); + + _link_data_channel_to_sctp (webrtc, channel); + } + + gst_element_sync_state_with_parent (GST_ELEMENT (stream->send_bin)); + gst_element_sync_state_with_parent (GST_ELEMENT (stream->receive_bin)); + + if (!webrtc->priv->sctp_transport) { + gst_element_sync_state_with_parent (GST_ELEMENT + (sctp_transport->sctpdec)); + gst_element_sync_state_with_parent (GST_ELEMENT + (sctp_transport->sctpenc)); + } + + g_array_append_val (webrtc->priv->transports, stream); + + webrtc->priv->sctp_transport = sctp_transport; + } + + return webrtc->priv->data_channel_transport; +} + +static TransportStream * +_create_transport_stream (GstWebRTCBin * webrtc, guint session_id, + gboolean is_datachannel) +{ + if (is_datachannel) + return _create_data_channel_transports (webrtc, session_id); + else + return _create_rtp_transport_channel (webrtc, session_id); +} + static guint g_array_find_uint (GArray * array, guint val) { @@ -1797,7 +2107,7 @@ sdp_media_from_transceiver (GstWebRTCBin * webrtc, GstSDPMedia * media, /* FIXME: bundle */ item = _find_transport_for_session (webrtc, media_idx); if (!item) - item = _create_transport_channel (webrtc, media_idx); + item = _create_transport_stream (webrtc, media_idx, FALSE); webrtc_transceiver_set_transport (WEBRTC_TRANSCEIVER (trans), item); } @@ -1873,6 +2183,58 @@ _create_offer_task (GstWebRTCBin * webrtc, const GstStructure * options) gst_sdp_media_uninit (&media); } + /* add data channel support */ + if (webrtc->priv->data_channels->len > 0) { + GstSDPMedia media = { 0, }; + gchar *ufrag, *pwd, *sdp_mid; + + gst_sdp_media_init (&media); + /* mandated by JSEP */ + gst_sdp_media_add_attribute (&media, "setup", "actpass"); + + /* FIXME: only needed when restarting ICE */ + _generate_ice_credentials (&ufrag, &pwd); + gst_sdp_media_add_attribute (&media, "ice-ufrag", ufrag); + gst_sdp_media_add_attribute (&media, "ice-pwd", pwd); + g_free (ufrag); + g_free (pwd); + + gst_sdp_media_set_media (&media, "application"); + gst_sdp_media_set_port_info (&media, 9, 0); + gst_sdp_media_set_proto (&media, "UDP/DTLS/SCTP"); + gst_sdp_media_add_connection (&media, "IN", "IP4", "0.0.0.0", 0, 0); + gst_sdp_media_add_format (&media, "webrtc-datachannel"); + + sdp_mid = g_strdup_printf ("%s%u", gst_sdp_media_get_media (&media), + webrtc->priv->media_counter++); + gst_sdp_media_add_attribute (&media, "mid", sdp_mid); + g_free (sdp_mid); + + /* FIXME: negotiate this properly */ + gst_sdp_media_add_attribute (&media, "sctp-port", "5000"); + + _create_data_channel_transports (webrtc, webrtc->priv->transceivers->len); + { + gchar *cert, *fingerprint, *val; + + g_object_get (webrtc->priv->sctp_transport->transport, "certificate", + &cert, NULL); + + fingerprint = + _generate_fingerprint_from_certificate (cert, G_CHECKSUM_SHA256); + g_free (cert); + val = + g_strdup_printf ("%s %s", + _g_checksum_to_webrtc_string (G_CHECKSUM_SHA256), fingerprint); + g_free (fingerprint); + + gst_sdp_media_add_attribute (&media, "fingerprint", val); + g_free (val); + } + + gst_sdp_message_add_media (ret, &media); + } + /* FIXME: pre-emptively setup receiving elements when needed */ /* XXX: only true for the initial offerer */ @@ -2045,7 +2407,6 @@ _create_answer_task (GstWebRTCBin * webrtc, const GstStructure * options) gst_sdp_media_new (&media); gst_sdp_media_set_port_info (media, 9, 0); - gst_sdp_media_set_proto (media, "UDP/TLS/RTP/SAVPF"); gst_sdp_media_add_connection (media, "IN", "IP4", "0.0.0.0", 0, 0); { @@ -2060,6 +2421,7 @@ _create_answer_task (GstWebRTCBin * webrtc, const GstStructure * options) offer_media = (GstSDPMedia *) gst_sdp_message_get_media (pending_remote->sdp, i); + for (j = 0; j < gst_sdp_media_attributes_len (offer_media); j++) { const GstSDPAttribute *attr = gst_sdp_media_get_attribute (offer_media, j); @@ -2071,154 +2433,216 @@ _create_answer_task (GstWebRTCBin * webrtc, const GstStructure * options) } } - offer_caps = gst_caps_new_empty (); - for (j = 0; j < gst_sdp_media_formats_len (offer_media); j++) { - guint pt = atoi (gst_sdp_media_get_format (offer_media, j)); - GstCaps *caps; - - caps = gst_sdp_media_get_caps_from_media (offer_media, pt); - - /* gst_sdp_media_get_caps_from_media() produces caps with name - * "application/x-unknown" which will fail intersection with - * "application/x-rtp" caps so mangle the returns caps to have the - * correct name here */ - for (k = 0; k < gst_caps_get_size (caps); k++) { - GstStructure *s = gst_caps_get_structure (caps, k); - gst_structure_set_name (s, "application/x-rtp"); - } - - gst_caps_append (offer_caps, caps); - } - - for (j = 0; j < webrtc->priv->transceivers->len; j++) { - GstCaps *trans_caps; - - rtp_trans = - g_array_index (webrtc->priv->transceivers, GstWebRTCRTPTransceiver *, - j); - trans_caps = _find_codec_preferences (webrtc, rtp_trans, GST_PAD_SINK, j); - - GST_TRACE_OBJECT (webrtc, "trying to compare %" GST_PTR_FORMAT - " and %" GST_PTR_FORMAT, offer_caps, trans_caps); - - /* FIXME: technically this is a little overreaching as some fields we - * we can deal with not having and/or we may have unrecognized fields - * that we cannot actually support */ - if (trans_caps) { - answer_caps = gst_caps_intersect (offer_caps, trans_caps); - if (answer_caps && !gst_caps_is_empty (answer_caps)) { - GST_LOG_OBJECT (webrtc, - "found compatible transceiver %" GST_PTR_FORMAT - " for offer media %u", trans, i); - if (trans_caps) - gst_caps_unref (trans_caps); - break; - } else { - if (answer_caps) { - gst_caps_unref (answer_caps); - answer_caps = NULL; - } - if (trans_caps) - gst_caps_unref (trans_caps); - rtp_trans = NULL; - } - } else { - rtp_trans = NULL; - } - } - - if (rtp_trans) { - answer_dir = rtp_trans->direction; - g_assert (answer_caps != NULL); - } else { - /* if no transceiver, then we only receive that stream and respond with - * the exact same caps */ - /* FIXME: how to validate that subsequent elements can actually receive - * this payload/format */ - answer_dir = GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY; - answer_caps = gst_caps_ref (offer_caps); - } - - if (!rtp_trans) { - trans = _create_webrtc_transceiver (webrtc, answer_dir, i); - rtp_trans = GST_WEBRTC_RTP_TRANSCEIVER (trans); - } else { - trans = WEBRTC_TRANSCEIVER (rtp_trans); - } - - if (!trans->do_nack) { - answer_caps = gst_caps_make_writable (answer_caps); - for (k = 0; k < gst_caps_get_size (answer_caps); k++) { - GstStructure *s = gst_caps_get_structure (answer_caps, k); - gst_structure_remove_fields (s, "rtcp-fb-nack", NULL); - } - } - - gst_sdp_media_set_media_from_caps (answer_caps, media); - - _get_rtx_target_pt_and_ssrc_from_caps (answer_caps, &target_pt, - &target_ssrc); - - original_target_pt = target_pt; - - _media_add_fec (media, trans, offer_caps, &target_pt); - if (trans->do_nack) { - _media_add_rtx (media, trans, offer_caps, target_pt, target_ssrc); - if (target_pt != original_target_pt) - _media_add_rtx (media, trans, offer_caps, original_target_pt, - target_ssrc); - } - - if (answer_dir != GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY) - _media_add_ssrcs (media, answer_caps, webrtc, - WEBRTC_TRANSCEIVER (rtp_trans)); - - gst_caps_unref (answer_caps); - answer_caps = NULL; - - /* set the new media direction */ - offer_dir = _get_direction_from_media (offer_media); - answer_dir = _intersect_answer_directions (offer_dir, answer_dir); - if (answer_dir == GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_NONE) { - GST_WARNING_OBJECT (webrtc, "Could not intersect offer direction with " - "transceiver direction"); - goto rejected; - } - _media_replace_direction (media, answer_dir); - /* set the a=setup: attribute */ offer_setup = _get_dtls_setup_from_media (offer_media); answer_setup = _intersect_dtls_setup (offer_setup); if (answer_setup == GST_WEBRTC_DTLS_SETUP_NONE) { - GST_WARNING_OBJECT (webrtc, "Could not intersect offer direction with " + GST_WARNING_OBJECT (webrtc, "Could not intersect offer setup with " "transceiver direction"); goto rejected; } _media_replace_setup (media, answer_setup); - /* FIXME: bundle! */ - if (!trans->stream) { - TransportStream *item = _find_transport_for_session (webrtc, i); - if (!item) - item = _create_transport_channel (webrtc, i); - webrtc_transceiver_set_transport (trans, item); - } - /* set the a=fingerprint: for this transport */ - g_object_get (trans->stream->transport, "certificate", &cert, NULL); + if (g_strcmp0 (gst_sdp_media_get_media (offer_media), "application") == 0) { + int sctp_port; - { - gchar *fingerprint, *val; + if (gst_sdp_media_formats_len (offer_media) != 1) { + GST_WARNING_OBJECT (webrtc, "Could not find a format in the m= line " + "for webrtc-datachannel"); + goto rejected; + } + if (g_strcmp0 (gst_sdp_media_get_format (offer_media, 0), + "webrtc-datachannel") != 0) { + GST_WARNING_OBJECT (webrtc, + "format field of data channel m= line " + "is not \'webrtc-datachannel\'"); + goto rejected; + } + sctp_port = _get_sctp_port_from_media (offer_media); + if (sctp_port == -1) { + GST_WARNING_OBJECT (webrtc, "media does not contain a sctp port"); + goto rejected; + } - fingerprint = - _generate_fingerprint_from_certificate (cert, G_CHECKSUM_SHA256); - g_free (cert); - val = - g_strdup_printf ("%s %s", - _g_checksum_to_webrtc_string (G_CHECKSUM_SHA256), fingerprint); - g_free (fingerprint); + /* XXX: older browsers will produce a different SDP format for data + * channel that is currently not parsed correctly */ + gst_sdp_media_set_proto (media, "UDP/DTLS/SCTP"); - gst_sdp_media_add_attribute (media, "fingerprint", val); - g_free (val); + gst_sdp_media_set_media (media, "application"); + gst_sdp_media_set_port_info (media, 9, 0); + gst_sdp_media_add_format (media, "webrtc-datachannel"); + + /* FIXME: negotiate this properly on renegotiation */ + gst_sdp_media_add_attribute (media, "sctp-port", "5000"); + + _create_data_channel_transports (webrtc, i); + + { + gchar *cert, *fingerprint, *val; + + g_object_get (webrtc->priv->sctp_transport->transport, "certificate", + &cert, NULL); + + fingerprint = + _generate_fingerprint_from_certificate (cert, G_CHECKSUM_SHA256); + g_free (cert); + val = + g_strdup_printf ("%s %s", + _g_checksum_to_webrtc_string (G_CHECKSUM_SHA256), fingerprint); + g_free (fingerprint); + + gst_sdp_media_add_attribute (media, "fingerprint", val); + g_free (val); + } + } else if (g_strcmp0 (gst_sdp_media_get_media (offer_media), "audio") == 0 + || g_strcmp0 (gst_sdp_media_get_media (offer_media), "video") == 0) { + gst_sdp_media_set_proto (media, "UDP/TLS/RTP/SAVPF"); + + offer_caps = gst_caps_new_empty (); + for (j = 0; j < gst_sdp_media_formats_len (offer_media); j++) { + guint pt = atoi (gst_sdp_media_get_format (offer_media, j)); + GstCaps *caps; + + caps = gst_sdp_media_get_caps_from_media (offer_media, pt); + + /* gst_sdp_media_get_caps_from_media() produces caps with name + * "application/x-unknown" which will fail intersection with + * "application/x-rtp" caps so mangle the returns caps to have the + * correct name here */ + for (k = 0; k < gst_caps_get_size (caps); k++) { + GstStructure *s = gst_caps_get_structure (caps, k); + gst_structure_set_name (s, "application/x-rtp"); + } + + gst_caps_append (offer_caps, caps); + } + + for (j = 0; j < webrtc->priv->transceivers->len; j++) { + GstCaps *trans_caps; + + rtp_trans = + g_array_index (webrtc->priv->transceivers, + GstWebRTCRTPTransceiver *, j); + trans_caps = + _find_codec_preferences (webrtc, rtp_trans, GST_PAD_SINK, j); + + GST_TRACE_OBJECT (webrtc, "trying to compare %" GST_PTR_FORMAT + " and %" GST_PTR_FORMAT, offer_caps, trans_caps); + + /* FIXME: technically this is a little overreaching as some fields we + * we can deal with not having and/or we may have unrecognized fields + * that we cannot actually support */ + if (trans_caps) { + answer_caps = gst_caps_intersect (offer_caps, trans_caps); + if (answer_caps && !gst_caps_is_empty (answer_caps)) { + GST_LOG_OBJECT (webrtc, + "found compatible transceiver %" GST_PTR_FORMAT + " for offer media %u", trans, i); + if (trans_caps) + gst_caps_unref (trans_caps); + break; + } else { + if (answer_caps) { + gst_caps_unref (answer_caps); + answer_caps = NULL; + } + if (trans_caps) + gst_caps_unref (trans_caps); + rtp_trans = NULL; + } + } else { + rtp_trans = NULL; + } + } + + if (rtp_trans) { + answer_dir = rtp_trans->direction; + g_assert (answer_caps != NULL); + } else { + /* if no transceiver, then we only receive that stream and respond with + * the exact same caps */ + /* FIXME: how to validate that subsequent elements can actually receive + * this payload/format */ + answer_dir = GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY; + answer_caps = gst_caps_ref (offer_caps); + } + + if (!rtp_trans) { + trans = _create_webrtc_transceiver (webrtc, answer_dir, i); + rtp_trans = GST_WEBRTC_RTP_TRANSCEIVER (trans); + } else { + trans = WEBRTC_TRANSCEIVER (rtp_trans); + } + + if (!trans->do_nack) { + answer_caps = gst_caps_make_writable (answer_caps); + for (k = 0; k < gst_caps_get_size (answer_caps); k++) { + GstStructure *s = gst_caps_get_structure (answer_caps, k); + gst_structure_remove_fields (s, "rtcp-fb-nack", NULL); + } + } + + gst_sdp_media_set_media_from_caps (answer_caps, media); + + _get_rtx_target_pt_and_ssrc_from_caps (answer_caps, &target_pt, + &target_ssrc); + + original_target_pt = target_pt; + + _media_add_fec (media, trans, offer_caps, &target_pt); + if (trans->do_nack) { + _media_add_rtx (media, trans, offer_caps, target_pt, target_ssrc); + if (target_pt != original_target_pt) + _media_add_rtx (media, trans, offer_caps, original_target_pt, + target_ssrc); + } + + if (answer_dir != GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY) + _media_add_ssrcs (media, answer_caps, webrtc, + WEBRTC_TRANSCEIVER (rtp_trans)); + + gst_caps_unref (answer_caps); + answer_caps = NULL; + + /* set the new media direction */ + offer_dir = _get_direction_from_media (offer_media); + answer_dir = _intersect_answer_directions (offer_dir, answer_dir); + if (answer_dir == GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_NONE) { + GST_WARNING_OBJECT (webrtc, "Could not intersect offer direction with " + "transceiver direction"); + goto rejected; + } + _media_replace_direction (media, answer_dir); + + /* FIXME: bundle! */ + if (!trans->stream) { + TransportStream *item = _find_transport_for_session (webrtc, i); + if (!item) + item = _create_transport_stream (webrtc, i, FALSE); + webrtc_transceiver_set_transport (trans, item); + } + /* set the a=fingerprint: for this transport */ + g_object_get (trans->stream->transport, "certificate", &cert, NULL); + + { + gchar *fingerprint, *val; + + fingerprint = + _generate_fingerprint_from_certificate (cert, G_CHECKSUM_SHA256); + g_free (cert); + val = + g_strdup_printf ("%s %s", + _g_checksum_to_webrtc_string (G_CHECKSUM_SHA256), fingerprint); + g_free (fingerprint); + + gst_sdp_media_add_attribute (media, "fingerprint", val); + g_free (val); + } + + gst_caps_unref (offer_caps); + } else { + GST_WARNING_OBJECT (webrtc, "unknown m= line media name"); + goto rejected; } if (0) { @@ -2230,8 +2654,6 @@ _create_answer_task (GstWebRTCBin * webrtc, const GstStructure * options) } gst_sdp_message_add_media (ret, media); gst_sdp_media_free (media); - - gst_caps_unref (offer_caps); } /* FIXME: can we add not matched transceivers? */ @@ -2410,7 +2832,7 @@ _connect_input_stream (GstWebRTCBin * webrtc, GstWebRTCBinPad * pad) /* FIXME: bundle */ item = _find_transport_for_session (webrtc, pad->mlineindex); if (!item) - item = _create_transport_channel (webrtc, pad->mlineindex); + item = _create_transport_stream (webrtc, pad->mlineindex, FALSE); webrtc_transceiver_set_transport (trans, item); } @@ -2454,7 +2876,7 @@ _connect_output_stream (GstWebRTCBin * webrtc, GstWebRTCBinPad * pad) /* FIXME: bundle */ item = _find_transport_for_session (webrtc, pad->mlineindex); if (!item) - item = _create_transport_channel (webrtc, pad->mlineindex); + item = _create_transport_stream (webrtc, pad->mlineindex, FALSE); webrtc_transceiver_set_transport (trans, item); } @@ -2512,10 +2934,9 @@ _filter_sdp_fields (GQuark field_id, const GValue * value, static void _update_transceiver_from_sdp_media (GstWebRTCBin * webrtc, const GstSDPMessage * sdp, guint media_idx, - GstWebRTCRTPTransceiver * rtp_trans) + TransportStream * stream, GstWebRTCRTPTransceiver * rtp_trans) { WebRTCTransceiver *trans = WEBRTC_TRANSCEIVER (rtp_trans); - TransportStream *stream = trans->stream; GstWebRTCRTPTransceiverDirection prev_dir = rtp_trans->current_direction; GstWebRTCRTPTransceiverDirection new_dir; const GstSDPMedia *media = gst_sdp_message_get_media (sdp, media_idx); @@ -2534,14 +2955,6 @@ _update_transceiver_from_sdp_media (GstWebRTCBin * webrtc, } } - if (!stream) { - /* FIXME: find an existing transport for e.g. bundle/reconfiguration */ - stream = _find_transport_for_session (webrtc, media_idx); - if (!stream) - stream = _create_transport_channel (webrtc, media_idx); - webrtc_transceiver_set_transport (trans, stream); - } - { const GstSDPMedia *local_media, *remote_media; GstWebRTCRTPTransceiverDirection local_dir, remote_dir; @@ -2725,6 +3138,121 @@ _update_transceiver_from_sdp_media (GstWebRTCBin * webrtc, } } +/* must be called with the pc lock held */ +static gint +_generate_data_channel_id (GstWebRTCBin * webrtc) +{ + gboolean is_client; + gint new_id = -1, max_channels = 0; + + if (webrtc->priv->sctp_transport) { + g_object_get (webrtc->priv->sctp_transport, "max-channels", &max_channels, + NULL); + } + if (max_channels <= 0) { + max_channels = 65534; + } + + g_object_get (webrtc->priv->sctp_transport->transport, "client", &is_client, + NULL); + + /* TODO: a better search algorithm */ + do { + GstWebRTCDataChannel *channel; + + new_id++; + + if (new_id < 0 || new_id >= max_channels) { + /* exhausted id space */ + GST_WARNING_OBJECT (webrtc, "Could not find a suitable " + "data channel id (max %i)", max_channels); + return -1; + } + + /* client must generate even ids, server must generate odd ids */ + if (new_id % 2 == ! !is_client) + continue; + + channel = _find_data_channel_for_id (webrtc, new_id); + if (!channel) + break; + } while (TRUE); + + return new_id; +} + +static void +_update_data_channel_from_sdp_media (GstWebRTCBin * webrtc, + const GstSDPMessage * sdp, guint media_idx, TransportStream * stream) +{ + const GstSDPMedia *local_media, *remote_media; + GstWebRTCDTLSSetup local_setup, remote_setup, new_setup; + TransportReceiveBin *receive; + int local_port, remote_port; + guint64 local_max_size, remote_max_size, max_size; + int i; + + local_media = + gst_sdp_message_get_media (webrtc->current_local_description->sdp, + media_idx); + remote_media = + gst_sdp_message_get_media (webrtc->current_remote_description->sdp, + media_idx); + + local_setup = _get_dtls_setup_from_media (local_media); + remote_setup = _get_dtls_setup_from_media (remote_media); + new_setup = _get_final_setup (local_setup, remote_setup); + if (new_setup == GST_WEBRTC_DTLS_SETUP_NONE) + return; + + /* data channel is always rtcp-muxed to avoid generating ICE candidates + * for RTCP */ + g_object_set (stream, "rtcp-mux", TRUE, "dtls-client", + new_setup == GST_WEBRTC_DTLS_SETUP_ACTIVE, NULL); + + local_port = _get_sctp_port_from_media (local_media); + remote_port = _get_sctp_port_from_media (local_media); + if (local_port == -1 || remote_port == -1) + return; + + if (0 == (local_max_size = + _get_sctp_max_message_size_from_media (local_media))) + local_max_size = G_MAXUINT64; + if (0 == (remote_max_size = + _get_sctp_max_message_size_from_media (remote_media))) + remote_max_size = G_MAXUINT64; + max_size = MIN (local_max_size, remote_max_size); + + webrtc->priv->sctp_transport->max_message_size = max_size; + + g_object_set (webrtc->priv->sctp_transport->sctpdec, "local-sctp-port", + local_port, NULL); + g_object_set (webrtc->priv->sctp_transport->sctpenc, "remote-sctp-port", + remote_port, NULL); + + for (i = 0; i < webrtc->priv->data_channels->len; i++) { + GstWebRTCDataChannel *channel; + + channel = + g_array_index (webrtc->priv->data_channels, GstWebRTCDataChannel *, i); + + if (channel->id == -1) + channel->id = _generate_data_channel_id (webrtc); + if (channel->id == -1) + GST_ELEMENT_WARNING (webrtc, RESOURCE, NOT_FOUND, + ("%s", "Failed to generate an identifier for a data channel"), NULL); + + if (webrtc->priv->sctp_transport->association_established + && !channel->negotiated && !channel->opened) { + _link_data_channel_to_sctp (webrtc, channel); + gst_webrtc_data_channel_start_negotiation (channel); + } + } + + receive = TRANSPORT_RECEIVE_BIN (stream->receive_bin); + transport_receive_bin_set_receive_state (receive, RECEIVE_STATE_PASS); +} + static gboolean _find_compatible_unassociated_transceiver (GstWebRTCRTPTransceiver * p1, gconstpointer data) @@ -2745,6 +3273,7 @@ _update_transceivers_from_sdp (GstWebRTCBin * webrtc, SDPSource source, for (i = 0; i < gst_sdp_message_medias_len (sdp->sdp); i++) { const GstSDPMedia *media = gst_sdp_message_get_media (sdp->sdp, i); + TransportStream *stream; GstWebRTCRTPTransceiver *trans; /* skip rejected media */ @@ -2753,24 +3282,41 @@ _update_transceivers_from_sdp (GstWebRTCBin * webrtc, SDPSource source, trans = _find_transceiver_for_sdp_media (webrtc, sdp->sdp, i); + stream = _find_transport_for_session (webrtc, i); + if (!stream) { + stream = _create_transport_stream (webrtc, i, + _message_media_is_datachannel (sdp->sdp, i)); + if (trans) + webrtc_transceiver_set_transport ((WebRTCTransceiver *) trans, stream); + } + if (source == SDP_LOCAL && sdp->type == GST_WEBRTC_SDP_TYPE_OFFER && !trans) { GST_ERROR ("State mismatch. Could not find local transceiver by mline."); return FALSE; } else { - if (trans) { - _update_transceiver_from_sdp_media (webrtc, sdp->sdp, i, trans); + if (g_strcmp0 (gst_sdp_media_get_media (media), "audio") == 0 || + g_strcmp0 (gst_sdp_media_get_media (media), "video") == 0) { + if (trans) { + _update_transceiver_from_sdp_media (webrtc, sdp->sdp, i, stream, + trans); + } else { + trans = _find_transceiver (webrtc, NULL, + (FindTransceiverFunc) _find_compatible_unassociated_transceiver); + /* XXX: default to the advertised direction in the sdp for new + * transceviers. The spec doesn't actually say what happens here, only + * that calls to setDirection will change the value. Nothing about + * a default value when the transceiver is created internally */ + if (!trans) + trans = + GST_WEBRTC_RTP_TRANSCEIVER (_create_webrtc_transceiver (webrtc, + _get_direction_from_media (media), i)); + _update_transceiver_from_sdp_media (webrtc, sdp->sdp, i, stream, + trans); + } + } else if (_message_media_is_datachannel (sdp->sdp, i)) { + _update_data_channel_from_sdp_media (webrtc, sdp->sdp, i, stream); } else { - trans = _find_transceiver (webrtc, NULL, - (FindTransceiverFunc) _find_compatible_unassociated_transceiver); - /* XXX: default to the advertised direction in the sdp for new - * transceviers. The spec doesn't actually say what happens here, only - * that calls to setDirection will change the value. Nothing about - * a default value when the transceiver is created internally */ - if (!trans) - trans = - GST_WEBRTC_RTP_TRANSCEIVER (_create_webrtc_transceiver (webrtc, - _get_direction_from_media (media), i)); - _update_transceiver_from_sdp_media (webrtc, sdp->sdp, i, trans); + GST_ERROR_OBJECT (webrtc, "Unknown media type in SDP at index %u", i); } } } @@ -2982,8 +3528,6 @@ _set_description_task (GstWebRTCBin * webrtc, struct set_description *sd) g_free (to); } - /* TODO: necessary data channel modifications */ - if (sd->sdp->type == GST_WEBRTC_SDP_TYPE_ROLLBACK) { /* FIXME: * If the mid value of an RTCRtpTransceiver was set to a non-null value @@ -3001,8 +3545,8 @@ _set_description_task (GstWebRTCBin * webrtc, struct set_description *sd) } if (webrtc->signaling_state == GST_WEBRTC_SIGNALING_STATE_STABLE) { - GList *tmp; gboolean prev_need_negotiation = webrtc->priv->need_negotiation; + GList *tmp; /* media modifications */ _update_transceivers_from_sdp (webrtc, sd->source, sd->sdp); @@ -3040,7 +3584,9 @@ _set_description_task (GstWebRTCBin * webrtc, struct set_description *sd) /* FIXME: bundle */ item = _find_transport_for_session (webrtc, i); if (!item) - item = _create_transport_channel (webrtc, i); + item = + _create_transport_stream (webrtc, i, + _message_media_is_datachannel (sd->sdp->sdp, i)); _get_ice_credentials_from_sdp_media (sd->sdp->sdp, i, &ufrag, &pwd); gst_webrtc_ice_set_local_credentials (webrtc->priv->ice, @@ -3060,7 +3606,9 @@ _set_description_task (GstWebRTCBin * webrtc, struct set_description *sd) /* FIXME: bundle */ item = _find_transport_for_session (webrtc, i); if (!item) - item = _create_transport_channel (webrtc, i); + item = + _create_transport_stream (webrtc, i, + _message_media_is_datachannel (sd->sdp->sdp, i)); _get_ice_credentials_from_sdp_media (sd->sdp->sdp, i, &ufrag, &pwd); gst_webrtc_ice_set_remote_credentials (webrtc->priv->ice, @@ -3374,6 +3922,130 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) return TRUE; } +static GstWebRTCDataChannel * +gst_webrtc_bin_create_data_channel (GstWebRTCBin * webrtc, const gchar * label, + GstStructure * init_params) +{ + gboolean ordered; + gint max_packet_lifetime; + gint max_retransmits; + const gchar *protocol; + gboolean negotiated; + gint id; + GstWebRTCPriorityType priority; + GstWebRTCDataChannel *ret; + gint max_channels = 65534; + + g_return_val_if_fail (GST_IS_WEBRTC_BIN (webrtc), NULL); + g_return_val_if_fail (label != NULL, NULL); + g_return_val_if_fail (strlen (label) <= 65535, NULL); + g_return_val_if_fail (webrtc->priv->is_closed != TRUE, NULL); + + if (!init_params + || !gst_structure_get_boolean (init_params, "ordered", &ordered)) + ordered = TRUE; + if (!init_params + || !gst_structure_get_int (init_params, "max-packet-lifetime", + &max_packet_lifetime)) + max_packet_lifetime = -1; + if (!init_params + || !gst_structure_get_boolean (init_params, "max-retransmits", + &max_retransmits)) + max_retransmits = -1; + /* both retransmits and lifetime cannot be set */ + g_return_val_if_fail ((max_packet_lifetime == -1) + || (max_retransmits == -1), NULL); + + if (!init_params + || !(protocol = gst_structure_get_string (init_params, "protocol"))) + protocol = ""; + g_return_val_if_fail (strlen (protocol) <= 65535, NULL); + + if (!init_params + || !gst_structure_get_boolean (init_params, "negotiated", &negotiated)) + negotiated = FALSE; + if (!negotiated || !init_params + || !gst_structure_get_int (init_params, "id", &id)) + id = -1; + if (negotiated) + g_return_val_if_fail (id != -1, NULL); + g_return_val_if_fail (id < 65535, NULL); + + if (!init_params + || !gst_structure_get_enum (init_params, "priority", + GST_TYPE_WEBRTC_PRIORITY_TYPE, (gint *) & priority)) + priority = GST_WEBRTC_PRIORITY_TYPE_LOW; + + /* FIXME: clamp max-retransmits and max-packet-lifetime */ + + if (webrtc->priv->sctp_transport) { + /* Let transport be the connection's [[SctpTransport]] slot. + * + * If the [[DataChannelId]] slot is not null, transport is in + * connected state and [[DataChannelId]] is greater or equal to the + * transport's [[MaxChannels]] slot, throw an OperationError. + */ + g_object_get (webrtc->priv->sctp_transport, "max-channels", &max_channels, + NULL); + + g_return_val_if_fail (id <= max_channels, NULL); + } + + if (!_have_nice_elements (webrtc) || !_have_dtls_elements (webrtc) || + !_have_sctp_elements (webrtc)) + return NULL; + + PC_LOCK (webrtc); + /* check if the id has been used already */ + if (id != -1) { + GstWebRTCDataChannel *channel = _find_data_channel_for_id (webrtc, id); + if (channel) { + GST_ELEMENT_WARNING (webrtc, LIBRARY, SETTINGS, + ("Attempting to add a data channel with a duplicate ID: %i", id), + NULL); + PC_UNLOCK (webrtc); + return NULL; + } + } else if (webrtc->current_local_description + && webrtc->current_remote_description && webrtc->priv->sctp_transport + && webrtc->priv->sctp_transport->transport) { + /* else we can only generate an id if we're configured already. The other + * case for generating an id is on sdp setting */ + id = _generate_data_channel_id (webrtc); + if (id == -1) { + GST_ELEMENT_WARNING (webrtc, RESOURCE, NOT_FOUND, + ("%s", "Failed to generate an identifier for a data channel"), NULL); + PC_UNLOCK (webrtc); + return NULL; + } + } + + ret = g_object_new (GST_TYPE_WEBRTC_DATA_CHANNEL, "label", label, + "ordered", ordered, "max-packet-lifetime", max_packet_lifetime, + "max-retransmits", max_retransmits, "protocol", protocol, + "negotiated", negotiated, "id", id, "priority", priority, NULL); + + if (ret) { + gst_bin_add (GST_BIN (webrtc), ret->appsrc); + gst_bin_add (GST_BIN (webrtc), ret->appsink); + + gst_element_sync_state_with_parent (ret->appsrc); + gst_element_sync_state_with_parent (ret->appsink); + + ret = gst_object_ref (ret); + ret->webrtcbin = webrtc; + g_array_append_val (webrtc->priv->data_channels, ret); + _link_data_channel_to_sctp (webrtc, ret); + if (webrtc->priv->sctp_transport && + webrtc->priv->sctp_transport->association_established + && !ret->negotiated) + gst_webrtc_data_channel_start_negotiation (ret); + } + + PC_UNLOCK (webrtc); + return ret; +} + /* === rtpbin signal implementations === */ static void @@ -4001,6 +4673,8 @@ gst_webrtc_bin_dispose (GObject * object) g_array_free (webrtc->priv->ice_stream_map, TRUE); webrtc->priv->ice_stream_map = NULL; + g_clear_object (&webrtc->priv->sctp_transport); + G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -4017,6 +4691,14 @@ gst_webrtc_bin_finalize (GObject * object) g_array_free (webrtc->priv->transceivers, TRUE); webrtc->priv->transceivers = NULL; + if (webrtc->priv->data_channels) + g_array_free (webrtc->priv->data_channels, TRUE); + webrtc->priv->data_channels = NULL; + + if (webrtc->priv->pending_data_channels) + g_array_free (webrtc->priv->pending_data_channels, TRUE); + webrtc->priv->pending_data_channels = NULL; + if (webrtc->priv->pending_ice_candidates) g_array_free (webrtc->priv->pending_ice_candidates, TRUE); webrtc->priv->pending_ice_candidates = NULL; @@ -4312,6 +4994,16 @@ gst_webrtc_bin_class_init (GstWebRTCBinClass * klass) G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_WEBRTC_RTP_TRANSCEIVER); + /** + * GstWebRTCBin::on-data-channel: + * @object: the #GstWebRtcBin + * @candidate: the new #GstWebRTCDataChannel + */ + gst_webrtc_bin_signals[ON_DATA_CHANNEL_SIGNAL] = + g_signal_new ("on-data-channel", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_WEBRTC_DATA_CHANNEL); + /** * GstWebRTCBin::add-transceiver: * @object: the #GstWebRtcBin @@ -4351,6 +5043,33 @@ gst_webrtc_bin_class_init (GstWebRTCBinClass * klass) G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_CALLBACK (gst_webrtc_bin_add_turn_server), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 1, G_TYPE_STRING); + + /* + * GstWebRTCBin::create-data-channel: + * @object: the #GstWebRtcBin + * @label: the label for the data channel + * @options: a #GstStructure of options for creating the data channel + * + * The options dictionary is the same format as the RTCDataChannelInit + * members outlined https://www.w3.org/TR/webrtc/#dom-rtcdatachannelinit and + * and reproduced below + * + * ordered G_TYPE_BOOLEAN Whether the channal will send data with guarenteed ordering + * max-packet-lifetime G_TYPE_INT The time in milliseconds to attempt transmitting unacknowledged data. -1 for unset + * max-retransmits G_TYPE_INT The number of times data will be attempted to be transmitted without acknowledgement before dropping + * protocol G_TYPE_STRING The subprotocol used by this channel + * negotiated G_TYPE_BOOLEAN Whether the created data channel should not perform in-band chnanel announcment. If %TRUE, then application must negotiate the channel itself and create the corresponding channel on the peer with the same id. + * id G_TYPE_INT Override the default identifier selection of this channel + * priority GST_TYPE_WEBRTC_PRIORITY_TYPE The priority to use for this channel + * + * Returns: a new data channel object + */ + gst_webrtc_bin_signals[CREATE_DATA_CHANNEL_SIGNAL] = + g_signal_new_class_handler ("create-data-channel", + G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_CALLBACK (gst_webrtc_bin_create_data_channel), NULL, NULL, + g_cclosure_marshal_generic, GST_TYPE_WEBRTC_DATA_CHANNEL, 2, + G_TYPE_STRING, GST_TYPE_STRUCTURE); } static void @@ -4404,6 +5123,15 @@ gst_webrtc_bin_init (GstWebRTCBin * webrtc) g_array_set_clear_func (webrtc->priv->transports, (GDestroyNotify) _transport_free); + webrtc->priv->data_channels = g_array_new (FALSE, TRUE, sizeof (gpointer)); + g_array_set_clear_func (webrtc->priv->data_channels, + (GDestroyNotify) _deref_and_unref); + + webrtc->priv->pending_data_channels = + g_array_new (FALSE, TRUE, sizeof (gpointer)); + g_array_set_clear_func (webrtc->priv->pending_data_channels, + (GDestroyNotify) _deref_and_unref); + webrtc->priv->session_mid_map = g_array_new (FALSE, TRUE, sizeof (SessionMidItem)); g_array_set_clear_func (webrtc->priv->session_mid_map, diff --git a/ext/webrtc/gstwebrtcbin.h b/ext/webrtc/gstwebrtcbin.h index 49603ec5e7..a0cc69446f 100644 --- a/ext/webrtc/gstwebrtcbin.h +++ b/ext/webrtc/gstwebrtcbin.h @@ -23,6 +23,7 @@ #include #include "fwd.h" #include "gstwebrtcice.h" +#include "transportstream.h" G_BEGIN_DECLS @@ -37,7 +38,9 @@ typedef enum GST_WEBRTC_BIN_ERROR_INVALID_STATE, GST_WEBRTC_BIN_ERROR_BAD_SDP, GST_WEBRTC_BIN_ERROR_FINGERPRINT, -} GstWebRTCJSEPSDPError; + GST_WEBRTC_BIN_ERROR_SCTP_FAILURE, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, +} GstWebRTCError; GType gst_webrtc_bin_pad_get_type(void); #define GST_TYPE_WEBRTC_BIN_PAD (gst_webrtc_bin_pad_get_type()) @@ -107,6 +110,13 @@ struct _GstWebRTCBinPrivate GArray *transceivers; GArray *session_mid_map; GArray *transports; + GArray *data_channels; + /* list of data channels we've received a sctp stream for but no data + * channel protocol for */ + GArray *pending_data_channels; + + GstWebRTCSCTPTransport *sctp_transport; + TransportStream *data_channel_transport; GstWebRTCICE *ice; GArray *ice_stream_map; @@ -115,7 +125,6 @@ struct _GstWebRTCBinPrivate /* peerconnection variables */ gboolean is_closed; gboolean need_negotiation; - gpointer sctp_transport; /* FIXME */ /* peerconnection helper thread for promises */ GMainContext *main_context; diff --git a/ext/webrtc/meson.build b/ext/webrtc/meson.build index 04145eb054..b2f485be57 100644 --- a/ext/webrtc/meson.build +++ b/ext/webrtc/meson.build @@ -4,6 +4,7 @@ webrtc_sources = [ 'gstwebrtcstats.c', 'icestream.c', 'nicetransport.c', + 'sctptransport.c', 'gstwebrtcbin.c', 'transportreceivebin.c', 'transportsendbin.c', @@ -11,6 +12,7 @@ webrtc_sources = [ 'utils.c', 'webrtcsdp.c', 'webrtctransceiver.c', + 'webrtcdatachannel.c', ] libnice_dep = dependency('nice', version : '>=0.1.14', required : get_option('webrtc'), @@ -22,7 +24,7 @@ if libnice_dep.found() webrtc_sources, c_args : gst_plugins_bad_args + ['-DGST_USE_UNSTABLE_API'], include_directories : [configinc], - dependencies : [libnice_dep, gstbase_dep, gstsdp_dep, gstwebrtc_dep], + dependencies : [libnice_dep, gstbase_dep, gstsdp_dep, gstapp_dep, gstwebrtc_dep, gstsctp_dep], install : true, install_dir : plugins_install_dir, ) diff --git a/ext/webrtc/sctptransport.c b/ext/webrtc/sctptransport.c new file mode 100644 index 0000000000..f5643e9fe0 --- /dev/null +++ b/ext/webrtc/sctptransport.c @@ -0,0 +1,270 @@ +/* GStreamer + * Copyright (C) 2018 Matthew Waters + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include + +#include "sctptransport.h" +#include "gstwebrtcbin.h" + +#define GST_CAT_DEFAULT gst_webrtc_sctp_transport_debug +GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); + +enum +{ + SIGNAL_0, + ON_RESET_STREAM_SIGNAL, + LAST_SIGNAL, +}; + +enum +{ + PROP_0, + PROP_TRANSPORT, + PROP_STATE, + PROP_MAX_MESSAGE_SIZE, + PROP_MAX_CHANNELS, +}; + +static guint gst_webrtc_sctp_transport_signals[LAST_SIGNAL] = { 0 }; + +#define gst_webrtc_sctp_transport_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstWebRTCSCTPTransport, gst_webrtc_sctp_transport, + GST_TYPE_OBJECT, GST_DEBUG_CATEGORY_INIT (gst_webrtc_sctp_transport_debug, + "webrtcsctptransport", 0, "webrtcsctptransport");); + +typedef void (*SCTPTask) (GstWebRTCSCTPTransport * sctp, gpointer user_data); + +struct task +{ + GstWebRTCSCTPTransport *sctp; + SCTPTask func; + gpointer user_data; + GDestroyNotify notify; +}; + +static void +_execute_task (GstWebRTCBin * webrtc, struct task *task) +{ + if (task->func) + task->func (task->sctp, task->user_data); +} + +static void +_free_task (struct task *task) +{ + gst_object_unref (task->sctp); + + if (task->notify) + task->notify (task->user_data); + g_free (task); +} + +static void +_sctp_enqueue_task (GstWebRTCSCTPTransport * sctp, SCTPTask func, + gpointer user_data, GDestroyNotify notify) +{ + struct task *task = g_new0 (struct task, 1); + + task->sctp = gst_object_ref (sctp); + task->func = func; + task->user_data = user_data; + task->notify = notify; + + gst_webrtc_bin_enqueue_task (sctp->webrtcbin, + (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task); +} + +static void +_emit_stream_reset (GstWebRTCSCTPTransport * sctp, gpointer user_data) +{ + guint stream_id = GPOINTER_TO_UINT (user_data); + + g_signal_emit (sctp, + gst_webrtc_sctp_transport_signals[ON_RESET_STREAM_SIGNAL], 0, stream_id); +} + +static void +_on_sctp_dec_pad_removed (GstElement * sctpdec, GstPad * pad, + GstWebRTCSCTPTransport * sctp) +{ + guint stream_id; + + if (sscanf (GST_PAD_NAME (pad), "src_%u", &stream_id) != 1) + return; + + _sctp_enqueue_task (sctp, (SCTPTask) _emit_stream_reset, + GUINT_TO_POINTER (stream_id), NULL); +} + +static void +_on_sctp_association_established (GstElement * sctpenc, gboolean established, + GstWebRTCSCTPTransport * sctp) +{ + GST_OBJECT_LOCK (sctp); + if (established) + sctp->state = GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED; + else + sctp->state = GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED; + sctp->association_established = established; + GST_OBJECT_UNLOCK (sctp); + + g_object_notify (G_OBJECT (sctp), "state"); +} + +static void +gst_webrtc_sctp_transport_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ +// GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object); + + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_webrtc_sctp_transport_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object); + + switch (prop_id) { + case PROP_TRANSPORT: + g_value_set_object (value, sctp->transport); + break; + case PROP_STATE: + g_value_set_enum (value, sctp->state); + break; + case PROP_MAX_MESSAGE_SIZE: + g_value_set_uint64 (value, sctp->max_message_size); + break; + case PROP_MAX_CHANNELS: + g_value_set_uint (value, sctp->max_channels); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_webrtc_sctp_transport_finalize (GObject * object) +{ + GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object); + + g_signal_handlers_disconnect_by_data (sctp->sctpdec, sctp); + g_signal_handlers_disconnect_by_data (sctp->sctpenc, sctp); + + gst_object_unref (sctp->sctpdec); + gst_object_unref (sctp->sctpenc); + + g_clear_object (&sctp->transport); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_webrtc_sctp_transport_constructed (GObject * object) +{ + GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object); + guint association_id; + + association_id = g_random_int_range (0, G_MAXUINT16); + + sctp->sctpdec = + g_object_ref_sink (gst_element_factory_make ("sctpdec", NULL)); + g_object_set (sctp->sctpdec, "sctp-association-id", association_id, NULL); + sctp->sctpenc = + g_object_ref_sink (gst_element_factory_make ("sctpenc", NULL)); + g_object_set (sctp->sctpenc, "sctp-association-id", association_id, NULL); + + g_signal_connect (sctp->sctpdec, "pad-removed", + G_CALLBACK (_on_sctp_dec_pad_removed), sctp); + g_signal_connect (sctp->sctpenc, "sctp-association-established", + G_CALLBACK (_on_sctp_association_established), sctp); + + G_OBJECT_CLASS (parent_class)->constructed (object); +} + +static void +gst_webrtc_sctp_transport_class_init (GstWebRTCSCTPTransportClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + + gobject_class->constructed = gst_webrtc_sctp_transport_constructed; + gobject_class->get_property = gst_webrtc_sctp_transport_get_property; + gobject_class->set_property = gst_webrtc_sctp_transport_set_property; + gobject_class->finalize = gst_webrtc_sctp_transport_finalize; + + g_object_class_install_property (gobject_class, + PROP_TRANSPORT, + g_param_spec_object ("transport", + "WebRTC DTLS Transport", + "DTLS transport used for this SCTP transport", + GST_TYPE_WEBRTC_DTLS_TRANSPORT, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_STATE, + g_param_spec_enum ("state", + "WebRTC SCTP Transport state", "WebRTC SCTP Transport state", + GST_TYPE_WEBRTC_SCTP_TRANSPORT_STATE, + GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_MAX_MESSAGE_SIZE, + g_param_spec_uint64 ("max-message-size", + "Maximum message size", + "Maximum message size as reported by the transport", 0, G_MAXUINT64, + 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_MAX_CHANNELS, + g_param_spec_uint ("max-channels", + "Maximum number of channels", "Maximum number of channels", + 0, G_MAXUINT16, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + /** + * GstWebRTCSCTPTransport::reset-stream: + * @object: the #GstWebRTCSCTPTransport + * @stream_id: the SCTP stream that was reset + */ + gst_webrtc_sctp_transport_signals[ON_RESET_STREAM_SIGNAL] = + g_signal_new ("stream-reset", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, G_TYPE_UINT); +} + +static void +gst_webrtc_sctp_transport_init (GstWebRTCSCTPTransport * nice) +{ +} + +GstWebRTCSCTPTransport * +gst_webrtc_sctp_transport_new (void) +{ + return g_object_new (GST_TYPE_WEBRTC_SCTP_TRANSPORT, NULL); +} diff --git a/ext/webrtc/sctptransport.h b/ext/webrtc/sctptransport.h new file mode 100644 index 0000000000..d5327a77e0 --- /dev/null +++ b/ext/webrtc/sctptransport.h @@ -0,0 +1,65 @@ +/* GStreamer + * Copyright (C) 2018 Matthew Waters + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_WEBRTC_SCTP_TRANSPORT_H__ +#define __GST_WEBRTC_SCTP_TRANSPORT_H__ + +#include +/* libnice */ +#include +#include +#include "gstwebrtcice.h" + +G_BEGIN_DECLS + +GType gst_webrtc_sctp_transport_get_type(void); +#define GST_TYPE_WEBRTC_SCTP_TRANSPORT (gst_webrtc_sctp_transport_get_type()) +#define GST_WEBRTC_SCTP_TRANSPORT(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransport)) +#define GST_IS_WEBRTC_SCTP_TRANSPORT(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WEBRTC_SCTP_TRANSPORT)) +#define GST_WEBRTC_SCTP_TRANSPORT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransportClass)) +#define GST_IS_WEBRTC_SCTP_TRANSPORT_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT)) +#define GST_WEBRTC_SCTP_TRANSPORT_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransportClass)) + +struct _GstWebRTCSCTPTransport +{ + GstObject parent; + + GstWebRTCDTLSTransport *transport; + GstWebRTCSCTPTransportState state; + guint64 max_message_size; + guint max_channels; + + gboolean association_established; + + GstElement *sctpdec; + GstElement *sctpenc; + + GstWebRTCBin *webrtcbin; +}; + +struct _GstWebRTCSCTPTransportClass +{ + GstObjectClass parent_class; +}; + +GstWebRTCSCTPTransport * gst_webrtc_sctp_transport_new (void); + +G_END_DECLS + +#endif /* __GST_WEBRTC_SCTP_TRANSPORT_H__ */ diff --git a/ext/webrtc/transportreceivebin.c b/ext/webrtc/transportreceivebin.c index 6730b1fb72..4038cda0d0 100644 --- a/ext/webrtc/transportreceivebin.c +++ b/ext/webrtc/transportreceivebin.c @@ -25,23 +25,24 @@ #include "utils.h" /* - * ,----------------------------transport_receive_%u-----------------------------, - * ; (rtp) ; - * ; ,---nicesrc----, ,-capsfilter-, ,----dtlssrtpdec----, ,--funnel--, ; - * ; ; src o--o sink src o--o sink rtp_src o------o sink_0 ; ; - * ; '--------------' '------------' ; ; ; src o--o rtp_src - * ; ; rtcp_src o-, ,--o sink_1 ; ; - * ; '-------------------' ; ; '----------' ; - * ; ; ; ,--funnel--, ; - * ; '-+--o sink_0 ; ; - * ; ,-' ; src o--o rtcp_src - * ; (rtcp) ; ,-o sink_1 ; ; - * ; ,---nicesrc----, ,-capsfilter-, ,----dtlssrtpdec----, ; ; '----------' ; - * ; ; src o--o sink src o--o sink rtp_src o-' ; ; - * ; '--------------' '------------' ; ; ; ; - * ; ; rtcp_src o----' ; - * ; '-------------------' ; - * '-----------------------------------------------------------------------------' + * ,----------------------------transport_receive_%u----------------------------, + * ; (rtp/data) ; + * ; ,---nicesrc----, ,-capsfilter-, ,---dtlssrtpdec---, ,--funnel--, ; + * ; ; src o--o sink src o--o sink rtp_src o-------o sink_0 ; ; + * ; '--------------' '------------' ; ; ; src o--o rtp_src + * ; ; rtcp_src o---, ,-o sink_1 ; ; + * ; ; ; ; ; '----------' ; + * ; ; data_src o-, ; ; ,--funnel--, ; + * ; '-----------------' ; '-+-o sink_0 ; ; + * ; ,---dtlssrtpdec---, ; ,-' ; src o--o rtcp_src + * ; (rtcp) ; rtp_src o-+-' ,-o sink_1 ; ; + * ; ,---nicesrc----, ,-capsfilter-, ; ; ; ; '----------' ; + * ; ; src o--o sink src o--o sink rtcp_src o-+---' ,--funnel--, ; + * ; '--------------' '------------' ; ; '-----o sink_0 ; ; + * ; ; data_src o-, ; src o--o data_src + * ; '-----------------' '-----o sink_1 ; ; + * ; '----------' ; + * '----------------------------------------------------------------------------' * * Do we really wnat to be *that* permissive in what we accept? * @@ -70,6 +71,12 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src", GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-rtp")); +static GstStaticPadTemplate data_sink_template = +GST_STATIC_PAD_TEMPLATE ("data_src", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + enum { PROP_0, @@ -336,6 +343,21 @@ transport_receive_bin_constructed (GObject * object) gst_element_add_pad (GST_ELEMENT (receive), ghost); gst_object_unref (pad); + /* create funnel for data_src */ + funnel = gst_element_factory_make ("funnel", NULL); + gst_bin_add (GST_BIN (receive), funnel); + if (!gst_element_link_pads (receive->stream->transport->dtlssrtpdec, + "data_src", funnel, "sink_0")) + g_warn_if_reached (); + if (!gst_element_link_pads (receive->stream->rtcp_transport->dtlssrtpdec, + "data_src", funnel, "sink_1")) + g_warn_if_reached (); + + pad = gst_element_get_static_pad (funnel, "src"); + ghost = gst_ghost_pad_new ("data_src", pad); + gst_element_add_pad (GST_ELEMENT (receive), ghost); + gst_object_unref (pad); + G_OBJECT_CLASS (parent_class)->constructed (object); } @@ -350,6 +372,8 @@ transport_receive_bin_class_init (TransportReceiveBinClass * klass) gst_element_class_add_static_pad_template (element_class, &rtp_sink_template); gst_element_class_add_static_pad_template (element_class, &rtcp_sink_template); + gst_element_class_add_static_pad_template (element_class, + &data_sink_template); gst_element_class_set_metadata (element_class, "WebRTC Transport Receive Bin", "Filter/Network/WebRTC", "A bin for webrtc connections", diff --git a/ext/webrtc/transportsendbin.c b/ext/webrtc/transportsendbin.c index be1b8aa374..36522d3361 100644 --- a/ext/webrtc/transportsendbin.c +++ b/ext/webrtc/transportsendbin.c @@ -27,9 +27,11 @@ /* * ,------------------------transport_send_%u-------------------------, * ; ,-----dtlssrtpenc---, ; - * rtp_sink o--------------------------o rtp_sink_0 ; ,---nicesink---, ; - * ; ; src o--o sink ; ; - * ; ,--outputselector--, ,-o rtcp_sink_0 ; '--------------' ; + * data_sink o--------------------------o data_sink ; ; + * ; ; ; ,---nicesink---, ; + * rtp_sink o--------------------------o rtp_sink_0 src o--o sink ; ; + * ; ; ; '--------------' ; + * ; ,--outputselector--, ,-o rtcp_sink_0 ; ; * ; ; src_0 o-' '-------------------' ; * rtcp_sink ;---o sink ; ,----dtlssrtpenc----, ,---nicesink---, ; * ; ; src_1 o---o rtcp_sink_0 src o--o sink ; ; @@ -61,6 +63,12 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_sink", GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-rtp")); +static GstStaticPadTemplate data_sink_template = +GST_STATIC_PAD_TEMPLATE ("data_sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + enum { PROP_0, @@ -422,6 +430,16 @@ transport_send_bin_constructed (GObject * object) gst_element_add_pad (GST_ELEMENT (send), ghost); gst_object_unref (pad); + /* push the data stream onto the RTP dtls element */ + templ = _find_pad_template (transport->dtlssrtpenc, + GST_PAD_SINK, GST_PAD_REQUEST, "data_sink"); + pad = gst_element_request_pad (transport->dtlssrtpenc, templ, "data_sink", + NULL); + + ghost = gst_ghost_pad_new ("data_sink", pad); + gst_element_add_pad (GST_ELEMENT (send), ghost); + gst_object_unref (pad); + /* RTCP */ transport = send->stream->rtcp_transport; /* Do the common init for the context struct */ @@ -509,6 +527,8 @@ transport_send_bin_class_init (TransportSendBinClass * klass) gst_element_class_add_static_pad_template (element_class, &rtp_sink_template); gst_element_class_add_static_pad_template (element_class, &rtcp_sink_template); + gst_element_class_add_static_pad_template (element_class, + &data_sink_template); gst_element_class_set_metadata (element_class, "WebRTC Transport Send Bin", "Filter/Network/WebRTC", "A bin for webrtc connections", diff --git a/ext/webrtc/webrtcdatachannel.c b/ext/webrtc/webrtcdatachannel.c new file mode 100644 index 0000000000..2f46ee43e5 --- /dev/null +++ b/ext/webrtc/webrtcdatachannel.c @@ -0,0 +1,1296 @@ +/* GStreamer + * Copyright (C) 2018 Matthew Waters + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:gstwebrtc-datachannel + * @short_description: RTCDataChannel object + * @title: GstWebRTCDataChannel + * @see_also: #GstWebRTCRTPTransceiver + * + * http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "webrtcdatachannel.h" +#include +#include +#include +#include +#include +#include + +#include "gstwebrtcbin.h" + +#define GST_CAT_DEFAULT gst_webrtc_data_channel_debug +GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT); + +#define gst_webrtc_data_channel_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstWebRTCDataChannel, gst_webrtc_data_channel, + GST_TYPE_OBJECT, GST_DEBUG_CATEGORY_INIT (gst_webrtc_data_channel_debug, + "webrtcdatachannel", 0, "webrtcdatachannel"); + ); + +enum +{ + SIGNAL_0, + SIGNAL_ON_OPEN, + SIGNAL_ON_CLOSE, + SIGNAL_ON_ERROR, + SIGNAL_ON_MESSAGE_DATA, + SIGNAL_ON_MESSAGE_STRING, + SIGNAL_ON_BUFFERED_AMOUNT_LOW, + SIGNAL_SEND_DATA, + SIGNAL_SEND_STRING, + SIGNAL_CLOSE, + LAST_SIGNAL, +}; + +enum +{ + PROP_0, + PROP_LABEL, + PROP_ORDERED, + PROP_MAX_PACKET_LIFETIME, + PROP_MAX_RETRANSMITS, + PROP_PROTOCOL, + PROP_NEGOTIATED, + PROP_ID, + PROP_PRIORITY, + PROP_READY_STATE, + PROP_BUFFERED_AMOUNT, + PROP_BUFFERED_AMOUNT_LOW_THRESHOLD, +}; + +static guint gst_webrtc_data_channel_signals[LAST_SIGNAL] = { 0 }; + +typedef enum +{ + DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50, + DATA_CHANNEL_PPID_WEBRTC_STRING = 51, + DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52, /* deprecated */ + DATA_CHANNEL_PPID_WEBRTC_BINARY = 53, + DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54, /* deprecated */ + DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56, + DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57, +} DataChannelPPID; + +typedef enum +{ + CHANNEL_TYPE_RELIABLE = 0x00, + CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80, + CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01, + CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81, + CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02, + CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82, +} DataChannelReliabilityType; + +typedef enum +{ + CHANNEL_MESSAGE_ACK = 0x02, + CHANNEL_MESSAGE_OPEN = 0x03, +} DataChannelMessage; + +static guint16 +priority_type_to_uint (GstWebRTCPriorityType pri) +{ + switch (pri) { + case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW: + return 64; + case GST_WEBRTC_PRIORITY_TYPE_LOW: + return 192; + case GST_WEBRTC_PRIORITY_TYPE_MEDIUM: + return 384; + case GST_WEBRTC_PRIORITY_TYPE_HIGH: + return 768; + } + g_assert_not_reached (); + return 0; +} + +static GstWebRTCPriorityType +priority_uint_to_type (guint16 val) +{ + if (val <= 128) + return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW; + if (val <= 256) + return GST_WEBRTC_PRIORITY_TYPE_LOW; + if (val <= 512) + return GST_WEBRTC_PRIORITY_TYPE_MEDIUM; + return GST_WEBRTC_PRIORITY_TYPE_HIGH; +} + +static GstBuffer * +construct_open_packet (GstWebRTCDataChannel * channel) +{ + GstByteWriter w; + gsize label_len = strlen (channel->label); + gsize proto_len = strlen (channel->protocol); + gsize size = 12 + label_len + proto_len; + DataChannelReliabilityType reliability = 0; + guint32 reliability_param = 0; + guint16 priority; + GstBuffer *buf; + +/* + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Message Type | Channel Type | Priority | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Reliability Parameter | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Label Length | Protocol Length | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * \ / + * | Label | + * / \ + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * \ / + * | Protocol | + * / \ + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + + gst_byte_writer_init_with_size (&w, size, FALSE); + + if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN)) + g_return_val_if_reached (NULL); + + if (!channel->ordered) + reliability |= 0x80; + if (channel->max_retransmits != -1) { + reliability |= 0x01; + reliability_param = channel->max_retransmits; + } + if (channel->max_packet_lifetime != -1) { + reliability |= 0x02; + reliability_param = channel->max_packet_lifetime; + } + + priority = priority_type_to_uint (channel->priority); + + if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability)) + g_return_val_if_reached (NULL); + if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority)) + g_return_val_if_reached (NULL); + if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param)) + g_return_val_if_reached (NULL); + if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len)) + g_return_val_if_reached (NULL); + if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len)) + g_return_val_if_reached (NULL); + if (!gst_byte_writer_put_data (&w, (guint8 *) channel->label, label_len)) + g_return_val_if_reached (NULL); + if (!gst_byte_writer_put_data (&w, (guint8 *) channel->protocol, proto_len)) + g_return_val_if_reached (NULL); + + buf = gst_byte_writer_reset_and_get_buffer (&w); + + /* send reliable and ordered */ + gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE, + GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0); + + return buf; +} + +static GstBuffer * +construct_ack_packet (GstWebRTCDataChannel * channel) +{ + GstByteWriter w; + GstBuffer *buf; + +/* + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | Message Type | + * +-+-+-+-+-+-+-+-+ + */ + + gst_byte_writer_init_with_size (&w, 1, FALSE); + + if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK)) + g_return_val_if_reached (NULL); + + buf = gst_byte_writer_reset_and_get_buffer (&w); + + /* send reliable and ordered */ + gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE, + GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0); + + return buf; +} + +typedef void (*ChannelTask) (GstWebRTCDataChannel * channel, + gpointer user_data); + +struct task +{ + GstWebRTCDataChannel *channel; + ChannelTask func; + gpointer user_data; + GDestroyNotify notify; +}; + +static void +_execute_task (GstWebRTCBin * webrtc, struct task *task) +{ + if (task->func) + task->func (task->channel, task->user_data); +} + +static void +_free_task (struct task *task) +{ + gst_object_unref (task->channel); + + if (task->notify) + task->notify (task->user_data); + g_free (task); +} + +static void +_channel_enqueue_task (GstWebRTCDataChannel * channel, ChannelTask func, + gpointer user_data, GDestroyNotify notify) +{ + struct task *task = g_new0 (struct task, 1); + + task->channel = gst_object_ref (channel); + task->func = func; + task->user_data = user_data; + task->notify = notify; + + gst_webrtc_bin_enqueue_task (channel->webrtcbin, + (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task); +} + +static void +_channel_store_error (GstWebRTCDataChannel * channel, GError * error) +{ + GST_OBJECT_LOCK (channel); + if (error) { + GST_WARNING_OBJECT (channel, "Error: %s", + error ? error->message : "Unknown"); + if (!channel->stored_error) + channel->stored_error = error; + else + g_clear_error (&error); + } + GST_OBJECT_UNLOCK (channel); +} + +static void +_maybe_emit_on_error (GstWebRTCDataChannel * channel, GError * error) +{ + if (error) { + GST_WARNING_OBJECT (channel, "error thrown"); + g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_ERROR], 0, + error); + } +} + +static void +_emit_on_open (GstWebRTCDataChannel * channel, gpointer user_data) +{ + GST_OBJECT_LOCK (channel); + if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING || + channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) { + GST_OBJECT_UNLOCK (channel); + return; + } + + if (channel->ready_state != GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) { + channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_OPEN; + GST_OBJECT_UNLOCK (channel); + g_object_notify (G_OBJECT (channel), "ready-state"); + + GST_INFO_OBJECT (channel, "We are open and ready for data!"); + g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_OPEN], 0, + NULL); + } else { + GST_OBJECT_UNLOCK (channel); + } +} + +static void +_transport_closed_unlocked (GstWebRTCDataChannel * channel) +{ + GError *error; + + if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) + return; + + channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED; + + error = channel->stored_error; + channel->stored_error = NULL; + GST_OBJECT_UNLOCK (channel); + + g_object_notify (G_OBJECT (channel), "ready-state"); + GST_INFO_OBJECT (channel, "We are closed for data"); + + _maybe_emit_on_error (channel, error); + + g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_CLOSE], 0, + NULL); + GST_OBJECT_LOCK (channel); +} + +static void +_transport_closed (GstWebRTCDataChannel * channel, gpointer user_data) +{ + GST_OBJECT_LOCK (channel); + _transport_closed_unlocked (channel); + GST_OBJECT_UNLOCK (channel); +} + +static void +_close_sctp_stream (GstWebRTCDataChannel * channel, gpointer user_data) +{ + GstPad *pad, *peer; + + pad = gst_element_get_static_pad (channel->appsrc, "src"); + peer = gst_pad_get_peer (pad); + gst_object_unref (pad); + + if (peer) { + GstElement *sctpenc = gst_pad_get_parent_element (peer); + + if (sctpenc) { + gst_element_release_request_pad (sctpenc, peer); + gst_object_unref (sctpenc); + } + gst_object_unref (peer); + } + + _transport_closed (channel, NULL); +} + +static void +_close_procedure (GstWebRTCDataChannel * channel, gpointer user_data) +{ + /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */ + GST_OBJECT_LOCK (channel); + if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED + || channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) { + GST_OBJECT_UNLOCK (channel); + return; + } + channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING; + GST_OBJECT_UNLOCK (channel); + g_object_notify (G_OBJECT (channel), "ready-state"); + + GST_OBJECT_LOCK (channel); + if (channel->buffered_amount <= 0) { + _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, + NULL, NULL); + } + + GST_OBJECT_UNLOCK (channel); +} + +static void +_on_sctp_reset_stream (GstWebRTCSCTPTransport * sctp, guint stream_id, + GstWebRTCDataChannel * channel) +{ + if (channel->id == stream_id) + _channel_enqueue_task (channel, (ChannelTask) _transport_closed, + GUINT_TO_POINTER (stream_id), NULL); +} + +static void +gst_webrtc_data_channel_close (GstWebRTCDataChannel * channel) +{ + _close_procedure (channel, NULL); +} + +static GstFlowReturn +_parse_control_packet (GstWebRTCDataChannel * channel, guint8 * data, + gsize size, GError ** error) +{ + GstByteReader r; + guint8 message_type; + + if (!data) + g_return_val_if_reached (GST_FLOW_ERROR); + if (size < 1) + g_return_val_if_reached (GST_FLOW_ERROR); + + gst_byte_reader_init (&r, data, size); + + if (!gst_byte_reader_get_uint8 (&r, &message_type)) + g_return_val_if_reached (GST_FLOW_ERROR); + + if (message_type == CHANNEL_MESSAGE_ACK) { + /* all good */ + GST_INFO_OBJECT (channel, "Received channel ack"); + return GST_FLOW_OK; + } else if (message_type == CHANNEL_MESSAGE_OPEN) { + guint8 reliability; + guint32 reliability_param; + guint16 priority, label_len, proto_len; + const guint8 *src; + gchar *label, *proto; + GstBuffer *buffer; + GstFlowReturn ret; + + GST_INFO_OBJECT (channel, "Received channel open"); + + if (channel->negotiated) { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Data channel was signalled as negotiated already"); + g_return_val_if_reached (GST_FLOW_ERROR); + } + + if (channel->opened) + return GST_FLOW_OK; + + if (!gst_byte_reader_get_uint8 (&r, &reliability)) + goto parse_error; + if (!gst_byte_reader_get_uint16_be (&r, &priority)) + goto parse_error; + if (!gst_byte_reader_get_uint32_be (&r, &reliability_param)) + goto parse_error; + if (!gst_byte_reader_get_uint16_be (&r, &label_len)) + goto parse_error; + if (!gst_byte_reader_get_uint16_be (&r, &proto_len)) + goto parse_error; + + label = g_new0 (gchar, (gsize) label_len + 1); + proto = g_new0 (gchar, (gsize) proto_len + 1); + + if (!gst_byte_reader_get_data (&r, label_len, &src)) + goto parse_error; + memcpy (label, src, label_len); + label[label_len] = '\0'; + if (!gst_byte_reader_get_data (&r, proto_len, &src)) + goto parse_error; + memcpy (proto, src, proto_len); + proto[proto_len] = '\0'; + + channel->label = label; + channel->protocol = proto; + channel->priority = priority_uint_to_type (priority); + channel->ordered = !(reliability & 0x80); + if (reliability & 0x01) { + channel->max_retransmits = reliability_param; + channel->max_packet_lifetime = -1; + } else if (reliability & 0x02) { + channel->max_retransmits = -1; + channel->max_packet_lifetime = reliability_param; + } else { + channel->max_retransmits = -1; + channel->max_packet_lifetime = -1; + } + channel->opened = TRUE; + + GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i " + "label %s protocol %s ordered %s", channel->id, channel->label, + channel->protocol, channel->ordered ? "true" : "false"); + + _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL); + + GST_INFO_OBJECT (channel, "Sending channel ack"); + buffer = construct_ack_packet (channel); + + GST_OBJECT_LOCK (channel); + channel->buffered_amount += gst_buffer_get_size (buffer); + GST_OBJECT_UNLOCK (channel); + + ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer); + if (ret != GST_FLOW_OK) { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Could not send ack packet"); + } + return ret; + } else { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Unknown message type in control protocol"); + return GST_FLOW_ERROR; + } + +parse_error: + { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet"); + g_return_val_if_reached (GST_FLOW_ERROR); + } +} + +static void +on_sink_eos (GstAppSink * sink, gpointer user_data) +{ +} + +struct map_info +{ + GstBuffer *buffer; + GstMapInfo map_info; +}; + +static void +buffer_unmap_and_unref (struct map_info *info) +{ + gst_buffer_unmap (info->buffer, &info->map_info); + gst_buffer_unref (info->buffer); + g_free (info); +} + +static void +_emit_have_data (GstWebRTCDataChannel * channel, GBytes * data) +{ + GST_LOG_OBJECT (channel, "Have data %p", data); + g_signal_emit (channel, + gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_DATA], 0, data); +} + +static void +_emit_have_string (GstWebRTCDataChannel * channel, gchar * str) +{ + GST_LOG_OBJECT (channel, "Have string %p", str); + g_signal_emit (channel, + gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_STRING], 0, str); +} + +static GstFlowReturn +_data_channel_have_sample (GstWebRTCDataChannel * channel, GstSample * sample, + GError ** error) +{ + GstSctpReceiveMeta *receive; + GstBuffer *buffer; + GstFlowReturn ret = GST_FLOW_OK; + + GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample); + + g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR); + + buffer = gst_sample_get_buffer (sample); + if (!buffer) { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle"); + return GST_FLOW_ERROR; + } + receive = gst_sctp_buffer_get_receive_meta (buffer); + if (!receive) { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "No SCTP Receive meta on the buffer"); + return GST_FLOW_ERROR; + } + + switch (receive->ppid) { + case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{ + GstMapInfo info = GST_MAP_INFO_INIT; + if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Failed to map received buffer"); + ret = GST_FLOW_ERROR; + } else { + ret = _parse_control_packet (channel, info.data, info.size, error); + } + break; + } + case DATA_CHANNEL_PPID_WEBRTC_STRING: + case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{ + GstMapInfo info = GST_MAP_INFO_INIT; + if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Failed to map received buffer"); + ret = GST_FLOW_ERROR; + } else { + gchar *str = g_strndup ((gchar *) info.data, info.size); + _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str, + g_free); + } + break; + } + case DATA_CHANNEL_PPID_WEBRTC_BINARY: + case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{ + struct map_info *info = g_new0 (struct map_info, 1); + if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) { + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Failed to map received buffer"); + ret = GST_FLOW_ERROR; + } else { + GBytes *data = g_bytes_new_with_free_func (info->map_info.data, + info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info); + info->buffer = gst_buffer_ref (buffer); + _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data, + (GDestroyNotify) g_bytes_unref); + } + break; + } + case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY: + _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL, + NULL); + break; + case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY: + _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL, + NULL); + break; + default: + g_set_error (error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Unknown SCTP PPID %u received", receive->ppid); + ret = GST_FLOW_ERROR; + break; + } + + return ret; +} + +static GstFlowReturn +on_sink_preroll (GstAppSink * sink, gpointer user_data) +{ + GstWebRTCDataChannel *channel = user_data; + GstSample *sample = gst_app_sink_pull_preroll (sink); + GstFlowReturn ret; + + if (sample) { + /* This sample also seems to be provided by the sample callback + ret = _data_channel_have_sample (channel, sample); */ + ret = GST_FLOW_OK; + gst_sample_unref (sample); + } else if (gst_app_sink_is_eos (sink)) { + ret = GST_FLOW_EOS; + } else { + ret = GST_FLOW_ERROR; + } + + if (ret != GST_FLOW_OK) { + _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL); + } + + return ret; +} + +static GstFlowReturn +on_sink_sample (GstAppSink * sink, gpointer user_data) +{ + GstWebRTCDataChannel *channel = user_data; + GstSample *sample = gst_app_sink_pull_sample (sink); + GstFlowReturn ret; + GError *error = NULL; + + if (sample) { + ret = _data_channel_have_sample (channel, sample, &error); + gst_sample_unref (sample); + } else if (gst_app_sink_is_eos (sink)) { + ret = GST_FLOW_EOS; + } else { + ret = GST_FLOW_ERROR; + } + + if (error) + _channel_store_error (channel, error); + + if (ret != GST_FLOW_OK) { + _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL); + } + + return ret; +} + +static GstAppSinkCallbacks sink_callbacks = { + on_sink_eos, + on_sink_preroll, + on_sink_sample, +}; + +void +gst_webrtc_data_channel_start_negotiation (GstWebRTCDataChannel * channel) +{ + GstBuffer *buffer; + + g_return_if_fail (!channel->negotiated); + g_return_if_fail (channel->id != -1); + g_return_if_fail (channel->sctp_transport != NULL); + + buffer = construct_open_packet (channel); + + GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i " + "label %s protocol %s ordered %s", channel->id, channel->label, + channel->protocol, channel->ordered ? "true" : "false"); + + GST_OBJECT_LOCK (channel); + channel->buffered_amount += gst_buffer_get_size (buffer); + GST_OBJECT_UNLOCK (channel); + + if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), + buffer) == GST_FLOW_OK) { + channel->opened = TRUE; + _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL); + } else { + GError *error = NULL; + g_set_error (&error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Failed to send DCEP open packet"); + _channel_store_error (channel, error); + _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL); + } +} + +static void +_get_sctp_reliability (GstWebRTCDataChannel * channel, + GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param) +{ + if (channel->max_retransmits != -1) { + *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX; + *rel_param = channel->max_retransmits; + } else if (channel->max_packet_lifetime != -1) { + *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL; + *rel_param = channel->max_packet_lifetime; + } else { + *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE; + *rel_param = 0; + } +} + +static gboolean +_is_within_max_message_size (GstWebRTCDataChannel * channel, gsize size) +{ + return size <= channel->sctp_transport->max_message_size; +} + +static void +gst_webrtc_data_channel_send_data (GstWebRTCDataChannel * channel, + GBytes * bytes) +{ + GstSctpSendMetaPartiallyReliability reliability; + guint rel_param; + guint32 ppid; + GstBuffer *buffer; + GstFlowReturn ret; + + if (!bytes) { + buffer = gst_buffer_new (); + ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY; + } else { + gsize size; + guint8 *data; + + data = (guint8 *) g_bytes_get_data (bytes, &size); + g_return_if_fail (data != NULL); + if (!_is_within_max_message_size (channel, size)) { + GError *error = NULL; + g_set_error (&error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Requested to send data that is too large"); + _channel_store_error (channel, error); + _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, + NULL); + return; + } + + buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size, + 0, size, bytes, (GDestroyNotify) g_bytes_unref); + ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY; + } + + _get_sctp_reliability (channel, &reliability, &rel_param); + gst_sctp_buffer_add_send_meta (buffer, ppid, channel->ordered, reliability, + rel_param); + + GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT, + buffer); + + GST_OBJECT_LOCK (channel); + channel->buffered_amount += gst_buffer_get_size (buffer); + GST_OBJECT_UNLOCK (channel); + + ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer); + + if (ret != GST_FLOW_OK) { + GError *error = NULL; + g_set_error (&error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data"); + _channel_store_error (channel, error); + _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL); + } +} + +static void +gst_webrtc_data_channel_send_string (GstWebRTCDataChannel * channel, + gchar * str) +{ + GstSctpSendMetaPartiallyReliability reliability; + guint rel_param; + guint32 ppid; + GstBuffer *buffer; + GstFlowReturn ret; + + g_return_if_fail (!channel->negotiated && channel->opened); + g_return_if_fail (channel->sctp_transport != NULL); + + if (!str) { + buffer = gst_buffer_new (); + ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY; + } else { + gsize size = strlen (str); + gchar *str_copy = g_strdup (str); + + if (!_is_within_max_message_size (channel, size)) { + GError *error = NULL; + g_set_error (&error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, + "Requested to send a string that is too large"); + _channel_store_error (channel, error); + _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, + NULL); + return; + } + + buffer = + gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy, + size, 0, size, str_copy, g_free); + ppid = DATA_CHANNEL_PPID_WEBRTC_STRING; + } + + _get_sctp_reliability (channel, &reliability, &rel_param); + gst_sctp_buffer_add_send_meta (buffer, ppid, channel->ordered, reliability, + rel_param); + + GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT, + buffer); + + GST_OBJECT_LOCK (channel); + channel->buffered_amount += gst_buffer_get_size (buffer); + GST_OBJECT_UNLOCK (channel); + + ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer); + + if (ret != GST_FLOW_OK) { + GError *error = NULL; + g_set_error (&error, GST_WEBRTC_BIN_ERROR, + GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string"); + _channel_store_error (channel, error); + _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL); + } +} + +void +gst_webrtc_data_channel_set_sctp_transport (GstWebRTCDataChannel * channel, + GstWebRTCSCTPTransport * sctp) +{ + g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel)); + g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp)); + + GST_OBJECT_LOCK (channel); + if (channel->sctp_transport) + g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel); + + gst_object_replace ((GstObject **) & channel->sctp_transport, + GST_OBJECT (sctp)); + + if (sctp) + g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_reset_stream), + channel); + GST_OBJECT_UNLOCK (channel); +} + +static void +gst_webrtc_data_channel_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object); + + GST_OBJECT_LOCK (channel); + switch (prop_id) { + case PROP_LABEL: + channel->label = g_value_dup_string (value); + break; + case PROP_ORDERED: + channel->ordered = g_value_get_boolean (value); + break; + case PROP_MAX_PACKET_LIFETIME: + channel->max_packet_lifetime = g_value_get_int (value); + break; + case PROP_MAX_RETRANSMITS: + channel->max_retransmits = g_value_get_int (value); + break; + case PROP_PROTOCOL: + channel->protocol = g_value_dup_string (value); + break; + case PROP_NEGOTIATED: + channel->negotiated = g_value_get_boolean (value); + break; + case PROP_ID: + channel->id = g_value_get_int (value); + break; + case PROP_PRIORITY: + channel->priority = g_value_get_enum (value); + break; + case PROP_BUFFERED_AMOUNT_LOW_THRESHOLD: + channel->buffered_amount_low_threshold = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + GST_OBJECT_UNLOCK (channel); +} + +static void +gst_webrtc_data_channel_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object); + + GST_OBJECT_LOCK (channel); + switch (prop_id) { + case PROP_LABEL: + g_value_set_string (value, channel->label); + break; + case PROP_ORDERED: + g_value_set_boolean (value, channel->ordered); + break; + case PROP_MAX_PACKET_LIFETIME: + g_value_set_int (value, channel->max_packet_lifetime); + break; + case PROP_MAX_RETRANSMITS: + g_value_set_int (value, channel->max_retransmits); + break; + case PROP_PROTOCOL: + g_value_set_string (value, channel->protocol); + break; + case PROP_NEGOTIATED: + g_value_set_boolean (value, channel->negotiated); + break; + case PROP_ID: + g_value_set_int (value, channel->id); + break; + case PROP_PRIORITY: + g_value_set_enum (value, channel->priority); + break; + case PROP_READY_STATE: + g_value_set_enum (value, channel->ready_state); + break; + case PROP_BUFFERED_AMOUNT: + g_value_set_uint64 (value, channel->buffered_amount); + break; + case PROP_BUFFERED_AMOUNT_LOW_THRESHOLD: + g_value_set_uint64 (value, channel->buffered_amount_low_threshold); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + GST_OBJECT_UNLOCK (channel); +} + +static void +_emit_low_threshold (GstWebRTCDataChannel * channel, gpointer user_data) +{ + GST_LOG_OBJECT (channel, "Low threshold reached"); + g_signal_emit (channel, + gst_webrtc_data_channel_signals[SIGNAL_ON_BUFFERED_AMOUNT_LOW], 0); +} + +static GstPadProbeReturn +on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + GstWebRTCDataChannel *channel = user_data; + guint64 prev_amount; + guint64 size = 0; + + if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) { + GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info); + size = gst_buffer_get_size (buffer); + } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) { + GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info); + size = gst_buffer_list_calculate_size (list); + } + + if (size > 0) { + GST_OBJECT_LOCK (channel); + prev_amount = channel->buffered_amount; + channel->buffered_amount -= size; + if (prev_amount > channel->buffered_amount_low_threshold && + channel->buffered_amount < channel->buffered_amount_low_threshold) { + _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, + NULL, NULL); + } + + if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING + && channel->buffered_amount <= 0) { + _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL, + NULL); + } + GST_OBJECT_UNLOCK (channel); + } + + return GST_PAD_PROBE_OK; +} + +static void +gst_webrtc_data_channel_constructed (GObject * object) +{ + GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object); + GstPad *pad; + GstCaps *caps; + + caps = gst_caps_new_any (); + + channel->appsrc = gst_element_factory_make ("appsrc", NULL); + gst_object_ref_sink (channel->appsrc); + pad = gst_element_get_static_pad (channel->appsrc, "src"); + + channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH, + (GstPadProbeCallback) on_appsrc_data, channel, NULL); + + channel->appsink = gst_element_factory_make ("appsink", NULL); + gst_object_ref_sink (channel->appsink); + g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps, + NULL); + gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks, + channel, NULL); + + gst_object_unref (pad); + gst_caps_unref (caps); +} + +static void +gst_webrtc_data_channel_finalize (GObject * object) +{ + GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object); + + if (channel->src_probe) { + GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src"); + gst_pad_remove_probe (pad, channel->src_probe); + gst_object_unref (pad); + channel->src_probe = 0; + } + + g_free (channel->label); + channel->label = NULL; + + g_free (channel->protocol); + channel->protocol = NULL; + + if (channel->sctp_transport) + g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel); + g_clear_object (&channel->sctp_transport); + + g_clear_object (&channel->appsrc); + g_clear_object (&channel->appsink); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_webrtc_data_channel_class_init (GstWebRTCDataChannelClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + + gobject_class->constructed = gst_webrtc_data_channel_constructed; + gobject_class->get_property = gst_webrtc_data_channel_get_property; + gobject_class->set_property = gst_webrtc_data_channel_set_property; + gobject_class->finalize = gst_webrtc_data_channel_finalize; + + g_object_class_install_property (gobject_class, + PROP_LABEL, + g_param_spec_string ("label", + "Label", "Data channel label", + NULL, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_ORDERED, + g_param_spec_boolean ("ordered", + "Ordered", "Using ordered transmission mode", + FALSE, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_MAX_PACKET_LIFETIME, + g_param_spec_int ("max-packet-lifetime", + "Maximum Packet Lifetime", + "Maximum number of milliseconds that transmissions and " + "retransmissions may occur in unreliable mode (-1 = unset)", + -1, G_MAXUINT16, -1, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_MAX_RETRANSMITS, + g_param_spec_int ("max-retransmits", + "Maximum Retransmits", + "Maximum number of retransmissions attempted in unreliable mode", + -1, G_MAXUINT16, 0, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_PROTOCOL, + g_param_spec_string ("protocol", + "Protocol", "Data channel protocol", + "", + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_NEGOTIATED, + g_param_spec_boolean ("negotiated", + "Negotiated", + "Whether this data channel was negotiated by the application", FALSE, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_ID, + g_param_spec_int ("id", + "ID", + "ID negotiated by this data channel (-1 = unset)", + -1, G_MAXUINT16, -1, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_PRIORITY, + g_param_spec_enum ("priority", + "Priority", + "The priority of data sent using this data channel", + GST_TYPE_WEBRTC_PRIORITY_TYPE, + GST_WEBRTC_PRIORITY_TYPE_LOW, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_READY_STATE, + g_param_spec_enum ("ready-state", + "Ready State", + "The Ready state of this data channel", + GST_TYPE_WEBRTC_DATA_CHANNEL_STATE, + GST_WEBRTC_DATA_CHANNEL_STATE_NEW, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_BUFFERED_AMOUNT, + g_param_spec_uint64 ("buffered-amount", + "Buffered Amount", + "The amount of data in bytes currently buffered", + 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_BUFFERED_AMOUNT_LOW_THRESHOLD, + g_param_spec_uint64 ("buffered-amount-low-threshold", + "Buffered Amount Low Threshold", + "The threshold at which the buffered amount is considered low and " + "the buffered-amount-low signal is emitted", + 0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstWebRTCDataChannel::on-open: + * @object: the #GstWebRTCDataChannel + */ + gst_webrtc_data_channel_signals[SIGNAL_ON_OPEN] = + g_signal_new ("on-open", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 0); + + /** + * GstWebRTCDataChannel::on-close: + * @object: the #GstWebRTCDataChannel + */ + gst_webrtc_data_channel_signals[SIGNAL_ON_CLOSE] = + g_signal_new ("on-close", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 0); + + /** + * GstWebRTCDataChannel::on-error: + * @object: the #GstWebRTCDataChannel + * @error: the #GError thrown + */ + gst_webrtc_data_channel_signals[SIGNAL_ON_ERROR] = + g_signal_new ("on-error", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, G_TYPE_ERROR); + + /** + * GstWebRTCDataChannel::on-message-data: + * @object: the #GstWebRTCDataChannel + * @data: (nullable): a #GBytes of the data received + */ + gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_DATA] = + g_signal_new ("on-message-data", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, G_TYPE_BYTES); + + /** + * GstWebRTCDataChannel::on-message-string: + * @object: the #GstWebRTCDataChannel + * @data: (nullable): the data received as a string + */ + gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_STRING] = + g_signal_new ("on-message-string", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, G_TYPE_STRING); + + /** + * GstWebRTCDataChannel::on-buffered-amount-low: + * @object: the #GstWebRTCDataChannel + */ + gst_webrtc_data_channel_signals[SIGNAL_ON_BUFFERED_AMOUNT_LOW] = + g_signal_new ("on-buffered-amount-low", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 0); + + /** + * GstWebRTCDataChannel::send-data: + * @object: the #GstWebRTCDataChannel + * @data: (nullable): a #GBytes with the data + */ + gst_webrtc_data_channel_signals[SIGNAL_SEND_DATA] = + g_signal_new_class_handler ("send-data", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_CALLBACK (gst_webrtc_data_channel_send_data), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BYTES); + + /** + * GstWebRTCDataChannel::send-string: + * @object: the #GstWebRTCDataChannel + * @data: (nullable): a #GBytes with the data + */ + gst_webrtc_data_channel_signals[SIGNAL_SEND_STRING] = + g_signal_new_class_handler ("send-string", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_CALLBACK (gst_webrtc_data_channel_send_string), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_STRING); + + /** + * GstWebRTCDataChannel::close: + * @object: the #GstWebRTCDataChannel + * + * Close the data channel + */ + gst_webrtc_data_channel_signals[SIGNAL_CLOSE] = + g_signal_new_class_handler ("close", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_CALLBACK (gst_webrtc_data_channel_close), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_NONE, 0); +} + +static void +gst_webrtc_data_channel_init (GstWebRTCDataChannel * channel) +{ +} diff --git a/ext/webrtc/webrtcdatachannel.h b/ext/webrtc/webrtcdatachannel.h new file mode 100644 index 0000000000..769844171c --- /dev/null +++ b/ext/webrtc/webrtcdatachannel.h @@ -0,0 +1,83 @@ +/* GStreamer + * Copyright (C) 2018 Matthew Waters + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_WEBRTC_DATA_CHANNEL_H__ +#define __GST_WEBRTC_DATA_CHANNEL_H__ + +#include +#include +#include +#include "sctptransport.h" + +G_BEGIN_DECLS + +GST_WEBRTC_API +GType gst_webrtc_data_channel_get_type(void); +#define GST_TYPE_WEBRTC_DATA_CHANNEL (gst_webrtc_data_channel_get_type()) +#define GST_WEBRTC_DATA_CHANNEL(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannel)) +#define GST_IS_WEBRTC_DATA_CHANNEL(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WEBRTC_DATA_CHANNEL)) +#define GST_WEBRTC_DATA_CHANNEL_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannelClass)) +#define GST_IS_WEBRTC_DATA_CHANNEL_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_WEBRTC_DATA_CHANNEL)) +#define GST_WEBRTC_DATA_CHANNEL_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannelClass)) + +typedef struct _GstWebRTCDataChannel GstWebRTCDataChannel; +typedef struct _GstWebRTCDataChannelClass GstWebRTCDataChannelClass; + +struct _GstWebRTCDataChannel +{ + GstObject parent; + + GstWebRTCSCTPTransport *sctp_transport; + GstElement *appsrc; + GstElement *appsink; + + gchar *label; + gboolean ordered; + guint max_packet_lifetime; + guint max_retransmits; + gchar *protocol; + gboolean negotiated; + gint id; + GstWebRTCPriorityType priority; + GstWebRTCDataChannelState ready_state; + guint64 buffered_amount; + guint64 buffered_amount_low_threshold; + + GstWebRTCBin *webrtcbin; + gboolean opened; + gulong src_probe; + GError *stored_error; + + gpointer _padding[GST_PADDING]; +}; + +struct _GstWebRTCDataChannelClass +{ + GstObjectClass parent_class; + + gpointer _padding[GST_PADDING]; +}; + +void gst_webrtc_data_channel_start_negotiation (GstWebRTCDataChannel *channel); +void gst_webrtc_data_channel_set_sctp_transport (GstWebRTCDataChannel *channel, + GstWebRTCSCTPTransport *sctp); + +G_END_DECLS + +#endif /* __GST_WEBRTC_DATA_CHANNEL_H__ */ diff --git a/ext/webrtc/webrtcsdp.c b/ext/webrtc/webrtcsdp.c index 5584d9bd11..042a342824 100644 --- a/ext/webrtc/webrtcsdp.c +++ b/ext/webrtc/webrtcsdp.c @@ -714,3 +714,38 @@ _generate_ice_credentials (gchar ** ufrag, gchar ** password) ice_credential_chars[g_random_int_range (0, strlen (ice_credential_chars))]; } + +int +_get_sctp_port_from_media (const GstSDPMedia * media) +{ + int sctpmap = -1, i; + + for (i = 0; i < gst_sdp_media_attributes_len (media); i++) { + const GstSDPAttribute *attr = gst_sdp_media_get_attribute (media, i); + + if (g_strcmp0 (attr->key, "sctp-port") == 0) { + return atoi (attr->value); + } else if (g_strcmp0 (attr->key, "sctpmap") == 0) { + sctpmap = atoi (attr->value); + } + } + + if (sctpmap >= 0) + GST_LOG ("no sctp-port attribute in media"); + return sctpmap; +} + +guint64 +_get_sctp_max_message_size_from_media (const GstSDPMedia * media) +{ + int i; + + for (i = 0; i < gst_sdp_media_attributes_len (media); i++) { + const GstSDPAttribute *attr = gst_sdp_media_get_attribute (media, i); + + if (g_strcmp0 (attr->key, "max-message-size") == 0) + return atoi (attr->value); + } + + return 65536; +} diff --git a/ext/webrtc/webrtcsdp.h b/ext/webrtc/webrtcsdp.h index 779dcc2763..d5ea777b35 100644 --- a/ext/webrtc/webrtcsdp.h +++ b/ext/webrtc/webrtcsdp.h @@ -76,5 +76,9 @@ G_GNUC_INTERNAL gboolean _media_has_attribute_key (const GstSDPMedia * media, const gchar * key); +G_GNUC_INTERNAL +int _get_sctp_port_from_media (const GstSDPMedia * media); +G_GNUC_INTERNAL +guint64 _get_sctp_max_message_size_from_media (const GstSDPMedia * media); #endif /* __WEBRTC_UTILS_H__ */ diff --git a/gst-libs/gst/webrtc/webrtc_fwd.h b/gst-libs/gst/webrtc/webrtc_fwd.h index be0a3c334c..2e50125f50 100644 --- a/gst-libs/gst/webrtc/webrtc_fwd.h +++ b/gst-libs/gst/webrtc/webrtc_fwd.h @@ -264,4 +264,57 @@ typedef enum /*< underscore_name=gst_webrtc_fec_type >*/ GST_WEBRTC_FEC_TYPE_ULP_RED, } GstWebRTCFECType; +/** + * GstWebRTCSCTPTransportState: + * GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW: new + * GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTING: connecting + * GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED: connected + * GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED: closed + * + * See http://w3c.github.io/webrtc-pc/#dom-rtcsctptransportstate + */ +typedef enum /*< underscore_name=gst_webrtc_sctp_transport_state >*/ +{ + GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW, + GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTING, + GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED, + GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED, +} GstWebRTCSCTPTransportState; + +/** + * GstWebRTCPriorityType: + * GST_WEBRTC_PRIORITY_TYPE_VERY_LOW: very-low + * GST_WEBRTC_PRIORITY_TYPE_LOW: low + * GST_WEBRTC_PRIORITY_TYPE_MEDIUM: medium + * GST_WEBRTC_PRIORITY_TYPE_HIGH: high + * + * See http://w3c.github.io/webrtc-pc/#dom-rtcprioritytype + */ +typedef enum /*< underscore_name=gst_webrtc_priority_type >*/ +{ + GST_WEBRTC_PRIORITY_TYPE_VERY_LOW = 1, + GST_WEBRTC_PRIORITY_TYPE_LOW, + GST_WEBRTC_PRIORITY_TYPE_MEDIUM, + GST_WEBRTC_PRIORITY_TYPE_HIGH, +} GstWebRTCPriorityType; + +/** + * GstWebRTCDataChannelState: + * GST_WEBRTC_DATA_CHANNEL_STATE_NEW: new + * GST_WEBRTC_DATA_CHANNEL_STATE_CONNECTING: connection + * GST_WEBRTC_DATA_CHANNEL_STATE_OPEN: open + * GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING: closing + * GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED: closed + * + * See http://w3c.github.io/webrtc-pc/#dom-rtcdatachannelstate + */ +typedef enum /*< underscore_name=gst_webrtc_data_channel_state >*/ +{ + GST_WEBRTC_DATA_CHANNEL_STATE_NEW, + GST_WEBRTC_DATA_CHANNEL_STATE_CONNECTING, + GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, + GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING, + GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED, +} GstWebRTCDataChannelState; + #endif /* __GST_WEBRTC_FWD_H__ */ diff --git a/tests/check/elements/webrtcbin.c b/tests/check/elements/webrtcbin.c index 6f11914c36..2c3b611572 100644 --- a/tests/check/elements/webrtcbin.c +++ b/tests/check/elements/webrtcbin.c @@ -86,6 +86,12 @@ struct test_webrtc GstElement * element, GstPromise * promise, gpointer user_data); + gpointer data_channel_data; + GDestroyNotify data_channel_notify; + void (*on_data_channel) (struct test_webrtc * t, + GstElement * element, + GObject *data_channel, + gpointer user_data); gpointer answer_data; GDestroyNotify answer_notify; void (*on_pad_added) (struct test_webrtc * t, @@ -290,6 +296,16 @@ _on_pad_added (GstElement * webrtc, GstPad * new_pad, struct test_webrtc *t) g_mutex_unlock (&t->lock); } +static void +_on_data_channel (GstElement * webrtc, GObject * data_channel, + struct test_webrtc *t) +{ + g_mutex_lock (&t->lock); + if (t->on_data_channel) + t->on_data_channel (t, webrtc, data_channel, t->data_channel_data); + g_mutex_unlock (&t->lock); +} + static void _pad_added_not_reached (struct test_webrtc *t, GstElement * element, GstPad * pad, gpointer user_data) @@ -332,6 +348,13 @@ _offer_answer_not_reached (struct test_webrtc *t, GstElement * element, g_assert_not_reached (); } +static void +_on_data_channel_not_reached (struct test_webrtc *t, GstElement * element, + GObject * data_channel, gpointer user_data) +{ + g_assert_not_reached (); +} + static void _broadcast (struct test_webrtc *t) { @@ -387,6 +410,7 @@ test_webrtc_new (void) ret->on_pad_added = _pad_added_not_reached; ret->on_offer_created = _offer_answer_not_reached; ret->on_answer_created = _offer_answer_not_reached; + ret->on_data_channel = _on_data_channel_not_reached; ret->bus_message = _bus_no_errors; g_mutex_init (&ret->lock); @@ -415,6 +439,10 @@ test_webrtc_new (void) G_CALLBACK (_on_ice_candidate), ret); g_signal_connect (ret->webrtc2, "on-ice-candidate", G_CALLBACK (_on_ice_candidate), ret); + g_signal_connect (ret->webrtc1, "on-data-channel", + G_CALLBACK (_on_data_channel), ret); + g_signal_connect (ret->webrtc2, "on-data-channel", + G_CALLBACK (_on_data_channel), ret); g_signal_connect (ret->webrtc1, "pad-added", G_CALLBACK (_on_pad_added), ret); g_signal_connect (ret->webrtc2, "pad-added", G_CALLBACK (_on_pad_added), ret); g_signal_connect_swapped (ret->webrtc1, "notify::ice-gathering-state", @@ -475,6 +503,8 @@ test_webrtc_free (struct test_webrtc *t) t->answer_notify (t->answer_data); if (t->pad_added_notify) t->pad_added_notify (t->pad_added_data); + if (t->data_channel_notify) + t->data_channel_notify (t->data_channel_data); fail_unless_equals_int (GST_STATE_CHANGE_SUCCESS, gst_element_set_state (t->webrtc1, GST_STATE_NULL)); @@ -522,12 +552,18 @@ test_webrtc_wait_for_answer_error_eos (struct test_webrtc *t) test_webrtc_wait_for_state_mask (t, states); } +static void +test_webrtc_signal_state_unlocked (struct test_webrtc *t, TestState state) +{ + t->state = state; + g_cond_broadcast (&t->cond); +} + static void test_webrtc_signal_state (struct test_webrtc *t, TestState state) { g_mutex_lock (&t->lock); - t->state = state; - g_cond_broadcast (&t->cond); + test_webrtc_signal_state_unlocked (t, state); g_mutex_unlock (&t->lock); } @@ -1403,6 +1439,459 @@ GST_START_TEST (test_recvonly_sendonly) GST_END_TEST; +static gboolean +_message_media_is_datachannel (const GstSDPMessage * msg, guint media_id) +{ + const GstSDPMedia *media; + + if (!msg) + return FALSE; + + if (gst_sdp_message_medias_len (msg) <= media_id) + return FALSE; + + media = gst_sdp_message_get_media (msg, media_id); + + if (g_strcmp0 (gst_sdp_media_get_media (media), "application") != 0) + return FALSE; + + if (gst_sdp_media_formats_len (media) != 1) + return FALSE; + + if (g_strcmp0 (gst_sdp_media_get_format (media, 0), + "webrtc-datachannel") != 0) + return FALSE; + + return TRUE; +} + +static void +on_sdp_has_datachannel (struct test_webrtc *t, GstElement * element, + GstWebRTCSessionDescription * desc, gpointer user_data) +{ + gboolean have_data_channel = FALSE; + int i; + + for (i = 0; i < gst_sdp_message_medias_len (desc->sdp); i++) { + if (_message_media_is_datachannel (desc->sdp, i)) { + /* there should only be one data channel m= section */ + fail_unless_equals_int (FALSE, have_data_channel); + have_data_channel = TRUE; + } + } + + fail_unless_equals_int (TRUE, have_data_channel); +} + +static void +on_channel_error_not_reached (GObject * channel, GError * error, + gpointer user_data) +{ + g_assert_not_reached (); +} + +GST_START_TEST (test_data_channel_create) +{ + struct test_webrtc *t = test_webrtc_new (); + GObject *channel = NULL; + struct validate_sdp offer = { on_sdp_has_datachannel, NULL }; + struct validate_sdp answer = { on_sdp_has_datachannel, NULL }; + gchar *label; + + t->on_negotiation_needed = NULL; + t->offer_data = &offer; + t->on_offer_created = validate_sdp; + t->answer_data = &answer; + t->on_answer_created = validate_sdp; + t->on_ice_candidate = NULL; + + g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL, + &channel); + g_assert_nonnull (channel); + g_object_get (channel, "label", &label, NULL); + g_assert_cmpstr (label, ==, "label"); + g_signal_connect (channel, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); + + test_webrtc_create_offer (t, t->webrtc1); + + test_webrtc_wait_for_answer_error_eos (t); + fail_unless_equals_int (STATE_ANSWER_CREATED, t->state); + g_object_unref (channel); + g_free (label); + test_webrtc_free (t); +} + +GST_END_TEST; + +static void +have_data_channel (struct test_webrtc *t, GstElement * element, + GObject * our, gpointer user_data) +{ + GObject *other = user_data; + gchar *our_label, *other_label; + + g_signal_connect (our, "on-error", G_CALLBACK (on_channel_error_not_reached), + NULL); + + g_object_get (our, "label", &our_label, NULL); + g_object_get (other, "label", &other_label, NULL); + + g_assert_cmpstr (our_label, ==, other_label); + + g_free (our_label); + g_free (other_label); + + test_webrtc_signal_state_unlocked (t, STATE_CUSTOM); +} + +GST_START_TEST (test_data_channel_remote_notify) +{ + struct test_webrtc *t = test_webrtc_new (); + GObject *channel = NULL; + struct validate_sdp offer = { on_sdp_has_datachannel, NULL }; + struct validate_sdp answer = { on_sdp_has_datachannel, NULL }; + + t->on_negotiation_needed = NULL; + t->offer_data = &offer; + t->on_offer_created = validate_sdp; + t->answer_data = &answer; + t->on_answer_created = validate_sdp; + t->on_ice_candidate = NULL; + t->on_data_channel = have_data_channel; + + g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL, + &channel); + g_assert_nonnull (channel); + t->data_channel_data = channel; + g_signal_connect (channel, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); + + gst_element_set_state (t->webrtc1, GST_STATE_PLAYING); + gst_element_set_state (t->webrtc2, GST_STATE_PLAYING); + + test_webrtc_create_offer (t, t->webrtc1); + + test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM); + + g_object_unref (channel); + test_webrtc_free (t); +} + +GST_END_TEST; + +static const gchar *test_string = "GStreamer WebRTC is awesome!"; + +static void +on_message_string (GObject * channel, const gchar * str, struct test_webrtc *t) +{ + gchar *expected = g_object_steal_data (channel, "expected"); + g_assert_cmpstr (expected, ==, str); + g_free (expected); + + test_webrtc_signal_state (t, STATE_CUSTOM); +} + +static void +have_data_channel_transfer_string (struct test_webrtc *t, GstElement * element, + GObject * our, gpointer user_data) +{ + GObject *other = user_data; + GstWebRTCDataChannelState state; + + g_object_get (our, "ready-state", &state, NULL); + fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state); + g_object_get (other, "ready-state", &state, NULL); + fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state); + + g_object_set_data_full (our, "expected", g_strdup (test_string), g_free); + g_signal_connect (our, "on-message-string", G_CALLBACK (on_message_string), + t); + + g_signal_connect (other, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); + g_signal_emit_by_name (other, "send-string", test_string); +} + +GST_START_TEST (test_data_channel_transfer_string) +{ + struct test_webrtc *t = test_webrtc_new (); + GObject *channel = NULL; + struct validate_sdp offer = { on_sdp_has_datachannel, NULL }; + struct validate_sdp answer = { on_sdp_has_datachannel, NULL }; + + t->on_negotiation_needed = NULL; + t->offer_data = &offer; + t->on_offer_created = validate_sdp; + t->answer_data = &answer; + t->on_answer_created = validate_sdp; + t->on_ice_candidate = NULL; + t->on_data_channel = have_data_channel_transfer_string; + + g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL, + &channel); + g_assert_nonnull (channel); + t->data_channel_data = channel; + g_signal_connect (channel, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); + + gst_element_set_state (t->webrtc1, GST_STATE_PLAYING); + gst_element_set_state (t->webrtc2, GST_STATE_PLAYING); + + test_webrtc_create_offer (t, t->webrtc1); + + test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM); + + g_object_unref (channel); + test_webrtc_free (t); +} + +GST_END_TEST; + +#define g_assert_cmpbytes(b1, b2) \ + G_STMT_START { \ + gsize l1, l2; \ + const guint8 *d1 = g_bytes_get_data (b1, &l1); \ + const guint8 *d2 = g_bytes_get_data (b2, &l2); \ + g_assert_cmpmem (d1, l1, d2, l2); \ + } G_STMT_END; + +static void +on_message_data (GObject * channel, GBytes * data, struct test_webrtc *t) +{ + GBytes *expected = g_object_steal_data (channel, "expected"); + g_assert_cmpbytes (data, expected); + g_bytes_unref (expected); + + test_webrtc_signal_state (t, STATE_CUSTOM); +} + +static void +have_data_channel_transfer_data (struct test_webrtc *t, GstElement * element, + GObject * our, gpointer user_data) +{ + GObject *other = user_data; + GBytes *data = g_bytes_new_static (test_string, strlen (test_string)); + GstWebRTCDataChannelState state; + + g_object_get (our, "ready-state", &state, NULL); + fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state); + g_object_get (other, "ready-state", &state, NULL); + fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state); + + g_object_set_data_full (our, "expected", g_bytes_ref (data), + (GDestroyNotify) g_bytes_unref); + g_signal_connect (our, "on-message-data", G_CALLBACK (on_message_data), t); + + g_signal_connect (other, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); + g_signal_emit_by_name (other, "send-data", data); +} + +GST_START_TEST (test_data_channel_transfer_data) +{ + struct test_webrtc *t = test_webrtc_new (); + GObject *channel = NULL; + struct validate_sdp offer = { on_sdp_has_datachannel, NULL }; + struct validate_sdp answer = { on_sdp_has_datachannel, NULL }; + + t->on_negotiation_needed = NULL; + t->offer_data = &offer; + t->on_offer_created = validate_sdp; + t->answer_data = &answer; + t->on_answer_created = validate_sdp; + t->on_ice_candidate = NULL; + t->on_data_channel = have_data_channel_transfer_data; + + g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL, + &channel); + g_assert_nonnull (channel); + t->data_channel_data = channel; + g_signal_connect (channel, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); + + gst_element_set_state (t->webrtc1, GST_STATE_PLAYING); + gst_element_set_state (t->webrtc2, GST_STATE_PLAYING); + + test_webrtc_create_offer (t, t->webrtc1); + + test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM); + + g_object_unref (channel); + test_webrtc_free (t); +} + +GST_END_TEST; + +static void +have_data_channel_create_data_channel (struct test_webrtc *t, + GstElement * element, GObject * our, gpointer user_data) +{ + GObject *another; + + t->on_data_channel = have_data_channel_transfer_string; + + g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL, + &another); + g_assert_nonnull (another); + t->data_channel_data = another; + g_signal_connect (another, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); +} + +GST_START_TEST (test_data_channel_create_after_negotiate) +{ + struct test_webrtc *t = test_webrtc_new (); + GObject *channel = NULL; + struct validate_sdp offer = { on_sdp_has_datachannel, NULL }; + struct validate_sdp answer = { on_sdp_has_datachannel, NULL }; + + t->on_negotiation_needed = NULL; + t->offer_data = &offer; + t->on_offer_created = validate_sdp; + t->answer_data = &answer; + t->on_answer_created = validate_sdp; + t->on_ice_candidate = NULL; + t->on_data_channel = have_data_channel_create_data_channel; + + g_signal_emit_by_name (t->webrtc1, "create-data-channel", "prev-label", NULL, + &channel); + g_assert_nonnull (channel); + t->data_channel_data = channel; + g_signal_connect (channel, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); + + gst_element_set_state (t->webrtc1, GST_STATE_PLAYING); + gst_element_set_state (t->webrtc2, GST_STATE_PLAYING); + + test_webrtc_create_offer (t, t->webrtc1); + + test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM); + + g_object_unref (channel); + test_webrtc_free (t); +} + +GST_END_TEST; + +static void +on_buffered_amount_low_emitted (GObject * channel, struct test_webrtc *t) +{ + test_webrtc_signal_state (t, STATE_CUSTOM); +} + +static void +have_data_channel_check_low_threshold_emitted (struct test_webrtc *t, + GstElement * element, GObject * our, gpointer user_data) +{ + g_signal_connect (our, "on-buffered-amount-low", + G_CALLBACK (on_buffered_amount_low_emitted), t); + g_object_set (our, "buffered-amount-low-threshold", 1, NULL); + + g_signal_connect (our, "on-error", G_CALLBACK (on_channel_error_not_reached), + NULL); + g_signal_emit_by_name (our, "send-string", "DATA"); +} + +GST_START_TEST (test_data_channel_low_threshold) +{ + struct test_webrtc *t = test_webrtc_new (); + GObject *channel = NULL; + struct validate_sdp offer = { on_sdp_has_datachannel, NULL }; + struct validate_sdp answer = { on_sdp_has_datachannel, NULL }; + + t->on_negotiation_needed = NULL; + t->offer_data = &offer; + t->on_offer_created = validate_sdp; + t->answer_data = &answer; + t->on_answer_created = validate_sdp; + t->on_ice_candidate = NULL; + t->on_data_channel = have_data_channel_check_low_threshold_emitted; + + g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL, + &channel); + g_assert_nonnull (channel); + t->data_channel_data = channel; + g_signal_connect (channel, "on-error", + G_CALLBACK (on_channel_error_not_reached), NULL); + + gst_element_set_state (t->webrtc1, GST_STATE_PLAYING); + gst_element_set_state (t->webrtc2, GST_STATE_PLAYING); + + test_webrtc_create_offer (t, t->webrtc1); + + test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM); + + g_object_unref (channel); + test_webrtc_free (t); +} + +GST_END_TEST; + +static void +on_channel_error (GObject * channel, GError * error, struct test_webrtc *t) +{ + g_assert_nonnull (error); + + test_webrtc_signal_state (t, STATE_CUSTOM); +} + +static void +have_data_channel_transfer_large_data (struct test_webrtc *t, + GstElement * element, GObject * our, gpointer user_data) +{ + GObject *other = user_data; + const gsize size = 1024 * 1024; + guint8 *random_data = g_new (guint8, size); + GBytes *data; + gsize i; + + for (i = 0; i < size; i++) + random_data[i] = (guint8) (i & 0xff); + + data = g_bytes_new_static (random_data, size); + + g_object_set_data_full (our, "expected", g_bytes_ref (data), + (GDestroyNotify) g_bytes_unref); + g_signal_connect (our, "on-message-data", G_CALLBACK (on_message_data), t); + + g_signal_connect (other, "on-error", G_CALLBACK (on_channel_error), t); + g_signal_emit_by_name (other, "send-data", data); +} + +GST_START_TEST (test_data_channel_max_message_size) +{ + struct test_webrtc *t = test_webrtc_new (); + GObject *channel = NULL; + struct validate_sdp offer = { on_sdp_has_datachannel, NULL }; + struct validate_sdp answer = { on_sdp_has_datachannel, NULL }; + + t->on_negotiation_needed = NULL; + t->offer_data = &offer; + t->on_offer_created = validate_sdp; + t->answer_data = &answer; + t->on_answer_created = validate_sdp; + t->on_ice_candidate = NULL; + t->on_data_channel = have_data_channel_transfer_large_data; + + g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL, + &channel); + g_assert_nonnull (channel); + t->data_channel_data = channel; + + gst_element_set_state (t->webrtc1, GST_STATE_PLAYING); + gst_element_set_state (t->webrtc2, GST_STATE_PLAYING); + + test_webrtc_create_offer (t, t->webrtc1); + + test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM); + + g_object_unref (channel); + test_webrtc_free (t); +} + +GST_END_TEST; + static Suite * webrtcbin_suite (void) { @@ -1429,6 +1918,13 @@ webrtcbin_suite (void) tcase_add_test (tc, test_add_recvonly_transceiver); tcase_add_test (tc, test_recvonly_sendonly); tcase_add_test (tc, test_payload_types); + tcase_add_test (tc, test_data_channel_create); + tcase_add_test (tc, test_data_channel_remote_notify); + tcase_add_test (tc, test_data_channel_transfer_string); + tcase_add_test (tc, test_data_channel_transfer_data); + tcase_add_test (tc, test_data_channel_create_after_negotiate); + tcase_add_test (tc, test_data_channel_low_threshold); + tcase_add_test (tc, test_data_channel_max_message_size); } if (nicesrc)