gstreamer/subprojects/gst-plugins-bad/ext/gs/gstgssink.cpp
Sebastian Dröge a12762a29a gs: Add support for authenticating via Service Account Credentials
This allows authenticating directly with Service Account credentials
instead of having it configured on host system separately, and thus
allows using arbitrary accounts configured/selected at runtime.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/934>
2021-09-27 15:36:05 +00:00

823 lines
27 KiB
C++

/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgssink.cpp:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-gssink
* @title: gssink
* @see_also: #GstGsSrc
*
* Write incoming data to a series of sequentially-named remote files on a
* Google Cloud Storage bucket.
*
* The object-name property should contain a string with a \%d placeholder
* that will be substituted with the index for each filename.
*
* If the #GstGsSink:post-messages property is %TRUE, it posts an element
* message named `GstGsSink` on the bus after writing each file.
*
* The message's structure contains these fields:
*
* * #gchararray `filename`: the filename where the buffer was written.
* * #gchararray `date`: the date of the current buffer, NULL if no start date
* is provided.
* * #gint `index`: index of the buffer.
* * #GstClockTime `timestamp`: the timestamp of the buffer.
* * #GstClockTime `stream-time`: the stream time of the buffer.
* * #GstClockTime `running-time`: the running_time of the buffer.
* * #GstClockTime `duration`: the duration of the buffer.
* * #guint64 `offset`: the offset of the buffer that triggered the message.
* * #guint64 `offset-end`: the offset-end of the buffer that triggered the
* message.
*
* ## Example launch line
* ```
* gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
* object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
* next-file=buffer post-messages=true
* ```
* ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
* names are frame00000.png, frame00001.png, ..., frame00014.png
* ```
* gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
* pngenc ! gssink start-date="2020-04-16T08:55:03Z"
* object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
* next-file=buffer post-messages=true
* ```
* ### Upload 6 png images into gs://mybucket/mypath/myframes/ where the file
* names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
* im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
* im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
* ```
* gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
* object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
* ```
* ### Upload any stream as a single file into Google Cloud Storage. Similar to
* filesink in this case. The file is then accessible from:
* gs://mybucket/mypath/myvideos/video.mp4
*
* See also: #GstGsSrc
* Since: 1.20
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstgscommon.h"
#include "gstgssink.h"
#include <algorithm>
/* Template for the element's single, always-present sink pad. The caps are
 * GST_STATIC_CAPS_ANY, so the template itself does not restrict the media
 * type of the incoming data. */
static GstStaticPadTemplate sinktemplate =
GST_STATIC_PAD_TEMPLATE("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
/* Debug category for this file; it is initialised in gst_gs_sink_class_init()
 * and selected as the default category for the GST_* logging macros. */
GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
#define GST_CAT_DEFAULT gst_gs_sink_debug
/* Default values of the element's properties. Note that the default object
 * name contains two conversions: "%s" followed by "%05d". */
#define DEFAULT_INDEX 0
#define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
#define DEFAULT_OBJECT_NAME "%s_%05d"
#define DEFAULT_POST_MESSAGES FALSE
/* Short alias for the Google Cloud Storage client library namespace. */
namespace gcs = google::cloud::storage;
/* Property identifiers used when installing the properties in
 * gst_gs_sink_class_init() and dispatched on in the set/get property
 * handlers. PROP_0 is never installed as a property. */
enum {
PROP_0,
PROP_BUCKET_NAME,
PROP_OBJECT_NAME,
PROP_INDEX,
PROP_POST_MESSAGES,
PROP_NEXT_FILE,
PROP_SERVICE_ACCOUNT_EMAIL,
PROP_START_DATE,
PROP_SERVICE_ACCOUNT_CREDENTIALS,
};
/* Defined further down; forward-declared so the instance struct can hold a
 * pointer to it. */
class GSWriteStream;
/* Instance data of the gssink element. */
struct _GstGsSink {
GstBaseSink parent;
/* Cloud Storage client; created in start() and released in stop(). */
std::unique_ptr<google::cloud::storage::Client> gcs_client;
/* Long-lived write stream used when next-file=none; opened on the first
 * buffer and released on EOS or stop(). */
std::unique_ptr<GSWriteStream> gcs_stream;
/* Owned copies of the corresponding string properties (freed in finalize). */
gchar* service_account_email;
gchar* service_account_credentials;
gchar* bucket_name;
/* Object name or printf-style object name pattern; NULL after an invalid
 * name was set. */
gchar* object_name;
/* start-date property as given, and its parsed form; both are NULL when no
 * valid start date is set. */
gchar* start_date_str;
GDateTime* start_date;
/* Index substituted into the object name; incremented after each object
 * written in next-file=buffer mode. */
gint index;
gboolean post_messages;
GstGsSinkNext next_file;
/* "" after start(), then the name of the first caps structure once caps are
 * set. Never freed by this element. */
const gchar* content_type;
/* Number of '%' characters found in object_name. */
size_t nb_percent_format;
/* TRUE when the first '%' in object_name starts a "%s" conversion. */
gboolean percent_s_is_first;
};
/* Forward declarations of the GObject and GstBaseSink virtual method
 * implementations that gst_gs_sink_class_init() installs. */
static void gst_gs_sink_finalize(GObject* object);
static void gst_gs_sink_set_property(GObject* object,
guint prop_id,
const GValue* value,
GParamSpec* pspec);
static void gst_gs_sink_get_property(GObject* object,
guint prop_id,
GValue* value,
GParamSpec* pspec);
static gboolean gst_gs_sink_start(GstBaseSink* bsink);
static gboolean gst_gs_sink_stop(GstBaseSink* sink);
static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
GstBufferList* buffer_list);
static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
#define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
/* Returns the GType of the GstGsSinkNext enumeration, registering it with
 * GObject the first time the function is called. */
static GType gst_gs_sink_next_get_type(void) {
  static const GEnumValue values[] = {
      {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
      {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
      {0, NULL, NULL}};
  static GType registered_type = 0;

  if (registered_type == 0)
    registered_type = g_enum_register_static("GstGsSinkNext", values);

  return registered_type;
}
/* Define the GstGsSink type as a GstBaseSink subclass and register the
 * "gssink" element with rank GST_RANK_NONE. `parent_class` is the name used
 * in this file to chain up to the parent implementation. */
#define gst_gs_sink_parent_class parent_class
G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
GST_ELEMENT_REGISTER_DEFINE(gssink, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK)
class GSWriteStream {
public:
GSWriteStream(google::cloud::storage::Client& client,
const char* bucket_name,
const char* object_name,
const char* content_type)
: gcs_stream_(client.WriteObject(
bucket_name,
object_name,
gcs::WithObjectMetadata(
gcs::ObjectMetadata().set_content_type(content_type)))) {}
~GSWriteStream() { gcs_stream_.Close(); }
gcs::ObjectWriteStream& stream() { return gcs_stream_; }
private:
gcs::ObjectWriteStream gcs_stream_;
};
/* Class initialiser: installs the GObject property handlers and properties,
 * hooks up the GstBaseSink virtual methods, initialises the debug category
 * and registers the pad template and element metadata. */
static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
  GObjectClass* object_class = G_OBJECT_CLASS(klass);
  GstElementClass* element_class = GST_ELEMENT_CLASS(klass);
  GstBaseSinkClass* basesink_class = GST_BASE_SINK_CLASS(klass);

  /* Flag sets shared by the properties below. */
  const GParamFlags rw_flags =
      (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
  const GParamFlags rw_mutable_ready_flags =
      (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
                    GST_PARAM_MUTABLE_READY);

  object_class->set_property = gst_gs_sink_set_property;
  object_class->get_property = gst_gs_sink_get_property;
  object_class->finalize = gst_gs_sink_finalize;

  /**
   * GstGsSink:bucket-name:
   *
   * Name of the Google Cloud Storage bucket.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      object_class, PROP_BUCKET_NAME,
      g_param_spec_string("bucket-name", "Bucket Name",
                          "Google Cloud Storage Bucket Name", NULL, rw_flags));

  /**
   * GstGsSink:object-name:
   *
   * Full path name of the remote file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      object_class, PROP_OBJECT_NAME,
      g_param_spec_string("object-name", "Object Name",
                          "Full path name of the remote file",
                          DEFAULT_OBJECT_NAME, rw_flags));

  /**
   * GstGsSink:index:
   *
   * Index substituted into the object name to create file names.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      object_class, PROP_INDEX,
      g_param_spec_int(
          "index", "Index",
          "Index to use with location property to create file names. The "
          "index is incremented by one for each buffer written.",
          0, G_MAXINT, DEFAULT_INDEX, rw_flags));

  /**
   * GstGsSink:post-messages:
   *
   * Post a message on the GstBus for each file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      object_class, PROP_POST_MESSAGES,
      g_param_spec_boolean(
          "post-messages", "Post Messages",
          "Post a message for each file with information of the buffer",
          DEFAULT_POST_MESSAGES, rw_flags));

  /**
   * GstGsSink:next-file:
   *
   * A #GstGsSinkNext that specifies when to start a new file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      object_class, PROP_NEXT_FILE,
      g_param_spec_enum("next-file", "Next File", "When to start a new file",
                        GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE, rw_flags));

  /**
   * GstGsSink:service-account-email:
   *
   * Service Account Email to use for credentials.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      object_class, PROP_SERVICE_ACCOUNT_EMAIL,
      g_param_spec_string("service-account-email", "Service Account Email",
                          "Service Account Email to use for credentials", NULL,
                          rw_mutable_ready_flags));

  /**
   * GstGsSink:service-account-credentials:
   *
   * Service Account Credentials as a JSON string to use for credentials.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      object_class, PROP_SERVICE_ACCOUNT_CREDENTIALS,
      g_param_spec_string(
          "service-account-credentials", "Service Account Credentials",
          "Service Account Credentials as a JSON string to use for credentials",
          NULL, rw_mutable_ready_flags));

  /**
   * GstGsSink:start-date:
   *
   * Start date in iso8601 format.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      object_class, PROP_START_DATE,
      g_param_spec_string("start-date", "Start Date",
                          "Start date in iso8601 format", NULL,
                          rw_mutable_ready_flags));

  /* GstBaseSink virtual methods. */
  basesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
  basesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
  basesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
  basesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
  basesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
  basesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);

  GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");

  gst_element_class_add_static_pad_template(element_class, &sinktemplate);
  gst_element_class_set_static_metadata(
      element_class, "Google Cloud Storage Sink", "Sink/File",
      "Write buffers to a sequentially named set of files on Google Cloud "
      "Storage",
      "Julien Isorce <jisorce@oblong.com>");
}
/* Instance initialiser: puts every field into its default state and disables
 * clock synchronisation on the base sink. */
static void gst_gs_sink_init(GstGsSink* sink) {
  sink->gcs_client = nullptr;
  sink->gcs_stream = nullptr;
  sink->index = DEFAULT_INDEX;
  sink->post_messages = DEFAULT_POST_MESSAGES;
  sink->service_account_email = NULL;
  sink->service_account_credentials = NULL;
  sink->bucket_name = NULL;
  sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
  sink->start_date_str = NULL;
  sink->start_date = NULL;
  sink->next_file = DEFAULT_NEXT_FILE;
  sink->content_type = NULL;
  /* The format bookkeeping must describe DEFAULT_OBJECT_NAME ("%s_%05d"):
   * two '%' conversions with "%s" first. Leaving these at 0/FALSE makes the
   * default object name unusable together with a start date, because the
   * write path requires exactly two conversions in that case. Keep these two
   * values in sync with DEFAULT_OBJECT_NAME. */
  sink->nb_percent_format = 2;
  sink->percent_s_is_first = TRUE;
  gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
}
/* GObject finalize: releases the client and stream, frees every owned string
 * and the parsed start date, then chains up to the parent class. */
static void gst_gs_sink_finalize(GObject* object) {
  GstGsSink* self = GST_GS_SINK(object);

  self->gcs_client.reset();
  self->gcs_stream.reset();

  /* All of these are g_strdup()'ed copies owned by the element. */
  gchar** owned_strings[] = {
      &self->service_account_email, &self->service_account_credentials,
      &self->bucket_name,           &self->object_name,
      &self->start_date_str,
  };
  for (gchar** field : owned_strings) {
    g_free(*field);
    *field = NULL;
  }

  if (self->start_date != NULL) {
    g_date_time_unref(self->start_date);
    self->start_date = NULL;
  }

  /* content_type is not owned, so it is only cleared. */
  self->content_type = NULL;

  G_OBJECT_CLASS(parent_class)->finalize(object);
}
/* Replaces the stored object name after checking its '%' conversions.
 *
 * The previous name is always dropped first, so after a failed call
 * sink->object_name is NULL. On success the name is copied and
 * nb_percent_format / percent_s_is_first describe it.
 *
 * Accepted: at most two '%' characters; two are only allowed when one of
 * them is a "%s"; and when "%s" comes first there must be no second "%s".
 *
 * Returns TRUE if the name was accepted, FALSE otherwise. */
static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
                                            const gchar* object_name) {
  g_free(sink->object_name);
  sink->object_name = NULL;
  sink->nb_percent_format = 0;
  sink->percent_s_is_first = FALSE;

  if (object_name == NULL) {
    GST_ERROR_OBJECT(sink, "Object name is null");
    return FALSE;
  }

  const std::string pattern(object_name);

  sink->nb_percent_format = std::count(pattern.begin(), pattern.end(), '%');
  if (sink->nb_percent_format > 2) {
    GST_ERROR_OBJECT(sink, "Object name has too many formats");
    return FALSE;
  }

  const std::string::size_type string_format_pos = pattern.find("%s");
  const bool has_string_format = string_format_pos != std::string::npos;

  /* Two conversions without a "%s" would mean two number formats. */
  if (!has_string_format && sink->nb_percent_format == 2) {
    GST_ERROR_OBJECT(sink, "Object name must have just one number format");
    return FALSE;
  }

  if (has_string_format && string_format_pos == pattern.find('%')) {
    sink->percent_s_is_first = TRUE;
    if (pattern.find("%s", string_format_pos + 1) != std::string::npos) {
      GST_ERROR_OBJECT(sink, "Object name expect max one string format");
      return FALSE;
    }
  }

  sink->object_name = g_strdup(object_name);
  return TRUE;
}
/* GObject set_property handler.
 *
 * String properties are stored as owned copies. The object name goes through
 * gst_gs_sink_set_object_name(), which validates it. The start date is parsed
 * as ISO 8601; a NULL value clears it, and an unparsable value is logged and
 * also leaves the start date unset. */
static void gst_gs_sink_set_property(GObject* object,
                                     guint prop_id,
                                     const GValue* value,
                                     GParamSpec* pspec) {
  GstGsSink* sink = GST_GS_SINK(object);

  switch (prop_id) {
    case PROP_BUCKET_NAME:
      g_free(sink->bucket_name);
      sink->bucket_name = g_strdup(g_value_get_string(value));
      break;
    case PROP_OBJECT_NAME:
      /* Failures are logged by the helper; the name is then left NULL. */
      gst_gs_sink_set_object_name(sink, g_value_get_string(value));
      break;
    case PROP_INDEX:
      sink->index = g_value_get_int(value);
      break;
    case PROP_POST_MESSAGES:
      sink->post_messages = g_value_get_boolean(value);
      break;
    case PROP_NEXT_FILE:
      sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
      break;
    case PROP_SERVICE_ACCOUNT_EMAIL:
      g_free(sink->service_account_email);
      sink->service_account_email = g_strdup(g_value_get_string(value));
      break;
    case PROP_SERVICE_ACCOUNT_CREDENTIALS:
      g_free(sink->service_account_credentials);
      sink->service_account_credentials = g_strdup(g_value_get_string(value));
      break;
    case PROP_START_DATE: {
      const gchar* date_str = g_value_get_string(value);

      /* Drop whatever was set before. */
      g_free(sink->start_date_str);
      sink->start_date_str = NULL;
      if (sink->start_date) {
        g_date_time_unref(sink->start_date);
        sink->start_date = NULL;
      }

      /* NULL simply clears the start date. It must not be handed to
       * g_date_time_new_from_iso8601(), which rejects a NULL string. */
      if (!date_str)
        break;

      sink->start_date = g_date_time_new_from_iso8601(date_str, NULL);
      if (!sink->start_date) {
        GST_ERROR_OBJECT(sink, "Failed to parse start date %s", date_str);
        break;
      }
      sink->start_date_str = g_strdup(date_str);
      break;
    }
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
      break;
  }
}
/* GObject get_property handler: copies the current value of the requested
 * property into `value`, or warns about an unknown property id. */
static void gst_gs_sink_get_property(GObject* object,
                                     guint prop_id,
                                     GValue* value,
                                     GParamSpec* pspec) {
  GstGsSink* self = GST_GS_SINK(object);

  switch (prop_id) {
    case PROP_BUCKET_NAME:
      g_value_set_string(value, self->bucket_name);
      return;
    case PROP_OBJECT_NAME:
      g_value_set_string(value, self->object_name);
      return;
    case PROP_INDEX:
      g_value_set_int(value, self->index);
      return;
    case PROP_POST_MESSAGES:
      g_value_set_boolean(value, self->post_messages);
      return;
    case PROP_NEXT_FILE:
      g_value_set_enum(value, self->next_file);
      return;
    case PROP_SERVICE_ACCOUNT_EMAIL:
      g_value_set_string(value, self->service_account_email);
      return;
    case PROP_SERVICE_ACCOUNT_CREDENTIALS:
      g_value_set_string(value, self->service_account_credentials);
      return;
    case PROP_START_DATE:
      g_value_set_string(value, self->start_date_str);
      return;
    default:
      break;
  }

  G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
}
/* GstBaseSink start: verifies that a bucket name and an object name are set,
 * resets the content type to the empty string and creates the Cloud Storage
 * client from the configured service account settings.
 *
 * Returns FALSE (after posting an element error) when a required setting is
 * missing or client creation reports an error, TRUE otherwise. */
static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
  GstGsSink* self = GST_GS_SINK(bsink);
  GError* client_error = NULL;

  if (self->bucket_name == NULL) {
    GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS, ("Bucket name is required"),
                      GST_ERROR_SYSTEM);
    return FALSE;
  }

  if (self->object_name == NULL) {
    GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS, ("Object name is required"),
                      GST_ERROR_SYSTEM);
    return FALSE;
  }

  /* Until caps arrive there is no known content type. */
  self->content_type = "";

  self->gcs_client =
      gst_gs_create_client(self->service_account_email,
                           self->service_account_credentials, &client_error);
  if (client_error != NULL) {
    GST_ELEMENT_ERROR(self, RESOURCE, OPEN_READ,
                      ("Could not create client (%s)", client_error->message),
                      GST_ERROR_SYSTEM);
    g_clear_error(&client_error);
    return FALSE;
  }

  GST_INFO_OBJECT(self, "Using bucket name (%s) and object name (%s)",
                  self->bucket_name, self->object_name);

  return TRUE;
}
/* GstBaseSink stop: drops the client and any open write stream and forgets
 * the content type. Always returns TRUE. */
static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
  GstGsSink* self = GST_GS_SINK(bsink);

  self->gcs_client.reset();
  self->gcs_stream.reset();
  self->content_type = NULL;

  return TRUE;
}
/* Posts a "GstGsSink" element message carrying the given file name, date,
 * the sink's current index and the supplied timing/offset values. Does
 * nothing unless the post-messages property is enabled. */
static void gst_gs_sink_post_message_full(GstGsSink* sink,
                                          GstClockTime timestamp,
                                          GstClockTime duration,
                                          GstClockTime offset,
                                          GstClockTime offset_end,
                                          GstClockTime running_time,
                                          GstClockTime stream_time,
                                          const char* filename,
                                          const gchar* date) {
  if (!sink->post_messages)
    return;

  GstStructure* info = gst_structure_new(
      "GstGsSink",
      "filename", G_TYPE_STRING, filename,
      "date", G_TYPE_STRING, date,
      "index", G_TYPE_INT, sink->index,
      "timestamp", G_TYPE_UINT64, timestamp,
      "stream-time", G_TYPE_UINT64, stream_time,
      "running-time", G_TYPE_UINT64, running_time,
      "duration", G_TYPE_UINT64, duration,
      "offset", G_TYPE_UINT64, offset,
      "offset-end", G_TYPE_UINT64, offset_end,
      NULL);

  GstMessage* message = gst_message_new_element(GST_OBJECT_CAST(sink), info);
  gst_element_post_message(GST_ELEMENT_CAST(sink), message);
}
/* Posts the element message for a file when no buffer is at hand: running
 * and stream time are derived from `timestamp` using the base sink's current
 * segment, both offsets are reported as (guint64)-1 and no date is given.
 * Does nothing unless the post-messages property is enabled. */
static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
                                               GstClockTime timestamp,
                                               GstClockTime duration,
                                               const char* filename) {
  if (!sink->post_messages)
    return;

  GstSegment* seg = &GST_BASE_SINK(sink)->segment;
  const GstFormat seg_format = seg->format;
  const guint64 unknown_offset = (guint64)-1;

  const GstClockTime running_time =
      gst_segment_to_running_time(seg, seg_format, timestamp);
  const GstClockTime stream_time =
      gst_segment_to_stream_time(seg, seg_format, timestamp);

  gst_gs_sink_post_message_full(sink, timestamp, duration, unknown_offset,
                                unknown_offset, running_time, stream_time,
                                filename, NULL);
}
/* Posts the element message for a file written from `buffer`: PTS, duration
 * and offsets are read from the buffer, and running/stream time are derived
 * from the PTS using the base sink's current segment. Does nothing unless the
 * post-messages property is enabled. */
static void gst_gs_sink_post_message(GstGsSink* sink,
                                     GstBuffer* buffer,
                                     const char* filename,
                                     const char* date) {
  if (!sink->post_messages)
    return;

  GstSegment* seg = &GST_BASE_SINK(sink)->segment;
  const GstFormat seg_format = seg->format;

  const GstClockTime pts = GST_BUFFER_PTS(buffer);
  const GstClockTime running_time =
      gst_segment_to_running_time(seg, seg_format, pts);
  const GstClockTime stream_time =
      gst_segment_to_stream_time(seg, seg_format, pts);

  gst_gs_sink_post_message_full(sink, pts, GST_BUFFER_DURATION(buffer),
                                GST_BUFFER_OFFSET(buffer),
                                GST_BUFFER_OFFSET_END(buffer), running_time,
                                stream_time, filename, date);
}
/* Writes one buffer to Google Cloud Storage according to the next-file mode.
 *
 * next-file=buffer: the object name is built from the object-name pattern
 * (index only, or date and index when a start date is set), the buffer is
 * uploaded as its own object, the object metadata is fetched to confirm the
 * upload, the element message is posted and the index is incremented.
 *
 * next-file=none: a single write stream is opened on the first buffer and
 * every buffer is appended to it.
 *
 * Returns GST_FLOW_OK on success and GST_FLOW_ERROR when the buffer cannot be
 * mapped, the object name pattern does not match the configuration, the
 * uploaded object's metadata cannot be fetched, or the write stream cannot be
 * opened. A failed stream write is only logged as a warning. */
static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
                                              GstBuffer* buffer) {
  GstMapInfo map = {0};
  gchar* object_name = NULL;
  gchar* buffer_date = NULL;
  GstFlowReturn ret = GST_FLOW_OK;

  if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
    return GST_FLOW_ERROR;

  /* Every path below leaves the switch with `break`, so the unmap and the
   * frees after the switch run exactly once. */
  switch (sink->next_file) {
    case GST_GS_SINK_NEXT_BUFFER: {
      if (sink->start_date) {
        /* With a start date the pattern must take both a date and an index. */
        if (sink->nb_percent_format != 2) {
          GST_ERROR_OBJECT(sink, "Object name expects date and index");
          ret = GST_FLOW_ERROR;
          break;
        }
        if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
          /* The per-buffer name has not been built yet, so log the pattern. */
          GST_ERROR_OBJECT(sink, "Could not get buffer date %s",
                           sink->object_name);
          ret = GST_FLOW_ERROR;
          break;
        }
        if (sink->percent_s_is_first) {
          object_name =
              g_strdup_printf(sink->object_name, buffer_date, sink->index);
        } else {
          object_name =
              g_strdup_printf(sink->object_name, sink->index, buffer_date);
        }
      } else {
        /* Without a start date only an integer index is supplied. A lone
         * "%s" also counts as one conversion but would make printf read the
         * integer as a string pointer, so reject it here. */
        if (sink->nb_percent_format != 1 ||
            g_strstr_len(sink->object_name, -1, "%s") != NULL) {
          GST_ERROR_OBJECT(sink, "Object name expects only an index");
          ret = GST_FLOW_ERROR;
          break;
        }
        object_name = g_strdup_printf(sink->object_name, sink->index);
      }

      GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);

      gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
          sink->bucket_name, object_name,
          gcs::WithObjectMetadata(
              gcs::ObjectMetadata().set_content_type(sink->content_type)));
      gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
      if (gcs_stream.fail()) {
        GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
      }
      gcs_stream.Close();

      /* Confirm the upload by fetching the object's metadata. */
      google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
          sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
      if (!object_metadata) {
        GST_ERROR_OBJECT(
            sink, "Could not get object metadata for object %s (%s)",
            object_name, object_metadata.status().message().c_str());
        ret = GST_FLOW_ERROR;
        break;
      }

      GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
                      object_name, object_metadata->size());
      gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
      ++sink->index;
      break;
    }
    case GST_GS_SINK_NEXT_NONE: {
      if (!sink->gcs_stream) {
        GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
        sink->gcs_stream = std::make_unique<GSWriteStream>(
            *sink->gcs_client.get(), sink->bucket_name, sink->object_name,
            sink->content_type);
        if (!sink->gcs_stream->stream().IsOpen()) {
          GST_ELEMENT_ERROR(
              sink, RESOURCE, OPEN_READ,
              ("Could not create write stream (%s)",
               sink->gcs_stream->stream().last_status().message().c_str()),
              GST_ERROR_SYSTEM);
          /* Drop the unusable stream so a later buffer does not skip the open
           * check and write into it, and report the failure upstream instead
           * of pretending the buffer was consumed. */
          sink->gcs_stream = nullptr;
          ret = GST_FLOW_ERROR;
          break;
        }
      }

      GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);

      gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
      stream.write(reinterpret_cast<const char*>(map.data), map.size);
      if (stream.fail()) {
        /* In this mode the object name is the property value itself; the
         * local object_name is never set. */
        GST_WARNING_OBJECT(sink, "Failed to write to %s", sink->object_name);
      }
      break;
    }
    default:
      g_assert_not_reached();
  }

  gst_buffer_unmap(buffer, &map);
  g_free(object_name);
  g_free(buffer_date);

  return ret;
}
/* GstBaseSink render: hands the buffer to gst_gs_sink_write_buffer() and
 * returns its flow result. */
static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
  return gst_gs_sink_write_buffer(GST_GS_SINK(bsink), buffer);
}
/* gst_buffer_list_foreach() callback used by gst_gs_sink_render_list().
 *
 * `data` is the destination buffer. The metadata of the first list entry
 * (idx == 0) is copied into it, and every GstMemory obtained from the current
 * entry via gst_buffer_get_memory() is appended to it. Always returns TRUE so
 * that iteration continues over the whole list. */
static gboolean buffer_list_copy_data(GstBuffer** buf,
                                      guint idx,
                                      gpointer data) {
  GstBuffer* merged = GST_BUFFER_CAST(data);
  GstBuffer* entry = *buf;

  if (idx == 0)
    gst_buffer_copy_into(merged, entry, GST_BUFFER_COPY_METADATA, 0, -1);

  const guint n_memories = gst_buffer_n_memory(entry);
  for (guint m = 0; m < n_memories; ++m)
    gst_buffer_append_memory(merged, gst_buffer_get_memory(entry, m));

  return TRUE;
}
/* Our assumption for now is that the buffers in a buffer list should always
 * end up in the same file. If someone wants different behaviour, they'll just
 * have to add a property for that.
 *
 * GstBaseSink render_list: merges all buffers of the list into one temporary
 * buffer, renders it through gst_gs_sink_render() and returns that call's
 * flow result so that upload errors are propagated upstream. */
static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
                                             GstBufferList* list) {
  GstFlowReturn flow;
  GstBuffer* buf;
  /* gsize matches what the size getters return and avoids truncating the
   * total of a very large list. */
  gsize size;

  size = gst_buffer_list_calculate_size(list);
  GST_LOG_OBJECT(sink, "total size of buffer list %p: %" G_GSIZE_FORMAT, list,
                 size);

  /* gather all buffers of the list in one single buffer, so we can use
   * the normal render function */
  buf = gst_buffer_new();
  gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
  g_assert(gst_buffer_get_size(buf) == size);

  flow = gst_gs_sink_render(sink, buf);
  gst_buffer_unref(buf);

  return flow;
}
/* GstBaseSink set_caps: remembers the name of the first caps structure as the
 * content type used for uploaded objects. Always returns TRUE. */
static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
  GstGsSink* self = GST_GS_SINK(bsink);
  const GstStructure* first = gst_caps_get_structure(caps, 0);

  self->content_type = gst_structure_get_name(first);
  GST_INFO_OBJECT(self, "Content type: %s", self->content_type);

  return TRUE;
}
/* GstBaseSink event handler. On EOS, if a write stream is open, it is released
 * and the element message for the object is posted using the segment
 * position as timestamp. Every event is then passed on to the parent class
 * handler, whose result is returned. */
static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
  GstGsSink* self = GST_GS_SINK(bsink);

  if (GST_EVENT_TYPE(event) == GST_EVENT_EOS && self->gcs_stream) {
    self->gcs_stream.reset();
    gst_gs_sink_post_message_from_time(
        self, GST_BASE_SINK(self)->segment.position, -1, self->object_name);
  }

  return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
}