multiqueue: Add mode to synchronize deactivated/not-linked streams by the running time

Fixes bug #645107, #600648.
This commit is contained in:
Sebastian Dröge 2011-03-22 13:19:47 +01:00
parent 4a836cae9f
commit 9f83109706
2 changed files with 196 additions and 9 deletions

View file

@ -156,6 +156,8 @@ struct _GstSingleQueue
guint32 nextid; /* ID of the next object waiting to be pushed */
guint32 oldid; /* ID of the last object pushed (last in a series) */
guint32 last_oldid; /* Previously observed old_id, reset to MAXUINT32 on flush */
GstClockTime next_time; /* End running time of next buffer to be pushed */
GstClockTime last_time; /* Start running time of last pushed buffer */
GCond *turn; /* SingleQueue turn waiting conditional */
};
@ -179,6 +181,7 @@ static void gst_single_queue_free (GstSingleQueue * squeue);
static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq);
static void compute_high_time (GstMultiQueue * mq);
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
@ -224,6 +227,7 @@ enum
#define DEFAULT_USE_BUFFERING FALSE
#define DEFAULT_LOW_PERCENT 10
#define DEFAULT_HIGH_PERCENT 99
#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
enum
{
@ -237,6 +241,7 @@ enum
PROP_USE_BUFFERING,
PROP_LOW_PERCENT,
PROP_HIGH_PERCENT,
PROP_SYNC_BY_RUNNING_TIME,
PROP_LAST
};
@ -396,6 +401,22 @@ gst_multi_queue_class_init (GstMultiQueueClass * klass)
"High threshold for buffering to finish", 0, 100,
DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstMultiQueue:sync-by-running-time
*
* If enabled multiqueue will synchronize deactivated or not-linked streams
* to the activated and linked streams by taking the running time.
* Otherwise multiqueue will synchronize the deactivated or not-linked
* streams by keeping the order in which buffers and events arrived compared
* to active and linked streams.
*
* Since: 0.10.33
*/
g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
"Synchronize deactivated or not-linked streams by running time",
DEFAULT_SYNC_BY_RUNNING_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gobject_class->finalize = gst_multi_queue_finalize;
@ -425,8 +446,11 @@ gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
mqueue->low_percent = DEFAULT_LOW_PERCENT;
mqueue->high_percent = DEFAULT_HIGH_PERCENT;
mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;
mqueue->counter = 1;
mqueue->highid = -1;
mqueue->high_time = GST_CLOCK_TIME_NONE;
mqueue->qlock = g_mutex_new ();
}
@ -499,6 +523,9 @@ gst_multi_queue_set_property (GObject * object, guint prop_id,
case PROP_HIGH_PERCENT:
mq->high_percent = g_value_get_int (value);
break;
case PROP_SYNC_BY_RUNNING_TIME:
mq->sync_by_running_time = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -541,6 +568,9 @@ gst_multi_queue_get_property (GObject * object, guint prop_id,
case PROP_HIGH_PERCENT:
g_value_set_int (value, mq->high_percent);
break;
case PROP_SYNC_BY_RUNNING_TIME:
g_value_set_boolean (value, mq->sync_by_running_time);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -740,8 +770,15 @@ gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
sq->nextid = 0;
sq->oldid = 0;
sq->last_oldid = G_MAXUINT32;
sq->next_time = GST_CLOCK_TIME_NONE;
sq->last_time = GST_CLOCK_TIME_NONE;
gst_data_queue_set_flushing (sq->queue, FALSE);
/* Reset high time to be recomputed next */
GST_MULTI_QUEUE_MUTEX_LOCK (mq);
mq->high_time = GST_CLOCK_TIME_NONE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
sq->flushing = FALSE;
GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
@ -946,6 +983,71 @@ apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
static GstClockTime
get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
{
GstClockTime time = GST_CLOCK_TIME_NONE;
if (GST_IS_BUFFER (object)) {
GstBuffer *buf = GST_BUFFER_CAST (object);
if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
time = GST_BUFFER_TIMESTAMP (buf);
if (end && GST_BUFFER_DURATION_IS_VALID (buf))
time += GST_BUFFER_DURATION (buf);
if (time > segment->stop)
time = segment->stop;
time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
}
} else if (GST_IS_BUFFER_LIST (object)) {
GstBufferList *list = GST_BUFFER_LIST_CAST (object);
GstBufferListIterator *it = gst_buffer_list_iterate (list);
GstBuffer *buf;
do {
while ((buf = gst_buffer_list_iterator_next (it))) {
if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
time = GST_BUFFER_TIMESTAMP (buf);
if (end && GST_BUFFER_DURATION_IS_VALID (buf))
time += GST_BUFFER_DURATION (buf);
if (time > segment->stop)
time = segment->stop;
time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
if (!end)
goto done;
} else if (!end) {
goto done;
}
}
} while (gst_buffer_list_iterator_next_group (it));
} else if (GST_IS_EVENT (object)) {
GstEvent *event = GST_EVENT_CAST (object);
/* For newsegment events return the running time of the start position */
if (GST_EVENT_TYPE (event) == GST_EVENT_NEWSEGMENT) {
GstSegment new_segment = *segment;
gboolean update;
gdouble rate, applied_rate;
GstFormat format;
gint64 start, stop, position;
gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
&format, &start, &stop, &position);
if (format == GST_FORMAT_TIME) {
gst_segment_set_newsegment_full (&new_segment, update, rate,
applied_rate, format, start, stop, position);
time =
gst_segment_to_running_time (&new_segment, GST_FORMAT_TIME,
new_segment.start);
}
}
}
done:
return time;
}
static GstFlowReturn
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
GstMiniObject * object)
@ -1078,6 +1180,7 @@ gst_multi_queue_loop (GstPad * pad)
GstMiniObject *object = NULL;
guint32 newid;
GstFlowReturn result;
GstClockTime next_time;
sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
mq = sq->mqueue;
@ -1099,6 +1202,9 @@ gst_multi_queue_loop (GstPad * pad)
object = gst_multi_queue_item_steal_object (item);
gst_multi_queue_item_destroy (item);
/* Get running time of the item. Events will have GST_CLOCK_TIME_NONE */
next_time = get_running_time (&sq->src_segment, object, TRUE);
GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
sq->id, newid, sq->last_oldid);
@ -1107,9 +1213,9 @@ gst_multi_queue_loop (GstPad * pad)
* or it's the first loop, or we just passed the previous highid,
* we might need to wake some sleeping pad up, so there's extra work
* there too */
if (sq->srcresult == GST_FLOW_NOT_LINKED ||
(sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) ||
sq->last_oldid > mq->highid) {
if (sq->srcresult == GST_FLOW_NOT_LINKED
|| (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1))
|| sq->last_oldid > mq->highid) {
GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
@ -1124,6 +1230,7 @@ gst_multi_queue_loop (GstPad * pad)
/* Update the nextid so other threads know when to wake us up */
sq->nextid = newid;
sq->next_time = next_time;
/* Update the oldid (the last ID we output) for highid tracking */
if (sq->last_oldid != G_MAXUINT32)
@ -1134,10 +1241,20 @@ gst_multi_queue_loop (GstPad * pad)
/* Recompute the highid */
compute_high_id (mq);
while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
"newid %u and highid %u", sq->id, newid, mq->highid);
/* Recompute the high time */
compute_high_time (mq);
while (((mq->sync_by_running_time && next_time != GST_CLOCK_TIME_NONE &&
(mq->high_time == GST_CLOCK_TIME_NONE
|| next_time >= mq->high_time))
|| (!mq->sync_by_running_time && newid > mq->highid))
&& sq->srcresult == GST_FLOW_NOT_LINKED) {
GST_DEBUG_OBJECT (mq,
"queue %d sleeping for not-linked wakeup with "
"newid %u, highid %u, next_time %" GST_TIME_FORMAT
", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
/* Wake up all non-linked pads before we sleep */
wake_up_next_non_linked (mq);
@ -1151,8 +1268,13 @@ gst_multi_queue_loop (GstPad * pad)
goto out_flushing;
}
/* Recompute the high time */
compute_high_time (mq);
GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
"wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
"wakeup with newid %u, highid %u, next_time %" GST_TIME_FORMAT
", high_time %" GST_TIME_FORMAT, sq->id, newid, mq->highid,
GST_TIME_ARGS (next_time), GST_TIME_ARGS (mq->high_time));
}
/* Re-compute the high_id in case someone else pushed */
@ -1162,8 +1284,9 @@ gst_multi_queue_loop (GstPad * pad)
/* Wake up all non-linked pads */
wake_up_next_non_linked (mq);
}
/* We're done waiting, we can clear the nextid */
/* We're done waiting, we can clear the nextid and nexttime */
sq->nextid = 0;
sq->next_time = GST_CLOCK_TIME_NONE;
GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}
@ -1174,6 +1297,18 @@ gst_multi_queue_loop (GstPad * pad)
GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
gst_flow_get_name (sq->srcresult));
/* Update time stats */
next_time = get_running_time (&sq->src_segment, object, FALSE);
if (next_time != GST_CLOCK_TIME_NONE) {
if (sq->last_time == GST_CLOCK_TIME_NONE || sq->last_time < next_time)
sq->last_time = next_time;
if (mq->high_time == GST_CLOCK_TIME_NONE || mq->high_time <= next_time) {
/* Wake up all non-linked pads now that we advanceed the high time */
mq->high_time = next_time;
wake_up_next_non_linked (mq);
}
}
/* Try to push out the new object */
result = gst_single_queue_push_one (mq, sq, object);
sq->srcresult = result;
@ -1187,6 +1322,7 @@ gst_multi_queue_loop (GstPad * pad)
gst_flow_get_name (sq->srcresult));
sq->last_oldid = newid;
return;
out_flushing:
@ -1516,7 +1652,10 @@ wake_up_next_non_linked (GstMultiQueue * mq)
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
if (sq->srcresult == GST_FLOW_NOT_LINKED) {
if (sq->nextid != 0 && sq->nextid <= mq->highid) {
if ((mq->sync_by_running_time && mq->high_time != GST_CLOCK_TIME_NONE
&& sq->next_time != GST_CLOCK_TIME_NONE
&& sq->next_time >= mq->high_time)
|| (sq->nextid != 0 && sq->nextid <= mq->highid)) {
GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
g_cond_signal (sq->turn);
}
@ -1567,6 +1706,49 @@ compute_high_id (GstMultiQueue * mq)
lowest);
}
/* WITH LOCK TAKEN */
static void
compute_high_time (GstMultiQueue * mq)
{
/* The high-id is either the highest id among the linked pads, or if all
* pads are not-linked, it's the lowest not-linked pad */
GList *tmp;
GstClockTime highest = GST_CLOCK_TIME_NONE;
GstClockTime lowest = GST_CLOCK_TIME_NONE;
for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
GST_LOG_OBJECT (mq,
"inspecting sq:%d , next_time:%" GST_TIME_FORMAT ", last_time:%"
GST_TIME_FORMAT ", srcresult:%s", sq->id, GST_TIME_ARGS (sq->next_time),
GST_TIME_ARGS (sq->last_time), gst_flow_get_name (sq->srcresult));
if (sq->srcresult == GST_FLOW_NOT_LINKED) {
/* No need to consider queues which are not waiting */
if (sq->next_time == GST_CLOCK_TIME_NONE) {
GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
continue;
}
if (lowest == GST_CLOCK_TIME_NONE || sq->next_time < lowest)
lowest = sq->next_time;
} else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
/* If we don't have a global highid, or the global highid is lower than
* this single queue's last outputted id, store the queue's one,
* unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
if (highest == GST_CLOCK_TIME_NONE || sq->last_time > highest)
highest = sq->last_time;
}
}
mq->high_time = highest;
GST_LOG_OBJECT (mq,
"High time is now : %" GST_TIME_FORMAT ", lowest non-linked %"
GST_TIME_FORMAT, GST_TIME_ARGS (mq->high_time), GST_TIME_ARGS (lowest));
}
#define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
((q)->max_size.format) <= (value))
@ -1770,6 +1952,8 @@ gst_single_queue_new (GstMultiQueue * mqueue, gint id)
sq->nextid = 0;
sq->oldid = 0;
sq->next_time = GST_CLOCK_TIME_NONE;
sq->last_time = GST_CLOCK_TIME_NONE;
sq->turn = g_cond_new ();
sq->sinktime = GST_CLOCK_TIME_NONE;

View file

@ -50,6 +50,8 @@ typedef struct _GstMultiQueueClass GstMultiQueueClass;
struct _GstMultiQueue {
GstElement element;
gboolean sync_by_running_time;
/* number of queues */
guint nbqueues;
@ -65,6 +67,7 @@ struct _GstMultiQueue {
guint32 counter; /* incoming object counter, use atomic accesses */
guint32 highid; /* contains highest id of last outputted object */
GstClockTime high_time; /* highest start running time */
GMutex * qlock; /* Global queue lock (vs object lock or individual */
/* queues lock). Protects nbqueues, queues, global */