rtpjitterbuffer: add support for serialized queries

See https://bugzilla.gnome.org/show_bug.cgi?id=723850
This commit is contained in:
Wim Taymans 2014-02-14 15:59:46 +01:00
parent 6af234e29e
commit 353e510f94

View file

@ -195,6 +195,24 @@ enum
} \ } \
} G_STMT_END } G_STMT_END
#define JBUF_WAIT_QUERY(priv,label) G_STMT_START { \
GST_DEBUG ("waiting query"); \
(priv)->waiting_query = TRUE; \
g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
(priv)->waiting_query = FALSE; \
GST_DEBUG ("waiting query done"); \
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
#define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START { \
(priv)->last_query = res; \
if (G_UNLIKELY ((priv)->waiting_query)) { \
GST_DEBUG ("signal query"); \
g_cond_signal (&(priv)->jbuf_query); \
} \
} G_STMT_END
struct _GstRtpJitterBufferPrivate struct _GstRtpJitterBufferPrivate
{ {
GstPad *sinkpad, *srcpad; GstPad *sinkpad, *srcpad;
@ -206,6 +224,9 @@ struct _GstRtpJitterBufferPrivate
GCond jbuf_timer; GCond jbuf_timer;
gboolean waiting_event; gboolean waiting_event;
GCond jbuf_event; GCond jbuf_event;
gboolean waiting_query;
GCond jbuf_query;
gboolean last_query;
gboolean discont; gboolean discont;
gboolean ts_discont; gboolean ts_discont;
gboolean active; gboolean active;
@ -704,6 +725,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
g_mutex_init (&priv->jbuf_lock); g_mutex_init (&priv->jbuf_lock);
g_cond_init (&priv->jbuf_timer); g_cond_init (&priv->jbuf_timer);
g_cond_init (&priv->jbuf_event); g_cond_init (&priv->jbuf_event);
g_cond_init (&priv->jbuf_query);
/* reset skew detection initialy */ /* reset skew detection initialy */
rtp_jitter_buffer_reset_skew (priv->jbuf); rtp_jitter_buffer_reset_skew (priv->jbuf);
@ -739,9 +761,12 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK); GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
} }
#define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
#define ITEM_TYPE_BUFFER 0 #define ITEM_TYPE_BUFFER 0
#define ITEM_TYPE_LOST 1 #define ITEM_TYPE_LOST 1
#define ITEM_TYPE_EVENT 2 #define ITEM_TYPE_EVENT 2
#define ITEM_TYPE_QUERY 3
static RTPJitterBufferItem * static RTPJitterBufferItem *
alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts, alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
@ -784,6 +809,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
g_mutex_clear (&priv->jbuf_lock); g_mutex_clear (&priv->jbuf_lock);
g_cond_clear (&priv->jbuf_timer); g_cond_clear (&priv->jbuf_timer);
g_cond_clear (&priv->jbuf_event); g_cond_clear (&priv->jbuf_event);
g_cond_clear (&priv->jbuf_query);
rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL); rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
g_object_unref (priv->jbuf); g_object_unref (priv->jbuf);
@ -1124,6 +1150,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue"); GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
/* this unblocks any waiting pops on the src pad task */ /* this unblocks any waiting pops on the src pad task */
JBUF_SIGNAL_EVENT (priv); JBUF_SIGNAL_EVENT (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
JBUF_UNLOCK (priv); JBUF_UNLOCK (priv);
} }
@ -1261,6 +1288,7 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
priv->timer_running = FALSE; priv->timer_running = FALSE;
unschedule_current_timer (jitterbuffer); unschedule_current_timer (jitterbuffer);
JBUF_SIGNAL_TIMER (priv); JBUF_SIGNAL_TIMER (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
JBUF_UNLOCK (priv); JBUF_UNLOCK (priv);
g_thread_join (priv->timer_thread); g_thread_join (priv->timer_thread);
priv->timer_thread = NULL; priv->timer_thread = NULL;
@ -2224,8 +2252,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
old_item = rtp_jitter_buffer_peek (priv->jbuf); old_item = rtp_jitter_buffer_peek (priv->jbuf);
/* only drop non-event buffers */ if (IS_DROPABLE (old_item)) {
if (old_item->type != ITEM_TYPE_EVENT) {
old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent); old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p", GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
old_item); old_item);
@ -2404,53 +2431,62 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result; GstFlowReturn result;
RTPJitterBufferItem *item; RTPJitterBufferItem *item;
GstBuffer *outbuf; GstBuffer *outbuf = NULL;
GstEvent *outevent; GstEvent *outevent = NULL;
GstQuery *outquery = NULL;
GstClockTime dts, pts; GstClockTime dts, pts;
gint percent = -1; gint percent = -1;
gboolean is_buffer, do_push = TRUE; gboolean do_push = TRUE;
guint type;
/* when we get here we are ready to pop and push the buffer */ /* when we get here we are ready to pop and push the buffer */
item = rtp_jitter_buffer_pop (priv->jbuf, &percent); item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
type = item->type;
is_buffer = GST_IS_BUFFER (item->data); switch (type) {
case ITEM_TYPE_BUFFER:
check_buffering_percent (jitterbuffer, &percent);
if (is_buffer) { /* we need to make writable to change the flags and timestamps */
check_buffering_percent (jitterbuffer, &percent); outbuf = gst_buffer_make_writable (item->data);
/* we need to make writable to change the flags and timestamps */ if (G_UNLIKELY (priv->discont)) {
outbuf = gst_buffer_make_writable (item->data); /* set DISCONT flag when we missed a packet. We pushed the buffer writable
* into the jitterbuffer so we can modify now. */
GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
priv->discont = FALSE;
}
if (G_UNLIKELY (priv->ts_discont)) {
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
priv->ts_discont = FALSE;
}
if (G_UNLIKELY (priv->discont)) { dts =
/* set DISCONT flag when we missed a packet. We pushed the buffer writable gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
* into the jitterbuffer so we can modify now. */ pts =
GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont"); gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
priv->discont = FALSE;
}
if (G_UNLIKELY (priv->ts_discont)) {
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
priv->ts_discont = FALSE;
}
dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts); /* apply timestamp with offset to buffer now */
pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts); GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
/* apply timestamp with offset to buffer now */ /* update the elapsed time when we need to check against the npt stop time. */
GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts); update_estimated_eos (jitterbuffer, item);
GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
/* update the elapsed time when we need to check against the npt stop time. */ priv->last_out_time = GST_BUFFER_PTS (outbuf);
update_estimated_eos (jitterbuffer, item); break;
case ITEM_TYPE_LOST:
priv->last_out_time = GST_BUFFER_PTS (outbuf);
} else {
outevent = item->data;
if (item->type == ITEM_TYPE_LOST) {
priv->discont = TRUE; priv->discont = TRUE;
if (!priv->do_lost) if (!priv->do_lost)
do_push = FALSE; do_push = FALSE;
} /* FALLTHROUGH */
case ITEM_TYPE_EVENT:
outevent = item->data;
break;
case ITEM_TYPE_QUERY:
outquery = item->data;
break;
} }
/* now we are ready to push the buffer. Save the seqnum and release the lock /* now we are ready to push the buffer. Save the seqnum and release the lock
@ -2464,28 +2500,45 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
item->data = NULL; item->data = NULL;
free_item (item); free_item (item);
if (is_buffer) { switch (type) {
/* push buffer */ case ITEM_TYPE_BUFFER:
if (percent != -1) /* push buffer */
post_buffering_percent (jitterbuffer, percent); if (percent != -1)
post_buffering_percent (jitterbuffer, percent);
GST_DEBUG_OBJECT (jitterbuffer, GST_DEBUG_OBJECT (jitterbuffer,
"Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT, "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)), seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
GST_TIME_ARGS (GST_BUFFER_PTS (outbuf))); GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
result = gst_pad_push (priv->srcpad, outbuf); result = gst_pad_push (priv->srcpad, outbuf);
} else {
GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
if (do_push) JBUF_LOCK_CHECK (priv, out_flushing);
gst_pad_push_event (priv->srcpad, outevent); break;
else case ITEM_TYPE_LOST:
gst_event_unref (outevent); case ITEM_TYPE_EVENT:
GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
result = GST_FLOW_OK; if (do_push)
gst_pad_push_event (priv->srcpad, outevent);
else
gst_event_unref (outevent);
result = GST_FLOW_OK;
JBUF_LOCK_CHECK (priv, out_flushing);
break;
case ITEM_TYPE_QUERY:
{
gboolean res;
res = gst_pad_peer_query (priv->srcpad, outquery);
JBUF_LOCK_CHECK (priv, out_flushing);
GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
JBUF_SIGNAL_QUERY (priv, res);
break;
}
} }
JBUF_LOCK_CHECK (priv, out_flushing);
return result; return result;
/* ERRORS */ /* ERRORS */
@ -2904,7 +2957,7 @@ static void
gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
{ {
GstRtpJitterBufferPrivate *priv; GstRtpJitterBufferPrivate *priv;
GstFlowReturn result; GstFlowReturn result = GST_FLOW_OK;
priv = jitterbuffer->priv; priv = jitterbuffer->priv;
@ -2920,8 +2973,6 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
while (result == GST_FLOW_OK); while (result == GST_FLOW_OK);
/* store result for upstream */ /* store result for upstream */
priv->srcresult = result; priv->srcresult = result;
JBUF_UNLOCK (priv);
/* if we get here we need to pause */ /* if we get here we need to pause */
goto pause; goto pause;
@ -2929,15 +2980,17 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
flushing: flushing:
{ {
result = priv->srcresult; result = priv->srcresult;
JBUF_UNLOCK (priv);
goto pause; goto pause;
} }
pause: pause:
{ {
const gchar *reason = gst_flow_get_name (result);
GstEvent *event; GstEvent *event;
GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason); JBUF_SIGNAL_QUERY (priv, FALSE);
JBUF_UNLOCK (priv);
GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
gst_flow_get_name (result));
gst_pad_pause_task (priv->srcpad); gst_pad_pause_task (priv->srcpad);
if (result == GST_FLOW_EOS) { if (result == GST_FLOW_EOS) {
event = gst_event_new_eos (); event = gst_event_new_eos ();
@ -3113,6 +3166,11 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
GstQuery * query) GstQuery * query)
{ {
gboolean res = FALSE; gboolean res = FALSE;
GstRtpJitterBuffer *jitterbuffer;
GstRtpJitterBufferPrivate *priv;
jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
priv = jitterbuffer->priv;
switch (GST_QUERY_TYPE (query)) { switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_CAPS: case GST_QUERY_CAPS:
@ -3128,14 +3186,30 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
} }
default: default:
if (GST_QUERY_IS_SERIALIZED (query)) { if (GST_QUERY_IS_SERIALIZED (query)) {
GST_WARNING_OBJECT (pad, "unhandled serialized query"); RTPJitterBufferItem *item;
res = FALSE;
JBUF_LOCK_CHECK (priv, out_flushing);
GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
JBUF_SIGNAL_EVENT (priv);
JBUF_WAIT_QUERY (priv, out_flushing);
res = priv->last_query;
JBUF_UNLOCK (priv);
} else { } else {
res = gst_pad_query_default (pad, parent, query); res = gst_pad_query_default (pad, parent, query);
} }
break; break;
} }
return res; return res;
/* ERRORS */
out_flushing:
{
GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
JBUF_UNLOCK (priv);
return FALSE;
}
} }
static gboolean static gboolean