gstreamer/ext/gs/gstgssrc.cpp
Julien e9f5d94c93 gs: add source and sink for Google Cloud Storage
Useful when having a service that runs a GStreamer pipeline
or application in Google Cloud to avoid storing the inputs
and outputs in the running container or service. For example
when analyzing a video from a Google Cloud Storage bucket
and extracting images or converting the video and then uploading
the results into another Google Cloud Storage bucket.

- gssrc allows to read from a file located in Google Cloud
Storage and it supports seeking.
- gssink allows to write to a file located in Google Cloud
Storage. There are 2 modes, one similar to multifilesink and
the other similar to filesink.

Example:
  gst-launch-1.0 gssrc location=gs://mybucket/videos/sample.mp4 ! decodebin ! glimagesink
  gst-launch-1.0 playbin uri=gs://mybucket/videos/sample.mp4
  gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! gssink object-name="img/img%05d.png" bucket-name="mybucket" next-file=buffer
  gst-launch-1.0 filesrc location=sample.mp4 ! gssink object-name="videos/video.mp4" bucket-name="mybucket" next-file=none

When running locally simply set GOOGLE_APPLICATION_CREDENTIALS. But
when running in Google Cloud Run or Google Cloud Engine, just set the
"service-account-email" property on each element.

Closes #1264

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1369>
2021-03-18 22:32:48 +00:00

578 lines
17 KiB
C++

/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgssrc.cpp:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-gssrc
* @title: gssrc
* @see_also: #GstGsSrc
*
* Read data from a file stored in Google Cloud Storage.
*
* ## Example launch line
* ```
* gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin !
* glimagesink
* ```
* ### Play a video from a Google Cloud Storage.
* ```
* gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin ! navseek
* seek-offset=10 ! glimagesink
* ```
* ### Play a video from a Google Cloud Storage and seek using the keyboard
* from the terminal.
*
* See also: #GstGsSink
* Since: 1.20
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstgscommon.h"
#include "gstgssrc.h"
#include <new>
// Single always-present source pad that accepts any caps.
static GstStaticPadTemplate srctemplate =
    GST_STATIC_PAD_TEMPLATE("src",
                            GST_PAD_SRC,
                            GST_PAD_ALWAYS,
                            GST_STATIC_CAPS_ANY);

GST_DEBUG_CATEGORY_STATIC(gst_gs_src_debug);
#define GST_CAT_DEFAULT gst_gs_src_debug

enum { LAST_SIGNAL };

// Block size installed at instance init. Parenthesized so the macro keeps its
// value when expanded inside a larger expression.
#define DEFAULT_BLOCKSIZE (4 * 1024)

// Property identifiers; PROP_0 is the unused placeholder for id 0.
enum { PROP_0, PROP_LOCATION, PROP_SERVICE_ACCOUNT_EMAIL };
// Forward declaration; the class is defined further down in this file.
class GSReadStream;
// Instance data of the gssrc element.
struct _GstGsSrc {
GstBaseSrc parent;
// Storage client, assigned in start() from gst_gs_create_client().
std::unique_ptr<google::cloud::storage::Client> gcs_client;
// Currently open read stream; replaced on seek/restart and dropped in stop().
std::unique_ptr<GSReadStream> gcs_stream;
// Full "gs://..." URI as set through the location property (owned, g_free'd).
gchar* uri;
// Owned copy of the "service-account-email" property, may be NULL.
gchar* service_account_email;
// Bucket and object names parsed out of the location.
std::string bucket_name;
std::string object_name;
// Byte offset the open stream is expected to deliver next.
guint64 read_position;
// Object size taken from the object metadata in start().
guint64 object_size;
};
// GObject vmethods.
static void gst_gs_src_finalize(GObject* object);
static void gst_gs_src_set_property(GObject* object,
guint prop_id,
const GValue* value,
GParamSpec* pspec);
static void gst_gs_src_get_property(GObject* object,
guint prop_id,
GValue* value,
GParamSpec* pspec);
// GstBaseSrc vmethods (installed in class_init).
static gboolean gst_gs_src_start(GstBaseSrc* basesrc);
static gboolean gst_gs_src_stop(GstBaseSrc* basesrc);
static gboolean gst_gs_src_is_seekable(GstBaseSrc* src);
static gboolean gst_gs_src_get_size(GstBaseSrc* src, guint64* size);
static GstFlowReturn gst_gs_src_fill(GstBaseSrc* src,
guint64 offset,
guint length,
GstBuffer* buf);
static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query);
// GstURIHandler interface setup.
static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data);
// Extra type-registration code: implement GstURIHandler and create the
// "gssrc" debug category.
#define _do_init \
G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, gst_gs_src_uri_handler_init); \
GST_DEBUG_CATEGORY_INIT(gst_gs_src_debug, "gssrc", 0, "gssrc element");
#define gst_gs_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE(GstGsSrc, gst_gs_src, GST_TYPE_BASE_SRC, _do_init);
namespace gcs = google::cloud::storage;
// Owns one gcs::ObjectReadStream opened on the element's current
// bucket_name/object_name through the element's gcs_client.
class GSReadStream {
public:
// Opens the object for reading starting at byte offset `start`.
// NOTE(review): `end` is accepted but never used here; the stream always
// reads from `start` onward.
GSReadStream(GstGsSrc* src,
const std::int64_t start = 0,
const std::int64_t end = -1)
: gcs_stream_(src->gcs_client->ReadObject(src->bucket_name,
src->object_name,
gcs::ReadFromOffset(start))) {}
// Closes the underlying stream when the wrapper is destroyed.
~GSReadStream() { gcs_stream_.Close(); }
// Access to the wrapped stream; the reference is only valid while this
// GSReadStream instance is alive.
gcs::ObjectReadStream& stream() { return gcs_stream_; }
private:
gcs::ObjectReadStream gcs_stream_;
};
// Class initializer: installs the properties, element metadata, pad template
// and the GObject/GstBaseSrc vmethods of gssrc.
static void gst_gs_src_class_init(GstGsSrcClass* klass) {
GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
GstBaseSrcClass* gstbasesrc_class = GST_BASE_SRC_CLASS(klass);
gobject_class->set_property = gst_gs_src_set_property;
gobject_class->get_property = gst_gs_src_get_property;
/**
* GstGsSrc:location:
*
* Location of the file to read, in the form gs://bucket/object (the
* "gs://" prefix is optional).
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_LOCATION,
g_param_spec_string(
"location", "File Location", "Location of the file to read", NULL,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY)));
/**
* GstGsSrc:service-account-email:
*
* Service Account Email to use for credentials.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
g_param_spec_string(
"service-account-email", "Service Account Email",
"Service Account Email to use for credentials", NULL,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY)));
gobject_class->finalize = gst_gs_src_finalize;
gst_element_class_set_static_metadata(
gstelement_class, "Google Cloud Storage Source", "Source/File",
"Read from arbitrary point from a file in a Google Cloud Storage",
"Julien Isorce <jisorce@oblong.com>");
gst_element_class_add_static_pad_template(gstelement_class, &srctemplate);
// GstBaseSrc hooks implemented by this element.
gstbasesrc_class->start = GST_DEBUG_FUNCPTR(gst_gs_src_start);
gstbasesrc_class->stop = GST_DEBUG_FUNCPTR(gst_gs_src_stop);
gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR(gst_gs_src_is_seekable);
gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR(gst_gs_src_get_size);
gstbasesrc_class->fill = GST_DEBUG_FUNCPTR(gst_gs_src_fill);
gstbasesrc_class->query = GST_DEBUG_FUNCPTR(gst_gs_src_query);
}
// Instance initializer: puts every field into a defined state and applies the
// default GstBaseSrc configuration (fixed size, non-live, default blocksize).
static void gst_gs_src_init(GstGsSrc* src) {
  // The instance memory comes from GObject, which does not run C++
  // constructors. Construct the non-trivial members in place so that later
  // assignments operate on valid objects instead of raw zero-filled bytes.
  new (&src->gcs_client) std::unique_ptr<gcs::Client>();
  new (&src->gcs_stream) std::unique_ptr<GSReadStream>();
  new (&src->bucket_name) std::string();
  new (&src->object_name) std::string();

  src->uri = NULL;
  src->service_account_email = NULL;
  src->read_position = 0;
  src->object_size = 0;

  gst_base_src_set_blocksize(GST_BASE_SRC(src), DEFAULT_BLOCKSIZE);
  gst_base_src_set_dynamic_size(GST_BASE_SRC(src), FALSE);
  gst_base_src_set_live(GST_BASE_SRC(src), FALSE);
}
// GObject finalize: releases everything the instance owns, then chains up.
static void gst_gs_src_finalize(GObject* object) {
  GstGsSrc* src = GST_GS_SRC(object);

  g_free(src->uri);
  src->uri = NULL;
  g_free(src->service_account_email);
  src->service_account_email = NULL;
  src->read_position = 0;
  src->object_size = 0;

  // GObject frees the instance memory without running C++ destructors, so
  // destroy the C++ members by hand; otherwise the client, any open stream and
  // the string buffers would leak. The stream goes first because it was
  // created from the client.
  src->gcs_stream.~unique_ptr();
  src->gcs_client.~unique_ptr();
  src->object_name.~basic_string();
  src->bucket_name.~basic_string();

  G_OBJECT_CLASS(parent_class)->finalize(object);
}
// Sets (or clears, when @location is NULL) the object to read.
//
// @location may be "gs://bucket/object" or "bucket/object". It is validated
// before any state is touched, so a rejected location leaves the previous
// uri/bucket/object untouched.
//
// Returns TRUE on success. Returns FALSE and fills @err (when non-NULL) if the
// element is not in NULL/READY state or if no bucket name can be extracted.
static gboolean gst_gs_src_set_location(GstGsSrc* src,
                                        const gchar* location,
                                        GError** err) {
  GstState state = GST_STATE_NULL;
  // Default-constructed: building a std::string from a NULL pointer is
  // undefined behaviour, and NULL is a legal way to clear the location.
  std::string filepath;
  size_t delimiter = std::string::npos;

  // The element must be stopped in order to do this.
  GST_OBJECT_LOCK(src);
  state = GST_STATE(src);
  if (state != GST_STATE_READY && state != GST_STATE_NULL)
    goto wrong_state;
  GST_OBJECT_UNLOCK(src);

  if (location) {
    // Strip the optional scheme so that filepath is always "bucket/object".
    filepath = g_str_has_prefix(location, "gs://") ? location + 5 : location;
    delimiter = filepath.find_first_of('/');
    // No '/' at all, or a leading '/', means there is no bucket name.
    if (delimiter == std::string::npos || delimiter == 0)
      goto wrong_location;
  }

  g_free(src->uri);
  src->uri = NULL;

  if (location) {
    src->uri = g_strdup_printf("gs://%s", filepath.c_str());
    src->bucket_name.assign(filepath, 0, delimiter);
    src->object_name.assign(filepath, delimiter + 1, std::string::npos);
    GST_INFO_OBJECT(src, "uri is %s", src->uri);
    GST_INFO_OBJECT(src, "bucket name is %s", src->bucket_name.c_str());
    GST_INFO_OBJECT(src, "object name is %s", src->object_name.c_str());
  }

  g_object_notify(G_OBJECT(src), "location");
  return TRUE;

  // ERROR.
wrong_state : {
  g_warning(
      "Changing the `location' property on gssrc when a file is open "
      "is not supported.");
  if (err)
    g_set_error(
        err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
        "Changing the `location' property on gssrc when a file is open is "
        "not supported.");
  // Reached with the object lock still held.
  GST_OBJECT_UNLOCK(src);
  return FALSE;
}
wrong_location : {
  // Reached after the object lock was already released; do not unlock again.
  GST_WARNING_OBJECT(src, "Failed to find a bucket name in '%s'", location);
  if (err)
    g_set_error(err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
                "Failed to find a bucket name");
  return FALSE;
}
}
// Stores a copy of @service_account_email (NULL clears it).
// Refused with a warning while the element is PAUSED or PLAYING.
static gboolean gst_gs_src_set_service_account_email(
    GstGsSrc* src,
    const gchar* service_account_email) {
  const gboolean is_running = GST_STATE(src) == GST_STATE_PLAYING ||
                              GST_STATE(src) == GST_STATE_PAUSED;
  if (is_running) {
    GST_WARNING_OBJECT(src,
                       "Setting a new service account email not supported in "
                       "PLAYING or PAUSED state");
    return FALSE;
  }

  GST_OBJECT_LOCK(src);
  g_free(src->service_account_email);
  // g_strdup() maps NULL to NULL, which covers the "clear" case.
  src->service_account_email = g_strdup(service_account_email);
  GST_OBJECT_UNLOCK(src);

  return TRUE;
}
// GObject set_property: forwards each known property to its setter.
static void gst_gs_src_set_property(GObject* object,
                                    guint prop_id,
                                    const GValue* value,
                                    GParamSpec* pspec) {
  GstGsSrc* self = GST_GS_SRC(object);

  g_return_if_fail(GST_IS_GS_SRC(object));

  if (prop_id == PROP_LOCATION) {
    gst_gs_src_set_location(self, g_value_get_string(value), NULL);
  } else if (prop_id == PROP_SERVICE_ACCOUNT_EMAIL) {
    gst_gs_src_set_service_account_email(self, g_value_get_string(value));
  } else {
    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
  }
}
// GObject get_property: copies the requested string property into @value
// while holding the object lock.
static void gst_gs_src_get_property(GObject* object,
                                    guint prop_id,
                                    GValue* value,
                                    GParamSpec* pspec) {
  GstGsSrc* self = GST_GS_SRC(object);

  g_return_if_fail(GST_IS_GS_SRC(object));

  if (prop_id != PROP_LOCATION && prop_id != PROP_SERVICE_ACCOUNT_EMAIL) {
    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
    return;
  }

  GST_OBJECT_LOCK(self);
  if (prop_id == PROP_LOCATION)
    g_value_set_string(value, self->uri);
  else
    g_value_set_string(value, self->service_account_email);
  GST_OBJECT_UNLOCK(self);
}
// Reads up to @length bytes from the element's open stream into @data.
//
// When a read fails, the stream is reopened at src->read_position and the
// read is retried a bounded number of times. @offset is kept for interface
// compatibility; the restart position is taken from src->read_position, which
// is the position the open stream is expected to be at.
//
// Returns the number of bytes read, 0 when the stream is already at its end,
// or -1 when the read still fails after the allowed restarts.
static gint gst_gs_read_stream(GstGsSrc* src,
                               guint8* data,
                               const guint64 offset,
                               const guint length) {
  // Arbitrary cap so a permanent failure surfaces as an error instead of
  // spinning forever.
  const guint max_restarts = 3;
  gchar* sdata = reinterpret_cast<gchar*>(data);
  gint gcount = 0;

  for (guint restarts = 0;; ++restarts) {
    // Re-fetch on every pass: replacing src->gcs_stream below destroys the
    // previous stream object, so a reference taken earlier would dangle.
    gcs::ObjectReadStream& stream = src->gcs_stream->stream();

    // Nothing left to read; gcount stays 0 rather than reporting the count
    // of some earlier read.
    if (stream.eof())
      break;

    stream.read(sdata, length);
    if (stream.status().ok()) {
      gcount = static_cast<gint>(stream.gcount());
      break;
    }

    GST_ERROR_OBJECT(src, "Restart after (%s)",
                     stream.status().message().c_str());
    if (restarts >= max_restarts) {
      GST_ERROR_OBJECT(src, "Giving up after %u restarts", restarts);
      return -1;
    }
    src->gcs_stream = std::make_unique<GSReadStream>(src, src->read_position);
  }

  GST_INFO_OBJECT(src, "Client read %d bytes", gcount);
  return gcount;
}
// GstBaseSrc fill: fills @buf with up to @length bytes starting at @offset.
//
// If @offset differs from the tracked read position, the stream is reopened
// at @offset first. The buffer is shrunk when fewer bytes than requested are
// available.
//
// Returns GST_FLOW_OK when at least one byte was read, GST_FLOW_EOS when
// nothing is left, and GST_FLOW_ERROR on a mapping or read failure.
static GstFlowReturn gst_gs_src_fill(GstBaseSrc* basesrc,
                                     guint64 offset,
                                     guint length,
                                     GstBuffer* buf) {
  GstGsSrc* src = GST_GS_SRC(basesrc);
  guint to_read = 0;
  guint bytes_read = 0;
  gint ret = 0;
  GstMapInfo info = {};
  guint8* data = NULL;

  // An offset of -1 means "no explicit position": keep reading sequentially.
  if (G_UNLIKELY(offset != (guint64)-1 && src->read_position != offset)) {
    src->gcs_stream = std::make_unique<GSReadStream>(src, offset);
    src->read_position = offset;
  }

  if (!gst_buffer_map(buf, &info, GST_MAP_WRITE))
    goto buffer_write_fail;

  data = info.data;
  bytes_read = 0;
  to_read = length;

  while (to_read > 0) {
    GST_INFO_OBJECT(src, "Reading %u bytes at offset 0x%" G_GINT64_MODIFIER "x",
                    to_read, src->read_position);
    // Hand over the current position rather than the buffer start, so a
    // stream restart resumes where the data pointer is and never sees -1.
    ret = gst_gs_read_stream(src, data + bytes_read, src->read_position,
                             to_read);
    if (G_UNLIKELY(ret < 0))
      goto could_not_read;
    if (G_UNLIKELY(ret == 0)) {
      // Push any remaining data.
      if (bytes_read > 0)
        break;
      goto eos;
    }
    // A count above the request would wrap the unsigned counters below.
    if (G_UNLIKELY((guint)ret > to_read))
      goto could_not_read;

    to_read -= ret;
    bytes_read += ret;
    src->read_position += ret;
  }

  GST_INFO_OBJECT(
      src, "Read %" G_GUINT32_FORMAT " bytes of %" G_GUINT32_FORMAT " length",
      bytes_read, length);

  gst_buffer_unmap(buf, &info);
  if (bytes_read != length)
    gst_buffer_resize(buf, 0, bytes_read);

  GST_BUFFER_OFFSET(buf) = offset;
  GST_BUFFER_OFFSET_END(buf) = offset + bytes_read;

  return GST_FLOW_OK;

  // ERROR.
could_not_read : {
  GST_ELEMENT_ERROR(src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
  gst_buffer_unmap(buf, &info);
  gst_buffer_resize(buf, 0, 0);
  return GST_FLOW_ERROR;
}
eos : {
  GST_INFO_OBJECT(src, "EOS");
  gst_buffer_unmap(buf, &info);
  gst_buffer_resize(buf, 0, 0);
  return GST_FLOW_EOS;
}
buffer_write_fail : {
  GST_ELEMENT_ERROR(src, RESOURCE, WRITE, (NULL), ("Can't write to buffer"));
  return GST_FLOW_ERROR;
}
}
// GstBaseSrc is_seekable: this source always reports itself as seekable.
static gboolean gst_gs_src_is_seekable(GstBaseSrc* basesrc) {
return TRUE;
}
// GstBaseSrc get_size: reports the object size cached on the instance.
// Always succeeds.
static gboolean gst_gs_src_get_size(GstBaseSrc* basesrc, guint64* size) {
  *size = GST_GS_SRC(basesrc)->object_size;
  return TRUE;
}
// GstBaseSrc start: creates the storage client, fetches the object's size
// from its metadata, opens the initial read stream and adopts the client's
// download buffer size as blocksize.
// Posts an element error and returns FALSE if any step fails.
static gboolean gst_gs_src_start(GstBaseSrc* basesrc) {
  GstGsSrc* self = GST_GS_SRC(basesrc);
  GError* client_error = NULL;

  self->read_position = 0;
  self->object_size = 0;

  const gboolean has_uri = self->uri != NULL && self->uri[0] != '\0';
  if (!has_uri) {
    GST_ELEMENT_ERROR(self, RESOURCE, NOT_FOUND,
                      ("No uri specified for reading."), (NULL));
    return FALSE;
  }

  GST_INFO_OBJECT(self, "Opening file %s", self->uri);

  self->gcs_client =
      gst_gs_create_client(self->service_account_email, &client_error);
  if (client_error != NULL) {
    GST_ELEMENT_ERROR(self, RESOURCE, OPEN_READ,
                      ("Could not create client (%s)", client_error->message),
                      GST_ERROR_SYSTEM);
    g_clear_error(&client_error);
    return FALSE;
  }

  GST_INFO_OBJECT(self, "Parsed bucket name (%s) and object name (%s)",
                  self->bucket_name.c_str(), self->object_name.c_str());

  google::cloud::StatusOr<gcs::ObjectMetadata> metadata =
      self->gcs_client->GetObjectMetadata(self->bucket_name,
                                          self->object_name);
  if (!metadata) {
    GST_ELEMENT_ERROR(self, RESOURCE, OPEN_READ,
                      ("Could not get object metadata (%s)",
                       metadata.status().message().c_str()),
                      GST_ERROR_SYSTEM);
    return FALSE;
  }

  self->object_size = metadata->size();
  GST_INFO_OBJECT(self, "Object size %" G_GUINT64_FORMAT "\n",
                  self->object_size);

  // Open the stream at the beginning of the object.
  self->gcs_stream = std::make_unique<GSReadStream>(self);

  const guint blocksize = gcs::ClientOptions(nullptr).download_buffer_size();
  GST_INFO_OBJECT(self, "Set blocksize to %" G_GUINT32_FORMAT, blocksize);
  gst_base_src_set_blocksize(GST_BASE_SRC(self), blocksize);

  return TRUE;
}
// GstBaseSrc stop: drops the open read stream and resets the position and
// size bookkeeping. Always succeeds.
static gboolean gst_gs_src_stop(GstBaseSrc* basesrc) {
  GstGsSrc* self = GST_GS_SRC(basesrc);

  self->gcs_stream.reset();
  self->object_size = 0;
  self->read_position = 0;

  return TRUE;
}
// GstBaseSrc query: answers scheduling queries itself (sequential access,
// push mode only) and defers every other query to the parent class.
static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query) {
  if (GST_QUERY_TYPE(query) != GST_QUERY_SCHEDULING)
    return GST_BASE_SRC_CLASS(parent_class)->query(src, query);

  // A pushsrc can by default never operate in pull mode override
  // if you want something different.
  gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEQUENTIAL, 1, -1, 0);
  gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
  return TRUE;
}
// GstURIHandler get_type: this element handles URIs as a source.
static GstURIType gst_gs_src_uri_get_type(GType type) {
return GST_URI_SRC;
}
// GstURIHandler get_protocols: returns the NULL-terminated list of handled
// URI schemes, which is just "gs".
static const gchar* const* gst_gs_src_uri_get_protocols(GType type) {
  static const gchar* supported_schemes[] = {"gs", NULL};

  return supported_schemes;
}
// GstURIHandler get_uri: returns a newly allocated copy of the current URI
// (NULL when none is set).
static gchar* gst_gs_src_uri_get_uri(GstURIHandler* handler) {
  return g_strdup(GST_GS_SRC(handler)->uri);
}
// GstURIHandler set_uri: applies @uri as the element's location.
// A bare "gs://" clears the location and is reported as success.
static gboolean gst_gs_src_uri_set_uri(GstURIHandler* handler,
                                       const gchar* uri,
                                       GError** err) {
  GstGsSrc* self = GST_GS_SRC(handler);
  const gboolean is_scheme_only = strcmp(uri, "gs://") == 0;

  if (!is_scheme_only)
    return gst_gs_src_set_location(self, uri, err);

  // Special case for "gs://" as this is used by some applications
  // to test with gst_element_make_from_uri if there's an element
  // that supports the URI protocol.
  gst_gs_src_set_location(self, NULL, NULL);
  return TRUE;
}
// Fills in the GstURIHandler interface vtable with this element's callbacks.
static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data) {
  GstURIHandlerInterface* vtable =
      static_cast<GstURIHandlerInterface*>(g_iface);

  vtable->get_type = gst_gs_src_uri_get_type;
  vtable->get_protocols = gst_gs_src_uri_get_protocols;
  vtable->get_uri = gst_gs_src_uri_get_uri;
  vtable->set_uri = gst_gs_src_uri_set_uri;
}