Revert "WIP revert soup"

This reverts commit fdac3a7a23.

Was not supposed to be pushed but a local workaround for
https://bugzilla.gnome.org/show_bug.cgi?id=693911#c13
This commit is contained in:
Sebastian Dröge 2016-06-06 10:47:52 +03:00
parent 389e0abeb0
commit 4a2455b744
4 changed files with 317 additions and 497 deletions

View file

@ -4,8 +4,8 @@ libgstsouphttpsrc_la_SOURCES = gstsouphttpsrc.c gstsouphttpclientsink.c gstsoupu
libgstsouphttpsrc_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) \
$(GST_CFLAGS) $(SOUP_CFLAGS) \
-DSOUP_VERSION_MIN_REQUIRED=SOUP_VERSION_2_26 \
-DSOUP_VERSION_MAX_ALLOWED=SOUP_DEPRECATED_IN_2_26
-DSOUP_VERSION_MIN_REQUIRED=SOUP_VERSION_2_48 \
-DSOUP_VERSION_MAX_ALLOWED=SOUP_DEPRECATED_IN_2_48
libgstsouphttpsrc_la_LIBADD = $(GST_PLUGINS_BASE_LIBS) -lgsttag-@GST_API_VERSION@ $(GST_BASE_LIBS) $(SOUP_LIBS)
libgstsouphttpsrc_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
libgstsouphttpsrc_la_LIBTOOLFLAGS = $(GST_PLUGIN_LIBTOOLFLAGS)

View file

@ -264,8 +264,10 @@ gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
g_list_free_full (souphttpsink->streamheader_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->streamheader_buffers = NULL;
g_list_free_full (souphttpsink->sent_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->sent_buffers = NULL;
}
static gboolean

View file

@ -77,19 +77,11 @@
#endif
#include <gst/gstelement.h>
#include <gst/gst-i18n-plugin.h>
#include <gio/gio.h>
#include <libsoup/soup.h>
#include "gstsouphttpsrc.h"
#include "gstsouputils.h"
/* libsoup before 2.47.0 was stealing our main context from us,
* so we can't reliable use it to clean up all pending resources
* once we're done... let's just continue leaking on old versions.
* https://bugzilla.gnome.org/show_bug.cgi?id=663944
*/
#if defined(SOUP_MINOR_VERSION) && SOUP_MINOR_VERSION >= 47
#define LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT 1
#endif
#include <gst/tag/tag.h>
GST_DEBUG_CATEGORY_STATIC (souphttpsrc_debug);
@ -173,31 +165,18 @@ static char *gst_soup_http_src_unicodify (const char *str);
static gboolean gst_soup_http_src_build_message (GstSoupHTTPSrc * src,
const gchar * method);
static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src);
static void gst_soup_http_src_queue_message (GstSoupHTTPSrc * src);
static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src,
guint64 offset, guint64 stop_offset);
static void gst_soup_http_src_session_unpause_message (GstSoupHTTPSrc * src);
static void gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src);
static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src);
static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src);
static void gst_soup_http_src_parse_status (SoupMessage * msg,
GstSoupHTTPSrc * src);
static void gst_soup_http_src_chunk_free (gpointer gstbuf);
static SoupBuffer *gst_soup_http_src_chunk_allocator (SoupMessage * msg,
gsize max_len, gpointer user_data);
static void gst_soup_http_src_got_chunk_cb (SoupMessage * msg,
SoupBuffer * chunk, GstSoupHTTPSrc * src);
static void gst_soup_http_src_response_cb (SoupSession * session,
SoupMessage * msg, GstSoupHTTPSrc * src);
static void gst_soup_http_src_got_headers_cb (SoupMessage * msg,
GstSoupHTTPSrc * src);
static void gst_soup_http_src_got_body_cb (SoupMessage * msg,
GstSoupHTTPSrc * src);
static void gst_soup_http_src_finished_cb (SoupMessage * msg,
GstSoupHTTPSrc * src);
static void gst_soup_http_src_got_headers (GstSoupHTTPSrc * src,
SoupMessage * msg);
static void gst_soup_http_src_authenticate_cb (SoupSession * session,
SoupMessage * msg, SoupAuth * auth, gboolean retrying,
GstSoupHTTPSrc * src);
static void gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src);
#define gst_soup_http_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC,
@ -450,8 +429,6 @@ gst_soup_http_src_class_init (GstSoupHTTPSrcClass * klass)
static void
gst_soup_http_src_reset (GstSoupHTTPSrc * src)
{
src->interrupted = FALSE;
src->retry = FALSE;
src->retry_count = 0;
src->have_size = FALSE;
src->got_headers = FALSE;
@ -463,6 +440,8 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src)
src->have_body = FALSE;
src->ret = GST_FLOW_OK;
g_cancellable_reset (src->cancellable);
gst_soup_http_src_destroy_input_stream (src);
gst_caps_replace (&src->src_caps, NULL);
g_free (src->iradio_name);
@ -479,7 +458,9 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
const gchar *proxy;
g_mutex_init (&src->mutex);
g_cond_init (&src->request_finished_cond);
g_cond_init (&src->have_headers_cond);
src->cancellable = g_cancellable_new ();
src->poll_context = g_main_context_new ();
src->location = NULL;
src->redirection_uri = NULL;
src->automatic_redirect = TRUE;
@ -490,8 +471,6 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
src->proxy_pw = NULL;
src->cookies = NULL;
src->iradio_mode = DEFAULT_IRADIO_MODE;
src->loop = NULL;
src->context = NULL;
src->session = NULL;
src->msg = NULL;
src->timeout = DEFAULT_TIMEOUT;
@ -534,7 +513,9 @@ gst_soup_http_src_finalize (GObject * gobject)
GST_DEBUG_OBJECT (src, "finalize");
g_mutex_clear (&src->mutex);
g_cond_clear (&src->request_finished_cond);
g_cond_clear (&src->have_headers_cond);
g_object_unref (src->cancellable);
g_main_context_unref (src->poll_context);
g_free (src->location);
g_free (src->redirection_uri);
g_free (src->user_agent);
@ -785,23 +766,25 @@ gst_soup_http_src_unicodify (const gchar * str)
}
static void
gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src)
{
if (src->msg != NULL) {
GST_INFO_OBJECT (src, "Cancelling message");
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED;
soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
if (src->input_stream) {
if (src->poll_source) {
g_source_destroy (src->poll_source);
g_source_unref (src->poll_source);
src->poll_source = NULL;
}
g_input_stream_close (src->input_stream, src->cancellable, NULL);
g_object_unref (src->input_stream);
src->input_stream = NULL;
}
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
src->msg = NULL;
}
static void
gst_soup_http_src_queue_message (GstSoupHTTPSrc * src)
gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
{
soup_session_queue_message (src->session, src->msg,
(SoupSessionCallback) gst_soup_http_src_response_cb, src);
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED;
g_cancellable_cancel (src->cancellable);
g_cond_signal (&src->have_headers_cond);
}
static gboolean
@ -906,19 +889,6 @@ gst_soup_http_src_add_extra_headers (GstSoupHTTPSrc * src)
return gst_structure_foreach (src->extra_headers, _append_extra_headers, src);
}
static void
gst_soup_http_src_session_unpause_message (GstSoupHTTPSrc * src)
{
soup_session_unpause_message (src->session, src->msg);
}
static void
gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src)
{
soup_session_pause_message (src->session, src->msg);
}
static gboolean
gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
{
@ -933,32 +903,17 @@ gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
return FALSE;
}
if (!src->context)
src->context = g_main_context_new ();
if (!src->loop)
src->loop = g_main_loop_new (src->context, TRUE);
if (!src->loop) {
GST_ELEMENT_ERROR (src, LIBRARY, INIT,
(NULL), ("Failed to start GMainLoop"));
g_main_context_unref (src->context);
return FALSE;
}
if (!src->session) {
GST_DEBUG_OBJECT (src, "Creating session");
if (src->proxy == NULL) {
src->session =
soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
src->context, SOUP_SESSION_USER_AGENT, src->user_agent,
SOUP_SESSION_TIMEOUT, src->timeout,
soup_session_new_with_options (SOUP_SESSION_USER_AGENT,
src->user_agent, SOUP_SESSION_TIMEOUT, src->timeout,
SOUP_SESSION_SSL_STRICT, src->ssl_strict,
SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_PROXY_RESOLVER_DEFAULT,
SOUP_SESSION_TLS_INTERACTION, src->tls_interaction, NULL);
} else {
src->session =
soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
src->context, SOUP_SESSION_PROXY_URI, src->proxy,
soup_session_new_with_options (SOUP_SESSION_PROXY_URI, src->proxy,
SOUP_SESSION_TIMEOUT, src->timeout,
SOUP_SESSION_SSL_STRICT, src->ssl_strict,
SOUP_SESSION_USER_AGENT, src->user_agent,
@ -996,22 +951,11 @@ gst_soup_http_src_session_open (GstSoupHTTPSrc * src)
return TRUE;
}
#ifdef LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT
static gboolean
dummy_idle_cb (gpointer data)
{
return FALSE /* Idle source is removed */ ;
}
#endif
static void
gst_soup_http_src_session_close (GstSoupHTTPSrc * src)
{
GST_DEBUG_OBJECT (src, "Closing session");
if (src->loop)
g_main_loop_quit (src->loop);
g_mutex_lock (&src->mutex);
if (src->session) {
soup_session_abort (src->session); /* This unrefs the message. */
@ -1019,33 +963,6 @@ gst_soup_http_src_session_close (GstSoupHTTPSrc * src)
src->session = NULL;
src->msg = NULL;
}
if (src->loop) {
#ifdef LIBSOUP_DOES_NOT_STEAL_OUR_CONTEXT
GSource *idle_source;
/* Iterating the main context to give GIO cancellables a chance
* to initiate cleanups. Wihout this, resources allocated by
* libsoup for the connection are not released and socket fd is
* leaked. */
idle_source = g_idle_source_new ();
/* Suppressing "idle souce without callback" warning */
g_source_set_callback (idle_source, dummy_idle_cb, NULL, NULL);
g_source_set_priority (idle_source, G_PRIORITY_LOW);
g_source_attach (idle_source, src->context);
/* Acquiring the context. Idle source guarantees that we'll not block. */
g_main_context_push_thread_default (src->context);
g_main_context_iteration (src->context, TRUE);
/* Ensuring that there's no unhandled pending events left. */
while (g_main_context_iteration (src->context, FALSE));
g_main_context_pop_thread_default (src->context);
g_source_unref (idle_source);
#endif
g_main_loop_unref (src->loop);
g_main_context_unref (src->context);
src->loop = NULL;
src->context = NULL;
}
g_mutex_unlock (&src->mutex);
}
@ -1100,7 +1017,7 @@ insert_http_header (const gchar * name, const gchar * value, gpointer user_data)
}
static void
gst_soup_http_src_got_headers_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
{
const char *value;
GstTagList *tag_list;
@ -1127,11 +1044,14 @@ gst_soup_http_src_got_headers_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
return;
}
if (msg->status_code == SOUP_STATUS_UNAUTHORIZED)
if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
/* force an error */
gst_soup_http_src_parse_status (msg, src);
return;
}
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING;
src->got_headers = TRUE;
g_cond_broadcast (&src->have_headers_cond);
http_headers = gst_structure_new_empty ("http-headers");
gst_structure_set (http_headers, "uri", G_TYPE_STRING, src->location, NULL);
@ -1223,7 +1143,7 @@ gst_soup_http_src_got_headers_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
if (param != NULL)
rate = atol (param);
src->src_caps = gst_caps_new_simple ("audio/x-raw",
src->src_caps = gst_caps_new_simple ("audio/x-unaligned-raw",
"format", G_TYPE_STRING, "S16BE",
"layout", G_TYPE_STRING, "interleaved",
"channels", G_TYPE_INT, channels, "rate", G_TYPE_INT, rate, NULL);
@ -1299,251 +1219,23 @@ gst_soup_http_src_got_headers_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
* GST_FLOW_ERROR from the create function instead of having
* got_chunk_cb overwrite src->ret with FLOW_OK again. */
if (src->ret == GST_FLOW_ERROR || src->ret == GST_FLOW_EOS) {
gst_soup_http_src_session_pause_message (src);
if (src->loop)
g_main_loop_quit (src->loop);
}
g_cond_signal (&src->request_finished_cond);
}
/* Have body. Signal EOS. */
static void
gst_soup_http_src_got_body_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
static GstBuffer *
gst_soup_http_src_alloc_buffer (GstSoupHTTPSrc * src)
{
if (G_UNLIKELY (msg != src->msg)) {
GST_DEBUG_OBJECT (src, "got body, but not for current message");
return;
}
if (G_UNLIKELY (src->session_io_status !=
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
/* Probably a redirect. */
return;
}
GST_DEBUG_OBJECT (src, "got body");
src->ret = GST_FLOW_EOS;
src->have_body = TRUE;
/* no need to interrupt the message here, we do it on the
* finished_cb anyway if needed. And getting the body might mean
* that the connection was hang up before finished. This happens when
* the pipeline is stalled for too long (long pauses during playback).
* Best to let it continue from here and pause because it reached the
* final bytes based on content_size or received an out of range error */
}
/* Finished. Signal EOS. */
static void
gst_soup_http_src_finished_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
{
if (G_UNLIKELY (msg != src->msg)) {
GST_DEBUG_OBJECT (src, "finished, but not for current message");
return;
}
GST_INFO_OBJECT (src, "finished, io status: %d", src->session_io_status);
src->ret = GST_FLOW_EOS;
if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED) {
/* gst_soup_http_src_cancel_message() triggered this; probably a seek
* that occurred in the QUEUEING state; i.e. before the connection setup
* was complete. Do nothing */
GST_DEBUG_OBJECT (src, "cancelled");
} else if (src->session_io_status ==
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING && src->read_position > 0 &&
(src->have_size && src->read_position < src->content_size) &&
(src->max_retries == -1 || src->retry_count < src->max_retries)) {
/* The server disconnected while streaming. Reconnect and seeking to the
* last location. */
src->retry = TRUE;
src->retry_count++;
src->ret = GST_FLOW_CUSTOM_ERROR;
} else if (G_UNLIKELY (src->session_io_status !=
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
if (msg->method == SOUP_METHOD_HEAD) {
GST_DEBUG_OBJECT (src, "Ignoring error %d:%s during HEAD request",
msg->status_code, msg->reason_phrase);
} else {
gst_soup_http_src_parse_status (msg, src);
}
}
if (src->loop)
g_main_loop_quit (src->loop);
g_cond_signal (&src->request_finished_cond);
}
/* Buffer lifecycle management.
*
* gst_soup_http_src_create() runs the GMainLoop for this element, to let
* Soup take control.
* A GstBuffer is allocated in gst_soup_http_src_chunk_allocator() and
* associated with a SoupBuffer.
* Soup reads HTTP data in the GstBuffer's data buffer.
* The gst_soup_http_src_got_chunk_cb() is then called with the SoupBuffer.
* That sets gst_soup_http_src_create()'s return argument to the GstBuffer,
* increments its refcount (to 2), pauses the flow of data from the HTTP
* source to prevent gst_soup_http_src_got_chunk_cb() from being called
* again and breaks out of the GMainLoop.
* Because the SOUP_MESSAGE_OVERWRITE_CHUNKS flag is set, Soup frees the
* SoupBuffer and calls gst_soup_http_src_chunk_free(), which decrements the
* refcount (to 1).
* gst_soup_http_src_create() returns the GstBuffer. It will be freed by a
* downstream element.
* If Soup fails to read HTTP data, it does not call
* gst_soup_http_src_got_chunk_cb(), but still frees the SoupBuffer and
* calls gst_soup_http_src_chunk_free(), which decrements the GstBuffer's
* refcount to 0, freeing it.
*/
typedef struct
{
GstBuffer *buffer;
GstMapInfo map;
} SoupGstChunk;
static void
gst_soup_http_src_chunk_free (gpointer user_data)
{
SoupGstChunk *chunk = (SoupGstChunk *) user_data;
gst_buffer_unmap (chunk->buffer, &chunk->map);
gst_buffer_unref (chunk->buffer);
g_slice_free (SoupGstChunk, chunk);
}
static SoupBuffer *
gst_soup_http_src_chunk_allocator (SoupMessage * msg, gsize max_len,
gpointer user_data)
{
GstSoupHTTPSrc *src = (GstSoupHTTPSrc *) user_data;
GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
GstBuffer *gstbuf;
SoupBuffer *soupbuf;
gsize length;
GstFlowReturn rc;
SoupGstChunk *chunk;
GstBuffer *gstbuf;
if (max_len)
length = MIN (basesrc->blocksize, max_len);
else
length = basesrc->blocksize;
GST_DEBUG_OBJECT (src, "alloc %" G_GSIZE_FORMAT " bytes <= %" G_GSIZE_FORMAT,
length, max_len);
rc = GST_BASE_SRC_CLASS (parent_class)->alloc (basesrc, -1, length, &gstbuf);
rc = GST_BASE_SRC_CLASS (parent_class)->alloc (basesrc, -1,
basesrc->blocksize, &gstbuf);
if (G_UNLIKELY (rc != GST_FLOW_OK)) {
/* Failed to allocate buffer. Stall SoupSession and return error code
* to create(). */
src->ret = rc;
g_main_loop_quit (src->loop);
return NULL;
}
chunk = g_slice_new0 (SoupGstChunk);
chunk->buffer = gstbuf;
gst_buffer_map (gstbuf, &chunk->map, GST_MAP_READWRITE);
soupbuf = soup_buffer_new_with_owner (chunk->map.data, chunk->map.size,
chunk, gst_soup_http_src_chunk_free);
return soupbuf;
}
static void
gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
GstSoupHTTPSrc * src)
{
GstBaseSrc *basesrc;
guint64 new_position;
SoupGstChunk *gchunk;
if (G_UNLIKELY (msg != src->msg)) {
GST_DEBUG_OBJECT (src, "got chunk, but not for current message");
return;
}
if (G_UNLIKELY (!src->outbuf)) {
GST_DEBUG_OBJECT (src, "got chunk but we're not expecting one");
src->ret = GST_FLOW_OK;
gst_soup_http_src_cancel_message (src);
g_main_loop_quit (src->loop);
return;
}
/* We got data, reset the retry counter */
src->retry_count = 0;
src->have_body = FALSE;
if (G_UNLIKELY (src->session_io_status !=
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
/* Probably a redirect. */
return;
}
basesrc = GST_BASE_SRC_CAST (src);
GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes",
chunk->length);
/* Extract the GstBuffer from the SoupBuffer and set its fields. */
gchunk = (SoupGstChunk *) soup_buffer_get_owner (chunk);
*src->outbuf = gchunk->buffer;
gst_buffer_resize (*src->outbuf, 0, chunk->length);
GST_BUFFER_OFFSET (*src->outbuf) = basesrc->segment.position;
gst_buffer_ref (*src->outbuf);
new_position = src->read_position + chunk->length;
if (G_LIKELY (src->request_position == src->read_position))
src->request_position = new_position;
src->read_position = new_position;
if (src->have_size) {
if (new_position > src->content_size) {
GST_DEBUG_OBJECT (src, "Got position previous estimated content size "
"(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", new_position,
src->content_size);
src->content_size = new_position;
basesrc->segment.duration = src->content_size;
gst_element_post_message (GST_ELEMENT (src),
gst_message_new_duration_changed (GST_OBJECT (src)));
} else if (new_position == src->content_size) {
GST_DEBUG_OBJECT (src, "We're EOS now");
}
}
src->ret = GST_FLOW_OK;
g_main_loop_quit (src->loop);
gst_soup_http_src_session_pause_message (src);
}
static void
gst_soup_http_src_response_cb (SoupSession * session, SoupMessage * msg,
GstSoupHTTPSrc * src)
{
if (G_UNLIKELY (msg != src->msg)) {
GST_DEBUG_OBJECT (src, "got response %d: %s, but not for current message",
msg->status_code, msg->reason_phrase);
return;
}
if (G_UNLIKELY (src->session_io_status !=
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)
&& SOUP_STATUS_IS_REDIRECTION (msg->status_code)) {
/* Ignore redirections. */
return;
}
GST_INFO_OBJECT (src, "got response %d: %s", msg->status_code,
msg->reason_phrase);
if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING &&
src->read_position > 0 && (src->have_size
&& src->read_position < src->content_size) &&
(src->max_retries == -1 || src->retry_count < src->max_retries)) {
/* The server disconnected while streaming. Reconnect and seeking to the
* last location. */
src->retry = TRUE;
src->retry_count++;
} else {
gst_soup_http_src_parse_status (msg, src);
}
/* The session's SoupMessage object expires after this callback returns. */
src->msg = NULL;
g_main_loop_quit (src->loop);
return gstbuf;
}
#define SOUP_HTTP_SRC_ERROR(src,soup_msg,cat,code,error_message) \
@ -1579,8 +1271,6 @@ gst_soup_http_src_parse_status (SoupMessage * msg, GstSoupHTTPSrc * src)
break;
case SOUP_STATUS_IO_ERROR:
if (src->max_retries == -1 || src->retry_count < src->max_retries) {
src->retry = TRUE;
src->retry_count++;
src->ret = GST_FLOW_CUSTOM_ERROR;
} else {
SOUP_HTTP_SRC_ERROR (src, msg, RESOURCE, READ,
@ -1653,7 +1343,6 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
("Error parsing URL."), ("URL: %s", src->location));
return FALSE;
}
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
if (!src->keep_alive) {
soup_message_headers_append (src->msg->request_headers, "Connection",
"close");
@ -1670,20 +1359,9 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
*cookie);
}
}
src->retry = FALSE;
g_signal_connect (src->msg, "got_headers",
G_CALLBACK (gst_soup_http_src_got_headers_cb), src);
g_signal_connect (src->msg, "got_body",
G_CALLBACK (gst_soup_http_src_got_body_cb), src);
g_signal_connect (src->msg, "finished",
G_CALLBACK (gst_soup_http_src_finished_cb), src);
g_signal_connect (src->msg, "got_chunk",
G_CALLBACK (gst_soup_http_src_got_chunk_cb), src);
soup_message_set_flags (src->msg, SOUP_MESSAGE_OVERWRITE_CHUNKS |
(src->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
soup_message_set_chunk_allocator (src->msg,
gst_soup_http_src_chunk_allocator, src, NULL);
gst_soup_http_src_add_range_header (src, src->request_position,
src->stop_position);
@ -1692,146 +1370,291 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
return TRUE;
}
static GstFlowReturn
gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method,
GstBuffer ** outbuf)
static void
gst_soup_http_src_check_input_stream_interfaces (GstSoupHTTPSrc * src)
{
/* If we're not OK, just go out of here */
if (!src->input_stream)
return;
src->has_pollable_interface = G_IS_POLLABLE_INPUT_STREAM (src->input_stream)
&& g_pollable_input_stream_can_poll ((GPollableInputStream *)
src->input_stream);
}
static GstFlowReturn
gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
{
g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
g_assert (src->input_stream == NULL);
g_assert (src->poll_source == NULL);
/* FIXME We are ignoring the GError here, might be useful to debug */
src->input_stream =
soup_session_send (src->session, src->msg, src->cancellable, NULL);
if (g_cancellable_is_cancelled (src->cancellable))
return GST_FLOW_FLUSHING;
gst_soup_http_src_got_headers (src, src->msg);
if (src->ret != GST_FLOW_OK) {
GST_DEBUG_OBJECT (src, "Previous flow return not OK: %s",
gst_flow_get_name (src->ret));
return src->ret;
}
if (!src->input_stream) {
GST_DEBUG_OBJECT (src, "Didn't get an input stream");
return GST_FLOW_ERROR;
}
if (SOUP_STATUS_IS_SUCCESSFUL (src->msg->status_code)) {
GST_DEBUG_OBJECT (src, "Successfully got a reply");
} else {
/* FIXME - be more helpful to people debugging */
return GST_FLOW_ERROR;
}
gst_soup_http_src_check_input_stream_interfaces (src);
return GST_FLOW_OK;
}
static GstFlowReturn
gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method)
{
if (src->max_retries != -1 && src->retry_count > src->max_retries) {
GST_DEBUG_OBJECT (src, "Max retries reached");
src->ret = GST_FLOW_ERROR;
return src->ret;
}
src->retry_count++;
/* EOS immediately if we have an empty segment */
if (src->request_position == src->stop_position)
return GST_FLOW_EOS;
GST_LOG_OBJECT (src, "Running request for method: %s", method);
if (src->msg && (src->request_position != src->read_position)) {
if (src->session_io_status == GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE) {
/* EOS immediately if we have an empty segment */
if (src->request_position == src->stop_position)
return GST_FLOW_EOS;
/* Update the position if we are retrying */
if (src->msg && (src->request_position != src->read_position)) {
gst_soup_http_src_add_range_header (src, src->request_position,
src->stop_position);
} else {
GST_DEBUG_OBJECT (src, "Seek from position %" G_GUINT64_FORMAT
" to %" G_GUINT64_FORMAT ": requeueing connection request",
src->read_position, src->request_position);
gst_soup_http_src_cancel_message (src);
}
}
if (!src->msg) {
/* EOS immediately if we have an empty segment */
if (src->request_position == src->stop_position)
return GST_FLOW_EOS;
if (!gst_soup_http_src_build_message (src, method))
if (!src->msg) {
if (!gst_soup_http_src_build_message (src, method)) {
return GST_FLOW_ERROR;
}
}
src->ret = GST_FLOW_CUSTOM_ERROR;
src->outbuf = outbuf;
do {
if (src->interrupted) {
if (g_cancellable_is_cancelled (src->cancellable)) {
GST_INFO_OBJECT (src, "interrupted");
src->ret = GST_FLOW_FLUSHING;
break;
goto done;
}
if (src->retry) {
GST_INFO_OBJECT (src, "Reconnecting");
src->ret = gst_soup_http_src_send_message (src);
/* EOS immediately if we have an empty segment */
if (src->request_position == src->stop_position)
return GST_FLOW_EOS;
done:
return src->ret;
}
if (!gst_soup_http_src_build_message (src, method))
static void
gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
{
GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
guint64 new_position;
new_position = src->read_position + bytes_read;
if (G_LIKELY (src->request_position == src->read_position))
src->request_position = new_position;
src->read_position = new_position;
if (src->have_size) {
if (new_position > src->content_size) {
GST_DEBUG_OBJECT (src, "Got position previous estimated content size "
"(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT ")", new_position,
src->content_size);
src->content_size = new_position;
basesrc->segment.duration = src->content_size;
gst_element_post_message (GST_ELEMENT (src),
gst_message_new_duration_changed (GST_OBJECT (src)));
} else if (new_position == src->content_size) {
GST_DEBUG_OBJECT (src, "We're EOS now");
}
}
}
static gboolean
_gst_soup_http_src_data_available_callback (GObject * pollable_stream,
gpointer udata)
{
GstSoupHTTPSrc *src = udata;
src->have_data = TRUE;
return TRUE;
}
/* Need to wait on a gsource to know when data is available */
static gboolean
gst_soup_http_src_wait_for_data (GstSoupHTTPSrc * src)
{
src->have_data = FALSE;
if (!src->poll_source) {
src->poll_source =
g_pollable_input_stream_create_source ((GPollableInputStream *)
src->input_stream, src->cancellable);
g_source_set_callback (src->poll_source,
(GSourceFunc) _gst_soup_http_src_data_available_callback, src, NULL);
g_source_attach (src->poll_source, src->poll_context);
}
while (!src->have_data && !g_cancellable_is_cancelled (src->cancellable)) {
g_main_context_iteration (src->poll_context, TRUE);
}
return src->have_data;
}
static GstFlowReturn
gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
{
gssize read_bytes;
GstMapInfo mapinfo;
GstBaseSrc *bsrc;
GstFlowReturn ret;
GError *err = NULL;
bsrc = GST_BASE_SRC_CAST (src);
*outbuf = gst_soup_http_src_alloc_buffer (src);
if (!*outbuf) {
GST_WARNING_OBJECT (src, "Failed to allocate buffer");
return GST_FLOW_ERROR;
src->retry = FALSE;
}
if (!gst_buffer_map (*outbuf, &mapinfo, GST_MAP_WRITE)) {
GST_WARNING_OBJECT (src, "Failed to map buffer");
return GST_FLOW_ERROR;
}
if (src->has_pollable_interface) {
while (1) {
read_bytes =
g_pollable_input_stream_read_nonblocking ((GPollableInputStream *)
src->input_stream, mapinfo.data, mapinfo.size, src->cancellable,
&err);
if (read_bytes == -1) {
if (err && g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_error_free (err);
err = NULL;
/* no data yet, wait */
if (gst_soup_http_src_wait_for_data (src))
/* retry */
continue;
}
if (!src->msg) {
GST_DEBUG_OBJECT (src, "EOS reached");
}
break;
}
switch (src->session_io_status) {
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE:
GST_INFO_OBJECT (src, "Queueing connection request");
gst_soup_http_src_queue_message (src);
break;
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED:
break;
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING:
gst_soup_http_src_session_unpause_message (src);
break;
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED:
/* Impossible. */
break;
} else {
read_bytes =
g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
src->cancellable, NULL);
}
if (src->ret == GST_FLOW_CUSTOM_ERROR) {
g_main_context_push_thread_default (src->context);
g_main_loop_run (src->loop);
g_main_context_pop_thread_default (src->context);
}
if (err)
g_error_free (err);
} while (src->ret == GST_FLOW_CUSTOM_ERROR);
GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
read_bytes);
/* Let the request finish if we had a stop position and are there */
if (src->ret == GST_FLOW_OK && src->stop_position != -1
&& src->read_position >= src->stop_position) {
src->outbuf = NULL;
gst_soup_http_src_session_unpause_message (src);
g_main_context_push_thread_default (src->context);
g_main_loop_run (src->loop);
g_main_context_pop_thread_default (src->context);
g_cond_signal (&src->request_finished_cond);
/* Return OK unconditionally here, src->ret will
* be most likely be EOS now but we want to
* consume the buffer we got above */
return GST_FLOW_OK;
}
if (src->ret == GST_FLOW_CUSTOM_ERROR)
src->ret = GST_FLOW_EOS;
g_cond_signal (&src->request_finished_cond);
/* basesrc assumes that we don't return a buffer if
* something else than OK is returned. It will just
* leak any buffer we might accidentially provide
* here.
*
* This can potentially happen during flushing.
*/
if (src->ret != GST_FLOW_OK && outbuf && *outbuf) {
g_mutex_lock (&src->mutex);
if (g_cancellable_is_cancelled (src->cancellable)) {
gst_buffer_unmap (*outbuf, &mapinfo);
gst_buffer_unref (*outbuf);
*outbuf = NULL;
g_mutex_unlock (&src->mutex);
return GST_FLOW_FLUSHING;
}
g_mutex_unlock (&src->mutex);
return src->ret;
gst_buffer_unmap (*outbuf, &mapinfo);
if (read_bytes > 0) {
gst_buffer_set_size (*outbuf, read_bytes);
GST_BUFFER_OFFSET (*outbuf) = bsrc->segment.position;
ret = GST_FLOW_OK;
gst_soup_http_src_update_position (src, read_bytes);
/* Got some data, reset retry counter */
src->retry_count = 0;
} else {
gst_buffer_unref (*outbuf);
if (read_bytes < 0) {
/* Maybe the server disconnected, retry */
ret = GST_FLOW_CUSTOM_ERROR;
} else {
ret = GST_FLOW_EOS;
src->have_body = TRUE;
}
}
return ret;
}
static GstFlowReturn
gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstSoupHTTPSrc *src;
GstFlowReturn ret;
GstEvent *http_headers_event;
GstFlowReturn ret = GST_FLOW_OK;
GstEvent *http_headers_event = NULL;
src = GST_SOUP_HTTP_SRC (psrc);
retry:
g_mutex_lock (&src->mutex);
/* Check for pending position change */
if (src->request_position != src->read_position) {
gst_soup_http_src_destroy_input_stream (src);
}
if (g_cancellable_is_cancelled (src->cancellable)) {
ret = GST_FLOW_FLUSHING;
g_mutex_unlock (&src->mutex);
goto done;
}
/* If we have no open connection to the server, start one */
if (!src->input_stream) {
*outbuf = NULL;
ret =
gst_soup_http_src_do_request (src,
src->method ? src->method : SOUP_METHOD_GET, outbuf);
src->method ? src->method : SOUP_METHOD_GET);
http_headers_event = src->http_headers_event;
src->http_headers_event = NULL;
}
g_mutex_unlock (&src->mutex);
if (http_headers_event)
if (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_ERROR) {
if (http_headers_event) {
gst_pad_push_event (GST_BASE_SRC_PAD (src), http_headers_event);
http_headers_event = NULL;
}
}
if (ret == GST_FLOW_OK)
ret = gst_soup_http_src_read_buffer (src, outbuf);
done:
GST_DEBUG_OBJECT (src, "Returning %d %s", ret, gst_flow_get_name (ret));
if (ret != GST_FLOW_OK) {
if (http_headers_event)
gst_event_unref (http_headers_event);
g_mutex_lock (&src->mutex);
gst_soup_http_src_destroy_input_stream (src);
g_mutex_unlock (&src->mutex);
if (ret == GST_FLOW_CUSTOM_ERROR)
goto retry;
}
return ret;
}
@ -1891,11 +1714,8 @@ gst_soup_http_src_unlock (GstBaseSrc * bsrc)
src = GST_SOUP_HTTP_SRC (bsrc);
GST_DEBUG_OBJECT (src, "unlock()");
src->interrupted = TRUE;
src->ret = GST_FLOW_FLUSHING;
if (src->loop)
g_main_loop_quit (src->loop);
g_cond_signal (&src->request_finished_cond);
gst_soup_http_src_cancel_message (src);
return TRUE;
}
@ -1908,8 +1728,8 @@ gst_soup_http_src_unlock_stop (GstBaseSrc * bsrc)
src = GST_SOUP_HTTP_SRC (bsrc);
GST_DEBUG_OBJECT (src, "unlock_stop()");
src->interrupted = FALSE;
src->ret = GST_FLOW_OK;
g_cancellable_reset (src->cancellable);
return TRUE;
}
@ -1941,14 +1761,14 @@ gst_soup_http_src_check_seekable (GstSoupHTTPSrc * src)
*/
if (!src->got_headers && GST_STATE (src) >= GST_STATE_PAUSED) {
g_mutex_lock (&src->mutex);
while (!src->got_headers && !src->interrupted && ret == GST_FLOW_OK) {
if ((src->msg && src->msg->method != SOUP_METHOD_HEAD) &&
src->session_io_status != GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE) {
while (!src->got_headers && !g_cancellable_is_cancelled (src->cancellable)
&& ret == GST_FLOW_OK) {
if ((src->msg && src->msg->method != SOUP_METHOD_HEAD)) {
/* wait for the current request to finish */
g_cond_wait (&src->request_finished_cond, &src->mutex);
g_cond_wait (&src->have_headers_cond, &src->mutex);
} else {
if (gst_soup_http_src_session_open (src)) {
ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD, NULL);
ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD);
}
}
}
@ -1956,8 +1776,6 @@ gst_soup_http_src_check_seekable (GstSoupHTTPSrc * src)
/* A HEAD request shouldn't lead to EOS */
src->ret = GST_FLOW_OK;
}
/* resets status to idle */
gst_soup_http_src_cancel_message (src);
g_mutex_unlock (&src->mutex);
}
}

View file

@ -59,16 +59,9 @@ struct _GstSoupHTTPSrc {
gchar *proxy_id; /* Authentication user id for proxy URI. */
gchar *proxy_pw; /* Authentication user password for proxy URI. */
gchar **cookies; /* HTTP request cookies. */
GMainContext *context; /* I/O context. */
GMainLoop *loop; /* Event loop. */
SoupSession *session; /* Async context. */
GstSoupHTTPSrcSessionIOStatus session_io_status;
/* Async I/O status. */
SoupMessage *msg; /* Request message. */
GstFlowReturn ret; /* Return code from callback. */
GstBuffer **outbuf; /* Return buffer allocated by callback. */
gboolean interrupted; /* Signal unlock(). */
gboolean retry; /* Should attempt to reconnect. */
gint retry_count; /* Number of retries since we received data */
gint max_retries; /* Maximum number of retries */
gchar *method; /* HTTP method */
@ -94,6 +87,13 @@ struct _GstSoupHTTPSrc {
GTlsDatabase *tls_database;
GTlsInteraction *tls_interaction;
GCancellable *cancellable;
GInputStream *input_stream;
gboolean has_pollable_interface;
gboolean have_data;
GMainContext *poll_context;
GSource *poll_source;
/* Shoutcast/icecast metadata extraction handling. */
gboolean iradio_mode;
GstCaps *src_caps;
@ -110,7 +110,7 @@ struct _GstSoupHTTPSrc {
guint timeout;
GMutex mutex;
GCond request_finished_cond;
GCond have_headers_cond;
GstEvent *http_headers_event;
};