souphttpclientsink: Add the retry and retry-delay properties

These allow a failed request to be retried after the given number of seconds
instead of failing the pipeline. Take account of the Retry-After header if
present. Add retries parameter that controls the number of times an HTTP
request will be retried before failing.

https://bugzilla.gnome.org/show_bug.cgi?id=756318
This commit is contained in:
Graham Leggett 2015-10-11 22:07:54 +00:00 committed by Sebastian Dröge
parent 360a6509c7
commit af25e3cc93
2 changed files with 82 additions and 7 deletions

View file

@ -43,8 +43,6 @@
#include "gstsouphttpclientsink.h"
#include "gstsouputils.h"
#include <gst/glib-compat-private.h>
GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
#define GST_CAT_DEFAULT souphttpclientsink_dbg
@ -94,7 +92,9 @@ enum
PROP_PROXY_PW,
PROP_COOKIES,
PROP_SESSION,
PROP_SOUP_LOG_LEVEL
PROP_SOUP_LOG_LEVEL,
PROP_RETRY_DELAY,
PROP_RETRIES
};
#define DEFAULT_USER_AGENT "GStreamer souphttpclientsink "
@ -170,6 +170,14 @@ gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
g_object_class_install_property (gobject_class, PROP_COOKIES,
g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RETRY_DELAY,
g_param_spec_int ("retry-delay", "Retry Delay",
"Delay in seconds between retries after a failure", 1, G_MAXINT, 5,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RETRIES,
g_param_spec_int ("retries", "Retries",
"Maximum number of retries, zero to disable, -1 to retry forever",
-1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSoupHttpClientSink::http-log-level:
*
@ -230,6 +238,8 @@ gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
souphttpsink->prop_session = NULL;
souphttpsink->timeout = 1;
souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
souphttpsink->retry_delay = 5;
souphttpsink->retries = 0;
proxy = g_getenv ("http_proxy");
if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
GST_WARNING_OBJECT (souphttpsink,
@ -250,6 +260,7 @@ gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
souphttpsink->reason_phrase = NULL;
souphttpsink->status_code = 0;
souphttpsink->offset = 0;
souphttpsink->failures = 0;
g_list_free_full (souphttpsink->streamheader_buffers,
(GDestroyNotify) gst_buffer_unref);
@ -350,6 +361,12 @@ gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
case PROP_SOUP_LOG_LEVEL:
souphttpsink->log_level = g_value_get_enum (value);
break;
case PROP_RETRY_DELAY:
souphttpsink->retry_delay = g_value_get_int (value);
break;
case PROP_RETRIES:
souphttpsink->retries = g_value_get_int (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@ -405,6 +422,12 @@ gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
case PROP_SOUP_LOG_LEVEL:
g_value_set_enum (value, souphttpsink->log_level);
break;
case PROP_RETRY_DELAY:
g_value_set_int (value, souphttpsink->retry_delay);
break;
case PROP_RETRIES:
g_value_set_int (value, souphttpsink->retries);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
@ -585,8 +608,19 @@ gst_soup_http_client_sink_stop (GstBaseSink * sink)
g_object_unref (souphttpsink->session);
}
g_mutex_lock (&souphttpsink->mutex);
if (souphttpsink->timer) {
g_source_destroy (souphttpsink->timer);
g_source_unref (souphttpsink->timer);
souphttpsink->timer = NULL;
}
g_mutex_unlock (&souphttpsink->mutex);
if (souphttpsink->loop) {
g_main_loop_quit (souphttpsink->loop);
g_mutex_lock (&souphttpsink->mutex);
g_cond_signal (&souphttpsink->cond);
g_mutex_unlock (&souphttpsink->mutex);
g_thread_join (souphttpsink->thread);
g_main_loop_unref (souphttpsink->loop);
souphttpsink->loop = NULL;
@ -751,6 +785,11 @@ send_message (GstSoupHttpClientSink * souphttpsink)
{
g_mutex_lock (&souphttpsink->mutex);
send_message_locked (souphttpsink);
if (souphttpsink->timer) {
g_source_destroy (souphttpsink->timer);
g_source_unref (souphttpsink->timer);
souphttpsink->timer = NULL;
}
g_mutex_unlock (&souphttpsink->mutex);
return FALSE;
@ -769,8 +808,40 @@ callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
souphttpsink->message = NULL;
if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
souphttpsink->status_code = msg->status_code;
souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
souphttpsink->failures++;
if (souphttpsink->retries &&
(souphttpsink->retries < 0 ||
souphttpsink->retries >= souphttpsink->failures)) {
guint64 retry_delay;
const char *retry_after =
soup_message_headers_get_one (msg->response_headers,
"Retry-After");
if (retry_after) {
gchar *end = NULL;
retry_delay = g_ascii_strtoull (retry_after, &end, 10);
if (end || errno) {
retry_delay = souphttpsink->retry_delay;
} else {
retry_delay = MAX (retry_delay, souphttpsink->retry_delay);
}
GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
"status: %d %s (retrying PUT after %" G_GINT64_FORMAT
" seconds with Retry-After: %s)", msg->status_code,
msg->reason_phrase, retry_delay, retry_after);
} else {
retry_delay = souphttpsink->retry_delay;
GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
"status: %d %s (retrying PUT after %" G_GINT64_FORMAT
" seconds)", msg->status_code, msg->reason_phrase, retry_delay);
}
souphttpsink->timer = g_timeout_source_new_seconds (retry_delay);
g_source_set_callback (souphttpsink->timer, (GSourceFunc) (send_message),
souphttpsink, NULL);
g_source_attach (souphttpsink->timer, souphttpsink->context);
} else {
souphttpsink->status_code = msg->status_code;
souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
}
g_mutex_unlock (&souphttpsink->mutex);
return;
}
@ -778,6 +849,7 @@ callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
g_list_free_full (souphttpsink->sent_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->sent_buffers = NULL;
souphttpsink->failures = 0;
send_message_locked (souphttpsink);
g_mutex_unlock (&souphttpsink->mutex);
@ -791,10 +863,9 @@ gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
gboolean wake;
if (souphttpsink->status_code != 0) {
/* FIXME we should allow a moderate amount of retries. */
GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
("Could not write to HTTP URI"),
("error: %d %s", souphttpsink->status_code,
("status: %d %s", souphttpsink->status_code,
souphttpsink->reason_phrase));
return GST_FLOW_ERROR;
}

View file

@ -43,6 +43,7 @@ struct _GstSoupHttpClientSink
GMainContext *context;
GMainLoop *loop;
GThread *thread;
GSource *timer;
SoupMessage *message;
SoupSession *session;
GList *queued_buffers;
@ -54,6 +55,7 @@ struct _GstSoupHttpClientSink
guint64 offset;
int timeout;
gint failures;
/* properties */
SoupSession *prop_session;
@ -67,6 +69,8 @@ struct _GstSoupHttpClientSink
gboolean automatic_redirect;
gchar **cookies;
SoupLoggerLogLevel log_level;
gint retry_delay;
gint retries;
};
struct _GstSoupHttpClientSinkClass