queue: Add support for buffer lists

This commit is contained in:
Sebastian Dröge 2015-02-17 11:44:40 +02:00
parent 927666642e
commit 6369ba06ff

View file

@ -195,6 +195,8 @@ static void gst_queue_get_property (GObject * object,
static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
GstBufferList * buffer_list);
static GstFlowReturn gst_queue_push_one (GstQueue * queue);
static void gst_queue_loop (GstPad * pad);
@ -417,6 +419,7 @@ gst_queue_class_init (GstQueueClass * klass)
GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
}
static void
@ -425,6 +428,7 @@ gst_queue_init (GstQueue * queue)
queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
gst_pad_set_activatemode_function (queue->sinkpad,
gst_queue_sink_activate_mode);
gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event);
@ -622,6 +626,60 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
update_time_level (queue);
}
typedef struct
{
GstClockTime timestamp;
gboolean with_duration;
} BufferListApplyTimeData;
static gboolean
buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
{
BufferListApplyTimeData *data = user_data;
GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
" duration %" GST_TIME_FORMAT, idx,
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf))
data->timestamp = GST_BUFFER_TIMESTAMP (*buf);
if (data->with_duration && GST_BUFFER_DURATION_IS_VALID (*buf))
data->timestamp += GST_BUFFER_DURATION (*buf);
GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (data->timestamp));
return TRUE;
}
/* take a buffer list and update segment, updating the time level of the queue */
static void
apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
GstSegment * segment, gboolean with_duration, gboolean sink)
{
BufferListApplyTimeData data;
/* if no timestamp is set, assume it's continuous with the previous time */
data.timestamp = segment->position;
data.with_duration = with_duration;
gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &data);
GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
GST_TIME_ARGS (data.timestamp));
segment->position = data.timestamp;
if (sink)
queue->sink_tainted = TRUE;
else
queue->src_tainted = TRUE;
/* calc diff with other end */
update_time_level (queue);
}
static void
gst_queue_locked_flush (GstQueue * queue, gboolean full)
{
@ -678,6 +736,40 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
GST_QUEUE_SIGNAL_ADD (queue);
}
static gboolean
buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
{
guint *p_size = data;
gsize buf_size;
buf_size = gst_buffer_get_size (*buf);
GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
*p_size += buf_size;
return TRUE;
}
static inline void
gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
{
GstQueueItem *qitem;
GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
gsize bsize = 0;
gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &bsize);
/* add buffer to the statistics */
queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
queue->cur_level.bytes += bsize;
apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE, TRUE);
qitem = g_slice_new (GstQueueItem);
qitem->item = item;
qitem->is_query = FALSE;
qitem->size = bsize;
gst_queue_array_push_tail (queue->queue, qitem);
GST_QUEUE_SIGNAL_ADD (queue);
}
static inline void
gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
{
@ -751,7 +843,19 @@ gst_queue_locked_dequeue (GstQueue * queue)
/* if the queue is empty now, update the other side */
if (queue->cur_level.buffers == 0)
queue->cur_level.time = 0;
} else if (GST_IS_BUFFER_LIST (item)) {
GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved buffer list %p from queue", buffer_list);
queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
queue->cur_level.bytes -= bufsize;
apply_buffer_list (queue, buffer_list, &queue->src_segment, TRUE, FALSE);
/* if the queue is empty now, update the other side */
if (queue->cur_level.buffers == 0)
queue->cur_level.time = 0;
} else if (GST_IS_EVENT (item)) {
GstEvent *event = GST_EVENT_CAST (item);
@ -1014,11 +1118,27 @@ gst_queue_leak_downstream (GstQueue * queue)
}
}
static gboolean
discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
{
GstQueue *queue = user_data;
GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
if (subbuffer) {
*buffer = subbuffer;
GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
} else {
GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
}
return FALSE;
}
static GstFlowReturn
gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
GstMiniObject * obj, gboolean is_list)
{
GstQueue *queue;
GstClockTime duration, timestamp;
queue = GST_QUEUE_CAST (parent);
@ -1030,13 +1150,22 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
if (queue->unexpected)
goto out_unexpected;
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
if (!is_list) {
GstClockTime duration, timestamp;
GstBuffer *buffer = GST_BUFFER_CAST (obj);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
} else {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"received buffer list %p with %u buffers", obj,
gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
}
/* We make space available if we're "full" according to whatever
* the user defined as "full". Note that this only applies to buffers.
@ -1091,19 +1220,33 @@ gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
}
if (queue->tail_needs_discont) {
GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
if (!is_list) {
GstBuffer *buffer = GST_BUFFER_CAST (obj);
GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
if (subbuffer) {
buffer = subbuffer;
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
if (subbuffer) {
buffer = subbuffer;
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
} else {
GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
}
obj = GST_MINI_OBJECT_CAST (buffer);
} else {
GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
buffer_list = gst_buffer_list_make_writable (buffer_list);
gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
obj = GST_MINI_OBJECT_CAST (buffer_list);
}
queue->tail_needs_discont = FALSE;
}
/* put buffer in queue now */
gst_queue_locked_enqueue_buffer (queue, buffer);
if (is_list)
gst_queue_locked_enqueue_buffer_list (queue, obj);
else
gst_queue_locked_enqueue_buffer (queue, obj);
GST_QUEUE_MUTEX_UNLOCK (queue);
return GST_FLOW_OK;
@ -1113,7 +1256,7 @@ out_unref:
{
GST_QUEUE_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
gst_mini_object_unref (obj);
return GST_FLOW_OK;
}
@ -1124,7 +1267,7 @@ out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"exit because task paused, reason: %s", gst_flow_get_name (ret));
GST_QUEUE_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
gst_mini_object_unref (obj);
return ret;
}
@ -1133,7 +1276,7 @@ out_eos:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
gst_mini_object_unref (obj);
return GST_FLOW_EOS;
}
@ -1142,12 +1285,27 @@ out_unexpected:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
GST_QUEUE_MUTEX_UNLOCK (queue);
gst_buffer_unref (buffer);
gst_mini_object_unref (obj);
return GST_FLOW_EOS;
}
}
static GstFlowReturn
gst_queue_chain_list (GstPad * pad, GstObject * parent,
GstBufferList * buffer_list)
{
return gst_queue_chain_buffer_or_list (pad, parent,
GST_MINI_OBJECT_CAST (buffer_list), TRUE);
}
static GstFlowReturn
gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
return gst_queue_chain_buffer_or_list (pad, parent,
GST_MINI_OBJECT_CAST (buffer), FALSE);
}
/* dequeue an item from the queue an push it downstream. This functions returns
* the result of the push. */
static GstFlowReturn
@ -1155,31 +1313,49 @@ gst_queue_push_one (GstQueue * queue)
{
GstFlowReturn result = queue->srcresult;
GstMiniObject *data;
gboolean is_list;
data = gst_queue_locked_dequeue (queue);
if (data == NULL)
goto no_item;
next:
if (GST_IS_BUFFER (data)) {
GstBuffer *buffer;
is_list = GST_IS_BUFFER_LIST (data);
buffer = GST_BUFFER_CAST (data);
if (GST_IS_BUFFER (data) || is_list) {
if (!is_list) {
GstBuffer *buffer;
if (queue->head_needs_discont) {
GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
buffer = GST_BUFFER_CAST (data);
if (subbuffer) {
buffer = subbuffer;
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
} else {
GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
if (queue->head_needs_discont) {
GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
if (subbuffer) {
buffer = subbuffer;
GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
} else {
GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
}
queue->head_needs_discont = FALSE;
}
queue->head_needs_discont = FALSE;
}
GST_QUEUE_MUTEX_UNLOCK (queue);
result = gst_pad_push (queue->srcpad, buffer);
GST_QUEUE_MUTEX_UNLOCK (queue);
result = gst_pad_push (queue->srcpad, buffer);
} else {
GstBufferList *buffer_list;
buffer_list = GST_BUFFER_LIST_CAST (data);
if (queue->head_needs_discont) {
buffer_list = gst_buffer_list_make_writable (buffer_list);
gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
queue->head_needs_discont = FALSE;
}
GST_QUEUE_MUTEX_UNLOCK (queue);
result = gst_pad_push_list (queue->srcpad, buffer_list);
}
/* need to check for srcresult here as well */
GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
@ -1195,6 +1371,10 @@ next:
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping EOS buffer %p", data);
gst_buffer_unref (GST_BUFFER_CAST (data));
} else if (GST_IS_BUFFER_LIST (data)) {
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"dropping EOS buffer list %p", data);
gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
} else if (GST_IS_EVENT (data)) {
GstEvent *event = GST_EVENT_CAST (data);
GstEventType type = GST_EVENT_TYPE (event);