From 8816764112408766889c8b680a3af51115df4bf5 Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Sat, 14 May 2016 11:09:33 -0300 Subject: [PATCH] souphttpsrc: reduce reading latency by using non-blocking read Non-blocking read will return the amount of data available without blocking to wait for the full requested size. The downside is that now it souphttpsrc needs to have a waiting mechanism in case there is no data available yet to avoid busy looping arond the inputstream. --- ext/soup/gstsouphttpsrc.c | 115 ++++++++++++++++++++++++++++++++------ ext/soup/gstsouphttpsrc.h | 4 ++ 2 files changed, 103 insertions(+), 16 deletions(-) diff --git a/ext/soup/gstsouphttpsrc.c b/ext/soup/gstsouphttpsrc.c index b707d08997..75707cbf21 100644 --- a/ext/soup/gstsouphttpsrc.c +++ b/ext/soup/gstsouphttpsrc.c @@ -77,6 +77,7 @@ #endif #include #include +#include #include #include "gstsouphttpsrc.h" #include "gstsouputils.h" @@ -175,6 +176,7 @@ static void gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, static void gst_soup_http_src_authenticate_cb (SoupSession * session, SoupMessage * msg, SoupAuth * auth, gboolean retrying, GstSoupHTTPSrc * src); +static void gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src); #define gst_soup_http_src_parent_class parent_class G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC, @@ -439,10 +441,7 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src) src->ret = GST_FLOW_OK; g_cancellable_reset (src->cancellable); - if (src->input_stream) { - g_object_unref (src->input_stream); - src->input_stream = NULL; - } + gst_soup_http_src_destroy_input_stream (src); gst_caps_replace (&src->src_caps, NULL); g_free (src->iradio_name); @@ -461,6 +460,7 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src) g_mutex_init (&src->mutex); g_cond_init (&src->have_headers_cond); src->cancellable = g_cancellable_new (); + src->poll_context = g_main_context_new (); src->location = NULL; src->redirection_uri = NULL; src->automatic_redirect = TRUE; @@ -515,6 +515,7 @@ gst_soup_http_src_finalize (GObject * gobject) g_mutex_clear (&src->mutex); g_cond_clear (&src->have_headers_cond); g_object_unref (src->cancellable); + g_main_context_unref (src->poll_context); g_free (src->location); g_free (src->redirection_uri); g_free (src->user_agent); @@ -764,6 +765,21 @@ gst_soup_http_src_unicodify (const gchar * str) return gst_tag_freeform_string_to_utf8 (str, -1, env_vars); } +static void +gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src) +{ + if (src->input_stream) { + if (src->poll_source) { + g_source_destroy (src->poll_source); + g_source_unref (src->poll_source); + src->poll_source = NULL; + } + g_input_stream_close (src->input_stream, src->cancellable, NULL); + g_object_unref (src->input_stream); + src->input_stream = NULL; + } +} + static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src) { @@ -1355,11 +1371,25 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method) return TRUE; } +static void +gst_soup_http_src_check_input_stream_interfaces (GstSoupHTTPSrc * src) +{ + if (!src->input_stream) + return; + + src->has_pollable_interface = G_IS_POLLABLE_INPUT_STREAM (src->input_stream) + && g_pollable_input_stream_can_poll ((GPollableInputStream *) + src->input_stream); +} + static GstFlowReturn gst_soup_http_src_send_message (GstSoupHTTPSrc * src) { g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR); + g_assert (src->input_stream == NULL); + g_assert (src->poll_source == NULL); + /* FIXME We are ignoring the GError here, might be useful to debug */ src->input_stream = soup_session_send (src->session, src->msg, src->cancellable, NULL); @@ -1380,6 +1410,8 @@ gst_soup_http_src_send_message (GstSoupHTTPSrc * src) return GST_FLOW_ERROR; } + gst_soup_http_src_check_input_stream_interfaces (src); + return GST_FLOW_OK; } @@ -1448,6 +1480,38 @@ gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read) } } +static gboolean +_gst_soup_http_src_data_available_callback (GObject * pollable_stream, + gpointer udata) +{ + GstSoupHTTPSrc *src = udata; + + src->have_data = TRUE; + return TRUE; +} + +/* Need to wait on a gsource to know when data is available */ +static gboolean +gst_soup_http_src_wait_for_data (GstSoupHTTPSrc * src) +{ + src->have_data = FALSE; + + if (!src->poll_source) { + src->poll_source = + g_pollable_input_stream_create_source ((GPollableInputStream *) + src->input_stream, src->cancellable); + g_source_set_callback (src->poll_source, + (GSourceFunc) _gst_soup_http_src_data_available_callback, src, NULL); + g_source_attach (src->poll_source, src->poll_context); + } + + while (!src->have_data && !g_cancellable_is_cancelled (src->cancellable)) { + g_main_context_iteration (src->poll_context, TRUE); + } + + return src->have_data; +} + static GstFlowReturn gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) { @@ -1455,6 +1519,7 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) GstMapInfo mapinfo; GstBaseSrc *bsrc; GstFlowReturn ret; + GError *err = NULL; bsrc = GST_BASE_SRC_CAST (src); @@ -1469,9 +1534,34 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) return GST_FLOW_ERROR; } - read_bytes = - g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size, - src->cancellable, NULL); + if (src->has_pollable_interface) { + while (1) { + read_bytes = + g_pollable_input_stream_read_nonblocking ((GPollableInputStream *) + src->input_stream, mapinfo.data, mapinfo.size, src->cancellable, + &err); + if (read_bytes == -1) { + if (err && g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_error_free (err); + err = NULL; + + /* no data yet, wait */ + if (gst_soup_http_src_wait_for_data (src)) + /* retry */ + continue; + } + } + break; + } + } else { + read_bytes = + g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size, + src->cancellable, NULL); + } + + if (err) + g_error_free (err); + GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input", read_bytes); @@ -1518,11 +1608,7 @@ retry: /* Check for pending position change */ if (src->request_position != src->read_position) { - if (src->input_stream) { - g_input_stream_close (src->input_stream, src->cancellable, NULL); - g_object_unref (src->input_stream); - src->input_stream = NULL; - } + gst_soup_http_src_destroy_input_stream (src); } if (g_cancellable_is_cancelled (src->cancellable)) { @@ -1559,10 +1645,7 @@ done: gst_event_unref (http_headers_event); g_mutex_lock (&src->mutex); - if (src->input_stream) { - g_object_unref (src->input_stream); - src->input_stream = NULL; - } + gst_soup_http_src_destroy_input_stream (src); g_mutex_unlock (&src->mutex); if (ret == GST_FLOW_CUSTOM_ERROR) goto retry; diff --git a/ext/soup/gstsouphttpsrc.h b/ext/soup/gstsouphttpsrc.h index 2d5b3ee840..a5e259b1c2 100644 --- a/ext/soup/gstsouphttpsrc.h +++ b/ext/soup/gstsouphttpsrc.h @@ -89,6 +89,10 @@ struct _GstSoupHTTPSrc { GCancellable *cancellable; GInputStream *input_stream; + gboolean has_pollable_interface; + gboolean have_data; + GMainContext *poll_context; + GSource *poll_source; /* Shoutcast/icecast metadata extraction handling. */ gboolean iradio_mode;