gstreamer/ext/soup/gstsouphttpclientsink.c
Sebastian Dröge bc2d4c4c31 souphttpclientsink: If no proxy is set by properties, use the default libsoup proxy resolver
That is, use whatever system settings there might exist. This is the same
behaviour we use in the HTTP source.
2016-04-15 11:36:36 +03:00

919 lines
30 KiB
C

/* GStreamer
* Copyright (C) 2011 David Schleef <ds@entropywave.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
* Boston, MA 02110-1335, USA.
*/
/**
* SECTION:element-gstsouphttpclientsink
*
* The souphttpclientsink element sends pipeline data to an HTTP server
* using HTTP PUT commands.
*
* <refsect2>
* <title>Example launch line</title>
* |[
* gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
* souphttpclientsink location=http://server/filename.ogv
* ]|
*
* This example encodes 10 seconds of video and sends it to the HTTP
* server "server" using HTTP PUT commands.
* </refsect2>
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include "gstsouphttpclientsink.h"
#include "gstsouputils.h"
GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
#define GST_CAT_DEFAULT souphttpclientsink_dbg
/* prototypes */
static void gst_soup_http_client_sink_set_property (GObject * object,
guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_soup_http_client_sink_get_property (GObject * object,
guint property_id, GValue * value, GParamSpec * pspec);
static void gst_soup_http_client_sink_dispose (GObject * object);
static void gst_soup_http_client_sink_finalize (GObject * object);
static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
GstCaps * caps);
static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
GstEvent * event);
static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
GstBuffer * buffer);
static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
GstBuffer * buffer);
static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
souphttpsink);
static void authenticate (SoupSession * session, SoupMessage * msg,
SoupAuth * auth, gboolean retrying, gpointer user_data);
static void callback (SoupSession * session, SoupMessage * msg,
gpointer user_data);
static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
souphttpsink, const gchar * uri);
enum
{
PROP_0,
PROP_LOCATION,
PROP_USER_AGENT,
PROP_AUTOMATIC_REDIRECT,
PROP_PROXY,
PROP_USER_ID,
PROP_USER_PW,
PROP_PROXY_ID,
PROP_PROXY_PW,
PROP_COOKIES,
PROP_SESSION,
PROP_SOUP_LOG_LEVEL,
PROP_RETRY_DELAY,
PROP_RETRIES
};
#define DEFAULT_USER_AGENT "GStreamer souphttpclientsink "
#define DEFAULT_SOUP_LOG_LEVEL SOUP_LOGGER_LOG_NONE
/* pad templates */
static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
/* class initialization */
#define gst_soup_http_client_sink_parent_class parent_class
G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
GST_TYPE_BASE_SINK);
static void
gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
gobject_class->set_property = gst_soup_http_client_sink_set_property;
gobject_class->get_property = gst_soup_http_client_sink_get_property;
gobject_class->dispose = gst_soup_http_client_sink_dispose;
gobject_class->finalize = gst_soup_http_client_sink_finalize;
g_object_class_install_property (gobject_class,
PROP_LOCATION,
g_param_spec_string ("location", "Location",
"URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_USER_AGENT,
g_param_spec_string ("user-agent", "User-Agent",
"Value of the User-Agent HTTP request header field",
DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_AUTOMATIC_REDIRECT,
g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
"Automatically follow HTTP redirects (HTTP Status Code 3xx)",
TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_PROXY,
g_param_spec_string ("proxy", "Proxy",
"HTTP proxy server URI", "",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_USER_ID,
g_param_spec_string ("user-id", "user-id",
"user id for authentication", "",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_USER_PW,
g_param_spec_string ("user-pw", "user-pw",
"user password for authentication", "",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PROXY_ID,
g_param_spec_string ("proxy-id", "proxy-id",
"user id for proxy authentication", "",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_PROXY_PW,
g_param_spec_string ("proxy-pw", "proxy-pw",
"user password for proxy authentication", "",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_SESSION,
g_param_spec_object ("session", "session",
"SoupSession object to use for communication",
SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_COOKIES,
g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RETRY_DELAY,
g_param_spec_int ("retry-delay", "Retry Delay",
"Delay in seconds between retries after a failure", 1, G_MAXINT, 5,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RETRIES,
g_param_spec_int ("retries", "Retries",
"Maximum number of retries, zero to disable, -1 to retry forever",
-1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstSoupHttpClientSink::http-log-level:
*
* If set and > 0, captures and dumps HTTP session data as
* log messages if log level >= GST_LEVEL_TRACE
*
* Since: 1.4
*/
g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL,
g_param_spec_enum ("http-log-level", "HTTP log level",
"Set log level for soup's HTTP session log",
SOUP_TYPE_LOGGER_LOG_LEVEL, DEFAULT_SOUP_LOG_LEVEL,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_static_pad_template (gstelement_class,
&gst_soup_http_client_sink_sink_template);
gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink",
"Generic", "Sends streams to HTTP server via PUT",
"David Schleef <ds@entropywave.com>");
base_sink_class->set_caps =
GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
if (0)
base_sink_class->get_times =
GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
base_sink_class->unlock =
GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
if (0)
base_sink_class->preroll =
GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
base_sink_class->render =
GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
"souphttpclientsink element");
}
static void
gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
{
const char *proxy;
g_mutex_init (&souphttpsink->mutex);
g_cond_init (&souphttpsink->cond);
souphttpsink->location = NULL;
souphttpsink->automatic_redirect = TRUE;
souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
souphttpsink->user_id = NULL;
souphttpsink->user_pw = NULL;
souphttpsink->proxy_id = NULL;
souphttpsink->proxy_pw = NULL;
souphttpsink->prop_session = NULL;
souphttpsink->timeout = 1;
souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
souphttpsink->retry_delay = 5;
souphttpsink->retries = 0;
proxy = g_getenv ("http_proxy");
if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
GST_WARNING_OBJECT (souphttpsink,
"The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
proxy);
}
gst_soup_http_client_sink_reset (souphttpsink);
}
static void
gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
{
g_list_free_full (souphttpsink->queued_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->queued_buffers = NULL;
g_free (souphttpsink->reason_phrase);
souphttpsink->reason_phrase = NULL;
souphttpsink->status_code = 0;
souphttpsink->offset = 0;
souphttpsink->failures = 0;
g_list_free_full (souphttpsink->streamheader_buffers,
(GDestroyNotify) gst_buffer_unref);
g_list_free_full (souphttpsink->sent_buffers,
(GDestroyNotify) gst_buffer_unref);
}
static gboolean
gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
const gchar * uri)
{
if (souphttpsink->proxy) {
soup_uri_free (souphttpsink->proxy);
souphttpsink->proxy = NULL;
}
if (g_str_has_prefix (uri, "http://")) {
souphttpsink->proxy = soup_uri_new (uri);
} else {
gchar *new_uri = g_strconcat ("http://", uri, NULL);
souphttpsink->proxy = soup_uri_new (new_uri);
g_free (new_uri);
}
return TRUE;
}
void
gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
const GValue * value, GParamSpec * pspec)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
g_mutex_lock (&souphttpsink->mutex);
switch (property_id) {
case PROP_SESSION:
if (souphttpsink->prop_session) {
g_object_unref (souphttpsink->prop_session);
}
souphttpsink->prop_session = g_value_dup_object (value);
break;
case PROP_LOCATION:
g_free (souphttpsink->location);
souphttpsink->location = g_value_dup_string (value);
souphttpsink->offset = 0;
if ((souphttpsink->location == NULL)
|| !gst_uri_is_valid (souphttpsink->location)) {
GST_WARNING_OBJECT (souphttpsink,
"The location (\"%s\") set, is not a valid uri.",
souphttpsink->location);
g_free (souphttpsink->location);
souphttpsink->location = NULL;
}
break;
case PROP_USER_AGENT:
g_free (souphttpsink->user_agent);
souphttpsink->user_agent = g_value_dup_string (value);
break;
case PROP_AUTOMATIC_REDIRECT:
souphttpsink->automatic_redirect = g_value_get_boolean (value);
break;
case PROP_USER_ID:
g_free (souphttpsink->user_id);
souphttpsink->user_id = g_value_dup_string (value);
break;
case PROP_USER_PW:
g_free (souphttpsink->user_pw);
souphttpsink->user_pw = g_value_dup_string (value);
break;
case PROP_PROXY_ID:
g_free (souphttpsink->proxy_id);
souphttpsink->proxy_id = g_value_dup_string (value);
break;
case PROP_PROXY_PW:
g_free (souphttpsink->proxy_pw);
souphttpsink->proxy_pw = g_value_dup_string (value);
break;
case PROP_PROXY:
{
const gchar *proxy;
proxy = g_value_get_string (value);
if (proxy == NULL) {
GST_WARNING ("proxy property cannot be NULL");
goto done;
}
if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
GST_WARNING ("badly formatted proxy URI");
goto done;
}
break;
}
case PROP_COOKIES:
g_strfreev (souphttpsink->cookies);
souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
break;
case PROP_SOUP_LOG_LEVEL:
souphttpsink->log_level = g_value_get_enum (value);
break;
case PROP_RETRY_DELAY:
souphttpsink->retry_delay = g_value_get_int (value);
break;
case PROP_RETRIES:
souphttpsink->retries = g_value_get_int (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
}
done:
g_mutex_unlock (&souphttpsink->mutex);
}
void
gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
GValue * value, GParamSpec * pspec)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
switch (property_id) {
case PROP_SESSION:
g_value_set_object (value, souphttpsink->prop_session);
break;
case PROP_LOCATION:
g_value_set_string (value, souphttpsink->location);
break;
case PROP_AUTOMATIC_REDIRECT:
g_value_set_boolean (value, souphttpsink->automatic_redirect);
break;
case PROP_USER_AGENT:
g_value_set_string (value, souphttpsink->user_agent);
break;
case PROP_USER_ID:
g_value_set_string (value, souphttpsink->user_id);
break;
case PROP_USER_PW:
g_value_set_string (value, souphttpsink->user_pw);
break;
case PROP_PROXY_ID:
g_value_set_string (value, souphttpsink->proxy_id);
break;
case PROP_PROXY_PW:
g_value_set_string (value, souphttpsink->proxy_pw);
break;
case PROP_PROXY:
if (souphttpsink->proxy == NULL)
g_value_set_static_string (value, "");
else {
char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
g_value_set_string (value, proxy);
g_free (proxy);
}
break;
case PROP_COOKIES:
g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
break;
case PROP_SOUP_LOG_LEVEL:
g_value_set_enum (value, souphttpsink->log_level);
break;
case PROP_RETRY_DELAY:
g_value_set_int (value, souphttpsink->retry_delay);
break;
case PROP_RETRIES:
g_value_set_int (value, souphttpsink->retries);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
break;
}
}
void
gst_soup_http_client_sink_dispose (GObject * object)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
/* clean up as possible. may be called multiple times */
if (souphttpsink->prop_session)
g_object_unref (souphttpsink->prop_session);
souphttpsink->prop_session = NULL;
G_OBJECT_CLASS (parent_class)->dispose (object);
}
void
gst_soup_http_client_sink_finalize (GObject * object)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
/* clean up object here */
g_free (souphttpsink->user_agent);
g_free (souphttpsink->user_id);
g_free (souphttpsink->user_pw);
g_free (souphttpsink->proxy_id);
g_free (souphttpsink->proxy_pw);
if (souphttpsink->proxy)
soup_uri_free (souphttpsink->proxy);
g_free (souphttpsink->location);
g_strfreev (souphttpsink->cookies);
g_cond_clear (&souphttpsink->cond);
g_mutex_clear (&souphttpsink->mutex);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static gboolean
gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
GstStructure *structure;
const GValue *value_array;
int i, n;
GST_DEBUG_OBJECT (souphttpsink, "new stream headers set");
structure = gst_caps_get_structure (caps, 0);
value_array = gst_structure_get_value (structure, "streamheader");
if (value_array) {
g_list_free_full (souphttpsink->streamheader_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->streamheader_buffers = NULL;
n = gst_value_array_get_size (value_array);
for (i = 0; i < n; i++) {
const GValue *value;
GstBuffer *buffer;
value = gst_value_array_get_value (value_array, i);
buffer = GST_BUFFER (gst_value_get_buffer (value));
souphttpsink->streamheader_buffers =
g_list_append (souphttpsink->streamheader_buffers,
gst_buffer_ref (buffer));
}
}
return TRUE;
}
static void
gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
GstClockTime * start, GstClockTime * end)
{
}
static gboolean
thread_ready_idle_cb (gpointer data)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
GST_LOG_OBJECT (souphttpsink, "thread ready");
g_mutex_lock (&souphttpsink->mutex);
g_cond_signal (&souphttpsink->cond);
g_mutex_unlock (&souphttpsink->mutex);
return FALSE; /* only run once */
}
static gpointer
thread_func (gpointer ptr)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
GST_DEBUG ("thread start");
g_main_loop_run (souphttpsink->loop);
GST_DEBUG ("thread quit");
return NULL;
}
static gboolean
gst_soup_http_client_sink_start (GstBaseSink * sink)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
if (souphttpsink->prop_session) {
souphttpsink->session = souphttpsink->prop_session;
} else {
GSource *source;
GError *error = NULL;
souphttpsink->context = g_main_context_new ();
/* set up idle source to signal when the main loop is running and
* it's safe for ::stop() to call g_main_loop_quit() */
source = g_idle_source_new ();
g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
g_source_attach (source, souphttpsink->context);
g_source_unref (source);
souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
g_mutex_lock (&souphttpsink->mutex);
souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
thread_func, souphttpsink, &error);
if (error != NULL) {
GST_DEBUG_OBJECT (souphttpsink, "failed to start thread, %s",
error->message);
g_error_free (error);
g_mutex_unlock (&souphttpsink->mutex);
return FALSE;
}
GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
g_mutex_unlock (&souphttpsink->mutex);
GST_LOG_OBJECT (souphttpsink, "main loop thread running");
if (souphttpsink->proxy == NULL) {
souphttpsink->session =
soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
souphttpsink->context, SOUP_SESSION_USER_AGENT,
souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_PROXY_RESOLVER_DEFAULT,
NULL);
} else {
souphttpsink->session =
soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
souphttpsink->context, SOUP_SESSION_USER_AGENT,
souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
SOUP_SESSION_PROXY_URI, souphttpsink->proxy, NULL);
}
g_signal_connect (souphttpsink->session, "authenticate",
G_CALLBACK (authenticate), souphttpsink);
}
/* Set up logging */
gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level,
GST_ELEMENT (souphttpsink));
return TRUE;
}
static gboolean
gst_soup_http_client_sink_stop (GstBaseSink * sink)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
GST_DEBUG ("stop");
if (souphttpsink->prop_session == NULL) {
soup_session_abort (souphttpsink->session);
g_object_unref (souphttpsink->session);
}
g_mutex_lock (&souphttpsink->mutex);
if (souphttpsink->timer) {
g_source_destroy (souphttpsink->timer);
g_source_unref (souphttpsink->timer);
souphttpsink->timer = NULL;
}
g_mutex_unlock (&souphttpsink->mutex);
if (souphttpsink->loop) {
g_main_loop_quit (souphttpsink->loop);
g_mutex_lock (&souphttpsink->mutex);
g_cond_signal (&souphttpsink->cond);
g_mutex_unlock (&souphttpsink->mutex);
g_thread_join (souphttpsink->thread);
g_main_loop_unref (souphttpsink->loop);
souphttpsink->loop = NULL;
}
if (souphttpsink->context) {
g_main_context_unref (souphttpsink->context);
souphttpsink->context = NULL;
}
gst_soup_http_client_sink_reset (souphttpsink);
return TRUE;
}
static gboolean
gst_soup_http_client_sink_unlock (GstBaseSink * sink)
{
GST_DEBUG ("unlock");
return TRUE;
}
static gboolean
gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
GST_DEBUG_OBJECT (souphttpsink, "event");
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
GST_DEBUG_OBJECT (souphttpsink, "got eos");
g_mutex_lock (&souphttpsink->mutex);
while (souphttpsink->message) {
GST_DEBUG_OBJECT (souphttpsink, "waiting");
g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
}
g_mutex_unlock (&souphttpsink->mutex);
GST_DEBUG_OBJECT (souphttpsink, "finished eos");
}
return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
}
static GstFlowReturn
gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
{
GST_DEBUG ("preroll");
return GST_FLOW_OK;
}
static void
send_message_locked (GstSoupHttpClientSink * souphttpsink)
{
GList *g;
guint64 n;
if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
return;
}
/* If the URI went away, drop all these buffers */
if (souphttpsink->location == NULL) {
GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers");
g_list_free_full (souphttpsink->queued_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->queued_buffers = NULL;
return;
}
souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
if (souphttpsink->message == NULL) {
GST_WARNING_OBJECT (souphttpsink,
"URI could not be parsed while creating message.");
g_list_free_full (souphttpsink->queued_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->queued_buffers = NULL;
return;
}
soup_message_set_flags (souphttpsink->message,
(souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
if (souphttpsink->cookies) {
gchar **cookie;
for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) {
soup_message_headers_append (souphttpsink->message->request_headers,
"Cookie", *cookie);
}
}
n = 0;
if (souphttpsink->offset == 0) {
for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
GstBuffer *buffer = g->data;
GstMapInfo map;
GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers");
gst_buffer_map (buffer, &map, GST_MAP_READ);
/* Stream headers are updated whenever ::set_caps is called, so there's
* no guarantees about their lifetime and we ask libsoup to copy them
* into the message body with SOUP_MEMORY_COPY. */
soup_message_body_append (souphttpsink->message->request_body,
SOUP_MEMORY_COPY, map.data, map.size);
n += map.size;
gst_buffer_unmap (buffer, &map);
}
}
for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
GstBuffer *buffer = g->data;
if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
GstMapInfo map;
gst_buffer_map (buffer, &map, GST_MAP_READ);
/* Queued buffers are only freed in the next iteration of the mainloop
* after the message body has been written out, so we don't need libsoup
* to copy those while appending to the body. However, if the buffer is
* used elsewhere, it should be copied. Hence, SOUP_MEMORY_TEMPORARY. */
soup_message_body_append (souphttpsink->message->request_body,
SOUP_MEMORY_TEMPORARY, map.data, map.size);
n += map.size;
gst_buffer_unmap (buffer, &map);
}
}
if (souphttpsink->offset != 0) {
char *s;
s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
souphttpsink->offset, souphttpsink->offset + n - 1);
soup_message_headers_append (souphttpsink->message->request_headers,
"Content-Range", s);
g_free (s);
}
if (n == 0) {
GST_DEBUG_OBJECT (souphttpsink,
"total size of buffers queued is 0, freeing everything");
g_list_free_full (souphttpsink->queued_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->queued_buffers = NULL;
g_object_unref (souphttpsink->message);
souphttpsink->message = NULL;
return;
}
souphttpsink->sent_buffers = souphttpsink->queued_buffers;
souphttpsink->queued_buffers = NULL;
GST_DEBUG_OBJECT (souphttpsink,
"queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
souphttpsink->offset, n);
soup_session_queue_message (souphttpsink->session, souphttpsink->message,
callback, souphttpsink);
souphttpsink->offset += n;
}
static gboolean
send_message (GstSoupHttpClientSink * souphttpsink)
{
g_mutex_lock (&souphttpsink->mutex);
send_message_locked (souphttpsink);
if (souphttpsink->timer) {
g_source_destroy (souphttpsink->timer);
g_source_unref (souphttpsink->timer);
souphttpsink->timer = NULL;
}
g_mutex_unlock (&souphttpsink->mutex);
return FALSE;
}
static void
callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
msg->status_code, msg->reason_phrase);
g_mutex_lock (&souphttpsink->mutex);
g_cond_signal (&souphttpsink->cond);
souphttpsink->message = NULL;
if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
souphttpsink->failures++;
if (souphttpsink->retries &&
(souphttpsink->retries < 0 ||
souphttpsink->retries >= souphttpsink->failures)) {
guint64 retry_delay;
const char *retry_after =
soup_message_headers_get_one (msg->response_headers,
"Retry-After");
if (retry_after) {
gchar *end = NULL;
retry_delay = g_ascii_strtoull (retry_after, &end, 10);
if (end || errno) {
retry_delay = souphttpsink->retry_delay;
} else {
retry_delay = MAX (retry_delay, souphttpsink->retry_delay);
}
GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
"status: %d %s (retrying PUT after %" G_GINT64_FORMAT
" seconds with Retry-After: %s)", msg->status_code,
msg->reason_phrase, retry_delay, retry_after);
} else {
retry_delay = souphttpsink->retry_delay;
GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
"status: %d %s (retrying PUT after %" G_GINT64_FORMAT
" seconds)", msg->status_code, msg->reason_phrase, retry_delay);
}
souphttpsink->timer = g_timeout_source_new_seconds (retry_delay);
g_source_set_callback (souphttpsink->timer, (GSourceFunc) (send_message),
souphttpsink, NULL);
g_source_attach (souphttpsink->timer, souphttpsink->context);
} else {
souphttpsink->status_code = msg->status_code;
souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
}
g_mutex_unlock (&souphttpsink->mutex);
return;
}
g_list_free_full (souphttpsink->sent_buffers,
(GDestroyNotify) gst_buffer_unref);
souphttpsink->sent_buffers = NULL;
souphttpsink->failures = 0;
send_message_locked (souphttpsink);
g_mutex_unlock (&souphttpsink->mutex);
}
static GstFlowReturn
gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
GSource *source;
gboolean wake;
if (souphttpsink->status_code != 0) {
GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
("Could not write to HTTP URI"),
("status: %d %s", souphttpsink->status_code,
souphttpsink->reason_phrase));
return GST_FLOW_ERROR;
}
g_mutex_lock (&souphttpsink->mutex);
if (souphttpsink->location != NULL) {
wake = (souphttpsink->queued_buffers == NULL);
souphttpsink->queued_buffers =
g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
if (wake) {
GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers");
source = g_idle_source_new ();
g_source_set_callback (source, (GSourceFunc) (send_message),
souphttpsink, NULL);
g_source_attach (source, souphttpsink->context);
g_source_unref (source);
}
}
g_mutex_unlock (&souphttpsink->mutex);
return GST_FLOW_OK;
}
static void
authenticate (SoupSession * session, SoupMessage * msg,
SoupAuth * auth, gboolean retrying, gpointer user_data)
{
GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
if (!retrying) {
/* First time authentication only, if we fail and are called again with retry true fall through */
if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
if (souphttpsink->user_id && souphttpsink->user_pw)
soup_auth_authenticate (auth, souphttpsink->user_id,
souphttpsink->user_pw);
} else if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
if (souphttpsink->proxy_id && souphttpsink->proxy_pw)
soup_auth_authenticate (auth, souphttpsink->proxy_id,
souphttpsink->proxy_pw);
}
}
}