From 39a026760d26050f2f9060c0fc083241b932d9c4 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Sat, 12 May 2018 21:13:52 +0000 Subject: [PATCH] rpicamsrc: Add webrtc streaming example Add an example for testing webrtc streaming from the rpi camera, based on the code from https://bugzilla.gnome.org/show_bug.cgi?id=795404 Requires GStreamer 1.14.1 or git master --- .../rpicamsrc/webrtc-unidirectional-h264.c | 663 ++++++++++++++++++ 1 file changed, 663 insertions(+) create mode 100644 tests/examples/rpicamsrc/webrtc-unidirectional-h264.c diff --git a/tests/examples/rpicamsrc/webrtc-unidirectional-h264.c b/tests/examples/rpicamsrc/webrtc-unidirectional-h264.c new file mode 100644 index 0000000000..290e46da30 --- /dev/null +++ b/tests/examples/rpicamsrc/webrtc-unidirectional-h264.c @@ -0,0 +1,663 @@ +#include +#include +#include +#include + +#define GST_USE_UNSTABLE_API +#include + +#include +#include +#include + + + +#define RTP_PAYLOAD_TYPE "96" +#define SOUP_HTTP_PORT 57778 + + + +typedef struct _ReceiverEntry ReceiverEntry; + +ReceiverEntry *create_receiver_entry (SoupWebsocketConnection * connection); +void destroy_receiver_entry (gpointer receiver_entry_ptr); + +GstPadProbeReturn payloader_caps_event_probe_cb (GstPad * pad, + GstPadProbeInfo * info, gpointer user_data); + +void on_offer_created_cb (GstPromise * promise, gpointer user_data); +void on_negotiation_needed_cb (GstElement * webrtcbin, gpointer user_data); +void on_ice_candidate_cb (GstElement * webrtcbin, guint mline_index, + gchar * candidate, gpointer user_data); + +void soup_websocket_message_cb (SoupWebsocketConnection * connection, + SoupWebsocketDataType data_type, GBytes * message, gpointer user_data); +void soup_websocket_closed_cb (SoupWebsocketConnection * connection, + gpointer user_data); + +void soup_http_handler (SoupServer * soup_server, SoupMessage * message, + const char *path, GHashTable * query, SoupClientContext * client_context, + gpointer user_data); +void soup_websocket_handler (G_GNUC_UNUSED SoupServer * server, + SoupWebsocketConnection * connection, const char *path, + SoupClientContext * client_context, gpointer user_data); + +static gchar *get_string_from_json_object (JsonObject * object); + +gboolean exit_sighandler (gpointer user_data); + + + + +struct _ReceiverEntry +{ + SoupWebsocketConnection *connection; + + GstElement *pipeline; + GstElement *webrtcbin; + GstElement *payloader; + + GCond profile_level_id_cond; + GMutex profile_level_id_mutex; + gchar *profile_level_id; + + gboolean shutting_down; +}; + + + +const gchar *html_source = " \n \ + \n \ + \n \ + \n \ + \n \ + \n \ + \n \ + \n \ +
\n \ + \n \ +
\n \ + \n \ + \n \ +"; + + + + +ReceiverEntry * +create_receiver_entry (SoupWebsocketConnection * connection) +{ + GError *error; + ReceiverEntry *receiver_entry; + GstPad *payloader_srcpad; + + receiver_entry = g_slice_alloc0 (sizeof (ReceiverEntry)); + receiver_entry->connection = connection; + + g_cond_init (&receiver_entry->profile_level_id_cond); + g_mutex_init (&receiver_entry->profile_level_id_mutex); + + g_object_ref (G_OBJECT (connection)); + + g_signal_connect (G_OBJECT (connection), "message", + G_CALLBACK (soup_websocket_message_cb), (gpointer) receiver_entry); + + error = NULL; + receiver_entry->pipeline = gst_parse_launch ("webrtcbin name=webrtcbin " + "rpicamsrc bitrate=300000 annotation-mode=12 ! video/x-h264,profile=baseline,width=640,height=480 ! queue max-size-time=100000000 ! h264parse ! " + "rtph264pay config-interval=-1 name=payloader ! " + "application/x-rtp,media=video,encoding-name=H264,payload=" + RTP_PAYLOAD_TYPE " ! webrtcbin. ", &error); + if (error != NULL) { + g_error ("Could not create WebRTC pipeline: %s\n", error->message); + g_error_free (error); + goto cleanup; + } + + receiver_entry->webrtcbin = + gst_bin_get_by_name (GST_BIN (receiver_entry->pipeline), "webrtcbin"); + receiver_entry->payloader = + gst_bin_get_by_name (GST_BIN (receiver_entry->pipeline), "payloader"); + g_assert (receiver_entry->webrtcbin != NULL); + g_assert (receiver_entry->payloader != NULL); + + payloader_srcpad = + gst_element_get_static_pad (receiver_entry->payloader, "src"); + gst_pad_add_probe (payloader_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, + payloader_caps_event_probe_cb, (gpointer) receiver_entry, NULL); + gst_object_unref (GST_OBJECT (payloader_srcpad)); + + g_signal_connect (receiver_entry->webrtcbin, "on-negotiation-needed", + G_CALLBACK (on_negotiation_needed_cb), (gpointer) receiver_entry); + + g_signal_connect (receiver_entry->webrtcbin, "on-ice-candidate", + G_CALLBACK (on_ice_candidate_cb), (gpointer) receiver_entry); + + gst_element_set_state (receiver_entry->pipeline, GST_STATE_PLAYING); + + return receiver_entry; + +cleanup: + destroy_receiver_entry ((gpointer) receiver_entry); + return NULL; +} + +void +destroy_receiver_entry (gpointer receiver_entry_ptr) +{ + ReceiverEntry *receiver_entry = (ReceiverEntry *) receiver_entry_ptr; + + g_assert (receiver_entry != NULL); + + g_mutex_lock (&receiver_entry->profile_level_id_mutex); + receiver_entry->shutting_down = TRUE; + g_cond_signal (&receiver_entry->profile_level_id_cond); + g_mutex_unlock (&receiver_entry->profile_level_id_mutex); + + if (receiver_entry->pipeline != NULL) { + gst_element_set_state (GST_ELEMENT (receiver_entry->pipeline), + GST_STATE_NULL); + + gst_object_unref (GST_OBJECT (receiver_entry->webrtcbin)); + gst_object_unref (GST_OBJECT (receiver_entry->payloader)); + gst_object_unref (GST_OBJECT (receiver_entry->pipeline)); + } + + g_cond_clear (&receiver_entry->profile_level_id_cond); + g_mutex_clear (&receiver_entry->profile_level_id_mutex); + g_free (receiver_entry->profile_level_id); + + if (receiver_entry->connection != NULL) + g_object_unref (G_OBJECT (receiver_entry->connection)); + + g_slice_free1 (sizeof (ReceiverEntry), receiver_entry); +} + + +GstPadProbeReturn +payloader_caps_event_probe_cb (G_GNUC_UNUSED GstPad * pad, + GstPadProbeInfo * info, gpointer user_data) +{ + ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; + GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); + + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) { + gchar const *profile_level_id; + GstStructure *s; + GstCaps *caps; + + caps = NULL; + gst_event_parse_caps (event, &caps); + + s = gst_caps_get_structure (caps, 0); + profile_level_id = gst_structure_get_string (s, "profile-level-id"); + g_assert (profile_level_id != NULL); + + g_mutex_lock (&receiver_entry->profile_level_id_mutex); + + g_free (receiver_entry->profile_level_id); + receiver_entry->profile_level_id = g_strdup (profile_level_id); + + g_cond_signal (&receiver_entry->profile_level_id_cond); + + g_mutex_unlock (&receiver_entry->profile_level_id_mutex); + } + + return GST_PAD_PROBE_OK; +} + + +void +on_offer_created_cb (GstPromise * promise, gpointer user_data) +{ + gchar *fmtp_value; + gchar *sdp_string; + gchar *json_string; + JsonObject *sdp_json; + JsonObject *sdp_data_json; + GstSDPMedia *sdp_media; + GstStructure const *reply; + GstPromise *local_desc_promise; + GstWebRTCSessionDescription *offer = NULL; + ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; + + reply = gst_promise_get_reply (promise); + gst_structure_get (reply, "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, + &offer, NULL); + gst_promise_unref (promise); + + fmtp_value = g_strdup_printf (RTP_PAYLOAD_TYPE " profile-level-id=%s", + receiver_entry->profile_level_id); + sdp_media = + (GstSDPMedia *) & g_array_index (offer->sdp->medias, GstSDPMedia, 0); + gst_sdp_media_add_attribute (sdp_media, "fmtp", fmtp_value); + g_free (fmtp_value); + + local_desc_promise = gst_promise_new (); + g_signal_emit_by_name (receiver_entry->webrtcbin, "set-local-description", + offer, local_desc_promise); + gst_promise_interrupt (local_desc_promise); + gst_promise_unref (local_desc_promise); + + sdp_string = gst_sdp_message_as_text (offer->sdp); + g_print ("Negotiation offer created:\n%s\n", sdp_string); + + sdp_json = json_object_new (); + json_object_set_string_member (sdp_json, "type", "sdp"); + + sdp_data_json = json_object_new (); + json_object_set_string_member (sdp_data_json, "type", "offer"); + json_object_set_string_member (sdp_data_json, "sdp", sdp_string); + json_object_set_object_member (sdp_json, "data", sdp_data_json); + + json_string = get_string_from_json_object (sdp_json); + json_object_unref (sdp_json); + + soup_websocket_connection_send_text (receiver_entry->connection, json_string); + g_free (json_string); + + gst_webrtc_session_description_free (offer); +} + + +void +on_negotiation_needed_cb (GstElement * webrtcbin, gpointer user_data) +{ + gboolean exit_early; + GstPromise *promise; + ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; + + g_mutex_lock (&receiver_entry->profile_level_id_mutex); + + while ((receiver_entry->profile_level_id == NULL) + && !receiver_entry->shutting_down) + g_cond_wait (&receiver_entry->profile_level_id_cond, + &receiver_entry->profile_level_id_mutex); + + exit_early = receiver_entry->shutting_down; + + g_mutex_unlock (&receiver_entry->profile_level_id_mutex); + + if (exit_early) + return; + + g_print ("Creating negotiation offer\n"); + + promise = gst_promise_new_with_change_func (on_offer_created_cb, + (gpointer) receiver_entry, NULL); + g_signal_emit_by_name (G_OBJECT (webrtcbin), "create-offer", NULL, promise); +} + + +void +on_ice_candidate_cb (G_GNUC_UNUSED GstElement * webrtcbin, guint mline_index, + gchar * candidate, gpointer user_data) +{ + JsonObject *ice_json; + JsonObject *ice_data_json; + gchar *json_string; + ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; + + ice_json = json_object_new (); + json_object_set_string_member (ice_json, "type", "ice"); + + ice_data_json = json_object_new (); + json_object_set_int_member (ice_data_json, "sdpMLineIndex", mline_index); + json_object_set_string_member (ice_data_json, "candidate", candidate); + json_object_set_object_member (ice_json, "data", ice_data_json); + + json_string = get_string_from_json_object (ice_json); + json_object_unref (ice_json); + + soup_websocket_connection_send_text (receiver_entry->connection, json_string); + g_free (json_string); +} + + +void +soup_websocket_message_cb (G_GNUC_UNUSED SoupWebsocketConnection * connection, + SoupWebsocketDataType data_type, GBytes * message, gpointer user_data) +{ + gsize size; + gchar *data; + gchar *data_string; + const gchar *type_string; + JsonNode *root_json; + JsonObject *root_json_object; + JsonObject *data_json_object; + JsonParser *json_parser = NULL; + ReceiverEntry *receiver_entry = (ReceiverEntry *) user_data; + + switch (data_type) { + case SOUP_WEBSOCKET_DATA_BINARY: + g_error ("Received unknown binary message, ignoring\n"); + g_bytes_unref (message); + return; + + case SOUP_WEBSOCKET_DATA_TEXT: + data = g_bytes_unref_to_data (message, &size); + /* Convert to NULL-terminated string */ + data_string = g_strndup (data, size); + g_free (data); + break; + + default: + g_assert_not_reached (); + } + + json_parser = json_parser_new (); + if (!json_parser_load_from_data (json_parser, data_string, -1, NULL)) + goto unknown_message; + + root_json = json_parser_get_root (json_parser); + if (!JSON_NODE_HOLDS_OBJECT (root_json)) + goto unknown_message; + + root_json_object = json_node_get_object (root_json); + + if (!json_object_has_member (root_json_object, "type")) { + g_error ("Received message without type field\n"); + goto cleanup; + } + type_string = json_object_get_string_member (root_json_object, "type"); + + if (!json_object_has_member (root_json_object, "data")) { + g_error ("Received message without data field\n"); + goto cleanup; + } + data_json_object = json_object_get_object_member (root_json_object, "data"); + + if (g_strcmp0 (type_string, "sdp") == 0) { + const gchar *sdp_type_string; + const gchar *sdp_string; + GstPromise *promise; + GstSDPMessage *sdp; + GstWebRTCSessionDescription *answer; + int ret; + + if (!json_object_has_member (data_json_object, "type")) { + g_error ("Received SDP message without type field\n"); + goto cleanup; + } + sdp_type_string = json_object_get_string_member (data_json_object, "type"); + + if (g_strcmp0 (sdp_type_string, "answer") != 0) { + g_error ("Expected SDP message type \"answer\", got \"%s\"\n", + sdp_type_string); + goto cleanup; + } + + if (!json_object_has_member (data_json_object, "sdp")) { + g_error ("Received SDP message without SDP string\n"); + goto cleanup; + } + sdp_string = json_object_get_string_member (data_json_object, "sdp"); + + g_print ("Received SDP:\n%s\n", sdp_string); + + ret = gst_sdp_message_new (&sdp); + g_assert_cmphex (ret, ==, GST_SDP_OK); + + ret = + gst_sdp_message_parse_buffer ((guint8 *) sdp_string, + strlen (sdp_string), sdp); + if (ret != GST_SDP_OK) { + g_error ("Could not parse SDP string\n"); + goto cleanup; + } + + answer = gst_webrtc_session_description_new (GST_WEBRTC_SDP_TYPE_ANSWER, + sdp); + g_assert_nonnull (answer); + + promise = gst_promise_new (); + g_signal_emit_by_name (receiver_entry->webrtcbin, "set-remote-description", + answer, promise); + gst_promise_interrupt (promise); + gst_promise_unref (promise); + } else if (g_strcmp0 (type_string, "ice") == 0) { + guint mline_index; + const gchar *candidate_string; + + if (!json_object_has_member (data_json_object, "sdpMLineIndex")) { + g_error ("Received ICE message without mline index\n"); + goto cleanup; + } + mline_index = + json_object_get_int_member (data_json_object, "sdpMLineIndex"); + + if (!json_object_has_member (data_json_object, "candidate")) { + g_error ("Received ICE message without ICE candidate string\n"); + goto cleanup; + } + candidate_string = json_object_get_string_member (data_json_object, + "candidate"); + + g_print ("Received ICE candidate with mline index %u; candidate: %s\n", + mline_index, candidate_string); + + g_signal_emit_by_name (receiver_entry->webrtcbin, "add-ice-candidate", + mline_index, candidate_string); + } else + goto unknown_message; + +cleanup: + if (json_parser != NULL) + g_object_unref (G_OBJECT (json_parser)); + g_free (data_string); + return; + +unknown_message: + g_error ("Unknown message \"%s\", ignoring", data_string); + goto cleanup; +} + + +void +soup_websocket_closed_cb (SoupWebsocketConnection * connection, + gpointer user_data) +{ + GHashTable *receiver_entry_table = (GHashTable *) user_data; + g_hash_table_remove (receiver_entry_table, connection); + g_print ("Closed websocket connection %p\n", (gpointer) connection); +} + + +void +soup_http_handler (G_GNUC_UNUSED SoupServer * soup_server, + SoupMessage * message, const char *path, G_GNUC_UNUSED GHashTable * query, + G_GNUC_UNUSED SoupClientContext * client_context, + G_GNUC_UNUSED gpointer user_data) +{ + SoupBuffer *soup_buffer; + + if ((g_strcmp0 (path, "/") != 0) && (g_strcmp0 (path, "/index.html") != 0)) { + soup_message_set_status (message, SOUP_STATUS_NOT_FOUND); + return; + } + + soup_buffer = + soup_buffer_new (SOUP_MEMORY_STATIC, html_source, strlen (html_source)); + + soup_message_headers_set_content_type (message->response_headers, "text/html", + NULL); + soup_message_body_append_buffer (message->response_body, soup_buffer); + soup_buffer_free (soup_buffer); + + soup_message_set_status (message, SOUP_STATUS_OK); +} + + +void +soup_websocket_handler (G_GNUC_UNUSED SoupServer * server, + SoupWebsocketConnection * connection, G_GNUC_UNUSED const char *path, + G_GNUC_UNUSED SoupClientContext * client_context, gpointer user_data) +{ + ReceiverEntry *receiver_entry; + GHashTable *receiver_entry_table = (GHashTable *) user_data; + + g_print ("Processing new websocket connection %p", (gpointer) connection); + + g_signal_connect (G_OBJECT (connection), "closed", + G_CALLBACK (soup_websocket_closed_cb), (gpointer) receiver_entry_table); + + receiver_entry = create_receiver_entry (connection); + g_hash_table_replace (receiver_entry_table, connection, receiver_entry); +} + + +static gchar * +get_string_from_json_object (JsonObject * object) +{ + JsonNode *root; + JsonGenerator *generator; + gchar *text; + + /* Make it the root node */ + root = json_node_init_object (json_node_alloc (), object); + generator = json_generator_new (); + json_generator_set_root (generator, root); + text = json_generator_to_data (generator, NULL); + + /* Release everything */ + g_object_unref (generator); + json_node_free (root); + return text; +} + + +gboolean +exit_sighandler (gpointer user_data) +{ + g_print ("Caught signal, stopping mainloop\n"); + GMainLoop *mainloop = (GMainLoop *) user_data; + g_main_loop_quit (mainloop); + return TRUE; +} + + +int +main (int argc, char *argv[]) +{ + GMainLoop *mainloop; + SoupServer *soup_server; + GHashTable *receiver_entry_table; + + gst_init (&argc, &argv); + + receiver_entry_table = + g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + destroy_receiver_entry); + + mainloop = g_main_loop_new (NULL, FALSE); + g_assert (mainloop != NULL); + + g_unix_signal_add (SIGINT, exit_sighandler, mainloop); + g_unix_signal_add (SIGTERM, exit_sighandler, mainloop); + + soup_server = + soup_server_new (SOUP_SERVER_SERVER_HEADER, "webrtc-soup-server", NULL); + soup_server_add_handler (soup_server, "/", soup_http_handler, NULL, NULL); + soup_server_add_websocket_handler (soup_server, "/ws", NULL, NULL, + soup_websocket_handler, (gpointer) receiver_entry_table, NULL); + soup_server_listen_all (soup_server, SOUP_HTTP_PORT, + (SoupServerListenOptions) 0, NULL); + + g_print ("WebRTC page link: http://127.0.0.1:%d/\n", (gint) SOUP_HTTP_PORT); + + g_main_loop_run (mainloop); + + g_object_unref (G_OBJECT (soup_server)); + g_hash_table_destroy (receiver_entry_table); + g_main_loop_unref (mainloop); + + gst_deinit (); + + return 0; +}