From 0bcefa7350ef8fc7c9b222cf1750a67eb49304f6 Mon Sep 17 00:00:00 2001 From: Daniel Kolesa Date: Fri, 21 Jan 2022 16:09:30 +0100 Subject: [PATCH] soup: move libsoup session into its own thread Starting with libsoup3, there is no attempt to handle thread safety inside the library, and it was never considered fully safe before either. Therefore, move all session handling into its own thread. The libsoup thread has its own context and main loop. When some request is made or a response needs to be read, an idle source is created to issue that; the gstreamer thread issuing that waits for that to be complete. There is a per-src condition variable to deal with that. Since the thread/loop needs to be longer-lived than the soup session itself, a wrapper object is provided to contain them. The soup session only has a single reference, owned by the wrapper object. It is no longer possible to force an external session, since this does not seem to be used anywhere within gstreamer and would be tricky to implement; this is because one would not have to provide just a session, but also the complete thread arrangement made in the same way as the system currently does internally, in order to be safe. Messages are still built gstreamer-side. It is safe to do so until the message is sent on the session. Headers are also processed on the gstreamer side, which should likewise be safe. All requests as well as reads on the libsoup thread are issued asynchronously. That allows libsoup to schedule things with as little blocking as possible, and means that concurrent access to the session is possible, when sharing the session. Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/947 Part-of: --- .../ext/soup/gstsouphttpsrc.c | 786 +++++++++++++----- .../ext/soup/gstsouphttpsrc.h | 17 +- .../gst-plugins-good/ext/soup/gstsouploader.c | 32 + .../gst-plugins-good/ext/soup/gstsouploader.h | 6 + 4 files changed, 607 insertions(+), 234 deletions(-) diff --git a/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c b/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c index 8496e8343f..ec44d48bff 100644 --- a/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c +++ b/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c @@ -84,6 +84,83 @@ #include +/* this is a simple wrapper class around SoupSession; it exists in order to + * have a refcountable owner for the actual SoupSession + the thread it runs + * in and its main loop (we cannot inverse the ownership hierarchy, because + * the thread + loop are actually longer lived than the session) + * + * it is entirely private to this implementation + */ + +#define GST_TYPE_SOUP_SESSION (gst_soup_session_get_type()) +#define GST_SOUP_SESSION(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SOUP_SESSION, GstSoupSession)) + +GType gst_soup_session_get_type (void); + +typedef struct _GstSoupSessionClass GstSoupSessionClass; + +struct _GstSoupSession +{ + GObject parent_instance; + + SoupSession *session; + GThread *thread; + GMainLoop *loop; +}; + +struct _GstSoupSessionClass +{ + GObjectClass parent_class; +}; + +G_DEFINE_TYPE (GstSoupSession, gst_soup_session, G_TYPE_OBJECT); + +static void +gst_soup_session_init (GstSoupSession * sess) +{ +} + +static gboolean +_soup_session_finalize_cb (gpointer user_data) +{ + GstSoupSession *sess = user_data; + + g_clear_object (&sess->session); + g_main_loop_quit (sess->loop); + + return FALSE; +} + +static void +gst_soup_session_finalize (GObject * obj) +{ + GstSoupSession *sess = GST_SOUP_SESSION (obj); + GSource *src; + + /* handle disposing of failure cases */ + if (!sess->loop) + return; + + src = g_idle_source_new (); + + g_source_set_callback (src, _soup_session_finalize_cb, sess, NULL); + g_source_attach (src, g_main_loop_get_context (sess->loop)); + g_source_unref (src); + + /* finish off thread and the loop; ensure it's not from the thread */ + g_assert (!g_main_context_is_owner (g_main_loop_get_context (sess->loop))); + g_thread_join (sess->thread); + g_main_loop_unref (sess->loop); +} + +static void +gst_soup_session_class_init (GstSoupSessionClass * klass) +{ + GObjectClass *gclass = G_OBJECT_CLASS (klass); + + gclass->finalize = gst_soup_session_finalize; +} + GST_DEBUG_CATEGORY_STATIC (souphttpsrc_debug); #define GST_CAT_DEFAULT souphttpsrc_debug @@ -176,7 +253,6 @@ static gboolean gst_soup_http_src_set_proxy (GstSoupHTTPSrc * src, static char *gst_soup_http_src_unicodify (const char *str); static gboolean gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method); -static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src); static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src, guint64 offset, guint64 stop_offset); static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src); @@ -455,6 +531,7 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src) src->retry_count = 0; src->have_size = FALSE; src->got_headers = FALSE; + src->headers_ret = GST_FLOW_OK; src->seekable = FALSE; src->read_position = 0; src->request_position = 0; @@ -467,12 +544,6 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src) src->last_socket_read_time = 0; g_cancellable_reset (src->cancellable); - g_mutex_lock (&src->mutex); - if (src->input_stream) { - g_object_unref (src->input_stream); - src->input_stream = NULL; - } - g_mutex_unlock (&src->mutex); gst_caps_replace (&src->src_caps, NULL); g_free (src->iradio_name); @@ -488,8 +559,8 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src) { const gchar *proxy; - g_mutex_init (&src->mutex); - g_cond_init (&src->have_headers_cond); + g_mutex_init (&src->session_mutex); + g_cond_init (&src->session_cond); src->cancellable = g_cancellable_new (); src->location = NULL; src->redirection_uri = NULL; @@ -503,7 +574,6 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src) src->iradio_mode = DEFAULT_IRADIO_MODE; src->session = NULL; src->external_session = NULL; - src->forced_external_session = FALSE; src->msg = NULL; src->timeout = DEFAULT_TIMEOUT; src->log_level = DEFAULT_SOUP_LOG_LEVEL; @@ -537,10 +607,7 @@ gst_soup_http_src_dispose (GObject * gobject) gst_soup_http_src_session_close (src); - if (src->external_session) { - g_object_unref (src->external_session); - src->external_session = NULL; - } + g_clear_object (&src->external_session); G_OBJECT_CLASS (parent_class)->dispose (gobject); } @@ -552,8 +619,8 @@ gst_soup_http_src_finalize (GObject * gobject) GST_DEBUG_OBJECT (src, "finalize"); - g_mutex_clear (&src->mutex); - g_cond_clear (&src->have_headers_cond); + g_mutex_clear (&src->session_mutex); + g_cond_clear (&src->session_cond); g_object_unref (src->cancellable); g_free (src->location); g_free (src->redirection_uri); @@ -809,13 +876,6 @@ gst_soup_http_src_unicodify (const gchar * str) return gst_tag_freeform_string_to_utf8 (str, -1, env_vars); } -static void -gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src) -{ - g_cancellable_cancel (src->cancellable); - g_cond_signal (&src->have_headers_cond); -} - static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src, guint64 offset, guint64 stop_offset) @@ -921,10 +981,136 @@ gst_soup_http_src_add_extra_headers (GstSoupHTTPSrc * src) return gst_structure_foreach (src->extra_headers, _append_extra_headers, src); } +static gpointer +thread_func (gpointer user_data) +{ + GstSoupHTTPSrc *src = user_data; + GMainContext *ctx; + + GST_DEBUG_OBJECT (src, "thread start"); + + ctx = g_main_loop_get_context (src->session->loop); + + g_main_context_push_thread_default (ctx); + + /* We explicitly set User-Agent to NULL here and overwrite it per message + * to be able to have the same session with different User-Agents per + * source */ + src->session->session = + _soup_session_new_with_options ("user-agent", NULL, + "timeout", src->timeout, "tls-interaction", src->tls_interaction, + /* Unset the limit the number of maximum allowed connections */ + "max-conns", src->session_is_shared ? G_MAXINT : 10, + "max-conns-per-host", src->session_is_shared ? G_MAXINT : 2, NULL); + + if (!src->session->session) { + return NULL; + } + + if (gst_soup_loader_get_api_version () == 3) { + if (src->proxy != NULL) { + GProxyResolver *proxy_resolver; + char *proxy_string = gst_soup_uri_to_string (src->proxy); + proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL); + g_free (proxy_string); + g_object_set (src->session->session, "proxy-resolver", proxy_resolver, + NULL); + g_object_unref (proxy_resolver); + } + } else { + g_object_set (src->session->session, "ssl-strict", src->ssl_strict, NULL); + if (src->proxy != NULL) { + g_object_set (src->session->session, "proxy-uri", src->proxy->soup_uri, + NULL); + } + } + + gst_soup_util_log_setup (src->session->session, src->log_level, + GST_ELEMENT (src)); + if (gst_soup_loader_get_api_version () < 3) { + _soup_session_add_feature_by_type (src->session->session, + _soup_content_decoder_get_type ()); + } + _soup_session_add_feature_by_type (src->session->session, + _soup_cookie_jar_get_type ()); + + if (src->session_is_shared) { + GstContext *context; + GstMessage *message; + GstStructure *s; + + GST_DEBUG_OBJECT (src, "Sharing session %p", src->session->session); + + context = gst_context_new (GST_SOUP_SESSION_CONTEXT, TRUE); + s = gst_context_writable_structure (context); + gst_structure_set (s, "session", GST_TYPE_SOUP_SESSION, src->session, NULL); + + /* during this time the src is locked by the parent thread, + * which is waiting, so this is safe to do + */ + GST_OBJECT_UNLOCK (src); + gst_element_set_context (GST_ELEMENT_CAST (src), context); + message = gst_message_new_have_context (GST_OBJECT_CAST (src), context); + gst_element_post_message (GST_ELEMENT_CAST (src), message); + GST_OBJECT_LOCK (src); + } else { + src->session_is_shared = FALSE; + } + + /* soup2: connect the authenticate handler for the src that spawned the + * session (i.e. the first owner); other users of this session will connect + * their own after fetching the external session; the callback will handle + * this correctly (it checks if the message belongs to the current src + * and exits early if it does not) + */ + if (gst_soup_loader_get_api_version () < 3) { + g_signal_connect (src->session->session, "authenticate", + G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src); + } + + if (!src->session_is_shared) { + if (src->tls_database) + g_object_set (src->session->session, "tls-database", src->tls_database, + NULL); + else if (gst_soup_loader_get_api_version () == 2) { + if (src->ssl_ca_file) + g_object_set (src->session->session, "ssl-ca-file", src->ssl_ca_file, + NULL); + else + g_object_set (src->session->session, "ssl-use-system-ca-file", + src->ssl_use_system_ca_file, NULL); + } + } + + g_main_loop_run (src->session->loop); + + g_main_context_pop_thread_default (ctx); + + GST_DEBUG_OBJECT (src, "thread stop"); + + return NULL; +} + +static gboolean +_session_ready_cb (gpointer user_data) +{ + GstSoupHTTPSrc *src = user_data; + + GST_DEBUG_OBJECT (src, "thread ready"); + + g_mutex_lock (&src->session_mutex); + g_cond_signal (&src->session_cond); + g_mutex_unlock (&src->session_mutex); + + return FALSE; +} + +/* called with session_mutex taken */ static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src) { - GProxyResolver *proxy_resolver; + GstQuery *query; + gboolean can_share; if (src->session) { GST_DEBUG_OBJECT (src, "Session is already open"); @@ -937,158 +1123,148 @@ gst_soup_http_src_session_open (GstSoupHTTPSrc * src) return FALSE; } - if (!src->session) { - GstQuery *query; - gboolean can_share = (src->timeout == DEFAULT_TIMEOUT) - && (src->cookies == NULL) - && (src->ssl_strict == DEFAULT_SSL_STRICT) - && (src->tls_interaction == NULL) && (src->proxy == NULL) - && (src->tls_database == DEFAULT_TLS_DATABASE); + can_share = (src->timeout == DEFAULT_TIMEOUT) + && (src->cookies == NULL) + && (src->ssl_strict == DEFAULT_SSL_STRICT) + && (src->tls_interaction == NULL) && (src->proxy == NULL) + && (src->tls_database == DEFAULT_TLS_DATABASE); - if (gst_soup_loader_get_api_version () == 2) - can_share = can_share && (src->ssl_ca_file == DEFAULT_SSL_CA_FILE) && - (src->ssl_use_system_ca_file == DEFAULT_SSL_USE_SYSTEM_CA_FILE); + if (gst_soup_loader_get_api_version () == 2) + can_share = can_share && (src->ssl_ca_file == DEFAULT_SSL_CA_FILE) && + (src->ssl_use_system_ca_file == DEFAULT_SSL_USE_SYSTEM_CA_FILE); - query = gst_query_new_context (GST_SOUP_SESSION_CONTEXT); - if (gst_pad_peer_query (GST_BASE_SRC_PAD (src), query)) { - GstContext *context; + query = gst_query_new_context (GST_SOUP_SESSION_CONTEXT); + if (gst_pad_peer_query (GST_BASE_SRC_PAD (src), query)) { + GstContext *context; - gst_query_parse_context (query, &context); - gst_element_set_context (GST_ELEMENT_CAST (src), context); - } else { - GstMessage *message; + gst_query_parse_context (query, &context); + gst_element_set_context (GST_ELEMENT_CAST (src), context); + } else { + GstMessage *message; - message = - gst_message_new_need_context (GST_OBJECT_CAST (src), - GST_SOUP_SESSION_CONTEXT); - gst_element_post_message (GST_ELEMENT_CAST (src), message); - } - gst_query_unref (query); + message = + gst_message_new_need_context (GST_OBJECT_CAST (src), + GST_SOUP_SESSION_CONTEXT); + gst_element_post_message (GST_ELEMENT_CAST (src), message); + } + gst_query_unref (query); - GST_OBJECT_LOCK (src); - if (src->external_session && (can_share || src->forced_external_session)) { - GST_DEBUG_OBJECT (src, "Using external session %p", - src->external_session); - src->session = g_object_ref (src->external_session); - src->session_is_shared = TRUE; - } else { - GST_DEBUG_OBJECT (src, "Creating session (can share %d)", can_share); + GST_OBJECT_LOCK (src); - /* We explicitly set User-Agent to NULL here and overwrite it per message - * to be able to have the same session with different User-Agents per - * source */ - src->session = - _soup_session_new_with_options ("user-agent", NULL, - "timeout", src->timeout, "tls-interaction", src->tls_interaction, - /* Unset the limit the number of maximum allowed connections */ - "max-conns", can_share ? G_MAXINT : 10, - "max-conns-per-host", can_share ? G_MAXINT : 2, NULL); - - if (gst_soup_loader_get_api_version () == 3) { - if (src->proxy != NULL) { - char *proxy_string = gst_soup_uri_to_string (src->proxy); - proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL); - g_free (proxy_string); - g_object_set (src->session, "proxy-resolver", proxy_resolver, NULL); - g_object_unref (proxy_resolver); - } - } else { - g_object_set (src->session, "ssl-strict", src->ssl_strict, NULL); - if (src->proxy != NULL) { - g_object_set (src->session, "proxy-uri", src->proxy->soup_uri, NULL); - } - } - - if (src->session) { - gst_soup_util_log_setup (src->session, src->log_level, - GST_ELEMENT (src)); - if (gst_soup_loader_get_api_version () < 3) { - _soup_session_add_feature_by_type (src->session, - _soup_content_decoder_get_type ()); - } - _soup_session_add_feature_by_type (src->session, - _soup_cookie_jar_get_type ()); - - if (can_share) { - GstContext *context; - GstMessage *message; - GstStructure *s; - - GST_DEBUG_OBJECT (src, "Sharing session %p", src->session); - src->session_is_shared = TRUE; - - context = gst_context_new (GST_SOUP_SESSION_CONTEXT, TRUE); - s = gst_context_writable_structure (context); - gst_structure_set (s, "session", _soup_session_get_type (), - src->session, "force", G_TYPE_BOOLEAN, FALSE, NULL); - - gst_object_ref (src->session); - GST_OBJECT_UNLOCK (src); - gst_element_set_context (GST_ELEMENT_CAST (src), context); - message = - gst_message_new_have_context (GST_OBJECT_CAST (src), context); - gst_element_post_message (GST_ELEMENT_CAST (src), message); - GST_OBJECT_LOCK (src); - gst_object_unref (src->session); - } else { - src->session_is_shared = FALSE; - } - } - } - - if (!src->session) { - GST_ELEMENT_ERROR (src, LIBRARY, INIT, - (NULL), ("Failed to create session")); - GST_OBJECT_UNLOCK (src); - return FALSE; - } + src->session_is_shared = can_share; + if (src->external_session && can_share) { + GST_DEBUG_OBJECT (src, "Using external session %p", src->external_session); + src->session = g_object_ref (src->external_session); + /* for soup2, connect another authenticate handler; see thread_func */ if (gst_soup_loader_get_api_version () < 3) { - g_signal_connect (src->session, "authenticate", + g_signal_connect (src->session->session, "authenticate", G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src); } - - if (!src->session_is_shared) { - if (src->tls_database) - g_object_set (src->session, "tls-database", src->tls_database, NULL); - else if (gst_soup_loader_get_api_version () == 2) { - if (src->ssl_ca_file) - g_object_set (src->session, "ssl-ca-file", src->ssl_ca_file, NULL); - else - g_object_set (src->session, "ssl-use-system-ca-file", - src->ssl_use_system_ca_file, NULL); - } - } - GST_OBJECT_UNLOCK (src); } else { - GST_DEBUG_OBJECT (src, "Re-using session"); + GMainContext *ctx; + GSource *source; + + GST_DEBUG_OBJECT (src, "Creating session (can share %d)", can_share); + + src->session = + GST_SOUP_SESSION (g_object_new (GST_TYPE_SOUP_SESSION, NULL)); + + ctx = g_main_context_new (); + + src->session->loop = g_main_loop_new (ctx, FALSE); + /* now owned by the loop */ + g_main_context_unref (ctx); + + src->session->thread = g_thread_try_new ("souphttpsrc-thread", + thread_func, src, NULL); + + if (!src->session->thread) { + goto err; + } + + source = g_idle_source_new (); + g_source_set_callback (source, _session_ready_cb, src, NULL); + g_source_attach (source, ctx); + g_source_unref (source); + + GST_DEBUG_OBJECT (src, "Waiting for thread to start..."); + while (!g_main_loop_is_running (src->session->loop)) + g_cond_wait (&src->session_cond, &src->session_mutex); + GST_DEBUG_OBJECT (src, "Soup thread started"); } + GST_OBJECT_UNLOCK (src); + return TRUE; + +err: + g_clear_object (&src->session); + GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL), ("Failed to create session")); + GST_OBJECT_UNLOCK (src); + + return FALSE; +} + +static gboolean +_session_close_cb (gpointer user_data) +{ + GstSoupHTTPSrc *src = user_data; + + if (src->msg) { + gst_soup_session_cancel_message (src->session->session, src->msg, + src->cancellable); + g_clear_object (&src->msg); + } + + if (src->session_is_shared) + _soup_session_abort (src->session->session); + + /* there may be multiple of this callback attached to the session, + * each with different data pointer; disconnect the one we are closing + * the session for, leave the others alone + */ + g_signal_handlers_disconnect_by_func (src->session->session, + G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src); + + g_mutex_lock (&src->session_mutex); + g_clear_object (&src->session); + g_cond_signal (&src->session_cond); + g_mutex_unlock (&src->session_mutex); + + return FALSE; } static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src) { + GSource *source; + GstSoupSession *sess; + GST_DEBUG_OBJECT (src, "Closing session"); - g_mutex_lock (&src->mutex); - if (src->msg) { - gst_soup_session_cancel_message (src->session, src->msg, src->cancellable); - g_object_unref (src->msg); - src->msg = NULL; + if (!src->session) { + return; } - if (src->session) { - if (!src->session_is_shared) - _soup_session_abort (src->session); - g_signal_handlers_disconnect_by_func (src->session, - G_CALLBACK (gst_soup_http_src_authenticate_cb), src); - g_object_unref (src->session); - src->session = NULL; - } + /* ensure _session_close_cb does not deadlock us */ + sess = g_object_ref (src->session); - g_mutex_unlock (&src->mutex); + source = g_idle_source_new (); + + g_mutex_lock (&src->session_mutex); + + g_source_set_callback (source, _session_close_cb, src, NULL); + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + while (src->session) + g_cond_wait (&src->session_cond, &src->session_mutex); + + g_mutex_unlock (&src->session_mutex); + + /* finally dispose of our reference from the gst thread */ + g_object_unref (sess); } static void @@ -1235,7 +1411,6 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg) } src->got_headers = TRUE; - g_cond_broadcast (&src->have_headers_cond); http_headers_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, http_headers); @@ -1610,51 +1785,98 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method) return TRUE; } -/* Lock taken */ +struct GstSoupSendSrc +{ + GstSoupHTTPSrc *src; + GError *error; +}; + +static void +_session_send_cb (GObject * source, GAsyncResult * res, gpointer user_data) +{ + struct GstSoupSendSrc *msrc = user_data; + GstSoupHTTPSrc *src = msrc->src; + GError *error = NULL; + + g_mutex_lock (&src->session_mutex); + + src->input_stream = _soup_session_send_finish (src->session->session, + res, &error); + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + src->headers_ret = GST_FLOW_FLUSHING; + } else { + src->headers_ret = gst_soup_http_src_got_headers (src, src->msg); + } + + if (!src->input_stream) { + GST_DEBUG_OBJECT (src, "Sending message failed: %s", error->message); + msrc->error = error; + } + + g_cond_broadcast (&src->session_cond); + g_mutex_unlock (&src->session_mutex); +} + +static gboolean +_session_send_idle_cb (gpointer user_data) +{ + struct GstSoupSendSrc *msrc = user_data; + GstSoupHTTPSrc *src = msrc->src; + + _soup_session_send_async (src->session->session, src->msg, src->cancellable, + _session_send_cb, msrc); + + return FALSE; +} + +/* called with session lock taken */ static GstFlowReturn gst_soup_http_src_send_message (GstSoupHTTPSrc * src) { GstFlowReturn ret; - GError *error = NULL; + GSource *source; + struct GstSoupSendSrc msrc; g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR); g_assert (src->input_stream == NULL); - src->input_stream = - _soup_session_send (src->session, src->msg, src->cancellable, &error); + msrc.src = src; + msrc.error = NULL; - if (error) - GST_DEBUG_OBJECT (src, "Sending message failed: %s", error->message); + source = g_idle_source_new (); - if (g_cancellable_is_cancelled (src->cancellable)) { - ret = GST_FLOW_FLUSHING; - goto done; - } + src->headers_ret = GST_FLOW_OK; + + g_source_set_callback (source, _session_send_idle_cb, &msrc, NULL); + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + while (!src->input_stream && !msrc.error) + g_cond_wait (&src->session_cond, &src->session_mutex); + + ret = src->headers_ret; - ret = gst_soup_http_src_got_headers (src, src->msg); if (ret != GST_FLOW_OK) { goto done; } if (!src->input_stream) { - GST_DEBUG_OBJECT (src, "Didn't get an input stream: %s", error->message); + GST_DEBUG_OBJECT (src, "Didn't get an input stream: %s", + msrc.error->message); ret = GST_FLOW_ERROR; goto done; } - if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (src->msg))) { - GST_DEBUG_OBJECT (src, "Successfully got a reply"); - } else { - /* FIXME - be more helpful to people debugging */ - ret = GST_FLOW_ERROR; - } + /* if an input stream exists, it was always successful */ + GST_DEBUG_OBJECT (src, "Successfully got a reply"); done: - if (error) - g_error_free (error); + g_clear_error (&msrc.error); return ret; } +/* called with session lock taken */ static GstFlowReturn gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method) { @@ -1791,13 +2013,49 @@ gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read) } } +struct GstSoupReadResult +{ + GstSoupHTTPSrc *src; + GError *error; + void *buffer; + gsize bufsize; + gssize nbytes; +}; + +static void +_session_read_cb (GObject * source, GAsyncResult * ret, gpointer user_data) +{ + struct GstSoupReadResult *res = user_data; + + g_mutex_lock (&res->src->session_mutex); + + res->nbytes = g_input_stream_read_finish (G_INPUT_STREAM (source), + ret, &res->error); + + g_cond_signal (&res->src->session_cond); + g_mutex_unlock (&res->src->session_mutex); +} + +static gboolean +_session_read_idle_cb (gpointer user_data) +{ + struct GstSoupReadResult *res = user_data; + + g_input_stream_read_async (res->src->input_stream, res->buffer, + res->bufsize, G_PRIORITY_DEFAULT, res->src->cancellable, + _session_read_cb, res); + + return FALSE; +} + static GstFlowReturn gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) { - gssize read_bytes; + struct GstSoupReadResult res; GstMapInfo mapinfo; GstBaseSrc *bsrc; GstFlowReturn ret; + GSource *source; bsrc = GST_BASE_SRC_CAST (src); @@ -1812,31 +2070,54 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) return GST_FLOW_ERROR; } - read_bytes = - g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size, - src->cancellable, NULL); - GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input", - read_bytes); + res.src = src; + res.buffer = mapinfo.data; + res.bufsize = mapinfo.size; + res.error = NULL; + res.nbytes = -1; - g_mutex_lock (&src->mutex); - if (g_cancellable_is_cancelled (src->cancellable)) { + source = g_idle_source_new (); + + g_mutex_lock (&src->session_mutex); + + g_source_set_callback (source, _session_read_idle_cb, &res, NULL); + /* invoke on libsoup thread */ + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + /* wait for it */ + while (!res.error && res.nbytes < 0) + g_cond_wait (&src->session_cond, &src->session_mutex); + g_mutex_unlock (&src->session_mutex); + + GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input", + res.nbytes); + + if (res.error) { + /* retry by default */ + GstFlowReturn ret = GST_FLOW_CUSTOM_ERROR; + if (g_error_matches (res.error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + ret = GST_FLOW_FLUSHING; + } else { + GST_ERROR_OBJECT (src, "Got error from libsoup: %s", res.error->message); + } + g_error_free (res.error); gst_buffer_unmap (*outbuf, &mapinfo); gst_buffer_unref (*outbuf); - g_mutex_unlock (&src->mutex); - return GST_FLOW_FLUSHING; + return ret; } gst_buffer_unmap (*outbuf, &mapinfo); - if (read_bytes > 0) { - gst_buffer_set_size (*outbuf, read_bytes); + if (res.nbytes > 0) { + gst_buffer_set_size (*outbuf, res.nbytes); GST_BUFFER_OFFSET (*outbuf) = bsrc->segment.position; ret = GST_FLOW_OK; - gst_soup_http_src_update_position (src, read_bytes); + gst_soup_http_src_update_position (src, res.nbytes); /* Got some data, reset retry counter */ src->retry_count = 0; - gst_soup_http_src_check_update_blocksize (src, read_bytes); + gst_soup_http_src_check_update_blocksize (src, res.nbytes); src->last_socket_read_time = g_get_monotonic_time () * GST_USECOND; @@ -1845,39 +2126,92 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) * otherwise we would have to cancel the message and close the connection */ if (bsrc->segment.stop != -1 - && bsrc->segment.position + read_bytes >= bsrc->segment.stop) { + && bsrc->segment.position + res.nbytes >= bsrc->segment.stop) { + SoupMessage *msg = src->msg; guint8 tmp[128]; - g_object_unref (src->msg); + res.buffer = tmp; + res.bufsize = sizeof (tmp); + res.nbytes = -1; + src->msg = NULL; src->have_body = TRUE; + g_mutex_lock (&src->session_mutex); + + source = g_idle_source_new (); + + g_source_set_callback (source, _session_read_idle_cb, &res, NULL); /* This should return immediately as we're at the end of the range */ - read_bytes = - g_input_stream_read (src->input_stream, tmp, sizeof (tmp), - src->cancellable, NULL); - if (read_bytes > 0) + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + while (!res.error && res.nbytes < 0) + g_cond_wait (&src->session_cond, &src->session_mutex); + g_mutex_unlock (&src->session_mutex); + + g_clear_error (&res.error); + g_object_unref (msg); + + if (res.nbytes > 0) GST_ERROR_OBJECT (src, - "Read %" G_GSIZE_FORMAT " bytes after end of range", read_bytes); + "Read %" G_GSIZE_FORMAT " bytes after end of range", res.nbytes); } } else { gst_buffer_unref (*outbuf); - if (read_bytes < 0 || - (src->have_size && src->read_position < src->content_size)) { + if (src->have_size && src->read_position < src->content_size) { /* Maybe the server disconnected, retry */ ret = GST_FLOW_CUSTOM_ERROR; } else { - g_object_unref (src->msg); + g_clear_object (&src->msg); src->msg = NULL; ret = GST_FLOW_EOS; src->have_body = TRUE; } } - g_mutex_unlock (&src->mutex); + + g_clear_error (&res.error); return ret; } +static gboolean +_session_stream_clear_cb (gpointer user_data) +{ + GstSoupHTTPSrc *src = user_data; + + g_mutex_lock (&src->session_mutex); + + g_clear_object (&src->input_stream); + + g_cond_signal (&src->session_cond); + g_mutex_unlock (&src->session_mutex); + + return FALSE; +} + +static void +gst_soup_http_src_stream_clear (GstSoupHTTPSrc * src) +{ + GSource *source; + + if (!src->input_stream) + return; + + g_mutex_lock (&src->session_mutex); + + source = g_idle_source_new (); + + g_source_set_callback (source, _session_stream_clear_cb, src, NULL); + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + while (src->input_stream) + g_cond_wait (&src->session_cond, &src->session_mutex); + + g_mutex_unlock (&src->session_mutex); +} + static GstFlowReturn gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) { @@ -1888,33 +2222,28 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) src = GST_SOUP_HTTP_SRC (psrc); retry: - g_mutex_lock (&src->mutex); /* Check for pending position change */ - if (src->request_position != src->read_position) { - if (src->input_stream) { - g_input_stream_close (src->input_stream, src->cancellable, NULL); - g_object_unref (src->input_stream); - src->input_stream = NULL; - } + if (src->request_position != src->read_position && src->input_stream) { + gst_soup_http_src_stream_clear (src); } if (g_cancellable_is_cancelled (src->cancellable)) { ret = GST_FLOW_FLUSHING; - g_mutex_unlock (&src->mutex); goto done; } /* If we have no open connection to the server, start one */ if (!src->input_stream) { *outbuf = NULL; + g_mutex_lock (&src->session_mutex); ret = gst_soup_http_src_do_request (src, src->method ? src->method : SOUP_METHOD_GET); http_headers_event = src->http_headers_event; src->http_headers_event = NULL; + g_mutex_unlock (&src->session_mutex); } - g_mutex_unlock (&src->mutex); if (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_ERROR) { if (http_headers_event) { @@ -1932,12 +2261,9 @@ done: if (http_headers_event) gst_event_unref (http_headers_event); - g_mutex_lock (&src->mutex); if (src->input_stream) { - g_object_unref (src->input_stream); - src->input_stream = NULL; + gst_soup_http_src_stream_clear (src); } - g_mutex_unlock (&src->mutex); if (ret == GST_FLOW_CUSTOM_ERROR) { ret = GST_FLOW_OK; goto retry; @@ -1945,9 +2271,7 @@ done: } if (ret == GST_FLOW_FLUSHING) { - g_mutex_lock (&src->mutex); src->retry_count = 0; - g_mutex_unlock (&src->mutex); } return ret; @@ -1957,10 +2281,14 @@ static gboolean gst_soup_http_src_start (GstBaseSrc * bsrc) { GstSoupHTTPSrc *src = GST_SOUP_HTTP_SRC (bsrc); + gboolean ret; GST_DEBUG_OBJECT (src, "start(\"%s\")", src->location); - return gst_soup_http_src_session_open (src); + g_mutex_lock (&src->session_mutex); + ret = gst_soup_http_src_session_open (src); + g_mutex_unlock (&src->session_mutex); + return ret; } static gboolean @@ -1970,8 +2298,11 @@ gst_soup_http_src_stop (GstBaseSrc * bsrc) src = GST_SOUP_HTTP_SRC (bsrc); GST_DEBUG_OBJECT (src, "stop()"); + + gst_soup_http_src_stream_clear (src); + if (src->keep_alive && !src->msg && !src->session_is_shared) - gst_soup_http_src_cancel_message (src); + g_cancellable_cancel (src->cancellable); else gst_soup_http_src_session_close (src); @@ -2010,17 +2341,13 @@ gst_soup_http_src_set_context (GstElement * element, GstContext * context) const GstStructure *s = gst_context_get_structure (context); GST_OBJECT_LOCK (src); - if (src->external_session) - g_object_unref (src->external_session); - src->external_session = NULL; - gst_structure_get (s, "session", _soup_session_get_type (), - &src->external_session, NULL); - src->forced_external_session = FALSE; - gst_structure_get (s, "force", G_TYPE_BOOLEAN, - &src->forced_external_session, NULL); - GST_DEBUG_OBJECT (src, "Setting external session %p (force: %d)", - src->external_session, src->forced_external_session); + g_clear_object (&src->external_session); + gst_structure_get (s, "session", GST_TYPE_SOUP_SESSION, + &src->external_session, NULL); + + GST_DEBUG_OBJECT (src, "Setting external session %p", + src->external_session); GST_OBJECT_UNLOCK (src); } @@ -2036,7 +2363,7 @@ gst_soup_http_src_unlock (GstBaseSrc * bsrc) src = GST_SOUP_HTTP_SRC (bsrc); GST_DEBUG_OBJECT (src, "unlock()"); - gst_soup_http_src_cancel_message (src); + g_cancellable_cancel (src->cancellable); return TRUE; } @@ -2080,19 +2407,20 @@ gst_soup_http_src_check_seekable (GstSoupHTTPSrc * src) * loops. */ if (!src->got_headers && GST_STATE (src) >= GST_STATE_PAUSED) { - g_mutex_lock (&src->mutex); + g_mutex_lock (&src->session_mutex); while (!src->got_headers && !g_cancellable_is_cancelled (src->cancellable) && ret == GST_FLOW_OK) { if ((src->msg && _soup_message_get_method (src->msg) != SOUP_METHOD_HEAD)) { /* wait for the current request to finish */ - g_cond_wait (&src->have_headers_cond, &src->mutex); + g_cond_wait (&src->session_cond, &src->session_mutex); + ret = src->headers_ret; } else { if (gst_soup_http_src_session_open (src)) { ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD); } } } - g_mutex_unlock (&src->mutex); + g_mutex_unlock (&src->session_mutex); } } diff --git a/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h b/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h index a7184fee62..89705943bf 100644 --- a/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h +++ b/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h @@ -45,6 +45,9 @@ typedef enum { GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED, } GstSoupHTTPSrcSessionIOStatus; +/* opaque from here, implementation detail */ +typedef struct _GstSoupSession GstSoupSession; + struct _GstSoupHTTPSrc { GstPushSrc element; @@ -59,15 +62,15 @@ struct _GstSoupHTTPSrc { gchar *proxy_id; /* Authentication user id for proxy URI. */ gchar *proxy_pw; /* Authentication user password for proxy URI. */ gchar **cookies; /* HTTP request cookies. */ - SoupSession *session; /* Async context. */ + GstSoupSession *session; /* Libsoup session wrapper. */ gboolean session_is_shared; - SoupSession *external_session; /* Shared via GstContext */ - gboolean forced_external_session; /* If session was explicitly set from application */ + GstSoupSession *external_session; /* Shared via GstContext */ SoupMessage *msg; /* Request message. */ gint retry_count; /* Number of retries since we received data */ gint max_retries; /* Maximum number of retries */ gchar *method; /* HTTP method */ + GstFlowReturn headers_ret; gboolean got_headers; /* Already received headers from the server */ gboolean have_size; /* Received and parsed Content-Length header. */ @@ -111,8 +114,12 @@ struct _GstSoupHTTPSrc { guint timeout; - GMutex mutex; - GCond have_headers_cond; + /* This mutex-cond pair is used to talk to the soup session thread; it is + * per src to allow concurrent access to shared sessions (if it was inside + * the shared session structure, it would be effectively global) + */ + GMutex session_mutex; + GCond session_cond; GstEvent *http_headers_event; diff --git a/subprojects/gst-plugins-good/ext/soup/gstsouploader.c b/subprojects/gst-plugins-good/ext/soup/gstsouploader.c index 119f90706a..2c525a6e72 100644 --- a/subprojects/gst-plugins-good/ext/soup/gstsouploader.c +++ b/subprojects/gst-plugins-good/ext/soup/gstsouploader.c @@ -111,6 +111,10 @@ typedef struct _GstSoupVTable void (*_soup_auth_authenticate) (SoupAuth * auth, const char *username, const char *password); const char *(*_soup_message_get_method_3) (SoupMessage * msg); + GInputStream *(*_soup_session_send_async_2) (SoupSession * session, SoupMessage * msg, + GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data); + GInputStream *(*_soup_session_send_async_3) (SoupSession * session, SoupMessage * msg, + int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data); GInputStream *(*_soup_session_send_finish) (SoupSession * session, GAsyncResult * result, GError ** error); GInputStream *(*_soup_session_send) (SoupSession * session, SoupMessage * msg, @@ -220,6 +224,7 @@ gst_soup_load_library (void) LOAD_VERSIONED_SYMBOL (2, soup_uri_to_string); LOAD_VERSIONED_SYMBOL (2, soup_message_get_uri); LOAD_VERSIONED_SYMBOL (2, soup_session_cancel_message); + LOAD_VERSIONED_SYMBOL (2, soup_session_send_async); } else { vtable->lib_version = 3; LOAD_VERSIONED_SYMBOL (3, soup_logger_new); @@ -232,6 +237,7 @@ gst_soup_load_library (void) LOAD_VERSIONED_SYMBOL (3, soup_message_get_method); LOAD_VERSIONED_SYMBOL (3, soup_message_get_reason_phrase); LOAD_VERSIONED_SYMBOL (3, soup_message_get_status); + LOAD_VERSIONED_SYMBOL (3, soup_session_send_async); } LOAD_SYMBOL (soup_auth_authenticate); @@ -795,6 +801,32 @@ _soup_message_get_method (SoupMessage * msg) #endif } +void +_soup_session_send_async (SoupSession * session, SoupMessage * msg, + GCancellable * cancellable, GAsyncReadyCallback callback, + gpointer user_data) +{ +#ifdef STATIC_SOUP +#if STATIC_SOUP == 2 + return soup_session_send_async (session, msg, cancellable, + callback, user_data); +#else + return soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, + cancellable, callback, user_data); +#endif +#else + if (gst_soup_vtable.lib_version == 3) { + g_assert (gst_soup_vtable._soup_session_send_async_3 != NULL); + gst_soup_vtable._soup_session_send_async_3 (session, msg, + G_PRIORITY_DEFAULT, cancellable, callback, user_data); + } else { + g_assert (gst_soup_vtable._soup_session_send_async_2 != NULL); + gst_soup_vtable._soup_session_send_async_2 (session, msg, + cancellable, callback, user_data); + } +#endif +} + GInputStream * _soup_session_send_finish (SoupSession * session, GAsyncResult * result, GError ** error) diff --git a/subprojects/gst-plugins-good/ext/soup/gstsouploader.h b/subprojects/gst-plugins-good/ext/soup/gstsouploader.h index 3025862193..ea5e16e252 100644 --- a/subprojects/gst-plugins-good/ext/soup/gstsouploader.h +++ b/subprojects/gst-plugins-good/ext/soup/gstsouploader.h @@ -99,6 +99,12 @@ void _soup_auth_authenticate (SoupAuth *auth, const char *username, const char *_soup_message_get_method (SoupMessage *msg); +void _soup_session_send_async (SoupSession *session, + SoupMessage *msg, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + GInputStream *_soup_session_send_finish (SoupSession *session, GAsyncResult *result, GError **error);