dtlsdec: Add support for buffer lists

This commit is contained in:
Sebastian Dröge 2015-03-18 09:57:32 +01:00
parent 2082476efb
commit 989c2ef67b

View file

@ -99,6 +99,8 @@ static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
static gboolean on_peer_certificate_received (GstDtlsConnection *, gchar * pem,
GstDtlsDec *);
static GstFlowReturn sink_chain (GstPad *, GstObject * parent, GstBuffer *);
static GstFlowReturn sink_chain_list (GstPad *, GstObject * parent,
GstBufferList *);
static GstDtlsAgent *get_agent_by_pem (const gchar * pem);
static void agent_weak_ref_notify (gchar * pem, GstDtlsAgent *);
@ -202,6 +204,7 @@ gst_dtls_dec_init (GstDtlsDec * self)
g_return_if_fail (sink);
gst_pad_set_chain_function (sink, GST_DEBUG_FUNCPTR (sink_chain));
gst_pad_set_chain_list_function (sink, GST_DEBUG_FUNCPTR (sink_chain_list));
gst_element_add_pad (GST_ELEMENT (self), sink);
}
@ -440,12 +443,85 @@ on_peer_certificate_received (GstDtlsConnection * connection, gchar * pem,
return TRUE;
}
static gint
process_buffer (GstDtlsDec * self, GstBuffer * buffer)
{
GstMapInfo map_info;
gint size;
if (!gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE))
return 0;
if (!map_info.size) {
gst_buffer_unmap (buffer, &map_info);
return 0;
}
size =
gst_dtls_connection_process (self->connection, map_info.data,
map_info.size);
gst_buffer_unmap (buffer, &map_info);
if (size <= 0)
return size;
gst_buffer_set_size (buffer, size);
return size;
}
static gboolean
process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
{
GstDtlsDec *self = GST_DTLS_DEC (user_data);
gint size;
*buffer = gst_buffer_make_writable (*buffer);
size = process_buffer (self, *buffer);
if (size <= 0)
gst_buffer_replace (buffer, NULL);
return TRUE;
}
static GstFlowReturn
sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
{
GstDtlsDec *self = GST_DTLS_DEC (parent);
GstFlowReturn ret = GST_FLOW_OK;
list = gst_buffer_list_make_writable (list);
gst_buffer_list_foreach (list, process_buffer_from_list, self);
if (gst_buffer_list_length (list) == 0) {
GST_DEBUG_OBJECT (self, "Not produced any buffers");
gst_buffer_list_unref (list);
return GST_FLOW_OK;
}
g_mutex_lock (&self->src_mutex);
if (self->src) {
GST_LOG_OBJECT (self, "decoded buffer list with length %u, pushing",
gst_buffer_list_length (list));
ret = gst_pad_push_list (self->src, list);
} else {
GST_LOG_OBJECT (self, "dropped buffer list with length %d, not linked",
gst_buffer_list_length (list));
gst_buffer_list_unref (list);
}
g_mutex_unlock (&self->src_mutex);
return ret;
}
static GstFlowReturn
sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstDtlsDec *self = GST_DTLS_DEC (parent);
GstFlowReturn ret = GST_FLOW_OK;
GstMapInfo map_info = GST_MAP_INFO_INIT;
gint size;
if (!self->agent) {
@ -456,17 +532,8 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GST_DEBUG_OBJECT (self, "received buffer from %s with length %zd",
self->connection_id, gst_buffer_get_size (buffer));
gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE);
if (!map_info.size) {
gst_buffer_unmap (buffer, &map_info);
return GST_FLOW_OK;
}
size =
gst_dtls_connection_process (self->connection, map_info.data,
map_info.size);
gst_buffer_unmap (buffer, &map_info);
buffer = gst_buffer_make_writable (buffer);
size = process_buffer (self, buffer);
if (size <= 0) {
gst_buffer_unref (buffer);
@ -477,7 +544,6 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
g_mutex_lock (&self->src_mutex);
if (self->src) {
gst_buffer_set_size (buffer, size);
GST_LOG_OBJECT (self, "decoded buffer with length %d, pushing", size);
ret = gst_pad_push (self->src, buffer);
} else {