gssink: add metadata property

This property can be used to set metadata on the storage object.

Similar API has been added to the S3 sink already, see
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/613

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1377>
This commit is contained in:
Guillaume Desmottes 2021-11-23 09:28:57 +01:00
parent 578fddbf58
commit d67a63a298

View file

@ -111,6 +111,7 @@ enum {
PROP_SERVICE_ACCOUNT_EMAIL,
PROP_START_DATE,
PROP_SERVICE_ACCOUNT_CREDENTIALS,
PROP_METADATA,
};
class GSWriteStream;
@ -132,6 +133,7 @@ struct _GstGsSink {
const gchar* content_type;
size_t nb_percent_format;
gboolean percent_s_is_first;
GstStructure* metadata;
};
static void gst_gs_sink_finalize(GObject* object);
@ -177,12 +179,10 @@ class GSWriteStream {
GSWriteStream(google::cloud::storage::Client& client,
const char* bucket_name,
const char* object_name,
const char* content_type)
: gcs_stream_(client.WriteObject(
bucket_name,
object_name,
gcs::WithObjectMetadata(
gcs::ObjectMetadata().set_content_type(content_type)))) {}
gcs::ObjectMetadata metadata)
: gcs_stream_(client.WriteObject(bucket_name,
object_name,
gcs::WithObjectMetadata(metadata))) {}
~GSWriteStream() { gcs_stream_.Close(); }
gcs::ObjectWriteStream& stream() { return gcs_stream_; }
@ -315,6 +315,24 @@ static void gst_gs_sink_class_init(GstGsSinkClass* klass) {
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY)));
/**
* GstGsSink:metadata:
*
* A map of metadata to store with the object; field values need to be
* convertible to strings.
*
* Since: 1.20
*/
g_object_class_install_property(
gobject_class, PROP_METADATA,
g_param_spec_boxed(
"metadata", "Metadata",
"A map of metadata to store with the object; field values need to be "
"convertible to strings.",
GST_TYPE_STRUCTURE,
(GParamFlags)(G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY)));
gobject_class->finalize = gst_gs_sink_finalize;
gstbasesink_class->start = GST_DEBUG_FUNCPTR(gst_gs_sink_start);
@ -373,6 +391,7 @@ static void gst_gs_sink_finalize(GObject* object) {
sink->start_date = NULL;
}
sink->content_type = NULL;
g_clear_pointer(&sink->metadata, gst_structure_free);
G_OBJECT_CLASS(parent_class)->finalize(object);
}
@ -466,6 +485,10 @@ static void gst_gs_sink_set_property(GObject* object,
sink->start_date_str = NULL;
}
break;
case PROP_METADATA:
g_clear_pointer(&sink->metadata, gst_structure_free);
sink->metadata = (GstStructure*)g_value_dup_boxed(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
@ -503,6 +526,9 @@ static void gst_gs_sink_get_property(GObject* object,
case PROP_START_DATE:
g_value_set_string(value, sink->start_date_str);
break;
case PROP_METADATA:
g_value_set_boxed(value, sink->metadata);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
@ -632,6 +658,34 @@ static void gst_gs_sink_post_message(GstGsSink* sink,
running_time, stream_time, filename, date);
}
struct AddMetadataIter {
GstGsSink* sink;
gcs::ObjectMetadata* metadata;
};
static gboolean add_metadata_foreach(GQuark field_id,
const GValue* value,
gpointer user_data) {
struct AddMetadataIter* it = (struct AddMetadataIter*)user_data;
GValue svalue = G_VALUE_INIT;
g_value_init(&svalue, G_TYPE_STRING);
if (g_value_transform(value, &svalue)) {
const gchar* key = g_quark_to_string(field_id);
const gchar* value = g_value_get_string(&svalue);
GST_LOG_OBJECT(it->sink, "metadata '%s' -> '%s'", key, value);
it->metadata->upsert_metadata(key, value);
} else {
GST_WARNING_OBJECT(it->sink, "Failed to convert metadata '%s' to string",
g_quark_to_string(field_id));
}
g_value_unset(&svalue);
return TRUE;
}
static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
GstBuffer* buffer) {
GstMapInfo map = {0};
@ -641,6 +695,15 @@ static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
if (!gst_buffer_map(buffer, &map, GST_MAP_READ))
return GST_FLOW_ERROR;
gcs::ObjectMetadata metadata =
gcs::ObjectMetadata().set_content_type(sink->content_type);
if (sink->metadata) {
struct AddMetadataIter it = {sink, &metadata};
gst_structure_foreach(sink->metadata, add_metadata_foreach, &it);
}
switch (sink->next_file) {
case GST_GS_SINK_NEXT_BUFFER: {
// Get buffer date if needed.
@ -677,9 +740,8 @@ static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
GST_INFO_OBJECT(sink, "Writing %" G_GSIZE_FORMAT " bytes", map.size);
gcs::ObjectWriteStream gcs_stream = sink->gcs_client->WriteObject(
sink->bucket_name, object_name,
gcs::WithObjectMetadata(
gcs::ObjectMetadata().set_content_type(sink->content_type)));
sink->bucket_name, object_name, gcs::WithObjectMetadata(metadata));
gcs_stream.write(reinterpret_cast<const char*>(map.data), map.size);
if (gcs_stream.fail()) {
GST_WARNING_OBJECT(sink, "Failed to write to %s", object_name);
@ -712,7 +774,7 @@ static GstFlowReturn gst_gs_sink_write_buffer(GstGsSink* sink,
GST_INFO_OBJECT(sink, "Opening %s", sink->object_name);
sink->gcs_stream = std::make_unique<GSWriteStream>(
*sink->gcs_client.get(), sink->bucket_name, sink->object_name,
sink->content_type);
metadata);
if (!sink->gcs_stream->stream().IsOpen()) {
GST_ELEMENT_ERROR(