From 9b1a0e538992444b292f8bbca38bbe4a7da4c7ab Mon Sep 17 00:00:00 2001 From: Nirbheek Chauhan Date: Mon, 30 Oct 2017 09:09:36 +0530 Subject: [PATCH] WIP: Add a new multiparty sendrecv gstreamer demo You can join a room and an audio-only call will be started with all peers in that room. Currently uses audiotestsrc only. BUG: With >2 peers in a call, if a peer leaves, the pipeline stops outputting data from the remaining peers to the (audio) sink. TODO: JS code to allow a browser to join the call TODO: Cleanup pipeline when a peer leaves TODO: Add ICE servers to allow calls over the Internet TODO: Perhaps setup a TURN server as well --- webrtc/multiparty-sendrecv/gst/.gitignore | 1 + .../gst/mp-webrtc-sendrecv.c | 889 ++++++++++++++++++ 2 files changed, 890 insertions(+) create mode 100644 webrtc/multiparty-sendrecv/gst/.gitignore create mode 100644 webrtc/multiparty-sendrecv/gst/mp-webrtc-sendrecv.c diff --git a/webrtc/multiparty-sendrecv/gst/.gitignore b/webrtc/multiparty-sendrecv/gst/.gitignore new file mode 100644 index 0000000000..e1b42bed77 --- /dev/null +++ b/webrtc/multiparty-sendrecv/gst/.gitignore @@ -0,0 +1 @@ +mp-webrtc-sendrecv diff --git a/webrtc/multiparty-sendrecv/gst/mp-webrtc-sendrecv.c b/webrtc/multiparty-sendrecv/gst/mp-webrtc-sendrecv.c new file mode 100644 index 0000000000..ed25f117c4 --- /dev/null +++ b/webrtc/multiparty-sendrecv/gst/mp-webrtc-sendrecv.c @@ -0,0 +1,889 @@ +/* + * Demo gstreamer app for negotiating and streaming a sendrecv audio-only webrtc + * stream to all the peers in a multiparty room. + * + * gcc mp-webrtc-sendrecv.c $(pkg-config --cflags --libs gstreamer-webrtc-1.0 gstreamer-sdp-1.0 libsoup-2.4 json-glib-1.0) -o mp-webrtc-sendrecv + * + * Author: Nirbheek Chauhan + */ +#include +#include +#include + +/* For signalling */ +#include +#include + +#include + +enum AppState { + APP_STATE_UNKNOWN = 0, + APP_STATE_ERROR = 1, /* generic error */ + SERVER_CONNECTING = 1000, + SERVER_CONNECTION_ERROR, + SERVER_CONNECTED, /* Ready to register */ + SERVER_REGISTERING = 2000, + SERVER_REGISTRATION_ERROR, + SERVER_REGISTERED, /* Ready to call a peer */ + SERVER_CLOSED, /* server connection closed by us or the server */ + ROOM_JOINING = 3000, + ROOM_JOIN_ERROR, + ROOM_JOINED, + ROOM_CALL_NEGOTIATING = 4000, /* negotiating with some or all peers */ + ROOM_CALL_OFFERING, /* when we're the one sending the offer */ + ROOM_CALL_ANSWERING, /* when we're the one answering an offer */ + ROOM_CALL_STARTED, /* in a call with some or all peers */ + ROOM_CALL_STOPPING, + ROOM_CALL_STOPPED, + ROOM_CALL_ERROR, +}; + +static GMainLoop *loop; +static GstElement *pipeline; +static GList *peers; + +static SoupWebsocketConnection *ws_conn = NULL; +static enum AppState app_state = 0; +static const gchar *default_server_url = "wss://webrtc.nirbheek.in:8443"; +static gchar *server_url = NULL; +static gchar *local_id = NULL; +static gchar *room_id = NULL; + +static GOptionEntry entries[] = +{ + { "name", 0, 0, G_OPTION_ARG_STRING, &local_id, "Name we will send to the server", "ID" }, + { "room-id", 0, 0, G_OPTION_ARG_STRING, &room_id, "Room name to join or create", "ID" }, + { "server", 0, 0, G_OPTION_ARG_STRING, &server_url, "Signalling server to connect to", "URL" }, +}; + +static gint +compare_str_glist (gconstpointer a, gconstpointer b) +{ + return g_strcmp0 (a, b); +} + +static const gchar * +find_peer_from_list (const gchar * peer_id) +{ + return (g_list_find_custom (peers, peer_id, compare_str_glist))->data; +} + +static gboolean +cleanup_and_quit_loop (const gchar * msg, enum AppState state) +{ + if (msg) + g_printerr ("%s\n", msg); + if (state > 0) + app_state = state; + + if (ws_conn) { + if (soup_websocket_connection_get_state (ws_conn) == + SOUP_WEBSOCKET_STATE_OPEN) + /* This will call us again */ + soup_websocket_connection_close (ws_conn, 1000, ""); + else + g_object_unref (ws_conn); + } + + if (loop) { + g_main_loop_quit (loop); + loop = NULL; + } + + /* To allow usage as a GSourceFunc */ + return G_SOURCE_REMOVE; +} + +static gchar* +get_string_from_json_object (JsonObject * object) +{ + JsonNode *root; + JsonGenerator *generator; + gchar *text; + + /* Make it the root node */ + root = json_node_init_object (json_node_alloc (), object); + generator = json_generator_new (); + json_generator_set_root (generator, root); + text = json_generator_to_data (generator, NULL); + + /* Release everything */ + g_object_unref (generator); + json_node_free (root); + return text; +} + +static void +handle_media_stream (GstPad * pad, GstElement * pipe, const char * convert_name, + const char * sink_name) +{ + GstPad *qpad; + GstElement *q, *conv, *sink; + GstPadLinkReturn ret; + + q = gst_element_factory_make ("queue", NULL); + g_assert_nonnull (q); + conv = gst_element_factory_make (convert_name, NULL); + g_assert_nonnull (conv); + sink = gst_element_factory_make (sink_name, NULL); + g_assert_nonnull (sink); + gst_bin_add_many (GST_BIN (pipe), q, conv, sink, NULL); + gst_element_sync_state_with_parent (q); + gst_element_sync_state_with_parent (conv); + gst_element_sync_state_with_parent (sink); + gst_element_link_many (q, conv, sink, NULL); + + qpad = gst_element_get_static_pad (q, "sink"); + + ret = gst_pad_link (pad, qpad); + g_assert_cmpint (ret, ==, GST_PAD_LINK_OK); +} + +static void +on_incoming_decodebin_stream (GstElement * decodebin, GstPad * pad, + GstElement * pipe) +{ + GstCaps *caps; + const gchar *name; + + if (!gst_pad_has_current_caps (pad)) { + g_printerr ("Pad '%s' has no caps, can't do anything, ignoring\n", + GST_PAD_NAME (pad)); + return; + } + + caps = gst_pad_get_current_caps (pad); + name = gst_structure_get_name (gst_caps_get_structure (caps, 0)); + + if (g_str_has_prefix (name, "video")) { + handle_media_stream (pad, pipe, "videoconvert", "autovideosink"); + } else if (g_str_has_prefix (name, "audio")) { + handle_media_stream (pad, pipe, "audioconvert", "autoaudiosink"); + } else { + g_printerr ("Unknown pad %s, ignoring", GST_PAD_NAME (pad)); + } +} + +static void +on_incoming_stream (GstElement * webrtc, GstPad * pad, GstElement * pipe) +{ + GstElement *decodebin; + + if (GST_PAD_DIRECTION (pad) != GST_PAD_SRC) + return; + + decodebin = gst_element_factory_make ("decodebin", NULL); + g_signal_connect (decodebin, "pad-added", + G_CALLBACK (on_incoming_decodebin_stream), pipe); + gst_bin_add (GST_BIN (pipe), decodebin); + gst_element_sync_state_with_parent (decodebin); + gst_element_link (webrtc, decodebin); +} + +static void +send_room_peer_msg (const gchar * text, const gchar * peer_id) +{ + gchar *msg; + + msg = g_strdup_printf ("ROOM_PEER_MSG %s %s", peer_id, text); + soup_websocket_connection_send_text (ws_conn, msg); + g_free (msg); +} + +static void +send_ice_candidate_message (GstElement * webrtc G_GNUC_UNUSED, guint mlineindex, + gchar * candidate, const gchar * peer_id) +{ + gchar *text; + JsonObject *ice, *msg; + + if (app_state < ROOM_CALL_OFFERING) { + cleanup_and_quit_loop ("Can't send ICE, not in call", APP_STATE_ERROR); + return; + } + + ice = json_object_new (); + json_object_set_string_member (ice, "candidate", candidate); + json_object_set_int_member (ice, "sdpMLineIndex", mlineindex); + msg = json_object_new (); + json_object_set_object_member (msg, "ice", ice); + text = get_string_from_json_object (msg); + json_object_unref (msg); + + send_room_peer_msg (text, peer_id); + g_free (text); +} + +static void +send_room_peer_sdp (GstWebRTCSessionDescription * desc, const gchar * peer_id) +{ + JsonObject *msg, *sdp; + gchar *text, *sdptype, *sdptext; + + g_assert_cmpint (app_state, <, ROOM_CALL_OFFERING); + + if (desc->type == GST_WEBRTC_SDP_TYPE_OFFER) + sdptype = "offer"; + else if (desc->type == GST_WEBRTC_SDP_TYPE_ANSWER) + sdptype = "answer"; + else + g_assert_not_reached (); + + text = gst_sdp_message_as_text (desc->sdp); + g_print ("Sending sdp %s to %s:\n%s\n", sdptype, peer_id, text); + + sdp = json_object_new (); + json_object_set_string_member (sdp, "type", sdptype); + json_object_set_string_member (sdp, "sdp", text); + g_free (text); + + msg = json_object_new (); + json_object_set_object_member (msg, "sdp", sdp); + sdptext = get_string_from_json_object (msg); + json_object_unref (msg); + + send_room_peer_msg (sdptext, peer_id); + g_free (sdptext); +} + +/* Offer created by our pipeline, to be sent to the peer */ +static void +on_offer_created (GstPromise * promise, const gchar * peer_id) +{ + GstElement *webrtc; + GstWebRTCSessionDescription *offer; + + g_assert_cmpint (app_state, ==, ROOM_CALL_OFFERING); + + g_assert_cmpint (promise->result, ==, GST_PROMISE_RESULT_REPLIED); + gst_structure_get (promise->promise, "offer", + GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL); + gst_promise_unref (promise); + + promise = gst_promise_new (); + webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id); + g_assert_nonnull (webrtc); + g_signal_emit_by_name (webrtc, "set-local-description", offer, promise); + gst_promise_interrupt (promise); + gst_promise_unref (promise); + + /* Send offer to peer */ + send_room_peer_sdp (offer, peer_id); + gst_webrtc_session_description_free (offer); +} + +static void +on_negotiation_needed (GstElement * webrtc, const gchar * peer_id) +{ + GstPromise *promise = gst_promise_new (); + + app_state = ROOM_CALL_OFFERING; + g_signal_emit_by_name (webrtc, "create-offer", NULL, promise); + gst_promise_set_change_callback (promise, + (GstPromiseChangeFunc) on_offer_created, (gpointer) peer_id, NULL); +} + +static void +add_webrtcbin_to_pipeline (const gchar * peer_id, gboolean offer) +{ + int ret; + GstElement *tee, *webrtc; + GstPad *srcpad, *sinkpad; + + webrtc = gst_element_factory_make ("webrtcbin", peer_id); + g_assert_nonnull (webrtc); + + tee = gst_bin_get_by_name (GST_BIN (pipeline), "audiotee"); + g_assert_nonnull (tee); + srcpad = gst_element_get_request_pad (tee, "src_%u"); + gst_object_unref (tee); + sinkpad = gst_element_get_request_pad (webrtc, "sink_%u"); + + /* Add the bin to the pipeline and connect it */ + gst_bin_add (GST_BIN (pipeline), webrtc); + + ret = gst_pad_link (srcpad, sinkpad); + g_assert_cmpint (ret, ==, GST_PAD_LINK_OK); + gst_object_unref (srcpad); + gst_object_unref (sinkpad); + + /* This is the gstwebrtc entry point where we create the offer and so on. It + * will be called when the pipeline goes to PLAYING. + * XXX: We must connect this after webrtcbin has been linked to a source via + * get_request_pad() otherwise webrtcbin will create an SDP offer with no + * media lines in it. */ + if (offer) + g_signal_connect (webrtc, "on-negotiation-needed", + G_CALLBACK (on_negotiation_needed), (gpointer) peer_id); + + /* We need to transmit this ICE candidate to the browser via the websockets + * signalling server. Incoming ice candidates from the browser need to be + * added by us too, see on_server_message() */ + g_signal_connect (webrtc, "on-ice-candidate", + G_CALLBACK (send_ice_candidate_message), (gpointer) peer_id); + /* Incoming streams will be exposed via this signal */ + g_signal_connect (webrtc, "pad-added", G_CALLBACK (on_incoming_stream), + pipeline); + + /* Set to bin to PLAYING */ + ret = gst_element_sync_state_with_parent (webrtc); + gst_object_unref (webrtc); + g_assert_true (ret); +} + +static void +call_peer (const gchar * peer_id) +{ + add_webrtcbin_to_pipeline (peer_id, TRUE); +} + +static void +incoming_call_from_peer (const gchar * peer_id) +{ + add_webrtcbin_to_pipeline (peer_id, FALSE); +} + +#define STR(x) #x +#define RTP_CAPS_OPUS(x) "application/x-rtp,media=audio,encoding-name=OPUS,payload=" STR(x) +#define RTP_CAPS_VP8(x) "application/x-rtp,media=video,encoding-name=VP8,payload=" STR(x) + +static gboolean +start_pipeline (void) +{ + GstStateChangeReturn ret; + GError *error = NULL; + + /* NOTE: webrtcbin currently does not support dynamic addition/removal of + * streams, so we use a separate webrtcbin for each peer, but all of them are + * inside the same pipeline. We start by connecting it to a fakesink so that + * we can preroll early. */ + pipeline = gst_parse_launch ("tee name=audiotee ! queue ! fakesink " + "audiotestsrc is-live=true wave=red-noise ! queue ! opusenc ! rtpopuspay ! " + "queue ! " RTP_CAPS_OPUS(96) " ! audiotee. ", + &error); + + if (error) { + g_printerr ("Failed to parse launch: %s\n", error->message); + g_error_free (error); + goto err; + } + + g_print ("Starting pipeline, not transmitting yet\n"); + ret = gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING); + if (ret == GST_STATE_CHANGE_FAILURE) + goto err; + + return TRUE; + +err: + if (pipeline) + g_clear_object (&pipeline); + return FALSE; +} + +static gboolean +join_room_on_server (void) +{ + gchar *msg; + + if (soup_websocket_connection_get_state (ws_conn) != + SOUP_WEBSOCKET_STATE_OPEN) + return FALSE; + + if (!room_id) + return FALSE; + + g_print ("Joining room %s\n", room_id); + app_state = ROOM_JOINING; + msg = g_strdup_printf ("ROOM %s", room_id); + soup_websocket_connection_send_text (ws_conn, msg); + g_free (msg); + return TRUE; +} + +static gboolean +register_with_server (void) +{ + gchar *hello; + + if (soup_websocket_connection_get_state (ws_conn) != + SOUP_WEBSOCKET_STATE_OPEN) + return FALSE; + + g_print ("Registering id %s with server\n", local_id); + app_state = SERVER_REGISTERING; + + /* Register with the server with a random integer id. Reply will be received + * by on_server_message() */ + hello = g_strdup_printf ("HELLO %s", local_id); + soup_websocket_connection_send_text (ws_conn, hello); + g_free (hello); + + return TRUE; +} + +static void +on_server_closed (SoupWebsocketConnection * conn G_GNUC_UNUSED, + gpointer user_data G_GNUC_UNUSED) +{ + app_state = SERVER_CLOSED; + cleanup_and_quit_loop ("Server connection closed", 0); +} + +static gboolean +do_registration (void) +{ + if (app_state != SERVER_REGISTERING) { + cleanup_and_quit_loop ("ERROR: Received HELLO when not registering", + APP_STATE_ERROR); + return FALSE; + } + app_state = SERVER_REGISTERED; + g_print ("Registered with server\n"); + /* Ask signalling server that we want to join a room */ + if (!join_room_on_server ()) { + cleanup_and_quit_loop ("ERROR: Failed to join room", ROOM_CALL_ERROR); + return FALSE; + } + return TRUE; +} + +/* + * When we join a room, we are responsible for calling by starting negotiation + * with each peer in it by sending an SDP offer and ICE candidates. + */ +static void +do_join_room (const gchar * text) +{ + gint ii, len; + gchar **peer_ids; + + if (app_state != ROOM_JOINING) { + cleanup_and_quit_loop ("ERROR: Received ROOM_OK when not calling", + ROOM_JOIN_ERROR); + return; + } + + app_state = ROOM_JOINED; + /* Start recording, but not transmitting */ + if (!start_pipeline ()) { + cleanup_and_quit_loop ("ERROR: Failed to start pipeline", + ROOM_CALL_ERROR); + return; + } + + peer_ids = g_strsplit (text, " ", -1); + g_assert_cmpstr (peer_ids[0], ==, "ROOM_OK"); + len = g_strv_length (peer_ids); + /* There are peers in the room already. We need to start negotiation + * (exchange SDP and ICE candidates) and transmission of media. */ + if (len > 1 && strlen (peer_ids[1]) > 0) { + g_print ("Found %i peers already in room\n", len - 1); + app_state = ROOM_CALL_OFFERING; + for (ii = 1; ii < len; ii++) { + gchar *peer_id = g_strdup (peer_ids[ii]); + g_print ("Negotiating with peer %s\n", peer_id); + /* This might fail asynchronously */ + call_peer (peer_id); + peers = g_list_prepend (peers, peer_id); + } + } + + g_strfreev (peer_ids); + return; +} + +static void +handle_error_message (const gchar * msg) +{ + switch (app_state) { + case SERVER_CONNECTING: + app_state = SERVER_CONNECTION_ERROR; + break; + case SERVER_REGISTERING: + app_state = SERVER_REGISTRATION_ERROR; + break; + case ROOM_JOINING: + app_state = ROOM_JOIN_ERROR; + break; + case ROOM_JOINED: + case ROOM_CALL_NEGOTIATING: + case ROOM_CALL_OFFERING: + case ROOM_CALL_ANSWERING: + app_state = ROOM_CALL_ERROR; + break; + case ROOM_CALL_STARTED: + case ROOM_CALL_STOPPING: + case ROOM_CALL_STOPPED: + app_state = ROOM_CALL_ERROR; + break; + default: + app_state = APP_STATE_ERROR; + } + cleanup_and_quit_loop (msg, 0); +} + +static void +on_answer_created (GstPromise * promise, const gchar * peer_id) +{ + GstElement *webrtc; + GstWebRTCSessionDescription *answer; + + g_assert_cmpint (app_state, ==, ROOM_CALL_ANSWERING); + + g_assert_cmpint (promise->result, ==, GST_PROMISE_RESULT_REPLIED); + gst_structure_get (promise->promise, "answer", + GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &answer, NULL); + gst_promise_unref (promise); + + promise = gst_promise_new (); + webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id); + g_assert_nonnull (webrtc); + g_signal_emit_by_name (webrtc, "set-local-description", answer, promise); + gst_promise_interrupt (promise); + gst_promise_unref (promise); + + /* Send offer to peer */ + send_room_peer_sdp (answer, peer_id); + gst_webrtc_session_description_free (answer); + + app_state = ROOM_CALL_STARTED; +} + +static void +handle_sdp_offer (const gchar * peer_id, const gchar * text) +{ + int ret; + GstPromise *promise; + GstElement *webrtc; + GstSDPMessage *sdp; + GstWebRTCSessionDescription *offer; + + g_assert_cmpint (app_state, ==, ROOM_CALL_ANSWERING); + + g_print ("Received offer:\n%s\n", text); + + ret = gst_sdp_message_new (&sdp); + g_assert_cmpint (ret, ==, GST_SDP_OK); + + ret = gst_sdp_message_parse_buffer (text, strlen (text), sdp); + g_assert_cmpint (ret, ==, GST_SDP_OK); + + offer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_OFFER, sdp); + g_assert_nonnull (offer); + + /* Set remote description on our pipeline */ + promise = gst_promise_new (); + webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id); + g_assert_nonnull (webrtc); + g_signal_emit_by_name (webrtc, "set-remote-description", offer, promise); + /* We don't want to be notified when the action is done */ + gst_promise_interrupt (promise); + gst_promise_unref (promise); + + /* Create an answer that we will send back to the peer */ + promise = gst_promise_new (); + gst_promise_set_change_callback (promise, + (GstPromiseChangeFunc) on_answer_created, (gpointer) peer_id, NULL); + g_signal_emit_by_name (webrtc, "create-answer", NULL, promise); + + gst_webrtc_session_description_free (offer); + gst_object_unref (webrtc); +} + +static void +handle_sdp_answer (const gchar * peer_id, const gchar * text) +{ + int ret; + GstPromise *promise; + GstElement *webrtc; + GstSDPMessage *sdp; + GstWebRTCSessionDescription *answer; + + g_assert_cmpint (app_state, >=, ROOM_CALL_OFFERING); + + g_print ("Received answer:\n%s\n", text); + + ret = gst_sdp_message_new (&sdp); + g_assert_cmpint (ret, ==, GST_SDP_OK); + + ret = gst_sdp_message_parse_buffer (text, strlen (text), sdp); + g_assert_cmpint (ret, ==, GST_SDP_OK); + + answer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER, sdp); + g_assert_nonnull (answer); + + /* Set remote description on our pipeline */ + promise = gst_promise_new (); + webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id); + g_assert_nonnull (webrtc); + g_signal_emit_by_name (webrtc, "set-remote-description", answer, promise); + gst_object_unref (webrtc); + /* We don't want to be notified when the action is done */ + gst_promise_interrupt (promise); + gst_promise_unref (promise); +} + +static gboolean +handle_peer_message (const gchar * peer_id, const gchar * msg) +{ + JsonNode *root; + JsonObject *object, *child; + JsonParser *parser = json_parser_new (); + if (!json_parser_load_from_data (parser, msg, -1, NULL)) { + g_printerr ("Unknown message '%s' from '%s', ignoring", msg, peer_id); + g_object_unref (parser); + return FALSE; + } + + root = json_parser_get_root (parser); + if (!JSON_NODE_HOLDS_OBJECT (root)) { + g_printerr ("Unknown json message '%s' from '%s', ignoring", msg, peer_id); + g_object_unref (parser); + return FALSE; + } + + g_print ("Message from peer %s: %s\n", peer_id, msg); + + object = json_node_get_object (root); + /* Check type of JSON message */ + if (json_object_has_member (object, "sdp")) { + int ret; + GstSDPMessage *sdp; + const gchar *text, *sdp_type; + GstWebRTCSessionDescription *answer; + + g_assert_cmpint (app_state, >=, ROOM_JOINED); + + child = json_object_get_object_member (object, "sdp"); + + if (!json_object_has_member (child, "type")) { + cleanup_and_quit_loop ("ERROR: received SDP without 'type'", + ROOM_CALL_ERROR); + return FALSE; + } + + sdp_type = json_object_get_string_member (child, "type"); + text = json_object_get_string_member (child, "sdp"); + + if (g_strcmp0 (sdp_type, "offer") == 0) { + app_state = ROOM_CALL_ANSWERING; + incoming_call_from_peer (peer_id); + handle_sdp_offer (peer_id, text); + } else if (g_strcmp0 (sdp_type, "answer") == 0) { + g_assert_cmpint (app_state, >=, ROOM_CALL_OFFERING); + handle_sdp_answer (peer_id, text); + app_state = ROOM_CALL_STARTED; + } else { + cleanup_and_quit_loop ("ERROR: invalid sdp_type", ROOM_CALL_ERROR); + return FALSE; + } + } else if (json_object_has_member (object, "ice")) { + GstElement *webrtc; + const gchar *candidate; + gint sdpmlineindex; + + child = json_object_get_object_member (object, "ice"); + candidate = json_object_get_string_member (child, "candidate"); + sdpmlineindex = json_object_get_int_member (child, "sdpMLineIndex"); + + /* Add ice candidate sent by remote peer */ + webrtc = gst_bin_get_by_name (GST_BIN (pipeline), peer_id); + g_assert_nonnull (webrtc); + g_signal_emit_by_name (webrtc, "add-ice-candidate", sdpmlineindex, + candidate); + gst_object_unref (webrtc); + } else { + g_printerr ("Ignoring unknown JSON message:\n%s\n", msg); + } + g_object_unref (parser); + return TRUE; +} + +/* One mega message handler for our asynchronous calling mechanism */ +static void +on_server_message (SoupWebsocketConnection * conn, SoupWebsocketDataType type, + GBytes * message, gpointer user_data) +{ + gsize size; + gchar *text, *data; + + switch (type) { + case SOUP_WEBSOCKET_DATA_BINARY: + g_printerr ("Received unknown binary message, ignoring\n"); + g_bytes_unref (message); + return; + case SOUP_WEBSOCKET_DATA_TEXT: + data = g_bytes_unref_to_data (message, &size); + /* Convert to NULL-terminated string */ + text = g_strndup (data, size); + g_free (data); + break; + default: + g_assert_not_reached (); + } + + /* Server has accepted our registration, we are ready to send commands */ + if (g_strcmp0 (text, "HELLO") == 0) { + /* May fail asynchronously */ + do_registration (); + /* Room-related message */ + } else if (g_str_has_prefix (text, "ROOM_")) { + /* Room joined, now we can start negotiation */ + if (g_str_has_prefix (text, "ROOM_OK ")) { + /* May fail asynchronously */ + do_join_room (text); + } else if (g_str_has_prefix (text, "ROOM_PEER")) { + gchar **splitm = NULL; + const gchar *peer_id; + /* SDP and ICE, usually */ + if (g_str_has_prefix (text, "ROOM_PEER_MSG")) { + splitm = g_strsplit (text, " ", 3); + peer_id = find_peer_from_list (splitm[1]); + g_assert_nonnull (peer_id); + /* Could be an offer or an answer, or ICE, or an arbitrary message */ + handle_peer_message (peer_id, splitm[2]); + } else if (g_str_has_prefix (text, "ROOM_PEER_JOINED")) { + splitm = g_strsplit (text, " ", 2); + peers = g_list_prepend (peers, g_strdup (splitm[1])); + peer_id = find_peer_from_list (splitm[1]); + g_assert_nonnull (peer_id); + g_print ("Peer %s has joined the room\n", peer_id); + } else if (g_str_has_prefix (text, "ROOM_PEER_LEFT")) { + splitm = g_strsplit (text, " ", 2); + peer_id = find_peer_from_list (splitm[1]); + g_assert_nonnull (peer_id); + peers = g_list_remove (peers, peer_id); + g_print ("Peer %s has left the room\n", peer_id); + g_free ((gchar*) peer_id); + /* TODO: cleanup pipeline */ + } else { + g_printerr ("WARNING: Ignoring unknown message %s\n", text); + } + g_strfreev (splitm); + } else { + goto err; + } + /* Handle errors */ + } else if (g_str_has_prefix (text, "ERROR")) { + handle_error_message (text); + } else { + goto err; + } + +out: + g_free (text); + return; + +err: + { + gchar *err_s = g_strdup_printf ("ERROR: unknown message %s", text); + cleanup_and_quit_loop (err_s, 0); + g_free (err_s); + goto out; + } +} + +static void +on_server_connected (SoupSession * session, GAsyncResult * res, + SoupMessage *msg) +{ + GError *error = NULL; + + ws_conn = soup_session_websocket_connect_finish (session, res, &error); + if (error) { + cleanup_and_quit_loop (error->message, SERVER_CONNECTION_ERROR); + g_error_free (error); + return; + } + + g_assert_nonnull (ws_conn); + + app_state = SERVER_CONNECTED; + g_print ("Connected to signalling server\n"); + + g_signal_connect (ws_conn, "closed", G_CALLBACK (on_server_closed), NULL); + g_signal_connect (ws_conn, "message", G_CALLBACK (on_server_message), NULL); + + /* Register with the server so it knows about us and can accept commands + * responses from the server will be handled in on_server_message() above */ + register_with_server (); +} + +/* + * Connect to the signalling server. This is the entrypoint for everything else. + */ +static void +connect_to_websocket_server_async (void) +{ + SoupLogger *logger; + SoupMessage *message; + SoupSession *session; + const char *https_aliases[] = {"wss", NULL}; + + session = soup_session_new_with_options (SOUP_SESSION_SSL_STRICT, TRUE, + SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE, + //SOUP_SESSION_SSL_CA_FILE, "/etc/ssl/certs/ca-bundle.crt", + SOUP_SESSION_HTTPS_ALIASES, https_aliases, NULL); + + logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, -1); + soup_session_add_feature (session, SOUP_SESSION_FEATURE (logger)); + g_object_unref (logger); + + message = soup_message_new (SOUP_METHOD_GET, server_url); + + g_print ("Connecting to server...\n"); + + /* Once connected, we will register */ + soup_session_websocket_connect_async (session, message, NULL, NULL, NULL, + (GAsyncReadyCallback) on_server_connected, message); + app_state = SERVER_CONNECTING; +} + +int +main (int argc, char *argv[]) +{ + SoupSession *session; + GOptionContext *context; + GError *error = NULL; + + context = g_option_context_new ("- gstreamer webrtc sendrecv demo"); + g_option_context_add_main_entries (context, entries, NULL); + g_option_context_add_group (context, gst_init_get_option_group ()); + if (!g_option_context_parse (context, &argc, &argv, &error)) { + g_printerr ("Error initializing: %s\n", error->message); + return -1; + } + + if (!room_id) { + g_printerr ("--room-id is a required argument\n"); + return -1; + } + + if (!local_id) + local_id = g_strdup_printf ("%s-%i", g_get_user_name (), + g_random_int_range (10, 10000)); + /* Sanitize by removing whitespace, modifies string in-place */ + g_strdelimit (local_id, " \t\n\r", '-'); + + g_print ("Our local id is %s\n", local_id); + + if (!server_url) + server_url = g_strdup (default_server_url); + + loop = g_main_loop_new (NULL, FALSE); + + connect_to_websocket_server_async (); + + g_main_loop_run (loop); + + gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL); + g_print ("Pipeline stopped\n"); + + gst_object_unref (pipeline); + g_free (server_url); + g_free (local_id); + g_free (room_id); + + return 0; +}