/* GStreamer * Copyright (C) 2011 David Schleef * Copyright (C) 2021 Igalia S.L. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin Street, Suite 500, * Boston, MA 02110-1335, USA. */ /** * SECTION:element-gstsouphttpclientsink * @title: gstsouphttpclientsink * * The souphttpclientsink element sends pipeline data to an HTTP server * using HTTP PUT commands. * * ## Example launch line * |[ * gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux ! * souphttpclientsink location=http://server/filename.ogv * ]| * * This example encodes 10 seconds of video and sends it to the HTTP * server "server" using HTTP PUT commands. * */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include #include "gstsoupelements.h" #include "gstsouphttpclientsink.h" #include "gstsouputils.h" GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg); #define GST_CAT_DEFAULT souphttpclientsink_dbg /* prototypes */ static void gst_soup_http_client_sink_set_property (GObject * object, guint property_id, const GValue * value, GParamSpec * pspec); static void gst_soup_http_client_sink_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec); static void gst_soup_http_client_sink_dispose (GObject * object); static void gst_soup_http_client_sink_finalize (GObject * object); static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps); static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink); static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink); static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink); static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer); static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink); static gboolean authenticate (SoupMessage * msg, SoupAuth * auth, gboolean retrying, gpointer user_data); static void restarted (SoupMessage * msg, GBytes * body); static gboolean send_handle_status (SoupMessage * msg, GError * error, GstSoupHttpClientSink * sink); static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink, const gchar * uri); enum { PROP_0, PROP_LOCATION, PROP_USER_AGENT, PROP_AUTOMATIC_REDIRECT, PROP_PROXY, PROP_USER_ID, PROP_USER_PW, PROP_PROXY_ID, PROP_PROXY_PW, PROP_COOKIES, PROP_SESSION, PROP_SOUP_LOG_LEVEL, PROP_RETRY_DELAY, PROP_RETRIES }; #define DEFAULT_USER_AGENT "GStreamer souphttpclientsink " #define DEFAULT_SOUP_LOG_LEVEL SOUP_LOGGER_LOG_NONE /* pad templates */ static GstStaticPadTemplate gst_soup_http_client_sink_sink_template = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); /* class initialization */ #define gst_soup_http_client_sink_parent_class parent_class G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink, GST_TYPE_BASE_SINK); static gboolean souphttpclientsink_element_init (GstPlugin * plugin); GST_ELEMENT_REGISTER_DEFINE_CUSTOM (souphttpclientsink, souphttpclientsink_element_init); static void gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass); gobject_class->set_property = gst_soup_http_client_sink_set_property; gobject_class->get_property = gst_soup_http_client_sink_get_property; gobject_class->dispose = gst_soup_http_client_sink_dispose; gobject_class->finalize = gst_soup_http_client_sink_finalize; g_object_class_install_property (gobject_class, PROP_LOCATION, g_param_spec_string ("location", "Location", "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USER_AGENT, g_param_spec_string ("user-agent", "User-Agent", "Value of the User-Agent HTTP request header field", DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_AUTOMATIC_REDIRECT, g_param_spec_boolean ("automatic-redirect", "automatic-redirect", "Automatically follow HTTP redirects (HTTP Status Code 3xx)", TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_PROXY, g_param_spec_string ("proxy", "Proxy", "HTTP proxy server URI", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USER_ID, g_param_spec_string ("user-id", "user-id", "user id for authentication", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USER_PW, g_param_spec_string ("user-pw", "user-pw", "user password for authentication", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_PROXY_ID, g_param_spec_string ("proxy-id", "proxy-id", "user id for proxy authentication", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_PROXY_PW, g_param_spec_string ("proxy-pw", "proxy-pw", "user password for proxy authentication", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_SESSION, g_param_spec_object ("session", "session", "SoupSession object to use for communication", _soup_session_get_type (), G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_COOKIES, g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies", G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RETRY_DELAY, g_param_spec_int ("retry-delay", "Retry Delay", "Delay in seconds between retries after a failure", 1, G_MAXINT, 5, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_RETRIES, g_param_spec_int ("retries", "Retries", "Maximum number of retries, zero to disable, -1 to retry forever", -1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstSoupHttpClientSink::http-log-level: * * If set and > 0, captures and dumps HTTP session data as * log messages if log level >= GST_LEVEL_TRACE * * Since: 1.4 */ g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL, g_param_spec_enum ("http-log-level", "HTTP log level", "Set log level for soup's HTTP session log", _soup_logger_log_level_get_type (), DEFAULT_SOUP_LOG_LEVEL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_element_class_add_static_pad_template (gstelement_class, &gst_soup_http_client_sink_sink_template); gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink", "Generic", "Sends streams to HTTP server via PUT", "David Schleef "); base_sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps); base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start); base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop); base_sink_class->unlock = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock); base_sink_class->render = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render); } static void gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink) { const char *proxy; g_mutex_init (&souphttpsink->mutex); g_cond_init (&souphttpsink->cond); souphttpsink->location = NULL; souphttpsink->automatic_redirect = TRUE; souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT); souphttpsink->user_id = NULL; souphttpsink->user_pw = NULL; souphttpsink->proxy_id = NULL; souphttpsink->proxy_pw = NULL; souphttpsink->prop_session = NULL; souphttpsink->timeout = 1; souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL; souphttpsink->retry_delay = 5; souphttpsink->retries = 0; souphttpsink->sent_buffers = NULL; proxy = g_getenv ("http_proxy"); if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) { GST_WARNING_OBJECT (souphttpsink, "The proxy in the http_proxy env var (\"%s\") cannot be parsed.", proxy); } gst_soup_http_client_sink_reset (souphttpsink); } static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink) { g_list_free_full (souphttpsink->queued_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->queued_buffers = NULL; g_free (souphttpsink->reason_phrase); souphttpsink->reason_phrase = NULL; souphttpsink->status_code = 0; souphttpsink->offset = 0; souphttpsink->failures = 0; g_list_free_full (souphttpsink->streamheader_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->streamheader_buffers = NULL; g_list_free_full (souphttpsink->sent_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->sent_buffers = NULL; } static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink, const gchar * uri) { if (souphttpsink->proxy) { gst_soup_uri_free (souphttpsink->proxy); souphttpsink->proxy = NULL; } if (g_str_has_prefix (uri, "http://")) { souphttpsink->proxy = gst_soup_uri_new (uri); } else { gchar *new_uri = g_strconcat ("http://", uri, NULL); souphttpsink->proxy = gst_soup_uri_new (new_uri); g_free (new_uri); } return TRUE; } void gst_soup_http_client_sink_set_property (GObject * object, guint property_id, const GValue * value, GParamSpec * pspec) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object); g_mutex_lock (&souphttpsink->mutex); switch (property_id) { case PROP_SESSION: if (souphttpsink->prop_session) { g_object_unref (souphttpsink->prop_session); } souphttpsink->prop_session = g_value_dup_object (value); break; case PROP_LOCATION: g_free (souphttpsink->location); souphttpsink->location = g_value_dup_string (value); souphttpsink->offset = 0; if ((souphttpsink->location == NULL) || !gst_uri_is_valid (souphttpsink->location)) { GST_WARNING_OBJECT (souphttpsink, "The location (\"%s\") set, is not a valid uri.", souphttpsink->location); g_free (souphttpsink->location); souphttpsink->location = NULL; } break; case PROP_USER_AGENT: g_free (souphttpsink->user_agent); souphttpsink->user_agent = g_value_dup_string (value); break; case PROP_AUTOMATIC_REDIRECT: souphttpsink->automatic_redirect = g_value_get_boolean (value); break; case PROP_USER_ID: g_free (souphttpsink->user_id); souphttpsink->user_id = g_value_dup_string (value); break; case PROP_USER_PW: g_free (souphttpsink->user_pw); souphttpsink->user_pw = g_value_dup_string (value); break; case PROP_PROXY_ID: g_free (souphttpsink->proxy_id); souphttpsink->proxy_id = g_value_dup_string (value); break; case PROP_PROXY_PW: g_free (souphttpsink->proxy_pw); souphttpsink->proxy_pw = g_value_dup_string (value); break; case PROP_PROXY: { const gchar *proxy; proxy = g_value_get_string (value); if (proxy == NULL) { GST_WARNING ("proxy property cannot be NULL"); goto done; } if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) { GST_WARNING ("badly formatted proxy URI"); goto done; } break; } case PROP_COOKIES: g_strfreev (souphttpsink->cookies); souphttpsink->cookies = g_strdupv (g_value_get_boxed (value)); break; case PROP_SOUP_LOG_LEVEL: souphttpsink->log_level = g_value_get_enum (value); break; case PROP_RETRY_DELAY: souphttpsink->retry_delay = g_value_get_int (value); break; case PROP_RETRIES: souphttpsink->retries = g_value_get_int (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } done: g_mutex_unlock (&souphttpsink->mutex); } void gst_soup_http_client_sink_get_property (GObject * object, guint property_id, GValue * value, GParamSpec * pspec) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object); switch (property_id) { case PROP_SESSION: g_value_set_object (value, souphttpsink->prop_session); break; case PROP_LOCATION: g_value_set_string (value, souphttpsink->location); break; case PROP_AUTOMATIC_REDIRECT: g_value_set_boolean (value, souphttpsink->automatic_redirect); break; case PROP_USER_AGENT: g_value_set_string (value, souphttpsink->user_agent); break; case PROP_USER_ID: g_value_set_string (value, souphttpsink->user_id); break; case PROP_USER_PW: g_value_set_string (value, souphttpsink->user_pw); break; case PROP_PROXY_ID: g_value_set_string (value, souphttpsink->proxy_id); break; case PROP_PROXY_PW: g_value_set_string (value, souphttpsink->proxy_pw); break; case PROP_PROXY: if (souphttpsink->proxy == NULL) g_value_set_static_string (value, ""); else { char *proxy = gst_soup_uri_to_string (souphttpsink->proxy); g_value_set_string (value, proxy); g_free (proxy); } break; case PROP_COOKIES: g_value_set_boxed (value, g_strdupv (souphttpsink->cookies)); break; case PROP_SOUP_LOG_LEVEL: g_value_set_enum (value, souphttpsink->log_level); break; case PROP_RETRY_DELAY: g_value_set_int (value, souphttpsink->retry_delay); break; case PROP_RETRIES: g_value_set_int (value, souphttpsink->retries); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } void gst_soup_http_client_sink_dispose (GObject * object) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object); /* clean up as possible. may be called multiple times */ if (souphttpsink->prop_session) g_object_unref (souphttpsink->prop_session); souphttpsink->prop_session = NULL; G_OBJECT_CLASS (parent_class)->dispose (object); } void gst_soup_http_client_sink_finalize (GObject * object) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object); /* clean up object here */ g_free (souphttpsink->user_agent); g_free (souphttpsink->user_id); g_free (souphttpsink->user_pw); g_free (souphttpsink->proxy_id); g_free (souphttpsink->proxy_pw); if (souphttpsink->proxy) gst_soup_uri_free (souphttpsink->proxy); g_free (souphttpsink->location); g_strfreev (souphttpsink->cookies); g_cond_clear (&souphttpsink->cond); g_mutex_clear (&souphttpsink->mutex); G_OBJECT_CLASS (parent_class)->finalize (object); } static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink); GstStructure *structure; const GValue *value_array; int i, n; GST_DEBUG_OBJECT (souphttpsink, "new stream headers set"); structure = gst_caps_get_structure (caps, 0); value_array = gst_structure_get_value (structure, "streamheader"); if (value_array) { g_list_free_full (souphttpsink->streamheader_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->streamheader_buffers = NULL; n = gst_value_array_get_size (value_array); for (i = 0; i < n; i++) { const GValue *value; GstBuffer *buffer; value = gst_value_array_get_value (value_array, i); buffer = GST_BUFFER (gst_value_get_buffer (value)); souphttpsink->streamheader_buffers = g_list_append (souphttpsink->streamheader_buffers, gst_buffer_ref (buffer)); } } return TRUE; } static gboolean thread_ready_idle_cb (gpointer data) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data); GST_LOG_OBJECT (souphttpsink, "thread ready"); g_mutex_lock (&souphttpsink->mutex); g_cond_signal (&souphttpsink->cond); g_mutex_unlock (&souphttpsink->mutex); return FALSE; /* only run once */ } static gpointer thread_func (gpointer ptr) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr); GProxyResolver *proxy_resolver; GMainContext *context; GST_DEBUG ("thread start"); context = souphttpsink->context; g_main_context_push_thread_default (context); if (souphttpsink->proxy != NULL) { char *proxy_string = gst_soup_uri_to_string (souphttpsink->proxy); proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL); g_free (proxy_string); } else proxy_resolver = g_object_ref (g_proxy_resolver_get_default ()); souphttpsink->session = _soup_session_new_with_options ("user-agent", souphttpsink->user_agent, "timeout", souphttpsink->timeout, "proxy-resolver", proxy_resolver, NULL); g_object_unref (proxy_resolver); if (gst_soup_loader_get_api_version () < 3) { g_signal_connect (souphttpsink->session, "authenticate", G_CALLBACK (authenticate), souphttpsink); } GST_DEBUG ("created session"); g_main_loop_run (souphttpsink->loop); g_main_context_pop_thread_default (context); GST_DEBUG ("thread quit"); return NULL; } static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink); if (souphttpsink->prop_session) { souphttpsink->session = souphttpsink->prop_session; } else { GSource *source; GError *error = NULL; souphttpsink->context = g_main_context_new (); /* set up idle source to signal when the main loop is running and * it's safe for ::stop() to call g_main_loop_quit() */ source = g_idle_source_new (); g_source_set_callback (source, thread_ready_idle_cb, sink, NULL); g_source_attach (source, souphttpsink->context); g_source_unref (source); souphttpsink->loop = g_main_loop_new (souphttpsink->context, FALSE); g_mutex_lock (&souphttpsink->mutex); souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread", thread_func, souphttpsink, &error); if (error != NULL) { GST_DEBUG_OBJECT (souphttpsink, "failed to start thread, %s", error->message); g_error_free (error); g_mutex_unlock (&souphttpsink->mutex); return FALSE; } GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up"); while (!g_main_loop_is_running (souphttpsink->loop)) g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex); g_mutex_unlock (&souphttpsink->mutex); GST_LOG_OBJECT (souphttpsink, "main loop thread running"); } /* Set up logging */ gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level, G_OBJECT (souphttpsink)); return TRUE; } static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink); GST_DEBUG ("stop"); if (souphttpsink->prop_session == NULL) { _soup_session_abort (souphttpsink->session); g_object_unref (souphttpsink->session); } g_mutex_lock (&souphttpsink->mutex); if (souphttpsink->timer) { g_source_destroy (souphttpsink->timer); g_source_unref (souphttpsink->timer); souphttpsink->timer = NULL; } g_mutex_unlock (&souphttpsink->mutex); if (souphttpsink->loop) { g_main_loop_quit (souphttpsink->loop); g_mutex_lock (&souphttpsink->mutex); g_cond_signal (&souphttpsink->cond); g_mutex_unlock (&souphttpsink->mutex); g_thread_join (souphttpsink->thread); g_main_loop_unref (souphttpsink->loop); souphttpsink->loop = NULL; } if (souphttpsink->context) { g_main_context_unref (souphttpsink->context); souphttpsink->context = NULL; } gst_soup_http_client_sink_reset (souphttpsink); return TRUE; } static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink) { GST_DEBUG ("unlock"); return TRUE; } static void send_message_locked (GstSoupHttpClientSink * souphttpsink) { GList *g; guint64 n; GByteArray *array; GInputStream *in_stream; if (souphttpsink->queued_buffers == NULL || souphttpsink->message) { return; } /* If the URI went away, drop all these buffers */ if (souphttpsink->location == NULL) { GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers"); g_list_free_full (souphttpsink->queued_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->queued_buffers = NULL; return; } souphttpsink->message = _soup_message_new ("PUT", souphttpsink->location); if (souphttpsink->message == NULL) { GST_WARNING_OBJECT (souphttpsink, "URI could not be parsed while creating message."); g_list_free_full (souphttpsink->queued_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->queued_buffers = NULL; return; } g_signal_connect (souphttpsink->message, "restarted", G_CALLBACK (restarted), souphttpsink->request_body); _soup_message_set_flags (souphttpsink->message, (souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT)); if (souphttpsink->cookies) { gchar **cookie; for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) { _soup_message_headers_append (_soup_message_get_request_headers (souphttpsink->message), "Cookie", *cookie); } } array = g_byte_array_new (); n = 0; if (souphttpsink->offset == 0) { for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) { GstBuffer *buffer = g->data; GstMapInfo map; GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers"); gst_buffer_map (buffer, &map, GST_MAP_READ); g_byte_array_append (array, map.data, map.size); n += map.size; gst_buffer_unmap (buffer, &map); } } for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) { GstBuffer *buffer = g->data; if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) { GstMapInfo map; gst_buffer_map (buffer, &map, GST_MAP_READ); g_byte_array_append (array, map.data, map.size); n += map.size; gst_buffer_unmap (buffer, &map); } } { souphttpsink->request_body = g_byte_array_free_to_bytes (array); _soup_message_set_request_body_from_bytes (souphttpsink->message, NULL, souphttpsink->request_body); } if (souphttpsink->offset != 0) { char *s; s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*", souphttpsink->offset, souphttpsink->offset + n - 1); _soup_message_headers_append (_soup_message_get_request_headers (souphttpsink->message), "Content-Range", s); g_free (s); } if (n == 0) { GST_DEBUG_OBJECT (souphttpsink, "total size of buffers queued is 0, freeing everything"); g_list_free_full (souphttpsink->queued_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->queued_buffers = NULL; g_clear_object (&souphttpsink->message); g_clear_pointer (&souphttpsink->request_body, g_bytes_unref); return; } in_stream = _soup_session_send (souphttpsink->session, souphttpsink->message, NULL, NULL); if (in_stream == NULL) { GError *error = NULL; if (!send_handle_status (souphttpsink->message, error, souphttpsink)) { g_object_unref (souphttpsink->message); g_clear_pointer (&souphttpsink->request_body, g_bytes_unref); g_clear_error (&error); return; } } souphttpsink->sent_buffers = souphttpsink->queued_buffers; g_clear_pointer (&souphttpsink->request_body, g_bytes_unref); g_object_unref (in_stream); g_list_free_full (souphttpsink->sent_buffers, (GDestroyNotify) gst_buffer_unref); souphttpsink->sent_buffers = NULL; souphttpsink->failures = 0; souphttpsink->queued_buffers = NULL; g_clear_object (&souphttpsink->message); souphttpsink->offset += n; } static gboolean send_message (GstSoupHttpClientSink * souphttpsink) { g_mutex_lock (&souphttpsink->mutex); send_message_locked (souphttpsink); if (souphttpsink->timer) { g_source_destroy (souphttpsink->timer); g_source_unref (souphttpsink->timer); souphttpsink->timer = NULL; } g_mutex_unlock (&souphttpsink->mutex); return FALSE; } static gboolean send_handle_status (SoupMessage * msg, GError * error, GstSoupHttpClientSink * sink) { if (error) { GST_DEBUG_OBJECT (sink, "callback error=%d %s", error->code, error->message); } else { GST_DEBUG_OBJECT (sink, "callback status=%d %s", _soup_message_get_status (msg), _soup_message_get_reason_phrase (msg)); } if (error || !SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg))) { sink->failures++; if (sink->retries && (sink->retries < 0 || sink->retries >= sink->failures)) { guint64 retry_delay; const char *retry_after; SoupMessageHeaders *res_hdrs; if (error) { retry_delay = sink->retry_delay; GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: " "error: %d %s (retrying PUT after %" G_GINT64_FORMAT " seconds)", error->code, error->message, retry_delay); goto err_done; } res_hdrs = _soup_message_get_response_headers (msg); retry_after = _soup_message_headers_get_one (res_hdrs, "Retry-After"); if (retry_after) { gchar *end = NULL; retry_delay = g_ascii_strtoull (retry_after, &end, 10); if (end || errno) { retry_delay = sink->retry_delay; } else { retry_delay = MAX (retry_delay, sink->retry_delay); } GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: " "status: %d %s (retrying PUT after %" G_GINT64_FORMAT " seconds with Retry-After: %s)", _soup_message_get_status (msg), _soup_message_get_reason_phrase (msg), retry_delay, retry_after); } else { retry_delay = sink->retry_delay; GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: " "status: %d %s (retrying PUT after %" G_GINT64_FORMAT " seconds)", _soup_message_get_status (msg), _soup_message_get_reason_phrase (msg), retry_delay); } err_done: sink->timer = g_timeout_source_new_seconds (retry_delay); g_source_set_callback (sink->timer, (GSourceFunc) (send_message), sink, NULL); g_source_attach (sink->timer, sink->context); } else { sink->status_code = _soup_message_get_status (msg); sink->reason_phrase = g_strdup (_soup_message_get_reason_phrase (msg)); } return FALSE; } return TRUE; } static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink); GSource *source; if (souphttpsink->status_code != 0) { GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE, ("Could not write to HTTP URI"), ("status: %d %s", souphttpsink->status_code, souphttpsink->reason_phrase)); return GST_FLOW_ERROR; } g_mutex_lock (&souphttpsink->mutex); if (souphttpsink->location != NULL) { souphttpsink->queued_buffers = g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer)); GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers"); source = g_idle_source_new (); g_source_set_callback (source, (GSourceFunc) (send_message), souphttpsink, NULL); g_source_attach (source, souphttpsink->context); g_source_unref (source); } g_mutex_unlock (&souphttpsink->mutex); return GST_FLOW_OK; } static void restarted (SoupMessage * msg, GBytes * body) { _soup_message_set_request_body_from_bytes (msg, NULL, body); } static gboolean authenticate (SoupMessage * msg, SoupAuth * auth, gboolean retrying, gpointer user_data) { GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data); if (!retrying) { SoupStatus status_code = _soup_message_get_status (msg); /* First time authentication only, if we fail and are called again with retry true fall through */ if (status_code == SOUP_STATUS_UNAUTHORIZED) { if (souphttpsink->user_id && souphttpsink->user_pw) _soup_auth_authenticate (auth, souphttpsink->user_id, souphttpsink->user_pw); } else if (status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) { if (souphttpsink->proxy_id && souphttpsink->proxy_pw) _soup_auth_authenticate (auth, souphttpsink->proxy_id, souphttpsink->proxy_pw); } } return FALSE; } static gboolean souphttpclientsink_element_init (GstPlugin * plugin) { gboolean ret = TRUE; GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0, "souphttpclientsink element"); if (!soup_element_init (plugin)) return TRUE; ret = gst_element_register (plugin, "souphttpclientsink", GST_RANK_NONE, GST_TYPE_SOUP_HTTP_CLIENT_SINK); return ret; }