mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-03 16:09:39 +00:00
1427 lines
41 KiB
C
1427 lines
41 KiB
C
/* GStreamer
|
|
* Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
|
|
/**
 * SECTION:element-curlsink
 * @title: curlsink
 * @short_description: sink that uploads data to a server using libcurl
 *
 * This is a network sink that uses libcurl as a client to upload data to
 * a server (e.g. an HTTP/FTP server).
 *
 * ## Example launch line (upload a JPEG file to an HTTP server)
 * |[
 * gst-launch-1.0 filesrc location=image.jpg ! jpegparse ! curlsink  \
 *     file-name=image.jpg  \
 *     location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/  \
 *     user=test passwd=test  \
 *     content-type=image/jpeg  \
 *     use-content-length=false
 * ]|
 *
 */
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include <curl/curl.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
|
|
#if HAVE_SYS_SOCKET_H
|
|
#include <sys/socket.h>
|
|
#endif
|
|
#include <sys/types.h>
|
|
#if HAVE_NETINET_IN_H
|
|
#include <netinet/in.h>
|
|
#endif
|
|
#include <unistd.h>
|
|
#if HAVE_NETINET_IP_H
|
|
#include <netinet/ip.h>
|
|
#endif
|
|
#if HAVE_NETINET_TCP_H
|
|
#include <netinet/tcp.h>
|
|
#endif
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
|
|
#include "gstcurlbasesink.h"
|
|
|
|
/* Default values */
|
|
#define GST_CAT_DEFAULT gst_curl_base_sink_debug
|
|
#define DEFAULT_URL "localhost:5555"
|
|
#define DEFAULT_TIMEOUT 30
|
|
#define DEFAULT_QOS_DSCP 0
|
|
|
|
#define DSCP_MIN 0
|
|
#define DSCP_MAX 63
|
|
|
|
|
|
/* Plugin specific settings */
|
|
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
|
|
GST_PAD_SINK,
|
|
GST_PAD_ALWAYS,
|
|
GST_STATIC_CAPS_ANY);
|
|
|
|
GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug);
|
|
|
|
enum
|
|
{
|
|
PROP_0,
|
|
PROP_LOCATION,
|
|
PROP_USER_NAME,
|
|
PROP_USER_PASSWD,
|
|
PROP_FILE_NAME,
|
|
PROP_TIMEOUT,
|
|
PROP_QOS_DSCP
|
|
};
|
|
|
|
/* Object class function declarations */
|
|
static void gst_curl_base_sink_finalize (GObject * gobject);
|
|
static void gst_curl_base_sink_set_property (GObject * object, guint prop_id,
|
|
const GValue * value, GParamSpec * pspec);
|
|
static void gst_curl_base_sink_get_property (GObject * object, guint prop_id,
|
|
GValue * value, GParamSpec * pspec);
|
|
|
|
/* BaseSink class function declarations */
|
|
static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink,
|
|
GstBuffer * buf);
|
|
static gboolean gst_curl_base_sink_event (GstBaseSink * bsink,
|
|
GstEvent * event);
|
|
static gboolean gst_curl_base_sink_start (GstBaseSink * bsink);
|
|
static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink);
|
|
static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink);
|
|
static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink);
|
|
|
|
/* private functions */
|
|
|
|
static gboolean gst_curl_base_sink_transfer_setup_unlocked
|
|
(GstCurlBaseSink * sink);
|
|
static gboolean gst_curl_base_sink_transfer_start_unlocked
|
|
(GstCurlBaseSink * sink);
|
|
static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink);
|
|
static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size,
|
|
size_t nmemb, void *stream);
|
|
static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size,
|
|
size_t nmemb, void *stream);
|
|
static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
|
|
void *curl_ptr, size_t block_size, guint * last_chunk);
|
|
#ifndef GST_DISABLE_GST_DEBUG
|
|
static int gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type,
|
|
char *data, size_t size, void *clientp);
|
|
#endif
|
|
static int gst_curl_base_sink_transfer_socket_cb (void *clientp,
|
|
curl_socket_t curlfd, curlsocktype purpose);
|
|
static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data);
|
|
static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink);
|
|
static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink);
|
|
|
|
static gboolean gst_curl_base_sink_wait_for_data_unlocked
|
|
(GstCurlBaseSink * sink);
|
|
static void gst_curl_base_sink_new_file_notify_unlocked
|
|
(GstCurlBaseSink * sink);
|
|
static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
|
|
(GstCurlBaseSink * sink);
|
|
static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink);
|
|
static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink);
|
|
static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink);
|
|
|
|
static void handle_transfer (GstCurlBaseSink * sink);
|
|
static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
|
|
size_t max_bytes_to_send, guint * last_chunk);
|
|
|
|
#define parent_class gst_curl_base_sink_parent_class
|
|
G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
return sink->transfer_buf->len > 0;
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
GstCurlBaseSinkClass *klass;
|
|
gboolean res = FALSE;
|
|
|
|
klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
|
|
|
|
if (klass->has_buffered_data_unlocked)
|
|
res = klass->has_buffered_data_unlocked (sink);
|
|
|
|
return res;
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
|
|
{
|
|
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
|
|
GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
|
|
GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
|
|
|
|
GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0,
|
|
"curl base sink element");
|
|
GST_DEBUG_OBJECT (klass, "class_init");
|
|
|
|
gst_element_class_set_static_metadata (element_class,
|
|
"Curl base sink",
|
|
"Sink/Network",
|
|
"Upload data over the network to a server using libcurl",
|
|
"Patricia Muscalu <patricia@axis.com>");
|
|
|
|
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event);
|
|
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render);
|
|
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start);
|
|
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop);
|
|
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock);
|
|
gstbasesink_class->unlock_stop =
|
|
GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop);
|
|
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize);
|
|
|
|
gobject_class->set_property = gst_curl_base_sink_set_property;
|
|
gobject_class->get_property = gst_curl_base_sink_get_property;
|
|
|
|
klass->handle_transfer = handle_transfer;
|
|
klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
|
|
klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
|
|
klass->has_buffered_data_unlocked =
|
|
gst_curl_base_sink_default_has_buffered_data_unlocked;
|
|
|
|
/* FIXME: check against souphttpsrc and use same names for same properties */
|
|
g_object_class_install_property (gobject_class, PROP_LOCATION,
|
|
g_param_spec_string ("location", "Location",
|
|
"URI location to write to", NULL,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_USER_NAME,
|
|
g_param_spec_string ("user", "User name",
|
|
"User name to use for server authentication", NULL,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_USER_PASSWD,
|
|
g_param_spec_string ("passwd", "User password",
|
|
"User password to use for server authentication", NULL,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_FILE_NAME,
|
|
g_param_spec_string ("file-name", "Base file name",
|
|
"The base file name for the uploaded images", NULL,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_TIMEOUT,
|
|
g_param_spec_int ("timeout", "Timeout",
|
|
"Number of seconds waiting to write before timeout",
|
|
0, G_MAXINT, DEFAULT_TIMEOUT,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
|
|
g_param_spec_int ("qos-dscp",
|
|
"QoS diff srv code point",
|
|
"Quality of Service, differentiated services code point (0 default)",
|
|
DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
|
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
|
|
|
gst_element_class_add_static_pad_template (element_class, &sinktemplate);
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_init (GstCurlBaseSink * sink)
|
|
{
|
|
sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
|
|
sink->transfer_cond = g_malloc (sizeof (TransferCondition));
|
|
g_cond_init (&sink->transfer_cond->cond);
|
|
sink->transfer_cond->data_sent = FALSE;
|
|
sink->transfer_cond->data_available = FALSE;
|
|
sink->transfer_cond->wait_for_response = FALSE;
|
|
sink->timeout = DEFAULT_TIMEOUT;
|
|
sink->qos_dscp = DEFAULT_QOS_DSCP;
|
|
sink->url = g_strdup (DEFAULT_URL);
|
|
sink->transfer_thread_close = FALSE;
|
|
sink->new_file = TRUE;
|
|
sink->error = NULL;
|
|
sink->flow_ret = GST_FLOW_OK;
|
|
sink->is_live = FALSE;
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_finalize (GObject * gobject)
|
|
{
|
|
GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject);
|
|
|
|
GST_DEBUG ("finalizing curlsink");
|
|
if (this->transfer_thread != NULL) {
|
|
g_thread_join (this->transfer_thread);
|
|
}
|
|
|
|
g_cond_clear (&this->transfer_cond->cond);
|
|
g_free (this->transfer_cond);
|
|
g_free (this->transfer_buf);
|
|
|
|
g_free (this->url);
|
|
g_free (this->user);
|
|
g_free (this->passwd);
|
|
g_free (this->file_name);
|
|
if (this->fdset != NULL) {
|
|
gst_poll_free (this->fdset);
|
|
this->fdset = NULL;
|
|
}
|
|
G_OBJECT_CLASS (parent_class)->finalize (gobject);
|
|
}
|
|
|
|
void
|
|
gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
GST_LOG ("more data to send");
|
|
|
|
sink->transfer_cond->data_available = TRUE;
|
|
sink->transfer_cond->data_sent = FALSE;
|
|
sink->transfer_cond->wait_for_response = TRUE;
|
|
g_cond_signal (&sink->transfer_cond->cond);
|
|
}
|
|
|
|
void
|
|
gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink)
|
|
{
|
|
GST_OBJECT_LOCK (sink);
|
|
GST_LOG_OBJECT (sink, "setting transfer thread close flag");
|
|
sink->transfer_thread_close = TRUE;
|
|
g_cond_signal (&sink->transfer_cond->cond);
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
if (sink->transfer_thread != NULL) {
|
|
GST_LOG_OBJECT (sink, "waiting for transfer thread to finish");
|
|
g_thread_join (sink->transfer_thread);
|
|
sink->transfer_thread = NULL;
|
|
}
|
|
}
|
|
|
|
void
|
|
gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live)
|
|
{
|
|
g_return_if_fail (GST_IS_CURL_BASE_SINK (sink));
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
sink->is_live = live;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
}
|
|
|
|
gboolean
|
|
gst_curl_base_sink_is_live (GstCurlBaseSink * sink)
|
|
{
|
|
gboolean result;
|
|
|
|
g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE);
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
result = sink->is_live;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
return result;
|
|
}
|
|
|
|
static GstFlowReturn
|
|
gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
GstMapInfo map;
|
|
guint8 *data;
|
|
size_t size;
|
|
GstFlowReturn ret;
|
|
gchar *error;
|
|
|
|
GST_LOG ("enter render");
|
|
|
|
sink = GST_CURL_BASE_SINK (bsink);
|
|
|
|
gst_buffer_map (buf, &map, GST_MAP_READ);
|
|
data = map.data;
|
|
size = map.size;
|
|
|
|
if (size == 0) {
|
|
gst_buffer_unmap (buf, &map);
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
|
|
/* check if the transfer thread has encountered problems while the
|
|
* pipeline thread was working elsewhere */
|
|
if (sink->flow_ret != GST_FLOW_OK) {
|
|
goto done;
|
|
}
|
|
|
|
g_assert (sink->transfer_cond->data_available == FALSE);
|
|
|
|
/* if there is no transfer thread created, lets create one */
|
|
if (sink->transfer_thread == NULL) {
|
|
if (!gst_curl_base_sink_transfer_start_unlocked (sink)) {
|
|
sink->flow_ret = GST_FLOW_ERROR;
|
|
goto done;
|
|
}
|
|
}
|
|
|
|
/* make data available for the transfer thread and notify */
|
|
sink->transfer_buf->ptr = data;
|
|
sink->transfer_buf->len = size;
|
|
sink->transfer_buf->offset = 0;
|
|
gst_curl_base_sink_transfer_thread_notify_unlocked (sink);
|
|
|
|
/* wait for the transfer thread to send the data. This will be notified
|
|
* either when transfer is completed by the curl read callback or by
|
|
* the thread function if an error has occurred. */
|
|
gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink);
|
|
|
|
done:
|
|
gst_buffer_unmap (buf, &map);
|
|
|
|
/* Hand over error from transfer thread to streaming thread */
|
|
error = sink->error;
|
|
sink->error = NULL;
|
|
ret = sink->flow_ret;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
if (error != NULL) {
|
|
GST_ERROR_OBJECT (sink, "%s", error);
|
|
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", error), (NULL));
|
|
g_free (error);
|
|
}
|
|
|
|
GST_LOG ("exit render");
|
|
|
|
return ret;
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event)
|
|
{
|
|
GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
|
|
GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
|
|
|
|
switch (event->type) {
|
|
case GST_EVENT_EOS:
|
|
GST_DEBUG_OBJECT (sink, "received EOS");
|
|
gst_curl_base_sink_transfer_thread_close (sink);
|
|
gst_curl_base_sink_wait_for_response (sink);
|
|
break;
|
|
case GST_EVENT_CAPS:
|
|
if (klass->set_mime_type) {
|
|
GstCaps *caps;
|
|
gst_event_parse_caps (event, &caps);
|
|
klass->set_mime_type (sink, caps);
|
|
}
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_start (GstBaseSink * bsink)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
|
|
sink = GST_CURL_BASE_SINK (bsink);
|
|
|
|
/* reset flags */
|
|
sink->transfer_cond->data_sent = FALSE;
|
|
sink->transfer_cond->data_available = FALSE;
|
|
sink->transfer_cond->wait_for_response = FALSE;
|
|
sink->transfer_thread_close = FALSE;
|
|
sink->new_file = TRUE;
|
|
sink->flow_ret = GST_FLOW_OK;
|
|
|
|
if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
|
|
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
|
|
("gst_poll_new failed: %s", g_strerror (errno)), (NULL));
|
|
return FALSE;
|
|
}
|
|
|
|
gst_poll_fd_init (&sink->fd);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_stop (GstBaseSink * bsink)
|
|
{
|
|
GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
|
|
|
|
gst_curl_base_sink_transfer_thread_close (sink);
|
|
if (sink->fdset != NULL) {
|
|
gst_poll_free (sink->fdset);
|
|
sink->fdset = NULL;
|
|
}
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_unlock (GstBaseSink * bsink)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
|
|
sink = GST_CURL_BASE_SINK (bsink);
|
|
|
|
GST_LOG_OBJECT (sink, "Flushing");
|
|
gst_poll_set_flushing (sink->fdset, TRUE);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_unlock_stop (GstBaseSink * bsink)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
|
|
sink = GST_CURL_BASE_SINK (bsink);
|
|
|
|
GST_LOG_OBJECT (sink, "No longer flushing");
|
|
gst_poll_set_flushing (sink->fdset, FALSE);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_set_property (GObject * object, guint prop_id,
|
|
const GValue * value, GParamSpec * pspec)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
GstState cur_state;
|
|
|
|
g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
|
|
sink = GST_CURL_BASE_SINK (object);
|
|
|
|
gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
|
|
if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
|
|
GST_OBJECT_LOCK (sink);
|
|
|
|
switch (prop_id) {
|
|
case PROP_LOCATION:
|
|
g_free (sink->url);
|
|
sink->url = g_value_dup_string (value);
|
|
GST_DEBUG_OBJECT (sink, "url set to %s", sink->url);
|
|
break;
|
|
case PROP_USER_NAME:
|
|
g_free (sink->user);
|
|
sink->user = g_value_dup_string (value);
|
|
GST_DEBUG_OBJECT (sink, "user set to %s", sink->user);
|
|
break;
|
|
case PROP_USER_PASSWD:
|
|
g_free (sink->passwd);
|
|
sink->passwd = g_value_dup_string (value);
|
|
GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
|
|
break;
|
|
case PROP_FILE_NAME:
|
|
g_free (sink->file_name);
|
|
sink->file_name = g_value_dup_string (value);
|
|
GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
|
|
break;
|
|
case PROP_TIMEOUT:
|
|
sink->timeout = g_value_get_int (value);
|
|
GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
|
|
break;
|
|
case PROP_QOS_DSCP:
|
|
sink->qos_dscp = g_value_get_int (value);
|
|
gst_curl_base_sink_setup_dscp_unlocked (sink);
|
|
GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
|
|
break;
|
|
default:
|
|
GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
|
|
break;
|
|
}
|
|
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
return;
|
|
}
|
|
|
|
/* in PLAYING or PAUSED state */
|
|
GST_OBJECT_LOCK (sink);
|
|
|
|
switch (prop_id) {
|
|
case PROP_FILE_NAME:
|
|
g_free (sink->file_name);
|
|
sink->file_name = g_value_dup_string (value);
|
|
GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
|
|
gst_curl_base_sink_new_file_notify_unlocked (sink);
|
|
break;
|
|
case PROP_TIMEOUT:
|
|
sink->timeout = g_value_get_int (value);
|
|
GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
|
|
break;
|
|
case PROP_QOS_DSCP:
|
|
sink->qos_dscp = g_value_get_int (value);
|
|
gst_curl_base_sink_setup_dscp_unlocked (sink);
|
|
GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
|
|
break;
|
|
default:
|
|
GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
|
|
break;
|
|
}
|
|
|
|
GST_OBJECT_UNLOCK (sink);
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_get_property (GObject * object, guint prop_id,
|
|
GValue * value, GParamSpec * pspec)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
|
|
g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
|
|
sink = GST_CURL_BASE_SINK (object);
|
|
|
|
switch (prop_id) {
|
|
case PROP_LOCATION:
|
|
g_value_set_string (value, sink->url);
|
|
break;
|
|
case PROP_USER_NAME:
|
|
g_value_set_string (value, sink->user);
|
|
break;
|
|
case PROP_USER_PASSWD:
|
|
g_value_set_string (value, sink->passwd);
|
|
break;
|
|
case PROP_FILE_NAME:
|
|
g_value_set_string (value, sink->file_name);
|
|
break;
|
|
case PROP_TIMEOUT:
|
|
g_value_set_int (value, sink->timeout);
|
|
break;
|
|
case PROP_QOS_DSCP:
|
|
g_value_set_int (value, sink->qos_dscp);
|
|
break;
|
|
default:
|
|
GST_DEBUG_OBJECT (sink, "invalid property id");
|
|
break;
|
|
}
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
|
|
CURLcode res;
|
|
|
|
#ifndef GST_DISABLE_GST_DEBUG
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set verbose: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGDATA, sink);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set debug user_data: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGFUNCTION,
|
|
gst_curl_base_sink_debug_cb);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set debug functions: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
#endif
|
|
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set URL: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set connection timeout: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
|
|
/* using signals in a multi-threaded application is dangerous */
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set no signalling: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
|
|
/* socket settings */
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set sockopt user data: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
|
|
gst_curl_base_sink_transfer_socket_cb);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set sockopt function: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set read user data: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
|
|
klass->transfer_read_cb);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set read function: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set write user data: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
|
|
gst_curl_base_sink_transfer_write_cb);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set write function: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
/* Time out in case transfer speed in bytes per second stay below
|
|
* CURLOPT_LOW_SPEED_LIMIT during CURLOPT_LOW_SPEED_TIME */
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_LIMIT, 1L);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set low speed limit: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_TIME,
|
|
(long) sink->timeout);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set low speed time: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
|
|
GST_LOG ("common options set");
|
|
return TRUE;
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
|
|
CURLcode res;
|
|
|
|
if (!gst_curl_base_sink_transfer_set_common_options_unlocked (sink)) {
|
|
return FALSE;
|
|
}
|
|
|
|
/* authentication settings */
|
|
if (sink->user != NULL && strlen (sink->user)) {
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set user name: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
res = curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
|
|
if (res != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to set password: %s",
|
|
curl_easy_strerror (res));
|
|
return FALSE;
|
|
}
|
|
}
|
|
|
|
if (klass->set_options_unlocked) {
|
|
return klass->set_options_unlocked (sink);
|
|
} else {
|
|
return FALSE;
|
|
}
|
|
}
|
|
|
|
static size_t
|
|
transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
|
|
size_t max_bytes_to_send, guint * last_chunk)
|
|
{
|
|
guint buf_len = buf->len;
|
|
size_t bytes_to_send = MIN (max_bytes_to_send, buf->len);
|
|
|
|
memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send);
|
|
buf->offset = buf->offset + bytes_to_send;
|
|
buf->len = buf->len - bytes_to_send;
|
|
|
|
/* the last data chunk */
|
|
if (bytes_to_send == buf_len) {
|
|
buf->offset = 0;
|
|
buf->len = 0;
|
|
*last_chunk = 1;
|
|
}
|
|
|
|
GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
|
|
|
|
return bytes_to_send;
|
|
}
|
|
|
|
static size_t
|
|
gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
|
|
void *curl_ptr, size_t block_size, guint * last_chunk)
|
|
{
|
|
TransferBuffer *buffer;
|
|
|
|
buffer = sink->transfer_buf;
|
|
GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
|
|
buffer->len, buffer->offset);
|
|
|
|
if (buffer->len <= 0) {
|
|
GST_WARNING ("got zero- or negative-length buffer");
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* more data in buffer(s) */
|
|
return transfer_data_buffer (curl_ptr, buffer, block_size, last_chunk);
|
|
}
|
|
|
|
static size_t
|
|
gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
|
|
void *stream)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
GstCurlBaseSinkClass *klass;
|
|
size_t max_bytes_to_send;
|
|
size_t bytes_to_send;
|
|
guint last_chunk = 0;
|
|
|
|
sink = (GstCurlBaseSink *) stream;
|
|
klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
|
|
|
|
max_bytes_to_send = size * nmemb;
|
|
|
|
/* wait for data to come available, if new file or thread close is set
|
|
* then zero will be returned to indicate end of current transfer */
|
|
GST_OBJECT_LOCK (sink);
|
|
if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
|
|
|
|
if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
|
|
sink->transfer_thread_close) {
|
|
GST_WARNING_OBJECT (sink,
|
|
"discarding render data due to thread close flag");
|
|
|
|
GST_OBJECT_UNLOCK (sink);
|
|
return CURL_READFUNC_ABORT;
|
|
}
|
|
|
|
if (klass->flush_data_unlocked) {
|
|
bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
|
|
max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
|
|
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
return bytes_to_send;
|
|
}
|
|
|
|
GST_OBJECT_UNLOCK (sink);
|
|
GST_LOG ("returning 0, no more data to send in this file");
|
|
|
|
return 0;
|
|
}
|
|
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr,
|
|
max_bytes_to_send, &last_chunk);
|
|
|
|
/* the last data chunk */
|
|
if (last_chunk) {
|
|
gst_curl_base_sink_data_sent_notify (sink);
|
|
}
|
|
|
|
return bytes_to_send;
|
|
}
|
|
|
|
static size_t
|
|
gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
|
|
size_t nmemb, void G_GNUC_UNUSED * stream)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
GstCurlBaseSinkClass *klass;
|
|
size_t realsize = size * nmemb;
|
|
|
|
sink = (GstCurlBaseSink *) stream;
|
|
klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
|
|
|
|
if (klass->transfer_verify_response_code) {
|
|
if (!klass->transfer_verify_response_code (sink)) {
|
|
GST_DEBUG_OBJECT (sink, "response error");
|
|
GST_OBJECT_LOCK (sink);
|
|
sink->flow_ret = GST_FLOW_ERROR;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
}
|
|
}
|
|
|
|
GST_DEBUG ("response %s", (gchar *) ptr);
|
|
|
|
return realsize;
|
|
}
|
|
|
|
CURLcode
|
|
gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink)
|
|
{
|
|
CURLcode code = CURLE_OK;
|
|
CURL *easy;
|
|
CURLMsg *msg;
|
|
gint msgs_left;
|
|
gchar *eff_url = NULL;
|
|
|
|
do {
|
|
easy = NULL;
|
|
while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) {
|
|
if (msg->msg == CURLMSG_DONE) {
|
|
easy = msg->easy_handle;
|
|
code = msg->data.result;
|
|
break;
|
|
}
|
|
}
|
|
if (easy) {
|
|
curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url);
|
|
GST_DEBUG ("transfer done %s (%s-%d)\n", eff_url,
|
|
curl_easy_strerror (code), code);
|
|
}
|
|
} while (easy);
|
|
|
|
return code;
|
|
}
|
|
|
|
static void
|
|
handle_transfer (GstCurlBaseSink * sink)
|
|
{
|
|
GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
|
|
gint retval;
|
|
gint activated_fds;
|
|
gint running_handles;
|
|
gint timeout;
|
|
CURLMcode m_code;
|
|
CURLcode e_code;
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
timeout = sink->timeout;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
GST_DEBUG_OBJECT (sink, "handling transfers");
|
|
|
|
/* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data
|
|
available to send or receive - call simply curl_multi_perform before
|
|
poll() on more actions */
|
|
do {
|
|
m_code = curl_multi_perform (sink->multi_handle, &running_handles);
|
|
} while (m_code == CURLM_CALL_MULTI_PERFORM);
|
|
GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
|
|
|
|
while (running_handles && (m_code == CURLM_OK)) {
|
|
if (klass->transfer_prepare_poll_wait) {
|
|
klass->transfer_prepare_poll_wait (sink);
|
|
}
|
|
|
|
activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
|
|
if (G_UNLIKELY (activated_fds == -1)) {
|
|
if (errno == EAGAIN || errno == EINTR) {
|
|
GST_DEBUG_OBJECT (sink, "interrupted by signal");
|
|
} else if (errno == EBUSY) {
|
|
GST_DEBUG_OBJECT (sink, "poll stopped");
|
|
retval = GST_FLOW_EOS;
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
|
|
GST_WARNING_OBJECT (sink,
|
|
"discarding render data due to thread close flag");
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
goto fail;
|
|
} else {
|
|
sink->error = g_strdup_printf ("poll failed: %s", g_strerror (errno));
|
|
retval = GST_FLOW_ERROR;
|
|
goto fail;
|
|
}
|
|
} else if (G_UNLIKELY (activated_fds == 0)) {
|
|
sink->error = g_strdup_printf ("poll timed out after %" GST_TIME_FORMAT,
|
|
GST_TIME_ARGS (timeout * GST_SECOND));
|
|
retval = GST_FLOW_ERROR;
|
|
goto fail;
|
|
}
|
|
|
|
/* readable/writable sockets */
|
|
do {
|
|
m_code = curl_multi_perform (sink->multi_handle, &running_handles);
|
|
} while (m_code == CURLM_CALL_MULTI_PERFORM);
|
|
GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
|
|
}
|
|
|
|
if (m_code != CURLM_OK) {
|
|
sink->error = g_strdup_printf ("failed to write data: %s",
|
|
curl_multi_strerror (m_code));
|
|
retval = GST_FLOW_ERROR;
|
|
goto fail;
|
|
}
|
|
|
|
/* problems still might have occurred on individual transfers even when
|
|
* curl_multi_perform returns CURLM_OK */
|
|
if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) {
|
|
sink->error = g_strdup_printf ("failed to transfer data: %s",
|
|
curl_easy_strerror (e_code));
|
|
retval = GST_FLOW_ERROR;
|
|
goto fail;
|
|
}
|
|
|
|
gst_curl_base_sink_got_response_notify (sink);
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
if (sink->socket_type == CURLSOCKTYPE_ACCEPT) {
|
|
/* FIXME: remove this again once we can depend on libcurl > 7.44.0,
|
|
* see https://github.com/bagder/curl/issues/405.
|
|
*/
|
|
if (G_UNLIKELY (sink->fd.fd < 0)) {
|
|
sink->error = g_strdup_printf ("unknown error");
|
|
retval = GST_FLOW_ERROR;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
goto fail;
|
|
}
|
|
if (!gst_poll_remove_fd (sink->fdset, &sink->fd)) {
|
|
sink->error = g_strdup_printf ("failed to remove fd");
|
|
retval = GST_FLOW_ERROR;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
goto fail;
|
|
}
|
|
sink->fd.fd = -1;
|
|
}
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
return;
|
|
|
|
fail:
|
|
GST_OBJECT_LOCK (sink);
|
|
if (sink->flow_ret == GST_FLOW_OK) {
|
|
sink->flow_ret = retval;
|
|
}
|
|
GST_OBJECT_UNLOCK (sink);
|
|
return;
|
|
}
|
|
|
|
#ifndef GST_DISABLE_GST_DEBUG
|
|
static int
|
|
gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type, char *data,
|
|
size_t size, void *clientp)
|
|
{
|
|
GstCurlBaseSink *sink = (GstCurlBaseSink *) clientp;
|
|
gchar *msg = NULL;
|
|
|
|
switch (type) {
|
|
case CURLINFO_TEXT:
|
|
case CURLINFO_HEADER_IN:
|
|
case CURLINFO_HEADER_OUT:
|
|
msg = g_memdup (data, size);
|
|
if (size > 0) {
|
|
msg[size - 1] = '\0';
|
|
g_strchomp (msg);
|
|
}
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
switch (type) {
|
|
case CURLINFO_TEXT:
|
|
GST_DEBUG_OBJECT (sink, "%s", msg);
|
|
break;
|
|
case CURLINFO_HEADER_IN:
|
|
GST_DEBUG_OBJECT (sink, "incoming header: %s", msg);
|
|
break;
|
|
case CURLINFO_HEADER_OUT:
|
|
GST_DEBUG_OBJECT (sink, "outgoing header: %s", msg);
|
|
break;
|
|
case CURLINFO_DATA_IN:
|
|
GST_MEMDUMP_OBJECT (sink, "incoming data", (guint8 *) data, size);
|
|
break;
|
|
case CURLINFO_DATA_OUT:
|
|
GST_MEMDUMP_OBJECT (sink, "outgoing data", (guint8 *) data, size);
|
|
break;
|
|
case CURLINFO_SSL_DATA_IN:
|
|
GST_MEMDUMP_OBJECT (sink, "incoming ssl data", (guint8 *) data, size);
|
|
break;
|
|
case CURLINFO_SSL_DATA_OUT:
|
|
GST_MEMDUMP_OBJECT (sink, "outgoing ssl data", (guint8 *) data, size);
|
|
break;
|
|
default:
|
|
GST_DEBUG_OBJECT (sink, "unknown debug info type %d", type);
|
|
GST_MEMDUMP_OBJECT (sink, "unknown data", (guint8 *) data, size);
|
|
break;
|
|
}
|
|
g_free (msg);
|
|
return 0;
|
|
}
|
|
#endif
|
|
|
|
/* This function gets called by libcurl after the socket() call but before
|
|
* the connect() call. */
|
|
static int
|
|
gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
|
|
curlsocktype socket_type)
|
|
{
|
|
GstCurlBaseSink *sink;
|
|
gboolean ret = TRUE;
|
|
|
|
sink = (GstCurlBaseSink *) clientp;
|
|
|
|
g_assert (sink);
|
|
|
|
if (curlfd < 0) {
|
|
/* signal an unrecoverable error to the library which will close the socket
|
|
and return CURLE_COULDNT_CONNECT
|
|
*/
|
|
GST_DEBUG_OBJECT (sink, "no curlfd");
|
|
return 1;
|
|
}
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
sink->socket_type = socket_type;
|
|
|
|
if (sink->fd.fd != curlfd) {
|
|
if (sink->fd.fd > 0 && sink->socket_type != CURLSOCKTYPE_ACCEPT) {
|
|
ret &= gst_poll_remove_fd (sink->fdset, &sink->fd);
|
|
}
|
|
sink->fd.fd = curlfd;
|
|
ret &= gst_poll_add_fd (sink->fdset, &sink->fd);
|
|
ret &= gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE);
|
|
ret &= gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
|
|
}
|
|
GST_DEBUG_OBJECT (sink, "fd: %d", sink->fd.fd);
|
|
gst_curl_base_sink_setup_dscp_unlocked (sink);
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
/* success */
|
|
return ret ? 0 : 1;
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
GError *error = NULL;
|
|
gboolean ret = TRUE;
|
|
|
|
GST_LOG ("creating transfer thread");
|
|
sink->transfer_thread_close = FALSE;
|
|
sink->new_file = TRUE;
|
|
sink->transfer_thread = g_thread_try_new ("curl-transfer", (GThreadFunc)
|
|
gst_curl_base_sink_transfer_thread_func, sink, &error);
|
|
|
|
if (sink->transfer_thread == NULL || error != NULL) {
|
|
ret = FALSE;
|
|
if (error) {
|
|
GST_ERROR_OBJECT (sink, "could not create thread %s", error->message);
|
|
g_error_free (error);
|
|
} else {
|
|
GST_ERROR_OBJECT (sink, "could not create thread for unknown reason");
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static gpointer
|
|
gst_curl_base_sink_transfer_thread_func (gpointer data)
|
|
{
|
|
GstCurlBaseSink *sink = (GstCurlBaseSink *) data;
|
|
GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
|
|
GstFlowReturn ret;
|
|
gboolean data_available;
|
|
|
|
GST_LOG ("transfer thread started");
|
|
GST_OBJECT_LOCK (sink);
|
|
if (!gst_curl_base_sink_transfer_setup_unlocked (sink)) {
|
|
/* no need to set sink->error, as it is set by the called function */
|
|
sink->flow_ret = GST_FLOW_ERROR;
|
|
goto done;
|
|
}
|
|
|
|
while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) {
|
|
/* we are working on a new file, clearing flag and setting a new file
|
|
* name */
|
|
sink->new_file = FALSE;
|
|
|
|
/* wait for data to arrive for this new file, if we get a new file name
|
|
* again before getting data we will simply skip transferring anything
|
|
* for this file and go directly to the new file */
|
|
data_available = gst_curl_base_sink_wait_for_data_unlocked (sink);
|
|
if (data_available) {
|
|
if (G_UNLIKELY (!klass->set_protocol_dynamic_options_unlocked (sink))) {
|
|
sink->error = g_strdup ("unexpected state");
|
|
sink->flow_ret = GST_FLOW_ERROR;
|
|
goto done;
|
|
}
|
|
}
|
|
|
|
/* stay unlocked while handling the actual transfer */
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
if (data_available) {
|
|
GST_LOG ("have data");
|
|
if (!gst_curl_base_sink_is_live (sink)) {
|
|
/* prepare transfer if needed */
|
|
if (klass->prepare_transfer) {
|
|
GST_OBJECT_LOCK (sink);
|
|
if (!klass->prepare_transfer (sink)) {
|
|
sink->flow_ret = GST_FLOW_ERROR;
|
|
goto done;
|
|
}
|
|
GST_OBJECT_UNLOCK (sink);
|
|
}
|
|
GST_LOG ("adding handle");
|
|
curl_multi_add_handle (sink->multi_handle, sink->curl);
|
|
}
|
|
|
|
/* Start driving the transfer. */
|
|
klass->handle_transfer (sink);
|
|
|
|
/* easy handle will be possibly re-used for next transfer, thus it needs
|
|
* to be removed from the multi stack and re-added again */
|
|
if (!gst_curl_base_sink_is_live (sink)) {
|
|
GST_LOG ("removing handle");
|
|
curl_multi_remove_handle (sink->multi_handle, sink->curl);
|
|
}
|
|
} else {
|
|
GST_LOG ("have no data yet");
|
|
}
|
|
|
|
/* lock again before looping to check the thread closed flag */
|
|
GST_OBJECT_LOCK (sink);
|
|
}
|
|
|
|
if (sink->is_live) {
|
|
GST_LOG ("removing handle");
|
|
curl_multi_remove_handle (sink->multi_handle, sink->curl);
|
|
}
|
|
|
|
done:
|
|
gst_curl_base_sink_transfer_cleanup (sink);
|
|
|
|
/* extract the error code so the lock does not have to be
|
|
* taken when calling the functions below that take the lock
|
|
* on their own */
|
|
ret = sink->flow_ret;
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
/* if there is a flow error, always notify the render function so it
|
|
* can return the flow error up along the pipeline. as an error has
|
|
* occurred there is no response to receive, so notify the event function
|
|
* so it doesn't block indefinitely waiting for a response. */
|
|
if (ret != GST_FLOW_OK) {
|
|
gst_curl_base_sink_data_sent_notify (sink);
|
|
gst_curl_base_sink_got_response_notify (sink);
|
|
}
|
|
|
|
GST_DEBUG ("exit thread func - transfer thread close flag: %d",
|
|
sink->transfer_thread_close);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_transfer_setup_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
g_assert (sink);
|
|
|
|
if (sink->curl == NULL) {
|
|
/* curl_easy_init automatically calls curl_global_init(3) */
|
|
if ((sink->curl = curl_easy_init ()) == NULL) {
|
|
sink->error = g_strdup ("failed to init curl easy handle");
|
|
return FALSE;
|
|
}
|
|
}
|
|
|
|
if (!gst_curl_base_sink_transfer_set_options_unlocked (sink)) {
|
|
if (!sink->error) {
|
|
sink->error = g_strdup ("failed to setup curl easy handle");
|
|
}
|
|
return FALSE;
|
|
}
|
|
|
|
/* init a multi stack (non-blocking interface to libcurl) */
|
|
if (sink->multi_handle == NULL) {
|
|
if ((sink->multi_handle = curl_multi_init ()) == NULL) {
|
|
sink->error = g_strdup ("failed to init curl multi handle");
|
|
return FALSE;
|
|
}
|
|
}
|
|
|
|
GST_LOG ("transfer setup done");
|
|
return TRUE;
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink)
|
|
{
|
|
if (sink->curl != NULL) {
|
|
if (sink->multi_handle != NULL) {
|
|
curl_multi_remove_handle (sink->multi_handle, sink->curl);
|
|
}
|
|
curl_easy_cleanup (sink->curl);
|
|
sink->curl = NULL;
|
|
}
|
|
|
|
if (sink->multi_handle != NULL) {
|
|
curl_multi_cleanup (sink->multi_handle);
|
|
sink->multi_handle = NULL;
|
|
}
|
|
}
|
|
|
|
static gboolean
|
|
gst_curl_base_sink_wait_for_data_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
gboolean data_available = FALSE;
|
|
|
|
GST_LOG ("waiting for data");
|
|
while (!sink->transfer_cond->data_available &&
|
|
!sink->transfer_thread_close && !sink->new_file) {
|
|
g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
|
|
}
|
|
|
|
if (sink->transfer_thread_close) {
|
|
GST_LOG ("wait for data aborted due to thread close");
|
|
} else if (sink->new_file) {
|
|
GST_LOG ("wait for data aborted due to new file name");
|
|
} else {
|
|
GST_LOG ("wait for data completed");
|
|
data_available = TRUE;
|
|
}
|
|
|
|
return data_available;
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_new_file_notify_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
GST_LOG ("new file name");
|
|
sink->new_file = TRUE;
|
|
g_cond_signal (&sink->transfer_cond->cond);
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
|
|
(GstCurlBaseSink * sink)
|
|
{
|
|
GST_LOG ("waiting for buffer send to complete");
|
|
|
|
/* this function should not check if the transfer thread is set to be closed
|
|
* since that flag only can be set by the EOS event (by the pipeline thread).
|
|
* This can therefore never happen while this function is running since this
|
|
* function also is called by the pipeline thread (in the render function) */
|
|
while (!sink->transfer_cond->data_sent) {
|
|
g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
|
|
}
|
|
GST_LOG ("buffer send completed");
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink)
|
|
{
|
|
GST_LOG ("transfer completed");
|
|
GST_OBJECT_LOCK (sink);
|
|
sink->transfer_cond->data_available = FALSE;
|
|
sink->transfer_cond->data_sent = TRUE;
|
|
g_cond_signal (&sink->transfer_cond->cond);
|
|
GST_OBJECT_UNLOCK (sink);
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink)
|
|
{
|
|
GST_LOG ("waiting for remote to send response code");
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
while (sink->transfer_cond->wait_for_response) {
|
|
g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
|
|
}
|
|
GST_OBJECT_UNLOCK (sink);
|
|
|
|
GST_LOG ("response code received");
|
|
}
|
|
|
|
static void
|
|
gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink)
|
|
{
|
|
GST_LOG ("got response code");
|
|
|
|
GST_OBJECT_LOCK (sink);
|
|
sink->transfer_cond->wait_for_response = FALSE;
|
|
g_cond_signal (&sink->transfer_cond->cond);
|
|
GST_OBJECT_UNLOCK (sink);
|
|
}
|
|
|
|
static gint
|
|
gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink)
|
|
{
|
|
gint tos;
|
|
gint af;
|
|
gint ret = -1;
|
|
union
|
|
{
|
|
struct sockaddr sa;
|
|
struct sockaddr_in6 sa_in6;
|
|
struct sockaddr_storage sa_stor;
|
|
} sa;
|
|
socklen_t slen = sizeof (sa);
|
|
|
|
if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) {
|
|
GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
|
|
return ret;
|
|
}
|
|
af = sa.sa.sa_family;
|
|
|
|
/* if this is an IPv4-mapped address then do IPv4 QoS */
|
|
if (af == AF_INET6) {
|
|
GST_DEBUG_OBJECT (sink, "check IP6 socket");
|
|
if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
|
|
GST_DEBUG_OBJECT (sink, "mapped to IPV4");
|
|
af = AF_INET;
|
|
}
|
|
}
|
|
/* extract and shift 6 bits of the DSCP */
|
|
tos = (sink->qos_dscp & 0x3f) << 2;
|
|
|
|
switch (af) {
|
|
case AF_INET:
|
|
ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, (void *) &tos,
|
|
sizeof (tos));
|
|
break;
|
|
case AF_INET6:
|
|
#ifdef IPV6_TCLASS
|
|
ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, (void *) &tos,
|
|
sizeof (tos));
|
|
break;
|
|
#endif
|
|
default:
|
|
GST_ERROR_OBJECT (sink, "unsupported AF");
|
|
break;
|
|
}
|
|
if (ret) {
|
|
GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
|
|
}
|
|
|
|
return ret;
|
|
}
|