diff --git a/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c b/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c index 8496e8343f..ec44d48bff 100644 --- a/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c +++ b/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.c @@ -84,6 +84,83 @@ #include +/* this is a simple wrapper class around SoupSession; it exists in order to + * have a refcountable owner for the actual SoupSession + the thread it runs + * in and its main loop (we cannot inverse the ownership hierarchy, because + * the thread + loop are actually longer lived than the session) + * + * it is entirely private to this implementation + */ + +#define GST_TYPE_SOUP_SESSION (gst_soup_session_get_type()) +#define GST_SOUP_SESSION(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SOUP_SESSION, GstSoupSession)) + +GType gst_soup_session_get_type (void); + +typedef struct _GstSoupSessionClass GstSoupSessionClass; + +struct _GstSoupSession +{ + GObject parent_instance; + + SoupSession *session; + GThread *thread; + GMainLoop *loop; +}; + +struct _GstSoupSessionClass +{ + GObjectClass parent_class; +}; + +G_DEFINE_TYPE (GstSoupSession, gst_soup_session, G_TYPE_OBJECT); + +static void +gst_soup_session_init (GstSoupSession * sess) +{ +} + +static gboolean +_soup_session_finalize_cb (gpointer user_data) +{ + GstSoupSession *sess = user_data; + + g_clear_object (&sess->session); + g_main_loop_quit (sess->loop); + + return FALSE; +} + +static void +gst_soup_session_finalize (GObject * obj) +{ + GstSoupSession *sess = GST_SOUP_SESSION (obj); + GSource *src; + + /* handle disposing of failure cases */ + if (!sess->loop) + return; + + src = g_idle_source_new (); + + g_source_set_callback (src, _soup_session_finalize_cb, sess, NULL); + g_source_attach (src, g_main_loop_get_context (sess->loop)); + g_source_unref (src); + + /* finish off thread and the loop; ensure it's not from the thread */ + g_assert (!g_main_context_is_owner (g_main_loop_get_context (sess->loop))); + g_thread_join (sess->thread); + g_main_loop_unref (sess->loop); +} + +static void +gst_soup_session_class_init (GstSoupSessionClass * klass) +{ + GObjectClass *gclass = G_OBJECT_CLASS (klass); + + gclass->finalize = gst_soup_session_finalize; +} + GST_DEBUG_CATEGORY_STATIC (souphttpsrc_debug); #define GST_CAT_DEFAULT souphttpsrc_debug @@ -176,7 +253,6 @@ static gboolean gst_soup_http_src_set_proxy (GstSoupHTTPSrc * src, static char *gst_soup_http_src_unicodify (const char *str); static gboolean gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method); -static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src); static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src, guint64 offset, guint64 stop_offset); static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src); @@ -455,6 +531,7 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src) src->retry_count = 0; src->have_size = FALSE; src->got_headers = FALSE; + src->headers_ret = GST_FLOW_OK; src->seekable = FALSE; src->read_position = 0; src->request_position = 0; @@ -467,12 +544,6 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src) src->last_socket_read_time = 0; g_cancellable_reset (src->cancellable); - g_mutex_lock (&src->mutex); - if (src->input_stream) { - g_object_unref (src->input_stream); - src->input_stream = NULL; - } - g_mutex_unlock (&src->mutex); gst_caps_replace (&src->src_caps, NULL); g_free (src->iradio_name); @@ -488,8 +559,8 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src) { const gchar *proxy; - g_mutex_init (&src->mutex); - g_cond_init (&src->have_headers_cond); + g_mutex_init (&src->session_mutex); + g_cond_init (&src->session_cond); src->cancellable = g_cancellable_new (); src->location = NULL; src->redirection_uri = NULL; @@ -503,7 +574,6 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src) src->iradio_mode = DEFAULT_IRADIO_MODE; src->session = NULL; src->external_session = NULL; - src->forced_external_session = FALSE; src->msg = NULL; src->timeout = DEFAULT_TIMEOUT; src->log_level = DEFAULT_SOUP_LOG_LEVEL; @@ -537,10 +607,7 @@ gst_soup_http_src_dispose (GObject * gobject) gst_soup_http_src_session_close (src); - if (src->external_session) { - g_object_unref (src->external_session); - src->external_session = NULL; - } + g_clear_object (&src->external_session); G_OBJECT_CLASS (parent_class)->dispose (gobject); } @@ -552,8 +619,8 @@ gst_soup_http_src_finalize (GObject * gobject) GST_DEBUG_OBJECT (src, "finalize"); - g_mutex_clear (&src->mutex); - g_cond_clear (&src->have_headers_cond); + g_mutex_clear (&src->session_mutex); + g_cond_clear (&src->session_cond); g_object_unref (src->cancellable); g_free (src->location); g_free (src->redirection_uri); @@ -809,13 +876,6 @@ gst_soup_http_src_unicodify (const gchar * str) return gst_tag_freeform_string_to_utf8 (str, -1, env_vars); } -static void -gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src) -{ - g_cancellable_cancel (src->cancellable); - g_cond_signal (&src->have_headers_cond); -} - static gboolean gst_soup_http_src_add_range_header (GstSoupHTTPSrc * src, guint64 offset, guint64 stop_offset) @@ -921,10 +981,136 @@ gst_soup_http_src_add_extra_headers (GstSoupHTTPSrc * src) return gst_structure_foreach (src->extra_headers, _append_extra_headers, src); } +static gpointer +thread_func (gpointer user_data) +{ + GstSoupHTTPSrc *src = user_data; + GMainContext *ctx; + + GST_DEBUG_OBJECT (src, "thread start"); + + ctx = g_main_loop_get_context (src->session->loop); + + g_main_context_push_thread_default (ctx); + + /* We explicitly set User-Agent to NULL here and overwrite it per message + * to be able to have the same session with different User-Agents per + * source */ + src->session->session = + _soup_session_new_with_options ("user-agent", NULL, + "timeout", src->timeout, "tls-interaction", src->tls_interaction, + /* Unset the limit the number of maximum allowed connections */ + "max-conns", src->session_is_shared ? G_MAXINT : 10, + "max-conns-per-host", src->session_is_shared ? G_MAXINT : 2, NULL); + + if (!src->session->session) { + return NULL; + } + + if (gst_soup_loader_get_api_version () == 3) { + if (src->proxy != NULL) { + GProxyResolver *proxy_resolver; + char *proxy_string = gst_soup_uri_to_string (src->proxy); + proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL); + g_free (proxy_string); + g_object_set (src->session->session, "proxy-resolver", proxy_resolver, + NULL); + g_object_unref (proxy_resolver); + } + } else { + g_object_set (src->session->session, "ssl-strict", src->ssl_strict, NULL); + if (src->proxy != NULL) { + g_object_set (src->session->session, "proxy-uri", src->proxy->soup_uri, + NULL); + } + } + + gst_soup_util_log_setup (src->session->session, src->log_level, + GST_ELEMENT (src)); + if (gst_soup_loader_get_api_version () < 3) { + _soup_session_add_feature_by_type (src->session->session, + _soup_content_decoder_get_type ()); + } + _soup_session_add_feature_by_type (src->session->session, + _soup_cookie_jar_get_type ()); + + if (src->session_is_shared) { + GstContext *context; + GstMessage *message; + GstStructure *s; + + GST_DEBUG_OBJECT (src, "Sharing session %p", src->session->session); + + context = gst_context_new (GST_SOUP_SESSION_CONTEXT, TRUE); + s = gst_context_writable_structure (context); + gst_structure_set (s, "session", GST_TYPE_SOUP_SESSION, src->session, NULL); + + /* during this time the src is locked by the parent thread, + * which is waiting, so this is safe to do + */ + GST_OBJECT_UNLOCK (src); + gst_element_set_context (GST_ELEMENT_CAST (src), context); + message = gst_message_new_have_context (GST_OBJECT_CAST (src), context); + gst_element_post_message (GST_ELEMENT_CAST (src), message); + GST_OBJECT_LOCK (src); + } else { + src->session_is_shared = FALSE; + } + + /* soup2: connect the authenticate handler for the src that spawned the + * session (i.e. the first owner); other users of this session will connect + * their own after fetching the external session; the callback will handle + * this correctly (it checks if the message belongs to the current src + * and exits early if it does not) + */ + if (gst_soup_loader_get_api_version () < 3) { + g_signal_connect (src->session->session, "authenticate", + G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src); + } + + if (!src->session_is_shared) { + if (src->tls_database) + g_object_set (src->session->session, "tls-database", src->tls_database, + NULL); + else if (gst_soup_loader_get_api_version () == 2) { + if (src->ssl_ca_file) + g_object_set (src->session->session, "ssl-ca-file", src->ssl_ca_file, + NULL); + else + g_object_set (src->session->session, "ssl-use-system-ca-file", + src->ssl_use_system_ca_file, NULL); + } + } + + g_main_loop_run (src->session->loop); + + g_main_context_pop_thread_default (ctx); + + GST_DEBUG_OBJECT (src, "thread stop"); + + return NULL; +} + +static gboolean +_session_ready_cb (gpointer user_data) +{ + GstSoupHTTPSrc *src = user_data; + + GST_DEBUG_OBJECT (src, "thread ready"); + + g_mutex_lock (&src->session_mutex); + g_cond_signal (&src->session_cond); + g_mutex_unlock (&src->session_mutex); + + return FALSE; +} + +/* called with session_mutex taken */ static gboolean gst_soup_http_src_session_open (GstSoupHTTPSrc * src) { - GProxyResolver *proxy_resolver; + GstQuery *query; + gboolean can_share; if (src->session) { GST_DEBUG_OBJECT (src, "Session is already open"); @@ -937,158 +1123,148 @@ gst_soup_http_src_session_open (GstSoupHTTPSrc * src) return FALSE; } - if (!src->session) { - GstQuery *query; - gboolean can_share = (src->timeout == DEFAULT_TIMEOUT) - && (src->cookies == NULL) - && (src->ssl_strict == DEFAULT_SSL_STRICT) - && (src->tls_interaction == NULL) && (src->proxy == NULL) - && (src->tls_database == DEFAULT_TLS_DATABASE); + can_share = (src->timeout == DEFAULT_TIMEOUT) + && (src->cookies == NULL) + && (src->ssl_strict == DEFAULT_SSL_STRICT) + && (src->tls_interaction == NULL) && (src->proxy == NULL) + && (src->tls_database == DEFAULT_TLS_DATABASE); - if (gst_soup_loader_get_api_version () == 2) - can_share = can_share && (src->ssl_ca_file == DEFAULT_SSL_CA_FILE) && - (src->ssl_use_system_ca_file == DEFAULT_SSL_USE_SYSTEM_CA_FILE); + if (gst_soup_loader_get_api_version () == 2) + can_share = can_share && (src->ssl_ca_file == DEFAULT_SSL_CA_FILE) && + (src->ssl_use_system_ca_file == DEFAULT_SSL_USE_SYSTEM_CA_FILE); - query = gst_query_new_context (GST_SOUP_SESSION_CONTEXT); - if (gst_pad_peer_query (GST_BASE_SRC_PAD (src), query)) { - GstContext *context; + query = gst_query_new_context (GST_SOUP_SESSION_CONTEXT); + if (gst_pad_peer_query (GST_BASE_SRC_PAD (src), query)) { + GstContext *context; - gst_query_parse_context (query, &context); - gst_element_set_context (GST_ELEMENT_CAST (src), context); - } else { - GstMessage *message; + gst_query_parse_context (query, &context); + gst_element_set_context (GST_ELEMENT_CAST (src), context); + } else { + GstMessage *message; - message = - gst_message_new_need_context (GST_OBJECT_CAST (src), - GST_SOUP_SESSION_CONTEXT); - gst_element_post_message (GST_ELEMENT_CAST (src), message); - } - gst_query_unref (query); + message = + gst_message_new_need_context (GST_OBJECT_CAST (src), + GST_SOUP_SESSION_CONTEXT); + gst_element_post_message (GST_ELEMENT_CAST (src), message); + } + gst_query_unref (query); - GST_OBJECT_LOCK (src); - if (src->external_session && (can_share || src->forced_external_session)) { - GST_DEBUG_OBJECT (src, "Using external session %p", - src->external_session); - src->session = g_object_ref (src->external_session); - src->session_is_shared = TRUE; - } else { - GST_DEBUG_OBJECT (src, "Creating session (can share %d)", can_share); + GST_OBJECT_LOCK (src); - /* We explicitly set User-Agent to NULL here and overwrite it per message - * to be able to have the same session with different User-Agents per - * source */ - src->session = - _soup_session_new_with_options ("user-agent", NULL, - "timeout", src->timeout, "tls-interaction", src->tls_interaction, - /* Unset the limit the number of maximum allowed connections */ - "max-conns", can_share ? G_MAXINT : 10, - "max-conns-per-host", can_share ? G_MAXINT : 2, NULL); - - if (gst_soup_loader_get_api_version () == 3) { - if (src->proxy != NULL) { - char *proxy_string = gst_soup_uri_to_string (src->proxy); - proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL); - g_free (proxy_string); - g_object_set (src->session, "proxy-resolver", proxy_resolver, NULL); - g_object_unref (proxy_resolver); - } - } else { - g_object_set (src->session, "ssl-strict", src->ssl_strict, NULL); - if (src->proxy != NULL) { - g_object_set (src->session, "proxy-uri", src->proxy->soup_uri, NULL); - } - } - - if (src->session) { - gst_soup_util_log_setup (src->session, src->log_level, - GST_ELEMENT (src)); - if (gst_soup_loader_get_api_version () < 3) { - _soup_session_add_feature_by_type (src->session, - _soup_content_decoder_get_type ()); - } - _soup_session_add_feature_by_type (src->session, - _soup_cookie_jar_get_type ()); - - if (can_share) { - GstContext *context; - GstMessage *message; - GstStructure *s; - - GST_DEBUG_OBJECT (src, "Sharing session %p", src->session); - src->session_is_shared = TRUE; - - context = gst_context_new (GST_SOUP_SESSION_CONTEXT, TRUE); - s = gst_context_writable_structure (context); - gst_structure_set (s, "session", _soup_session_get_type (), - src->session, "force", G_TYPE_BOOLEAN, FALSE, NULL); - - gst_object_ref (src->session); - GST_OBJECT_UNLOCK (src); - gst_element_set_context (GST_ELEMENT_CAST (src), context); - message = - gst_message_new_have_context (GST_OBJECT_CAST (src), context); - gst_element_post_message (GST_ELEMENT_CAST (src), message); - GST_OBJECT_LOCK (src); - gst_object_unref (src->session); - } else { - src->session_is_shared = FALSE; - } - } - } - - if (!src->session) { - GST_ELEMENT_ERROR (src, LIBRARY, INIT, - (NULL), ("Failed to create session")); - GST_OBJECT_UNLOCK (src); - return FALSE; - } + src->session_is_shared = can_share; + if (src->external_session && can_share) { + GST_DEBUG_OBJECT (src, "Using external session %p", src->external_session); + src->session = g_object_ref (src->external_session); + /* for soup2, connect another authenticate handler; see thread_func */ if (gst_soup_loader_get_api_version () < 3) { - g_signal_connect (src->session, "authenticate", + g_signal_connect (src->session->session, "authenticate", G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src); } - - if (!src->session_is_shared) { - if (src->tls_database) - g_object_set (src->session, "tls-database", src->tls_database, NULL); - else if (gst_soup_loader_get_api_version () == 2) { - if (src->ssl_ca_file) - g_object_set (src->session, "ssl-ca-file", src->ssl_ca_file, NULL); - else - g_object_set (src->session, "ssl-use-system-ca-file", - src->ssl_use_system_ca_file, NULL); - } - } - GST_OBJECT_UNLOCK (src); } else { - GST_DEBUG_OBJECT (src, "Re-using session"); + GMainContext *ctx; + GSource *source; + + GST_DEBUG_OBJECT (src, "Creating session (can share %d)", can_share); + + src->session = + GST_SOUP_SESSION (g_object_new (GST_TYPE_SOUP_SESSION, NULL)); + + ctx = g_main_context_new (); + + src->session->loop = g_main_loop_new (ctx, FALSE); + /* now owned by the loop */ + g_main_context_unref (ctx); + + src->session->thread = g_thread_try_new ("souphttpsrc-thread", + thread_func, src, NULL); + + if (!src->session->thread) { + goto err; + } + + source = g_idle_source_new (); + g_source_set_callback (source, _session_ready_cb, src, NULL); + g_source_attach (source, ctx); + g_source_unref (source); + + GST_DEBUG_OBJECT (src, "Waiting for thread to start..."); + while (!g_main_loop_is_running (src->session->loop)) + g_cond_wait (&src->session_cond, &src->session_mutex); + GST_DEBUG_OBJECT (src, "Soup thread started"); } + GST_OBJECT_UNLOCK (src); + return TRUE; + +err: + g_clear_object (&src->session); + GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL), ("Failed to create session")); + GST_OBJECT_UNLOCK (src); + + return FALSE; +} + +static gboolean +_session_close_cb (gpointer user_data) +{ + GstSoupHTTPSrc *src = user_data; + + if (src->msg) { + gst_soup_session_cancel_message (src->session->session, src->msg, + src->cancellable); + g_clear_object (&src->msg); + } + + if (src->session_is_shared) + _soup_session_abort (src->session->session); + + /* there may be multiple of this callback attached to the session, + * each with different data pointer; disconnect the one we are closing + * the session for, leave the others alone + */ + g_signal_handlers_disconnect_by_func (src->session->session, + G_CALLBACK (gst_soup_http_src_authenticate_cb_2), src); + + g_mutex_lock (&src->session_mutex); + g_clear_object (&src->session); + g_cond_signal (&src->session_cond); + g_mutex_unlock (&src->session_mutex); + + return FALSE; } static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src) { + GSource *source; + GstSoupSession *sess; + GST_DEBUG_OBJECT (src, "Closing session"); - g_mutex_lock (&src->mutex); - if (src->msg) { - gst_soup_session_cancel_message (src->session, src->msg, src->cancellable); - g_object_unref (src->msg); - src->msg = NULL; + if (!src->session) { + return; } - if (src->session) { - if (!src->session_is_shared) - _soup_session_abort (src->session); - g_signal_handlers_disconnect_by_func (src->session, - G_CALLBACK (gst_soup_http_src_authenticate_cb), src); - g_object_unref (src->session); - src->session = NULL; - } + /* ensure _session_close_cb does not deadlock us */ + sess = g_object_ref (src->session); - g_mutex_unlock (&src->mutex); + source = g_idle_source_new (); + + g_mutex_lock (&src->session_mutex); + + g_source_set_callback (source, _session_close_cb, src, NULL); + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + while (src->session) + g_cond_wait (&src->session_cond, &src->session_mutex); + + g_mutex_unlock (&src->session_mutex); + + /* finally dispose of our reference from the gst thread */ + g_object_unref (sess); } static void @@ -1235,7 +1411,6 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg) } src->got_headers = TRUE; - g_cond_broadcast (&src->have_headers_cond); http_headers_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, http_headers); @@ -1610,51 +1785,98 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method) return TRUE; } -/* Lock taken */ +struct GstSoupSendSrc +{ + GstSoupHTTPSrc *src; + GError *error; +}; + +static void +_session_send_cb (GObject * source, GAsyncResult * res, gpointer user_data) +{ + struct GstSoupSendSrc *msrc = user_data; + GstSoupHTTPSrc *src = msrc->src; + GError *error = NULL; + + g_mutex_lock (&src->session_mutex); + + src->input_stream = _soup_session_send_finish (src->session->session, + res, &error); + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + src->headers_ret = GST_FLOW_FLUSHING; + } else { + src->headers_ret = gst_soup_http_src_got_headers (src, src->msg); + } + + if (!src->input_stream) { + GST_DEBUG_OBJECT (src, "Sending message failed: %s", error->message); + msrc->error = error; + } + + g_cond_broadcast (&src->session_cond); + g_mutex_unlock (&src->session_mutex); +} + +static gboolean +_session_send_idle_cb (gpointer user_data) +{ + struct GstSoupSendSrc *msrc = user_data; + GstSoupHTTPSrc *src = msrc->src; + + _soup_session_send_async (src->session->session, src->msg, src->cancellable, + _session_send_cb, msrc); + + return FALSE; +} + +/* called with session lock taken */ static GstFlowReturn gst_soup_http_src_send_message (GstSoupHTTPSrc * src) { GstFlowReturn ret; - GError *error = NULL; + GSource *source; + struct GstSoupSendSrc msrc; g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR); g_assert (src->input_stream == NULL); - src->input_stream = - _soup_session_send (src->session, src->msg, src->cancellable, &error); + msrc.src = src; + msrc.error = NULL; - if (error) - GST_DEBUG_OBJECT (src, "Sending message failed: %s", error->message); + source = g_idle_source_new (); - if (g_cancellable_is_cancelled (src->cancellable)) { - ret = GST_FLOW_FLUSHING; - goto done; - } + src->headers_ret = GST_FLOW_OK; + + g_source_set_callback (source, _session_send_idle_cb, &msrc, NULL); + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + while (!src->input_stream && !msrc.error) + g_cond_wait (&src->session_cond, &src->session_mutex); + + ret = src->headers_ret; - ret = gst_soup_http_src_got_headers (src, src->msg); if (ret != GST_FLOW_OK) { goto done; } if (!src->input_stream) { - GST_DEBUG_OBJECT (src, "Didn't get an input stream: %s", error->message); + GST_DEBUG_OBJECT (src, "Didn't get an input stream: %s", + msrc.error->message); ret = GST_FLOW_ERROR; goto done; } - if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (src->msg))) { - GST_DEBUG_OBJECT (src, "Successfully got a reply"); - } else { - /* FIXME - be more helpful to people debugging */ - ret = GST_FLOW_ERROR; - } + /* if an input stream exists, it was always successful */ + GST_DEBUG_OBJECT (src, "Successfully got a reply"); done: - if (error) - g_error_free (error); + g_clear_error (&msrc.error); return ret; } +/* called with session lock taken */ static GstFlowReturn gst_soup_http_src_do_request (GstSoupHTTPSrc * src, const gchar * method) { @@ -1791,13 +2013,49 @@ gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read) } } +struct GstSoupReadResult +{ + GstSoupHTTPSrc *src; + GError *error; + void *buffer; + gsize bufsize; + gssize nbytes; +}; + +static void +_session_read_cb (GObject * source, GAsyncResult * ret, gpointer user_data) +{ + struct GstSoupReadResult *res = user_data; + + g_mutex_lock (&res->src->session_mutex); + + res->nbytes = g_input_stream_read_finish (G_INPUT_STREAM (source), + ret, &res->error); + + g_cond_signal (&res->src->session_cond); + g_mutex_unlock (&res->src->session_mutex); +} + +static gboolean +_session_read_idle_cb (gpointer user_data) +{ + struct GstSoupReadResult *res = user_data; + + g_input_stream_read_async (res->src->input_stream, res->buffer, + res->bufsize, G_PRIORITY_DEFAULT, res->src->cancellable, + _session_read_cb, res); + + return FALSE; +} + static GstFlowReturn gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) { - gssize read_bytes; + struct GstSoupReadResult res; GstMapInfo mapinfo; GstBaseSrc *bsrc; GstFlowReturn ret; + GSource *source; bsrc = GST_BASE_SRC_CAST (src); @@ -1812,31 +2070,54 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) return GST_FLOW_ERROR; } - read_bytes = - g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size, - src->cancellable, NULL); - GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input", - read_bytes); + res.src = src; + res.buffer = mapinfo.data; + res.bufsize = mapinfo.size; + res.error = NULL; + res.nbytes = -1; - g_mutex_lock (&src->mutex); - if (g_cancellable_is_cancelled (src->cancellable)) { + source = g_idle_source_new (); + + g_mutex_lock (&src->session_mutex); + + g_source_set_callback (source, _session_read_idle_cb, &res, NULL); + /* invoke on libsoup thread */ + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + /* wait for it */ + while (!res.error && res.nbytes < 0) + g_cond_wait (&src->session_cond, &src->session_mutex); + g_mutex_unlock (&src->session_mutex); + + GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input", + res.nbytes); + + if (res.error) { + /* retry by default */ + GstFlowReturn ret = GST_FLOW_CUSTOM_ERROR; + if (g_error_matches (res.error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + ret = GST_FLOW_FLUSHING; + } else { + GST_ERROR_OBJECT (src, "Got error from libsoup: %s", res.error->message); + } + g_error_free (res.error); gst_buffer_unmap (*outbuf, &mapinfo); gst_buffer_unref (*outbuf); - g_mutex_unlock (&src->mutex); - return GST_FLOW_FLUSHING; + return ret; } gst_buffer_unmap (*outbuf, &mapinfo); - if (read_bytes > 0) { - gst_buffer_set_size (*outbuf, read_bytes); + if (res.nbytes > 0) { + gst_buffer_set_size (*outbuf, res.nbytes); GST_BUFFER_OFFSET (*outbuf) = bsrc->segment.position; ret = GST_FLOW_OK; - gst_soup_http_src_update_position (src, read_bytes); + gst_soup_http_src_update_position (src, res.nbytes); /* Got some data, reset retry counter */ src->retry_count = 0; - gst_soup_http_src_check_update_blocksize (src, read_bytes); + gst_soup_http_src_check_update_blocksize (src, res.nbytes); src->last_socket_read_time = g_get_monotonic_time () * GST_USECOND; @@ -1845,39 +2126,92 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf) * otherwise we would have to cancel the message and close the connection */ if (bsrc->segment.stop != -1 - && bsrc->segment.position + read_bytes >= bsrc->segment.stop) { + && bsrc->segment.position + res.nbytes >= bsrc->segment.stop) { + SoupMessage *msg = src->msg; guint8 tmp[128]; - g_object_unref (src->msg); + res.buffer = tmp; + res.bufsize = sizeof (tmp); + res.nbytes = -1; + src->msg = NULL; src->have_body = TRUE; + g_mutex_lock (&src->session_mutex); + + source = g_idle_source_new (); + + g_source_set_callback (source, _session_read_idle_cb, &res, NULL); /* This should return immediately as we're at the end of the range */ - read_bytes = - g_input_stream_read (src->input_stream, tmp, sizeof (tmp), - src->cancellable, NULL); - if (read_bytes > 0) + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + while (!res.error && res.nbytes < 0) + g_cond_wait (&src->session_cond, &src->session_mutex); + g_mutex_unlock (&src->session_mutex); + + g_clear_error (&res.error); + g_object_unref (msg); + + if (res.nbytes > 0) GST_ERROR_OBJECT (src, - "Read %" G_GSIZE_FORMAT " bytes after end of range", read_bytes); + "Read %" G_GSIZE_FORMAT " bytes after end of range", res.nbytes); } } else { gst_buffer_unref (*outbuf); - if (read_bytes < 0 || - (src->have_size && src->read_position < src->content_size)) { + if (src->have_size && src->read_position < src->content_size) { /* Maybe the server disconnected, retry */ ret = GST_FLOW_CUSTOM_ERROR; } else { - g_object_unref (src->msg); + g_clear_object (&src->msg); src->msg = NULL; ret = GST_FLOW_EOS; src->have_body = TRUE; } } - g_mutex_unlock (&src->mutex); + + g_clear_error (&res.error); return ret; } +static gboolean +_session_stream_clear_cb (gpointer user_data) +{ + GstSoupHTTPSrc *src = user_data; + + g_mutex_lock (&src->session_mutex); + + g_clear_object (&src->input_stream); + + g_cond_signal (&src->session_cond); + g_mutex_unlock (&src->session_mutex); + + return FALSE; +} + +static void +gst_soup_http_src_stream_clear (GstSoupHTTPSrc * src) +{ + GSource *source; + + if (!src->input_stream) + return; + + g_mutex_lock (&src->session_mutex); + + source = g_idle_source_new (); + + g_source_set_callback (source, _session_stream_clear_cb, src, NULL); + g_source_attach (source, g_main_loop_get_context (src->session->loop)); + g_source_unref (source); + + while (src->input_stream) + g_cond_wait (&src->session_cond, &src->session_mutex); + + g_mutex_unlock (&src->session_mutex); +} + static GstFlowReturn gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) { @@ -1888,33 +2222,28 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) src = GST_SOUP_HTTP_SRC (psrc); retry: - g_mutex_lock (&src->mutex); /* Check for pending position change */ - if (src->request_position != src->read_position) { - if (src->input_stream) { - g_input_stream_close (src->input_stream, src->cancellable, NULL); - g_object_unref (src->input_stream); - src->input_stream = NULL; - } + if (src->request_position != src->read_position && src->input_stream) { + gst_soup_http_src_stream_clear (src); } if (g_cancellable_is_cancelled (src->cancellable)) { ret = GST_FLOW_FLUSHING; - g_mutex_unlock (&src->mutex); goto done; } /* If we have no open connection to the server, start one */ if (!src->input_stream) { *outbuf = NULL; + g_mutex_lock (&src->session_mutex); ret = gst_soup_http_src_do_request (src, src->method ? src->method : SOUP_METHOD_GET); http_headers_event = src->http_headers_event; src->http_headers_event = NULL; + g_mutex_unlock (&src->session_mutex); } - g_mutex_unlock (&src->mutex); if (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_ERROR) { if (http_headers_event) { @@ -1932,12 +2261,9 @@ done: if (http_headers_event) gst_event_unref (http_headers_event); - g_mutex_lock (&src->mutex); if (src->input_stream) { - g_object_unref (src->input_stream); - src->input_stream = NULL; + gst_soup_http_src_stream_clear (src); } - g_mutex_unlock (&src->mutex); if (ret == GST_FLOW_CUSTOM_ERROR) { ret = GST_FLOW_OK; goto retry; @@ -1945,9 +2271,7 @@ done: } if (ret == GST_FLOW_FLUSHING) { - g_mutex_lock (&src->mutex); src->retry_count = 0; - g_mutex_unlock (&src->mutex); } return ret; @@ -1957,10 +2281,14 @@ static gboolean gst_soup_http_src_start (GstBaseSrc * bsrc) { GstSoupHTTPSrc *src = GST_SOUP_HTTP_SRC (bsrc); + gboolean ret; GST_DEBUG_OBJECT (src, "start(\"%s\")", src->location); - return gst_soup_http_src_session_open (src); + g_mutex_lock (&src->session_mutex); + ret = gst_soup_http_src_session_open (src); + g_mutex_unlock (&src->session_mutex); + return ret; } static gboolean @@ -1970,8 +2298,11 @@ gst_soup_http_src_stop (GstBaseSrc * bsrc) src = GST_SOUP_HTTP_SRC (bsrc); GST_DEBUG_OBJECT (src, "stop()"); + + gst_soup_http_src_stream_clear (src); + if (src->keep_alive && !src->msg && !src->session_is_shared) - gst_soup_http_src_cancel_message (src); + g_cancellable_cancel (src->cancellable); else gst_soup_http_src_session_close (src); @@ -2010,17 +2341,13 @@ gst_soup_http_src_set_context (GstElement * element, GstContext * context) const GstStructure *s = gst_context_get_structure (context); GST_OBJECT_LOCK (src); - if (src->external_session) - g_object_unref (src->external_session); - src->external_session = NULL; - gst_structure_get (s, "session", _soup_session_get_type (), - &src->external_session, NULL); - src->forced_external_session = FALSE; - gst_structure_get (s, "force", G_TYPE_BOOLEAN, - &src->forced_external_session, NULL); - GST_DEBUG_OBJECT (src, "Setting external session %p (force: %d)", - src->external_session, src->forced_external_session); + g_clear_object (&src->external_session); + gst_structure_get (s, "session", GST_TYPE_SOUP_SESSION, + &src->external_session, NULL); + + GST_DEBUG_OBJECT (src, "Setting external session %p", + src->external_session); GST_OBJECT_UNLOCK (src); } @@ -2036,7 +2363,7 @@ gst_soup_http_src_unlock (GstBaseSrc * bsrc) src = GST_SOUP_HTTP_SRC (bsrc); GST_DEBUG_OBJECT (src, "unlock()"); - gst_soup_http_src_cancel_message (src); + g_cancellable_cancel (src->cancellable); return TRUE; } @@ -2080,19 +2407,20 @@ gst_soup_http_src_check_seekable (GstSoupHTTPSrc * src) * loops. */ if (!src->got_headers && GST_STATE (src) >= GST_STATE_PAUSED) { - g_mutex_lock (&src->mutex); + g_mutex_lock (&src->session_mutex); while (!src->got_headers && !g_cancellable_is_cancelled (src->cancellable) && ret == GST_FLOW_OK) { if ((src->msg && _soup_message_get_method (src->msg) != SOUP_METHOD_HEAD)) { /* wait for the current request to finish */ - g_cond_wait (&src->have_headers_cond, &src->mutex); + g_cond_wait (&src->session_cond, &src->session_mutex); + ret = src->headers_ret; } else { if (gst_soup_http_src_session_open (src)) { ret = gst_soup_http_src_do_request (src, SOUP_METHOD_HEAD); } } } - g_mutex_unlock (&src->mutex); + g_mutex_unlock (&src->session_mutex); } } diff --git a/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h b/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h index a7184fee62..89705943bf 100644 --- a/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h +++ b/subprojects/gst-plugins-good/ext/soup/gstsouphttpsrc.h @@ -45,6 +45,9 @@ typedef enum { GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED, } GstSoupHTTPSrcSessionIOStatus; +/* opaque from here, implementation detail */ +typedef struct _GstSoupSession GstSoupSession; + struct _GstSoupHTTPSrc { GstPushSrc element; @@ -59,15 +62,15 @@ struct _GstSoupHTTPSrc { gchar *proxy_id; /* Authentication user id for proxy URI. */ gchar *proxy_pw; /* Authentication user password for proxy URI. */ gchar **cookies; /* HTTP request cookies. */ - SoupSession *session; /* Async context. */ + GstSoupSession *session; /* Libsoup session wrapper. */ gboolean session_is_shared; - SoupSession *external_session; /* Shared via GstContext */ - gboolean forced_external_session; /* If session was explicitly set from application */ + GstSoupSession *external_session; /* Shared via GstContext */ SoupMessage *msg; /* Request message. */ gint retry_count; /* Number of retries since we received data */ gint max_retries; /* Maximum number of retries */ gchar *method; /* HTTP method */ + GstFlowReturn headers_ret; gboolean got_headers; /* Already received headers from the server */ gboolean have_size; /* Received and parsed Content-Length header. */ @@ -111,8 +114,12 @@ struct _GstSoupHTTPSrc { guint timeout; - GMutex mutex; - GCond have_headers_cond; + /* This mutex-cond pair is used to talk to the soup session thread; it is + * per src to allow concurrent access to shared sessions (if it was inside + * the shared session structure, it would be effectively global) + */ + GMutex session_mutex; + GCond session_cond; GstEvent *http_headers_event; diff --git a/subprojects/gst-plugins-good/ext/soup/gstsouploader.c b/subprojects/gst-plugins-good/ext/soup/gstsouploader.c index 119f90706a..2c525a6e72 100644 --- a/subprojects/gst-plugins-good/ext/soup/gstsouploader.c +++ b/subprojects/gst-plugins-good/ext/soup/gstsouploader.c @@ -111,6 +111,10 @@ typedef struct _GstSoupVTable void (*_soup_auth_authenticate) (SoupAuth * auth, const char *username, const char *password); const char *(*_soup_message_get_method_3) (SoupMessage * msg); + GInputStream *(*_soup_session_send_async_2) (SoupSession * session, SoupMessage * msg, + GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data); + GInputStream *(*_soup_session_send_async_3) (SoupSession * session, SoupMessage * msg, + int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data); GInputStream *(*_soup_session_send_finish) (SoupSession * session, GAsyncResult * result, GError ** error); GInputStream *(*_soup_session_send) (SoupSession * session, SoupMessage * msg, @@ -220,6 +224,7 @@ gst_soup_load_library (void) LOAD_VERSIONED_SYMBOL (2, soup_uri_to_string); LOAD_VERSIONED_SYMBOL (2, soup_message_get_uri); LOAD_VERSIONED_SYMBOL (2, soup_session_cancel_message); + LOAD_VERSIONED_SYMBOL (2, soup_session_send_async); } else { vtable->lib_version = 3; LOAD_VERSIONED_SYMBOL (3, soup_logger_new); @@ -232,6 +237,7 @@ gst_soup_load_library (void) LOAD_VERSIONED_SYMBOL (3, soup_message_get_method); LOAD_VERSIONED_SYMBOL (3, soup_message_get_reason_phrase); LOAD_VERSIONED_SYMBOL (3, soup_message_get_status); + LOAD_VERSIONED_SYMBOL (3, soup_session_send_async); } LOAD_SYMBOL (soup_auth_authenticate); @@ -795,6 +801,32 @@ _soup_message_get_method (SoupMessage * msg) #endif } +void +_soup_session_send_async (SoupSession * session, SoupMessage * msg, + GCancellable * cancellable, GAsyncReadyCallback callback, + gpointer user_data) +{ +#ifdef STATIC_SOUP +#if STATIC_SOUP == 2 + return soup_session_send_async (session, msg, cancellable, + callback, user_data); +#else + return soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, + cancellable, callback, user_data); +#endif +#else + if (gst_soup_vtable.lib_version == 3) { + g_assert (gst_soup_vtable._soup_session_send_async_3 != NULL); + gst_soup_vtable._soup_session_send_async_3 (session, msg, + G_PRIORITY_DEFAULT, cancellable, callback, user_data); + } else { + g_assert (gst_soup_vtable._soup_session_send_async_2 != NULL); + gst_soup_vtable._soup_session_send_async_2 (session, msg, + cancellable, callback, user_data); + } +#endif +} + GInputStream * _soup_session_send_finish (SoupSession * session, GAsyncResult * result, GError ** error) diff --git a/subprojects/gst-plugins-good/ext/soup/gstsouploader.h b/subprojects/gst-plugins-good/ext/soup/gstsouploader.h index 3025862193..ea5e16e252 100644 --- a/subprojects/gst-plugins-good/ext/soup/gstsouploader.h +++ b/subprojects/gst-plugins-good/ext/soup/gstsouploader.h @@ -99,6 +99,12 @@ void _soup_auth_authenticate (SoupAuth *auth, const char *username, const char *_soup_message_get_method (SoupMessage *msg); +void _soup_session_send_async (SoupSession *session, + SoupMessage *msg, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + GInputStream *_soup_session_send_finish (SoupSession *session, GAsyncResult *result, GError **error);