mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-03 16:09:39 +00:00
e9f5d94c93
Useful when having a service that runs a GStreamer pipeline or application in Google Cloud to avoid storing the inputs and outputs in the running container or service. For example when analyzing a video from a Google Cloud Storage bucket and extracting images or converting the video and then uploading the results into another Google Cloud Storage bucket. - gssrc allows to read from a file located in Google Cloud Storage and it supports seeking. - gssink allows to write to a file located in Google Cloud Storage. There are 2 modes, one similar to multifilesink and the other similar to filesink. Example: gst-launch-1.0 gssrc location=gs://mybucket/videos/sample.mp4 ! decodebin ! glimagesink gst-launch-1.0 playbin uri=gs://mybucket/videos/sample.mp4 gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! gssink object-name="img/img%05d.png" bucket-name="mybucket" next-file=buffer gst-launch-1.0 filesrc location=sample.mp4 ! gssink object-name="videos/video.mp4" bucket-name="mybucket" next-file=none When running locally simply set GOOGLE_APPLICATION_CREDENTIALS. But when running in Google Cloud Run or Google Cloud Engine, just set the "service-account-email" property on each element. Closes #1264 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1369>
793 lines
26 KiB
C++
793 lines
26 KiB
C++
/* GStreamer
|
|
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
|
|
*
|
|
* gstgssink.cpp:
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Library General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Library General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Library General Public
|
|
* License along with this library; if not, write to the
|
|
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
* Boston, MA 02110-1301, USA.
|
|
*/
|
|
/**
|
|
* SECTION:element-gssink
|
|
* @title: gssink
|
|
* @see_also: #GstGsSrc
|
|
*
|
|
* Write incoming data to a series of sequentially-named remote files on a
|
|
* Google Cloud Storage bucket.
|
|
*
|
|
* The object-name property should contain a string with a \%d placeholder
|
|
* that will be substituted with the index for each filename.
|
|
*
|
|
* If the #GstGsSink:post-messages property is %TRUE, it sends an application
|
|
* message named `GstGsSink` after writing each buffer.
|
|
*
|
|
* The message's structure contains these fields:
|
|
*
|
|
* * #gchararray `filename`: the filename where the buffer was written.
|
|
* * #gchararray `date`: the date of the current buffer, NULL if no start date
|
|
* is provided.
|
|
* * #gint `index`: index of the buffer.
|
|
* * #GstClockTime `timestamp`: the timestamp of the buffer.
|
|
* * #GstClockTime `stream-time`: the stream time of the buffer.
|
|
* * #GstClockTime `running-time`: the running_time of the buffer.
|
|
* * #GstClockTime `duration`: the duration of the buffer.
|
|
* * #guint64 `offset`: the offset of the buffer that triggered the message.
|
|
* * #guint64 `offset-end`: the offset-end of the buffer that triggered the
|
|
* message.
|
|
*
|
|
* ## Example launch line
|
|
* ```
|
|
* gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
|
|
* object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
|
|
* next-file=buffer post-messages=true
|
|
* ```
|
|
* ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
|
|
* names are frame00000.png, frame00001.png, ..., frame00014.png
|
|
* ```
|
|
* gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
|
|
* pngenc ! gssink start-date="2020-04-16T08:55:03Z"
|
|
* object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
|
|
* next-file=buffer post-messages=true
|
|
* ```
|
|
* ### Upload 6 png images into gs://mybucket/mypath/myframes/ where the file
|
|
* names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
|
|
* im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
|
|
* im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
|
|
* ```
|
|
* gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
|
|
* object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
|
|
* ```
|
|
* ### Upload any stream as a single file into Google Cloud Storage. Similar to
|
|
* filesink in this case. The file is then accessible from:
|
|
* gs://mybucket/mypath/myvideos/video.mp4
|
|
*
|
|
* See also: #GstGsSrc
|
|
* Since: 1.20
|
|
*/
|
|
|
|
#ifdef HAVE_CONFIG_H
|
|
#include "config.h"
|
|
#endif
|
|
|
|
#include "gstgscommon.h"
|
|
#include "gstgssink.h"
|
|
|
|
#include <algorithm>
|
|
|
|
/* Always-present sink pad template; it accepts any caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE(
    "sink", GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
|
|
|
|
/* Debug category used by the GST_*_OBJECT logging calls in this file. */
GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
#define GST_CAT_DEFAULT gst_gs_sink_debug

/* Initial values of the element properties. */
#define DEFAULT_INDEX 0
#define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
#define DEFAULT_OBJECT_NAME "%s_%05d"
#define DEFAULT_POST_MESSAGES FALSE
|
|
|
|
namespace gcs = google::cloud::storage;
|
|
|
|
/* GObject property identifiers; PROP_0 is the unused zero slot. */
enum {
  PROP_0,
  PROP_BUCKET_NAME,
  PROP_OBJECT_NAME,
  PROP_INDEX,
  PROP_POST_MESSAGES,
  PROP_NEXT_FILE,
  PROP_SERVICE_ACCOUNT_EMAIL,
  PROP_START_DATE,
};
|
|
|
|
class GSWriteStream;
|
|
|
|
struct _GstGsSink {
|
|
GstBaseSink parent;
|
|
|
|
std::unique_ptr<google::cloud::storage::Client> gcs_client;
|
|
std::unique_ptr<GSWriteStream> gcs_stream;
|
|
gchar* service_account_email;
|
|
gchar* bucket_name;
|
|
gchar* object_name;
|
|
gchar* start_date_str;
|
|
GDateTime* start_date;
|
|
gint index;
|
|
gboolean post_messages;
|
|
GstGsSinkNext next_file;
|
|
const gchar* content_type;
|
|
size_t nb_percent_format;
|
|
gboolean percent_s_is_first;
|
|
};
|
|
|
|
static void gst_gs_sink_finalize(GObject* object);
|
|
|
|
static void gst_gs_sink_set_property(GObject* object,
|
|
guint prop_id,
|
|
const GValue* value,
|
|
GParamSpec* pspec);
|
|
static void gst_gs_sink_get_property(GObject* object,
|
|
guint prop_id,
|
|
GValue* value,
|
|
GParamSpec* pspec);
|
|
|
|
static gboolean gst_gs_sink_start(GstBaseSink* bsink);
|
|
static gboolean gst_gs_sink_stop(GstBaseSink* sink);
|
|
static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
|
|
static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
|
|
GstBufferList* buffer_list);
|
|
static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
|
|
static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
|
|
|
|
#define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
|
|
static GType gst_gs_sink_next_get_type(void) {
|
|
static GType gs_sink_next_type = 0;
|
|
static const GEnumValue next_types[] = {
|
|
{GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
|
|
{GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
|
|
{0, NULL, NULL}};
|
|
|
|
if (!gs_sink_next_type) {
|
|
gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types);
|
|
}
|
|
|
|
return gs_sink_next_type;
|
|
}
|
|
|
|
#define gst_gs_sink_parent_class parent_class
|
|
G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
|
|
|
|
class GSWriteStream {
|
|
public:
|
|
GSWriteStream(google::cloud::storage::Client& client,
|
|
const char* bucket_name,
|
|
const char* object_name,
|
|
const char* content_type)
|
|
: gcs_stream_(client.WriteObject(
|
|
bucket_name,
|
|
object_name,
|
|
gcs::WithObjectMetadata(
|
|
gcs::ObjectMetadata().set_content_type(content_type)))) {}
|
|
~GSWriteStream() { gcs_stream_.Close(); }
|
|
|
|
gcs::ObjectWriteStream& stream() { return gcs_stream_; }
|
|
|
|
private:
|
|
gcs::ObjectWriteStream gcs_stream_;
|
|
};
|
|
|
|
/* Class initialiser: installs the element properties, hooks up the GObject
 * and GstBaseSink virtual methods, and registers the debug category, pad
 * template and element metadata. */
static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
  GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
  GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
  GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);

  gobject_class->set_property = gst_gs_sink_set_property;
  gobject_class->get_property = gst_gs_sink_get_property;
  gobject_class->finalize = gst_gs_sink_finalize;

  /**
   * GstGsSink:bucket-name:
   *
   * Name of the Google Cloud Storage bucket.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_BUCKET_NAME,
      g_param_spec_string(
          "bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
          NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:object-name:
   *
   * Full path name of the remote file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_OBJECT_NAME,
      g_param_spec_string(
          "object-name", "Object Name", "Full path name of the remote file",
          DEFAULT_OBJECT_NAME,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:index:
   *
   * Index to use with the object-name property to create file names.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_INDEX,
      g_param_spec_int(
          "index", "Index",
          /* This element has no "location" property; the index is
           * substituted into object-name. */
          "Index to use with object-name property to create file names. The "
          "index is incremented by one for each buffer written.",
          0, G_MAXINT, DEFAULT_INDEX,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:post-messages:
   *
   * Post a message on the GstBus for each file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_POST_MESSAGES,
      g_param_spec_boolean(
          "post-messages", "Post Messages",
          "Post a message for each file with information of the buffer",
          DEFAULT_POST_MESSAGES,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:next-file:
   *
   * A #GstGsSinkNext that specifies when to start a new file.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_NEXT_FILE,
      g_param_spec_enum(
          "next-file", "Next File", "When to start a new file",
          GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));

  /**
   * GstGsSink:service-account-email:
   *
   * Service Account Email to use for credentials.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
      g_param_spec_string(
          "service-account-email", "Service Account Email",
          "Service Account Email to use for credentials", NULL,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
                        GST_PARAM_MUTABLE_READY)));

  /**
   * GstGsSink:start-date:
   *
   * Start date in iso8601 format.
   *
   * Since: 1.20
   */
  g_object_class_install_property(
      gobject_class, PROP_START_DATE,
      g_param_spec_string(
          "start-date", "Start Date", "Start date in iso8601 format", NULL,
          (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
                        GST_PARAM_MUTABLE_READY)));

  gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
  gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
  gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
  gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
  gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
  gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);

  GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");

  gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
  gst_element_class_set_static_metadata(
      gstelement_class, "Google Cloud Storage Sink", "Sink/File",
      "Write buffers to a sequentially named set of files on Google Cloud "
      "Storage",
      "Julien Isorce <jisorce@oblong.com>");
}
|
|
|
|
static void gst_gs_sink_init(GstGsSink* sink) {
|
|
sink->gcs_client = nullptr;
|
|
sink->gcs_stream = nullptr;
|
|
sink->index = DEFAULT_INDEX;
|
|
sink->post_messages = DEFAULT_POST_MESSAGES;
|
|
sink->service_account_email = NULL;
|
|
sink->bucket_name = NULL;
|
|
sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
|
|
sink->start_date_str = NULL;
|
|
sink->start_date = NULL;
|
|
sink->next_file = DEFAULT_NEXT_FILE;
|
|
sink->content_type = NULL;
|
|
sink->nb_percent_format = 0;
|
|
sink->percent_s_is_first = FALSE;
|
|
|
|
gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
|
|
}
|
|
|
|
/* GObject finalize: drops the client and any open stream, frees the owned
 * property strings and the parsed start date, then chains up. */
static void gst_gs_sink_finalize(GObject* object) {
  GstGsSink* self = GST_GS_SINK(object);

  self->gcs_client.reset();
  self->gcs_stream.reset();

  g_free(self->service_account_email);
  g_free(self->bucket_name);
  g_free(self->object_name);
  g_free(self->start_date_str);
  self->service_account_email = NULL;
  self->bucket_name = NULL;
  self->object_name = NULL;
  self->start_date_str = NULL;

  if (self->start_date != NULL) {
    g_date_time_unref(self->start_date);
    self->start_date = NULL;
  }

  /* content_type is a borrowed pointer, so it is only cleared. */
  self->content_type = NULL;

  G_OBJECT_CLASS(parent_class)->finalize(object);
}
|
|
|
|
/* Replaces the object-name template and recomputes the cached format
 * information (nb_percent_format, percent_s_is_first).
 *
 * The previous name is always released first, so on failure the sink is
 * left without an object name. Accepted templates contain at most two '%'
 * characters; two are only allowed when one of them is "%s", and "%s" may
 * not appear twice.
 *
 * Returns TRUE when the template was accepted and stored. */
static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
                                            const gchar* object_name) {
  g_free(sink->object_name);
  sink->object_name = NULL;
  sink->nb_percent_format = 0;
  sink->percent_s_is_first = FALSE;

  if (object_name == NULL) {
    GST_ERROR_OBJECT(sink, "Object name is null");
    return FALSE;
  }

  const std::string tmpl(object_name);

  sink->nb_percent_format = std::count(tmpl.begin(), tmpl.end(), '%');
  if (sink->nb_percent_format > 2) {
    GST_ERROR_OBJECT(sink, "Object name has too many formats");
    return FALSE;
  }

  const size_t string_fmt_pos = tmpl.find("%s");
  const bool has_string_fmt = string_fmt_pos != std::string::npos;

  /* Two conversions without a "%s" would mean two numeric formats. */
  if (!has_string_fmt && sink->nb_percent_format == 2) {
    GST_ERROR_OBJECT(sink, "Object name must have just one number format");
    return FALSE;
  }

  if (has_string_fmt && string_fmt_pos == tmpl.find('%')) {
    sink->percent_s_is_first = TRUE;

    if (tmpl.find("%s", string_fmt_pos + 1) != std::string::npos) {
      GST_ERROR_OBJECT(sink, "Object name expect max one string format");
      return FALSE;
    }
  }

  sink->object_name = g_strdup(object_name);
  return TRUE;
}
|
|
|
|
/* GObject set_property implementation.
 *
 * String properties are stored as owned copies. object-name goes through
 * gst_gs_sink_set_object_name(), which validates the template and logs on
 * rejection. start-date is parsed as ISO 8601; a NULL value clears the
 * start date, and an unparsable value is logged and leaves it unset. */
static void gst_gs_sink_set_property(GObject* object,
                                     guint prop_id,
                                     const GValue* value,
                                     GParamSpec* pspec) {
  GstGsSink* sink = GST_GS_SINK(object);

  switch (prop_id) {
    case PROP_BUCKET_NAME:
      g_free(sink->bucket_name);
      sink->bucket_name = g_strdup(g_value_get_string(value));
      break;
    case PROP_OBJECT_NAME:
      gst_gs_sink_set_object_name(sink, g_value_get_string(value));
      break;
    case PROP_INDEX:
      sink->index = g_value_get_int(value);
      break;
    case PROP_POST_MESSAGES:
      sink->post_messages = g_value_get_boolean(value);
      break;
    case PROP_NEXT_FILE:
      sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
      break;
    case PROP_SERVICE_ACCOUNT_EMAIL:
      g_free(sink->service_account_email);
      sink->service_account_email = g_strdup(g_value_get_string(value));
      break;
    case PROP_START_DATE: {
      const gchar* date_str = g_value_get_string(value);

      /* Drop the previous date first so a failure leaves a clean state. */
      g_free(sink->start_date_str);
      sink->start_date_str = NULL;
      if (sink->start_date) {
        g_date_time_unref(sink->start_date);
        sink->start_date = NULL;
      }

      /* NULL unsets the property; g_date_time_new_from_iso8601() must not
       * be called with a NULL string. */
      if (!date_str)
        break;

      sink->start_date = g_date_time_new_from_iso8601(date_str, NULL);
      if (!sink->start_date) {
        GST_ERROR_OBJECT(sink, "Failed to parse start date %s", date_str);
        break;
      }
      sink->start_date_str = g_strdup(date_str);
      break;
    }
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
      break;
  }
}
|
|
|
|
/* GObject get_property implementation: copies the stored value of the
 * requested property into @value. */
static void gst_gs_sink_get_property(GObject* object,
                                     guint prop_id,
                                     GValue* value,
                                     GParamSpec* pspec) {
  GstGsSink* self = GST_GS_SINK(object);

  switch (prop_id) {
    case PROP_BUCKET_NAME:
      g_value_set_string(value, self->bucket_name);
      return;
    case PROP_OBJECT_NAME:
      g_value_set_string(value, self->object_name);
      return;
    case PROP_INDEX:
      g_value_set_int(value, self->index);
      return;
    case PROP_POST_MESSAGES:
      g_value_set_boolean(value, self->post_messages);
      return;
    case PROP_NEXT_FILE:
      g_value_set_enum(value, self->next_file);
      return;
    case PROP_SERVICE_ACCOUNT_EMAIL:
      g_value_set_string(value, self->service_account_email);
      return;
    case PROP_START_DATE:
      g_value_set_string(value, self->start_date_str);
      return;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
      return;
  }
}
|
|
|
|
/* GstBaseSink start: checks that bucket and object names are configured,
 * resets the content type to "" and creates the storage client.
 *
 * Returns FALSE after posting an element error if a name is missing or
 * the client could not be created. */
static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
  GstGsSink* self = GST_GS_SINK(bsink);

  if (self->bucket_name == NULL) {
    GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS, ("Bucket name is required"),
                      GST_ERROR_SYSTEM);
    return FALSE;
  }

  if (self->object_name == NULL) {
    GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS, ("Object name is required"),
                      GST_ERROR_SYSTEM);
    return FALSE;
  }

  /* Replaced by the caps structure name once set_caps() runs. */
  self->content_type = "";

  GError* error = NULL;
  self->gcs_client = gst_gs_create_client(self->service_account_email, &error);
  if (error != NULL) {
    GST_ELEMENT_ERROR(self, RESOURCE, OPEN_READ,
                      ("Could not create client (%s)", error->message),
                      GST_ERROR_SYSTEM);
    g_clear_error(&error);
    return FALSE;
  }

  GST_INFO_OBJECT(self, "Using bucket name (%s) and object name (%s)",
                  self->bucket_name, self->object_name);

  return TRUE;
}
|
|
|
|
/* GstBaseSink stop: releases the client and any open write stream and
 * forgets the borrowed content type. Always returns TRUE. */
static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
  GstGsSink* self = GST_GS_SINK(bsink);

  self->gcs_client.reset();
  self->gcs_stream.reset();
  self->content_type = NULL;

  return TRUE;
}
|
|
|
|
/* Posts a "GstGsSink" element message carrying the given file name, date,
 * current index and timing/offset values. Does nothing unless the
 * post-messages property is enabled. */
static void gst_gs_sink_post_message_full(GstGsSink* sink,
                                          GstClockTime timestamp,
                                          GstClockTime duration,
                                          GstClockTime offset,
                                          GstClockTime offset_end,
                                          GstClockTime running_time,
                                          GstClockTime stream_time,
                                          const char* filename,
                                          const gchar* date) {
  if (!sink->post_messages)
    return;

  GstStructure* details = gst_structure_new(
      "GstGsSink",
      "filename", G_TYPE_STRING, filename,
      "date", G_TYPE_STRING, date,
      "index", G_TYPE_INT, sink->index,
      "timestamp", G_TYPE_UINT64, timestamp,
      "stream-time", G_TYPE_UINT64, stream_time,
      "running-time", G_TYPE_UINT64, running_time,
      "duration", G_TYPE_UINT64, duration,
      "offset", G_TYPE_UINT64, offset,
      "offset-end", G_TYPE_UINT64, offset_end,
      NULL);

  GstMessage* message = gst_message_new_element(GST_OBJECT_CAST(sink), details);
  gst_element_post_message(GST_ELEMENT_CAST(sink), message);
}
|
|
|
|
/* Posts the element message for a file when no buffer is at hand: running
 * and stream time are derived from @timestamp using the sink's segment,
 * both offsets are reported as all-ones and no date is attached. */
static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
                                               GstClockTime timestamp,
                                               GstClockTime duration,
                                               const char* filename) {
  if (!sink->post_messages)
    return;

  GstSegment* seg = &GST_BASE_SINK(sink)->segment;
  const GstFormat fmt = seg->format;
  const guint64 no_offset = G_MAXUINT64;

  const GstClockTime running = gst_segment_to_running_time(seg, fmt, timestamp);
  const GstClockTime stream = gst_segment_to_stream_time(seg, fmt, timestamp);

  gst_gs_sink_post_message_full(sink, timestamp, duration, no_offset,
                                no_offset, running, stream, filename, NULL);
}
|
|
|
|
/* Posts the element message for a file written from @buffer, taking PTS,
 * duration and offsets from the buffer and converting the PTS to running
 * and stream time with the sink's segment. */
static void gst_gs_sink_post_message(GstGsSink* sink,
                                     GstBuffer* buffer,
                                     const char* filename,
                                     const char* date) {
  if (!sink->post_messages)
    return;

  GstSegment* seg = &GST_BASE_SINK(sink)->segment;
  const GstFormat fmt = seg->format;

  const GstClockTime pts = GST_BUFFER_PTS(buffer);
  const GstClockTime running = gst_segment_to_running_time(seg, fmt, pts);
  const GstClockTime stream = gst_segment_to_stream_time(seg, fmt, pts);

  gst_gs_sink_post_message_full(sink, pts, GST_BUFFER_DURATION(buffer),
                                GST_BUFFER_OFFSET(buffer),
                                GST_BUFFER_OFFSET_END(buffer), running, stream,
                                filename, date);
}
|
|
|
|
static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
|
|
GstBuffer* buffer) {
|
|
GstMapInfo map = {0};
|
|
gchar* object_name = NULL;
|
|
gchar* buffer_date = NULL;
|
|
|
|
if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
|
|
return GST_FLOW_ERROR;
|
|
|
|
switch (sink->next_file) {
|
|
case GST_GS_SINK_NEXT_BUFFER: {
|
|
// Get buffer date if needed.
|
|
if (sink->start_date) {
|
|
if (sink->nb_percent_format != 2) {
|
|
GST_ERROR_OBJECT(sink, "Object name expects date and index");
|
|
gst_buffer_unmap(buffer, &map);
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
|
|
GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
|
|
gst_buffer_unmap(buffer, &map);
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
if (sink->percent_s_is_first) {
|
|
object_name =
|
|
g_strdup_printf(sink->object_name, buffer_date, sink->index);
|
|
} else {
|
|
object_name =
|
|
g_strdup_printf(sink->object_name, sink->index, buffer_date);
|
|
}
|
|
} else {
|
|
if (sink->nb_percent_format != 1) {
|
|
GST_ERROR_OBJECT(sink, "Object name expects only an index");
|
|
gst_buffer_unmap(buffer, &map);
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
object_name = g_strdup_printf(sink->object_name, sink->index);
|
|
}
|
|
|
|
GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
|
|
|
|
gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
|
|
sink->bucket_name, object_name,
|
|
gcs::WithObjectMetadata(
|
|
gcs::ObjectMetadata().set_content_type(sink->content_type)));
|
|
gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
|
|
if (gcs_stream.fail()) {
|
|
GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
|
|
}
|
|
gcs_stream.Close();
|
|
|
|
google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
|
|
sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
|
|
if (!object_metadata) {
|
|
GST_ERROR_OBJECT(
|
|
sink, "Could not get object metadata for object %s (%s)",
|
|
object_name, object_metadata.status().message().c_str());
|
|
gst_buffer_unmap(buffer, &map);
|
|
g_free(object_name);
|
|
g_free(buffer_date);
|
|
return GST_FLOW_ERROR;
|
|
}
|
|
|
|
GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
|
|
object_name, object_metadata->size());
|
|
|
|
gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
|
|
g_free(object_name);
|
|
g_free(buffer_date);
|
|
++sink->index;
|
|
break;
|
|
}
|
|
case GST_GS_SINK_NEXT_NONE: {
|
|
if (!sink->gcs_stream) {
|
|
GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
|
|
sink->gcs_stream = std::make_unique<GSWriteStream>(
|
|
*sink->gcs_client.get(), sink->bucket_name, sink->object_name,
|
|
sink->content_type);
|
|
|
|
if (!sink->gcs_stream->stream().IsOpen()) {
|
|
GST_ELEMENT_ERROR(
|
|
sink, RESOURCE, OPEN_READ,
|
|
("Could not create write stream (%s)",
|
|
sink->gcs_stream->stream().last_status().message().c_str()),
|
|
GST_ERROR_SYSTEM);
|
|
gst_buffer_unmap(buffer, &map);
|
|
return GST_FLOW_OK;
|
|
}
|
|
}
|
|
|
|
GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
|
|
|
|
gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
|
|
stream.write(reinterpret_cast<const char*>(map.data), map.size);
|
|
if (stream.fail()) {
|
|
GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
|
|
}
|
|
break;
|
|
}
|
|
default:
|
|
g_assert_not_reached();
|
|
}
|
|
|
|
gst_buffer_unmap(buffer, &map);
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
/* GstBaseSink render: hands the buffer to gst_gs_sink_write_buffer() and
 * returns its flow result. */
static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
  return gst_gs_sink_write_buffer(GST_GS_SINK(bsink), buffer);
}
|
|
|
|
/* gst_buffer_list_foreach() callback: appends every memory block of *buf
 * to the destination buffer passed as @data. The metadata of the first
 * list entry is copied onto the destination. Always returns TRUE so the
 * iteration continues. */
static gboolean buffer_list_copy_data(GstBuffer** buf,
                                      guint idx,
                                      gpointer data) {
  GstBuffer* merged = GST_BUFFER_CAST(data);

  if (idx == 0)
    gst_buffer_copy_into(merged, *buf, GST_BUFFER_COPY_METADATA, 0, -1);

  const guint n_blocks = gst_buffer_n_memory(*buf);
  for (guint block = 0; block < n_blocks; ++block)
    gst_buffer_append_memory(merged, gst_buffer_get_memory(*buf, block));

  return TRUE;
}
|
|
|
|
/* Our assumption for now is that the buffers in a buffer list should always
|
|
* end up in the same file. If someone wants different behaviour, they'll just
|
|
* have to add a property for that. */
|
|
static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
|
|
GstBufferList* list) {
|
|
GstBuffer* buf;
|
|
guint size;
|
|
|
|
size = gst_buffer_list_calculate_size(list);
|
|
GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);
|
|
|
|
/* copy all buffers in the list into one single buffer, so we can use
|
|
* the normal render function (FIXME: optimise to avoid the memcpy) */
|
|
buf = gst_buffer_new();
|
|
gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
|
|
g_assert(gst_buffer_get_size(buf) == size);
|
|
|
|
gst_gs_sink_render(sink, buf);
|
|
gst_buffer_unref(buf);
|
|
|
|
return GST_FLOW_OK;
|
|
}
|
|
|
|
/* GstBaseSink set_caps: remembers the name of the first caps structure as
 * the content type for subsequently created objects. Always returns TRUE. */
static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
  GstGsSink* self = GST_GS_SINK(bsink);
  const GstStructure* first = gst_caps_get_structure(caps, 0);

  self->content_type = gst_structure_get_name(first);
  GST_INFO_OBJECT(self, "Content type: %s", self->content_type);

  return TRUE;
}
|
|
|
|
/* GstBaseSink event: on EOS, destroys any open single-file stream (its
 * destructor closes the upload) and posts the element message for the
 * file. Every event is then forwarded to the parent class. */
static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
  GstGsSink* self = GST_GS_SINK(bsink);
  const gboolean is_eos = GST_EVENT_TYPE(event) == GST_EVENT_EOS;

  if (is_eos && self->gcs_stream) {
    self->gcs_stream.reset();
    gst_gs_sink_post_message_from_time(
        self, GST_BASE_SINK(self)->segment.position, -1, self->object_name);
  }

  return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
}
|