#include #include #include #include #ifdef G_OS_UNIX #include #endif #define GST_USE_UNSTABLE_API #include #include #include #include /* This example is a standalone app which serves a web page * and configures webrtcbin to receive an H.264 video feed, and to * send+recv an Opus audio stream */ #define RTP_PAYLOAD_TYPE "96" #define RTP_CAPS_OPUS "application/x-rtp,media=audio,encoding-name=OPUS,payload=" #define SOUP_HTTP_PORT 57778 #define STUN_SERVER "stun.l.google.com:19302" typedef struct _ReceiverEntry ReceiverEntry; ReceiverEntry *create_receiver_entry (SoupWebsocketConnection * connection); void destroy_receiver_entry (gpointer receiver_entry_ptr); GstPadProbeReturn payloader_caps_event_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data); void on_offer_created_cb (GstPromise * promise, gpointer user_data); void on_negotiation_needed_cb (GstElement * webrtcbin, gpointer user_data); void on_ice_candidate_cb (GstElement * webrtcbin, guint mline_index, gchar * candidate, gpointer user_data); void soup_websocket_message_cb (SoupWebsocketConnection * connection, SoupWebsocketDataType data_type, GBytes * message, gpointer user_data); void soup_websocket_closed_cb (SoupWebsocketConnection * connection, gpointer user_data); void soup_http_handler (SoupServer * soup_server, SoupMessage * message, const char *path, GHashTable * query, SoupClientContext * client_context, gpointer user_data); void soup_websocket_handler (G_GNUC_UNUSED SoupServer * server, SoupWebsocketConnection * connection, const char *path, SoupClientContext * client_context, gpointer user_data); static gchar *get_string_from_json_object (JsonObject * object); struct _ReceiverEntry { SoupWebsocketConnection *connection; GstElement *pipeline; GstElement *webrtcbin; }; const gchar *html_source = " \n \ \n \ \n \ \n \ \n \ \n \ \n \ \n \
\n \ \n \
\n \ \n \ \n \ "; static void handle_media_stream (GstPad * pad, GstElement * pipe, const char *convert_name, const char *sink_name) { GstPad *qpad; GstElement *q, *conv, *resample, *sink; GstPadLinkReturn ret; gst_print ("Trying to handle stream with %s ! %s", convert_name, sink_name); q = gst_element_factory_make ("queue", NULL); g_assert_nonnull (q); conv = gst_element_factory_make (convert_name, NULL); g_assert_nonnull (conv); sink = gst_element_factory_make (sink_name, NULL); g_assert_nonnull (sink); if (g_strcmp0 (convert_name, "audioconvert") == 0) { /* Might also need to resample, so add it just in case. * Will be a no-op if it's not required. */ resample = gst_element_factory_make ("audioresample", NULL); g_assert_nonnull (resample); gst_bin_add_many (GST_BIN (pipe), q, conv, resample, sink, NULL); gst_element_sync_state_with_parent (q); gst_element_sync_state_with_parent (conv); gst_element_sync_state_with_parent (resample); gst_element_sync_state_with_parent (sink); gst_element_link_many (q, conv, resample, sink, NULL); } else { gst_bin_add_many (GST_BIN (pipe), q, conv, sink, NULL); gst_element_sync_state_with_parent (q); gst_element_sync_state_with_parent (conv); gst_element_sync_state_with_parent (sink); gst_element_link_many (q, conv, sink, NULL); } qpad = gst_element_get_static_pad (q, "sink"); ret = gst_pad_link (pad, qpad); g_assert_cmphex (ret, ==, GST_PAD_LINK_OK); } static void on_incoming_decodebin_stream (GstElement * decodebin, GstPad * pad, GstElement * pipe) { GstCaps *caps; const gchar *name; if (!gst_pad_has_current_caps (pad)) { gst_printerr ("Pad '%s' has no caps, can't do anything, ignoring\n", GST_PAD_NAME (pad)); return; } caps = gst_pad_get_current_caps (pad); name = gst_structure_get_name (gst_caps_get_structure (caps, 0)); if (g_str_has_prefix (name, "video")) { handle_media_stream (pad, pipe, "videoconvert", "autovideosink"); } else if (g_str_has_prefix (name, "audio")) { handle_media_stream (pad, pipe, "audioconvert", "autoaudiosink"); } else { gst_printerr ("Unknown pad %s, ignoring", GST_PAD_NAME (pad)); } } static void on_incoming_stream (GstElement * webrtc, GstPad * pad, ReceiverEntry * receiver_entry) { GstElement *decodebin; GstPad *sinkpad; if (GST_PAD_DIRECTION (pad) != GST_PAD_SRC) return; decodebin = gst_element_factory_make ("decodebin", NULL); g_signal_connect (decodebin, "pad-added", G_CALLBACK (on_incoming_decodebin_stream), receiver_entry->pipeline); gst_bin_add (GST_BIN (receiver_entry->pipeline), decodebin); gst_element_sync_state_with_parent (decodebin); sinkpad = gst_element_get_static_pad (decodebin, "sink"); gst_pad_link (pad, sinkpad); gst_object_unref (sinkpad); } static gboolean bus_watch_cb (GstBus * bus, GstMessage * message, gpointer user_data) { GstPipeline *pipeline = user_data; switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_ERROR: { GError *error = NULL; gchar *debug = NULL; gst_message_parse_error (message, &error, &debug); g_error ("Error on bus: %s (debug: %s)", error->message, debug); g_error_free (error); g_free (debug); break; } case GST_MESSAGE_WARNING: { GError *error = NULL; gchar *debug = NULL; gst_message_parse_warning (message, &error, &debug); g_warning ("Warning on bus: %s (debug: %s)", error->message, debug); g_error_free (error); g_free (debug); break; } case GST_MESSAGE_LATENCY: gst_bin_recalculate_latency (GST_BIN (pipeline)); break; default: break; } return G_SOURCE_CONTINUE; } ReceiverEntry * create_receiver_entry (SoupWebsocketConnection * connection) { GError *error; ReceiverEntry *receiver_entry; GstCaps *video_caps; GstWebRTCRTPTransceiver *trans = NULL; GstBus *bus; receiver_entry = g_new0 (ReceiverEntry, 1); receiver_entry->connection = connection; g_object_ref (G_OBJECT (connection)); g_signal_connect (G_OBJECT (connection), "message", G_CALLBACK (soup_websocket_message_cb), (gpointer) receiver_entry); error = NULL; receiver_entry->pipeline = gst_parse_launch ("webrtcbin name=webrtcbin stun-server=stun://" STUN_SERVER " " "audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! " "queue ! " RTP_CAPS_OPUS "97 ! webrtcbin. ", &error); if (error != NULL) { g_error ("Could not create WebRTC pipeline: %s\n", error->message); g_error_free (error); goto cleanup; } receiver_entry->webrtcbin = gst_bin_get_by_name (GST_BIN (receiver_entry->pipeline), "webrtcbin"); g_assert (receiver_entry->webrtcbin != NULL); /* Incoming streams will be exposed via this signal */ g_signal_connect (receiver_entry->webrtcbin, "pad-added", G_CALLBACK (on_incoming_stream), receiver_entry); #if 0 GstElement *rtpbin = gst_bin_get_by_name (GST_BIN (receiver_entry->webrtcbin), "rtpbin"); g_object_set (rtpbin, "latency", 40, NULL); gst_object_unref (rtpbin); #endif // Create a 2nd transceiver for the receive only video stream video_caps = gst_caps_from_string ("application/x-rtp,media=video,encoding-name=H264,payload=" RTP_PAYLOAD_TYPE ",clock-rate=90000,packetization-mode=(string)1, profile-level-id=(string)42c016"); g_signal_emit_by_name (receiver_entry->webrtcbin, "add-transceiver", GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_RECVONLY, video_caps, &trans); gst_caps_unref (video_caps); gst_object_unref (trans); g_signal_connect (receiver_entry->webrtcbin, "on-negotiation-needed", G_CALLBACK (on_negotiation_needed_cb), (gpointer) receiver_entry); g_signal_connect (receiver_entry->webrtcbin, "on-ice-candidate", G_CALLBACK (on_ice_candidate_cb), (gpointer) receiver_entry); bus = gst_pipeline_get_bus (GST_PIPELINE (receiver_entry->pipeline)); gst_bus_add_watch (bus, bus_watch_cb, receiver_entry->pipeline); gst_object_unref (bus); if (gst_element_set_state (receiver_entry->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) g_error ("Error starting pipeline"); return receiver_entry; cleanup: destroy_receiver_entry ((gpointer) receiver_entry); return NULL; } void destroy_receiver_entry (gpointer receiver_entry_ptr) { ReceiverEntry *receiver_entry = (ReceiverEntry *) receiver_entry_ptr; g_assert (receiver_entry != NULL); if (receiver_entry->pipeline != NULL) { GstBus *bus; gst_element_set_state (GST_ELEMENT (receiver_entry->pipeline), GST_STATE_NULL); bus = gst_pipeline_get_bus (GST_PIPELINE (receiver_entry->pipeline)); gst_bus_remove_watch (bus); gst_object_unref (bus); gst_object_unref (GST_OBJECT (receiver_entry->webrtcbin)); gst_object_unref (GST_OBJECT (receiver_entry->pipeline)); } if (receiver_entry->connection != NULL) g_object_unref (G_OBJECT (receiver_entry->connection)); g_free (receiver_entry); } void on_offer_created_cb (GstPromise * promise, gpointer user_data) { gchar *sdp_string; gchar *json_string; JsonObject *sdp_json; JsonObject *sdp_data_json; GstStructure const *reply; GstPromise *local_desc_promise; GstWebRTCSessionDescription *offer = NULL; ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; reply = gst_promise_get_reply (promise); gst_structure_get (reply, "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL); gst_promise_unref (promise); local_desc_promise = gst_promise_new (); g_signal_emit_by_name (receiver_entry->webrtcbin, "set-local-description", offer, local_desc_promise); gst_promise_interrupt (local_desc_promise); gst_promise_unref (local_desc_promise); sdp_string = gst_sdp_message_as_text (offer->sdp); gst_print ("Negotiation offer created:\n%s\n", sdp_string); sdp_json = json_object_new (); json_object_set_string_member (sdp_json, "type", "sdp"); sdp_data_json = json_object_new (); json_object_set_string_member (sdp_data_json, "type", "offer"); json_object_set_string_member (sdp_data_json, "sdp", sdp_string); json_object_set_object_member (sdp_json, "data", sdp_data_json); json_string = get_string_from_json_object (sdp_json); json_object_unref (sdp_json); soup_websocket_connection_send_text (receiver_entry->connection, json_string); g_free (json_string); g_free (sdp_string); gst_webrtc_session_description_free (offer); } void on_negotiation_needed_cb (GstElement * webrtcbin, gpointer user_data) { GstPromise *promise; ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; gst_print ("Creating negotiation offer\n"); promise = gst_promise_new_with_change_func (on_offer_created_cb, (gpointer) receiver_entry, NULL); g_signal_emit_by_name (G_OBJECT (webrtcbin), "create-offer", NULL, promise); } void on_ice_candidate_cb (G_GNUC_UNUSED GstElement * webrtcbin, guint mline_index, gchar * candidate, gpointer user_data) { JsonObject *ice_json; JsonObject *ice_data_json; gchar *json_string; ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; ice_json = json_object_new (); json_object_set_string_member (ice_json, "type", "ice"); ice_data_json = json_object_new (); json_object_set_int_member (ice_data_json, "sdpMLineIndex", mline_index); json_object_set_string_member (ice_data_json, "candidate", candidate); json_object_set_object_member (ice_json, "data", ice_data_json); json_string = get_string_from_json_object (ice_json); json_object_unref (ice_json); soup_websocket_connection_send_text (receiver_entry->connection, json_string); g_free (json_string); } void soup_websocket_message_cb (G_GNUC_UNUSED SoupWebsocketConnection * connection, SoupWebsocketDataType data_type, GBytes * message, gpointer user_data) { gsize size; const gchar *data; gchar *data_string; const gchar *type_string; JsonNode *root_json; JsonObject *root_json_object; JsonObject *data_json_object; JsonParser *json_parser = NULL; ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; switch (data_type) { case SOUP_WEBSOCKET_DATA_BINARY: g_error ("Received unknown binary message, ignoring\n"); return; case SOUP_WEBSOCKET_DATA_TEXT: data = g_bytes_get_data (message, &size); /* Convert to NULL-terminated string */ data_string = g_strndup (data, size); break; default: g_assert_not_reached (); } json_parser = json_parser_new (); if (!json_parser_load_from_data (json_parser, data_string, -1, NULL)) goto unknown_message; root_json = json_parser_get_root (json_parser); if (!JSON_NODE_HOLDS_OBJECT (root_json)) goto unknown_message; root_json_object = json_node_get_object (root_json); if (!json_object_has_member (root_json_object, "type")) { g_error ("Received message without type field\n"); goto cleanup; } type_string = json_object_get_string_member (root_json_object, "type"); if (!json_object_has_member (root_json_object, "data")) { g_error ("Received message without data field\n"); goto cleanup; } data_json_object = json_object_get_object_member (root_json_object, "data"); if (g_strcmp0 (type_string, "sdp") == 0) { const gchar *sdp_type_string; const gchar *sdp_string; GstPromise *promise; GstSDPMessage *sdp; GstWebRTCSessionDescription *answer; int ret; if (!json_object_has_member (data_json_object, "type")) { g_error ("Received SDP message without type field\n"); goto cleanup; } sdp_type_string = json_object_get_string_member (data_json_object, "type"); if (g_strcmp0 (sdp_type_string, "answer") != 0) { g_error ("Expected SDP message type \"answer\", got \"%s\"\n", sdp_type_string); goto cleanup; } if (!json_object_has_member (data_json_object, "sdp")) { g_error ("Received SDP message without SDP string\n"); goto cleanup; } sdp_string = json_object_get_string_member (data_json_object, "sdp"); gst_print ("Received SDP:\n%s\n", sdp_string); ret = gst_sdp_message_new (&sdp); g_assert_cmphex (ret, ==, GST_SDP_OK); ret = gst_sdp_message_parse_buffer ((guint8 *) sdp_string, strlen (sdp_string), sdp); if (ret != GST_SDP_OK) { g_error ("Could not parse SDP string\n"); goto cleanup; } answer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER, sdp); g_assert_nonnull (answer); promise = gst_promise_new (); g_signal_emit_by_name (receiver_entry->webrtcbin, "set-remote-description", answer, promise); gst_promise_interrupt (promise); gst_promise_unref (promise); gst_webrtc_session_description_free (answer); } else if (g_strcmp0 (type_string, "ice") == 0) { guint mline_index; const gchar *candidate_string; if (!json_object_has_member (data_json_object, "sdpMLineIndex")) { g_error ("Received ICE message without mline index\n"); goto cleanup; } mline_index = json_object_get_int_member (data_json_object, "sdpMLineIndex"); if (!json_object_has_member (data_json_object, "candidate")) { g_error ("Received ICE message without ICE candidate string\n"); goto cleanup; } candidate_string = json_object_get_string_member (data_json_object, "candidate"); gst_print ("Received ICE candidate with mline index %u; candidate: %s\n", mline_index, candidate_string); g_signal_emit_by_name (receiver_entry->webrtcbin, "add-ice-candidate", mline_index, candidate_string); } else goto unknown_message; cleanup: if (json_parser != NULL) g_object_unref (G_OBJECT (json_parser)); g_free (data_string); return; unknown_message: g_error ("Unknown message \"%s\", ignoring", data_string); goto cleanup; } void soup_websocket_closed_cb (SoupWebsocketConnection * connection, gpointer user_data) { GHashTable *receiver_entry_table = (GHashTable *) user_data; g_hash_table_remove (receiver_entry_table, connection); gst_print ("Closed websocket connection %p\n", (gpointer) connection); } void soup_http_handler (G_GNUC_UNUSED SoupServer * soup_server, SoupMessage * message, const char *path, G_GNUC_UNUSED GHashTable * query, G_GNUC_UNUSED SoupClientContext * client_context, G_GNUC_UNUSED gpointer user_data) { SoupBuffer *soup_buffer; if ((g_strcmp0 (path, "/") != 0) && (g_strcmp0 (path, "/index.html") != 0)) { soup_message_set_status (message, SOUP_STATUS_NOT_FOUND); return; } soup_buffer = soup_buffer_new (SOUP_MEMORY_STATIC, html_source, strlen (html_source)); soup_message_headers_set_content_type (message->response_headers, "text/html", NULL); soup_message_body_append_buffer (message->response_body, soup_buffer); soup_buffer_free (soup_buffer); soup_message_set_status (message, SOUP_STATUS_OK); } void soup_websocket_handler (G_GNUC_UNUSED SoupServer * server, SoupWebsocketConnection * connection, G_GNUC_UNUSED const char *path, G_GNUC_UNUSED SoupClientContext * client_context, gpointer user_data) { ReceiverEntry *receiver_entry; GHashTable *receiver_entry_table = (GHashTable *) user_data; gst_print ("Processing new websocket connection %p", (gpointer) connection); g_signal_connect (G_OBJECT (connection), "closed", G_CALLBACK (soup_websocket_closed_cb), (gpointer) receiver_entry_table); receiver_entry = create_receiver_entry (connection); g_hash_table_replace (receiver_entry_table, connection, receiver_entry); } static gchar * get_string_from_json_object (JsonObject * object) { JsonNode *root; JsonGenerator *generator; gchar *text; /* Make it the root node */ root = json_node_init_object (json_node_alloc (), object); generator = json_generator_new (); json_generator_set_root (generator, root); text = json_generator_to_data (generator, NULL); /* Release everything */ g_object_unref (generator); json_node_free (root); return text; } #ifdef G_OS_UNIX gboolean exit_sighandler (gpointer user_data) { gst_print ("Caught signal, stopping mainloop\n"); GMainLoop *mainloop = (GMainLoop *) user_data; g_main_loop_quit (mainloop); return TRUE; } #endif int main (int argc, char *argv[]) { GMainLoop *mainloop; SoupServer *soup_server; GHashTable *receiver_entry_table; setlocale (LC_ALL, ""); gst_init (&argc, &argv); receiver_entry_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, destroy_receiver_entry); mainloop = g_main_loop_new (NULL, FALSE); g_assert (mainloop != NULL); #ifdef G_OS_UNIX g_unix_signal_add (SIGINT, exit_sighandler, mainloop); g_unix_signal_add (SIGTERM, exit_sighandler, mainloop); #endif soup_server = soup_server_new (SOUP_SERVER_SERVER_HEADER, "webrtc-soup-server", NULL); soup_server_add_handler (soup_server, "/", soup_http_handler, NULL, NULL); soup_server_add_websocket_handler (soup_server, "/ws", NULL, NULL, soup_websocket_handler, (gpointer) receiver_entry_table, NULL); soup_server_listen_all (soup_server, SOUP_HTTP_PORT, (SoupServerListenOptions) 0, NULL); gst_print ("WebRTC page link: http://127.0.0.1:%d/\n", (gint) SOUP_HTTP_PORT); g_main_loop_run (mainloop); g_object_unref (G_OBJECT (soup_server)); g_hash_table_destroy (receiver_entry_table); g_main_loop_unref (mainloop); gst_deinit (); return 0; }