webrtcbin: add support for data channels based on SCTP

Mostly follows the W3C specification
https://www.w3.org/TR/webrtc/#peer-to-peer-data-api

With contributions from:
Mathieu Duponchelle <mathieu@centricular.com>

https://bugzilla.gnome.org/show_bug.cgi?id=794351
This commit is contained in:
Matthew Waters 2018-09-10 23:52:05 +10:00
parent cf46d49b1e
commit 07e9374eff
15 changed files with 3299 additions and 204 deletions

View file

@ -7,11 +7,13 @@ noinst_HEADERS = \
gstwebrtcstats.h \ gstwebrtcstats.h \
icestream.h \ icestream.h \
nicetransport.h \ nicetransport.h \
sctptransport.h \
transportstream.h \ transportstream.h \
transportsendbin.h \ transportsendbin.h \
transportreceivebin.h \ transportreceivebin.h \
utils.h \ utils.h \
webrtcsdp.h \ webrtcsdp.h \
webrtcdatachannel.h \
webrtctransceiver.h webrtctransceiver.h
libgstwebrtc_la_SOURCES = \ libgstwebrtc_la_SOURCES = \
@ -21,11 +23,13 @@ libgstwebrtc_la_SOURCES = \
gstwebrtcstats.c \ gstwebrtcstats.c \
icestream.c \ icestream.c \
nicetransport.c \ nicetransport.c \
sctptransport.c \
transportstream.c \ transportstream.c \
transportsendbin.c \ transportsendbin.c \
transportreceivebin.c \ transportreceivebin.c \
utils.c \ utils.c \
webrtcsdp.c \ webrtcsdp.c \
webrtcdatachannel.c \
webrtctransceiver.c webrtctransceiver.c
libgstwebrtc_la_SOURCES += $(BUILT_SOURCES) libgstwebrtc_la_SOURCES += $(BUILT_SOURCES)
@ -40,12 +44,14 @@ libgstwebrtc_la_CFLAGS = \
$(GST_SDP_CFLAGS) \ $(GST_SDP_CFLAGS) \
$(NICE_CFLAGS) $(NICE_CFLAGS)
libgstwebrtc_la_LIBADD = \ libgstwebrtc_la_LIBADD = \
-lgstapp-@GST_API_VERSION@ \
$(GST_PLUGINS_BASE_LIBS) \ $(GST_PLUGINS_BASE_LIBS) \
$(GST_BASE_LIBS) \ $(GST_BASE_LIBS) \
$(GST_LIBS) \ $(GST_LIBS) \
$(GST_SDP_LIBS) \ $(GST_SDP_LIBS) \
$(NICE_LIBS) \ $(NICE_LIBS) \
$(top_builddir)/gst-libs/gst/webrtc/libgstwebrtc-@GST_API_VERSION@.la $(top_builddir)/gst-libs/gst/webrtc/libgstwebrtc-@GST_API_VERSION@.la \
$(top_builddir)/gst-libs/gst/sctp/libgstsctp-@GST_API_VERSION@.la
libgstwebrtc_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) libgstwebrtc_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgstwebrtc_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS) libgstwebrtc_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)

View file

@ -41,6 +41,10 @@ typedef struct _GstWebRTCNiceTransport GstWebRTCNiceTransport;
typedef struct _GstWebRTCNiceTransportClass GstWebRTCNiceTransportClass; typedef struct _GstWebRTCNiceTransportClass GstWebRTCNiceTransportClass;
typedef struct _GstWebRTCNiceTransportPrivate GstWebRTCNiceTransportPrivate; typedef struct _GstWebRTCNiceTransportPrivate GstWebRTCNiceTransportPrivate;
typedef struct _GstWebRTCSCTPTransport GstWebRTCSCTPTransport;
typedef struct _GstWebRTCSCTPTransportClass GstWebRTCSCTPTransportClass;
typedef struct _GstWebRTCSCTPTransportPrivate GstWebRTCSCTPTransportPrivate;
typedef struct _TransportStream TransportStream; typedef struct _TransportStream TransportStream;
typedef struct _TransportStreamClass TransportStreamClass; typedef struct _TransportStreamClass TransportStreamClass;

File diff suppressed because it is too large Load diff

View file

@ -23,6 +23,7 @@
#include <gst/sdp/sdp.h> #include <gst/sdp/sdp.h>
#include "fwd.h" #include "fwd.h"
#include "gstwebrtcice.h" #include "gstwebrtcice.h"
#include "transportstream.h"
G_BEGIN_DECLS G_BEGIN_DECLS
@ -37,7 +38,9 @@ typedef enum
GST_WEBRTC_BIN_ERROR_INVALID_STATE, GST_WEBRTC_BIN_ERROR_INVALID_STATE,
GST_WEBRTC_BIN_ERROR_BAD_SDP, GST_WEBRTC_BIN_ERROR_BAD_SDP,
GST_WEBRTC_BIN_ERROR_FINGERPRINT, GST_WEBRTC_BIN_ERROR_FINGERPRINT,
} GstWebRTCJSEPSDPError; GST_WEBRTC_BIN_ERROR_SCTP_FAILURE,
GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
} GstWebRTCError;
GType gst_webrtc_bin_pad_get_type(void); GType gst_webrtc_bin_pad_get_type(void);
#define GST_TYPE_WEBRTC_BIN_PAD (gst_webrtc_bin_pad_get_type()) #define GST_TYPE_WEBRTC_BIN_PAD (gst_webrtc_bin_pad_get_type())
@ -107,6 +110,13 @@ struct _GstWebRTCBinPrivate
GArray *transceivers; GArray *transceivers;
GArray *session_mid_map; GArray *session_mid_map;
GArray *transports; GArray *transports;
GArray *data_channels;
/* list of data channels we've received a sctp stream for but no data
* channel protocol for */
GArray *pending_data_channels;
GstWebRTCSCTPTransport *sctp_transport;
TransportStream *data_channel_transport;
GstWebRTCICE *ice; GstWebRTCICE *ice;
GArray *ice_stream_map; GArray *ice_stream_map;
@ -115,7 +125,6 @@ struct _GstWebRTCBinPrivate
/* peerconnection variables */ /* peerconnection variables */
gboolean is_closed; gboolean is_closed;
gboolean need_negotiation; gboolean need_negotiation;
gpointer sctp_transport; /* FIXME */
/* peerconnection helper thread for promises */ /* peerconnection helper thread for promises */
GMainContext *main_context; GMainContext *main_context;

View file

@ -4,6 +4,7 @@ webrtc_sources = [
'gstwebrtcstats.c', 'gstwebrtcstats.c',
'icestream.c', 'icestream.c',
'nicetransport.c', 'nicetransport.c',
'sctptransport.c',
'gstwebrtcbin.c', 'gstwebrtcbin.c',
'transportreceivebin.c', 'transportreceivebin.c',
'transportsendbin.c', 'transportsendbin.c',
@ -11,6 +12,7 @@ webrtc_sources = [
'utils.c', 'utils.c',
'webrtcsdp.c', 'webrtcsdp.c',
'webrtctransceiver.c', 'webrtctransceiver.c',
'webrtcdatachannel.c',
] ]
libnice_dep = dependency('nice', version : '>=0.1.14', required : get_option('webrtc'), libnice_dep = dependency('nice', version : '>=0.1.14', required : get_option('webrtc'),
@ -22,7 +24,7 @@ if libnice_dep.found()
webrtc_sources, webrtc_sources,
c_args : gst_plugins_bad_args + ['-DGST_USE_UNSTABLE_API'], c_args : gst_plugins_bad_args + ['-DGST_USE_UNSTABLE_API'],
include_directories : [configinc], include_directories : [configinc],
dependencies : [libnice_dep, gstbase_dep, gstsdp_dep, gstwebrtc_dep], dependencies : [libnice_dep, gstbase_dep, gstsdp_dep, gstapp_dep, gstwebrtc_dep, gstsctp_dep],
install : true, install : true,
install_dir : plugins_install_dir, install_dir : plugins_install_dir,
) )

270
ext/webrtc/sctptransport.c Normal file
View file

@ -0,0 +1,270 @@
/* GStreamer
* Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdio.h>
#include "sctptransport.h"
#include "gstwebrtcbin.h"
#define GST_CAT_DEFAULT gst_webrtc_sctp_transport_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
enum
{
SIGNAL_0,
ON_RESET_STREAM_SIGNAL,
LAST_SIGNAL,
};
enum
{
PROP_0,
PROP_TRANSPORT,
PROP_STATE,
PROP_MAX_MESSAGE_SIZE,
PROP_MAX_CHANNELS,
};
static guint gst_webrtc_sctp_transport_signals[LAST_SIGNAL] = { 0 };
#define gst_webrtc_sctp_transport_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstWebRTCSCTPTransport, gst_webrtc_sctp_transport,
GST_TYPE_OBJECT, GST_DEBUG_CATEGORY_INIT (gst_webrtc_sctp_transport_debug,
"webrtcsctptransport", 0, "webrtcsctptransport"););
typedef void (*SCTPTask) (GstWebRTCSCTPTransport * sctp, gpointer user_data);
struct task
{
GstWebRTCSCTPTransport *sctp;
SCTPTask func;
gpointer user_data;
GDestroyNotify notify;
};
static void
_execute_task (GstWebRTCBin * webrtc, struct task *task)
{
if (task->func)
task->func (task->sctp, task->user_data);
}
static void
_free_task (struct task *task)
{
gst_object_unref (task->sctp);
if (task->notify)
task->notify (task->user_data);
g_free (task);
}
static void
_sctp_enqueue_task (GstWebRTCSCTPTransport * sctp, SCTPTask func,
gpointer user_data, GDestroyNotify notify)
{
struct task *task = g_new0 (struct task, 1);
task->sctp = gst_object_ref (sctp);
task->func = func;
task->user_data = user_data;
task->notify = notify;
gst_webrtc_bin_enqueue_task (sctp->webrtcbin,
(GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task);
}
static void
_emit_stream_reset (GstWebRTCSCTPTransport * sctp, gpointer user_data)
{
guint stream_id = GPOINTER_TO_UINT (user_data);
g_signal_emit (sctp,
gst_webrtc_sctp_transport_signals[ON_RESET_STREAM_SIGNAL], 0, stream_id);
}
static void
_on_sctp_dec_pad_removed (GstElement * sctpdec, GstPad * pad,
GstWebRTCSCTPTransport * sctp)
{
guint stream_id;
if (sscanf (GST_PAD_NAME (pad), "src_%u", &stream_id) != 1)
return;
_sctp_enqueue_task (sctp, (SCTPTask) _emit_stream_reset,
GUINT_TO_POINTER (stream_id), NULL);
}
static void
_on_sctp_association_established (GstElement * sctpenc, gboolean established,
GstWebRTCSCTPTransport * sctp)
{
GST_OBJECT_LOCK (sctp);
if (established)
sctp->state = GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED;
else
sctp->state = GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED;
sctp->association_established = established;
GST_OBJECT_UNLOCK (sctp);
g_object_notify (G_OBJECT (sctp), "state");
}
static void
gst_webrtc_sctp_transport_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
// GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object);
switch (prop_id) {
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_webrtc_sctp_transport_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object);
switch (prop_id) {
case PROP_TRANSPORT:
g_value_set_object (value, sctp->transport);
break;
case PROP_STATE:
g_value_set_enum (value, sctp->state);
break;
case PROP_MAX_MESSAGE_SIZE:
g_value_set_uint64 (value, sctp->max_message_size);
break;
case PROP_MAX_CHANNELS:
g_value_set_uint (value, sctp->max_channels);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_webrtc_sctp_transport_finalize (GObject * object)
{
GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object);
g_signal_handlers_disconnect_by_data (sctp->sctpdec, sctp);
g_signal_handlers_disconnect_by_data (sctp->sctpenc, sctp);
gst_object_unref (sctp->sctpdec);
gst_object_unref (sctp->sctpenc);
g_clear_object (&sctp->transport);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
gst_webrtc_sctp_transport_constructed (GObject * object)
{
GstWebRTCSCTPTransport *sctp = GST_WEBRTC_SCTP_TRANSPORT (object);
guint association_id;
association_id = g_random_int_range (0, G_MAXUINT16);
sctp->sctpdec =
g_object_ref_sink (gst_element_factory_make ("sctpdec", NULL));
g_object_set (sctp->sctpdec, "sctp-association-id", association_id, NULL);
sctp->sctpenc =
g_object_ref_sink (gst_element_factory_make ("sctpenc", NULL));
g_object_set (sctp->sctpenc, "sctp-association-id", association_id, NULL);
g_signal_connect (sctp->sctpdec, "pad-removed",
G_CALLBACK (_on_sctp_dec_pad_removed), sctp);
g_signal_connect (sctp->sctpenc, "sctp-association-established",
G_CALLBACK (_on_sctp_association_established), sctp);
G_OBJECT_CLASS (parent_class)->constructed (object);
}
static void
gst_webrtc_sctp_transport_class_init (GstWebRTCSCTPTransportClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
gobject_class->constructed = gst_webrtc_sctp_transport_constructed;
gobject_class->get_property = gst_webrtc_sctp_transport_get_property;
gobject_class->set_property = gst_webrtc_sctp_transport_set_property;
gobject_class->finalize = gst_webrtc_sctp_transport_finalize;
g_object_class_install_property (gobject_class,
PROP_TRANSPORT,
g_param_spec_object ("transport",
"WebRTC DTLS Transport",
"DTLS transport used for this SCTP transport",
GST_TYPE_WEBRTC_DTLS_TRANSPORT,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_STATE,
g_param_spec_enum ("state",
"WebRTC SCTP Transport state", "WebRTC SCTP Transport state",
GST_TYPE_WEBRTC_SCTP_TRANSPORT_STATE,
GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_MAX_MESSAGE_SIZE,
g_param_spec_uint64 ("max-message-size",
"Maximum message size",
"Maximum message size as reported by the transport", 0, G_MAXUINT64,
0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_MAX_CHANNELS,
g_param_spec_uint ("max-channels",
"Maximum number of channels", "Maximum number of channels",
0, G_MAXUINT16, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstWebRTCSCTPTransport::reset-stream:
* @object: the #GstWebRTCSCTPTransport
* @stream_id: the SCTP stream that was reset
*/
gst_webrtc_sctp_transport_signals[ON_RESET_STREAM_SIGNAL] =
g_signal_new ("stream-reset", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
G_TYPE_NONE, 1, G_TYPE_UINT);
}
static void
gst_webrtc_sctp_transport_init (GstWebRTCSCTPTransport * nice)
{
}
GstWebRTCSCTPTransport *
gst_webrtc_sctp_transport_new (void)
{
return g_object_new (GST_TYPE_WEBRTC_SCTP_TRANSPORT, NULL);
}

View file

@ -0,0 +1,65 @@
/* GStreamer
* Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_WEBRTC_SCTP_TRANSPORT_H__
#define __GST_WEBRTC_SCTP_TRANSPORT_H__
#include <gst/gst.h>
/* libnice */
#include <agent.h>
#include <gst/webrtc/webrtc.h>
#include "gstwebrtcice.h"
G_BEGIN_DECLS
GType gst_webrtc_sctp_transport_get_type(void);
#define GST_TYPE_WEBRTC_SCTP_TRANSPORT (gst_webrtc_sctp_transport_get_type())
#define GST_WEBRTC_SCTP_TRANSPORT(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransport))
#define GST_IS_WEBRTC_SCTP_TRANSPORT(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WEBRTC_SCTP_TRANSPORT))
#define GST_WEBRTC_SCTP_TRANSPORT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransportClass))
#define GST_IS_WEBRTC_SCTP_TRANSPORT_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT))
#define GST_WEBRTC_SCTP_TRANSPORT_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_WEBRTC_SCTP_TRANSPORT,GstWebRTCSCTPTransportClass))
struct _GstWebRTCSCTPTransport
{
GstObject parent;
GstWebRTCDTLSTransport *transport;
GstWebRTCSCTPTransportState state;
guint64 max_message_size;
guint max_channels;
gboolean association_established;
GstElement *sctpdec;
GstElement *sctpenc;
GstWebRTCBin *webrtcbin;
};
struct _GstWebRTCSCTPTransportClass
{
GstObjectClass parent_class;
};
GstWebRTCSCTPTransport * gst_webrtc_sctp_transport_new (void);
G_END_DECLS
#endif /* __GST_WEBRTC_SCTP_TRANSPORT_H__ */

View file

@ -25,23 +25,24 @@
#include "utils.h" #include "utils.h"
/* /*
* ,----------------------------transport_receive_%u-----------------------------, * ,----------------------------transport_receive_%u----------------------------,
* ; (rtp) ; * ; (rtp/data) ;
* ; ,---nicesrc----, ,-capsfilter-, ,----dtlssrtpdec----, ,--funnel--, ; * ; ,---nicesrc----, ,-capsfilter-, ,---dtlssrtpdec---, ,--funnel--, ;
* ; ; src o--o sink src o--o sink rtp_src o------o sink_0 ; ; * ; ; src o--o sink src o--o sink rtp_src o-------o sink_0 ; ;
* ; '--------------' '------------' ; ; ; src o--o rtp_src * ; '--------------' '------------' ; ; ; src o--o rtp_src
* ; ; rtcp_src o-, ,--o sink_1 ; ; * ; ; rtcp_src o---, ,-o sink_1 ; ;
* ; '-------------------' ; ; '----------' ; * ; ; ; ; ; '----------' ;
* ; ; ; ,--funnel--, ; * ; ; data_src o-, ; ; ,--funnel--, ;
* ; '-+--o sink_0 ; ; * ; '-----------------' ; '-+-o sink_0 ; ;
* ; ,-' ; src o--o rtcp_src * ; ,---dtlssrtpdec---, ; ,-' ; src o--o rtcp_src
* ; (rtcp) ; ,-o sink_1 ; ; * ; (rtcp) ; rtp_src o-+-' ,-o sink_1 ; ;
* ; ,---nicesrc----, ,-capsfilter-, ,----dtlssrtpdec----, ; ; '----------' ; * ; ,---nicesrc----, ,-capsfilter-, ; ; ; ; '----------' ;
* ; ; src o--o sink src o--o sink rtp_src o-' ; ; * ; ; src o--o sink src o--o sink rtcp_src o-+---' ,--funnel--, ;
* ; '--------------' '------------' ; ; ; ; * ; '--------------' '------------' ; ; '-----o sink_0 ; ;
* ; ; rtcp_src o----' ; * ; ; data_src o-, ; src o--o data_src
* ; '-------------------' ; * ; '-----------------' '-----o sink_1 ; ;
* '-----------------------------------------------------------------------------' * ; '----------' ;
* '----------------------------------------------------------------------------'
* *
* Do we really wnat to be *that* permissive in what we accept? * Do we really wnat to be *that* permissive in what we accept?
* *
@ -70,6 +71,12 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src",
GST_PAD_ALWAYS, GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-rtp")); GST_STATIC_CAPS ("application/x-rtp"));
static GstStaticPadTemplate data_sink_template =
GST_STATIC_PAD_TEMPLATE ("data_src",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
enum enum
{ {
PROP_0, PROP_0,
@ -336,6 +343,21 @@ transport_receive_bin_constructed (GObject * object)
gst_element_add_pad (GST_ELEMENT (receive), ghost); gst_element_add_pad (GST_ELEMENT (receive), ghost);
gst_object_unref (pad); gst_object_unref (pad);
/* create funnel for data_src */
funnel = gst_element_factory_make ("funnel", NULL);
gst_bin_add (GST_BIN (receive), funnel);
if (!gst_element_link_pads (receive->stream->transport->dtlssrtpdec,
"data_src", funnel, "sink_0"))
g_warn_if_reached ();
if (!gst_element_link_pads (receive->stream->rtcp_transport->dtlssrtpdec,
"data_src", funnel, "sink_1"))
g_warn_if_reached ();
pad = gst_element_get_static_pad (funnel, "src");
ghost = gst_ghost_pad_new ("data_src", pad);
gst_element_add_pad (GST_ELEMENT (receive), ghost);
gst_object_unref (pad);
G_OBJECT_CLASS (parent_class)->constructed (object); G_OBJECT_CLASS (parent_class)->constructed (object);
} }
@ -350,6 +372,8 @@ transport_receive_bin_class_init (TransportReceiveBinClass * klass)
gst_element_class_add_static_pad_template (element_class, &rtp_sink_template); gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
gst_element_class_add_static_pad_template (element_class, gst_element_class_add_static_pad_template (element_class,
&rtcp_sink_template); &rtcp_sink_template);
gst_element_class_add_static_pad_template (element_class,
&data_sink_template);
gst_element_class_set_metadata (element_class, "WebRTC Transport Receive Bin", gst_element_class_set_metadata (element_class, "WebRTC Transport Receive Bin",
"Filter/Network/WebRTC", "A bin for webrtc connections", "Filter/Network/WebRTC", "A bin for webrtc connections",

View file

@ -27,9 +27,11 @@
/* /*
* ,------------------------transport_send_%u-------------------------, * ,------------------------transport_send_%u-------------------------,
* ; ,-----dtlssrtpenc---, ; * ; ,-----dtlssrtpenc---, ;
* rtp_sink o--------------------------o rtp_sink_0 ; ,---nicesink---, ; * data_sink o--------------------------o data_sink ; ;
* ; ; src o--o sink ; ; * ; ; ; ,---nicesink---, ;
* ; ,--outputselector--, ,-o rtcp_sink_0 ; '--------------' ; * rtp_sink o--------------------------o rtp_sink_0 src o--o sink ; ;
* ; ; ; '--------------' ;
* ; ,--outputselector--, ,-o rtcp_sink_0 ; ;
* ; ; src_0 o-' '-------------------' ; * ; ; src_0 o-' '-------------------' ;
* rtcp_sink ;---o sink ; ,----dtlssrtpenc----, ,---nicesink---, ; * rtcp_sink ;---o sink ; ,----dtlssrtpenc----, ,---nicesink---, ;
* ; ; src_1 o---o rtcp_sink_0 src o--o sink ; ; * ; ; src_1 o---o rtcp_sink_0 src o--o sink ; ;
@ -61,6 +63,12 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
GST_PAD_ALWAYS, GST_PAD_ALWAYS,
GST_STATIC_CAPS ("application/x-rtp")); GST_STATIC_CAPS ("application/x-rtp"));
static GstStaticPadTemplate data_sink_template =
GST_STATIC_PAD_TEMPLATE ("data_sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
enum enum
{ {
PROP_0, PROP_0,
@ -422,6 +430,16 @@ transport_send_bin_constructed (GObject * object)
gst_element_add_pad (GST_ELEMENT (send), ghost); gst_element_add_pad (GST_ELEMENT (send), ghost);
gst_object_unref (pad); gst_object_unref (pad);
/* push the data stream onto the RTP dtls element */
templ = _find_pad_template (transport->dtlssrtpenc,
GST_PAD_SINK, GST_PAD_REQUEST, "data_sink");
pad = gst_element_request_pad (transport->dtlssrtpenc, templ, "data_sink",
NULL);
ghost = gst_ghost_pad_new ("data_sink", pad);
gst_element_add_pad (GST_ELEMENT (send), ghost);
gst_object_unref (pad);
/* RTCP */ /* RTCP */
transport = send->stream->rtcp_transport; transport = send->stream->rtcp_transport;
/* Do the common init for the context struct */ /* Do the common init for the context struct */
@ -509,6 +527,8 @@ transport_send_bin_class_init (TransportSendBinClass * klass)
gst_element_class_add_static_pad_template (element_class, &rtp_sink_template); gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
gst_element_class_add_static_pad_template (element_class, gst_element_class_add_static_pad_template (element_class,
&rtcp_sink_template); &rtcp_sink_template);
gst_element_class_add_static_pad_template (element_class,
&data_sink_template);
gst_element_class_set_metadata (element_class, "WebRTC Transport Send Bin", gst_element_class_set_metadata (element_class, "WebRTC Transport Send Bin",
"Filter/Network/WebRTC", "A bin for webrtc connections", "Filter/Network/WebRTC", "A bin for webrtc connections",

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,83 @@
/* GStreamer
* Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_WEBRTC_DATA_CHANNEL_H__
#define __GST_WEBRTC_DATA_CHANNEL_H__
#include <gst/gst.h>
#include <gst/webrtc/webrtc_fwd.h>
#include <gst/webrtc/dtlstransport.h>
#include "sctptransport.h"
G_BEGIN_DECLS
GST_WEBRTC_API
GType gst_webrtc_data_channel_get_type(void);
#define GST_TYPE_WEBRTC_DATA_CHANNEL (gst_webrtc_data_channel_get_type())
#define GST_WEBRTC_DATA_CHANNEL(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannel))
#define GST_IS_WEBRTC_DATA_CHANNEL(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_WEBRTC_DATA_CHANNEL))
#define GST_WEBRTC_DATA_CHANNEL_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass) ,GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannelClass))
#define GST_IS_WEBRTC_DATA_CHANNEL_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass) ,GST_TYPE_WEBRTC_DATA_CHANNEL))
#define GST_WEBRTC_DATA_CHANNEL_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj) ,GST_TYPE_WEBRTC_DATA_CHANNEL,GstWebRTCDataChannelClass))
typedef struct _GstWebRTCDataChannel GstWebRTCDataChannel;
typedef struct _GstWebRTCDataChannelClass GstWebRTCDataChannelClass;
struct _GstWebRTCDataChannel
{
GstObject parent;
GstWebRTCSCTPTransport *sctp_transport;
GstElement *appsrc;
GstElement *appsink;
gchar *label;
gboolean ordered;
guint max_packet_lifetime;
guint max_retransmits;
gchar *protocol;
gboolean negotiated;
gint id;
GstWebRTCPriorityType priority;
GstWebRTCDataChannelState ready_state;
guint64 buffered_amount;
guint64 buffered_amount_low_threshold;
GstWebRTCBin *webrtcbin;
gboolean opened;
gulong src_probe;
GError *stored_error;
gpointer _padding[GST_PADDING];
};
struct _GstWebRTCDataChannelClass
{
GstObjectClass parent_class;
gpointer _padding[GST_PADDING];
};
void gst_webrtc_data_channel_start_negotiation (GstWebRTCDataChannel *channel);
void gst_webrtc_data_channel_set_sctp_transport (GstWebRTCDataChannel *channel,
GstWebRTCSCTPTransport *sctp);
G_END_DECLS
#endif /* __GST_WEBRTC_DATA_CHANNEL_H__ */

View file

@ -714,3 +714,38 @@ _generate_ice_credentials (gchar ** ufrag, gchar ** password)
ice_credential_chars[g_random_int_range (0, ice_credential_chars[g_random_int_range (0,
strlen (ice_credential_chars))]; strlen (ice_credential_chars))];
} }
int
_get_sctp_port_from_media (const GstSDPMedia * media)
{
int sctpmap = -1, i;
for (i = 0; i < gst_sdp_media_attributes_len (media); i++) {
const GstSDPAttribute *attr = gst_sdp_media_get_attribute (media, i);
if (g_strcmp0 (attr->key, "sctp-port") == 0) {
return atoi (attr->value);
} else if (g_strcmp0 (attr->key, "sctpmap") == 0) {
sctpmap = atoi (attr->value);
}
}
if (sctpmap >= 0)
GST_LOG ("no sctp-port attribute in media");
return sctpmap;
}
guint64
_get_sctp_max_message_size_from_media (const GstSDPMedia * media)
{
int i;
for (i = 0; i < gst_sdp_media_attributes_len (media); i++) {
const GstSDPAttribute *attr = gst_sdp_media_get_attribute (media, i);
if (g_strcmp0 (attr->key, "max-message-size") == 0)
return atoi (attr->value);
}
return 65536;
}

View file

@ -76,5 +76,9 @@ G_GNUC_INTERNAL
gboolean _media_has_attribute_key (const GstSDPMedia * media, gboolean _media_has_attribute_key (const GstSDPMedia * media,
const gchar * key); const gchar * key);
G_GNUC_INTERNAL
int _get_sctp_port_from_media (const GstSDPMedia * media);
G_GNUC_INTERNAL
guint64 _get_sctp_max_message_size_from_media (const GstSDPMedia * media);
#endif /* __WEBRTC_UTILS_H__ */ #endif /* __WEBRTC_UTILS_H__ */

View file

@ -264,4 +264,57 @@ typedef enum /*< underscore_name=gst_webrtc_fec_type >*/
GST_WEBRTC_FEC_TYPE_ULP_RED, GST_WEBRTC_FEC_TYPE_ULP_RED,
} GstWebRTCFECType; } GstWebRTCFECType;
/**
* GstWebRTCSCTPTransportState:
* GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW: new
* GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTING: connecting
* GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED: connected
* GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED: closed
*
* See <ulink url="http://w3c.github.io/webrtc-pc/#dom-rtcsctptransportstate">http://w3c.github.io/webrtc-pc/#dom-rtcsctptransportstate</ulink>
*/
typedef enum /*< underscore_name=gst_webrtc_sctp_transport_state >*/
{
GST_WEBRTC_SCTP_TRANSPORT_STATE_NEW,
GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTING,
GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED,
GST_WEBRTC_SCTP_TRANSPORT_STATE_CLOSED,
} GstWebRTCSCTPTransportState;
/**
* GstWebRTCPriorityType:
* GST_WEBRTC_PRIORITY_TYPE_VERY_LOW: very-low
* GST_WEBRTC_PRIORITY_TYPE_LOW: low
* GST_WEBRTC_PRIORITY_TYPE_MEDIUM: medium
* GST_WEBRTC_PRIORITY_TYPE_HIGH: high
*
* See <ulink url="http://w3c.github.io/webrtc-pc/#dom-rtcprioritytype">http://w3c.github.io/webrtc-pc/#dom-rtcprioritytype</ulink>
*/
typedef enum /*< underscore_name=gst_webrtc_priority_type >*/
{
GST_WEBRTC_PRIORITY_TYPE_VERY_LOW = 1,
GST_WEBRTC_PRIORITY_TYPE_LOW,
GST_WEBRTC_PRIORITY_TYPE_MEDIUM,
GST_WEBRTC_PRIORITY_TYPE_HIGH,
} GstWebRTCPriorityType;
/**
* GstWebRTCDataChannelState:
* GST_WEBRTC_DATA_CHANNEL_STATE_NEW: new
* GST_WEBRTC_DATA_CHANNEL_STATE_CONNECTING: connection
* GST_WEBRTC_DATA_CHANNEL_STATE_OPEN: open
* GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING: closing
* GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED: closed
*
* See <ulink url="http://w3c.github.io/webrtc-pc/#dom-rtcdatachannelstate">http://w3c.github.io/webrtc-pc/#dom-rtcdatachannelstate</ulink>
*/
typedef enum /*< underscore_name=gst_webrtc_data_channel_state >*/
{
GST_WEBRTC_DATA_CHANNEL_STATE_NEW,
GST_WEBRTC_DATA_CHANNEL_STATE_CONNECTING,
GST_WEBRTC_DATA_CHANNEL_STATE_OPEN,
GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING,
GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED,
} GstWebRTCDataChannelState;
#endif /* __GST_WEBRTC_FWD_H__ */ #endif /* __GST_WEBRTC_FWD_H__ */

View file

@ -86,6 +86,12 @@ struct test_webrtc
GstElement * element, GstElement * element,
GstPromise * promise, GstPromise * promise,
gpointer user_data); gpointer user_data);
gpointer data_channel_data;
GDestroyNotify data_channel_notify;
void (*on_data_channel) (struct test_webrtc * t,
GstElement * element,
GObject *data_channel,
gpointer user_data);
gpointer answer_data; gpointer answer_data;
GDestroyNotify answer_notify; GDestroyNotify answer_notify;
void (*on_pad_added) (struct test_webrtc * t, void (*on_pad_added) (struct test_webrtc * t,
@ -290,6 +296,16 @@ _on_pad_added (GstElement * webrtc, GstPad * new_pad, struct test_webrtc *t)
g_mutex_unlock (&t->lock); g_mutex_unlock (&t->lock);
} }
static void
_on_data_channel (GstElement * webrtc, GObject * data_channel,
struct test_webrtc *t)
{
g_mutex_lock (&t->lock);
if (t->on_data_channel)
t->on_data_channel (t, webrtc, data_channel, t->data_channel_data);
g_mutex_unlock (&t->lock);
}
static void static void
_pad_added_not_reached (struct test_webrtc *t, GstElement * element, _pad_added_not_reached (struct test_webrtc *t, GstElement * element,
GstPad * pad, gpointer user_data) GstPad * pad, gpointer user_data)
@ -332,6 +348,13 @@ _offer_answer_not_reached (struct test_webrtc *t, GstElement * element,
g_assert_not_reached (); g_assert_not_reached ();
} }
static void
_on_data_channel_not_reached (struct test_webrtc *t, GstElement * element,
GObject * data_channel, gpointer user_data)
{
g_assert_not_reached ();
}
static void static void
_broadcast (struct test_webrtc *t) _broadcast (struct test_webrtc *t)
{ {
@ -387,6 +410,7 @@ test_webrtc_new (void)
ret->on_pad_added = _pad_added_not_reached; ret->on_pad_added = _pad_added_not_reached;
ret->on_offer_created = _offer_answer_not_reached; ret->on_offer_created = _offer_answer_not_reached;
ret->on_answer_created = _offer_answer_not_reached; ret->on_answer_created = _offer_answer_not_reached;
ret->on_data_channel = _on_data_channel_not_reached;
ret->bus_message = _bus_no_errors; ret->bus_message = _bus_no_errors;
g_mutex_init (&ret->lock); g_mutex_init (&ret->lock);
@ -415,6 +439,10 @@ test_webrtc_new (void)
G_CALLBACK (_on_ice_candidate), ret); G_CALLBACK (_on_ice_candidate), ret);
g_signal_connect (ret->webrtc2, "on-ice-candidate", g_signal_connect (ret->webrtc2, "on-ice-candidate",
G_CALLBACK (_on_ice_candidate), ret); G_CALLBACK (_on_ice_candidate), ret);
g_signal_connect (ret->webrtc1, "on-data-channel",
G_CALLBACK (_on_data_channel), ret);
g_signal_connect (ret->webrtc2, "on-data-channel",
G_CALLBACK (_on_data_channel), ret);
g_signal_connect (ret->webrtc1, "pad-added", G_CALLBACK (_on_pad_added), ret); g_signal_connect (ret->webrtc1, "pad-added", G_CALLBACK (_on_pad_added), ret);
g_signal_connect (ret->webrtc2, "pad-added", G_CALLBACK (_on_pad_added), ret); g_signal_connect (ret->webrtc2, "pad-added", G_CALLBACK (_on_pad_added), ret);
g_signal_connect_swapped (ret->webrtc1, "notify::ice-gathering-state", g_signal_connect_swapped (ret->webrtc1, "notify::ice-gathering-state",
@ -475,6 +503,8 @@ test_webrtc_free (struct test_webrtc *t)
t->answer_notify (t->answer_data); t->answer_notify (t->answer_data);
if (t->pad_added_notify) if (t->pad_added_notify)
t->pad_added_notify (t->pad_added_data); t->pad_added_notify (t->pad_added_data);
if (t->data_channel_notify)
t->data_channel_notify (t->data_channel_data);
fail_unless_equals_int (GST_STATE_CHANGE_SUCCESS, fail_unless_equals_int (GST_STATE_CHANGE_SUCCESS,
gst_element_set_state (t->webrtc1, GST_STATE_NULL)); gst_element_set_state (t->webrtc1, GST_STATE_NULL));
@ -522,12 +552,18 @@ test_webrtc_wait_for_answer_error_eos (struct test_webrtc *t)
test_webrtc_wait_for_state_mask (t, states); test_webrtc_wait_for_state_mask (t, states);
} }
static void
test_webrtc_signal_state_unlocked (struct test_webrtc *t, TestState state)
{
t->state = state;
g_cond_broadcast (&t->cond);
}
static void static void
test_webrtc_signal_state (struct test_webrtc *t, TestState state) test_webrtc_signal_state (struct test_webrtc *t, TestState state)
{ {
g_mutex_lock (&t->lock); g_mutex_lock (&t->lock);
t->state = state; test_webrtc_signal_state_unlocked (t, state);
g_cond_broadcast (&t->cond);
g_mutex_unlock (&t->lock); g_mutex_unlock (&t->lock);
} }
@ -1403,6 +1439,459 @@ GST_START_TEST (test_recvonly_sendonly)
GST_END_TEST; GST_END_TEST;
static gboolean
_message_media_is_datachannel (const GstSDPMessage * msg, guint media_id)
{
const GstSDPMedia *media;
if (!msg)
return FALSE;
if (gst_sdp_message_medias_len (msg) <= media_id)
return FALSE;
media = gst_sdp_message_get_media (msg, media_id);
if (g_strcmp0 (gst_sdp_media_get_media (media), "application") != 0)
return FALSE;
if (gst_sdp_media_formats_len (media) != 1)
return FALSE;
if (g_strcmp0 (gst_sdp_media_get_format (media, 0),
"webrtc-datachannel") != 0)
return FALSE;
return TRUE;
}
static void
on_sdp_has_datachannel (struct test_webrtc *t, GstElement * element,
GstWebRTCSessionDescription * desc, gpointer user_data)
{
gboolean have_data_channel = FALSE;
int i;
for (i = 0; i < gst_sdp_message_medias_len (desc->sdp); i++) {
if (_message_media_is_datachannel (desc->sdp, i)) {
/* there should only be one data channel m= section */
fail_unless_equals_int (FALSE, have_data_channel);
have_data_channel = TRUE;
}
}
fail_unless_equals_int (TRUE, have_data_channel);
}
static void
on_channel_error_not_reached (GObject * channel, GError * error,
gpointer user_data)
{
g_assert_not_reached ();
}
GST_START_TEST (test_data_channel_create)
{
struct test_webrtc *t = test_webrtc_new ();
GObject *channel = NULL;
struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
gchar *label;
t->on_negotiation_needed = NULL;
t->offer_data = &offer;
t->on_offer_created = validate_sdp;
t->answer_data = &answer;
t->on_answer_created = validate_sdp;
t->on_ice_candidate = NULL;
g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
&channel);
g_assert_nonnull (channel);
g_object_get (channel, "label", &label, NULL);
g_assert_cmpstr (label, ==, "label");
g_signal_connect (channel, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
test_webrtc_create_offer (t, t->webrtc1);
test_webrtc_wait_for_answer_error_eos (t);
fail_unless_equals_int (STATE_ANSWER_CREATED, t->state);
g_object_unref (channel);
g_free (label);
test_webrtc_free (t);
}
GST_END_TEST;
static void
have_data_channel (struct test_webrtc *t, GstElement * element,
GObject * our, gpointer user_data)
{
GObject *other = user_data;
gchar *our_label, *other_label;
g_signal_connect (our, "on-error", G_CALLBACK (on_channel_error_not_reached),
NULL);
g_object_get (our, "label", &our_label, NULL);
g_object_get (other, "label", &other_label, NULL);
g_assert_cmpstr (our_label, ==, other_label);
g_free (our_label);
g_free (other_label);
test_webrtc_signal_state_unlocked (t, STATE_CUSTOM);
}
GST_START_TEST (test_data_channel_remote_notify)
{
struct test_webrtc *t = test_webrtc_new ();
GObject *channel = NULL;
struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
t->on_negotiation_needed = NULL;
t->offer_data = &offer;
t->on_offer_created = validate_sdp;
t->answer_data = &answer;
t->on_answer_created = validate_sdp;
t->on_ice_candidate = NULL;
t->on_data_channel = have_data_channel;
g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
&channel);
g_assert_nonnull (channel);
t->data_channel_data = channel;
g_signal_connect (channel, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
test_webrtc_create_offer (t, t->webrtc1);
test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
g_object_unref (channel);
test_webrtc_free (t);
}
GST_END_TEST;
static const gchar *test_string = "GStreamer WebRTC is awesome!";
static void
on_message_string (GObject * channel, const gchar * str, struct test_webrtc *t)
{
gchar *expected = g_object_steal_data (channel, "expected");
g_assert_cmpstr (expected, ==, str);
g_free (expected);
test_webrtc_signal_state (t, STATE_CUSTOM);
}
static void
have_data_channel_transfer_string (struct test_webrtc *t, GstElement * element,
GObject * our, gpointer user_data)
{
GObject *other = user_data;
GstWebRTCDataChannelState state;
g_object_get (our, "ready-state", &state, NULL);
fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state);
g_object_get (other, "ready-state", &state, NULL);
fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state);
g_object_set_data_full (our, "expected", g_strdup (test_string), g_free);
g_signal_connect (our, "on-message-string", G_CALLBACK (on_message_string),
t);
g_signal_connect (other, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
g_signal_emit_by_name (other, "send-string", test_string);
}
GST_START_TEST (test_data_channel_transfer_string)
{
struct test_webrtc *t = test_webrtc_new ();
GObject *channel = NULL;
struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
t->on_negotiation_needed = NULL;
t->offer_data = &offer;
t->on_offer_created = validate_sdp;
t->answer_data = &answer;
t->on_answer_created = validate_sdp;
t->on_ice_candidate = NULL;
t->on_data_channel = have_data_channel_transfer_string;
g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
&channel);
g_assert_nonnull (channel);
t->data_channel_data = channel;
g_signal_connect (channel, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
test_webrtc_create_offer (t, t->webrtc1);
test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
g_object_unref (channel);
test_webrtc_free (t);
}
GST_END_TEST;
#define g_assert_cmpbytes(b1, b2) \
G_STMT_START { \
gsize l1, l2; \
const guint8 *d1 = g_bytes_get_data (b1, &l1); \
const guint8 *d2 = g_bytes_get_data (b2, &l2); \
g_assert_cmpmem (d1, l1, d2, l2); \
} G_STMT_END;
static void
on_message_data (GObject * channel, GBytes * data, struct test_webrtc *t)
{
GBytes *expected = g_object_steal_data (channel, "expected");
g_assert_cmpbytes (data, expected);
g_bytes_unref (expected);
test_webrtc_signal_state (t, STATE_CUSTOM);
}
static void
have_data_channel_transfer_data (struct test_webrtc *t, GstElement * element,
GObject * our, gpointer user_data)
{
GObject *other = user_data;
GBytes *data = g_bytes_new_static (test_string, strlen (test_string));
GstWebRTCDataChannelState state;
g_object_get (our, "ready-state", &state, NULL);
fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state);
g_object_get (other, "ready-state", &state, NULL);
fail_unless_equals_int (GST_WEBRTC_DATA_CHANNEL_STATE_OPEN, state);
g_object_set_data_full (our, "expected", g_bytes_ref (data),
(GDestroyNotify) g_bytes_unref);
g_signal_connect (our, "on-message-data", G_CALLBACK (on_message_data), t);
g_signal_connect (other, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
g_signal_emit_by_name (other, "send-data", data);
}
GST_START_TEST (test_data_channel_transfer_data)
{
struct test_webrtc *t = test_webrtc_new ();
GObject *channel = NULL;
struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
t->on_negotiation_needed = NULL;
t->offer_data = &offer;
t->on_offer_created = validate_sdp;
t->answer_data = &answer;
t->on_answer_created = validate_sdp;
t->on_ice_candidate = NULL;
t->on_data_channel = have_data_channel_transfer_data;
g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
&channel);
g_assert_nonnull (channel);
t->data_channel_data = channel;
g_signal_connect (channel, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
test_webrtc_create_offer (t, t->webrtc1);
test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
g_object_unref (channel);
test_webrtc_free (t);
}
GST_END_TEST;
static void
have_data_channel_create_data_channel (struct test_webrtc *t,
GstElement * element, GObject * our, gpointer user_data)
{
GObject *another;
t->on_data_channel = have_data_channel_transfer_string;
g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
&another);
g_assert_nonnull (another);
t->data_channel_data = another;
g_signal_connect (another, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
}
GST_START_TEST (test_data_channel_create_after_negotiate)
{
struct test_webrtc *t = test_webrtc_new ();
GObject *channel = NULL;
struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
t->on_negotiation_needed = NULL;
t->offer_data = &offer;
t->on_offer_created = validate_sdp;
t->answer_data = &answer;
t->on_answer_created = validate_sdp;
t->on_ice_candidate = NULL;
t->on_data_channel = have_data_channel_create_data_channel;
g_signal_emit_by_name (t->webrtc1, "create-data-channel", "prev-label", NULL,
&channel);
g_assert_nonnull (channel);
t->data_channel_data = channel;
g_signal_connect (channel, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
test_webrtc_create_offer (t, t->webrtc1);
test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
g_object_unref (channel);
test_webrtc_free (t);
}
GST_END_TEST;
static void
on_buffered_amount_low_emitted (GObject * channel, struct test_webrtc *t)
{
test_webrtc_signal_state (t, STATE_CUSTOM);
}
static void
have_data_channel_check_low_threshold_emitted (struct test_webrtc *t,
GstElement * element, GObject * our, gpointer user_data)
{
g_signal_connect (our, "on-buffered-amount-low",
G_CALLBACK (on_buffered_amount_low_emitted), t);
g_object_set (our, "buffered-amount-low-threshold", 1, NULL);
g_signal_connect (our, "on-error", G_CALLBACK (on_channel_error_not_reached),
NULL);
g_signal_emit_by_name (our, "send-string", "DATA");
}
GST_START_TEST (test_data_channel_low_threshold)
{
struct test_webrtc *t = test_webrtc_new ();
GObject *channel = NULL;
struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
t->on_negotiation_needed = NULL;
t->offer_data = &offer;
t->on_offer_created = validate_sdp;
t->answer_data = &answer;
t->on_answer_created = validate_sdp;
t->on_ice_candidate = NULL;
t->on_data_channel = have_data_channel_check_low_threshold_emitted;
g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
&channel);
g_assert_nonnull (channel);
t->data_channel_data = channel;
g_signal_connect (channel, "on-error",
G_CALLBACK (on_channel_error_not_reached), NULL);
gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
test_webrtc_create_offer (t, t->webrtc1);
test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
g_object_unref (channel);
test_webrtc_free (t);
}
GST_END_TEST;
static void
on_channel_error (GObject * channel, GError * error, struct test_webrtc *t)
{
g_assert_nonnull (error);
test_webrtc_signal_state (t, STATE_CUSTOM);
}
static void
have_data_channel_transfer_large_data (struct test_webrtc *t,
GstElement * element, GObject * our, gpointer user_data)
{
GObject *other = user_data;
const gsize size = 1024 * 1024;
guint8 *random_data = g_new (guint8, size);
GBytes *data;
gsize i;
for (i = 0; i < size; i++)
random_data[i] = (guint8) (i & 0xff);
data = g_bytes_new_static (random_data, size);
g_object_set_data_full (our, "expected", g_bytes_ref (data),
(GDestroyNotify) g_bytes_unref);
g_signal_connect (our, "on-message-data", G_CALLBACK (on_message_data), t);
g_signal_connect (other, "on-error", G_CALLBACK (on_channel_error), t);
g_signal_emit_by_name (other, "send-data", data);
}
GST_START_TEST (test_data_channel_max_message_size)
{
struct test_webrtc *t = test_webrtc_new ();
GObject *channel = NULL;
struct validate_sdp offer = { on_sdp_has_datachannel, NULL };
struct validate_sdp answer = { on_sdp_has_datachannel, NULL };
t->on_negotiation_needed = NULL;
t->offer_data = &offer;
t->on_offer_created = validate_sdp;
t->answer_data = &answer;
t->on_answer_created = validate_sdp;
t->on_ice_candidate = NULL;
t->on_data_channel = have_data_channel_transfer_large_data;
g_signal_emit_by_name (t->webrtc1, "create-data-channel", "label", NULL,
&channel);
g_assert_nonnull (channel);
t->data_channel_data = channel;
gst_element_set_state (t->webrtc1, GST_STATE_PLAYING);
gst_element_set_state (t->webrtc2, GST_STATE_PLAYING);
test_webrtc_create_offer (t, t->webrtc1);
test_webrtc_wait_for_state_mask (t, 1 << STATE_CUSTOM);
g_object_unref (channel);
test_webrtc_free (t);
}
GST_END_TEST;
static Suite * static Suite *
webrtcbin_suite (void) webrtcbin_suite (void)
{ {
@ -1429,6 +1918,13 @@ webrtcbin_suite (void)
tcase_add_test (tc, test_add_recvonly_transceiver); tcase_add_test (tc, test_add_recvonly_transceiver);
tcase_add_test (tc, test_recvonly_sendonly); tcase_add_test (tc, test_recvonly_sendonly);
tcase_add_test (tc, test_payload_types); tcase_add_test (tc, test_payload_types);
tcase_add_test (tc, test_data_channel_create);
tcase_add_test (tc, test_data_channel_remote_notify);
tcase_add_test (tc, test_data_channel_transfer_string);
tcase_add_test (tc, test_data_channel_transfer_data);
tcase_add_test (tc, test_data_channel_create_after_negotiate);
tcase_add_test (tc, test_data_channel_low_threshold);
tcase_add_test (tc, test_data_channel_max_message_size);
} }
if (nicesrc) if (nicesrc)