diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index ffb3b2c1b1..1ed4c1395a 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -225,6 +225,8 @@ static void gst_queue2_get_property (GObject * object, static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer); +static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * buffer_list); static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue); static void gst_queue2_loop (GstPad * pad); @@ -255,6 +257,14 @@ static gboolean gst_queue2_is_filled (GstQueue2 * queue); static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range); +typedef enum +{ + GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0, + GST_QUEUE2_ITEM_TYPE_BUFFER, + GST_QUEUE2_ITEM_TYPE_BUFFER_LIST, + GST_QUEUE2_ITEM_TYPE_EVENT +} GstQueue2ItemType; + /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */ static void @@ -379,6 +389,8 @@ gst_queue2_init (GstQueue2 * queue) gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_chain)); + gst_pad_set_chain_list_function (queue->sinkpad, + GST_DEBUG_FUNCPTR (gst_queue2_chain_list)); gst_pad_set_activatemode_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode)); gst_pad_set_event_function (queue->sinkpad, @@ -711,6 +723,52 @@ apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment, update_time_level (queue); } +static GstBufferListItem +buffer_list_apply_time (GstBuffer ** buf, guint group, guint idx, gpointer data) +{ + GstClockTime *timestamp = data; + + GST_TRACE ("buffer %u in group %u has ts %" GST_TIME_FORMAT + " duration %" GST_TIME_FORMAT, idx, group, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*buf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (*buf))); + + if (GST_BUFFER_TIMESTAMP_IS_VALID (*buf)) + *timestamp = GST_BUFFER_TIMESTAMP (*buf); + + if (GST_BUFFER_DURATION_IS_VALID (*buf)) + *timestamp += GST_BUFFER_DURATION (*buf); + + GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp)); + return GST_BUFFER_LIST_CONTINUE; +} + +/* take a buffer list and update segment, updating the time level of the queue */ +static void +apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list, + GstSegment * segment, gboolean is_sink) +{ + GstClockTime timestamp; + + /* if no timestamp is set, assume it's continuous with the previous time */ + timestamp = segment->last_stop; + + gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, ×tamp); + + GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT, + GST_TIME_ARGS (timestamp)); + + gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp); + + if (is_sink) + queue->sink_tainted = TRUE; + else + queue->src_tainted = TRUE; + + /* calc diff with other end */ + update_time_level (queue); +} + static void update_buffering (GstQueue2 * queue) { @@ -1740,11 +1798,41 @@ handle_error: } } +static GstBufferListItem +buffer_list_create_write (GstBuffer ** buf, guint group, guint idx, gpointer q) +{ + GstQueue2 *queue = q; + + GST_TRACE_OBJECT (queue, "writing buffer %u in group %u of size %u bytes", + idx, group, GST_BUFFER_SIZE (*buf)); + + if (!gst_queue2_create_write (queue, *buf)) { + GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out"); + return GST_BUFFER_LIST_END; + } + + return GST_BUFFER_LIST_CONTINUE; +} + +static GstBufferListItem +buffer_list_calc_size (GstBuffer ** buf, guint group, guint idx, gpointer data) +{ + guint *p_size = data; + guint buf_size; + + buf_size = GST_BUFFER_SIZE (*buf); + GST_TRACE ("buffer %u in group %u has size %u", idx, group, buf_size); + *p_size += buf_size; + + return GST_BUFFER_LIST_CONTINUE; +} + /* enqueue an item an update the level stats */ static void -gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer) +gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, + GstQueue2ItemType item_type) { - if (isbuffer) { + if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { GstBuffer *buffer; guint size; @@ -1767,7 +1855,32 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item, gboolean isbuffer) /* FIXME - check return value? */ gst_queue2_create_write (queue, buffer); } - } else if (GST_IS_EVENT (item)) { + } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) { + GstBufferList *buffer_list; + guint size = 0; + + buffer_list = GST_BUFFER_LIST_CAST (item); + + gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size); + GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size); + + /* add buffer to the statistics */ + if (QUEUE_IS_USING_QUEUE (queue)) { + queue->cur_level.buffers++; + queue->cur_level.bytes += size; + } + queue->bytes_in += size; + + /* apply new buffer to segment stats */ + apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE); + + /* update the byterate stats */ + update_in_rates (queue); + + if (!QUEUE_IS_USING_QUEUE (queue)) { + gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue); + } + } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { GstEvent *event; event = GST_EVENT_CAST (item); @@ -1839,7 +1952,7 @@ unexpected_event: /* dequeue an item from the queue and update level stats */ static GstMiniObject * -gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer) +gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type) { GstMiniObject *item; @@ -1857,7 +1970,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer) buffer = GST_BUFFER_CAST (item); size = gst_buffer_get_size (buffer); - *is_buffer = TRUE; + *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER; GST_CAT_LOG_OBJECT (queue_dataflow, queue, "retrieved buffer %p from queue", buffer); @@ -1878,7 +1991,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer) } else if (GST_IS_EVENT (item)) { GstEvent *event = GST_EVENT_CAST (item); - *is_buffer = FALSE; + *item_type = GST_QUEUE2_ITEM_TYPE_EVENT; GST_CAT_LOG_OBJECT (queue_dataflow, queue, "retrieved event %p from queue", event); @@ -1894,11 +2007,36 @@ gst_queue2_locked_dequeue (GstQueue2 * queue, gboolean * is_buffer) default: break; } + } else if (GST_IS_BUFFER_LIST (item)) { + GstBufferList *buffer_list; + guint size = 0; + + buffer_list = GST_BUFFER_LIST_CAST (item); + gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &size); + *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST; + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "retrieved buffer list %p from queue", buffer_list); + + if (QUEUE_IS_USING_QUEUE (queue)) { + queue->cur_level.buffers--; + queue->cur_level.bytes -= size; + } + queue->bytes_out += size; + + apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE); + /* update the byterate stats */ + update_out_rates (queue); + /* update the buffering */ + if (queue->use_buffering) + update_buffering (queue); + } else { g_warning ("Unexpected item %p dequeued from queue %s (refcounting problem?)", item, GST_OBJECT_NAME (queue)); item = NULL; + *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN; } GST_QUEUE2_SIGNAL_DEL (queue); @@ -1990,7 +2128,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent, /* refuse more events on EOS */ if (queue->is_eos) goto out_eos; - gst_queue2_locked_enqueue (queue, event, FALSE); + gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT); GST_QUEUE2_MUTEX_UNLOCK (queue); } else { /* non-serialized events are passed upstream. */ @@ -2094,18 +2232,9 @@ gst_queue2_is_filled (GstQueue2 * queue) } static GstFlowReturn -gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue, + GstMiniObject * item, GstQueue2ItemType item_type) { - GstQueue2 *queue; - - queue = GST_QUEUE2 (parent); - - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %" - G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %" - GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); - /* we have to lock the queue since we span threads */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing); /* when we received EOS, we refuse more data */ @@ -2119,7 +2248,7 @@ gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) goto out_flushing; /* put buffer in queue now */ - gst_queue2_locked_enqueue (queue, buffer, TRUE); + gst_queue2_locked_enqueue (queue, item, item_type); GST_QUEUE2_MUTEX_UNLOCK (queue); return GST_FLOW_OK; @@ -2132,7 +2261,7 @@ out_flushing: GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because task paused, reason: %s", gst_flow_get_name (ret)); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (item); return ret; } @@ -2140,7 +2269,7 @@ out_eos: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (item); return GST_FLOW_EOS; } @@ -2148,12 +2277,88 @@ out_unexpected: { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); GST_QUEUE2_MUTEX_UNLOCK (queue); - gst_buffer_unref (buffer); + gst_mini_object_unref (item); return GST_FLOW_EOS; } } +static GstFlowReturn +gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstQueue2 *queue; + + queue = GST_QUEUE2 (parent); + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of " + "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %" + GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); + + return gst_queue2_chain_buffer_or_buffer_list (queue, + GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER); +} + +static GstFlowReturn +gst_queue2_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * buffer_list) +{ + GstQueue2 *queue; + + queue = GST_QUEUE2 (parent); + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "received buffer list %p", buffer_list); + + return gst_queue2_chain_buffer_or_buffer_list (queue, + GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST); +} + +static GstMiniObject * +gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type) +{ + GstMiniObject *data; + + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream"); + + /* stop pushing buffers, we dequeue all items until we see an item that we + * can push again, which is EOS or SEGMENT. If there is nothing in the + * queue we can push, we set a flag to make the sinkpad refuse more + * buffers with an EOS return value until we receive something + * pushable again or we get flushed. */ + while ((data = gst_queue2_locked_dequeue (queue, item_type))) { + if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping EOS buffer %p", data); + gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { + GstEvent *event = GST_EVENT_CAST (data); + GstEventType type = GST_EVENT_TYPE (event); + + if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) { + /* we found a pushable item in the queue, push it out */ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event)); + return data; + } + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping EOS event %p", event); + gst_event_unref (event); + } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping EOS buffer list %p", data); + gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data)); + } + } + /* no more items in the queue. Set the unexpected flag so that upstream + * make us refuse any more buffers on the sinkpad. Since we will still + * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the + * task function does not shut down. */ + queue->unexpected = TRUE; + return NULL; +} + /* dequeue an item from the queue an push it downstream. This functions returns * the result of the push. */ static GstFlowReturn @@ -2161,16 +2366,16 @@ gst_queue2_push_one (GstQueue2 * queue) { GstFlowReturn result = GST_FLOW_OK; GstMiniObject *data; - gboolean is_buffer = FALSE; + GstQueue2ItemType item_type; - data = gst_queue2_locked_dequeue (queue, &is_buffer); + data = gst_queue2_locked_dequeue (queue, &item_type); if (data == NULL) goto no_item; next: GST_QUEUE2_MUTEX_UNLOCK (queue); - if (is_buffer) { + if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) { GstBuffer *buffer; #if 0 GstCaps *caps; @@ -2193,41 +2398,11 @@ next: /* need to check for srcresult here as well */ GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); if (result == GST_FLOW_EOS) { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream"); - /* stop pushing buffers, we dequeue all items until we see an item that we - * can push again, which is EOS or SEGMENT. If there is nothing in the - * queue we can push, we set a flag to make the sinkpad refuse more - * buffers with an EOS return value until we receive something - * pushable again or we get flushed. */ - while ((data = gst_queue2_locked_dequeue (queue, &is_buffer))) { - if (is_buffer) { - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "dropping EOS buffer %p", data); - gst_buffer_unref (GST_BUFFER_CAST (data)); - } else if (GST_IS_EVENT (data)) { - GstEvent *event = GST_EVENT_CAST (data); - GstEventType type = GST_EVENT_TYPE (event); - - if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) { - /* we found a pushable item in the queue, push it out */ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "pushing pushable event %s after EOS", - GST_EVENT_TYPE_NAME (event)); - goto next; - } - GST_CAT_LOG_OBJECT (queue_dataflow, queue, - "dropping EOS event %p", event); - gst_event_unref (event); - } - } - /* no more items in the queue. Set the unexpected flag so that upstream - * make us refuse any more buffers on the sinkpad. Since we will still - * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the - * task function does not shut down. */ - queue->unexpected = TRUE; - result = GST_FLOW_OK; + data = gst_queue2_dequeue_on_eos (queue, &item_type); + if (data != NULL) + goto next; } - } else if (GST_IS_EVENT (data)) { + } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); @@ -2241,6 +2416,30 @@ next: } GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); + } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) { + GstBufferList *buffer_list; + GstBuffer *first_buf; + GstCaps *caps; + + buffer_list = GST_BUFFER_LIST_CAST (data); + + first_buf = gst_buffer_list_get (buffer_list, 0, 0); + caps = (first_buf != NULL) ? GST_BUFFER_CAPS (first_buf) : NULL; + + /* set caps before pushing the buffer so that core does not try to do + * something fancy to check if this is possible. */ + if (caps && caps != GST_PAD_CAPS (queue->srcpad)) + gst_pad_set_caps (queue->srcpad, caps); + + result = gst_pad_push_list (queue->srcpad, buffer_list); + + /* need to check for srcresult here as well */ + GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing); + if (result == GST_FLOW_EOS) { + data = gst_queue2_dequeue_on_eos (queue, &item_type); + if (data != NULL) + goto next; + } } return result;