gstreamer/ext/gs/gstgssink.cpp
Julien e9f5d94c93 gs: add source and sink for Google Cloud Storage
Useful when having a service that runs a GStreamer pipeline
or application in Google Cloud to avoid storing the inputs
and outputs in the running container or service. For example
when analyzing a video from a Google Cloud Storage bucket
and extracting images or converting the video and then uploading
the results into another Google Cloud Storage bucket.

- gssrc allows reading from a file located in Google Cloud
Storage, and it supports seeking.
- gssink allows writing to a file located in Google Cloud
Storage. There are 2 modes, one similar to multifilesink and
the other similar to filesink.

Example:
  gst-launch-1.0 gssrc location=gs://mybucket/videos/sample.mp4 ! decodebin ! glimagesink
  gst-launch-1.0 playbin uri=gs://mybucket/videos/sample.mp4
  gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! gssink object-name="img/img%05d.png" bucket-name="mybucket" next-file=buffer
  gst-launch-1.0 filesrc location=sample.mp4 ! gssink object-name="videos/video.mp4" bucket-name="mybucket" next-file=none

When running locally simply set GOOGLE_APPLICATION_CREDENTIALS. But
when running in Google Cloud Run or Google Cloud Engine, just set the
"service-account-email" property on each element.

Closes #1264

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1369>
2021-03-18 22:32:48 +00:00

793 lines
26 KiB
C++

/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgssink.cpp:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-gssink
* @title: gssink
* @see_also: #GstGsSrc
*
* Write incoming data to a series of sequentially-named remote files on a
* Google Cloud Storage bucket.
*
* The object-name property should contain a string with a \%d placeholder
* that will be substituted with the index for each filename.
*
* If the #GstGsSink:post-messages property is %TRUE, it sends an application
* message named `GstGsSink` after writing each buffer.
*
* The message's structure contains these fields:
*
* * #gchararray `filename`: the filename where the buffer was written.
* * #gchararray `date`: the date of the current buffer, NULL if no start date
* is provided.
* * #gint `index`: index of the buffer.
* * #GstClockTime `timestamp`: the timestamp of the buffer.
* * #GstClockTime `stream-time`: the stream time of the buffer.
* * #GstClockTime `running-time`: the running_time of the buffer.
* * #GstClockTime `duration`: the duration of the buffer.
* * #guint64 `offset`: the offset of the buffer that triggered the message.
* * #guint64 `offset-end`: the offset-end of the buffer that triggered the
* message.
*
* ## Example launch line
* ```
* gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
* object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
* next-file=buffer post-messages=true
* ```
* ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
* names are frame00000.png, frame00001.png, ..., frame00014.png
* ```
* gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
* pngenc ! gssink start-date="2020-04-16T08:55:03Z"
* object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
* next-file=buffer post-messages=true
* ```
* ### Upload 6 png images into gs://mybucket/mypath/myframes/ where the file
* names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
* im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
* im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
* ```
* gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
* object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
* ```
* ### Upload any stream as a single file into Google Cloud Storage. Similar to
* filesink in this case. The file is then accessible from:
* gs://mybucket/mypath/myvideos/video.mp4
*
* See also: #GstGsSrc
* Since: 1.20
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstgscommon.h"
#include "gstgssink.h"
#include <algorithm>
/* The element's only pad: an always-present sink pad accepting any caps. */
static GstStaticPadTemplate sinktemplate =
GST_STATIC_PAD_TEMPLATE("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
/* Debug category of this element; it is initialised in class_init under the
 * name "gssink" and used as the default category for the GST_* log macros. */
GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
#define GST_CAT_DEFAULT gst_gs_sink_debug
/* Default values of the element properties. */
#define DEFAULT_INDEX 0
#define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
#define DEFAULT_OBJECT_NAME "%s_%05d"
#define DEFAULT_POST_MESSAGES FALSE
/* Shorthand for the Google Cloud Storage C++ client namespace. */
namespace gcs = google::cloud::storage;
/* GObject property identifiers. PROP_0 is a placeholder so that the real
 * property ids start at 1. */
enum {
PROP_0,
PROP_BUCKET_NAME,
PROP_OBJECT_NAME,
PROP_INDEX,
PROP_POST_MESSAGES,
PROP_NEXT_FILE,
PROP_SERVICE_ACCOUNT_EMAIL,
PROP_START_DATE,
};
/* Defined further down; only needed here as a pointee type. */
class GSWriteStream;
/* Instance structure of the gssink element. */
struct _GstGsSink {
GstBaseSink parent;
/* Storage client; created in start() and released in stop(). */
std::unique_ptr<google::cloud::storage::Client> gcs_client;
/* Upload stream used in next-file=none mode only; opened when the first
 * buffer is written and released on EOS or in stop(). */
std::unique_ptr<GSWriteStream> gcs_stream;
/* Copies of the string properties, owned by the element. */
gchar* service_account_email;
gchar* bucket_name;
gchar* object_name;
/* The start-date property as given, and its parsed form (NULL if unset or
 * if parsing failed). */
gchar* start_date_str;
GDateTime* start_date;
/* Index substituted into the object name; incremented after each object
 * written in next-file=buffer mode. */
gint index;
gboolean post_messages;
GstGsSinkNext next_file;
/* Content type passed to the storage client: "" after start(), then the
 * name of the first caps structure once caps are set. Never freed by the
 * element. */
const gchar* content_type;
/* Number of '%' characters found in object_name. */
size_t nb_percent_format;
/* TRUE when "%s" is the first '%' sequence in object_name, i.e. the date
 * argument comes before the index when formatting. */
gboolean percent_s_is_first;
};
/* Forward declarations of the GObject and GstBaseSink virtual method
 * implementations that are installed in gst_gs_sink_class_init(). */
static void gst_gs_sink_finalize(GObject* object);
static void gst_gs_sink_set_property(GObject* object,
guint prop_id,
const GValue* value,
GParamSpec* pspec);
static void gst_gs_sink_get_property(GObject* object,
guint prop_id,
GValue* value,
GParamSpec* pspec);
static gboolean gst_gs_sink_start(GstBaseSink* bsink);
static gboolean gst_gs_sink_stop(GstBaseSink* sink);
static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
GstBufferList* buffer_list);
static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
#define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
/* Returns the GType of the GstGsSinkNext enumeration, registering it with the
 * GLib type system the first time it is requested. */
static GType gst_gs_sink_next_get_type(void) {
  static const GEnumValue next_file_values[] = {
      {GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
      {GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
      {0, NULL, NULL}};
  static GType registered_type = 0;

  /* Already registered by an earlier call. */
  if (registered_type != 0)
    return registered_type;

  registered_type = g_enum_register_static("GstGsSinkNext", next_file_values);
  return registered_type;
}
/* Lets the rest of this file refer to the parent class pointer generated by
 * G_DEFINE_TYPE simply as parent_class. */
#define gst_gs_sink_parent_class parent_class
G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
class GSWriteStream {
public:
GSWriteStream(google::cloud::storage::Client& client,
const char* bucket_name,
const char* object_name,
const char* content_type)
: gcs_stream_(client.WriteObject(
bucket_name,
object_name,
gcs::WithObjectMetadata(
gcs::ObjectMetadata().set_content_type(content_type)))) {}
~GSWriteStream() { gcs_stream_.Close(); }
gcs::ObjectWriteStream& stream() { return gcs_stream_; }
private:
gcs::ObjectWriteStream gcs_stream_;
};
/* Class initialiser: installs the element properties, hooks up the GObject
 * and GstBaseSink virtual methods, initialises the debug category and
 * registers the pad template and element metadata. */
static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
  GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
  GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
  GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);

  gobject_class->set_property = gst_gs_sink_set_property;
  gobject_class->get_property = gst_gs_sink_get_property;

  /**
   * GstGsSink:bucket-name:
   *
   * Name of the Google Cloud Storage bucket.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_BUCKET_NAME,
      g_param_spec_string(
          "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
          NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:object-name:
   *
   * Full path name of the remote file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_OBJECT_NAME,
      g_param_spec_string(
          "object-name", "Object Name", "Full path name of the remote file",
          DEFAULT_OBJECT_NAME,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:index:
   *
   * Index to use with the object-name property to create file names.
   *
   * Since: 1.20
   */
  /* The description names "object-name": this element has no "location"
   * property, the index is substituted into the object name. */
  g_object_class_install_property(
      gobject_class, PROP_INDEX,
      g_param_spec_int(
          "index", "Index",
          "Index to use with the object-name property to create file names. "
          "The index is incremented by one for each buffer written.",
          0, G_MAXINT, DEFAULT_INDEX,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:post-messages:
   *
   * Post a message on the GstBus for each file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_POST_MESSAGES,
      g_param_spec_boolean(
          "post-messages", "Post Messages",
          "Post a message for each file with information of the buffer",
          DEFAULT_POST_MESSAGES,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:next-file:
   *
   * A #GstGsSinkNext that specifies when to start a new file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_NEXT_FILE,
      g_param_spec_enum(
          "next-file", "Next File", "When to start a new file",
          GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:service-account-email:
   *
   * Service Account Email to use for credentials.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
      g_param_spec_string(
          "service-account-email", "Service Account Email",
          "Service Account Email to use for credentials", NULL,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
                        GST_PARAM_MUTABLE_READY)));

  /**
   * GstGsSink:start-date:
   *
   * Start date in iso8601 format.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_START_DATE,
      g_param_spec_string(
          "start-date", "Start Date", "Start date in iso8601 format", NULL,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
                        GST_PARAM_MUTABLE_READY)));

  gobject_class->finalize = gst_gs_sink_finalize;

  gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
  gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
  gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
  gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
  gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
  gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);

  GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");

  gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
  gst_element_class_set_static_metadata(
      gstelement_class, "Google Cloud Storage Sink", "Sink/File",
      "Write buffers to a sequentially named set of files on Google Cloud "
      "Storage",
      "Julien Isorce <jisorce@oblong.com>");
}
/* Instance initialiser: puts every field into its default state and disables
 * clock synchronisation on the base sink. */
static void gst_gs_sink_init(GstGsSink* sink) {
  sink->gcs_client = nullptr;
  sink->gcs_stream = nullptr;
  sink->index = DEFAULT_INDEX;
  sink->post_messages = DEFAULT_POST_MESSAGES;
  sink->service_account_email = NULL;
  sink->bucket_name = NULL;
  sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
  sink->start_date_str = NULL;
  sink->start_date = NULL;
  sink->next_file = DEFAULT_NEXT_FILE;
  sink->content_type = NULL;

  /* The cached format information must describe the default object name.
   * Leaving it at 0/FALSE made the default name unusable, because the write
   * path compares nb_percent_format against the number of arguments it is
   * going to format. The values are derived the same way as when the
   * object-name property is set: count the '%' characters and check whether
   * "%s" is the first format. */
  const std::string default_name(DEFAULT_OBJECT_NAME);
  const size_t first_percent = default_name.find('%');
  const size_t first_percent_s = default_name.find("%s");
  sink->nb_percent_format =
      std::count(default_name.begin(), default_name.end(), '%');
  sink->percent_s_is_first = (first_percent_s != std::string::npos &&
                              first_percent_s == first_percent)
                                 ? TRUE
                                 : FALSE;

  gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
}
/* GObject finalize: releases the storage client and stream, frees the owned
 * strings and the parsed start date, then chains up to the parent class. */
static void gst_gs_sink_finalize(GObject* object) {
  GstGsSink* self = GST_GS_SINK(object);

  self->gcs_client.reset();
  self->gcs_stream.reset();

  g_clear_pointer(&self->service_account_email, g_free);
  g_clear_pointer(&self->bucket_name, g_free);
  g_clear_pointer(&self->object_name, g_free);
  g_clear_pointer(&self->start_date_str, g_free);
  g_clear_pointer(&self->start_date, g_date_time_unref);

  /* Not owned by the element, so it is only forgotten. */
  self->content_type = NULL;

  G_OBJECT_CLASS(parent_class)->finalize(object);
}
/* Validates object_name as a format pattern and stores a copy of it.
 *
 * The previous name is always dropped first. A pattern is accepted when it
 * contains at most two '%' characters, at most one of them being "%s" when
 * "%s" comes first, and when two formats are present one of them is "%s".
 * nb_percent_format and percent_s_is_first are updated along the way.
 *
 * Returns TRUE when the name was stored, FALSE (with object_name left NULL)
 * when it was rejected. */
static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
                                            const gchar* object_name) {
  g_clear_pointer(&sink->object_name, g_free);
  sink->nb_percent_format = 0;
  sink->percent_s_is_first = FALSE;

  if (object_name == NULL) {
    GST_ERROR_OBJECT(sink, "Object name is null");
    return FALSE;
  }

  const std::string pattern(object_name);
  const size_t percent_count =
      std::count(pattern.begin(), pattern.end(), '%');
  sink->nb_percent_format = percent_count;
  if (percent_count > 2) {
    GST_ERROR_OBJECT(sink, "Object name has too many formats");
    return FALSE;
  }

  const size_t string_format_pos = pattern.find("%s");
  const bool has_string_format = string_format_pos != std::string::npos;

  /* Two formats without a "%s" would mean two numbers. */
  if (!has_string_format && percent_count == 2) {
    GST_ERROR_OBJECT(sink, "Object name must have just one number format");
    return FALSE;
  }

  if (has_string_format && string_format_pos == pattern.find_first_of('%')) {
    sink->percent_s_is_first = TRUE;
    /* A leading "%s" must not be followed by another "%s". */
    if (pattern.find("%s", string_format_pos + 1) != std::string::npos) {
      GST_ERROR_OBJECT(sink, "Object name expect max one string format");
      return FALSE;
    }
  }

  sink->object_name = g_strdup(object_name);
  return TRUE;
}
/* GObject set_property implementation.
 *
 * String properties are copied. The object name goes through
 * gst_gs_sink_set_object_name(), which validates the format pattern and logs
 * the reason when it rejects it. The start date is stored both as the given
 * string and as a parsed GDateTime; an unparsable date leaves both unset,
 * and a NULL value simply clears the start date. */
static void gst_gs_sink_set_property(GObject* object,
                                     guint prop_id,
                                     const GValue* value,
                                     GParamSpec* pspec) {
  GstGsSink* sink = GST_GS_SINK(object);

  switch (prop_id) {
    case PROP_BUCKET_NAME:
      g_free(sink->bucket_name);
      sink->bucket_name = g_strdup(g_value_get_string(value));
      break;
    case PROP_OBJECT_NAME:
      gst_gs_sink_set_object_name(sink, g_value_get_string(value));
      break;
    case PROP_INDEX:
      sink->index = g_value_get_int(value);
      break;
    case PROP_POST_MESSAGES:
      sink->post_messages = g_value_get_boolean(value);
      break;
    case PROP_NEXT_FILE:
      sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
      break;
    case PROP_SERVICE_ACCOUNT_EMAIL:
      g_free(sink->service_account_email);
      sink->service_account_email = g_strdup(g_value_get_string(value));
      break;
    case PROP_START_DATE: {
      const gchar* date_str = g_value_get_string(value);

      /* Drop the previous date first so that no stale value survives. */
      g_clear_pointer(&sink->start_date_str, g_free);
      g_clear_pointer(&sink->start_date, g_date_time_unref);

      /* NULL resets the property; the ISO 8601 parser must not be handed a
       * NULL string. */
      if (date_str == NULL)
        break;

      sink->start_date = g_date_time_new_from_iso8601(date_str, NULL);
      if (!sink->start_date) {
        GST_ERROR_OBJECT(sink, "Failed to parse start date %s", date_str);
        break;
      }
      sink->start_date_str = g_strdup(date_str);
      break;
    }
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
      break;
  }
}
/* GObject get_property implementation: copies the requested field of the
 * element into value, or warns about an unknown property id. */
static void gst_gs_sink_get_property(GObject* object,
                                     guint prop_id,
                                     GValue* value,
                                     GParamSpec* pspec) {
  GstGsSink* self = GST_GS_SINK(object);

  switch (prop_id) {
    /* String properties. */
    case PROP_BUCKET_NAME: {
      g_value_set_string(value, self->bucket_name);
      break;
    }
    case PROP_OBJECT_NAME: {
      g_value_set_string(value, self->object_name);
      break;
    }
    case PROP_SERVICE_ACCOUNT_EMAIL: {
      g_value_set_string(value, self->service_account_email);
      break;
    }
    case PROP_START_DATE: {
      g_value_set_string(value, self->start_date_str);
      break;
    }
    /* Scalar properties. */
    case PROP_INDEX: {
      g_value_set_int(value, self->index);
      break;
    }
    case PROP_POST_MESSAGES: {
      g_value_set_boolean(value, self->post_messages);
      break;
    }
    case PROP_NEXT_FILE: {
      g_value_set_enum(value, self->next_file);
      break;
    }
    default: {
      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
      break;
    }
  }
}
/* GstBaseSink start: checks that a bucket name and an object name are set,
 * resets the content type to "" and creates the storage client.
 *
 * Returns TRUE on success. On failure an element error is posted and FALSE
 * is returned. */
static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
  GstGsSink* sink = GST_GS_SINK(bsink);
  GError* err = NULL;

  if (!sink->bucket_name) {
    GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"),
                      GST_ERROR_SYSTEM);
    return FALSE;
  }
  if (!sink->object_name) {
    GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"),
                      GST_ERROR_SYSTEM);
    return FALSE;
  }

  sink->content_type = "";
  sink->gcs_client = gst_gs_create_client(sink->service_account_email, &err);
  /* Treat a missing client as a failure even when no GError was filled in,
   * so that the write path never dereferences a NULL client.
   * NOTE(review): OPEN_READ is kept for compatibility although this is a
   * sink - confirm whether OPEN_WRITE would be acceptable for applications. */
  if (err || !sink->gcs_client) {
    GST_ELEMENT_ERROR(
        sink, RESOURCE, OPEN_READ,
        ("Could not create client (%s)", err ? err->message : "unknown error"),
        GST_ERROR_SYSTEM);
    g_clear_error(&err);
    sink->gcs_client = nullptr;
    return FALSE;
  }

  GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)",
                  sink->bucket_name, sink->object_name);
  return TRUE;
}
/* GstBaseSink stop: releases the storage client and any open upload stream
 * and forgets the content type. Always returns TRUE. */
static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
  GstGsSink* self = GST_GS_SINK(bsink);

  self->gcs_client.reset();
  self->gcs_stream.reset();
  self->content_type = NULL;

  return TRUE;
}
/* Posts a "GstGsSink" element message carrying the given file name, date,
 * current index, timing values and offsets. Does nothing unless the
 * post-messages property is enabled. */
static void gst_gs_sink_post_message_full(GstGsSink* sink,
                                          GstClockTime timestamp,
                                          GstClockTime duration,
                                          GstClockTime offset,
                                          GstClockTime offset_end,
                                          GstClockTime running_time,
                                          GstClockTime stream_time,
                                          const char* filename,
                                          const gchar* date) {
  if (sink->post_messages) {
    /* One "name, type, value" triple per line. */
    GstStructure* details = gst_structure_new(
        "GstGsSink",
        "filename", G_TYPE_STRING, filename,
        "date", G_TYPE_STRING, date,
        "index", G_TYPE_INT, sink->index,
        "timestamp", G_TYPE_UINT64, timestamp,
        "stream-time", G_TYPE_UINT64, stream_time,
        "running-time", G_TYPE_UINT64, running_time,
        "duration", G_TYPE_UINT64, duration,
        "offset", G_TYPE_UINT64, offset,
        "offset-end", G_TYPE_UINT64, offset_end,
        NULL);
    GstMessage* message =
        gst_message_new_element(GST_OBJECT_CAST(sink), details);

    gst_element_post_message(GST_ELEMENT_CAST(sink), message);
  }
}
/* Posts the element message for a file when only a timestamp and duration are
 * known (no buffer at hand): both offsets are reported as "none" (all bits
 * set) and no date is given. Running and stream time are derived from the
 * base sink's current segment. Does nothing unless post-messages is
 * enabled. */
static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
                                               GstClockTime timestamp,
                                               GstClockTime duration,
                                               const char* filename) {
  if (!sink->post_messages)
    return;

  GstSegment* seg = &GST_BASE_SINK(sink)->segment;
  const GstFormat seg_format = seg->format;
  const guint64 no_offset = (guint64)-1;

  const GstClockTime running =
      gst_segment_to_running_time(seg, seg_format, timestamp);
  const GstClockTime stream =
      gst_segment_to_stream_time(seg, seg_format, timestamp);

  gst_gs_sink_post_message_full(sink, timestamp, duration, no_offset,
                                no_offset, running, stream, filename, NULL);
}
/* Posts the element message for a file written from buffer: PTS, duration and
 * offsets are taken from the buffer, running and stream time are derived from
 * the base sink's current segment. Does nothing unless post-messages is
 * enabled. */
static void gst_gs_sink_post_message(GstGsSink* sink,
                                     GstBuffer* buffer,
                                     const char* filename,
                                     const char* date) {
  if (!sink->post_messages)
    return;

  GstSegment* seg = &GST_BASE_SINK(sink)->segment;
  const GstFormat seg_format = seg->format;

  const GstClockTime pts = GST_BUFFER_PTS(buffer);
  const GstClockTime buf_duration = GST_BUFFER_DURATION(buffer);
  const guint64 buf_offset = GST_BUFFER_OFFSET(buffer);
  const guint64 buf_offset_end = GST_BUFFER_OFFSET_END(buffer);

  const GstClockTime running =
      gst_segment_to_running_time(seg, seg_format, pts);
  const GstClockTime stream = gst_segment_to_stream_time(seg, seg_format, pts);

  gst_gs_sink_post_message_full(sink, pts, buf_duration, buf_offset,
                                buf_offset_end, running, stream, filename,
                                date);
}
static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
GstBuffer* buffer) {
GstMapInfo map = {0};
gchar* object_name = NULL;
gchar* buffer_date = NULL;
if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
return GST_FLOW_ERROR;
switch (sink->next_file) {
case GST_GS_SINK_NEXT_BUFFER: {
// Get buffer date if needed.
if (sink->start_date) {
if (sink->nb_percent_format != 2) {
GST_ERROR_OBJECT(sink, "Object name expects date and index");
gst_buffer_unmap(buffer, &map);
return GST_FLOW_ERROR;
}
if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
gst_buffer_unmap(buffer, &map);
return GST_FLOW_ERROR;
}
if (sink->percent_s_is_first) {
object_name =
g_strdup_printf(sink->object_name, buffer_date, sink->index);
} else {
object_name =
g_strdup_printf(sink->object_name, sink->index, buffer_date);
}
} else {
if (sink->nb_percent_format != 1) {
GST_ERROR_OBJECT(sink, "Object name expects only an index");
gst_buffer_unmap(buffer, &map);
return GST_FLOW_ERROR;
}
object_name = g_strdup_printf(sink->object_name, sink->index);
}
GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
sink->bucket_name, object_name,
gcs::WithObjectMetadata(
gcs::ObjectMetadata().set_content_type(sink->content_type)));
gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
if (gcs_stream.fail()) {
GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
}
gcs_stream.Close();
google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
if (!object_metadata) {
GST_ERROR_OBJECT(
sink, "Could not get object metadata for object %s (%s)",
object_name, object_metadata.status().message().c_str());
gst_buffer_unmap(buffer, &map);
g_free(object_name);
g_free(buffer_date);
return GST_FLOW_ERROR;
}
GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
object_name, object_metadata->size());
gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
g_free(object_name);
g_free(buffer_date);
++sink->index;
break;
}
case GST_GS_SINK_NEXT_NONE: {
if (!sink->gcs_stream) {
GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
sink->gcs_stream = std::make_unique<GSWriteStream>(
*sink->gcs_client.get(), sink->bucket_name, sink->object_name,
sink->content_type);
if (!sink->gcs_stream->stream().IsOpen()) {
GST_ELEMENT_ERROR(
sink, RESOURCE, OPEN_READ,
("Could not create write stream (%s)",
sink->gcs_stream->stream().last_status().message().c_str()),
GST_ERROR_SYSTEM);
gst_buffer_unmap(buffer, &map);
return GST_FLOW_OK;
}
}
GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
stream.write(reinterpret_cast<const char*>(map.data), map.size);
if (stream.fail()) {
GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
}
break;
}
default:
g_assert_not_reached();
}
gst_buffer_unmap(buffer, &map);
return GST_FLOW_OK;
}
/* GstBaseSink render: hands the buffer to gst_gs_sink_write_buffer() and
 * returns its flow result. */
static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
  return gst_gs_sink_write_buffer(GST_GS_SINK(bsink), buffer);
}
/* gst_buffer_list_foreach() callback: appends every memory block of *buf to
 * the destination buffer passed as data. The metadata of the first buffer of
 * the list (idx 0) is copied to the destination as well. Always returns TRUE
 * so that the iteration continues. */
static gboolean buffer_list_copy_data(GstBuffer** buf,
                                      guint idx,
                                      gpointer data) {
  GstBuffer* target = GST_BUFFER_CAST(data);
  GstBuffer* source = *buf;

  if (idx == 0)
    gst_buffer_copy_into(target, source, GST_BUFFER_COPY_METADATA, 0, -1);

  const guint mem_count = gst_buffer_n_memory(source);
  guint pos = 0;
  while (pos < mem_count) {
    gst_buffer_append_memory(target, gst_buffer_get_memory(source, pos));
    ++pos;
  }

  return TRUE;
}
/* Our assumption for now is that the buffers in a buffer list should always
 * end up in the same file. If someone wants different behaviour, they'll just
 * have to add a property for that.
 *
 * GstBaseSink render_list: merges all buffers of the list into one buffer and
 * renders it like a single buffer. Returns the flow result of that render
 * call, so a failed upload is reported upstream instead of being dropped. */
static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
                                             GstBufferList* list) {
  GstFlowReturn flow;
  GstBuffer* buf;
  guint size;

  size = gst_buffer_list_calculate_size(list);
  GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);

  /* copy all buffers in the list into one single buffer, so we can use
   * the normal render function (FIXME: optimise to avoid the memcpy) */
  buf = gst_buffer_new();
  gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
  g_assert(gst_buffer_get_size(buf) == size);

  flow = gst_gs_sink_render(sink, buf);
  gst_buffer_unref(buf);

  return flow;
}
/* GstBaseSink set_caps: uses the name of the first caps structure as the
 * content type of the objects written from now on. Always returns TRUE.
 * NOTE(review): the stored pointer is whatever gst_structure_get_name()
 * returns and is not copied - confirm it outlives the caps. */
static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
  GstGsSink* self = GST_GS_SINK(bsink);
  const GstStructure* first_structure = gst_caps_get_structure(caps, 0);

  self->content_type = gst_structure_get_name(first_structure);
  GST_INFO_OBJECT(self, "Content type: %s", self->content_type);

  return TRUE;
}
/* GstBaseSink event: on EOS, releases the single-file upload stream (if one
 * is open) and posts the element message for that file, using the current
 * segment position as timestamp and no duration. Every event is then passed
 * on to the parent class handler, whose result is returned. */
static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
  GstGsSink* self = GST_GS_SINK(bsink);

  if (GST_EVENT_TYPE(event) == GST_EVENT_EOS && self->gcs_stream) {
    self->gcs_stream.reset();
    gst_gs_sink_post_message_from_time(
        self, GST_BASE_SINK(self)->segment.position, -1, self->object_name);
  }

  return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
}