diff --git a/ChangeLog b/ChangeLog index 05f4fa851d..489557192d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,46 @@ +2007-06-15 Wim Taymans + + * libs/gst/base/gstdataqueue.c: (gst_data_queue_cleanup), + (gst_data_queue_finalize), (gst_data_queue_locked_is_empty), + (gst_data_queue_set_flushing), (gst_data_queue_push), + (gst_data_queue_pop), (gst_data_queue_drop_head), + (gst_data_queue_limits_changed), (gst_data_queue_get_level): + * libs/gst/base/gstdataqueue.h: + Various cleanups. + Added methods to get the current levels and to inform the queue that the + 'full' limits changed. + + * plugins/elements/gstmultiqueue.c: (gst_multi_queue_init), + (gst_multi_queue_finalize), (gst_multi_queue_set_property), + (gst_single_queue_flush), (update_time_level), (apply_segment), + (apply_buffer), (gst_single_queue_push_one), + (gst_multi_queue_item_steal_object), + (gst_multi_queue_item_destroy), (gst_multi_queue_item_new), + (gst_multi_queue_loop), (gst_multi_queue_chain), + (gst_multi_queue_sink_activate_push), (gst_multi_queue_sink_event), + (gst_multi_queue_getcaps), (gst_multi_queue_src_activate_push), + (gst_multi_queue_src_query), (single_queue_overrun_cb), + (single_queue_underrun_cb), (single_queue_check_full), + (gst_single_queue_new): + Keep track of time in the queue by measuring the difference between + running_time on input and output. This gives more accurate results and + can compensate for segments correctly. + Make a queue by default only 5 buffers deep. We will now increase the + buffer size depending on the filledness of the other queues. + Factor out commong flush code. + Make sure we don't add additional refcounts to buffers when we can avoid + it. + Propagate GstFlowReturn differently. + Use GSlice for intermediate GstMultiQueueItems. + Keep track of EOS. + Resize queues on over and underruns based on filled level of other + queues. + When checking if the queue is filled, prefer to measure in time if we + can and fall back to bytes when no time is known. + + * plugins/elements/gstqueue.c: + Fix return value. + 2007-06-15 Wim Taymans * libs/gst/base/gstbasetransform.c: diff --git a/libs/gst/base/gstdataqueue.c b/libs/gst/base/gstdataqueue.c index a3bfde2973..1ec4839acc 100644 --- a/libs/gst/base/gstdataqueue.c +++ b/libs/gst/base/gstdataqueue.c @@ -248,7 +248,6 @@ gst_data_queue_cleanup (GstDataQueue * queue) queue->cur_level.visible = 0; queue->cur_level.bytes = 0; queue->cur_level.time = 0; - } /* called only once, as opposed to dispose */ @@ -269,8 +268,7 @@ gst_data_queue_finalize (GObject * object) g_cond_free (queue->item_add); g_cond_free (queue->item_del); - if (G_OBJECT_CLASS (parent_class)->finalize) - G_OBJECT_CLASS (parent_class)->finalize (object); + G_OBJECT_CLASS (parent_class)->finalize (object); } static void @@ -304,7 +302,6 @@ gst_data_queue_locked_is_full (GstDataQueue * queue) * #gst_data_queue_pop will be released. * MT safe. */ - void gst_data_queue_flush (GstDataQueue * queue) { @@ -323,7 +320,6 @@ gst_data_queue_flush (GstDataQueue * queue) * * Returns: #TRUE if @queue is empty. */ - gboolean gst_data_queue_is_empty (GstDataQueue * queue) { @@ -346,7 +342,6 @@ gst_data_queue_is_empty (GstDataQueue * queue) * * Returns: #TRUE if @queue is full. */ - gboolean gst_data_queue_is_full (GstDataQueue * queue) { @@ -369,14 +364,14 @@ gst_data_queue_is_full (GstDataQueue * queue) * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight * away with a return value of #FALSE. While the @queue is in flushing state, * all calls to those two functions will return #FALSE. + * * MT Safe. */ - void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing) { - GST_DEBUG ("queue:%p , flushing:%d", queue, flushing); + GST_DATA_QUEUE_MUTEX_LOCK (queue); queue->flushing = flushing; if (flushing) { @@ -407,28 +402,28 @@ gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing) gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item) { - gboolean res = FALSE; + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); + g_return_val_if_fail (item != NULL, FALSE); - GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); STATUS (queue, "before pushing"); /* We ALWAYS need to check for queue fillness */ - while (gst_data_queue_locked_is_full (queue)) { + if (gst_data_queue_locked_is_full (queue)) { GST_DATA_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0); - GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); /* signal might have removed some items */ while (gst_data_queue_locked_is_full (queue)) { g_cond_wait (queue->item_del, queue->qlock); if (queue->flushing) - goto done; + goto flushing; } } g_queue_push_tail (queue->queue, item); - res = TRUE; if (item->visible) queue->cur_level.visible++; @@ -436,15 +431,19 @@ gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item) queue->cur_level.time += item->duration; STATUS (queue, "after pushing"); - g_cond_signal (queue->item_add); -done: GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - GST_DEBUG ("queue:%p, result:%d", queue, res); + return TRUE; - return res; + /* ERRORS */ +flushing: + { + GST_DEBUG ("queue:%p, we are flushing", queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + return FALSE; + } } /** @@ -459,35 +458,30 @@ done: * * Returns: #TRUE if an @item was successfully retrieved from the @queue. */ - gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item) { - gboolean res = FALSE; - - GST_DEBUG ("queue:%p", queue); - + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); g_return_val_if_fail (item != NULL, FALSE); - GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); STATUS (queue, "before popping"); - while (gst_data_queue_locked_is_empty (queue)) { + if (gst_data_queue_locked_is_empty (queue)) { GST_DATA_QUEUE_MUTEX_UNLOCK (queue); g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0); - GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); + GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); while (gst_data_queue_locked_is_empty (queue)) { g_cond_wait (queue->item_add, queue->qlock); if (queue->flushing) - goto done; + goto flushing; } } /* Get the item from the GQueue */ *item = g_queue_pop_head (queue->queue); - res = TRUE; /* update current level counter */ if ((*item)->visible) @@ -495,14 +489,20 @@ gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item) queue->cur_level.bytes -= (*item)->size; queue->cur_level.time -= (*item)->duration; + STATUS (queue, "after popping"); g_cond_signal (queue->item_del); -done: GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - GST_DEBUG ("queue:%p , res:%d", queue, res); + return TRUE; - return res; + /* ERRORS */ +flushing: + { + GST_DEBUG ("queue:%p, we are flushing", queue); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); + return FALSE; + } } /** @@ -514,7 +514,6 @@ done: * * Returns: TRUE if an element was removed. */ - gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type) { @@ -522,6 +521,8 @@ gst_data_queue_drop_head (GstDataQueue * queue, GType type) GList *item; GstDataQueueItem *leak = NULL; + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); + GST_DEBUG ("queue:%p", queue); GST_DATA_QUEUE_MUTEX_LOCK (queue); @@ -556,6 +557,39 @@ done: return res; } +/** + * gst_data_queue_limits_changed: + * @queue: The #GstDataQueue + * + * Inform the queue that the limits for the fullness check have changed and that + * any blocking gst_data_queue_push() should be unblocked to recheck the limts. + */ +void +gst_data_queue_limits_changed (GstDataQueue * queue) +{ + g_return_if_fail (GST_IS_DATA_QUEUE (queue)); + + GST_DATA_QUEUE_MUTEX_LOCK (queue); + GST_DEBUG ("signal del"); + g_cond_signal (queue->item_del); + GST_DATA_QUEUE_MUTEX_UNLOCK (queue); +} + +/** + * gst_data_queue_get_level: + * @queue: The #GstDataQueue + * @level: the location to store the result + * + * Get the current level of the queue. + */ +void +gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level) +{ + level->visible = queue->cur_level.visible; + level->bytes = queue->cur_level.bytes; + level->time = queue->cur_level.time; +} + static void gst_data_queue_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) diff --git a/libs/gst/base/gstdataqueue.h b/libs/gst/base/gstdataqueue.h index 503e44c74d..44bac88b21 100644 --- a/libs/gst/base/gstdataqueue.h +++ b/libs/gst/base/gstdataqueue.h @@ -137,19 +137,23 @@ struct _GstDataQueueClass GType gst_data_queue_get_type (void); -GstDataQueue *gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, - gpointer checkdata); +GstDataQueue * gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, + gpointer checkdata); -gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item); -gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item); +gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item); +gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item); -void gst_data_queue_flush (GstDataQueue * queue); -void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing); +void gst_data_queue_flush (GstDataQueue * queue); +void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing); -gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type); +gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type); -gboolean gst_data_queue_is_full (GstDataQueue * queue); -gboolean gst_data_queue_is_empty (GstDataQueue * queue); +gboolean gst_data_queue_is_full (GstDataQueue * queue); +gboolean gst_data_queue_is_empty (GstDataQueue * queue); + +void gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize *level); +void gst_data_queue_limits_changed (GstDataQueue * queue); G_END_DECLS + #endif /* __GST_DATA_QUEUE_H__ */ diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index d4208af11f..8b1309c3be 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -34,7 +34,6 @@ * Structure containing all information and properties about * a single queue. */ - typedef struct _GstSingleQueue GstSingleQueue; struct _GstSingleQueue @@ -49,10 +48,14 @@ struct _GstSingleQueue /* flowreturn of previous srcpad push */ GstFlowReturn srcresult; + GstSegment sink_segment; + GstSegment src_segment; /* queue of data */ GstDataQueue *queue; GstDataQueueSize max_size, extra_size; + GstClockTime cur_time; + gboolean is_eos; gboolean inextra; /* TRUE if the queue is currently in extradata mode */ /* Protected by global lock */ @@ -101,13 +104,20 @@ GST_ELEMENT_DETAILS ("MultiQueue", "Multiple data queue", "Edward Hervey "); +/* default limits, we try to keep up to 2 seconds of data and if there is not + * time, up to 10 MB. The number of buffers is dynamically scaled to make sure + * there is data in the queues. Normally, the byte and time limits are not hit + * in theses conditions. */ #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ -#define DEFAULT_MAX_SIZE_BUFFERS 200 -#define DEFAULT_MAX_SIZE_TIME GST_SECOND +#define DEFAULT_MAX_SIZE_BUFFERS 5 +#define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND +/* second limits. When we hit one of the above limits we are probably dealing + * with a badly muxed file and we scale the limits to these emergency values. + * This is currently not yet implemented. */ #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ -#define DEFAULT_EXTRA_SIZE_BUFFERS 200 -#define DEFAULT_EXTRA_SIZE_TIME GST_SECOND +#define DEFAULT_EXTRA_SIZE_BUFFERS 5 +#define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND /* Signals and args */ enum @@ -129,19 +139,10 @@ enum }; #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ - GST_CAT_LOG_OBJECT (multi_queue_debug, q, \ - "locking qlock from thread %p", \ - g_thread_self ()); \ g_mutex_lock (q->qlock); \ - GST_CAT_LOG_OBJECT (multi_queue_debug, q, \ - "locked qlock from thread %p", \ - g_thread_self ()); \ } G_STMT_END #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ - GST_CAT_LOG_OBJECT (multi_queue_debug, q, \ - "unlocking qlock from thread %p", \ - g_thread_self ()); \ g_mutex_unlock (q->qlock); \ } G_STMT_END @@ -155,6 +156,8 @@ static GstPad *gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp, const gchar * name); static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad); +static void gst_multi_queue_loop (GstPad * pad); + #define _do_init(bla) \ GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element"); @@ -236,7 +239,6 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass) static void gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass) { - mqueue->nbqueues = 0; mqueue->queues = NULL; @@ -253,8 +255,6 @@ gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass) mqueue->nextnotlinked = -1; mqueue->qlock = g_mutex_new (); - - /* FILLME ? */ } static void @@ -269,8 +269,7 @@ gst_multi_queue_finalize (GObject * object) /* free/unref instance data */ g_mutex_free (mqueue->qlock); - if (G_OBJECT_CLASS (parent_class)->finalize) - G_OBJECT_CLASS (parent_class)->finalize (object); + G_OBJECT_CLASS (parent_class)->finalize (object); } #define SET_CHILD_PROPERTY(mq,name,value) G_STMT_START { \ @@ -314,7 +313,6 @@ gst_multi_queue_set_property (GObject * object, guint prop_id, G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } - } static void @@ -423,59 +421,202 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad) } static gboolean +gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush) +{ + gboolean result; + + GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"), + sq->id); + + if (flush) { + sq->srcresult = GST_FLOW_WRONG_STATE; + gst_data_queue_set_flushing (sq->queue, TRUE); + + /* wake up non-linked task */ + GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", + sq->id); + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + g_cond_signal (sq->turn); + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + + GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id); + result = gst_pad_pause_task (sq->srcpad); + } else { + gst_data_queue_flush (sq->queue); + gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); + gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); + sq->srcresult = GST_FLOW_OK; + sq->max_size.visible = mq->max_size.visible; + sq->is_eos = FALSE; + sq->inextra = FALSE; + sq->nextid = -1; + sq->oldid = -1; + gst_data_queue_set_flushing (sq->queue, FALSE); + + GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id); + result = + gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop, + sq->srcpad); + } + return result; +} + +/* calculate the diff between running time on the sink and src of the queue. + * This is the total amount of time in the queue. */ +static void +update_time_level (GstMultiQueue * mq, GstSingleQueue * sq) +{ + gint64 sink_time, src_time; + + sink_time = + gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, + sq->sink_segment.last_stop); + + src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME, + sq->src_segment.last_stop); + + GST_DEBUG_OBJECT (mq, + "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, + GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); + + if (sink_time >= src_time) + sq->cur_time = sink_time - src_time; + else + sq->cur_time = 0; +} + +/* take a NEWSEGMENT event and apply the values to segment, updating the time + * level of queue. */ +static void +apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event, + GstSegment * segment) +{ + gboolean update; + GstFormat format; + gdouble rate, arate; + gint64 start, stop, time; + + gst_event_parse_new_segment_full (event, &update, &rate, &arate, + &format, &start, &stop, &time); + + /* now configure the values, we use these to track timestamps on the + * sinkpad. */ + if (format != GST_FORMAT_TIME) { + /* non-time format, pretent the current time segment is closed with a + * 0 start and unknown stop time. */ + update = FALSE; + format = GST_FORMAT_TIME; + start = 0; + stop = -1; + time = 0; + } + gst_segment_set_newsegment_full (segment, update, + rate, arate, format, start, stop, time); + + GST_DEBUG_OBJECT (mq, + "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment); + + /* segment can update the time level of the queue */ + update_time_level (mq, sq); +} + +/* take a buffer and update segment, updating the time level of the queue. */ +static void +apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstBuffer * buffer, + GstSegment * segment) +{ + GstClockTime duration, timestamp; + + timestamp = GST_BUFFER_TIMESTAMP (buffer); + duration = GST_BUFFER_DURATION (buffer); + + /* if no timestamp is set, assume it's continuous with the previous + * time */ + if (timestamp == GST_CLOCK_TIME_NONE) + timestamp = segment->last_stop; + + /* add duration */ + if (duration != GST_CLOCK_TIME_NONE) + timestamp += duration; + + GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT, + sq->id, GST_TIME_ARGS (timestamp)); + + gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp); + + /* calc diff with other end */ + update_time_level (mq, sq); +} + +static GstFlowReturn gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, GstMiniObject * object) { + GstFlowReturn result = GST_FLOW_OK; + if (GST_IS_BUFFER (object)) { - GstBuffer *buf; + GstBuffer *buffer; - buf = gst_buffer_ref (GST_BUFFER_CAST (object)); - sq->srcresult = gst_pad_push (sq->srcpad, buf); + buffer = GST_BUFFER_CAST (object); - if ((sq->srcresult != GST_FLOW_OK) - && (sq->srcresult != GST_FLOW_NOT_LINKED)) { - GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, reason %s", - sq->id, gst_flow_get_name (sq->srcresult)); - gst_data_queue_set_flushing (sq->queue, TRUE); - gst_pad_pause_task (sq->srcpad); - } + apply_buffer (mq, sq, buffer, &sq->src_segment); + + result = gst_pad_push (sq->srcpad, buffer); } else if (GST_IS_EVENT (object)) { GstEvent *event; - event = gst_event_ref (GST_EVENT_CAST (object)); - if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { - sq->srcresult = GST_FLOW_UNEXPECTED; + event = GST_EVENT_CAST (object); - GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, got EOS", - sq->id); - gst_data_queue_set_flushing (sq->queue, TRUE); - gst_pad_pause_task (sq->srcpad); + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + result = GST_FLOW_UNEXPECTED; + break; + case GST_EVENT_NEWSEGMENT: + apply_segment (mq, sq, event, &sq->src_segment); + break; + default: + break; } + gst_pad_push_event (sq->srcpad, event); } else { g_warning ("Unexpected object in singlequeue %d (refcounting problem?)", sq->id); } + return result; - return FALSE; + /* ERRORS */ +} + +static GstMiniObject * +gst_multi_queue_item_steal_object (GstMultiQueueItem * item) +{ + GstMiniObject *res; + + res = item->object; + item->object = NULL; + + return res; } static void gst_multi_queue_item_destroy (GstMultiQueueItem * item) { - gst_mini_object_unref (item->object); - g_free (item); + if (item->object) + gst_mini_object_unref (item->object); + g_slice_free (GstMultiQueueItem, item); } /* takes ownership of passed mini object! */ static GstMultiQueueItem * -gst_multi_queue_item_new (GstMiniObject * object) +gst_multi_queue_item_new (GstMiniObject * object, guint32 curid) { GstMultiQueueItem *item; - item = g_new0 (GstMultiQueueItem, 1); + item = g_slice_new (GstMultiQueueItem); item->object = object; item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; + item->posid = curid; if (GST_IS_BUFFER (object)) { item->size = GST_BUFFER_SIZE (object); @@ -483,8 +624,11 @@ gst_multi_queue_item_new (GstMiniObject * object) if (item->duration == GST_CLOCK_TIME_NONE) item->duration = 0; item->visible = TRUE; + } else { + item->size = 0; + item->duration = 0; + item->visible = FALSE; } - return item; } @@ -498,101 +642,95 @@ gst_multi_queue_loop (GstPad * pad) GstMiniObject *object; guint32 newid; guint32 oldid = -1; + GstFlowReturn result; sq = (GstSingleQueue *) gst_pad_get_element_private (pad); mq = sq->mqueue; restart: GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); - if (!(gst_data_queue_pop (sq->queue, &sitem))) { - /* QUEUE FLUSHING */ - if (sq->srcresult != GST_FLOW_OK) - goto out_flushing; - else - GST_WARNING_OBJECT (mq, - "data_queue_pop() returned FALSE, but srcresult == GST_FLOW_OK !"); - } else { - item = (GstMultiQueueItem *) sitem; - newid = item->posid; - object = item->object; - GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", - sq->id, newid, oldid); + if (!(gst_data_queue_pop (sq->queue, &sitem))) + goto out_flushing; - /* 1. Only check turn if : - * _ We haven't pushed anything yet - * _ OR the new id isn't the follower of the previous one (continuous segment) */ - if ((oldid == -1) || (newid != (oldid + 1))) { - GST_MULTI_QUEUE_MUTEX_LOCK (mq); + item = (GstMultiQueueItem *) sitem; + newid = item->posid; + /* steal the object and destroy the item */ + object = gst_multi_queue_item_steal_object (item); + gst_multi_queue_item_destroy (item); - GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", - gst_flow_get_name (sq->srcresult)); + GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", + sq->id, newid, oldid); - /* preamble : if we're not linked, set the newid as the next one we want */ - if (sq->srcresult == GST_FLOW_NOT_LINKED) - sq->nextid = newid; + /* 1. Only check turn if : + * _ We haven't pushed anything yet + * _ OR the new id isn't the follower of the previous one (continuous segment) */ + if ((oldid == -1) || (newid != (oldid + 1))) { + GST_MULTI_QUEUE_MUTEX_LOCK (mq); - /* store the last id we outputted */ - if (oldid != -1) - sq->oldid = oldid; + GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); - /* 2. If there's a queue waiting to push, wake it up. If it's us the */ - /* check below (3.) will avoid us waiting. */ - wake_up_next_non_linked (mq); + /* preamble : if we're not linked, set the newid as the next one we want */ + if (sq->srcresult == GST_FLOW_NOT_LINKED) + sq->nextid = newid; - /* 3. If we're not linked AND our nextid is higher than the highest oldid outputted - * _ Update global next-not-linked - * _ Wait on our conditional - */ - while ((sq->srcresult == GST_FLOW_NOT_LINKED) - && (mq->nextnotlinked != sq->id)) { - compute_next_non_linked (mq); - g_cond_wait (sq->turn, mq->qlock); - } + /* store the last id we outputted */ + if (oldid != -1) + sq->oldid = oldid; - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + /* 2. If there's a queue waiting to push, wake it up. If it's us the */ + /* check below (3.) will avoid us waiting. */ + wake_up_next_non_linked (mq); - /* 4. Check again status, maybe we're flushing */ - if ((sq->srcresult != GST_FLOW_OK) - && (sq->srcresult != GST_FLOW_NOT_LINKED)) { - gst_multi_queue_item_destroy (item); - goto out_flushing; - } + /* 3. If we're not linked AND our nextid is higher than the highest oldid outputted + * _ Update global next-not-linked + * _ Wait on our conditional + */ + while ((sq->srcresult == GST_FLOW_NOT_LINKED) + && (mq->nextnotlinked != sq->id)) { + compute_next_non_linked (mq); + g_cond_wait (sq->turn, mq->qlock); } - - GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", - gst_flow_get_name (sq->srcresult)); - - /* 4. Try to push out the new object */ - gst_single_queue_push_one (mq, sq, object); - - GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", - gst_flow_get_name (sq->srcresult)); - - gst_multi_queue_item_destroy (item); - oldid = newid; - - /* 5. if GstFlowReturn is non-fatal, goto restart */ - if ((sq->srcresult == GST_FLOW_OK) - || (sq->srcresult == GST_FLOW_NOT_LINKED)) - goto restart; + /* 4. Check again status, maybe we're flushing */ + if ((sq->srcresult != GST_FLOW_OK)) { + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + gst_mini_object_unref (object); + goto out_flushing; + } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } + GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); -beach: - return; + /* 4. Try to push out the new object */ + result = gst_single_queue_push_one (mq, sq, object); + sq->srcresult = result; + if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED) + goto out_flushing; + + GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s", + gst_flow_get_name (sq->srcresult)); + + oldid = newid; + + /* restart to get the next element */ + goto restart; + + /* ERRORS */ out_flushing: { + gst_data_queue_set_flushing (sq->queue, TRUE); gst_pad_pause_task (sq->srcpad); GST_CAT_LOG_OBJECT (multi_queue_debug, mq, "SingleQueue[%d] task paused, reason:%s", sq->id, gst_flow_get_name (sq->srcresult)); - goto beach; + return; } } - /** * gst_multi_queue_chain: * @@ -600,7 +738,6 @@ out_flushing: * _ we don't have leak behavioures, * _ we push with a unique id (curid) */ - static GstFlowReturn gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer) { @@ -613,7 +750,7 @@ gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer) sq = gst_pad_get_element_private (pad); mq = (GstMultiQueue *) gst_pad_get_parent (pad); - /* Get an unique incrementing id */ + /* Get a unique incrementing id */ GST_MULTI_QUEUE_MUTEX_LOCK (mq); curid = mq->counter++; GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); @@ -621,18 +758,28 @@ gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer) GST_LOG_OBJECT (mq, "SingleQueue %d : about to push buffer with id %d", sq->id, curid); - item = gst_multi_queue_item_new ((GstMiniObject *) buffer); - item->posid = curid; + item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid); - if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) { - GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", - sq->id, gst_flow_get_name (sq->srcresult)); - gst_multi_queue_item_destroy (item); - ret = sq->srcresult; - } + /* update time level */ + apply_buffer (mq, sq, buffer, &sq->sink_segment); + if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) + goto flushing; + +done: gst_object_unref (mq); + return ret; + + /* ERRORS */ +flushing: + { + ret = sq->srcresult; + GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", + sq->id, gst_flow_get_name (ret)); + gst_multi_queue_item_destroy (item); + goto done; + } } static gboolean @@ -642,13 +789,12 @@ gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active) sq = (GstSingleQueue *) gst_pad_get_element_private (pad); - if (active) + if (active) { sq->srcresult = GST_FLOW_OK; - else { + } else { sq->srcresult = GST_FLOW_WRONG_STATE; gst_data_queue_flush (sq->queue); } - return TRUE; } @@ -659,50 +805,40 @@ gst_multi_queue_sink_event (GstPad * pad, GstEvent * event) GstMultiQueue *mq; guint32 curid; GstMultiQueueItem *item; + gboolean res; + GstEventType type; sq = (GstSingleQueue *) gst_pad_get_element_private (pad); mq = (GstMultiQueue *) gst_pad_get_parent (pad); - switch (GST_EVENT_TYPE (event)) { + type = GST_EVENT_TYPE (event); + + switch (type) { case GST_EVENT_FLUSH_START: GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event", sq->id); - gst_pad_push_event (sq->srcpad, event); + res = gst_pad_push_event (sq->srcpad, event); - sq->srcresult = GST_FLOW_WRONG_STATE; - gst_data_queue_set_flushing (sq->queue, TRUE); - - /* wake up non-linked task */ - GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", - sq->id); - GST_MULTI_QUEUE_MUTEX_LOCK (mq); - g_cond_signal (sq->turn); - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - - gst_pad_pause_task (sq->srcpad); + gst_single_queue_flush (mq, sq, TRUE); goto done; case GST_EVENT_FLUSH_STOP: GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event", sq->id); - gst_pad_push_event (sq->srcpad, event); + res = gst_pad_push_event (sq->srcpad, event); - gst_data_queue_flush (sq->queue); - gst_data_queue_set_flushing (sq->queue, FALSE); - sq->srcresult = GST_FLOW_OK; - sq->nextid = -1; - sq->oldid = -1; - - GST_DEBUG_OBJECT (mq, "SingleQueue %d : restarting task", sq->id); - gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop, - sq->srcpad); + gst_single_queue_flush (mq, sq, FALSE); goto done; + case GST_EVENT_NEWSEGMENT: + apply_segment (mq, sq, event, &sq->sink_segment); + break; + default: if (!(GST_EVENT_IS_SERIALIZED (event))) { - gst_pad_push_event (sq->srcpad, event); + res = gst_pad_push_event (sq->srcpad, event); goto done; } break; @@ -713,22 +849,33 @@ gst_multi_queue_sink_event (GstPad * pad, GstEvent * event) curid = mq->counter++; GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - item = gst_multi_queue_item_new ((GstMiniObject *) event); - item->posid = curid; + item = gst_multi_queue_item_new ((GstMiniObject *) event, curid); GST_DEBUG_OBJECT (mq, "SingleQueue %d : Adding event %p of type %s with id %d", sq->id, event, GST_EVENT_TYPE_NAME (event), curid); - if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) { - GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", - sq->id, gst_flow_get_name (sq->srcresult)); - gst_multi_queue_item_destroy (item); - } + if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) + goto flushing; + + /* mark EOS when we received one, we must do that after putting the + * buffer in the queue because EOS marks the buffer as filled. No need to take + * a lock, the _check_full happens from this thread only, right before pushing + * into dataqueue. */ + if (type == GST_EVENT_EOS) + sq->is_eos = TRUE; done: gst_object_unref (mq); - return TRUE; + return res; + +flushing: + { + GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", + sq->id, gst_flow_get_name (sq->srcresult)); + gst_multi_queue_item_destroy (item); + goto done; + } } static GstCaps * @@ -738,8 +885,6 @@ gst_multi_queue_getcaps (GstPad * pad) GstPad *otherpad; GstCaps *result; - GST_LOG_OBJECT (pad, "..."); - otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad; GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad"); @@ -773,27 +918,12 @@ gst_multi_queue_src_activate_push (GstPad * pad, gboolean active) GST_LOG ("SingleQueue %d", sq->id); if (active) { - sq->srcresult = GST_FLOW_OK; - gst_data_queue_set_flushing (sq->queue, FALSE); - result = gst_pad_start_task (pad, (GstTaskFunction) gst_multi_queue_loop, - pad); + result = gst_single_queue_flush (mq, sq, FALSE); } else { - /* 1. unblock loop function */ - sq->srcresult = GST_FLOW_WRONG_STATE; - gst_data_queue_set_flushing (sq->queue, TRUE); - - /* 2. unblock potentially non-linked pad */ - GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task", - sq->id); - GST_MULTI_QUEUE_MUTEX_LOCK (mq); - g_cond_signal (sq->turn); - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - - /* 3. make sure streaming finishes */ - result = gst_pad_stop_task (pad); - gst_data_queue_set_flushing (sq->queue, FALSE); + result = gst_single_queue_flush (mq, sq, TRUE); + /* make sure streaming finishes */ + result |= gst_pad_stop_task (pad); } - return result; } @@ -818,8 +948,7 @@ gst_multi_queue_src_query (GstPad * pad, GstQuery * query) GstPad *peerpad; gboolean res; - /* FILLME */ - /* Handle position offset depending on queue size */ + /* FIXME, Handle position offset depending on queue size */ /* default handling */ if (!(peerpad = gst_pad_get_peer (sq->sinkpad))) @@ -831,6 +960,7 @@ gst_multi_queue_src_query (GstPad * pad, GstQuery * query) return res; + /* ERRORS */ no_peer: { GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer"); @@ -838,7 +968,6 @@ no_peer: } } - /* * Next-non-linked functions */ @@ -903,32 +1032,34 @@ compute_next_non_linked (GstMultiQueue * mq) /* * GstSingleQueue functions */ - static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) { GstMultiQueue *mq = sq->mqueue; GList *tmp; + GstDataQueueSize size; + + gst_data_queue_get_level (sq->queue, &size); GST_LOG_OBJECT (sq->mqueue, "Single Queue %d is full", sq->id); - if (!sq->inextra) { - /* Check if at least one other queue is empty... */ - GST_MULTI_QUEUE_MUTEX_LOCK (mq); - for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { - GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; + GST_MULTI_QUEUE_MUTEX_LOCK (mq); + for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { + GstSingleQueue *ssq = (GstSingleQueue *) tmp->data; - if (gst_data_queue_is_empty (ssq->queue)) { - /* ... if so set sq->inextra to TRUE and don't emit overrun signal */ + if (gst_data_queue_is_empty (ssq->queue)) { + if (size.visible == sq->max_size.visible) { + sq->max_size.visible++; GST_DEBUG_OBJECT (mq, - "Another queue is empty, bumping single queue into extra data mode"); - sq->inextra = TRUE; - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); - goto beach; + "Another queue is empty, bumping single queue %d max visible to %d", + sq->id, sq->max_size.visible); + g_print ("overrun: queue %d, limit %d\n", sq->id, sq->max_size.visible); } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); + goto beach; } - GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); } + GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* Overrun is always forwarded, since this is blocking the upstream element */ g_signal_emit (G_OBJECT (sq->mqueue), gst_multi_queue_signals[SIGNAL_OVERRUN], @@ -946,17 +1077,28 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq) GList *tmp; GST_LOG_OBJECT (mq, - "Single Queue %d is empty, Checking if all single queues are empty", - sq->id); + "Single Queue %d is empty, Checking other single queues", sq->id); GST_MULTI_QUEUE_MUTEX_LOCK (mq); for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { GstSingleQueue *sq = (GstSingleQueue *) tmp->data; - if (!gst_data_queue_is_empty (sq->queue)) { - empty = FALSE; - break; + if (gst_data_queue_is_full (sq->queue)) { + GstDataQueueSize size; + + gst_data_queue_get_level (sq->queue, &size); + if (size.visible == sq->max_size.visible) { + sq->max_size.visible++; + GST_DEBUG_OBJECT (mq, + "queue %d is filled, bumping its max visible to %d", sq->id, + sq->max_size.visible); + g_print ("underrun: queue %d, limit %d\n", sq->id, + sq->max_size.visible); + gst_data_queue_limits_changed (sq->queue); + } } + if (!gst_data_queue_is_empty (sq->queue)) + empty = FALSE; } GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); @@ -972,31 +1114,27 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes, { gboolean res; - /* In all cases (extra mode or not), we check how the queue current level - * compares to max_size. */ - res = (((sq->max_size.visible != 0) && - sq->max_size.visible < visible) || - ((sq->max_size.bytes != 0) && - sq->max_size.bytes < bytes) || - ((sq->max_size.time != 0) && sq->max_size.time < time)); + GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT + "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes, + sq->max_size.bytes, sq->cur_time, sq->max_size.time); - if (G_UNLIKELY (sq->inextra)) { - /* If we're in extra mode, one of two things can happen to check for - * fullness: */ + /* we are always filled on EOS */ + if (sq->is_eos) + return TRUE; - if (!res) - /* #1 : Either we are not full against normal max_size levels, in which - * case we can go out of extra mode. */ - sq->inextra = FALSE; - else - /* #2 : Or else, the check should be done against max_size + extra_size */ - res = (((sq->max_size.visible != 0) && - (sq->max_size.visible + sq->extra_size.visible) < visible) || - ((sq->max_size.bytes != 0) && - (sq->max_size.bytes + sq->extra_size.bytes) < bytes) || - ((sq->max_size.time != 0) && - (sq->max_size.time + sq->extra_size.time) < time)); +#define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \ + (sq->max_size.format) <= (value)) + /* we never go past the max visible items */ + if (IS_FILLED (visible, visible)) + return TRUE; + + if (sq->cur_time != 0) { + /* if we have valid time in the queue, check */ + res = IS_FILLED (time, sq->cur_time); + } else { + /* no valid time, check bytes */ + res = IS_FILLED (bytes, bytes); } return res; } @@ -1024,8 +1162,6 @@ gst_single_queue_new (GstMultiQueue * mqueue) /* copy over max_size and extra_size so we don't need to take the lock * any longer when checking if the queue is full. */ - /* FIXME : We can't modify those values once the single queue is created - * since we don't have any lock protecting those values. */ sq->max_size.visible = mqueue->max_size.visible; sq->max_size.bytes = mqueue->max_size.bytes; sq->max_size.time = mqueue->max_size.time; @@ -1039,20 +1175,20 @@ gst_single_queue_new (GstMultiQueue * mqueue) GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id); sq->mqueue = mqueue; - sq->srcresult = GST_FLOW_OK; + sq->srcresult = GST_FLOW_WRONG_STATE; sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) single_queue_check_full, sq); + sq->is_eos = FALSE; + gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME); + gst_segment_init (&sq->src_segment, GST_FORMAT_TIME); sq->nextid = -1; sq->oldid = -1; sq->turn = g_cond_new (); - /* FIXME : attach to underrun/overrun signals to handle non-starvation - * OR should this be handled when we check if the queue is full/empty before pushing/popping ? */ - + /* attach to underrun/overrun signals to handle non-starvation */ g_signal_connect (G_OBJECT (sq->queue), "full", G_CALLBACK (single_queue_overrun_cb), sq); - g_signal_connect (G_OBJECT (sq->queue), "empty", G_CALLBACK (single_queue_underrun_cb), sq); diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index e2416cba2b..3172706a89 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -195,7 +195,7 @@ static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer); static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf); static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps); -static gboolean gst_queue_push_one (GstQueue * queue); +static GstFlowReturn gst_queue_push_one (GstQueue * queue); static void gst_queue_loop (GstPad * pad); static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);