libs/gst/base/gstdataqueue.*: Various cleanups.

Original commit message from CVS:
* libs/gst/base/gstdataqueue.c: (gst_data_queue_cleanup),
(gst_data_queue_finalize), (gst_data_queue_locked_is_empty),
(gst_data_queue_set_flushing), (gst_data_queue_push),
(gst_data_queue_pop), (gst_data_queue_drop_head),
(gst_data_queue_limits_changed), (gst_data_queue_get_level):
* libs/gst/base/gstdataqueue.h:
Various cleanups.
Added methods to get the current levels and to inform the queue that the
'full' limits changed.
* plugins/elements/gstmultiqueue.c: (gst_multi_queue_init),
(gst_multi_queue_finalize), (gst_multi_queue_set_property),
(gst_single_queue_flush), (update_time_level), (apply_segment),
(apply_buffer), (gst_single_queue_push_one),
(gst_multi_queue_item_steal_object),
(gst_multi_queue_item_destroy), (gst_multi_queue_item_new),
(gst_multi_queue_loop), (gst_multi_queue_chain),
(gst_multi_queue_sink_activate_push), (gst_multi_queue_sink_event),
(gst_multi_queue_getcaps), (gst_multi_queue_src_activate_push),
(gst_multi_queue_src_query), (single_queue_overrun_cb),
(single_queue_underrun_cb), (single_queue_check_full),
(gst_single_queue_new):
Keep track of time in the queue by measuring the difference between
running_time on input and output. This gives more accurate results and
can compensate for segments correctly.
Make a queue by default only 5 buffers deep. We will now increase the
buffer size depending on the filledness of the other queues.
Factor out commong flush code.
Make sure we don't add additional refcounts to buffers when we can avoid
it.
Propagate GstFlowReturn differently.
Use GSlice for intermediate GstMultiQueueItems.
Keep track of EOS.
Resize queues on over and underruns based on filled level of other
queues.
When checking if the queue is filled, prefer to measure in time if we
can and fall back to bytes when no time is known.
* plugins/elements/gstqueue.c:
Fix return value.
This commit is contained in:
Wim Taymans 2007-06-15 11:00:32 +00:00
parent 58cc3d675b
commit 0a3da772bb
5 changed files with 484 additions and 267 deletions

View file

@ -1,3 +1,46 @@
2007-06-15 Wim Taymans <wim@fluendo.com>
* libs/gst/base/gstdataqueue.c: (gst_data_queue_cleanup),
(gst_data_queue_finalize), (gst_data_queue_locked_is_empty),
(gst_data_queue_set_flushing), (gst_data_queue_push),
(gst_data_queue_pop), (gst_data_queue_drop_head),
(gst_data_queue_limits_changed), (gst_data_queue_get_level):
* libs/gst/base/gstdataqueue.h:
Various cleanups.
Added methods to get the current levels and to inform the queue that the
'full' limits changed.
* plugins/elements/gstmultiqueue.c: (gst_multi_queue_init),
(gst_multi_queue_finalize), (gst_multi_queue_set_property),
(gst_single_queue_flush), (update_time_level), (apply_segment),
(apply_buffer), (gst_single_queue_push_one),
(gst_multi_queue_item_steal_object),
(gst_multi_queue_item_destroy), (gst_multi_queue_item_new),
(gst_multi_queue_loop), (gst_multi_queue_chain),
(gst_multi_queue_sink_activate_push), (gst_multi_queue_sink_event),
(gst_multi_queue_getcaps), (gst_multi_queue_src_activate_push),
(gst_multi_queue_src_query), (single_queue_overrun_cb),
(single_queue_underrun_cb), (single_queue_check_full),
(gst_single_queue_new):
Keep track of time in the queue by measuring the difference between
running_time on input and output. This gives more accurate results and
can compensate for segments correctly.
Make a queue by default only 5 buffers deep. We will now increase the
buffer size depending on the filledness of the other queues.
Factor out commong flush code.
Make sure we don't add additional refcounts to buffers when we can avoid
it.
Propagate GstFlowReturn differently.
Use GSlice for intermediate GstMultiQueueItems.
Keep track of EOS.
Resize queues on over and underruns based on filled level of other
queues.
When checking if the queue is filled, prefer to measure in time if we
can and fall back to bytes when no time is known.
* plugins/elements/gstqueue.c:
Fix return value.
2007-06-15 Wim Taymans <wim@fluendo.com> 2007-06-15 Wim Taymans <wim@fluendo.com>
* libs/gst/base/gstbasetransform.c: * libs/gst/base/gstbasetransform.c:

View file

@ -248,7 +248,6 @@ gst_data_queue_cleanup (GstDataQueue * queue)
queue->cur_level.visible = 0; queue->cur_level.visible = 0;
queue->cur_level.bytes = 0; queue->cur_level.bytes = 0;
queue->cur_level.time = 0; queue->cur_level.time = 0;
} }
/* called only once, as opposed to dispose */ /* called only once, as opposed to dispose */
@ -269,8 +268,7 @@ gst_data_queue_finalize (GObject * object)
g_cond_free (queue->item_add); g_cond_free (queue->item_add);
g_cond_free (queue->item_del); g_cond_free (queue->item_del);
if (G_OBJECT_CLASS (parent_class)->finalize) G_OBJECT_CLASS (parent_class)->finalize (object);
G_OBJECT_CLASS (parent_class)->finalize (object);
} }
static void static void
@ -304,7 +302,6 @@ gst_data_queue_locked_is_full (GstDataQueue * queue)
* #gst_data_queue_pop will be released. * #gst_data_queue_pop will be released.
* MT safe. * MT safe.
*/ */
void void
gst_data_queue_flush (GstDataQueue * queue) gst_data_queue_flush (GstDataQueue * queue)
{ {
@ -323,7 +320,6 @@ gst_data_queue_flush (GstDataQueue * queue)
* *
* Returns: #TRUE if @queue is empty. * Returns: #TRUE if @queue is empty.
*/ */
gboolean gboolean
gst_data_queue_is_empty (GstDataQueue * queue) gst_data_queue_is_empty (GstDataQueue * queue)
{ {
@ -346,7 +342,6 @@ gst_data_queue_is_empty (GstDataQueue * queue)
* *
* Returns: #TRUE if @queue is full. * Returns: #TRUE if @queue is full.
*/ */
gboolean gboolean
gst_data_queue_is_full (GstDataQueue * queue) gst_data_queue_is_full (GstDataQueue * queue)
{ {
@ -369,14 +364,14 @@ gst_data_queue_is_full (GstDataQueue * queue)
* blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
* away with a return value of #FALSE. While the @queue is in flushing state, * away with a return value of #FALSE. While the @queue is in flushing state,
* all calls to those two functions will return #FALSE. * all calls to those two functions will return #FALSE.
*
* MT Safe. * MT Safe.
*/ */
void void
gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing) gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
{ {
GST_DEBUG ("queue:%p , flushing:%d", queue, flushing); GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
GST_DATA_QUEUE_MUTEX_LOCK (queue); GST_DATA_QUEUE_MUTEX_LOCK (queue);
queue->flushing = flushing; queue->flushing = flushing;
if (flushing) { if (flushing) {
@ -407,28 +402,28 @@ gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
gboolean gboolean
gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item) gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
{ {
gboolean res = FALSE; g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
g_return_val_if_fail (item != NULL, FALSE);
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
STATUS (queue, "before pushing"); STATUS (queue, "before pushing");
/* We ALWAYS need to check for queue fillness */ /* We ALWAYS need to check for queue fillness */
while (gst_data_queue_locked_is_full (queue)) { if (gst_data_queue_locked_is_full (queue)) {
GST_DATA_QUEUE_MUTEX_UNLOCK (queue); GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0); g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_FULL], 0);
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
/* signal might have removed some items */ /* signal might have removed some items */
while (gst_data_queue_locked_is_full (queue)) { while (gst_data_queue_locked_is_full (queue)) {
g_cond_wait (queue->item_del, queue->qlock); g_cond_wait (queue->item_del, queue->qlock);
if (queue->flushing) if (queue->flushing)
goto done; goto flushing;
} }
} }
g_queue_push_tail (queue->queue, item); g_queue_push_tail (queue->queue, item);
res = TRUE;
if (item->visible) if (item->visible)
queue->cur_level.visible++; queue->cur_level.visible++;
@ -436,15 +431,19 @@ gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
queue->cur_level.time += item->duration; queue->cur_level.time += item->duration;
STATUS (queue, "after pushing"); STATUS (queue, "after pushing");
g_cond_signal (queue->item_add); g_cond_signal (queue->item_add);
done:
GST_DATA_QUEUE_MUTEX_UNLOCK (queue); GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
GST_DEBUG ("queue:%p, result:%d", queue, res); return TRUE;
return res; /* ERRORS */
flushing:
{
GST_DEBUG ("queue:%p, we are flushing", queue);
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
return FALSE;
}
} }
/** /**
@ -459,35 +458,30 @@ done:
* *
* Returns: #TRUE if an @item was successfully retrieved from the @queue. * Returns: #TRUE if an @item was successfully retrieved from the @queue.
*/ */
gboolean gboolean
gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item) gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
{ {
gboolean res = FALSE; g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
GST_DEBUG ("queue:%p", queue);
g_return_val_if_fail (item != NULL, FALSE); g_return_val_if_fail (item != NULL, FALSE);
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
STATUS (queue, "before popping"); STATUS (queue, "before popping");
while (gst_data_queue_locked_is_empty (queue)) { if (gst_data_queue_locked_is_empty (queue)) {
GST_DATA_QUEUE_MUTEX_UNLOCK (queue); GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0); g_signal_emit (G_OBJECT (queue), gst_data_queue_signals[SIGNAL_EMPTY], 0);
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, done); GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
while (gst_data_queue_locked_is_empty (queue)) { while (gst_data_queue_locked_is_empty (queue)) {
g_cond_wait (queue->item_add, queue->qlock); g_cond_wait (queue->item_add, queue->qlock);
if (queue->flushing) if (queue->flushing)
goto done; goto flushing;
} }
} }
/* Get the item from the GQueue */ /* Get the item from the GQueue */
*item = g_queue_pop_head (queue->queue); *item = g_queue_pop_head (queue->queue);
res = TRUE;
/* update current level counter */ /* update current level counter */
if ((*item)->visible) if ((*item)->visible)
@ -495,14 +489,20 @@ gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
queue->cur_level.bytes -= (*item)->size; queue->cur_level.bytes -= (*item)->size;
queue->cur_level.time -= (*item)->duration; queue->cur_level.time -= (*item)->duration;
STATUS (queue, "after popping");
g_cond_signal (queue->item_del); g_cond_signal (queue->item_del);
done:
GST_DATA_QUEUE_MUTEX_UNLOCK (queue); GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
GST_DEBUG ("queue:%p , res:%d", queue, res); return TRUE;
return res; /* ERRORS */
flushing:
{
GST_DEBUG ("queue:%p, we are flushing", queue);
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
return FALSE;
}
} }
/** /**
@ -514,7 +514,6 @@ done:
* *
* Returns: TRUE if an element was removed. * Returns: TRUE if an element was removed.
*/ */
gboolean gboolean
gst_data_queue_drop_head (GstDataQueue * queue, GType type) gst_data_queue_drop_head (GstDataQueue * queue, GType type)
{ {
@ -522,6 +521,8 @@ gst_data_queue_drop_head (GstDataQueue * queue, GType type)
GList *item; GList *item;
GstDataQueueItem *leak = NULL; GstDataQueueItem *leak = NULL;
g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
GST_DEBUG ("queue:%p", queue); GST_DEBUG ("queue:%p", queue);
GST_DATA_QUEUE_MUTEX_LOCK (queue); GST_DATA_QUEUE_MUTEX_LOCK (queue);
@ -556,6 +557,39 @@ done:
return res; return res;
} }
/**
* gst_data_queue_limits_changed:
* @queue: The #GstDataQueue
*
* Inform the queue that the limits for the fullness check have changed and that
* any blocking gst_data_queue_push() should be unblocked to recheck the limts.
*/
void
gst_data_queue_limits_changed (GstDataQueue * queue)
{
g_return_if_fail (GST_IS_DATA_QUEUE (queue));
GST_DATA_QUEUE_MUTEX_LOCK (queue);
GST_DEBUG ("signal del");
g_cond_signal (queue->item_del);
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
}
/**
* gst_data_queue_get_level:
* @queue: The #GstDataQueue
* @level: the location to store the result
*
* Get the current level of the queue.
*/
void
gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
{
level->visible = queue->cur_level.visible;
level->bytes = queue->cur_level.bytes;
level->time = queue->cur_level.time;
}
static void static void
gst_data_queue_set_property (GObject * object, gst_data_queue_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec) guint prop_id, const GValue * value, GParamSpec * pspec)

View file

@ -137,19 +137,23 @@ struct _GstDataQueueClass
GType gst_data_queue_get_type (void); GType gst_data_queue_get_type (void);
GstDataQueue *gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, GstDataQueue * gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
gpointer checkdata); gpointer checkdata);
gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item); gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item);
gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item); gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item);
void gst_data_queue_flush (GstDataQueue * queue); void gst_data_queue_flush (GstDataQueue * queue);
void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing); void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing);
gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type); gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type);
gboolean gst_data_queue_is_full (GstDataQueue * queue); gboolean gst_data_queue_is_full (GstDataQueue * queue);
gboolean gst_data_queue_is_empty (GstDataQueue * queue); gboolean gst_data_queue_is_empty (GstDataQueue * queue);
void gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize *level);
void gst_data_queue_limits_changed (GstDataQueue * queue);
G_END_DECLS G_END_DECLS
#endif /* __GST_DATA_QUEUE_H__ */ #endif /* __GST_DATA_QUEUE_H__ */

View file

@ -34,7 +34,6 @@
* Structure containing all information and properties about * Structure containing all information and properties about
* a single queue. * a single queue.
*/ */
typedef struct _GstSingleQueue GstSingleQueue; typedef struct _GstSingleQueue GstSingleQueue;
struct _GstSingleQueue struct _GstSingleQueue
@ -49,10 +48,14 @@ struct _GstSingleQueue
/* flowreturn of previous srcpad push */ /* flowreturn of previous srcpad push */
GstFlowReturn srcresult; GstFlowReturn srcresult;
GstSegment sink_segment;
GstSegment src_segment;
/* queue of data */ /* queue of data */
GstDataQueue *queue; GstDataQueue *queue;
GstDataQueueSize max_size, extra_size; GstDataQueueSize max_size, extra_size;
GstClockTime cur_time;
gboolean is_eos;
gboolean inextra; /* TRUE if the queue is currently in extradata mode */ gboolean inextra; /* TRUE if the queue is currently in extradata mode */
/* Protected by global lock */ /* Protected by global lock */
@ -101,13 +104,20 @@ GST_ELEMENT_DETAILS ("MultiQueue",
"Multiple data queue", "Multiple data queue",
"Edward Hervey <edward@fluendo.com>"); "Edward Hervey <edward@fluendo.com>");
/* default limits, we try to keep up to 2 seconds of data and if there is not
* time, up to 10 MB. The number of buffers is dynamically scaled to make sure
* there is data in the queues. Normally, the byte and time limits are not hit
* in theses conditions. */
#define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
#define DEFAULT_MAX_SIZE_BUFFERS 200 #define DEFAULT_MAX_SIZE_BUFFERS 5
#define DEFAULT_MAX_SIZE_TIME GST_SECOND #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
/* second limits. When we hit one of the above limits we are probably dealing
* with a badly muxed file and we scale the limits to these emergency values.
* This is currently not yet implemented. */
#define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */ #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
#define DEFAULT_EXTRA_SIZE_BUFFERS 200 #define DEFAULT_EXTRA_SIZE_BUFFERS 5
#define DEFAULT_EXTRA_SIZE_TIME GST_SECOND #define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
/* Signals and args */ /* Signals and args */
enum enum
@ -129,19 +139,10 @@ enum
}; };
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
GST_CAT_LOG_OBJECT (multi_queue_debug, q, \
"locking qlock from thread %p", \
g_thread_self ()); \
g_mutex_lock (q->qlock); \ g_mutex_lock (q->qlock); \
GST_CAT_LOG_OBJECT (multi_queue_debug, q, \
"locked qlock from thread %p", \
g_thread_self ()); \
} G_STMT_END } G_STMT_END
#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \ #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
GST_CAT_LOG_OBJECT (multi_queue_debug, q, \
"unlocking qlock from thread %p", \
g_thread_self ()); \
g_mutex_unlock (q->qlock); \ g_mutex_unlock (q->qlock); \
} G_STMT_END } G_STMT_END
@ -155,6 +156,8 @@ static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * name); GstPadTemplate * temp, const gchar * name);
static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad); static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
static void gst_multi_queue_loop (GstPad * pad);
#define _do_init(bla) \ #define _do_init(bla) \
GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element"); GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
@ -236,7 +239,6 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass)
static void static void
gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass) gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
{ {
mqueue->nbqueues = 0; mqueue->nbqueues = 0;
mqueue->queues = NULL; mqueue->queues = NULL;
@ -253,8 +255,6 @@ gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
mqueue->nextnotlinked = -1; mqueue->nextnotlinked = -1;
mqueue->qlock = g_mutex_new (); mqueue->qlock = g_mutex_new ();
/* FILLME ? */
} }
static void static void
@ -269,8 +269,7 @@ gst_multi_queue_finalize (GObject * object)
/* free/unref instance data */ /* free/unref instance data */
g_mutex_free (mqueue->qlock); g_mutex_free (mqueue->qlock);
if (G_OBJECT_CLASS (parent_class)->finalize) G_OBJECT_CLASS (parent_class)->finalize (object);
G_OBJECT_CLASS (parent_class)->finalize (object);
} }
#define SET_CHILD_PROPERTY(mq,name,value) G_STMT_START { \ #define SET_CHILD_PROPERTY(mq,name,value) G_STMT_START { \
@ -314,7 +313,6 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
} }
} }
static void static void
@ -423,59 +421,202 @@ gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
} }
static gboolean static gboolean
gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
{
gboolean result;
GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
sq->id);
if (flush) {
sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_set_flushing (sq->queue, TRUE);
/* wake up non-linked task */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
g_cond_signal (sq->turn);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
result = gst_pad_pause_task (sq->srcpad);
} else {
gst_data_queue_flush (sq->queue);
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
sq->srcresult = GST_FLOW_OK;
sq->max_size.visible = mq->max_size.visible;
sq->is_eos = FALSE;
sq->inextra = FALSE;
sq->nextid = -1;
sq->oldid = -1;
gst_data_queue_set_flushing (sq->queue, FALSE);
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
result =
gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
sq->srcpad);
}
return result;
}
/* calculate the diff between running time on the sink and src of the queue.
* This is the total amount of time in the queue. */
static void
update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
{
gint64 sink_time, src_time;
sink_time =
gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
sq->sink_segment.last_stop);
src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
sq->src_segment.last_stop);
GST_DEBUG_OBJECT (mq,
"queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
if (sink_time >= src_time)
sq->cur_time = sink_time - src_time;
else
sq->cur_time = 0;
}
/* take a NEWSEGMENT event and apply the values to segment, updating the time
* level of queue. */
static void
apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
GstSegment * segment)
{
gboolean update;
GstFormat format;
gdouble rate, arate;
gint64 start, stop, time;
gst_event_parse_new_segment_full (event, &update, &rate, &arate,
&format, &start, &stop, &time);
/* now configure the values, we use these to track timestamps on the
* sinkpad. */
if (format != GST_FORMAT_TIME) {
/* non-time format, pretent the current time segment is closed with a
* 0 start and unknown stop time. */
update = FALSE;
format = GST_FORMAT_TIME;
start = 0;
stop = -1;
time = 0;
}
gst_segment_set_newsegment_full (segment, update,
rate, arate, format, start, stop, time);
GST_DEBUG_OBJECT (mq,
"queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
/* segment can update the time level of the queue */
update_time_level (mq, sq);
}
/* take a buffer and update segment, updating the time level of the queue. */
static void
apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstBuffer * buffer,
GstSegment * segment)
{
GstClockTime duration, timestamp;
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
/* if no timestamp is set, assume it's continuous with the previous
* time */
if (timestamp == GST_CLOCK_TIME_NONE)
timestamp = segment->last_stop;
/* add duration */
if (duration != GST_CLOCK_TIME_NONE)
timestamp += duration;
GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT,
sq->id, GST_TIME_ARGS (timestamp));
gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
/* calc diff with other end */
update_time_level (mq, sq);
}
static GstFlowReturn
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq, gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
GstMiniObject * object) GstMiniObject * object)
{ {
GstFlowReturn result = GST_FLOW_OK;
if (GST_IS_BUFFER (object)) { if (GST_IS_BUFFER (object)) {
GstBuffer *buf; GstBuffer *buffer;
buf = gst_buffer_ref (GST_BUFFER_CAST (object)); buffer = GST_BUFFER_CAST (object);
sq->srcresult = gst_pad_push (sq->srcpad, buf);
if ((sq->srcresult != GST_FLOW_OK) apply_buffer (mq, sq, buffer, &sq->src_segment);
&& (sq->srcresult != GST_FLOW_NOT_LINKED)) {
GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, reason %s", result = gst_pad_push (sq->srcpad, buffer);
sq->id, gst_flow_get_name (sq->srcresult));
gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_pause_task (sq->srcpad);
}
} else if (GST_IS_EVENT (object)) { } else if (GST_IS_EVENT (object)) {
GstEvent *event; GstEvent *event;
event = gst_event_ref (GST_EVENT_CAST (object)); event = GST_EVENT_CAST (object);
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
sq->srcresult = GST_FLOW_UNEXPECTED;
GST_DEBUG_OBJECT (mq, "GstSingleQueue %d : pausing queue, got EOS", switch (GST_EVENT_TYPE (event)) {
sq->id); case GST_EVENT_EOS:
gst_data_queue_set_flushing (sq->queue, TRUE); result = GST_FLOW_UNEXPECTED;
gst_pad_pause_task (sq->srcpad); break;
case GST_EVENT_NEWSEGMENT:
apply_segment (mq, sq, event, &sq->src_segment);
break;
default:
break;
} }
gst_pad_push_event (sq->srcpad, event); gst_pad_push_event (sq->srcpad, event);
} else { } else {
g_warning ("Unexpected object in singlequeue %d (refcounting problem?)", g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
sq->id); sq->id);
} }
return result;
return FALSE; /* ERRORS */
}
static GstMiniObject *
gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
{
GstMiniObject *res;
res = item->object;
item->object = NULL;
return res;
} }
static void static void
gst_multi_queue_item_destroy (GstMultiQueueItem * item) gst_multi_queue_item_destroy (GstMultiQueueItem * item)
{ {
gst_mini_object_unref (item->object); if (item->object)
g_free (item); gst_mini_object_unref (item->object);
g_slice_free (GstMultiQueueItem, item);
} }
/* takes ownership of passed mini object! */ /* takes ownership of passed mini object! */
static GstMultiQueueItem * static GstMultiQueueItem *
gst_multi_queue_item_new (GstMiniObject * object) gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
{ {
GstMultiQueueItem *item; GstMultiQueueItem *item;
item = g_new0 (GstMultiQueueItem, 1); item = g_slice_new (GstMultiQueueItem);
item->object = object; item->object = object;
item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
item->posid = curid;
if (GST_IS_BUFFER (object)) { if (GST_IS_BUFFER (object)) {
item->size = GST_BUFFER_SIZE (object); item->size = GST_BUFFER_SIZE (object);
@ -483,8 +624,11 @@ gst_multi_queue_item_new (GstMiniObject * object)
if (item->duration == GST_CLOCK_TIME_NONE) if (item->duration == GST_CLOCK_TIME_NONE)
item->duration = 0; item->duration = 0;
item->visible = TRUE; item->visible = TRUE;
} else {
item->size = 0;
item->duration = 0;
item->visible = FALSE;
} }
return item; return item;
} }
@ -498,101 +642,95 @@ gst_multi_queue_loop (GstPad * pad)
GstMiniObject *object; GstMiniObject *object;
guint32 newid; guint32 newid;
guint32 oldid = -1; guint32 oldid = -1;
GstFlowReturn result;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad); sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = sq->mqueue; mq = sq->mqueue;
restart: restart:
GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id); GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
if (!(gst_data_queue_pop (sq->queue, &sitem))) {
/* QUEUE FLUSHING */
if (sq->srcresult != GST_FLOW_OK)
goto out_flushing;
else
GST_WARNING_OBJECT (mq,
"data_queue_pop() returned FALSE, but srcresult == GST_FLOW_OK !");
} else {
item = (GstMultiQueueItem *) sitem;
newid = item->posid;
object = item->object;
GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d", if (!(gst_data_queue_pop (sq->queue, &sitem)))
sq->id, newid, oldid); goto out_flushing;
/* 1. Only check turn if : item = (GstMultiQueueItem *) sitem;
* _ We haven't pushed anything yet newid = item->posid;
* _ OR the new id isn't the follower of the previous one (continuous segment) */ /* steal the object and destroy the item */
if ((oldid == -1) || (newid != (oldid + 1))) { object = gst_multi_queue_item_steal_object (item);
GST_MULTI_QUEUE_MUTEX_LOCK (mq); gst_multi_queue_item_destroy (item);
GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s", GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
gst_flow_get_name (sq->srcresult)); sq->id, newid, oldid);
/* preamble : if we're not linked, set the newid as the next one we want */ /* 1. Only check turn if :
if (sq->srcresult == GST_FLOW_NOT_LINKED) * _ We haven't pushed anything yet
sq->nextid = newid; * _ OR the new id isn't the follower of the previous one (continuous segment) */
if ((oldid == -1) || (newid != (oldid + 1))) {
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
/* store the last id we outputted */ GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
if (oldid != -1) gst_flow_get_name (sq->srcresult));
sq->oldid = oldid;
/* 2. If there's a queue waiting to push, wake it up. If it's us the */ /* preamble : if we're not linked, set the newid as the next one we want */
/* check below (3.) will avoid us waiting. */ if (sq->srcresult == GST_FLOW_NOT_LINKED)
wake_up_next_non_linked (mq); sq->nextid = newid;
/* 3. If we're not linked AND our nextid is higher than the highest oldid outputted /* store the last id we outputted */
* _ Update global next-not-linked if (oldid != -1)
* _ Wait on our conditional sq->oldid = oldid;
*/
while ((sq->srcresult == GST_FLOW_NOT_LINKED)
&& (mq->nextnotlinked != sq->id)) {
compute_next_non_linked (mq);
g_cond_wait (sq->turn, mq->qlock);
}
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); /* 2. If there's a queue waiting to push, wake it up. If it's us the */
/* check below (3.) will avoid us waiting. */
wake_up_next_non_linked (mq);
/* 4. Check again status, maybe we're flushing */ /* 3. If we're not linked AND our nextid is higher than the highest oldid outputted
if ((sq->srcresult != GST_FLOW_OK) * _ Update global next-not-linked
&& (sq->srcresult != GST_FLOW_NOT_LINKED)) { * _ Wait on our conditional
gst_multi_queue_item_destroy (item); */
goto out_flushing; while ((sq->srcresult == GST_FLOW_NOT_LINKED)
} && (mq->nextnotlinked != sq->id)) {
compute_next_non_linked (mq);
g_cond_wait (sq->turn, mq->qlock);
} }
/* 4. Check again status, maybe we're flushing */
GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s", if ((sq->srcresult != GST_FLOW_OK)) {
gst_flow_get_name (sq->srcresult)); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_mini_object_unref (object);
/* 4. Try to push out the new object */ goto out_flushing;
gst_single_queue_push_one (mq, sq, object); }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
gst_multi_queue_item_destroy (item);
oldid = newid;
/* 5. if GstFlowReturn is non-fatal, goto restart */
if ((sq->srcresult == GST_FLOW_OK)
|| (sq->srcresult == GST_FLOW_NOT_LINKED))
goto restart;
} }
GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
beach: /* 4. Try to push out the new object */
return; result = gst_single_queue_push_one (mq, sq, object);
sq->srcresult = result;
if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED)
goto out_flushing;
GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
oldid = newid;
/* restart to get the next element */
goto restart;
/* ERRORS */
out_flushing: out_flushing:
{ {
gst_data_queue_set_flushing (sq->queue, TRUE);
gst_pad_pause_task (sq->srcpad); gst_pad_pause_task (sq->srcpad);
GST_CAT_LOG_OBJECT (multi_queue_debug, mq, GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
"SingleQueue[%d] task paused, reason:%s", "SingleQueue[%d] task paused, reason:%s",
sq->id, gst_flow_get_name (sq->srcresult)); sq->id, gst_flow_get_name (sq->srcresult));
goto beach; return;
} }
} }
/** /**
* gst_multi_queue_chain: * gst_multi_queue_chain:
* *
@ -600,7 +738,6 @@ out_flushing:
* _ we don't have leak behavioures, * _ we don't have leak behavioures,
* _ we push with a unique id (curid) * _ we push with a unique id (curid)
*/ */
static GstFlowReturn static GstFlowReturn
gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer) gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
{ {
@ -613,7 +750,7 @@ gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
sq = gst_pad_get_element_private (pad); sq = gst_pad_get_element_private (pad);
mq = (GstMultiQueue *) gst_pad_get_parent (pad); mq = (GstMultiQueue *) gst_pad_get_parent (pad);
/* Get an unique incrementing id */ /* Get a unique incrementing id */
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
curid = mq->counter++; curid = mq->counter++;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
@ -621,18 +758,28 @@ gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
GST_LOG_OBJECT (mq, "SingleQueue %d : about to push buffer with id %d", GST_LOG_OBJECT (mq, "SingleQueue %d : about to push buffer with id %d",
sq->id, curid); sq->id, curid);
item = gst_multi_queue_item_new ((GstMiniObject *) buffer); item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
item->posid = curid;
if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) { /* update time level */
GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", apply_buffer (mq, sq, buffer, &sq->sink_segment);
sq->id, gst_flow_get_name (sq->srcresult));
gst_multi_queue_item_destroy (item);
ret = sq->srcresult;
}
if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
goto flushing;
done:
gst_object_unref (mq); gst_object_unref (mq);
return ret; return ret;
/* ERRORS */
flushing:
{
ret = sq->srcresult;
GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
sq->id, gst_flow_get_name (ret));
gst_multi_queue_item_destroy (item);
goto done;
}
} }
static gboolean static gboolean
@ -642,13 +789,12 @@ gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
sq = (GstSingleQueue *) gst_pad_get_element_private (pad); sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
if (active) if (active) {
sq->srcresult = GST_FLOW_OK; sq->srcresult = GST_FLOW_OK;
else { } else {
sq->srcresult = GST_FLOW_WRONG_STATE; sq->srcresult = GST_FLOW_WRONG_STATE;
gst_data_queue_flush (sq->queue); gst_data_queue_flush (sq->queue);
} }
return TRUE; return TRUE;
} }
@ -659,50 +805,40 @@ gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
GstMultiQueue *mq; GstMultiQueue *mq;
guint32 curid; guint32 curid;
GstMultiQueueItem *item; GstMultiQueueItem *item;
gboolean res;
GstEventType type;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad); sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = (GstMultiQueue *) gst_pad_get_parent (pad); mq = (GstMultiQueue *) gst_pad_get_parent (pad);
switch (GST_EVENT_TYPE (event)) { type = GST_EVENT_TYPE (event);
switch (type) {
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event", GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
sq->id); sq->id);
gst_pad_push_event (sq->srcpad, event); res = gst_pad_push_event (sq->srcpad, event);
sq->srcresult = GST_FLOW_WRONG_STATE; gst_single_queue_flush (mq, sq, TRUE);
gst_data_queue_set_flushing (sq->queue, TRUE);
/* wake up non-linked task */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
g_cond_signal (sq->turn);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
gst_pad_pause_task (sq->srcpad);
goto done; goto done;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event", GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
sq->id); sq->id);
gst_pad_push_event (sq->srcpad, event); res = gst_pad_push_event (sq->srcpad, event);
gst_data_queue_flush (sq->queue); gst_single_queue_flush (mq, sq, FALSE);
gst_data_queue_set_flushing (sq->queue, FALSE);
sq->srcresult = GST_FLOW_OK;
sq->nextid = -1;
sq->oldid = -1;
GST_DEBUG_OBJECT (mq, "SingleQueue %d : restarting task", sq->id);
gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
sq->srcpad);
goto done; goto done;
case GST_EVENT_NEWSEGMENT:
apply_segment (mq, sq, event, &sq->sink_segment);
break;
default: default:
if (!(GST_EVENT_IS_SERIALIZED (event))) { if (!(GST_EVENT_IS_SERIALIZED (event))) {
gst_pad_push_event (sq->srcpad, event); res = gst_pad_push_event (sq->srcpad, event);
goto done; goto done;
} }
break; break;
@ -713,22 +849,33 @@ gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
curid = mq->counter++; curid = mq->counter++;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
item = gst_multi_queue_item_new ((GstMiniObject *) event); item = gst_multi_queue_item_new ((GstMiniObject *) event, curid);
item->posid = curid;
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"SingleQueue %d : Adding event %p of type %s with id %d", sq->id, event, "SingleQueue %d : Adding event %p of type %s with id %d", sq->id, event,
GST_EVENT_TYPE_NAME (event), curid); GST_EVENT_TYPE_NAME (event), curid);
if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item))) { if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s", goto flushing;
sq->id, gst_flow_get_name (sq->srcresult));
gst_multi_queue_item_destroy (item); /* mark EOS when we received one, we must do that after putting the
} * buffer in the queue because EOS marks the buffer as filled. No need to take
* a lock, the _check_full happens from this thread only, right before pushing
* into dataqueue. */
if (type == GST_EVENT_EOS)
sq->is_eos = TRUE;
done: done:
gst_object_unref (mq); gst_object_unref (mq);
return TRUE; return res;
flushing:
{
GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
sq->id, gst_flow_get_name (sq->srcresult));
gst_multi_queue_item_destroy (item);
goto done;
}
} }
static GstCaps * static GstCaps *
@ -738,8 +885,6 @@ gst_multi_queue_getcaps (GstPad * pad)
GstPad *otherpad; GstPad *otherpad;
GstCaps *result; GstCaps *result;
GST_LOG_OBJECT (pad, "...");
otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad; otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad"); GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad");
@ -773,27 +918,12 @@ gst_multi_queue_src_activate_push (GstPad * pad, gboolean active)
GST_LOG ("SingleQueue %d", sq->id); GST_LOG ("SingleQueue %d", sq->id);
if (active) { if (active) {
sq->srcresult = GST_FLOW_OK; result = gst_single_queue_flush (mq, sq, FALSE);
gst_data_queue_set_flushing (sq->queue, FALSE);
result = gst_pad_start_task (pad, (GstTaskFunction) gst_multi_queue_loop,
pad);
} else { } else {
/* 1. unblock loop function */ result = gst_single_queue_flush (mq, sq, TRUE);
sq->srcresult = GST_FLOW_WRONG_STATE; /* make sure streaming finishes */
gst_data_queue_set_flushing (sq->queue, TRUE); result |= gst_pad_stop_task (pad);
/* 2. unblock potentially non-linked pad */
GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
g_cond_signal (sq->turn);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* 3. make sure streaming finishes */
result = gst_pad_stop_task (pad);
gst_data_queue_set_flushing (sq->queue, FALSE);
} }
return result; return result;
} }
@ -818,8 +948,7 @@ gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
GstPad *peerpad; GstPad *peerpad;
gboolean res; gboolean res;
/* FILLME */ /* FIXME, Handle position offset depending on queue size */
/* Handle position offset depending on queue size */
/* default handling */ /* default handling */
if (!(peerpad = gst_pad_get_peer (sq->sinkpad))) if (!(peerpad = gst_pad_get_peer (sq->sinkpad)))
@ -831,6 +960,7 @@ gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
return res; return res;
/* ERRORS */
no_peer: no_peer:
{ {
GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer"); GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer");
@ -838,7 +968,6 @@ no_peer:
} }
} }
/* /*
* Next-non-linked functions * Next-non-linked functions
*/ */
@ -903,32 +1032,34 @@ compute_next_non_linked (GstMultiQueue * mq)
/* /*
* GstSingleQueue functions * GstSingleQueue functions
*/ */
static void static void
single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq) single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
{ {
GstMultiQueue *mq = sq->mqueue; GstMultiQueue *mq = sq->mqueue;
GList *tmp; GList *tmp;
GstDataQueueSize size;
gst_data_queue_get_level (sq->queue, &size);
GST_LOG_OBJECT (sq->mqueue, "Single Queue %d is full", sq->id); GST_LOG_OBJECT (sq->mqueue, "Single Queue %d is full", sq->id);
if (!sq->inextra) { GST_MULTI_QUEUE_MUTEX_LOCK (mq);
/* Check if at least one other queue is empty... */ for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
if (gst_data_queue_is_empty (ssq->queue)) { if (gst_data_queue_is_empty (ssq->queue)) {
/* ... if so set sq->inextra to TRUE and don't emit overrun signal */ if (size.visible == sq->max_size.visible) {
sq->max_size.visible++;
GST_DEBUG_OBJECT (mq, GST_DEBUG_OBJECT (mq,
"Another queue is empty, bumping single queue into extra data mode"); "Another queue is empty, bumping single queue %d max visible to %d",
sq->inextra = TRUE; sq->id, sq->max_size.visible);
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); g_print ("overrun: queue %d, limit %d\n", sq->id, sq->max_size.visible);
goto beach;
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
goto beach;
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
/* Overrun is always forwarded, since this is blocking the upstream element */ /* Overrun is always forwarded, since this is blocking the upstream element */
g_signal_emit (G_OBJECT (sq->mqueue), gst_multi_queue_signals[SIGNAL_OVERRUN], g_signal_emit (G_OBJECT (sq->mqueue), gst_multi_queue_signals[SIGNAL_OVERRUN],
@ -946,17 +1077,28 @@ single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
GList *tmp; GList *tmp;
GST_LOG_OBJECT (mq, GST_LOG_OBJECT (mq,
"Single Queue %d is empty, Checking if all single queues are empty", "Single Queue %d is empty, Checking other single queues", sq->id);
sq->id);
GST_MULTI_QUEUE_MUTEX_LOCK (mq); GST_MULTI_QUEUE_MUTEX_LOCK (mq);
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) { for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GstSingleQueue *sq = (GstSingleQueue *) tmp->data; GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
if (!gst_data_queue_is_empty (sq->queue)) { if (gst_data_queue_is_full (sq->queue)) {
empty = FALSE; GstDataQueueSize size;
break;
gst_data_queue_get_level (sq->queue, &size);
if (size.visible == sq->max_size.visible) {
sq->max_size.visible++;
GST_DEBUG_OBJECT (mq,
"queue %d is filled, bumping its max visible to %d", sq->id,
sq->max_size.visible);
g_print ("underrun: queue %d, limit %d\n", sq->id,
sq->max_size.visible);
gst_data_queue_limits_changed (sq->queue);
}
} }
if (!gst_data_queue_is_empty (sq->queue))
empty = FALSE;
} }
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
@ -972,31 +1114,27 @@ single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
{ {
gboolean res; gboolean res;
/* In all cases (extra mode or not), we check how the queue current level GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT
* compares to max_size. */ "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
res = (((sq->max_size.visible != 0) && sq->max_size.bytes, sq->cur_time, sq->max_size.time);
sq->max_size.visible < visible) ||
((sq->max_size.bytes != 0) &&
sq->max_size.bytes < bytes) ||
((sq->max_size.time != 0) && sq->max_size.time < time));
if (G_UNLIKELY (sq->inextra)) { /* we are always filled on EOS */
/* If we're in extra mode, one of two things can happen to check for if (sq->is_eos)
* fullness: */ return TRUE;
if (!res) #define IS_FILLED(format, value) ((sq->max_size.format) != 0 && \
/* #1 : Either we are not full against normal max_size levels, in which (sq->max_size.format) <= (value))
* case we can go out of extra mode. */
sq->inextra = FALSE;
else
/* #2 : Or else, the check should be done against max_size + extra_size */
res = (((sq->max_size.visible != 0) &&
(sq->max_size.visible + sq->extra_size.visible) < visible) ||
((sq->max_size.bytes != 0) &&
(sq->max_size.bytes + sq->extra_size.bytes) < bytes) ||
((sq->max_size.time != 0) &&
(sq->max_size.time + sq->extra_size.time) < time));
/* we never go past the max visible items */
if (IS_FILLED (visible, visible))
return TRUE;
if (sq->cur_time != 0) {
/* if we have valid time in the queue, check */
res = IS_FILLED (time, sq->cur_time);
} else {
/* no valid time, check bytes */
res = IS_FILLED (bytes, bytes);
} }
return res; return res;
} }
@ -1024,8 +1162,6 @@ gst_single_queue_new (GstMultiQueue * mqueue)
/* copy over max_size and extra_size so we don't need to take the lock /* copy over max_size and extra_size so we don't need to take the lock
* any longer when checking if the queue is full. */ * any longer when checking if the queue is full. */
/* FIXME : We can't modify those values once the single queue is created
* since we don't have any lock protecting those values. */
sq->max_size.visible = mqueue->max_size.visible; sq->max_size.visible = mqueue->max_size.visible;
sq->max_size.bytes = mqueue->max_size.bytes; sq->max_size.bytes = mqueue->max_size.bytes;
sq->max_size.time = mqueue->max_size.time; sq->max_size.time = mqueue->max_size.time;
@ -1039,20 +1175,20 @@ gst_single_queue_new (GstMultiQueue * mqueue)
GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id); GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
sq->mqueue = mqueue; sq->mqueue = mqueue;
sq->srcresult = GST_FLOW_OK; sq->srcresult = GST_FLOW_WRONG_STATE;
sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
single_queue_check_full, sq); single_queue_check_full, sq);
sq->is_eos = FALSE;
gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
sq->nextid = -1; sq->nextid = -1;
sq->oldid = -1; sq->oldid = -1;
sq->turn = g_cond_new (); sq->turn = g_cond_new ();
/* FIXME : attach to underrun/overrun signals to handle non-starvation /* attach to underrun/overrun signals to handle non-starvation */
* OR should this be handled when we check if the queue is full/empty before pushing/popping ? */
g_signal_connect (G_OBJECT (sq->queue), "full", g_signal_connect (G_OBJECT (sq->queue), "full",
G_CALLBACK (single_queue_overrun_cb), sq); G_CALLBACK (single_queue_overrun_cb), sq);
g_signal_connect (G_OBJECT (sq->queue), "empty", g_signal_connect (G_OBJECT (sq->queue), "empty",
G_CALLBACK (single_queue_underrun_cb), sq); G_CALLBACK (single_queue_underrun_cb), sq);

View file

@ -195,7 +195,7 @@ static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer);
static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset, static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset,
guint size, GstCaps * caps, GstBuffer ** buf); guint size, GstCaps * caps, GstBuffer ** buf);
static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps); static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps);
static gboolean gst_queue_push_one (GstQueue * queue); static GstFlowReturn gst_queue_push_one (GstQueue * queue);
static void gst_queue_loop (GstPad * pad); static void gst_queue_loop (GstPad * pad);
static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event); static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);