mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-19 14:56:36 +00:00
souphttpsrc: reduce reading latency by using non-blocking read
Non-blocking read will return the amount of data available without blocking to wait for the full requested size. The downside is that now it souphttpsrc needs to have a waiting mechanism in case there is no data available yet to avoid busy looping arond the inputstream.
This commit is contained in:
parent
fe34f46f32
commit
8816764112
2 changed files with 103 additions and 16 deletions
|
@ -77,6 +77,7 @@
|
||||||
#endif
|
#endif
|
||||||
#include <gst/gstelement.h>
|
#include <gst/gstelement.h>
|
||||||
#include <gst/gst-i18n-plugin.h>
|
#include <gst/gst-i18n-plugin.h>
|
||||||
|
#include <gio/gio.h>
|
||||||
#include <libsoup/soup.h>
|
#include <libsoup/soup.h>
|
||||||
#include "gstsouphttpsrc.h"
|
#include "gstsouphttpsrc.h"
|
||||||
#include "gstsouputils.h"
|
#include "gstsouputils.h"
|
||||||
|
@ -175,6 +176,7 @@ static void gst_soup_http_src_got_headers (GstSoupHTTPSrc * src,
|
||||||
static void gst_soup_http_src_authenticate_cb (SoupSession * session,
|
static void gst_soup_http_src_authenticate_cb (SoupSession * session,
|
||||||
SoupMessage * msg, SoupAuth * auth, gboolean retrying,
|
SoupMessage * msg, SoupAuth * auth, gboolean retrying,
|
||||||
GstSoupHTTPSrc * src);
|
GstSoupHTTPSrc * src);
|
||||||
|
static void gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src);
|
||||||
|
|
||||||
#define gst_soup_http_src_parent_class parent_class
|
#define gst_soup_http_src_parent_class parent_class
|
||||||
G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC,
|
G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC,
|
||||||
|
@ -439,10 +441,7 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src)
|
||||||
|
|
||||||
src->ret = GST_FLOW_OK;
|
src->ret = GST_FLOW_OK;
|
||||||
g_cancellable_reset (src->cancellable);
|
g_cancellable_reset (src->cancellable);
|
||||||
if (src->input_stream) {
|
gst_soup_http_src_destroy_input_stream (src);
|
||||||
g_object_unref (src->input_stream);
|
|
||||||
src->input_stream = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
gst_caps_replace (&src->src_caps, NULL);
|
gst_caps_replace (&src->src_caps, NULL);
|
||||||
g_free (src->iradio_name);
|
g_free (src->iradio_name);
|
||||||
|
@ -461,6 +460,7 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
|
||||||
g_mutex_init (&src->mutex);
|
g_mutex_init (&src->mutex);
|
||||||
g_cond_init (&src->have_headers_cond);
|
g_cond_init (&src->have_headers_cond);
|
||||||
src->cancellable = g_cancellable_new ();
|
src->cancellable = g_cancellable_new ();
|
||||||
|
src->poll_context = g_main_context_new ();
|
||||||
src->location = NULL;
|
src->location = NULL;
|
||||||
src->redirection_uri = NULL;
|
src->redirection_uri = NULL;
|
||||||
src->automatic_redirect = TRUE;
|
src->automatic_redirect = TRUE;
|
||||||
|
@ -515,6 +515,7 @@ gst_soup_http_src_finalize (GObject * gobject)
|
||||||
g_mutex_clear (&src->mutex);
|
g_mutex_clear (&src->mutex);
|
||||||
g_cond_clear (&src->have_headers_cond);
|
g_cond_clear (&src->have_headers_cond);
|
||||||
g_object_unref (src->cancellable);
|
g_object_unref (src->cancellable);
|
||||||
|
g_main_context_unref (src->poll_context);
|
||||||
g_free (src->location);
|
g_free (src->location);
|
||||||
g_free (src->redirection_uri);
|
g_free (src->redirection_uri);
|
||||||
g_free (src->user_agent);
|
g_free (src->user_agent);
|
||||||
|
@ -764,6 +765,21 @@ gst_soup_http_src_unicodify (const gchar * str)
|
||||||
return gst_tag_freeform_string_to_utf8 (str, -1, env_vars);
|
return gst_tag_freeform_string_to_utf8 (str, -1, env_vars);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src)
|
||||||
|
{
|
||||||
|
if (src->input_stream) {
|
||||||
|
if (src->poll_source) {
|
||||||
|
g_source_destroy (src->poll_source);
|
||||||
|
g_source_unref (src->poll_source);
|
||||||
|
src->poll_source = NULL;
|
||||||
|
}
|
||||||
|
g_input_stream_close (src->input_stream, src->cancellable, NULL);
|
||||||
|
g_object_unref (src->input_stream);
|
||||||
|
src->input_stream = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
|
gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
|
||||||
{
|
{
|
||||||
|
@ -1355,11 +1371,25 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_soup_http_src_check_input_stream_interfaces (GstSoupHTTPSrc * src)
|
||||||
|
{
|
||||||
|
if (!src->input_stream)
|
||||||
|
return;
|
||||||
|
|
||||||
|
src->has_pollable_interface = G_IS_POLLABLE_INPUT_STREAM (src->input_stream)
|
||||||
|
&& g_pollable_input_stream_can_poll ((GPollableInputStream *)
|
||||||
|
src->input_stream);
|
||||||
|
}
|
||||||
|
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
|
gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
|
||||||
{
|
{
|
||||||
g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
|
g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
|
||||||
|
|
||||||
|
g_assert (src->input_stream == NULL);
|
||||||
|
g_assert (src->poll_source == NULL);
|
||||||
|
|
||||||
/* FIXME We are ignoring the GError here, might be useful to debug */
|
/* FIXME We are ignoring the GError here, might be useful to debug */
|
||||||
src->input_stream =
|
src->input_stream =
|
||||||
soup_session_send (src->session, src->msg, src->cancellable, NULL);
|
soup_session_send (src->session, src->msg, src->cancellable, NULL);
|
||||||
|
@ -1380,6 +1410,8 @@ gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
|
||||||
return GST_FLOW_ERROR;
|
return GST_FLOW_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
gst_soup_http_src_check_input_stream_interfaces (src);
|
||||||
|
|
||||||
return GST_FLOW_OK;
|
return GST_FLOW_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1448,6 +1480,38 @@ gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static gboolean
|
||||||
|
_gst_soup_http_src_data_available_callback (GObject * pollable_stream,
|
||||||
|
gpointer udata)
|
||||||
|
{
|
||||||
|
GstSoupHTTPSrc *src = udata;
|
||||||
|
|
||||||
|
src->have_data = TRUE;
|
||||||
|
return TRUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Need to wait on a gsource to know when data is available */
|
||||||
|
static gboolean
|
||||||
|
gst_soup_http_src_wait_for_data (GstSoupHTTPSrc * src)
|
||||||
|
{
|
||||||
|
src->have_data = FALSE;
|
||||||
|
|
||||||
|
if (!src->poll_source) {
|
||||||
|
src->poll_source =
|
||||||
|
g_pollable_input_stream_create_source ((GPollableInputStream *)
|
||||||
|
src->input_stream, src->cancellable);
|
||||||
|
g_source_set_callback (src->poll_source,
|
||||||
|
(GSourceFunc) _gst_soup_http_src_data_available_callback, src, NULL);
|
||||||
|
g_source_attach (src->poll_source, src->poll_context);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!src->have_data && !g_cancellable_is_cancelled (src->cancellable)) {
|
||||||
|
g_main_context_iteration (src->poll_context, TRUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
return src->have_data;
|
||||||
|
}
|
||||||
|
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
|
gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
|
||||||
{
|
{
|
||||||
|
@ -1455,6 +1519,7 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
|
||||||
GstMapInfo mapinfo;
|
GstMapInfo mapinfo;
|
||||||
GstBaseSrc *bsrc;
|
GstBaseSrc *bsrc;
|
||||||
GstFlowReturn ret;
|
GstFlowReturn ret;
|
||||||
|
GError *err = NULL;
|
||||||
|
|
||||||
bsrc = GST_BASE_SRC_CAST (src);
|
bsrc = GST_BASE_SRC_CAST (src);
|
||||||
|
|
||||||
|
@ -1469,9 +1534,34 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
|
||||||
return GST_FLOW_ERROR;
|
return GST_FLOW_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
read_bytes =
|
if (src->has_pollable_interface) {
|
||||||
g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
|
while (1) {
|
||||||
src->cancellable, NULL);
|
read_bytes =
|
||||||
|
g_pollable_input_stream_read_nonblocking ((GPollableInputStream *)
|
||||||
|
src->input_stream, mapinfo.data, mapinfo.size, src->cancellable,
|
||||||
|
&err);
|
||||||
|
if (read_bytes == -1) {
|
||||||
|
if (err && g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
|
||||||
|
g_error_free (err);
|
||||||
|
err = NULL;
|
||||||
|
|
||||||
|
/* no data yet, wait */
|
||||||
|
if (gst_soup_http_src_wait_for_data (src))
|
||||||
|
/* retry */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
read_bytes =
|
||||||
|
g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
|
||||||
|
src->cancellable, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (err)
|
||||||
|
g_error_free (err);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
|
GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
|
||||||
read_bytes);
|
read_bytes);
|
||||||
|
|
||||||
|
@ -1518,11 +1608,7 @@ retry:
|
||||||
|
|
||||||
/* Check for pending position change */
|
/* Check for pending position change */
|
||||||
if (src->request_position != src->read_position) {
|
if (src->request_position != src->read_position) {
|
||||||
if (src->input_stream) {
|
gst_soup_http_src_destroy_input_stream (src);
|
||||||
g_input_stream_close (src->input_stream, src->cancellable, NULL);
|
|
||||||
g_object_unref (src->input_stream);
|
|
||||||
src->input_stream = NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (g_cancellable_is_cancelled (src->cancellable)) {
|
if (g_cancellable_is_cancelled (src->cancellable)) {
|
||||||
|
@ -1559,10 +1645,7 @@ done:
|
||||||
gst_event_unref (http_headers_event);
|
gst_event_unref (http_headers_event);
|
||||||
|
|
||||||
g_mutex_lock (&src->mutex);
|
g_mutex_lock (&src->mutex);
|
||||||
if (src->input_stream) {
|
gst_soup_http_src_destroy_input_stream (src);
|
||||||
g_object_unref (src->input_stream);
|
|
||||||
src->input_stream = NULL;
|
|
||||||
}
|
|
||||||
g_mutex_unlock (&src->mutex);
|
g_mutex_unlock (&src->mutex);
|
||||||
if (ret == GST_FLOW_CUSTOM_ERROR)
|
if (ret == GST_FLOW_CUSTOM_ERROR)
|
||||||
goto retry;
|
goto retry;
|
||||||
|
|
|
@ -89,6 +89,10 @@ struct _GstSoupHTTPSrc {
|
||||||
|
|
||||||
GCancellable *cancellable;
|
GCancellable *cancellable;
|
||||||
GInputStream *input_stream;
|
GInputStream *input_stream;
|
||||||
|
gboolean has_pollable_interface;
|
||||||
|
gboolean have_data;
|
||||||
|
GMainContext *poll_context;
|
||||||
|
GSource *poll_source;
|
||||||
|
|
||||||
/* Shoutcast/icecast metadata extraction handling. */
|
/* Shoutcast/icecast metadata extraction handling. */
|
||||||
gboolean iradio_mode;
|
gboolean iradio_mode;
|
||||||
|
|
Loading…
Reference in a new issue