gs: add source and sink for Google Cloud Storage

Useful when having a service that runs a GStreamer pipeline
or application in Google Cloud to avoid storing the inputs
and outputs in the running container or service. For example
when analyzing a video from a Google Cloud Storage bucket
and extracting images or converting the video and then uploading
the results into another Google Cloud Storage bucket.

- gssrc allows to read from a file located in Google Cloud
Storage and it supports seeking.
- gssink allows to write to a file located in Google Cloud
Storage. There are 2 modes, one similar to multifilesink and
the other similar to filesink.

Example:
  gst-launch-1.0 gssrc location=gs://mybucket/videos/sample.mp4 ! decodebin ! glimagesink
  gst-launch-1.0 playbin uri=gs://mybucket/videos/sample.mp4
  gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! gssink object-name="img/img%05d.png" bucket-name="mybucket" next-file=buffer
  gst-launch-1.0 filesrc location=sample.mp4 ! gssink object-name="videos/video.mp4" bucket-name="mybucket" next-file=none

When running locally simply set GOOGLE_APPLICATION_CREDENTIALS. But
when running in Google Cloud Run or Google Cloud Engine, just set the
"service-account-email" property on each element.

Closes #1264

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1369>
This commit is contained in:
Julien 2020-06-23 12:41:27 -07:00 committed by Julien Isorce
parent 872b7f503c
commit e9f5d94c93
13 changed files with 2010 additions and 0 deletions

View file

@ -25898,6 +25898,195 @@
"tracers": {},
"url": "Unknown package origin"
},
"gs": {
"description": "Read and write from and to a Google Cloud Storage",
"elements": {
"gssink": {
"author": "Julien Isorce <jisorce@oblong.com>",
"description": "Write buffers to a sequentially named set of files on Google Cloud Storage",
"hierarchy": [
"GstGsSink",
"GstBaseSink",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Sink/File",
"long-name": "Google Cloud Storage Sink",
"pad-templates": {
"sink": {
"caps": "ANY",
"direction": "sink",
"presence": "always"
}
},
"properties": {
"bucket-name": {
"blurb": "Google Cloud Storage Bucket Name",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"index": {
"blurb": "Index to use with location property to create file names. The index is incremented by one for each buffer written.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "2147483647",
"min": "0",
"mutable": "null",
"readable": true,
"type": "gint",
"writable": true
},
"next-file": {
"blurb": "When to start a new file",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "buffer (0)",
"mutable": "null",
"readable": true,
"type": "GstGsSinkNext",
"writable": true
},
"object-name": {
"blurb": "Full path name of the remote file",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "%%s_%%05d",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"post-messages": {
"blurb": "Post a message for each file with information of the buffer",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
},
"service-account-email": {
"blurb": "Service Account Email to use for credentials",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "ready",
"readable": true,
"type": "gchararray",
"writable": true
},
"start-date": {
"blurb": "Start date in iso8601 format",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "ready",
"readable": true,
"type": "gchararray",
"writable": true
}
},
"rank": "none"
},
"gssrc": {
"author": "Julien Isorce <jisorce@oblong.com>",
"description": "Read from arbitrary point from a file in a Google Cloud Storage",
"hierarchy": [
"GstGsSrc",
"GstBaseSrc",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstURIHandler"
],
"klass": "Source/File",
"long-name": "Google Cloud Storage Source",
"pad-templates": {
"src": {
"caps": "ANY",
"direction": "src",
"presence": "always"
}
},
"properties": {
"location": {
"blurb": "Location of the file to read",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "ready",
"readable": true,
"type": "gchararray",
"writable": true
},
"service-account-email": {
"blurb": "Service Account Email to use for credentials",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "ready",
"readable": true,
"type": "gchararray",
"writable": true
}
},
"rank": "none"
}
},
"filename": "gstgs",
"license": "LGPL",
"other-types": {
"GstGsSinkNext": {
"kind": "enum",
"values": [
{
"desc": "New file for each buffer",
"name": "buffer",
"value": "0"
},
{
"desc": "New file for each buffer",
"name": "Only one file, no next file",
"value": "1"
}
]
}
},
"package": "GStreamer Bad Plug-ins",
"source": "gst-plugins-bad",
"tracers": {},
"url": "Unknown package origin"
},
"gsm": {
"description": "GSM encoder/decoder",
"elements": {

39
ext/gs/.clang-format Normal file
View file

@ -0,0 +1,39 @@
# Defines the Chromium style for automatic reformatting.
# http://clang.llvm.org/docs/ClangFormatStyleOptions.html
BasedOnStyle: Chromium
# This defaults to 'Auto'. Explicitly set it for a while, so that
# 'vector<vector<int> >' in existing files gets formatted to
# 'vector<vector<int>>'. ('Auto' means that clang-format will only use
# 'int>>' if the file already contains at least one such instance.)
Standard: Cpp11
# Make sure code like:
# IPC_BEGIN_MESSAGE_MAP()
# IPC_MESSAGE_HANDLER(WidgetHostViewHost_Update, OnUpdate)
# IPC_END_MESSAGE_MAP()
# gets correctly indented.
MacroBlockBegin: "^\
BEGIN_MSG_MAP|\
BEGIN_MSG_MAP_EX|\
BEGIN_SAFE_MSG_MAP_EX|\
CR_BEGIN_MSG_MAP_EX|\
IPC_BEGIN_MESSAGE_MAP|\
IPC_BEGIN_MESSAGE_MAP_WITH_PARAM|\
IPC_PROTOBUF_MESSAGE_TRAITS_BEGIN|\
IPC_STRUCT_BEGIN|\
IPC_STRUCT_BEGIN_WITH_PARENT|\
IPC_STRUCT_TRAITS_BEGIN|\
POLPARAMS_BEGIN|\
PPAPI_BEGIN_MESSAGE_MAP$"
MacroBlockEnd: "^\
CR_END_MSG_MAP|\
END_MSG_MAP|\
IPC_END_MESSAGE_MAP|\
IPC_PROTOBUF_MESSAGE_TRAITS_END|\
IPC_STRUCT_END|\
IPC_STRUCT_TRAITS_END|\
POLPARAMS_END|\
PPAPI_END_MESSAGE_MAP$"
# TODO: Remove this once clang-format r357700 is rolled in.
JavaImportGroups: ['android', 'androidx', 'com', 'dalvik', 'junit', 'org', 'com.google.android.apps.chrome', 'org.chromium', 'java', 'javax']

71
ext/gs/README.md Normal file
View file

@ -0,0 +1,71 @@
# Install the Google Cloud Storage dependencies.
```
sudo apt-get install \
cmake \
libcurl3-gnutls-dev \
libgrpc++-dev \
libprotobuf-dev \
protobuf-compiler-grpc
```
# Build the Google Cloud Storage library
```
git clone https://github.com/google/crc32c.git
cd crc32c && git checkout -b 1.1.1
mkdir build && cd build
cmake .. \
-GNinja \
-DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
-DCMAKE_INSTALL_LIBDIR:PATH=lib \
-DBUILD_SHARED_LIBS=YES \
-DCRC32C_USE_GLOG=NO \
-DCRC32C_BUILD_TESTS=NO \
-DCRC32C_BUILD_BENCHMARKS=NO
ninja && ninja install
git clone https://github.com/abseil/abseil-cpp.git
git checkout master
mkdir build && cd build
cmake .. \
-GNinja \
-DBUILD_TESTING=NO \
-DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
-DCMAKE_INSTALL_LIBDIR:PATH=lib \
-DBUILD_SHARED_LIBS=YES
ninja && ninja install
git clone https://github.com/googleapis/google-cloud-cpp.git
git checkout -b v1.25.0
mkdir build && cd build
cmake .. \
-GNinja \
-DCMAKE_INSTALL_PREFIX:PATH=~/dev/gst-build/prefix \
-DCMAKE_INSTALL_LIBDIR:PATH=lib \
-DBUILD_SHARED_LIBS=YES \
-DBUILD_TESTING=NO \
-DGOOGLE_CLOUD_CPP_ENABLE=storage
ninja && ninja install
```
# Running the gs elements locally
When running from the command line or in a container running locally, simply
set the credentials by exporting GOOGLE_APPLICATION_CREDENTIALS. If you are
not familiar with this environment variable, check the documentation
https://cloud.google.com/docs/authentication/getting-started
Note that you can restrict a service account to the role Storage Admin or
Storage Object Creator instead of the Project Owner role from the above
documentation.
# Running the gs elements in Google Cloud Run
Add the Storage Object Viewer role to the service account assigned to the
Cloud Run service where gssrc runs. For gssink add the role Storage Object
Creator. Then just set the service-account-email property on the element.
# Running the gs elements in Google Cloud Kubernetes
You need to set GOOGLE_APPLICATION_CREDENTIALS in the container and ship the
json file to which the environment variable points to.

56
ext/gs/gstgs.cpp Normal file
View file

@ -0,0 +1,56 @@
/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgssrc.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* plugin-gs:
*
* The gs plugin contains elements to interact with with Google Cloud Storage.
* In particular with the gs:// protocol or by specifying the storage bucket.
*
* Since: 1.20
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstgssink.h"
#include "gstgssrc.h"
GST_DEBUG_CATEGORY (gst_gs_src_debug);
static gboolean
plugin_init (GstPlugin * plugin)
{
if (!gst_element_register (plugin, "gssrc", GST_RANK_NONE, GST_TYPE_GS_SRC))
return FALSE;
if (!gst_element_register (plugin, "gssink", GST_RANK_NONE, GST_TYPE_GS_SINK))
return FALSE;
return TRUE;
}
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
GST_VERSION_MINOR,
gs,
"Read and write from and to a Google Cloud Storage",
plugin_init,
PACKAGE_VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)

134
ext/gs/gstgscommon.cpp Normal file
View file

@ -0,0 +1,134 @@
/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgscommon.h:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "gstgscommon.h"
#include "google/cloud/storage/oauth2/compute_engine_credentials.h"
namespace gcs = google::cloud::storage;
namespace {
#if !GLIB_CHECK_VERSION(2, 62, 0)
static inline gchar *
g_date_time_format_iso8601 (GDateTime * datetime)
{
GString *
outstr = NULL;
gchar *
main_date = NULL;
gint64 offset;
// Main date and time.
main_date = g_date_time_format (datetime, "%Y-%m-%dT%H:%M:%S");
outstr = g_string_new (main_date);
g_free (main_date);
// Timezone. Format it as `%:::z` unless the offset is zero, in which case
// we can simply use `Z`.
offset = g_date_time_get_utc_offset (datetime);
if (offset == 0) {
g_string_append_c (outstr, 'Z');
} else {
gchar *
time_zone = g_date_time_format (datetime, "%:::z");
g_string_append (outstr, time_zone);
g_free (time_zone);
}
return g_string_free (outstr, FALSE);
}
#endif
} // namespace
std::unique_ptr <
google::cloud::storage::Client >
gst_gs_create_client (const gchar * service_account_email, GError ** error)
{
if (service_account_email) {
// Meant to be used from a container running in the Cloud.
google::cloud::StatusOr < std::shared_ptr <
gcs::oauth2::Credentials >> creds (std::make_shared <
gcs::oauth2::ComputeEngineCredentials <>> (service_account_email));
if (!creds) {
g_set_error (error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_NOT_AUTHORIZED,
"Could not retrieve credentials for the given service account %s (%s)",
service_account_email, creds.status ().message ().c_str ());
return nullptr;
}
gcs::ClientOptions client_options (std::move (creds.value ()));
return std::make_unique < gcs::Client > (client_options,
gcs::StrictIdempotencyPolicy ());
}
// Default account. This is meant to retrieve the credentials automatically
// using diffrent methods.
google::cloud::StatusOr < gcs::ClientOptions > client_options =
gcs::ClientOptions::CreateDefaultClientOptions ();
if (!client_options) {
g_set_error (error, GST_RESOURCE_ERROR,
GST_RESOURCE_ERROR_NOT_AUTHORIZED,
"Could not create default client options (%s)",
client_options.status ().message ().c_str ());
return nullptr;
}
return std::make_unique < gcs::Client > (client_options.value (),
gcs::StrictIdempotencyPolicy ());
}
gboolean
gst_gs_get_buffer_date (GstBuffer * buffer, GDateTime * start_date,
gchar ** buffer_date_str_ptr)
{
gchar *
buffer_date_str = NULL;
GstClockTime buffer_timestamp = GST_CLOCK_TIME_NONE;
GTimeSpan buffer_timespan = 0;
if (!buffer || !start_date)
return FALSE;
buffer_timestamp = GST_BUFFER_PTS (buffer);
// GTimeSpan is in micro seconds.
buffer_timespan = GST_TIME_AS_USECONDS (buffer_timestamp);
GDateTime *
buffer_date = g_date_time_add (start_date, buffer_timespan);
if (!buffer_date)
return FALSE;
buffer_date_str = g_date_time_format_iso8601 (buffer_date);
g_date_time_unref (buffer_date);
if (!buffer_date_str)
return FALSE;
if (buffer_date_str_ptr)
*buffer_date_str_ptr = buffer_date_str;
return TRUE;
}

39
ext/gs/gstgscommon.h Normal file
View file

@ -0,0 +1,39 @@
/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgscommon.h:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_GS_COMMON_H__
#define __GST_GS_COMMON_H__
#include <memory>
#include <gst/gst.h>
#include <google/cloud/storage/client.h>
std::unique_ptr<google::cloud::storage::Client> gst_gs_create_client(
const gchar* service_account_email,
GError** error);
gboolean gst_gs_get_buffer_date(GstBuffer* buffer,
GDateTime* start_date,
gchar** buffer_date_str_ptr);
#endif // __GST_GS_COMMON_H__

793
ext/gs/gstgssink.cpp Normal file
View file

@ -0,0 +1,793 @@
/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgssink.cpp:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-gssink
* @title: gssink
* @see_also: #GstGsSrc
*
* Write incoming data to a series of sequentially-named remote files on a
* Google Cloud Storage bucket.
*
* The object-name property should contain a string with a \%d placeholder
* that will be substituted with the index for each filename.
*
* If the #GstGsSink:post-messages property is %TRUE, it sends an application
* message named `GstGsSink` after writing each buffer.
*
* The message's structure contains these fields:
*
* * #gchararray `filename`: the filename where the buffer was written.
* * #gchararray `date`: the date of the current buffer, NULL if no start date
* is provided.
* * #gint `index`: index of the buffer.
* * #GstClockTime `timestamp`: the timestamp of the buffer.
* * #GstClockTime `stream-time`: the stream time of the buffer.
* * #GstClockTime `running-time`: the running_time of the buffer.
* * #GstClockTime `duration`: the duration of the buffer.
* * #guint64 `offset`: the offset of the buffer that triggered the message.
* * #guint64 `offset-end`: the offset-end of the buffer that triggered the
* message.
*
* ## Example launch line
* ```
* gst-launch-1.0 videotestsrc num-buffers=15 ! pngenc ! gssink
* object-name="mypath/myframes/frame%05d.png" bucket-name="mybucket"
* next-file=buffer post-messages=true
* ```
* ### Upload 15 png images into gs://mybucket/mypath/myframes/ where the file
* names are frame00000.png, frame00001.png, ..., frame00014.png
* ```
* gst-launch-1.0 videotestsrc num-buffers=6 ! video/x-raw, framerate=2/1 !
* pngenc ! gssink start-date="2020-04-16T08:55:03Z"
* object-name="mypath/myframes/im_%s_%03d.png" bucket-name="mybucket"
* next-file=buffer post-messages=true
* ```
* ### Upload png 6 images into gs://mybucket/mypath/myframes/ where the file
* names are im_2020-04-16T08:55:03Z_000.png, im_2020-04-16T08:55:03Z_001.png,
* im_2020-04-16T08:55:04Z_002.png, im_2020-04-16T08:55:04Z_003.png,
* im_2020-04-16T08:55:05Z_004.png, im_2020-04-16T08:55:05Z_005.png.
* ```
* gst-launch-1.0 filesrc location=some_video.mp4 ! gssink
* object-name="mypath/myvideos/video.mp4" bucket-name="mybucket" next-file=none
* ```
* ### Upload any stream as a single file into Google Cloud Storage. Similar as
* filesink in this case. The file is then accessible from:
* gs://mybucket/mypath/myvideos/video.mp4
*
* See also: #GstGsSrc
* Since: 1.20
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstgscommon.h"
#include "gstgssink.h"
#include <algorithm>
static GstStaticPadTemplate sinktemplate =
GST_STATIC_PAD_TEMPLATE("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC(gst_gs_sink_debug);
#define GST_CAT_DEFAULT gst_gs_sink_debug
#define DEFAULT_INDEX 0
#define DEFAULT_NEXT_FILE GST_GS_SINK_NEXT_BUFFER
#define DEFAULT_OBJECT_NAME "%s_%05d"
#define DEFAULT_POST_MESSAGES FALSE
namespace gcs = google::cloud::storage;
enum {
PROP_0,
PROP_BUCKET_NAME,
PROP_OBJECT_NAME,
PROP_INDEX,
PROP_POST_MESSAGES,
PROP_NEXT_FILE,
PROP_SERVICE_ACCOUNT_EMAIL,
PROP_START_DATE,
};
class GSWriteStream;
struct _GstGsSink {
GstBaseSink parent;
std::unique_ptr<google::cloud::storage::Client> gcs_client;
std::unique_ptr<GSWriteStream> gcs_stream;
gchar* service_account_email;
gchar* bucket_name;
gchar* object_name;
gchar* start_date_str;
GDateTime* start_date;
gint index;
gboolean post_messages;
GstGsSinkNext next_file;
const gchar* content_type;
size_t nb_percent_format;
gboolean percent_s_is_first;
};
static void gst_gs_sink_finalize(GObject* object);
static void gst_gs_sink_set_property(GObject* object,
guint prop_id,
const GValue* value,
GParamSpec* pspec);
static void gst_gs_sink_get_property(GObject* object,
guint prop_id,
GValue* value,
GParamSpec* pspec);
static gboolean gst_gs_sink_start(GstBaseSink* bsink);
static gboolean gst_gs_sink_stop(GstBaseSink* sink);
static GstFlowReturn gst_gs_sink_render(GstBaseSink* sink, GstBuffer* buffer);
static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
GstBufferList* buffer_list);
static gboolean gst_gs_sink_set_caps(GstBaseSink* sink, GstCaps* caps);
static gboolean gst_gs_sink_event(GstBaseSink* sink, GstEvent* event);
#define GST_TYPE_GS_SINK_NEXT (gst_gs_sink_next_get_type())
static GType gst_gs_sink_next_get_type(void) {
static GType gs_sink_next_type = 0;
static const GEnumValue next_types[] = {
{GST_GS_SINK_NEXT_BUFFER, "New file for each buffer", "buffer"},
{GST_GS_SINK_NEXT_NONE, "Only one file, no next file", "none"},
{0, NULL, NULL}};
if (!gs_sink_next_type) {
gs_sink_next_type = g_enum_register_static("GstGsSinkNext", next_types);
}
return gs_sink_next_type;
}
#define gst_gs_sink_parent_class parent_class
G_DEFINE_TYPE(GstGsSink, gst_gs_sink, GST_TYPE_BASE_SINK);
class GSWriteStream {
public:
GSWriteStream(google::cloud::storage::Client& client,
const char* bucket_name,
const char* object_name,
const char* content_type)
: gcs_stream_(client.WriteObject(
bucket_name,
object_name,
gcs::WithObjectMetadata(
gcs::ObjectMetadata().set_content_type(content_type)))) {}
~GSWriteStream() { gcs_stream_.Close(); }
gcs::ObjectWriteStream& stream() { return gcs_stream_; }
private:
gcs::ObjectWriteStream gcs_stream_;
};
static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
GstBaseSinkClass* gstbasesink_class = GST_BASE_SINK_CLASS(klass);
gobject_class->set_property = gst_gs_sink_set_property;
gobject_class->get_property = gst_gs_sink_get_property;
/**
* GstGsSink:bucket-name:
*
* Name of the Google Cloud Storage bucket.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_BUCKET_NAME,
g_param_spec_string(
"bucket-name", "Bucket Name", "Google Cloud Storage Bucket Name",
NULL, (GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
/**
* GstGsSink:object-name:
*
* Full path name of the remote file.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_OBJECT_NAME,
g_param_spec_string(
"object-name", "Object Name", "Full path name of the remote file",
DEFAULT_OBJECT_NAME,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
/**
* GstGsSink:index:
*
* Index to use with location property to create file names.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_INDEX,
g_param_spec_int(
"index", "Index",
"Index to use with location property to create file names. The "
"index is incremented by one for each buffer written.",
0, G_MAXINT, DEFAULT_INDEX,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
/**
* GstGsSink:post-messages:
*
* Post a message on the GstBus for each file.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_POST_MESSAGES,
g_param_spec_boolean(
"post-messages", "Post Messages",
"Post a message for each file with information of the buffer",
DEFAULT_POST_MESSAGES,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
/**
* GstGsSink:next-file:
*
* A #GstGsSinkNext that specifies when to start a new file.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_NEXT_FILE,
g_param_spec_enum(
"next-file", "Next File", "When to start a new file",
GST_TYPE_GS_SINK_NEXT, DEFAULT_NEXT_FILE,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
/**
* GstGsSink:service-account-email:
*
* Service Account Email to use for credentials.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
g_param_spec_string(
"service-account-email", "Service Account Email",
"Service Account Email to use for credentials", NULL,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY)));
/**
* GstGsSink:start-date:
*
* Start date in iso8601 format.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_START_DATE,
g_param_spec_string(
"start-date", "Start Date", "Start date in iso8601 format", NULL,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY)));
gobject_class->finalize = gst_gs_sink_finalize;
gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR(gst_gs_sink_stop);
gstbasesink_class->render = GST_DEBUG_FUNCPTR(gst_gs_sink_render);
gstbasesink_class->render_list = GST_DEBUG_FUNCPTR(gst_gs_sink_render_list);
gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR(gst_gs_sink_set_caps);
gstbasesink_class->event = GST_DEBUG_FUNCPTR(gst_gs_sink_event);
GST_DEBUG_CATEGORY_INIT(gst_gs_sink_debug, "gssink", 0, "gssink element");
gst_element_class_add_static_pad_template(gstelement_class, &sinktemplate);
gst_element_class_set_static_metadata(
gstelement_class, "Google Cloud Storage Sink", "Sink/File",
"Write buffers to a sequentially named set of files on Google Cloud "
"Storage",
"Julien Isorce <jisorce@oblong.com>");
}
static void gst_gs_sink_init(GstGsSink* sink) {
sink->gcs_client = nullptr;
sink->gcs_stream = nullptr;
sink->index = DEFAULT_INDEX;
sink->post_messages = DEFAULT_POST_MESSAGES;
sink->service_account_email = NULL;
sink->bucket_name = NULL;
sink->object_name = g_strdup(DEFAULT_OBJECT_NAME);
sink->start_date_str = NULL;
sink->start_date = NULL;
sink->next_file = DEFAULT_NEXT_FILE;
sink->content_type = NULL;
sink->nb_percent_format = 0;
sink->percent_s_is_first = FALSE;
gst_base_sink_set_sync(GST_BASE_SINK(sink), FALSE);
}
static void gst_gs_sink_finalize(GObject* object) {
GstGsSink* sink = GST_GS_SINK(object);
sink->gcs_client = nullptr;
sink->gcs_stream = nullptr;
g_free(sink->service_account_email);
sink->service_account_email = NULL;
g_free(sink->bucket_name);
sink->bucket_name = NULL;
g_free(sink->object_name);
sink->object_name = NULL;
g_free(sink->start_date_str);
sink->start_date_str = NULL;
if (sink->start_date) {
g_date_time_unref(sink->start_date);
sink->start_date = NULL;
}
sink->content_type = NULL;
G_OBJECT_CLASS(parent_class)->finalize(object);
}
static gboolean gst_gs_sink_set_object_name(GstGsSink* sink,
const gchar* object_name) {
g_free(sink->object_name);
sink->object_name = NULL;
sink->nb_percent_format = 0;
sink->percent_s_is_first = FALSE;
if (!object_name) {
GST_ERROR_OBJECT(sink, "Object name is null");
return FALSE;
}
const std::string name(object_name);
sink->nb_percent_format = std::count(name.begin(), name.end(), '%');
if (sink->nb_percent_format > 2) {
GST_ERROR_OBJECT(sink, "Object name has too many formats");
return FALSE;
}
const size_t delimiter_percent_s = name.find("%s");
if (delimiter_percent_s == std::string::npos) {
if (sink->nb_percent_format == 2) {
GST_ERROR_OBJECT(sink, "Object name must have just one number format");
return FALSE;
}
sink->object_name = g_strdup(object_name);
return TRUE;
}
const size_t delimiter_percent = name.find_first_of('%');
if (delimiter_percent_s == delimiter_percent) {
sink->percent_s_is_first = TRUE;
if (name.find("%s", delimiter_percent_s + 1) != std::string::npos) {
GST_ERROR_OBJECT(sink, "Object name expect max one string format");
return FALSE;
}
}
sink->object_name = g_strdup(object_name);
return TRUE;
}
static void gst_gs_sink_set_property(GObject* object,
guint prop_id,
const GValue* value,
GParamSpec* pspec) {
GstGsSink* sink = GST_GS_SINK(object);
switch (prop_id) {
case PROP_BUCKET_NAME:
g_free(sink->bucket_name);
sink->bucket_name = g_strdup(g_value_get_string(value));
break;
case PROP_OBJECT_NAME:
gst_gs_sink_set_object_name(sink, g_value_get_string(value));
break;
case PROP_INDEX:
sink->index = g_value_get_int(value);
break;
case PROP_POST_MESSAGES:
sink->post_messages = g_value_get_boolean(value);
break;
case PROP_NEXT_FILE:
sink->next_file = (GstGsSinkNext)g_value_get_enum(value);
break;
case PROP_SERVICE_ACCOUNT_EMAIL:
g_free(sink->service_account_email);
sink->service_account_email = g_strdup(g_value_get_string(value));
break;
case PROP_START_DATE:
g_free(sink->start_date_str);
if (sink->start_date)
g_date_time_unref(sink->start_date);
sink->start_date_str = g_strdup(g_value_get_string(value));
sink->start_date =
g_date_time_new_from_iso8601(sink->start_date_str, NULL);
if (!sink->start_date) {
GST_ERROR_OBJECT(sink, "Failed to parse start date %s",
sink->start_date_str);
g_free(sink->start_date_str);
sink->start_date_str = NULL;
}
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static void gst_gs_sink_get_property(GObject* object,
guint prop_id,
GValue* value,
GParamSpec* pspec) {
GstGsSink* sink = GST_GS_SINK(object);
switch (prop_id) {
case PROP_BUCKET_NAME:
g_value_set_string(value, sink->bucket_name);
break;
case PROP_OBJECT_NAME:
g_value_set_string(value, sink->object_name);
break;
case PROP_INDEX:
g_value_set_int(value, sink->index);
break;
case PROP_POST_MESSAGES:
g_value_set_boolean(value, sink->post_messages);
break;
case PROP_NEXT_FILE:
g_value_set_enum(value, sink->next_file);
break;
case PROP_SERVICE_ACCOUNT_EMAIL:
g_value_set_string(value, sink->service_account_email);
break;
case PROP_START_DATE:
g_value_set_string(value, sink->start_date_str);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static gboolean gst_gs_sink_start(GstBaseSink* bsink) {
GstGsSink* sink = GST_GS_SINK(bsink);
GError* err = NULL;
if (!sink->bucket_name) {
GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Bucket name is required"),
GST_ERROR_SYSTEM);
return FALSE;
}
if (!sink->object_name) {
GST_ELEMENT_ERROR(sink, RESOURCE, SETTINGS, ("Object name is required"),
GST_ERROR_SYSTEM);
return FALSE;
}
sink->content_type = "";
sink->gcs_client = gst_gs_create_client(sink->service_account_email, &err);
if (err) {
GST_ELEMENT_ERROR(sink, RESOURCE, OPEN_READ,
("Could not create client (%s)", err->message),
GST_ERROR_SYSTEM);
g_clear_error(&err);
return FALSE;
}
GST_INFO_OBJECT(sink, "Using bucket name (%s) and object name (%s)",
sink->bucket_name, sink->object_name);
return TRUE;
}
static gboolean gst_gs_sink_stop(GstBaseSink* bsink) {
GstGsSink* sink = GST_GS_SINK(bsink);
sink->gcs_client = nullptr;
sink->gcs_stream = nullptr;
sink->content_type = NULL;
return TRUE;
}
static void gst_gs_sink_post_message_full(GstGsSink* sink,
GstClockTime timestamp,
GstClockTime duration,
GstClockTime offset,
GstClockTime offset_end,
GstClockTime running_time,
GstClockTime stream_time,
const char* filename,
const gchar* date) {
GstStructure* s;
if (!sink->post_messages)
return;
s = gst_structure_new("GstGsSink", "filename", G_TYPE_STRING, filename,
"date", G_TYPE_STRING, date, "index", G_TYPE_INT,
sink->index, "timestamp", G_TYPE_UINT64, timestamp,
"stream-time", G_TYPE_UINT64, stream_time,
"running-time", G_TYPE_UINT64, running_time, "duration",
G_TYPE_UINT64, duration, "offset", G_TYPE_UINT64,
offset, "offset-end", G_TYPE_UINT64, offset_end, NULL);
gst_element_post_message(GST_ELEMENT_CAST(sink),
gst_message_new_element(GST_OBJECT_CAST(sink), s));
}
static void gst_gs_sink_post_message_from_time(GstGsSink* sink,
GstClockTime timestamp,
GstClockTime duration,
const char* filename) {
GstClockTime running_time, stream_time;
guint64 offset, offset_end;
GstSegment* segment;
GstFormat format;
if (!sink->post_messages)
return;
segment = &GST_BASE_SINK(sink)->segment;
format = segment->format;
offset = -1;
offset_end = -1;
running_time = gst_segment_to_running_time(segment, format, timestamp);
stream_time = gst_segment_to_stream_time(segment, format, timestamp);
gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
running_time, stream_time, filename, NULL);
}
static void gst_gs_sink_post_message(GstGsSink* sink,
GstBuffer* buffer,
const char* filename,
const char* date) {
GstClockTime duration, timestamp;
GstClockTime running_time, stream_time;
guint64 offset, offset_end;
GstSegment* segment;
GstFormat format;
if (!sink->post_messages)
return;
segment = &GST_BASE_SINK(sink)->segment;
format = segment->format;
timestamp = GST_BUFFER_PTS(buffer);
duration = GST_BUFFER_DURATION(buffer);
offset = GST_BUFFER_OFFSET(buffer);
offset_end = GST_BUFFER_OFFSET_END(buffer);
running_time = gst_segment_to_running_time(segment, format, timestamp);
stream_time = gst_segment_to_stream_time(segment, format, timestamp);
gst_gs_sink_post_message_full(sink, timestamp, duration, offset, offset_end,
running_time, stream_time, filename, date);
}
static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
GstBuffer* buffer) {
GstMapInfo map = {0};
gchar* object_name = NULL;
gchar* buffer_date = NULL;
if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
return GST_FLOW_ERROR;
switch (sink->next_file) {
case GST_GS_SINK_NEXT_BUFFER: {
// Get buffer date if needed.
if (sink->start_date) {
if (sink->nb_percent_format != 2) {
GST_ERROR_OBJECT(sink, "Object name expects date and index");
gst_buffer_unmap(buffer, &map);
return GST_FLOW_ERROR;
}
if (!gst_gs_get_buffer_date(buffer, sink->start_date, &buffer_date)) {
GST_ERROR_OBJECT(sink, "Could not get buffer date %s", object_name);
gst_buffer_unmap(buffer, &map);
return GST_FLOW_ERROR;
}
if (sink->percent_s_is_first) {
object_name =
g_strdup_printf(sink->object_name, buffer_date, sink->index);
} else {
object_name =
g_strdup_printf(sink->object_name, sink->index, buffer_date);
}
} else {
if (sink->nb_percent_format != 1) {
GST_ERROR_OBJECT(sink, "Object name expects only an index");
gst_buffer_unmap(buffer, &map);
return GST_FLOW_ERROR;
}
object_name = g_strdup_printf(sink->object_name, sink->index);
}
GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
sink->bucket_name, object_name,
gcs::WithObjectMetadata(
gcs::ObjectMetadata().set_content_type(sink->content_type)));
gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
if (gcs_stream.fail()) {
GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
}
gcs_stream.Close();
google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
sink->gcs_client->GetObjectMetadata(sink->bucket_name, object_name);
if (!object_metadata) {
GST_ERROR_OBJECT(
sink, "Could not get object metadata for object %s (%s)",
object_name, object_metadata.status().message().c_str());
gst_buffer_unmap(buffer, &map);
g_free(object_name);
g_free(buffer_date);
return GST_FLOW_ERROR;
}
GST_INFO_OBJECT(sink, "Wrote object %s of size %" G_GUINT64_FORMAT "\n",
object_name, object_metadata->size());
gst_gs_sink_post_message(sink, buffer, object_name, buffer_date);
g_free(object_name);
g_free(buffer_date);
++sink->index;
break;
}
case GST_GS_SINK_NEXT_NONE: {
if (!sink->gcs_stream) {
GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
sink->gcs_stream = std::make_unique<GSWriteStream>(
*sink->gcs_client.get(), sink->bucket_name, sink->object_name,
sink->content_type);
if (!sink->gcs_stream->stream().IsOpen()) {
GST_ELEMENT_ERROR(
sink, RESOURCE, OPEN_READ,
("Could not create write stream (%s)",
sink->gcs_stream->stream().last_status().message().c_str()),
GST_ERROR_SYSTEM);
gst_buffer_unmap(buffer, &map);
return GST_FLOW_OK;
}
}
GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
gcs::ObjectWriteStream& stream = sink->gcs_stream->stream();
stream.write(reinterpret_cast<const char*>(map.data), map.size);
if (stream.fail()) {
GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
}
break;
}
default:
g_assert_not_reached();
}
gst_buffer_unmap(buffer, &map);
return GST_FLOW_OK;
}
static GstFlowReturn gst_gs_sink_render(GstBaseSink* bsink, GstBuffer* buffer) {
GstGsSink* sink = GST_GS_SINK(bsink);
GstFlowReturn flow = GST_FLOW_OK;
flow = gst_gs_sink_write_buffer(sink, buffer);
return flow;
}
static gboolean buffer_list_copy_data(GstBuffer** buf,
guint idx,
gpointer data) {
GstBuffer* dest = GST_BUFFER_CAST(data);
guint num, i;
if (idx == 0)
gst_buffer_copy_into(dest, *buf, GST_BUFFER_COPY_METADATA, 0, -1);
num = gst_buffer_n_memory(*buf);
for (i = 0; i < num; ++i) {
GstMemory* mem;
mem = gst_buffer_get_memory(*buf, i);
gst_buffer_append_memory(dest, mem);
}
return TRUE;
}
/* Our assumption for now is that the buffers in a buffer list should always
* end up in the same file. If someone wants different behaviour, they'll just
* have to add a property for that. */
static GstFlowReturn gst_gs_sink_render_list(GstBaseSink* sink,
GstBufferList* list) {
GstBuffer* buf;
guint size;
size = gst_buffer_list_calculate_size(list);
GST_LOG_OBJECT(sink, "total size of buffer list %p: %u", list, size);
/* copy all buffers in the list into one single buffer, so we can use
* the normal render function (FIXME: optimise to avoid the memcpy) */
buf = gst_buffer_new();
gst_buffer_list_foreach(list, buffer_list_copy_data, buf);
g_assert(gst_buffer_get_size(buf) == size);
gst_gs_sink_render(sink, buf);
gst_buffer_unref(buf);
return GST_FLOW_OK;
}
static gboolean gst_gs_sink_set_caps(GstBaseSink* bsink, GstCaps* caps) {
GstGsSink* sink = GST_GS_SINK(bsink);
GstStructure* s = gst_caps_get_structure(caps, 0);
sink->content_type = gst_structure_get_name(s);
GST_INFO_OBJECT(sink, "Content type: %s", sink->content_type);
return TRUE;
}
static gboolean gst_gs_sink_event(GstBaseSink* bsink, GstEvent* event) {
GstGsSink* sink = GST_GS_SINK(bsink);
switch (GST_EVENT_TYPE(event)) {
case GST_EVENT_EOS:
if (sink->gcs_stream) {
sink->gcs_stream = nullptr;
gst_gs_sink_post_message_from_time(
sink, GST_BASE_SINK(sink)->segment.position, -1, sink->object_name);
}
break;
default:
break;
}
return GST_BASE_SINK_CLASS(parent_class)->event(bsink, event);
}

47
ext/gs/gstgssink.h Normal file
View file

@ -0,0 +1,47 @@
/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgssink.h:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_GS_SINK_H__
#define __GST_GS_SINK_H__
#include <gst/base/base.h>
#include <gst/gst.h>
G_BEGIN_DECLS
#define GST_TYPE_GS_SINK (gst_gs_sink_get_type())
G_DECLARE_FINAL_TYPE(GstGsSink, gst_gs_sink, GST, GS_SINK, GstBaseSink)
/**
* GstGsSinkNext:
* @GST_GS_SINK_NEXT_BUFFER: New file for each buffer.
* @GST_GS_SINK_NEXT_NONE: Only one file like filesink.
*
* File splitting modes.
* Since: 1.20
*/
typedef enum {
GST_GS_SINK_NEXT_BUFFER,
GST_GS_SINK_NEXT_NONE,
} GstGsSinkNext;
G_END_DECLS
#endif // __GST_GS_SINK_H__

578
ext/gs/gstgssrc.cpp Normal file
View file

@ -0,0 +1,578 @@
/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgssrc.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:element-gssrc
* @title: gssrc
* @see_also: #GstGsSrc
*
* Read data from a file in a Google Cloud Storage.
*
* ## Example launch line
* ```
* gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin !
* glimagesink
* ```
* ### Play a video from a Google Cloud Storage.
* ```
* gst-launch-1.0 gssrc location=gs://mybucket/myvideo.mkv ! decodebin ! navseek
* seek-offset=10 ! glimagesink
* ```
* ### Play a video from a Google Cloud Storage and seek using the keyboard
* from the terminal.
*
* See also: #GstGsSink
* Since: 1.20
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstgscommon.h"
#include "gstgssrc.h"
static GstStaticPadTemplate srctemplate =
GST_STATIC_PAD_TEMPLATE("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC(gst_gs_src_debug);
#define GST_CAT_DEFAULT gst_gs_src_debug
enum { LAST_SIGNAL };
#define DEFAULT_BLOCKSIZE 4 * 1024
enum { PROP_0, PROP_LOCATION, PROP_SERVICE_ACCOUNT_EMAIL };
class GSReadStream;
struct _GstGsSrc {
GstBaseSrc parent;
std::unique_ptr<google::cloud::storage::Client> gcs_client;
std::unique_ptr<GSReadStream> gcs_stream;
gchar* uri;
gchar* service_account_email;
std::string bucket_name;
std::string object_name;
guint64 read_position;
guint64 object_size;
};
static void gst_gs_src_finalize(GObject* object);
static void gst_gs_src_set_property(GObject* object,
guint prop_id,
const GValue* value,
GParamSpec* pspec);
static void gst_gs_src_get_property(GObject* object,
guint prop_id,
GValue* value,
GParamSpec* pspec);
static gboolean gst_gs_src_start(GstBaseSrc* basesrc);
static gboolean gst_gs_src_stop(GstBaseSrc* basesrc);
static gboolean gst_gs_src_is_seekable(GstBaseSrc* src);
static gboolean gst_gs_src_get_size(GstBaseSrc* src, guint64* size);
static GstFlowReturn gst_gs_src_fill(GstBaseSrc* src,
guint64 offset,
guint length,
GstBuffer* buf);
static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query);
static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data);
#define _do_init \
G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, gst_gs_src_uri_handler_init); \
GST_DEBUG_CATEGORY_INIT(gst_gs_src_debug, "gssrc", 0, "gssrc element");
#define gst_gs_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE(GstGsSrc, gst_gs_src, GST_TYPE_BASE_SRC, _do_init);
namespace gcs = google::cloud::storage;
class GSReadStream {
public:
GSReadStream(GstGsSrc* src,
const std::int64_t start = 0,
const std::int64_t end = -1)
: gcs_stream_(src->gcs_client->ReadObject(src->bucket_name,
src->object_name,
gcs::ReadFromOffset(start))) {}
~GSReadStream() { gcs_stream_.Close(); }
gcs::ObjectReadStream& stream() { return gcs_stream_; }
private:
gcs::ObjectReadStream gcs_stream_;
};
static void gst_gs_src_class_init(GstGsSrcClass* klass) {
GObjectClass* gobject_class = G_OBJECT_CLASS(klass);
GstElementClass* gstelement_class = GST_ELEMENT_CLASS(klass);
GstBaseSrcClass* gstbasesrc_class = GST_BASE_SRC_CLASS(klass);
gobject_class->set_property = gst_gs_src_set_property;
gobject_class->get_property = gst_gs_src_get_property;
/**
* GstGsSink:location:
*
* Name of the Google Cloud Storage bucket.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_LOCATION,
g_param_spec_string(
"location", "File Location", "Location of the file to read", NULL,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY)));
/**
* GstGsSrc:service-account-email:
*
* Service Account Email to use for credentials.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_SERVICE_ACCOUNT_EMAIL,
g_param_spec_string(
"service-account-email", "Service Account Email",
"Service Account Email to use for credentials", NULL,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY)));
gobject_class->finalize = gst_gs_src_finalize;
gst_element_class_set_static_metadata(
gstelement_class, "Google Cloud Storage Source", "Source/File",
"Read from arbitrary point from a file in a Google Cloud Storage",
"Julien Isorce <jisorce@oblong.com>");
gst_element_class_add_static_pad_template(gstelement_class, &srctemplate);
gstbasesrc_class->start = GST_DEBUG_FUNCPTR(gst_gs_src_start);
gstbasesrc_class->stop = GST_DEBUG_FUNCPTR(gst_gs_src_stop);
gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR(gst_gs_src_is_seekable);
gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR(gst_gs_src_get_size);
gstbasesrc_class->fill = GST_DEBUG_FUNCPTR(gst_gs_src_fill);
gstbasesrc_class->query = GST_DEBUG_FUNCPTR(gst_gs_src_query);
}
static void gst_gs_src_init(GstGsSrc* src) {
src->gcs_stream = nullptr;
src->uri = NULL;
src->service_account_email = NULL;
src->read_position = 0;
src->object_size = 0;
gst_base_src_set_blocksize(GST_BASE_SRC(src), DEFAULT_BLOCKSIZE);
gst_base_src_set_dynamic_size(GST_BASE_SRC(src), FALSE);
gst_base_src_set_live(GST_BASE_SRC(src), FALSE);
}
static void gst_gs_src_finalize(GObject* object) {
GstGsSrc* src = GST_GS_SRC(object);
g_free(src->uri);
src->uri = NULL;
g_free(src->service_account_email);
src->service_account_email = NULL;
src->read_position = 0;
src->object_size = 0;
G_OBJECT_CLASS(parent_class)->finalize(object);
}
static gboolean gst_gs_src_set_location(GstGsSrc* src,
const gchar* location,
GError** err) {
GstState state = GST_STATE_NULL;
std::string filepath = location;
size_t delimiter = std::string::npos;
// The element must be stopped in order to do this.
GST_OBJECT_LOCK(src);
state = GST_STATE(src);
if (state != GST_STATE_READY && state != GST_STATE_NULL)
goto wrong_state;
GST_OBJECT_UNLOCK(src);
g_free(src->uri);
src->uri = NULL;
if (location) {
if (g_str_has_prefix(location, "gs://")) {
src->uri = g_strdup(location);
filepath = filepath.substr(5);
} else {
src->uri = g_strdup_printf("gs://%s", location);
filepath = location;
}
delimiter = filepath.find_first_of('/');
if (delimiter == std::string::npos)
goto wrong_location;
std::string bucket_name = filepath.substr(0, delimiter);
src->bucket_name = bucket_name;
src->object_name = filepath.substr(delimiter + 1);
GST_INFO_OBJECT(src, "uri is %s", src->uri);
GST_INFO_OBJECT(src, "bucket name is %s", src->bucket_name.c_str());
GST_INFO_OBJECT(src, "object name is %s", src->object_name.c_str());
}
g_object_notify(G_OBJECT(src), "location");
return TRUE;
// ERROR.
wrong_state : {
g_warning(
"Changing the `location' property on gssrc when a file is open"
"is not supported.");
if (err)
g_set_error(
err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
"Changing the `location' property on gssrc when a file is open is "
"not supported.");
GST_OBJECT_UNLOCK(src);
return FALSE;
}
wrong_location : {
if (err)
g_set_error(err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
"Failed to find a bucket name");
GST_OBJECT_UNLOCK(src);
return FALSE;
}
}
static gboolean gst_gs_src_set_service_account_email(
GstGsSrc* src,
const gchar* service_account_email) {
if (GST_STATE(src) == GST_STATE_PLAYING ||
GST_STATE(src) == GST_STATE_PAUSED) {
GST_WARNING_OBJECT(src,
"Setting a new service account email not supported in "
"PLAYING or PAUSED state");
return FALSE;
}
GST_OBJECT_LOCK(src);
g_free(src->service_account_email);
src->service_account_email = NULL;
if (service_account_email)
src->service_account_email = g_strdup(service_account_email);
GST_OBJECT_UNLOCK(src);
return TRUE;
}
static void gst_gs_src_set_property(GObject* object,
guint prop_id,
const GValue* value,
GParamSpec* pspec) {
GstGsSrc* src = GST_GS_SRC(object);
g_return_if_fail(GST_IS_GS_SRC(object));
switch (prop_id) {
case PROP_LOCATION:
gst_gs_src_set_location(src, g_value_get_string(value), NULL);
break;
case PROP_SERVICE_ACCOUNT_EMAIL:
gst_gs_src_set_service_account_email(src, g_value_get_string(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static void gst_gs_src_get_property(GObject* object,
guint prop_id,
GValue* value,
GParamSpec* pspec) {
GstGsSrc* src = GST_GS_SRC(object);
g_return_if_fail(GST_IS_GS_SRC(object));
switch (prop_id) {
case PROP_LOCATION:
GST_OBJECT_LOCK(src);
g_value_set_string(value, src->uri);
GST_OBJECT_UNLOCK(src);
break;
case PROP_SERVICE_ACCOUNT_EMAIL:
GST_OBJECT_LOCK(src);
g_value_set_string(value, src->service_account_email);
GST_OBJECT_UNLOCK(src);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}
static gint gst_gs_read_stream(GstGsSrc* src,
guint8* data,
const guint64 offset,
const guint length) {
gint gcount = 0;
gchar* sdata = reinterpret_cast<gchar*>(data);
gcs::ObjectReadStream& stream = src->gcs_stream->stream();
while (!stream.eof()) {
stream.read(sdata, length);
if (stream.status().ok())
break;
GST_ERROR_OBJECT(src, "Restart after (%s)",
stream.status().message().c_str());
src->gcs_stream = std::make_unique<GSReadStream>(src, offset);
}
gcount = stream.gcount();
GST_INFO_OBJECT(src, "Client read %d bytes", gcount);
return gcount;
}
static GstFlowReturn gst_gs_src_fill(GstBaseSrc* basesrc,
guint64 offset,
guint length,
GstBuffer* buf) {
GstGsSrc* src = GST_GS_SRC(basesrc);
guint to_read = 0;
guint bytes_read = 0;
gint ret = 0;
GstMapInfo info = {};
guint8* data = NULL;
if (G_UNLIKELY(offset != (guint64)-1 && src->read_position != offset)) {
src->gcs_stream = std::make_unique<GSReadStream>(src, offset);
src->read_position = offset;
}
if (!gst_buffer_map(buf, &info, GST_MAP_WRITE))
goto buffer_write_fail;
data = info.data;
bytes_read = 0;
to_read = length;
while (to_read > 0) {
GST_INFO_OBJECT(src, "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x",
to_read, offset + bytes_read);
ret = gst_gs_read_stream(src, data + bytes_read, offset, to_read);
if (G_UNLIKELY(ret < 0))
goto could_not_read;
if (G_UNLIKELY(ret == 0)) {
// Push any remaining data.
if (bytes_read > 0)
break;
goto eos;
}
to_read -= ret;
bytes_read += ret;
src->read_position += ret;
}
GST_INFO_OBJECT(
src, "Read %" G_GUINT32_FORMAT " bytes of %" G_GUINT32_FORMAT " length",
bytes_read, length);
gst_buffer_unmap(buf, &info);
if (bytes_read != length)
gst_buffer_resize(buf, 0, bytes_read);
GST_BUFFER_OFFSET(buf) = offset;
GST_BUFFER_OFFSET_END(buf) = offset + bytes_read;
return GST_FLOW_OK;
// ERROR.
could_not_read : {
GST_ELEMENT_ERROR(src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
gst_buffer_unmap(buf, &info);
gst_buffer_resize(buf, 0, 0);
return GST_FLOW_ERROR;
}
eos : {
GST_INFO_OBJECT(src, "EOS");
gst_buffer_unmap(buf, &info);
gst_buffer_resize(buf, 0, 0);
return GST_FLOW_EOS;
}
buffer_write_fail : {
GST_ELEMENT_ERROR(src, RESOURCE, WRITE, (NULL), ("Can't write to buffer"));
return GST_FLOW_ERROR;
}
}
static gboolean gst_gs_src_is_seekable(GstBaseSrc* basesrc) {
return TRUE;
}
static gboolean gst_gs_src_get_size(GstBaseSrc* basesrc, guint64* size) {
GstGsSrc* src = GST_GS_SRC(basesrc);
*size = src->object_size;
return TRUE;
}
static gboolean gst_gs_src_start(GstBaseSrc* basesrc) {
GstGsSrc* src = GST_GS_SRC(basesrc);
GError* err = NULL;
guint blocksize = 0;
src->read_position = 0;
src->object_size = 0;
if (src->uri == NULL || src->uri[0] == '\0') {
GST_ELEMENT_ERROR(src, RESOURCE, NOT_FOUND,
("No uri specified for reading."), (NULL));
return FALSE;
}
GST_INFO_OBJECT(src, "Opening file %s", src->uri);
src->gcs_client = gst_gs_create_client(src->service_account_email, &err);
if (err) {
GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ,
("Could not create client (%s)", err->message),
GST_ERROR_SYSTEM);
g_clear_error(&err);
return FALSE;
}
GST_INFO_OBJECT(src, "Parsed bucket name (%s) and object name (%s)",
src->bucket_name.c_str(), src->object_name.c_str());
google::cloud::StatusOr<gcs::ObjectMetadata> object_metadata =
src->gcs_client->GetObjectMetadata(src->bucket_name, src->object_name);
if (!object_metadata) {
GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ,
("Could not get object metadata (%s)",
object_metadata.status().message().c_str()),
GST_ERROR_SYSTEM);
return FALSE;
}
src->object_size = object_metadata->size();
GST_INFO_OBJECT(src, "Object size %" G_GUINT64_FORMAT "\n", src->object_size);
src->gcs_stream = std::make_unique<GSReadStream>(src);
blocksize = gcs::ClientOptions(nullptr).download_buffer_size();
GST_INFO_OBJECT(src, "Set blocksize to %" G_GUINT32_FORMAT, blocksize);
gst_base_src_set_blocksize(GST_BASE_SRC(src), blocksize);
return TRUE;
}
static gboolean gst_gs_src_stop(GstBaseSrc* basesrc) {
GstGsSrc* src = GST_GS_SRC(basesrc);
src->gcs_stream = nullptr;
src->read_position = 0;
src->object_size = 0;
return TRUE;
}
static gboolean gst_gs_src_query(GstBaseSrc* src, GstQuery* query) {
gboolean ret;
switch (GST_QUERY_TYPE(query)) {
case GST_QUERY_SCHEDULING: {
// A pushsrc can by default never operate in pull mode override
// if you want something different.
gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEQUENTIAL, 1, -1, 0);
gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
ret = TRUE;
break;
}
default:
ret = GST_BASE_SRC_CLASS(parent_class)->query(src, query);
break;
}
return ret;
}
static GstURIType gst_gs_src_uri_get_type(GType type) {
return GST_URI_SRC;
}
static const gchar* const* gst_gs_src_uri_get_protocols(GType type) {
static const gchar* protocols[] = {"gs", NULL};
return protocols;
}
static gchar* gst_gs_src_uri_get_uri(GstURIHandler* handler) {
GstGsSrc* src = GST_GS_SRC(handler);
return g_strdup(src->uri);
}
static gboolean gst_gs_src_uri_set_uri(GstURIHandler* handler,
const gchar* uri,
GError** err) {
GstGsSrc* src = GST_GS_SRC(handler);
if (strcmp(uri, "gs://") == 0) {
// Special case for "gs://" as this is used by some applications
// to test with gst_element_make_from_uri if there's an element
// that supports the URI protocol.
gst_gs_src_set_location(src, NULL, NULL);
return TRUE;
}
return gst_gs_src_set_location(src, uri, err);
}
static void gst_gs_src_uri_handler_init(gpointer g_iface, gpointer iface_data) {
GstURIHandlerInterface* iface = (GstURIHandlerInterface*)g_iface;
iface->get_type = gst_gs_src_uri_get_type;
iface->get_protocols = gst_gs_src_uri_get_protocols;
iface->get_uri = gst_gs_src_uri_get_uri;
iface->set_uri = gst_gs_src_uri_set_uri;
}

34
ext/gs/gstgssrc.h Normal file
View file

@ -0,0 +1,34 @@
/* GStreamer
* Copyright (C) 2020 Julien Isorce <jisorce@oblong.com>
*
* gstgssrc.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_GS_SRC_H__
#define __GST_GS_SRC_H__
#include <gst/base/gstbasesrc.h>
#include <gst/gst.h>
G_BEGIN_DECLS
#define GST_TYPE_GS_SRC (gst_gs_src_get_type())
G_DECLARE_FINAL_TYPE(GstGsSrc, gst_gs_src, GST, GS_SRC, GstBaseSrc)
G_END_DECLS
#endif // __GST_GS_SRC_H__

28
ext/gs/meson.build Normal file
View file

@ -0,0 +1,28 @@
gs_sources = [
'gstgscommon.cpp',
'gstgssink.cpp',
'gstgssrc.cpp',
'gstgs.cpp',
]
gs_dep = dependency('storage_client', version : '>= 1.25.0', required : get_option('gs'))
if gs_dep.found()
gstgs = library('gstgs',
gs_sources,
c_args : gst_plugins_bad_args,
cpp_args : gst_plugins_bad_args,
include_directories : [configinc, libsinc],
dependencies : [gstbase_dep, gs_dep],
install : true,
install_dir : plugins_install_dir,
)
pkgconfig.generate(gstgs, install_dir : plugins_pkgconfig_install_dir)
plugins += [gstgs]
endif
clang_format_p = find_program('clang-format', required: false)
if clang_format_p.found()
run_command(clang_format_p, '--style=file', '-i', 'gstgssink.cpp', 'gstgssrc.cpp')
endif

View file

@ -18,6 +18,7 @@ subdir('fdkaac')
subdir('flite')
subdir('fluidsynth')
subdir('gme')
subdir('gs')
subdir('gsm')
subdir('hls')
subdir('iqa')

View file

@ -107,6 +107,7 @@ option('flite', type : 'feature', value : 'auto', description : 'Flite speech sy
option('fluidsynth', type : 'feature', value : 'auto', description : 'Fluidsynth MIDI decoder plugin')
option('gl', type : 'feature', value : 'auto', description : 'GStreamer OpenGL integration support (used by various plugins)')
option('gme', type : 'feature', value : 'auto', description : 'libgme gaming console music file decoder plugin')
option('gs', type : 'feature', value : 'auto', description : 'Google Cloud Storage source and sink plugin')
option('gsm', type : 'feature', value : 'auto', description : 'GSM encoder/decoder plugin')
option('ipcpipeline', type : 'feature', value : 'auto', description : 'Inter-process communication plugin')
option('iqa', type : 'feature', value : 'auto', description : 'Image quality assessment plugin')