mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-27 12:11:13 +00:00
WIP: Add a new multiparty sendrecv gstreamer demo
You can join a room and an audio-only call will be started with all peers in that room. Currently uses audiotestsrc only. BUG: With >2 peers in a call, if a peer leaves, the pipeline stops outputting data from the remaining peers to the (audio) sink. TODO: JS code to allow a browser to join the call TODO: Cleanup pipeline when a peer leaves TODO: Add ICE servers to allow calls over the Internet TODO: Perhaps setup a TURN server as well
This commit is contained in:
parent
569aff43f9
commit
9b1a0e5389
2 changed files with 890 additions and 0 deletions
1
webrtc/multiparty-sendrecv/gst/.gitignore
vendored
Normal file
1
webrtc/multiparty-sendrecv/gst/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
mp-webrtc-sendrecv
|
889
webrtc/multiparty-sendrecv/gst/mp-webrtc-sendrecv.c
Normal file
889
webrtc/multiparty-sendrecv/gst/mp-webrtc-sendrecv.c
Normal file
|
@ -0,0 +1,889 @@
|
|||
/*
|
||||
* Demo gstreamer app for negotiating and streaming a sendrecv audio-only webrtc
|
||||
* stream to all the peers in a multiparty room.
|
||||
*
|
||||
* gcc mp-webrtc-sendrecv.c $(pkg-config --cflags --libs gstreamer-webrtc-1.0 gstreamer-sdp-1.0 libsoup-2.4 json-glib-1.0) -o mp-webrtc-sendrecv
|
||||
*
|
||||
* Author: Nirbheek Chauhan <nirbheek@centricular.com>
|
||||
*/
|
||||
#include <gst/gst.h>
|
||||
#include <gst/sdp/sdp.h>
|
||||
#include <gst/webrtc/webrtc.h>
|
||||
|
||||
/* For signalling */
|
||||
#include <libsoup/soup.h>
|
||||
#include <json-glib/json-glib.h>
|
||||
|
||||
#include <string.h>
|
||||
|
||||
enum AppState {
|
||||
APP_STATE_UNKNOWN = 0,
|
||||
APP_STATE_ERROR = 1, /* generic error */
|
||||
SERVER_CONNECTING = 1000,
|
||||
SERVER_CONNECTION_ERROR,
|
||||
SERVER_CONNECTED, /* Ready to register */
|
||||
SERVER_REGISTERING = 2000,
|
||||
SERVER_REGISTRATION_ERROR,
|
||||
SERVER_REGISTERED, /* Ready to call a peer */
|
||||
SERVER_CLOSED, /* server connection closed by us or the server */
|
||||
ROOM_JOINING = 3000,
|
||||
ROOM_JOIN_ERROR,
|
||||
ROOM_JOINED,
|
||||
ROOM_CALL_NEGOTIATING = 4000, /* negotiating with some or all peers */
|
||||
ROOM_CALL_OFFERING, /* when we're the one sending the offer */
|
||||
ROOM_CALL_ANSWERING, /* when we're the one answering an offer */
|
||||
ROOM_CALL_STARTED, /* in a call with some or all peers */
|
||||
ROOM_CALL_STOPPING,
|
||||
ROOM_CALL_STOPPED,
|
||||
ROOM_CALL_ERROR,
|
||||
};
|
||||
|
||||
static GMainLoop *loop;
|
||||
static GstElement *pipeline;
|
||||
static GList *peers;
|
||||
|
||||
static SoupWebsocketConnection *ws_conn = NULL;
|
||||
static enum AppState app_state = 0;
|
||||
static const gchar *default_server_url = "wss://webrtc.nirbheek.in:8443";
|
||||
static gchar *server_url = NULL;
|
||||
static gchar *local_id = NULL;
|
||||
static gchar *room_id = NULL;
|
||||
|
||||
static GOptionEntry entries[] =
|
||||
{
|
||||
{ "name", 0, 0, G_OPTION_ARG_STRING, &local_id, "Name we will send to the server", "ID" },
|
||||
{ "room-id", 0, 0, G_OPTION_ARG_STRING, &room_id, "Room name to join or create", "ID" },
|
||||
{ "server", 0, 0, G_OPTION_ARG_STRING, &server_url, "Signalling server to connect to", "URL" },
|
||||
};
|
||||
|
||||
static gint
|
||||
compare_str_glist (gconstpointer a, gconstpointer b)
|
||||
{
|
||||
return g_strcmp0 (a, b);
|
||||
}
|
||||
|
||||
static const gchar *
|
||||
find_peer_from_list (const gchar * peer_id)
|
||||
{
|
||||
return (g_list_find_custom (peers, peer_id, compare_str_glist))->data;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
cleanup_and_quit_loop (const gchar * msg, enum AppState state)
|
||||
{
|
||||
if (msg)
|
||||
g_printerr ("%s\n", msg);
|
||||
if (state > 0)
|
||||
app_state = state;
|
||||
|
||||
if (ws_conn) {
|
||||
if (soup_websocket_connection_get_state (ws_conn) ==
|
||||
SOUP_WEBSOCKET_STATE_OPEN)
|
||||
/* This will call us again */
|
||||
soup_websocket_connection_close (ws_conn, 1000, "");
|
||||
else
|
||||
g_object_unref (ws_conn);
|
||||
}
|
||||
|
||||
if (loop) {
|
||||
g_main_loop_quit (loop);
|
||||
loop = NULL;
|
||||
}
|
||||
|
||||
/* To allow usage as a GSourceFunc */
|
||||
return G_SOURCE_REMOVE;
|
||||
}
|
||||
|
||||
static gchar*
|
||||
get_string_from_json_object (JsonObject * object)
|
||||
{
|
||||
JsonNode *root;
|
||||
JsonGenerator *generator;
|
||||
gchar *text;
|
||||
|
||||
/* Make it the root node */
|
||||
root = json_node_init_object (json_node_alloc (), object);
|
||||
generator = json_generator_new ();
|
||||
json_generator_set_root (generator, root);
|
||||
text = json_generator_to_data (generator, NULL);
|
||||
|
||||
/* Release everything */
|
||||
g_object_unref (generator);
|
||||
json_node_free (root);
|
||||
return text;
|
||||
}
|
||||
|
||||
static void
|
||||
handle_media_stream (GstPad * pad, GstElement * pipe, const char * convert_name,
|
||||
const char * sink_name)
|
||||
{
|
||||
GstPad *qpad;
|
||||
GstElement *q, *conv, *sink;
|
||||
GstPadLinkReturn ret;
|
||||
|
||||
q = gst_element_factory_make ("queue", NULL);
|
||||
g_assert_nonnull (q);
|
||||
conv = gst_element_factory_make (convert_name, NULL);
|
||||
g_assert_nonnull (conv);
|
||||
sink = gst_element_factory_make (sink_name, NULL);
|
||||
g_assert_nonnull (sink);
|
||||
gst_bin_add_many (GST_BIN (pipe), q, conv, sink, NULL);
|
||||
gst_element_sync_state_with_parent (q);
|
||||
gst_element_sync_state_with_parent (conv);
|
||||
gst_element_sync_state_with_parent (sink);
|
||||
gst_element_link_many (q, conv, sink, NULL);
|
||||
|
||||
qpad = gst_element_get_static_pad (q, "sink");
|
||||
|
||||
ret = gst_pad_link (pad, qpad);
|
||||
g_assert_cmpint (ret, ==, GST_PAD_LINK_OK);
|
||||
}
|
||||
|
||||
static void
|
||||
on_incoming_decodebin_stream (GstElement * decodebin, GstPad * pad,
|
||||
GstElement * pipe)
|
||||
{
|
||||
GstCaps *caps;
|
||||
const gchar *name;
|
||||
|
||||
if (!gst_pad_has_current_caps (pad)) {
|
||||
g_printerr ("Pad '%s' has no caps, can't do anything, ignoring\n",
|
||||
GST_PAD_NAME (pad));
|
||||
return;
|
||||
}
|
||||
|
||||
caps = gst_pad_get_current_caps (pad);
|
||||
name = gst_structure_get_name (gst_caps_get_structure (caps, 0));
|
||||
|
||||
if (g_str_has_prefix (name, "video")) {
|
||||
handle_media_stream (pad, pipe, "videoconvert", "autovideosink");
|
||||
} else if (g_str_has_prefix (name, "audio")) {
|
||||
handle_media_stream (pad, pipe, "audioconvert", "autoaudiosink");
|
||||
} else {
|
||||
g_printerr ("Unknown pad %s, ignoring", GST_PAD_NAME (pad));
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
on_incoming_stream (GstElement * webrtc, GstPad * pad, GstElement * pipe)
|
||||
{
|
||||
GstElement *decodebin;
|
||||
|
||||
if (GST_PAD_DIRECTION (pad) != GST_PAD_SRC)
|
||||
return;
|
||||
|
||||
decodebin = gst_element_factory_make ("decodebin", NULL);
|
||||
g_signal_connect (decodebin, "pad-added",
|
||||
G_CALLBACK (on_incoming_decodebin_stream), pipe);
|
||||
gst_bin_add (GST_BIN (pipe), decodebin);
|
||||
gst_element_sync_state_with_parent (decodebin);
|
||||
gst_element_link (webrtc, decodebin);
|
||||
}
|
||||
|
||||
static void
|
||||
send_room_peer_msg (const gchar * text, const gchar * peer_id)
|
||||
{
|
||||
gchar *msg;
|
||||
|
||||
msg = g_strdup_printf ("ROOM_PEER_MSG %s %s", peer_id, text);
|
||||
soup_websocket_connection_send_text (ws_conn, msg);
|
||||
g_free (msg);
|
||||
}
|
||||
|
||||
static void
|
||||
send_ice_candidate_message (GstElement * webrtc G_GNUC_UNUSED, guint mlineindex,
|
||||
gchar * candidate, const gchar * peer_id)
|
||||
{
|
||||
gchar *text;
|
||||
JsonObject *ice, *msg;
|
||||
|
||||
if (app_state < ROOM_CALL_OFFERING) {
|
||||
cleanup_and_quit_loop ("Can't send ICE, not in call", APP_STATE_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
ice = json_object_new ();
|
||||
json_object_set_string_member (ice, "candidate", candidate);
|
||||
json_object_set_int_member (ice, "sdpMLineIndex", mlineindex);
|
||||
msg = json_object_new ();
|
||||
json_object_set_object_member (msg, "ice", ice);
|
||||
text = get_string_from_json_object (msg);
|
||||
json_object_unref (msg);
|
||||
|
||||
send_room_peer_msg (text, peer_id);
|
||||
g_free (text);
|
||||
}
|
||||
|
||||
static void
|
||||
send_room_peer_sdp (GstWebRTCSessionDescription * desc, const gchar * peer_id)
|
||||
{
|
||||
JsonObject *msg, *sdp;
|
||||
gchar *text, *sdptype, *sdptext;
|
||||
|
||||
g_assert_cmpint (app_state, <, ROOM_CALL_OFFERING);
|
||||
|
||||
if (desc->type == GST_WEBRTC_SDP_TYPE_OFFER)
|
||||
sdptype = "offer";
|
||||
else if (desc->type == GST_WEBRTC_SDP_TYPE_ANSWER)
|
||||
sdptype = "answer";
|
||||
else
|
||||
g_assert_not_reached ();
|
||||
|
||||
text = gst_sdp_message_as_text (desc->sdp);
|
||||
g_print ("Sending sdp %s to %s:\n%s\n", sdptype, peer_id, text);
|
||||
|
||||
sdp = json_object_new ();
|
||||
json_object_set_string_member (sdp, "type", sdptype);
|
||||
json_object_set_string_member (sdp, "sdp", text);
|
||||
g_free (text);
|
||||
|
||||
msg = json_object_new ();
|
||||
json_object_set_object_member (msg, "sdp", sdp);
|
||||
sdptext = get_string_from_json_object (msg);
|
||||
json_object_unref (msg);
|
||||
|
||||
send_room_peer_msg (sdptext, peer_id);
|
||||
g_free (sdptext);
|
||||
}
|
||||
|
||||
/* Offer created by our pipeline, to be sent to the peer */
|
||||
static void
|
||||
on_offer_created (GstPromise * promise, const gchar * peer_id)
|
||||
{
|
||||
GstElement *webrtc;
|
||||
GstWebRTCSessionDescription *offer;
|
||||
|
||||
g_assert_cmpint (app_state, ==, ROOM_CALL_OFFERING);
|
||||
|
||||
g_assert_cmpint (promise->result, ==, GST_PROMISE_RESULT_REPLIED);
|
||||
gst_structure_get (promise->promise, "offer",
|
||||
GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
|
||||
gst_promise_unref (promise);
|
||||
|
||||
promise = gst_promise_new ();
|
||||
webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id);
|
||||
g_assert_nonnull (webrtc);
|
||||
g_signal_emit_by_name (webrtc, "set-local-description", offer, promise);
|
||||
gst_promise_interrupt (promise);
|
||||
gst_promise_unref (promise);
|
||||
|
||||
/* Send offer to peer */
|
||||
send_room_peer_sdp (offer, peer_id);
|
||||
gst_webrtc_session_description_free (offer);
|
||||
}
|
||||
|
||||
static void
|
||||
on_negotiation_needed (GstElement * webrtc, const gchar * peer_id)
|
||||
{
|
||||
GstPromise *promise = gst_promise_new ();
|
||||
|
||||
app_state = ROOM_CALL_OFFERING;
|
||||
g_signal_emit_by_name (webrtc, "create-offer", NULL, promise);
|
||||
gst_promise_set_change_callback (promise,
|
||||
(GstPromiseChangeFunc) on_offer_created, (gpointer) peer_id, NULL);
|
||||
}
|
||||
|
||||
static void
|
||||
add_webrtcbin_to_pipeline (const gchar * peer_id, gboolean offer)
|
||||
{
|
||||
int ret;
|
||||
GstElement *tee, *webrtc;
|
||||
GstPad *srcpad, *sinkpad;
|
||||
|
||||
webrtc = gst_element_factory_make ("webrtcbin", peer_id);
|
||||
g_assert_nonnull (webrtc);
|
||||
|
||||
tee = gst_bin_get_by_name (GST_BIN (pipeline), "audiotee");
|
||||
g_assert_nonnull (tee);
|
||||
srcpad = gst_element_get_request_pad (tee, "src_%u");
|
||||
gst_object_unref (tee);
|
||||
sinkpad = gst_element_get_request_pad (webrtc, "sink_%u");
|
||||
|
||||
/* Add the bin to the pipeline and connect it */
|
||||
gst_bin_add (GST_BIN (pipeline), webrtc);
|
||||
|
||||
ret = gst_pad_link (srcpad, sinkpad);
|
||||
g_assert_cmpint (ret, ==, GST_PAD_LINK_OK);
|
||||
gst_object_unref (srcpad);
|
||||
gst_object_unref (sinkpad);
|
||||
|
||||
/* This is the gstwebrtc entry point where we create the offer and so on. It
|
||||
* will be called when the pipeline goes to PLAYING.
|
||||
* XXX: We must connect this after webrtcbin has been linked to a source via
|
||||
* get_request_pad() otherwise webrtcbin will create an SDP offer with no
|
||||
* media lines in it. */
|
||||
if (offer)
|
||||
g_signal_connect (webrtc, "on-negotiation-needed",
|
||||
G_CALLBACK (on_negotiation_needed), (gpointer) peer_id);
|
||||
|
||||
/* We need to transmit this ICE candidate to the browser via the websockets
|
||||
* signalling server. Incoming ice candidates from the browser need to be
|
||||
* added by us too, see on_server_message() */
|
||||
g_signal_connect (webrtc, "on-ice-candidate",
|
||||
G_CALLBACK (send_ice_candidate_message), (gpointer) peer_id);
|
||||
/* Incoming streams will be exposed via this signal */
|
||||
g_signal_connect (webrtc, "pad-added", G_CALLBACK (on_incoming_stream),
|
||||
pipeline);
|
||||
|
||||
/* Set to bin to PLAYING */
|
||||
ret = gst_element_sync_state_with_parent (webrtc);
|
||||
gst_object_unref (webrtc);
|
||||
g_assert_true (ret);
|
||||
}
|
||||
|
||||
static void
|
||||
call_peer (const gchar * peer_id)
|
||||
{
|
||||
add_webrtcbin_to_pipeline (peer_id, TRUE);
|
||||
}
|
||||
|
||||
static void
|
||||
incoming_call_from_peer (const gchar * peer_id)
|
||||
{
|
||||
add_webrtcbin_to_pipeline (peer_id, FALSE);
|
||||
}
|
||||
|
||||
#define STR(x) #x
|
||||
#define RTP_CAPS_OPUS(x) "application/x-rtp,media=audio,encoding-name=OPUS,payload=" STR(x)
|
||||
#define RTP_CAPS_VP8(x) "application/x-rtp,media=video,encoding-name=VP8,payload=" STR(x)
|
||||
|
||||
static gboolean
|
||||
start_pipeline (void)
|
||||
{
|
||||
GstStateChangeReturn ret;
|
||||
GError *error = NULL;
|
||||
|
||||
/* NOTE: webrtcbin currently does not support dynamic addition/removal of
|
||||
* streams, so we use a separate webrtcbin for each peer, but all of them are
|
||||
* inside the same pipeline. We start by connecting it to a fakesink so that
|
||||
* we can preroll early. */
|
||||
pipeline = gst_parse_launch ("tee name=audiotee ! queue ! fakesink "
|
||||
"audiotestsrc is-live=true wave=red-noise ! queue ! opusenc ! rtpopuspay ! "
|
||||
"queue ! " RTP_CAPS_OPUS(96) " ! audiotee. ",
|
||||
&error);
|
||||
|
||||
if (error) {
|
||||
g_printerr ("Failed to parse launch: %s\n", error->message);
|
||||
g_error_free (error);
|
||||
goto err;
|
||||
}
|
||||
|
||||
g_print ("Starting pipeline, not transmitting yet\n");
|
||||
ret = gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
|
||||
if (ret == GST_STATE_CHANGE_FAILURE)
|
||||
goto err;
|
||||
|
||||
return TRUE;
|
||||
|
||||
err:
|
||||
if (pipeline)
|
||||
g_clear_object (&pipeline);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
join_room_on_server (void)
|
||||
{
|
||||
gchar *msg;
|
||||
|
||||
if (soup_websocket_connection_get_state (ws_conn) !=
|
||||
SOUP_WEBSOCKET_STATE_OPEN)
|
||||
return FALSE;
|
||||
|
||||
if (!room_id)
|
||||
return FALSE;
|
||||
|
||||
g_print ("Joining room %s\n", room_id);
|
||||
app_state = ROOM_JOINING;
|
||||
msg = g_strdup_printf ("ROOM %s", room_id);
|
||||
soup_websocket_connection_send_text (ws_conn, msg);
|
||||
g_free (msg);
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
register_with_server (void)
|
||||
{
|
||||
gchar *hello;
|
||||
|
||||
if (soup_websocket_connection_get_state (ws_conn) !=
|
||||
SOUP_WEBSOCKET_STATE_OPEN)
|
||||
return FALSE;
|
||||
|
||||
g_print ("Registering id %s with server\n", local_id);
|
||||
app_state = SERVER_REGISTERING;
|
||||
|
||||
/* Register with the server with a random integer id. Reply will be received
|
||||
* by on_server_message() */
|
||||
hello = g_strdup_printf ("HELLO %s", local_id);
|
||||
soup_websocket_connection_send_text (ws_conn, hello);
|
||||
g_free (hello);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static void
|
||||
on_server_closed (SoupWebsocketConnection * conn G_GNUC_UNUSED,
|
||||
gpointer user_data G_GNUC_UNUSED)
|
||||
{
|
||||
app_state = SERVER_CLOSED;
|
||||
cleanup_and_quit_loop ("Server connection closed", 0);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
do_registration (void)
|
||||
{
|
||||
if (app_state != SERVER_REGISTERING) {
|
||||
cleanup_and_quit_loop ("ERROR: Received HELLO when not registering",
|
||||
APP_STATE_ERROR);
|
||||
return FALSE;
|
||||
}
|
||||
app_state = SERVER_REGISTERED;
|
||||
g_print ("Registered with server\n");
|
||||
/* Ask signalling server that we want to join a room */
|
||||
if (!join_room_on_server ()) {
|
||||
cleanup_and_quit_loop ("ERROR: Failed to join room", ROOM_CALL_ERROR);
|
||||
return FALSE;
|
||||
}
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
/*
|
||||
* When we join a room, we are responsible for calling by starting negotiation
|
||||
* with each peer in it by sending an SDP offer and ICE candidates.
|
||||
*/
|
||||
static void
|
||||
do_join_room (const gchar * text)
|
||||
{
|
||||
gint ii, len;
|
||||
gchar **peer_ids;
|
||||
|
||||
if (app_state != ROOM_JOINING) {
|
||||
cleanup_and_quit_loop ("ERROR: Received ROOM_OK when not calling",
|
||||
ROOM_JOIN_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
app_state = ROOM_JOINED;
|
||||
/* Start recording, but not transmitting */
|
||||
if (!start_pipeline ()) {
|
||||
cleanup_and_quit_loop ("ERROR: Failed to start pipeline",
|
||||
ROOM_CALL_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
peer_ids = g_strsplit (text, " ", -1);
|
||||
g_assert_cmpstr (peer_ids[0], ==, "ROOM_OK");
|
||||
len = g_strv_length (peer_ids);
|
||||
/* There are peers in the room already. We need to start negotiation
|
||||
* (exchange SDP and ICE candidates) and transmission of media. */
|
||||
if (len > 1 && strlen (peer_ids[1]) > 0) {
|
||||
g_print ("Found %i peers already in room\n", len - 1);
|
||||
app_state = ROOM_CALL_OFFERING;
|
||||
for (ii = 1; ii < len; ii++) {
|
||||
gchar *peer_id = g_strdup (peer_ids[ii]);
|
||||
g_print ("Negotiating with peer %s\n", peer_id);
|
||||
/* This might fail asynchronously */
|
||||
call_peer (peer_id);
|
||||
peers = g_list_prepend (peers, peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
g_strfreev (peer_ids);
|
||||
return;
|
||||
}
|
||||
|
||||
static void
|
||||
handle_error_message (const gchar * msg)
|
||||
{
|
||||
switch (app_state) {
|
||||
case SERVER_CONNECTING:
|
||||
app_state = SERVER_CONNECTION_ERROR;
|
||||
break;
|
||||
case SERVER_REGISTERING:
|
||||
app_state = SERVER_REGISTRATION_ERROR;
|
||||
break;
|
||||
case ROOM_JOINING:
|
||||
app_state = ROOM_JOIN_ERROR;
|
||||
break;
|
||||
case ROOM_JOINED:
|
||||
case ROOM_CALL_NEGOTIATING:
|
||||
case ROOM_CALL_OFFERING:
|
||||
case ROOM_CALL_ANSWERING:
|
||||
app_state = ROOM_CALL_ERROR;
|
||||
break;
|
||||
case ROOM_CALL_STARTED:
|
||||
case ROOM_CALL_STOPPING:
|
||||
case ROOM_CALL_STOPPED:
|
||||
app_state = ROOM_CALL_ERROR;
|
||||
break;
|
||||
default:
|
||||
app_state = APP_STATE_ERROR;
|
||||
}
|
||||
cleanup_and_quit_loop (msg, 0);
|
||||
}
|
||||
|
||||
static void
|
||||
on_answer_created (GstPromise * promise, const gchar * peer_id)
|
||||
{
|
||||
GstElement *webrtc;
|
||||
GstWebRTCSessionDescription *answer;
|
||||
|
||||
g_assert_cmpint (app_state, ==, ROOM_CALL_ANSWERING);
|
||||
|
||||
g_assert_cmpint (promise->result, ==, GST_PROMISE_RESULT_REPLIED);
|
||||
gst_structure_get (promise->promise, "answer",
|
||||
GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &answer, NULL);
|
||||
gst_promise_unref (promise);
|
||||
|
||||
promise = gst_promise_new ();
|
||||
webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id);
|
||||
g_assert_nonnull (webrtc);
|
||||
g_signal_emit_by_name (webrtc, "set-local-description", answer, promise);
|
||||
gst_promise_interrupt (promise);
|
||||
gst_promise_unref (promise);
|
||||
|
||||
/* Send offer to peer */
|
||||
send_room_peer_sdp (answer, peer_id);
|
||||
gst_webrtc_session_description_free (answer);
|
||||
|
||||
app_state = ROOM_CALL_STARTED;
|
||||
}
|
||||
|
||||
static void
|
||||
handle_sdp_offer (const gchar * peer_id, const gchar * text)
|
||||
{
|
||||
int ret;
|
||||
GstPromise *promise;
|
||||
GstElement *webrtc;
|
||||
GstSDPMessage *sdp;
|
||||
GstWebRTCSessionDescription *offer;
|
||||
|
||||
g_assert_cmpint (app_state, ==, ROOM_CALL_ANSWERING);
|
||||
|
||||
g_print ("Received offer:\n%s\n", text);
|
||||
|
||||
ret = gst_sdp_message_new (&sdp);
|
||||
g_assert_cmpint (ret, ==, GST_SDP_OK);
|
||||
|
||||
ret = gst_sdp_message_parse_buffer (text, strlen (text), sdp);
|
||||
g_assert_cmpint (ret, ==, GST_SDP_OK);
|
||||
|
||||
offer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_OFFER, sdp);
|
||||
g_assert_nonnull (offer);
|
||||
|
||||
/* Set remote description on our pipeline */
|
||||
promise = gst_promise_new ();
|
||||
webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id);
|
||||
g_assert_nonnull (webrtc);
|
||||
g_signal_emit_by_name (webrtc, "set-remote-description", offer, promise);
|
||||
/* We don't want to be notified when the action is done */
|
||||
gst_promise_interrupt (promise);
|
||||
gst_promise_unref (promise);
|
||||
|
||||
/* Create an answer that we will send back to the peer */
|
||||
promise = gst_promise_new ();
|
||||
gst_promise_set_change_callback (promise,
|
||||
(GstPromiseChangeFunc) on_answer_created, (gpointer) peer_id, NULL);
|
||||
g_signal_emit_by_name (webrtc, "create-answer", NULL, promise);
|
||||
|
||||
gst_webrtc_session_description_free (offer);
|
||||
gst_object_unref (webrtc);
|
||||
}
|
||||
|
||||
static void
|
||||
handle_sdp_answer (const gchar * peer_id, const gchar * text)
|
||||
{
|
||||
int ret;
|
||||
GstPromise *promise;
|
||||
GstElement *webrtc;
|
||||
GstSDPMessage *sdp;
|
||||
GstWebRTCSessionDescription *answer;
|
||||
|
||||
g_assert_cmpint (app_state, >=, ROOM_CALL_OFFERING);
|
||||
|
||||
g_print ("Received answer:\n%s\n", text);
|
||||
|
||||
ret = gst_sdp_message_new (&sdp);
|
||||
g_assert_cmpint (ret, ==, GST_SDP_OK);
|
||||
|
||||
ret = gst_sdp_message_parse_buffer (text, strlen (text), sdp);
|
||||
g_assert_cmpint (ret, ==, GST_SDP_OK);
|
||||
|
||||
answer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER, sdp);
|
||||
g_assert_nonnull (answer);
|
||||
|
||||
/* Set remote description on our pipeline */
|
||||
promise = gst_promise_new ();
|
||||
webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id);
|
||||
g_assert_nonnull (webrtc);
|
||||
g_signal_emit_by_name (webrtc, "set-remote-description", answer, promise);
|
||||
gst_object_unref (webrtc);
|
||||
/* We don't want to be notified when the action is done */
|
||||
gst_promise_interrupt (promise);
|
||||
gst_promise_unref (promise);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
handle_peer_message (const gchar * peer_id, const gchar * msg)
|
||||
{
|
||||
JsonNode *root;
|
||||
JsonObject *object, *child;
|
||||
JsonParser *parser = json_parser_new ();
|
||||
if (!json_parser_load_from_data (parser, msg, -1, NULL)) {
|
||||
g_printerr ("Unknown message '%s' from '%s', ignoring", msg, peer_id);
|
||||
g_object_unref (parser);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
root = json_parser_get_root (parser);
|
||||
if (!JSON_NODE_HOLDS_OBJECT (root)) {
|
||||
g_printerr ("Unknown json message '%s' from '%s', ignoring", msg, peer_id);
|
||||
g_object_unref (parser);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
g_print ("Message from peer %s: %s\n", peer_id, msg);
|
||||
|
||||
object = json_node_get_object (root);
|
||||
/* Check type of JSON message */
|
||||
if (json_object_has_member (object, "sdp")) {
|
||||
int ret;
|
||||
GstSDPMessage *sdp;
|
||||
const gchar *text, *sdp_type;
|
||||
GstWebRTCSessionDescription *answer;
|
||||
|
||||
g_assert_cmpint (app_state, >=, ROOM_JOINED);
|
||||
|
||||
child = json_object_get_object_member (object, "sdp");
|
||||
|
||||
if (!json_object_has_member (child, "type")) {
|
||||
cleanup_and_quit_loop ("ERROR: received SDP without 'type'",
|
||||
ROOM_CALL_ERROR);
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
sdp_type = json_object_get_string_member (child, "type");
|
||||
text = json_object_get_string_member (child, "sdp");
|
||||
|
||||
if (g_strcmp0 (sdp_type, "offer") == 0) {
|
||||
app_state = ROOM_CALL_ANSWERING;
|
||||
incoming_call_from_peer (peer_id);
|
||||
handle_sdp_offer (peer_id, text);
|
||||
} else if (g_strcmp0 (sdp_type, "answer") == 0) {
|
||||
g_assert_cmpint (app_state, >=, ROOM_CALL_OFFERING);
|
||||
handle_sdp_answer (peer_id, text);
|
||||
app_state = ROOM_CALL_STARTED;
|
||||
} else {
|
||||
cleanup_and_quit_loop ("ERROR: invalid sdp_type", ROOM_CALL_ERROR);
|
||||
return FALSE;
|
||||
}
|
||||
} else if (json_object_has_member (object, "ice")) {
|
||||
GstElement *webrtc;
|
||||
const gchar *candidate;
|
||||
gint sdpmlineindex;
|
||||
|
||||
child = json_object_get_object_member (object, "ice");
|
||||
candidate = json_object_get_string_member (child, "candidate");
|
||||
sdpmlineindex = json_object_get_int_member (child, "sdpMLineIndex");
|
||||
|
||||
/* Add ice candidate sent by remote peer */
|
||||
webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id);
|
||||
g_assert_nonnull (webrtc);
|
||||
g_signal_emit_by_name (webrtc, "add-ice-candidate", sdpmlineindex,
|
||||
candidate);
|
||||
gst_object_unref (webrtc);
|
||||
} else {
|
||||
g_printerr ("Ignoring unknown JSON message:\n%s\n", msg);
|
||||
}
|
||||
g_object_unref (parser);
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
/* One mega message handler for our asynchronous calling mechanism */
|
||||
static void
|
||||
on_server_message (SoupWebsocketConnection * conn, SoupWebsocketDataType type,
|
||||
GBytes * message, gpointer user_data)
|
||||
{
|
||||
gsize size;
|
||||
gchar *text, *data;
|
||||
|
||||
switch (type) {
|
||||
case SOUP_WEBSOCKET_DATA_BINARY:
|
||||
g_printerr ("Received unknown binary message, ignoring\n");
|
||||
g_bytes_unref (message);
|
||||
return;
|
||||
case SOUP_WEBSOCKET_DATA_TEXT:
|
||||
data = g_bytes_unref_to_data (message, &size);
|
||||
/* Convert to NULL-terminated string */
|
||||
text = g_strndup (data, size);
|
||||
g_free (data);
|
||||
break;
|
||||
default:
|
||||
g_assert_not_reached ();
|
||||
}
|
||||
|
||||
/* Server has accepted our registration, we are ready to send commands */
|
||||
if (g_strcmp0 (text, "HELLO") == 0) {
|
||||
/* May fail asynchronously */
|
||||
do_registration ();
|
||||
/* Room-related message */
|
||||
} else if (g_str_has_prefix (text, "ROOM_")) {
|
||||
/* Room joined, now we can start negotiation */
|
||||
if (g_str_has_prefix (text, "ROOM_OK ")) {
|
||||
/* May fail asynchronously */
|
||||
do_join_room (text);
|
||||
} else if (g_str_has_prefix (text, "ROOM_PEER")) {
|
||||
gchar **splitm = NULL;
|
||||
const gchar *peer_id;
|
||||
/* SDP and ICE, usually */
|
||||
if (g_str_has_prefix (text, "ROOM_PEER_MSG")) {
|
||||
splitm = g_strsplit (text, " ", 3);
|
||||
peer_id = find_peer_from_list (splitm[1]);
|
||||
g_assert_nonnull (peer_id);
|
||||
/* Could be an offer or an answer, or ICE, or an arbitrary message */
|
||||
handle_peer_message (peer_id, splitm[2]);
|
||||
} else if (g_str_has_prefix (text, "ROOM_PEER_JOINED")) {
|
||||
splitm = g_strsplit (text, " ", 2);
|
||||
peers = g_list_prepend (peers, g_strdup (splitm[1]));
|
||||
peer_id = find_peer_from_list (splitm[1]);
|
||||
g_assert_nonnull (peer_id);
|
||||
g_print ("Peer %s has joined the room\n", peer_id);
|
||||
} else if (g_str_has_prefix (text, "ROOM_PEER_LEFT")) {
|
||||
splitm = g_strsplit (text, " ", 2);
|
||||
peer_id = find_peer_from_list (splitm[1]);
|
||||
g_assert_nonnull (peer_id);
|
||||
peers = g_list_remove (peers, peer_id);
|
||||
g_print ("Peer %s has left the room\n", peer_id);
|
||||
g_free ((gchar*) peer_id);
|
||||
/* TODO: cleanup pipeline */
|
||||
} else {
|
||||
g_printerr ("WARNING: Ignoring unknown message %s\n", text);
|
||||
}
|
||||
g_strfreev (splitm);
|
||||
} else {
|
||||
goto err;
|
||||
}
|
||||
/* Handle errors */
|
||||
} else if (g_str_has_prefix (text, "ERROR")) {
|
||||
handle_error_message (text);
|
||||
} else {
|
||||
goto err;
|
||||
}
|
||||
|
||||
out:
|
||||
g_free (text);
|
||||
return;
|
||||
|
||||
err:
|
||||
{
|
||||
gchar *err_s = g_strdup_printf ("ERROR: unknown message %s", text);
|
||||
cleanup_and_quit_loop (err_s, 0);
|
||||
g_free (err_s);
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
on_server_connected (SoupSession * session, GAsyncResult * res,
|
||||
SoupMessage *msg)
|
||||
{
|
||||
GError *error = NULL;
|
||||
|
||||
ws_conn = soup_session_websocket_connect_finish (session, res, &error);
|
||||
if (error) {
|
||||
cleanup_and_quit_loop (error->message, SERVER_CONNECTION_ERROR);
|
||||
g_error_free (error);
|
||||
return;
|
||||
}
|
||||
|
||||
g_assert_nonnull (ws_conn);
|
||||
|
||||
app_state = SERVER_CONNECTED;
|
||||
g_print ("Connected to signalling server\n");
|
||||
|
||||
g_signal_connect (ws_conn, "closed", G_CALLBACK (on_server_closed), NULL);
|
||||
g_signal_connect (ws_conn, "message", G_CALLBACK (on_server_message), NULL);
|
||||
|
||||
/* Register with the server so it knows about us and can accept commands
|
||||
* responses from the server will be handled in on_server_message() above */
|
||||
register_with_server ();
|
||||
}
|
||||
|
||||
/*
|
||||
* Connect to the signalling server. This is the entrypoint for everything else.
|
||||
*/
|
||||
static void
|
||||
connect_to_websocket_server_async (void)
|
||||
{
|
||||
SoupLogger *logger;
|
||||
SoupMessage *message;
|
||||
SoupSession *session;
|
||||
const char *https_aliases[] = {"wss", NULL};
|
||||
|
||||
session = soup_session_new_with_options (SOUP_SESSION_SSL_STRICT, TRUE,
|
||||
SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
|
||||
//SOUP_SESSION_SSL_CA_FILE, "/etc/ssl/certs/ca-bundle.crt",
|
||||
SOUP_SESSION_HTTPS_ALIASES, https_aliases, NULL);
|
||||
|
||||
logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, -1);
|
||||
soup_session_add_feature (session, SOUP_SESSION_FEATURE (logger));
|
||||
g_object_unref (logger);
|
||||
|
||||
message = soup_message_new (SOUP_METHOD_GET, server_url);
|
||||
|
||||
g_print ("Connecting to server...\n");
|
||||
|
||||
/* Once connected, we will register */
|
||||
soup_session_websocket_connect_async (session, message, NULL, NULL, NULL,
|
||||
(GAsyncReadyCallback) on_server_connected, message);
|
||||
app_state = SERVER_CONNECTING;
|
||||
}
|
||||
|
||||
int
|
||||
main (int argc, char *argv[])
|
||||
{
|
||||
SoupSession *session;
|
||||
GOptionContext *context;
|
||||
GError *error = NULL;
|
||||
|
||||
context = g_option_context_new ("- gstreamer webrtc sendrecv demo");
|
||||
g_option_context_add_main_entries (context, entries, NULL);
|
||||
g_option_context_add_group (context, gst_init_get_option_group ());
|
||||
if (!g_option_context_parse (context, &argc, &argv, &error)) {
|
||||
g_printerr ("Error initializing: %s\n", error->message);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!room_id) {
|
||||
g_printerr ("--room-id is a required argument\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!local_id)
|
||||
local_id = g_strdup_printf ("%s-%i", g_get_user_name (),
|
||||
g_random_int_range (10, 10000));
|
||||
/* Sanitize by removing whitespace, modifies string in-place */
|
||||
g_strdelimit (local_id, " \t\n\r", '-');
|
||||
|
||||
g_print ("Our local id is %s\n", local_id);
|
||||
|
||||
if (!server_url)
|
||||
server_url = g_strdup (default_server_url);
|
||||
|
||||
loop = g_main_loop_new (NULL, FALSE);
|
||||
|
||||
connect_to_websocket_server_async ();
|
||||
|
||||
g_main_loop_run (loop);
|
||||
|
||||
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
|
||||
g_print ("Pipeline stopped\n");
|
||||
|
||||
gst_object_unref (pipeline);
|
||||
g_free (server_url);
|
||||
g_free (local_id);
|
||||
g_free (room_id);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in a new issue