diff --git a/network/http-launch/http-launch.c b/network/http-launch/http-launch.c new file mode 100644 index 0000000000..91cf8d0d71 --- /dev/null +++ b/network/http-launch/http-launch.c @@ -0,0 +1,548 @@ +/* + * Copyright (C) 2013 Sebastian Dröge + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include + +typedef struct +{ + gchar *name; + GSocketConnection *connection; + GSocket *socket; + GInputStream *istream; + GOutputStream *ostream; + GSource *isource, *tosource; + GByteArray *current_message; + gchar *http_version; + gboolean waiting_200_ok; +} Client; + +static const char *known_mimetypes[] = { + "video/webm", + "multipart/x-mixed-replace", + NULL +}; + +static GMainLoop *loop = NULL; +G_LOCK_DEFINE_STATIC (clients); +static GList *clients = NULL; +static GstElement *pipeline = NULL; +static GstElement *multisocketsink = NULL; +static gboolean started = FALSE; +static gchar *content_type; +G_LOCK_DEFINE_STATIC (caps); +static gboolean caps_resolved = FALSE; + +static void +remove_client (Client * client) +{ + g_print ("Removing connection %s\n", client->name); + + G_LOCK (clients); + clients = g_list_remove (clients, client); + G_UNLOCK (clients); + + g_free (client->name); + g_free (client->http_version); + + if (client->isource) { + g_source_destroy (client->isource); + g_source_unref (client->isource); + } + if (client->tosource) { + g_source_destroy (client->tosource); + g_source_unref (client->tosource); + } + g_object_unref (client->connection); + g_byte_array_unref (client->current_message); + + g_slice_free (Client, client); +} + +static void +write_bytes (Client * client, const gchar * data, guint len) +{ + gssize w; + GError *err = NULL; + + /* TODO: We assume this never blocks */ + do { + w = g_output_stream_write (client->ostream, data, len, NULL, &err); + if (w > 0) { + len -= w; + data += w; + } + } while (w > 0 && len > 0); + + if (w <= 0) { + if (err) { + g_print ("Write error %s\n", err->message); + g_clear_error (&err); + } + remove_client (client); + } +} + +static void +send_response_200_ok (Client * client) +{ + gchar *response; + response = g_strdup_printf ("%s 200 OK\r\n%s\r\n", client->http_version, + content_type); + write_bytes (client, response, strlen (response)); + g_free (response); +} + +static void +send_response_404_not_found (Client * client) +{ + gchar *response; + response = g_strdup_printf ("%s 404 Not Found\r\n\r\n", client->http_version); + write_bytes (client, response, strlen (response)); + g_free (response); +} + +static void +client_message (Client * client, const gchar * data, guint len) +{ + gboolean http_head_request = FALSE; + gboolean http_get_request = FALSE; + gchar **lines = g_strsplit_set (data, "\r\n", -1); + + if (g_str_has_prefix (lines[0], "HEAD")) + http_head_request = TRUE; + else if (g_str_has_prefix (lines[0], "GET")) + http_get_request = TRUE; + + if (http_head_request || http_get_request) { + gchar **parts = g_strsplit (lines[0], " ", -1); + gboolean ok = FALSE; + + g_free (client->http_version); + + if (parts[1] && parts[2] && *parts[2] != '\0') + client->http_version = g_strdup (parts[2]); + else + client->http_version = g_strdup ("HTTP/1.0"); + + if (parts[1] && strcmp (parts[1], "/") == 0) { + G_LOCK (caps); + if (caps_resolved) + send_response_200_ok (client); + else + client->waiting_200_ok = TRUE; + G_UNLOCK (caps); + ok = TRUE; + } else { + send_response_404_not_found (client); + } + g_strfreev (parts); + + if (ok) { + if (http_get_request) { + /* Start streaming to client socket */ + g_source_destroy (client->isource); + g_source_unref (client->isource); + client->isource = NULL; + g_source_destroy (client->tosource); + g_source_unref (client->tosource); + client->tosource = NULL; + g_print ("Starting to stream to %s\n", client->name); + g_signal_emit_by_name (multisocketsink, "add", client->socket); + } + + if (!started) { + g_print ("Starting pipeline\n"); + if (gst_element_set_state (pipeline, + GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { + g_print ("Failed to start pipeline\n"); + g_main_loop_quit (loop); + } + started = TRUE; + } + } + } else { + gchar **parts = g_strsplit (lines[0], " ", -1); + gchar *response; + const gchar *http_version; + + if (parts[1] && parts[2] && *parts[2] != '\0') + http_version = parts[2]; + else + http_version = "HTTP/1.0"; + + response = g_strdup_printf ("%s 400 Bad Request\r\n\r\n", http_version); + write_bytes (client, response, strlen (response)); + g_free (response); + g_strfreev (parts); + remove_client (client); + } + + g_strfreev (lines); +} + +static gboolean +on_timeout (Client * client) +{ + g_print ("Timeout\n"); + remove_client (client); + + return FALSE; +} + +static gboolean +on_read_bytes (GPollableInputStream * stream, Client * client) +{ + gssize r; + gchar data[4096]; + GError *err = NULL; + + do { + r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM + (client->istream), data, sizeof (data), NULL, &err); + if (r > 0) + g_byte_array_append (client->current_message, (guint8 *) data, r); + } while (r > 0); + + if (r == 0) { + remove_client (client); + return FALSE; + } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + guint8 *tmp = client->current_message->data; + guint tmp_len = client->current_message->len; + + g_clear_error (&err); + + while (tmp_len > 3) { + if (tmp[0] == 0x0d && tmp[1] == 0x0a && tmp[2] == 0x0d && tmp[3] == 0x0a) { + guint len; + + g_byte_array_append (client->current_message, (const guint8 *) "\0", 1); + len = tmp - client->current_message->data + 5; + client_message (client, (gchar *) client->current_message->data, len); + g_byte_array_remove_range (client->current_message, 0, len); + tmp = client->current_message->data; + tmp_len = client->current_message->len; + } else { + tmp++; + tmp_len--; + } + } + + if (client->current_message->len >= 1024 * 1024) { + g_print ("No complete request after 1MB of data\n"); + remove_client (client); + return FALSE; + } + + return TRUE; + } else { + g_print ("Read error %s\n", err->message); + g_clear_error (&err); + remove_client (client); + return FALSE; + } + + return FALSE; +} + +static gboolean +on_new_connection (GSocketService * service, GSocketConnection * connection, + GObject * source_object, gpointer user_data) +{ + Client *client = g_slice_new0 (Client); + GSocketAddress *addr; + GInetAddress *iaddr; + gchar *ip; + guint16 port; + + addr = g_socket_connection_get_remote_address (connection, NULL); + iaddr = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (addr)); + port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr)); + ip = g_inet_address_to_string (iaddr); + client->name = g_strdup_printf ("%s:%u", ip, port); + g_free (ip); + g_object_unref (addr); + + g_print ("New connection %s\n", client->name); + + client->waiting_200_ok = FALSE; + client->http_version = g_strdup (""); + client->connection = g_object_ref (connection); + client->socket = g_socket_connection_get_socket (connection); + client->istream = + g_io_stream_get_input_stream (G_IO_STREAM (client->connection)); + client->ostream = + g_io_stream_get_output_stream (G_IO_STREAM (client->connection)); + client->current_message = g_byte_array_sized_new (1024); + + client->tosource = g_timeout_source_new_seconds (5); + g_source_set_callback (client->tosource, (GSourceFunc) on_timeout, client, + NULL); + g_source_attach (client->tosource, NULL); + + client->isource = + g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM + (client->istream), NULL); + g_source_set_callback (client->isource, (GSourceFunc) on_read_bytes, client, + NULL); + g_source_attach (client->isource, NULL); + + G_LOCK (clients); + clients = g_list_prepend (clients, client); + G_UNLOCK (clients); + + return TRUE; +} + +static gboolean +on_message (GstBus * bus, GstMessage * message, gpointer user_data) +{ + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ERROR:{ + gchar *debug; + GError *err; + + gst_message_parse_error (message, &err, &debug); + g_print ("Error %s\n", err->message); + g_error_free (err); + g_free (debug); + g_main_loop_quit (loop); + break; + } + case GST_MESSAGE_WARNING:{ + gchar *debug; + GError *err; + + gst_message_parse_warning (message, &err, &debug); + g_print ("Warning %s\n", err->message); + g_error_free (err); + g_free (debug); + break; + } + case GST_MESSAGE_EOS:{ + g_print ("EOS\n"); + g_main_loop_quit (loop); + } + default: + break; + } + + return TRUE; +} + +static void +on_client_socket_removed (GstElement * element, GSocket * socket, + gpointer user_data) +{ + GList *l; + Client *client = NULL; + + G_LOCK (clients); + for (l = clients; l; l = l->next) { + Client *tmp = l->data; + if (socket == tmp->socket) { + client = tmp; + break; + } + } + G_UNLOCK (clients); + + if (client) + remove_client (client); +} + +static void on_stream_caps_changed (GObject *obj, GParamSpec *pspec, + gpointer user_data) +{ + GstPad *src_pad; + GstCaps *src_caps; + GstStructure *gstrc; + GList *l; + + src_pad = (GstPad *) obj; + src_caps = gst_pad_get_current_caps (src_pad); + gstrc = gst_caps_get_structure (src_caps, 0); + + /* + * Include a Content-type header in the case we know the mime + * type is OK in HTTP. Required for MJPEG streams. + */ + int i = 0; + const gchar *mimetype = gst_structure_get_name(gstrc); + while (known_mimetypes[i] != NULL) + { + if (strcmp(mimetype, known_mimetypes[i]) == 0) + { + if (content_type) + g_free(content_type); + + /* Handle the (maybe not so) especial case of multipart to add boundary */ + if (strcmp(mimetype, "multipart/x-mixed-replace") == 0 && + gst_structure_has_field_typed(gstrc, "boundary", G_TYPE_STRING)) + { + const gchar *boundary = gst_structure_get_string(gstrc, "boundary"); + content_type = g_strdup_printf ("Content-Type: " + "multipart/x-mixed-replace;boundary=--%s\r\n", boundary); + } + else + { + content_type = g_strdup_printf ("Content-Type: %s\r\n", mimetype); + } + g_print("%s", content_type); + break; + } + i++; + } + + gst_caps_unref (src_caps); + + /* Send 200 OK to those clients waiting for it */ + G_LOCK (caps); + + caps_resolved = TRUE; + + G_LOCK (clients); + for (l = clients; l; l = l->next) { + Client *cl = l->data; + if (cl->waiting_200_ok) { + send_response_200_ok (cl); + cl->waiting_200_ok = FALSE; + break; + } + } + G_UNLOCK (clients); + + G_UNLOCK (caps); +} + +int +main (gint argc, gchar ** argv) +{ + GSocketService *service; + GstElement *bin, *stream; + GstPad *srcpad, *ghostpad, *sinkpad; + GError *err = NULL; + GstBus *bus; + + gst_init (&argc, &argv); + + if (argc < 4) { + g_print ("usage: %s PORT \n" + "example: %s 8080 ( videotestsrc ! theoraenc ! oggmux name=stream )\n", + argv[0], argv[0]); + return -1; + } + + const gchar *port_str = argv[1]; + const int port = (int) g_ascii_strtoll(port_str, NULL, 10); + + bin = gst_parse_launchv ((const gchar **) argv + 2, &err); + if (!bin) { + g_print ("invalid pipeline: %s\n", err->message); + g_clear_error (&err); + return -2; + } + + stream = gst_bin_get_by_name (GST_BIN (bin), "stream"); + if (!stream) { + g_print ("no element with name \"stream\" found\n"); + gst_object_unref (bin); + return -3; + } + + srcpad = gst_element_get_static_pad (stream, "src"); + if (!srcpad) { + g_print ("no \"src\" pad in element \"stream\" found\n"); + gst_object_unref (stream); + gst_object_unref (bin); + return -4; + } + + content_type = g_strdup (""); + g_signal_connect (srcpad, "notify::caps", + G_CALLBACK (on_stream_caps_changed), + NULL); + + ghostpad = gst_ghost_pad_new ("src", srcpad); + gst_element_add_pad (GST_ELEMENT (bin), ghostpad); + gst_object_unref (srcpad); + + pipeline = gst_pipeline_new (NULL); + + multisocketsink = gst_element_factory_make ("multisocketsink", NULL); + g_object_set (multisocketsink, + "unit-format", GST_FORMAT_TIME, + "units-max", (gint64) 7 * GST_SECOND, + "units-soft-max", (gint64) 3 * GST_SECOND, + "recover-policy", 3 /* keyframe */ , + "timeout", (guint64) 10 * GST_SECOND, + "sync-method", 1 /* next-keyframe */ , + NULL); + + gst_bin_add_many (GST_BIN (pipeline), bin, multisocketsink, NULL); + + sinkpad = gst_element_get_static_pad (multisocketsink, "sink"); + gst_pad_link (ghostpad, sinkpad); + gst_object_unref (sinkpad); + + bus = gst_element_get_bus (pipeline); + gst_bus_add_signal_watch (bus); + g_signal_connect (bus, "message", G_CALLBACK (on_message), NULL); + gst_object_unref (bus); + + g_signal_connect (multisocketsink, "client-socket-removed", + G_CALLBACK (on_client_socket_removed), NULL); + + loop = g_main_loop_new (NULL, FALSE); + + if (gst_element_set_state (pipeline, + GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) { + gst_object_unref (pipeline); + g_main_loop_unref (loop); + g_print ("Failed to set pipeline to ready\n"); + return -5; + } + + service = g_socket_service_new (); + g_socket_listener_add_inet_port (G_SOCKET_LISTENER (service), port, NULL, + NULL); + + g_signal_connect (service, "incoming", G_CALLBACK (on_new_connection), NULL); + + g_socket_service_start (service); + + g_print ("Listening on http://127.0.0.1:%d/\n", port); + + g_main_loop_run (loop); + + g_socket_service_stop (service); + g_object_unref (service); + + gst_element_set_state (pipeline, GST_STATE_NULL); + gst_object_unref (pipeline); + + g_main_loop_unref (loop); + + return 0; +}