srt: Add support for streamheaders to sinks

https://bugzilla.gnome.org/show_bug.cgi?id=793503
This commit is contained in:
Jan Alexander Steffens (heftig) 2018-02-16 09:17:40 +01:00 committed by Olivier Crête
parent 8a5dab1c06
commit 37a9e0fff9
4 changed files with 128 additions and 0 deletions

View file

@ -123,15 +123,70 @@ gst_srt_base_sink_finalize (GObject * object)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (object);
g_clear_pointer (&self->headers, gst_buffer_list_unref);
g_clear_pointer (&self->uri, gst_uri_unref);
g_clear_pointer (&self->passphrase, g_free);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
static gboolean
gst_srt_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink);
GstStructure *s;
const GValue *streamheader;
GST_DEBUG_OBJECT (self, "setcaps %" GST_PTR_FORMAT, caps);
g_clear_pointer (&self->headers, gst_buffer_list_unref);
s = gst_caps_get_structure (caps, 0);
streamheader = gst_structure_get_value (s, "streamheader");
if (!streamheader) {
GST_DEBUG_OBJECT (self, "'streamheader' field not present");
} else if (GST_VALUE_HOLDS_BUFFER (streamheader)) {
GST_DEBUG_OBJECT (self, "'streamheader' field holds buffer");
self->headers = gst_buffer_list_new_sized (1);
gst_buffer_list_add (self->headers, g_value_dup_boxed (streamheader));
} else if (GST_VALUE_HOLDS_ARRAY (streamheader)) {
guint i, size;
GST_DEBUG_OBJECT (self, "'streamheader' field holds array");
size = gst_value_array_get_size (streamheader);
self->headers = gst_buffer_list_new_sized (size);
for (i = 0; i < size; i++) {
const GValue *v = gst_value_array_get_value (streamheader, i);
if (!GST_VALUE_HOLDS_BUFFER (v)) {
GST_ERROR_OBJECT (self, "'streamheader' item of unexpected type '%s'",
G_VALUE_TYPE_NAME (v));
return FALSE;
}
gst_buffer_list_add (self->headers, g_value_dup_boxed (v));
}
} else {
GST_ERROR_OBJECT (self, "'streamheader' field has unexpected type '%s'",
G_VALUE_TYPE_NAME (streamheader));
return FALSE;
}
GST_DEBUG_OBJECT (self, "Collected streamheaders: %u buffers",
self->headers ? gst_buffer_list_length (self->headers) : 0);
return TRUE;
}
static gboolean
gst_srt_base_sink_stop (GstBaseSink * sink)
{
GstSRTBaseSink *self = GST_SRT_BASE_SINK (sink);
g_clear_pointer (&self->headers, gst_buffer_list_unref);
return TRUE;
}
@ -143,6 +198,12 @@ gst_srt_base_sink_render (GstBaseSink * sink, GstBuffer * buffer)
GstSRTBaseSinkClass *bclass = GST_SRT_BASE_SINK_GET_CLASS (sink);
GstFlowReturn ret = GST_FLOW_OK;
if (self->headers && GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
GST_DEBUG_OBJECT (self, "Have streamheaders,"
" ignoring header %" GST_PTR_FORMAT, buffer);
return GST_FLOW_OK;
}
GST_TRACE_OBJECT (self, "sending buffer %p, offset %"
G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
@ -203,6 +264,7 @@ gst_srt_base_sink_class_init (GstSRTBaseSinkClass * klass)
g_object_class_install_properties (gobject_class, PROP_LAST, properties);
gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_srt_base_sink_set_caps);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_srt_base_sink_stop);
gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_srt_base_sink_render);
}
@ -283,6 +345,46 @@ gst_srt_base_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
iface->set_uri = gst_srt_base_sink_uri_set_uri;
}
gboolean
gst_srt_base_sink_send_headers (GstSRTBaseSink * self,
GstSRTBaseSinkSendCallback send_cb, gpointer user_data)
{
guint size, i;
g_return_val_if_fail (GST_IS_SRT_BASE_SINK (self), FALSE);
g_return_val_if_fail (send_cb, FALSE);
if (!self->headers)
return TRUE;
size = gst_buffer_list_length (self->headers);
GST_DEBUG_OBJECT (self, "Sending %u stream headers", size);
for (i = 0; i < size; i++) {
GstBuffer *buffer = gst_buffer_list_get (self->headers, i);
GstMapInfo info;
gboolean ret;
GST_TRACE_OBJECT (self, "sending header %u %" GST_PTR_FORMAT, i, buffer);
if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
GST_ELEMENT_ERROR (self, RESOURCE, READ,
("Could not map the input stream"), (NULL));
return FALSE;
}
ret = send_cb (self, &info, user_data);
gst_buffer_unmap (buffer, &info);
if (!ret)
return FALSE;
}
return TRUE;
}
GstStructure *
gst_srt_base_sink_get_stats (GSocketAddress * sockaddr, SRTSOCKET sock)
{

View file

@ -45,6 +45,7 @@ struct _GstSRTBaseSink {
GstBaseSink parent;
GstUri *uri;
GstBufferList *headers;
gint latency;
gchar *passphrase;
gint key_length;
@ -65,6 +66,12 @@ struct _GstSRTBaseSinkClass {
GST_EXPORT
GType gst_srt_base_sink_get_type (void);
typedef gboolean (*GstSRTBaseSinkSendCallback) (GstSRTBaseSink *sink,
const GstMapInfo *mapinfo, gpointer user_data);
gboolean gst_srt_base_sink_send_headers (GstSRTBaseSink *sink,
GstSRTBaseSinkSendCallback send_cb, gpointer user_data);
GstStructure * gst_srt_base_sink_get_stats (GSocketAddress *sockaddr,
SRTSOCKET sock);

View file

@ -68,6 +68,8 @@ struct _GstSRTClientSinkPrivate
gboolean rendez_vous;
gchar *bind_address;
guint16 bind_port;
gboolean sent_headers;
};
#define GST_SRT_CLIENT_SINK_GET_PRIVATE(obj) \
@ -190,6 +192,14 @@ gst_srt_client_sink_send_buffer (GstSRTBaseSink * sink,
GstSRTClientSink *self = GST_SRT_CLIENT_SINK (sink);
GstSRTClientSinkPrivate *priv = GST_SRT_CLIENT_SINK_GET_PRIVATE (self);
if (!priv->sent_headers) {
if (!gst_srt_base_sink_send_headers (sink, send_buffer_internal,
GINT_TO_POINTER (priv->sock)))
return FALSE;
priv->sent_headers = TRUE;
}
return send_buffer_internal (sink, mapinfo, GINT_TO_POINTER (priv->sock));
}
@ -214,6 +224,8 @@ gst_srt_client_sink_stop (GstBaseSink * sink)
g_clear_object (&priv->sockaddr);
priv->sent_headers = FALSE;
return GST_BASE_SINK_CLASS (parent_class)->stop (sink);
}

View file

@ -103,6 +103,7 @@ typedef struct
{
int sock;
GSocketAddress *sockaddr;
gboolean sent_headers;
} SRTClient;
static SRTClient *
@ -418,6 +419,12 @@ gst_srt_server_sink_send_buffer (GstSRTBaseSink * sink,
SRTClient *client = clients->data;
clients = clients->next;
if (!client->sent_headers) {
if (!gst_srt_base_sink_send_headers (sink, send_buffer_internal, client))
goto err;
client->sent_headers = TRUE;
}
if (!send_buffer_internal (sink, mapinfo, client))
goto err;