curlhttpsrc: fix various leaks and thread safety issues

curlhttpsrc uses a single thread running the
gst_curl_http_src_curl_multi_loop() function to handle receiving
data and messages from libcurl. Each instance of curlhttpsrc adds
an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
for the multi_loop to perform the HTTP request.

Valgrind has shown up race conditions and memory leaks:
1. gst_curl_http_src_change_state() does not wait for the multi_loop
   to complete before going to the NULL state, which means that
   an instance of GstCurlHttpSrc can be released while
   gst_curl_http_src_curl_multi_loop() still has a reference to it.
2. if multiple elements try to be removed from the queue at once,
   only the last one is deleted.
3. source->caps is leaked
4. curl multi_handle is leaked
5. leak of curl_handle if URI not set
6. leak of http_headers when reusing element
7. null pointer dereference in negotiate caps
8. double-free of the default user-agent string
9. leak of multi_task_context.task

This commit changes the logic so that each element has a connection
status, which is used by the multi_loop to decide when to remove an
element from its queue. An instance of curlhttpsrc will not enter
the NULL state until its reference has been removed from the queue.

When shutting down the curl multi loop, the memory allocated from the
call to curl_multi_init() is now released.

When gstadaptivedemux uses a URI source element, it will re-use
it for multiple requests, moving it between READY and PLAYING
between each request. curlhttpsrc was leaking the http_headers
structure in this use case.

The gst_curl_http_src_negotiate_caps() function extracts the
"response-headers" field from the http_headers, but did not check
that this field might be NULL.

If the user-agent property is set, the global user-agent string
was freed. This caused a double-free error if the user-agent is
ever set a second time during the execution of the process.

There are situations within curlhttpsrc where the code needs
both the global multi_task_context mutex and the per-element
buffer_mutex. To avoid deadlocks, it is vital that the order in
which these are requested is always the same. This commit modifies
the locking order to always be in the order:
 1. multi_task_context.task_rec_mutex
 2. buffer_mutex

Fixes #876
This commit is contained in:
Alex Ashley 2019-02-05 16:34:40 +00:00 committed by Tim-Philipp Müller
parent d2d912f34a
commit c2fe4e58ad
5 changed files with 303 additions and 230 deletions

View file

@ -60,7 +60,7 @@
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY ((void *)0)
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME ((void *)0)
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD ((void *)0)
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT gst_curl_http_src_default_useragent
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "GStreamer curlhttpsrc libcurl"
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING FALSE
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION 1L
#define GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS -1

View file

@ -73,6 +73,46 @@
* </refsect2>
*/
/*
* Thread safety notes.
*
* GstCurlHttpSrc uses a single thread running the
* gst_curl_http_src_curl_multi_loop() function to handle receiving
* data and messages from libcurl. Each instance of GstCurlHttpSrc adds
* an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
* for the multi_loop to perform the HTTP request.
*
* When an instance of GstCurlHttpSrc wants to make a request (i.e.
* it has moved to the PLAYING state) it adds itself to the
* multi_task_context.queue list and signals the multi_loop task.
*
* Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond
* to wait for gst_curl_http_src_curl_multi_loop() to perform the
* request and signal completion.
*
* Each instance of GstCurlHttpSrc is protected by the mutexes:
* 1. uri_mutex
* 2. buffer_mutex
*
* uri_mutex is used to protect access to the uri field.
*
* buffer_mutex is used to protect access to buffer_cond, state and
* connection_status.
*
* The gst_curl_http_src_curl_multi_loop() function uses the mutexes:
* 1. multi_task_context.task_rec_mutex
* 2. multi_task_context.mutex
*
* multi_task_context.task_rec_mutex is only used by GstTask.
*
* multi_task_context.mutex is used to protect access to queue and state
*
* To avoid deadlock, it is vital that if both multi_task_context.mutex
* and buffer_mutex are required, that they are locked in the order:
* 1. multi_task_context.mutex
* 2. buffer_mutex
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
@ -87,6 +127,35 @@ GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
#define GST_CAT_DEFAULT gst_curl_http_src_debug
GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);
enum
{
PROP_0,
PROP_URI,
PROP_USERNAME,
PROP_PASSWORD,
PROP_PROXYURI,
PROP_PROXYUSERNAME,
PROP_PROXYPASSWORD,
PROP_COOKIES,
PROP_USERAGENT,
PROP_HEADERS,
PROP_COMPRESS,
PROP_REDIRECT,
PROP_MAXREDIRECT,
PROP_KEEPALIVE,
PROP_TIMEOUT,
PROP_STRICT_SSL,
PROP_SSL_CA_FILE,
PROP_RETRIES,
PROP_CONNECTIONMAXTIME,
PROP_MAXCONCURRENT_SERVER,
PROP_MAXCONCURRENT_PROXY,
PROP_MAXCONCURRENT_GLOBAL,
PROP_HTTPVERSION,
PROP_IRADIO_MODE,
PROP_MAX
};
/*
* Make a source pad template to be able to kick out recv'd data
*/
@ -138,14 +207,15 @@ static size_t gst_curl_http_src_get_header (void *header, size_t size,
static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
size_t nmemb, void *src);
static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src);
static char *gst_curl_http_src_strcasestr (const char *haystack,
const char *needle);
curl_version_info_data *gst_curl_http_src_curl_capabilities;
GstCurlHttpVersion pref_http_ver;
gchar *gst_curl_http_src_default_useragent;
static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL;
static GstCurlHttpVersion pref_http_ver;
#define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
static GType
gst_curl_http_version_get_type (void)
{
@ -237,10 +307,6 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
pref_http_ver = default_http_version;
}
gst_curl_http_src_default_useragent =
g_strdup_printf ("GStreamer curlhttpsrc libcurl/%s",
gst_curl_http_src_curl_capabilities->version);
gobject_class->set_property = gst_curl_http_src_set_property;
gobject_class->get_property = gst_curl_http_src_get_property;
gobject_class->finalize = gst_curl_http_src_finalize;
@ -285,7 +351,8 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
g_object_class_install_property (gobject_class, PROP_USERAGENT,
g_param_spec_string ("user-agent", "User-Agent",
"URI of resource requested", GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT,
"URI of resource requested",
GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_COMPRESS,
@ -390,9 +457,13 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
__LINE__, NULL, "Testing the curl_multi_loop debugging prints");
#endif
klass->multi_task_context.task = NULL;
klass->multi_task_context.refcount = 0;
klass->multi_task_context.queue = NULL;
klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
klass->multi_task_context.multi_handle = NULL;
g_mutex_init (&klass->multi_task_context.mutex);
g_cond_init (&klass->multi_task_context.signal);
g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
gst_element_class_set_static_metadata (gstelement_class,
"HTTP Client Source using libcURL",
@ -410,8 +481,10 @@ gst_curl_http_src_set_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_URI:
g_mutex_lock (&source->uri_mutex);
g_free (source->uri);
source->uri = g_value_dup_string (value);
g_mutex_unlock (&source->uri_mutex);
break;
case PROP_USERNAME:
g_free (source->username);
@ -447,7 +520,9 @@ gst_curl_http_src_set_property (GObject * object, guint prop_id,
const GstStructure *s = gst_value_get_structure (value);
if (source->request_headers)
gst_structure_free (source->request_headers);
source->request_headers = s ? gst_structure_copy (s) : NULL;
source->request_headers =
s ? gst_structure_copy (s) :
gst_structure_new_empty (REQUEST_HEADERS_NAME);
}
break;
case PROP_COMPRESS:
@ -505,7 +580,9 @@ gst_curl_http_src_get_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_URI:
g_mutex_lock (&source->uri_mutex);
g_value_set_string (value, source->uri);
g_mutex_unlock (&source->uri_mutex);
break;
case PROP_USERNAME:
g_value_set_string (value, source->username);
@ -591,9 +668,12 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
source->proxy_user = NULL;
source->proxy_pass = NULL;
source->cookies = NULL;
source->user_agent = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT;
g_assert (gst_curl_http_src_curl_capabilities != NULL);
source->user_agent =
g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s",
gst_curl_http_src_curl_capabilities->version);
source->number_cookies = 0;
source->request_headers = NULL;
source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME);
source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
@ -608,8 +688,8 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
source->retries_remaining = source->total_retries;
source->slist = NULL;
source->accept_compressed_encodings = FALSE;
gst_caps_replace (&source->caps, NULL);
gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
@ -617,18 +697,23 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
g_mutex_init (&source->uri_mutex);
g_mutex_init (&source->buffer_mutex);
g_cond_init (&source->signal);
g_cond_init (&source->buffer_cond);
source->buffer = NULL;
source->buffer_len = 0;
source->state = GSTCURL_NONE;
source->pending_state = GSTCURL_NONE;
source->status_code = 0;
source->transfer_begun = FALSE;
source->data_received = FALSE;
source->connection_status = GSTCURL_NOT_CONNECTED;
source->http_headers = NULL;
source->content_type = NULL;
source->status_code = 0;
source->hdrs_updated = FALSE;
source->curl_result = CURLE_OK;
gst_caps_replace (&source->caps, NULL);
GSTCURL_FUNCTION_EXIT (source);
}
@ -666,6 +751,8 @@ gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
#endif
/* Start the thread */
g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
klass->multi_task_context.task = gst_task_new (
(GstTaskFunction) gst_curl_http_src_curl_multi_loop,
(gpointer) & klass->multi_task_context, NULL);
@ -709,13 +796,20 @@ gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
klass->multi_task_context.refcount);
if (klass->multi_task_context.refcount <= 0) {
if (klass->multi_task_context.refcount == 0) {
/* Everything's done! Clean up. */
gst_task_pause (klass->multi_task_context.task);
gst_task_stop (klass->multi_task_context.task);
klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
g_cond_signal (&klass->multi_task_context.signal);
g_mutex_unlock (&klass->multi_task_context.mutex);
GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task...");
gst_task_join (klass->multi_task_context.task);
gst_object_unref (klass->multi_task_context.task);
klass->multi_task_context.task = NULL;
curl_multi_cleanup (klass->multi_task_context.multi_handle);
klass->multi_task_context.multi_handle = NULL;
g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex);
GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete");
} else {
g_mutex_unlock (&klass->multi_task_context.mutex);
}
@ -734,6 +828,9 @@ gst_curl_http_src_finalize (GObject * obj)
gst_curl_http_src_cleanup_instance (src);
GSTCURL_FUNCTION_EXIT (src);
/* Chain up to parent class */
G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj);
}
/*
@ -748,20 +845,25 @@ gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
GstCurlHttpSrcClass *klass;
GstStructure *empty_headers;
GstBaseSrc *basesrc;
GSTCURL_FUNCTION_ENTRY (src);
klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
GstCurlHttpSrcClass);
basesrc = GST_BASE_SRC_CAST (src);
GSTCURL_FUNCTION_ENTRY (src);
retry:
ret = GST_FLOW_OK;
/* NOTE: when both the buffer_mutex and multi_task_context.mutex are
needed, multi_task_context.mutex must be acquired first */
g_mutex_lock (&klass->multi_task_context.mutex);
g_mutex_lock (&src->buffer_mutex);
if (src->state == GSTCURL_UNLOCK) {
ret = GST_FLOW_FLUSHING;
goto escape;
}
retry:
if (!src->transfer_begun) {
GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
/* Create the Easy Handle and set up the session. */
@ -771,19 +873,14 @@ retry:
goto escape;
}
g_mutex_lock (&klass->multi_task_context.mutex);
if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
== FALSE) {
GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
ret = GST_FLOW_ERROR;
goto escape;
}
/* Signal the worker thread */
klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT;
g_cond_signal (&klass->multi_task_context.signal);
g_mutex_unlock (&klass->multi_task_context.mutex);
src->state = GSTCURL_OK;
src->transfer_begun = TRUE;
@ -791,6 +888,9 @@ retry:
GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
if (src->http_headers != NULL) {
gst_structure_free (src->http_headers);
}
empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
URI_NAME, G_TYPE_STRING, src->uri,
@ -800,9 +900,12 @@ retry:
GST_INFO_OBJECT (src, "Created a new headers object");
}
g_mutex_unlock (&klass->multi_task_context.mutex);
/* Wait for data to become available, then punt it downstream */
while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)) {
g_cond_wait (&src->signal, &src->buffer_mutex);
while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)
&& (src->connection_status == GSTCURL_CONNECTED)) {
g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
}
if (src->state == GSTCURL_UNLOCK) {
@ -811,14 +914,16 @@ retry:
src->buffer = NULL;
src->buffer_len = 0;
}
ret = GST_FLOW_FLUSHING;
goto escape;
g_mutex_unlock (&src->buffer_mutex);
return GST_FLOW_FLUSHING;
}
ret = gst_curl_http_src_handle_response (src);
switch (ret) {
case GST_FLOW_ERROR:
goto escape; /* Don't attempt a retry, just bomb out */
/* Don't attempt a retry, just bomb out */
g_mutex_unlock (&src->buffer_mutex);
return ret;
case GST_FLOW_CUSTOM_ERROR:
if (src->data_received == TRUE) {
/*
@ -830,14 +935,14 @@ retry:
*/
GST_WARNING_OBJECT (src,
"Failed mid-transfer, can't continue for URI %s", src->uri);
ret = GST_FLOW_ERROR;
goto escape;
g_mutex_unlock (&src->buffer_mutex);
return GST_FLOW_ERROR;
}
src->retries_remaining--;
if (src->retries_remaining == 0) {
GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
ret = GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
goto escape;
g_mutex_unlock (&src->buffer_mutex);
return GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
}
GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
src->state = GSTCURL_NONE;
@ -903,9 +1008,13 @@ retry:
GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
}
}
g_mutex_unlock (&src->buffer_mutex);
GSTCURL_FUNCTION_EXIT (src);
return ret;
escape:
g_mutex_unlock (&src->buffer_mutex);
g_mutex_unlock (&klass->multi_task_context.mutex);
GSTCURL_FUNCTION_EXIT (src);
return ret;
@ -942,6 +1051,13 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
gint i;
GSTCURL_FUNCTION_ENTRY (s);
/* This is mandatory and yet not default option, so if this is NULL
* then something very bad is going on. */
if (s->uri == NULL) {
GST_ERROR_OBJECT (s, "No URI for curl!");
return NULL;
}
handle = curl_easy_init ();
if (handle == NULL) {
GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
@ -949,14 +1065,7 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
}
GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
/* This is mandatory and yet not default option, so if this is NULL
* then something very bad is going on. */
if (s->uri == NULL) {
GST_ERROR_OBJECT (s, "No URI for curl!");
return NULL;
}
gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
@ -1164,7 +1273,6 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
RESPONSE_HEADERS_NAME);
if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
GstEvent *hdrs_event;
GstStructure *empty_headers;
gst_element_post_message (GST_ELEMENT_CAST (src),
gst_message_new_element (GST_OBJECT_CAST (src),
@ -1172,15 +1280,9 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
/* gst_event_new_custom takes ownership of our structure */
hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
src->http_headers);
gst_structure_copy (src->http_headers));
gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
GST_INFO_OBJECT (src, "Pushed headers downstream");
empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
URI_NAME, G_TYPE_STRING, src->uri,
REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
gst_structure_free (empty_headers);
}
src->hdrs_updated = FALSE;
@ -1198,29 +1300,28 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
static gboolean
gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
{
const GValue *response_headers;
const GstStructure *response_struct;
GST_INFO_OBJECT (src, "Negotiating caps...");
if (src->caps && src->http_headers) {
const GValue *response_headers = gst_structure_get_value (src->http_headers,
RESPONSE_HEADERS_NAME);
if (gst_structure_has_field (gst_value_get_structure (response_headers),
"content-type") == TRUE) {
const GValue *gv_content_type =
gst_structure_get_value (gst_value_get_structure (response_headers),
"content-type");
if (G_VALUE_HOLDS_STRING (gv_content_type) == TRUE) {
const gchar *content_type = g_value_get_string (gv_content_type);
GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s",
content_type);
src->caps = gst_caps_make_writable (src->caps);
gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
content_type, NULL);
if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
GST_ERROR_OBJECT (src, "Setting caps failed!");
return FALSE;
}
} else {
GST_ERROR_OBJECT (src, "Content Type doesn't contain expected string");
response_headers =
gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME);
if (!response_headers) {
GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME);
return FALSE;
}
response_struct = gst_value_get_structure (response_headers);
if (gst_structure_has_field_typed (response_struct, "content-type",
G_TYPE_STRING)) {
const gchar *content_type =
gst_structure_get_string (response_struct, "content-type");
GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type);
src->caps = gst_caps_make_writable (src->caps);
gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
content_type, NULL);
if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
GST_ERROR_OBJECT (src, "Setting caps failed!");
return FALSE;
}
}
@ -1268,8 +1369,10 @@ gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
}
break;
case GST_STATE_CHANGE_READY_TO_NULL:
/* The pipeline has ended, so signal any running request to end. */
gst_curl_http_src_request_remove (source);
GST_DEBUG_OBJECT (source, "Removing from multi_loop queue...");
/* The pipeline has ended, so signal any running request to end
and wait until the multi_loop has stopped using this element */
gst_curl_http_src_wait_until_removed (source);
gst_curl_http_src_unref_multi (source);
break;
default:
@ -1314,17 +1417,25 @@ gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
g_free (src->cookies);
src->cookies = NULL;
g_free (src->user_agent);
src->user_agent = NULL;
g_mutex_clear (&src->buffer_mutex);
g_cond_clear (&src->signal);
g_cond_clear (&src->buffer_cond);
g_free (src->buffer);
src->buffer = NULL;
if (src->request_headers) {
gst_structure_free (src->request_headers);
src->request_headers = NULL;
}
if (src->http_headers != NULL) {
gst_structure_free (src->http_headers);
src->http_headers = NULL;
}
gst_caps_replace (&src->caps, NULL);
gst_curl_http_src_destroy_easy_handle (src);
}
@ -1338,10 +1449,12 @@ gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_URI:
g_mutex_lock (&src->uri_mutex);
gst_query_set_uri (query, src->uri);
if (src->redirect_uri != NULL) {
gst_query_set_uri_redirection (query, src->redirect_uri);
}
g_mutex_unlock (&src->uri_mutex);
ret = TRUE;
break;
default:
@ -1366,22 +1479,17 @@ gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
response_headers = gst_structure_get_value (src->http_headers,
RESPONSE_HEADERS_NAME);
if (gst_structure_has_field (gst_value_get_structure (response_headers),
"content-length") == TRUE) {
const GValue *content_length =
gst_structure_get_value (gst_value_get_structure (response_headers),
if (gst_structure_has_field_typed (gst_value_get_structure (response_headers),
"content-length", G_TYPE_STRING)) {
const gchar *content_length =
gst_structure_get_string (gst_value_get_structure (response_headers),
"content-length");
if (G_VALUE_HOLDS_STRING (content_length) == TRUE) {
const gchar *len = g_value_get_string (content_length);
*size = (guint64) g_ascii_strtoull (len, NULL, 10);
ret = TRUE;
} else {
GST_ERROR_OBJECT (src, "Content Length doesn't contain expected string");
}
*size = (guint64) g_ascii_strtoull (content_length, NULL, 10);
ret = TRUE;
} else {
GST_DEBUG_OBJECT (src,
"No content length has yet been set, or there was an error!");
}
GST_DEBUG_OBJECT (src,
"No content length has yet been set, or there was an error!");
return ret;
}
@ -1449,6 +1557,7 @@ gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
source->uri = g_strdup (uri);
if (source->uri == NULL) {
g_mutex_unlock (&source->uri_mutex);
return FALSE;
}
source->retries_remaining = source->total_retries;
@ -1467,19 +1576,32 @@ static gboolean
gst_curl_http_src_unlock (GstBaseSrc * bsrc)
{
GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
gboolean want_removal = FALSE;
g_mutex_lock (&src->buffer_mutex);
if (src->state != GSTCURL_UNLOCK) {
if (src->state == GSTCURL_OK) {
/* A transfer is running, cancel it */
gst_curl_http_src_request_remove (src);
if (src->connection_status == GSTCURL_CONNECTED) {
src->connection_status = GSTCURL_WANT_REMOVAL;
}
want_removal = TRUE;
}
src->pending_state = src->state;
src->state = GSTCURL_UNLOCK;
}
g_cond_signal (&src->signal);
g_cond_signal (&src->buffer_cond);
g_mutex_unlock (&src->buffer_mutex);
if (want_removal) {
GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
GST_TYPE_CURL_HTTP_SRC,
GstCurlHttpSrcClass);
g_mutex_lock (&klass->multi_task_context.mutex);
g_cond_signal (&klass->multi_task_context.signal);
g_mutex_unlock (&klass->multi_task_context.mutex);
}
return TRUE;
}
@ -1496,7 +1618,7 @@ gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
g_mutex_lock (&src->buffer_mutex);
src->state = src->pending_state;
src->pending_state = GSTCURL_NONE;
g_cond_signal (&src->signal);
g_cond_signal (&src->buffer_cond);
g_mutex_unlock (&src->buffer_mutex);
return TRUE;
@ -1510,9 +1632,10 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
{
GstCurlHttpSrcMultiTaskContext *context;
GstCurlHttpSrcQueueElement *qelement, *qnext;
int i, still_running;
gboolean cond = FALSE;
gint i, still_running = 0;
CURLMsg *curl_message;
GstCurlHttpSrc *elt;
guint active = 0;
context = (GstCurlHttpSrcMultiTaskContext *) thread_data;
@ -1521,49 +1644,60 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
/* Someone is holding a reference to us, but isn't using us so to avoid
* unnecessary clock cycle wasting, sit in a conditional wait until woken.
*/
while (context->state == GSTCURL_MULTI_LOOP_STATE_WAIT) {
GSTCURL_DEBUG_PRINT ("Entering wait state...");
while (context->queue == NULL
&& context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
GSTCURL_DEBUG_PRINT ("Waiting for an element to be added...");
g_cond_wait (&context->signal, &context->mutex);
GSTCURL_DEBUG_PRINT ("Received wake up call!");
}
if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
GSTCURL_INFO_PRINT ("Got instruction to shut down");
goto out;
}
if (context->state == GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) {
GSTCURL_DEBUG_PRINT ("Received a new item on the queue!");
if (context->queue == NULL) {
GSTCURL_ERROR_PRINT ("Request Queue was empty on a Queue Event!");
context->state = GSTCURL_MULTI_LOOP_STATE_WAIT;
return;
}
/*
* Use the running mutex to lock access to each element, as the
* mutex's memory barriers stop cache optimisations from meaning
* flag values can't be trusted. The trylock will only let us in
* once and should fail immediately prior.
*/
qelement = context->queue;
while (qelement != NULL) {
if (g_mutex_trylock (&qelement->running) == TRUE) {
/* check for elements that need to be started or removed */
qelement = context->queue;
while (qelement != NULL) {
qnext = qelement->next;
elt = qelement->p;
/* NOTE: when both the buffer_mutex and multi_task_context.mutex are
needed, multi_task_context.mutex must be acquired first */
g_mutex_lock (&elt->buffer_mutex);
if (elt->connection_status == GSTCURL_WANT_REMOVAL) {
curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
if (elt->state == GSTCURL_UNLOCK) {
elt->pending_state = GSTCURL_REMOVED;
} else {
elt->state = GSTCURL_REMOVED;
}
elt->connection_status = GSTCURL_NOT_CONNECTED;
gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
g_cond_signal (&elt->buffer_cond);
} else if (elt->connection_status == GSTCURL_CONNECTED) {
active++;
if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) {
GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
cond = TRUE;
curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
}
qelement = qelement->next;
}
g_mutex_unlock (&elt->buffer_mutex);
qelement = qnext;
}
if (cond != TRUE) {
GSTCURL_WARNING_PRINT ("All curl handles already added for QUEUE_EVENT!");
} else {
GSTCURL_DEBUG_PRINT ("Finished adding all handles, continuing.");
context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
}
g_mutex_unlock (&context->mutex);
} else if (context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
if (active == 0) {
GSTCURL_DEBUG_PRINT ("No active elements");
goto out;
}
/* perform a select() on all of the active sockets and process any
messages from curl */
{
struct timeval timeout;
gint rc;
fd_set fdread, fdwrite, fdexcep;
int maxfd = -1;
long curl_timeo = -1;
gboolean cond = FALSE;
/* Because curl can possibly take some time here, be nice and let go of the
* mutex so other threads can perform state/queue operations as we don't
@ -1604,6 +1738,8 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
break;
}
g_mutex_lock (&context->mutex);
/*
* Check the CURL message buffer to find out if any transfers have
* completed. If they have, call the signal_finished function which
@ -1617,73 +1753,17 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
} else if (curl_message->msg == CURLMSG_DONE) {
/* A hack, but I have seen curl_message->easy_handle being
* NULL randomly, so check for that. */
g_mutex_lock (&context->mutex);
if (curl_message->easy_handle == NULL) {
break;
if (curl_message->easy_handle != NULL) {
curl_multi_remove_handle (context->multi_handle,
curl_message->easy_handle);
gst_curl_http_src_remove_queue_handle (&context->queue,
curl_message->easy_handle, curl_message->data.result);
}
curl_multi_remove_handle (context->multi_handle,
curl_message->easy_handle);
gst_curl_http_src_remove_queue_handle (&context->queue,
curl_message->easy_handle, curl_message->data.result);
g_mutex_unlock (&context->mutex);
}
}
if (still_running == 0) {
/* We've finished processing, so set the state to wait.
*
* This is a little more complex, as we need to catch the edge
* case of another thread adding a queue item while we've been
* working.
*/
g_mutex_lock (&context->mutex);
if ((context->state != GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) &&
(context->state != GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL)) {
context->state = GSTCURL_MULTI_LOOP_STATE_WAIT;
}
g_mutex_unlock (&context->mutex);
}
}
/* Is the following even necessary any more...? */
else if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
g_mutex_unlock (&context->mutex);
/* Something wants us to shut down, so best to do a full cleanup as it
* might be that something's gone bang.
*/
/*gst_curl_http_src_unref_multi (NULL, GSTCURL_RETURN_PIPELINE_NULL, TRUE); */
GSTCURL_INFO_PRINT ("Got instruction to shut down");
} else if (context->state == GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL) {
qelement = context->queue;
while (qelement != NULL) {
qnext = qelement->next;
if (qelement->p == context->request_removal_element) {
g_mutex_lock (&qelement->p->buffer_mutex);
curl_multi_remove_handle (context->multi_handle,
context->request_removal_element->curl_handle);
if (qelement->p->state == GSTCURL_UNLOCK) {
qelement->p->pending_state = GSTCURL_REMOVED;
} else {
qelement->p->state = GSTCURL_REMOVED;
}
g_cond_signal (&qelement->p->signal);
g_mutex_unlock (&qelement->p->buffer_mutex);
gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
}
qelement = qnext;
}
context->request_removal_element = NULL;
context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
g_mutex_unlock (&context->mutex);
} else {
GSTCURL_WARNING_PRINT ("Curl Loop State was invalid or unsupported");
GSTCURL_WARNING_PRINT ("Signal State is %d, resetting to RUNNING.",
context->state);
/* Reset to running, so if there isn't anything to do it'll be
* changed the WAIT once curl_multi_perform says it has no active
* handles. */
context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
g_mutex_unlock (&context->mutex);
}
out:
g_mutex_unlock (&context->mutex);
}
/*
@ -1763,8 +1843,8 @@ gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
/* If header field already exists, append to the end */
if (gst_structure_has_field (response_headers, header_key) == TRUE) {
header_value = g_strdup_printf ("%s, %s",
g_value_get_string (gst_structure_get_value (response_headers,
header_key)), header_tpl[1]);
gst_structure_get_string (response_headers, header_key),
header_tpl[1]);
gst_structure_set ((GstStructure *) response_headers, header_key,
G_TYPE_STRING, header_value, NULL);
g_free (header_value);
@ -1850,7 +1930,7 @@ gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
}
memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
s->buffer_len += chunk_len;
g_cond_signal (&s->signal);
g_cond_signal (&s->buffer_cond);
g_mutex_unlock (&s->buffer_mutex);
return chunk_len;
}
@ -1864,10 +1944,29 @@ gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
GST_TYPE_CURL_HTTP_SRC,
GstCurlHttpSrcClass);
g_mutex_lock (&klass->multi_task_context.mutex);
klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL;
klass->multi_task_context.request_removal_element = src;
g_mutex_lock (&klass->multi_task_context.mutex);
g_mutex_lock (&src->buffer_mutex);
if (src->connection_status == GSTCURL_CONNECTED) {
src->connection_status = GSTCURL_WANT_REMOVAL;
}
g_mutex_unlock (&src->buffer_mutex);
g_cond_signal (&klass->multi_task_context.signal);
g_mutex_unlock (&klass->multi_task_context.mutex);
}
/*
* Request a cancellation of a currently running curl handle and
* block this thread until the src element has been removed
* from the queue
*/
static void
gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src)
{
gst_curl_http_src_request_remove (src);
g_mutex_lock (&src->buffer_mutex);
while (src->connection_status != GSTCURL_NOT_CONNECTED) {
g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
}
g_mutex_unlock (&src->buffer_mutex);
}

View file

@ -110,18 +110,12 @@ struct _GstCurlHttpSrcMultiTaskContext
guint refcount;
GCond signal;
GstCurlHttpSrc *request_removal_element;
GstCurlHttpSrcQueueElement *queue;
enum
{
GSTCURL_MULTI_LOOP_STATE_WAIT = 0,
GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT,
GSTCURL_MULTI_LOOP_STATE_RUNNING,
GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL,
GSTCURL_MULTI_LOOP_STATE_STOP,
GSTCURL_MULTI_LOOP_STATE_MAX
GSTCURL_MULTI_LOOP_STATE_STOP
} state;
/* < private > */
@ -200,11 +194,16 @@ struct _GstCurlHttpSrc
} state, pending_state;
CURL *curl_handle;
GMutex buffer_mutex;
GCond signal;
GCond buffer_cond;
gchar *buffer;
guint buffer_len;
gboolean transfer_begun;
gboolean data_received;
enum {
GSTCURL_NOT_CONNECTED,
GSTCURL_CONNECTED,
GSTCURL_WANT_REMOVAL
} connection_status;
/*
* Response Headers
@ -220,34 +219,6 @@ struct _GstCurlHttpSrc
GstCaps *caps;
};
enum
{
PROP_0,
PROP_URI,
PROP_USERNAME,
PROP_PASSWORD,
PROP_PROXYURI,
PROP_PROXYUSERNAME,
PROP_PROXYPASSWORD,
PROP_COOKIES,
PROP_USERAGENT,
PROP_HEADERS,
PROP_COMPRESS,
PROP_REDIRECT,
PROP_MAXREDIRECT,
PROP_KEEPALIVE,
PROP_TIMEOUT,
PROP_STRICT_SSL,
PROP_SSL_CA_FILE,
PROP_RETRIES,
PROP_CONNECTIONMAXTIME,
PROP_MAXCONCURRENT_SERVER,
PROP_MAXCONCURRENT_PROXY,
PROP_MAXCONCURRENT_GLOBAL,
PROP_HTTPVERSION,
PROP_MAX
};
GType gst_curl_http_src_get_type (void);
G_END_DECLS

View file

@ -83,8 +83,9 @@ gst_curl_http_src_add_queue_item (GstCurlHttpSrcQueueElement ** queue,
}
insert_point->p = s;
g_mutex_init (&insert_point->running);
g_atomic_int_set (&insert_point->running, 0);
insert_point->next = NULL;
s->connection_status = GSTCURL_CONNECTED;
return TRUE;
}
@ -127,6 +128,7 @@ gst_curl_http_src_remove_queue_item (GstCurlHttpSrcQueueElement ** queue,
prev_qelement->next = this_qelement->next;
}
g_free (this_qelement);
s->connection_status = GSTCURL_NOT_CONNECTED;
return TRUE;
}
@ -164,12 +166,13 @@ gst_curl_http_src_remove_queue_handle (GstCurlHttpSrcQueueElement ** queue,
this_qelement->p->uri); */
/* First, signal the transfer owner thread to wake up */
g_mutex_lock (&this_qelement->p->buffer_mutex);
g_cond_signal (&this_qelement->p->signal);
g_cond_signal (&this_qelement->p->buffer_cond);
if (this_qelement->p->state != GSTCURL_UNLOCK) {
this_qelement->p->state = GSTCURL_DONE;
} else {
this_qelement->p->pending_state = GSTCURL_DONE;
}
this_qelement->p->connection_status = GSTCURL_NOT_CONNECTED;
this_qelement->p->curl_result = result;
g_mutex_unlock (&this_qelement->p->buffer_mutex);

View file

@ -51,7 +51,7 @@
struct _GstCurlHttpSrcQueueElement
{
GstCurlHttpSrc *p;
GMutex running;
volatile gint running;
GstCurlHttpSrcQueueElement *next;
};