soup: move libsoup session into its own thread

Starting with libsoup3, there is no attempt to handle thread safety
inside the library, and it was never considered fully safe before
either. Therefore, move all session handling into its own thread.

The libsoup thread has its own context and main loop. When some
request is made or a response needs to be read, an idle source
is created to issue that; the gstreamer thread issuing that waits
for that to be complete. There is a per-src condition variable to
deal with that.

Since the thread/loop needs to be longer-lived than the soup
session itself, a wrapper object is provided to contain them. The
soup session only has a single reference, owned by the wrapper
object.

It is no longer possible to force an external session, since this
does not seem to be used anywhere within gstreamer and would be
tricky to implement; this is because one would not have to provide
just a session, but also the complete thread arrangement made in
the same way as the system currently does internally, in order to
be safe.

Messages are still built gstreamer-side. It is safe to do so until
the message is sent on the session. Headers are also processed on
the gstreamer side, which should likewise be safe.

All requests as well as reads on the libsoup thread are issued
asynchronously. That allows libsoup to schedule things with as
little blocking as possible, and means that concurrent access
to the session is possible, when sharing the session.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/947

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1555>
This commit is contained in:
Daniel Kolesa 2022-01-21 16:09:30 +01:00 committed by GStreamer Marge Bot
parent d86288904f
commit 0bcefa7350
4 changed files with 607 additions and 234 deletions

File diff suppressed because it is too large Load diff

View file

@ -45,6 +45,9 @@ typedef enum {
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED, GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_CANCELLED,
} GstSoupHTTPSrcSessionIOStatus; } GstSoupHTTPSrcSessionIOStatus;
/* opaque from here, implementation detail */
typedef struct _GstSoupSession GstSoupSession;
struct _GstSoupHTTPSrc { struct _GstSoupHTTPSrc {
GstPushSrc element; GstPushSrc element;
@ -59,15 +62,15 @@ struct _GstSoupHTTPSrc {
gchar *proxy_id; /* Authentication user id for proxy URI. */ gchar *proxy_id; /* Authentication user id for proxy URI. */
gchar *proxy_pw; /* Authentication user password for proxy URI. */ gchar *proxy_pw; /* Authentication user password for proxy URI. */
gchar **cookies; /* HTTP request cookies. */ gchar **cookies; /* HTTP request cookies. */
SoupSession *session; /* Async context. */ GstSoupSession *session; /* Libsoup session wrapper. */
gboolean session_is_shared; gboolean session_is_shared;
SoupSession *external_session; /* Shared via GstContext */ GstSoupSession *external_session; /* Shared via GstContext */
gboolean forced_external_session; /* If session was explicitly set from application */
SoupMessage *msg; /* Request message. */ SoupMessage *msg; /* Request message. */
gint retry_count; /* Number of retries since we received data */ gint retry_count; /* Number of retries since we received data */
gint max_retries; /* Maximum number of retries */ gint max_retries; /* Maximum number of retries */
gchar *method; /* HTTP method */ gchar *method; /* HTTP method */
GstFlowReturn headers_ret;
gboolean got_headers; /* Already received headers from the server */ gboolean got_headers; /* Already received headers from the server */
gboolean have_size; /* Received and parsed Content-Length gboolean have_size; /* Received and parsed Content-Length
header. */ header. */
@ -111,8 +114,12 @@ struct _GstSoupHTTPSrc {
guint timeout; guint timeout;
GMutex mutex; /* This mutex-cond pair is used to talk to the soup session thread; it is
GCond have_headers_cond; * per src to allow concurrent access to shared sessions (if it was inside
* the shared session structure, it would be effectively global)
*/
GMutex session_mutex;
GCond session_cond;
GstEvent *http_headers_event; GstEvent *http_headers_event;

View file

@ -111,6 +111,10 @@ typedef struct _GstSoupVTable
void (*_soup_auth_authenticate) (SoupAuth * auth, const char *username, void (*_soup_auth_authenticate) (SoupAuth * auth, const char *username,
const char *password); const char *password);
const char *(*_soup_message_get_method_3) (SoupMessage * msg); const char *(*_soup_message_get_method_3) (SoupMessage * msg);
GInputStream *(*_soup_session_send_async_2) (SoupSession * session, SoupMessage * msg,
GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data);
GInputStream *(*_soup_session_send_async_3) (SoupSession * session, SoupMessage * msg,
int io_priority, GCancellable * cancellable, GAsyncReadyCallback callback, gpointer user_data);
GInputStream *(*_soup_session_send_finish) (SoupSession * session, GInputStream *(*_soup_session_send_finish) (SoupSession * session,
GAsyncResult * result, GError ** error); GAsyncResult * result, GError ** error);
GInputStream *(*_soup_session_send) (SoupSession * session, SoupMessage * msg, GInputStream *(*_soup_session_send) (SoupSession * session, SoupMessage * msg,
@ -220,6 +224,7 @@ gst_soup_load_library (void)
LOAD_VERSIONED_SYMBOL (2, soup_uri_to_string); LOAD_VERSIONED_SYMBOL (2, soup_uri_to_string);
LOAD_VERSIONED_SYMBOL (2, soup_message_get_uri); LOAD_VERSIONED_SYMBOL (2, soup_message_get_uri);
LOAD_VERSIONED_SYMBOL (2, soup_session_cancel_message); LOAD_VERSIONED_SYMBOL (2, soup_session_cancel_message);
LOAD_VERSIONED_SYMBOL (2, soup_session_send_async);
} else { } else {
vtable->lib_version = 3; vtable->lib_version = 3;
LOAD_VERSIONED_SYMBOL (3, soup_logger_new); LOAD_VERSIONED_SYMBOL (3, soup_logger_new);
@ -232,6 +237,7 @@ gst_soup_load_library (void)
LOAD_VERSIONED_SYMBOL (3, soup_message_get_method); LOAD_VERSIONED_SYMBOL (3, soup_message_get_method);
LOAD_VERSIONED_SYMBOL (3, soup_message_get_reason_phrase); LOAD_VERSIONED_SYMBOL (3, soup_message_get_reason_phrase);
LOAD_VERSIONED_SYMBOL (3, soup_message_get_status); LOAD_VERSIONED_SYMBOL (3, soup_message_get_status);
LOAD_VERSIONED_SYMBOL (3, soup_session_send_async);
} }
LOAD_SYMBOL (soup_auth_authenticate); LOAD_SYMBOL (soup_auth_authenticate);
@ -795,6 +801,32 @@ _soup_message_get_method (SoupMessage * msg)
#endif #endif
} }
void
_soup_session_send_async (SoupSession * session, SoupMessage * msg,
GCancellable * cancellable, GAsyncReadyCallback callback,
gpointer user_data)
{
#ifdef STATIC_SOUP
#if STATIC_SOUP == 2
return soup_session_send_async (session, msg, cancellable,
callback, user_data);
#else
return soup_session_send_async (session, msg, G_PRIORITY_DEFAULT,
cancellable, callback, user_data);
#endif
#else
if (gst_soup_vtable.lib_version == 3) {
g_assert (gst_soup_vtable._soup_session_send_async_3 != NULL);
gst_soup_vtable._soup_session_send_async_3 (session, msg,
G_PRIORITY_DEFAULT, cancellable, callback, user_data);
} else {
g_assert (gst_soup_vtable._soup_session_send_async_2 != NULL);
gst_soup_vtable._soup_session_send_async_2 (session, msg,
cancellable, callback, user_data);
}
#endif
}
GInputStream * GInputStream *
_soup_session_send_finish (SoupSession * session, _soup_session_send_finish (SoupSession * session,
GAsyncResult * result, GError ** error) GAsyncResult * result, GError ** error)

View file

@ -99,6 +99,12 @@ void _soup_auth_authenticate (SoupAuth *auth, const char *username,
const char *_soup_message_get_method (SoupMessage *msg); const char *_soup_message_get_method (SoupMessage *msg);
void _soup_session_send_async (SoupSession *session,
SoupMessage *msg,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
GInputStream *_soup_session_send_finish (SoupSession *session, GInputStream *_soup_session_send_finish (SoupSession *session,
GAsyncResult *result, GError **error); GAsyncResult *result, GError **error);