From 353e510f94b30549eeca3c8ff2e573289c4560cd Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 14 Feb 2014 15:59:46 +0100 Subject: [PATCH] rtpjitterbuffer: add support for serialized queries See https://bugzilla.gnome.org/show_bug.cgi?id=723850 --- gst/rtpmanager/gstrtpjitterbuffer.c | 194 +++++++++++++++++++--------- 1 file changed, 134 insertions(+), 60 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 6cade8052f..0a916018f3 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -195,6 +195,24 @@ enum } \ } G_STMT_END +#define JBUF_WAIT_QUERY(priv,label) G_STMT_START { \ + GST_DEBUG ("waiting query"); \ + (priv)->waiting_query = TRUE; \ + g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \ + (priv)->waiting_query = FALSE; \ + GST_DEBUG ("waiting query done"); \ + if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ + goto label; \ +} G_STMT_END +#define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START { \ + (priv)->last_query = res; \ + if (G_UNLIKELY ((priv)->waiting_query)) { \ + GST_DEBUG ("signal query"); \ + g_cond_signal (&(priv)->jbuf_query); \ + } \ +} G_STMT_END + + struct _GstRtpJitterBufferPrivate { GstPad *sinkpad, *srcpad; @@ -206,6 +224,9 @@ struct _GstRtpJitterBufferPrivate GCond jbuf_timer; gboolean waiting_event; GCond jbuf_event; + gboolean waiting_query; + GCond jbuf_query; + gboolean last_query; gboolean discont; gboolean ts_discont; gboolean active; @@ -704,6 +725,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) g_mutex_init (&priv->jbuf_lock); g_cond_init (&priv->jbuf_timer); g_cond_init (&priv->jbuf_event); + g_cond_init (&priv->jbuf_query); /* reset skew detection initialy */ rtp_jitter_buffer_reset_skew (priv->jbuf); @@ -739,9 +761,12 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK); } +#define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST)) + #define ITEM_TYPE_BUFFER 0 #define ITEM_TYPE_LOST 1 #define ITEM_TYPE_EVENT 2 +#define ITEM_TYPE_QUERY 3 static RTPJitterBufferItem * alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts, @@ -784,6 +809,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object) g_mutex_clear (&priv->jbuf_lock); g_cond_clear (&priv->jbuf_timer); g_cond_clear (&priv->jbuf_event); + g_cond_clear (&priv->jbuf_query); rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL); g_object_unref (priv->jbuf); @@ -1124,6 +1150,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer) GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue"); /* this unblocks any waiting pops on the src pad task */ JBUF_SIGNAL_EVENT (priv); + JBUF_SIGNAL_QUERY (priv, FALSE); JBUF_UNLOCK (priv); } @@ -1261,6 +1288,7 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, priv->timer_running = FALSE; unschedule_current_timer (jitterbuffer); JBUF_SIGNAL_TIMER (priv); + JBUF_SIGNAL_QUERY (priv, FALSE); JBUF_UNLOCK (priv); g_thread_join (priv->timer_thread); priv->timer_thread = NULL; @@ -2224,8 +2252,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, old_item = rtp_jitter_buffer_peek (priv->jbuf); - /* only drop non-event buffers */ - if (old_item->type != ITEM_TYPE_EVENT) { + if (IS_DROPABLE (old_item)) { old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent); GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p", old_item); @@ -2404,53 +2431,62 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstFlowReturn result; RTPJitterBufferItem *item; - GstBuffer *outbuf; - GstEvent *outevent; + GstBuffer *outbuf = NULL; + GstEvent *outevent = NULL; + GstQuery *outquery = NULL; GstClockTime dts, pts; gint percent = -1; - gboolean is_buffer, do_push = TRUE; + gboolean do_push = TRUE; + guint type; /* when we get here we are ready to pop and push the buffer */ item = rtp_jitter_buffer_pop (priv->jbuf, &percent); + type = item->type; - is_buffer = GST_IS_BUFFER (item->data); + switch (type) { + case ITEM_TYPE_BUFFER: + check_buffering_percent (jitterbuffer, &percent); - if (is_buffer) { - check_buffering_percent (jitterbuffer, &percent); + /* we need to make writable to change the flags and timestamps */ + outbuf = gst_buffer_make_writable (item->data); - /* we need to make writable to change the flags and timestamps */ - outbuf = gst_buffer_make_writable (item->data); + if (G_UNLIKELY (priv->discont)) { + /* set DISCONT flag when we missed a packet. We pushed the buffer writable + * into the jitterbuffer so we can modify now. */ + GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont"); + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + priv->discont = FALSE; + } + if (G_UNLIKELY (priv->ts_discont)) { + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC); + priv->ts_discont = FALSE; + } - if (G_UNLIKELY (priv->discont)) { - /* set DISCONT flag when we missed a packet. We pushed the buffer writable - * into the jitterbuffer so we can modify now. */ - GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont"); - GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); - priv->discont = FALSE; - } - if (G_UNLIKELY (priv->ts_discont)) { - GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC); - priv->ts_discont = FALSE; - } + dts = + gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts); + pts = + gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts); - dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts); - pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts); + /* apply timestamp with offset to buffer now */ + GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts); + GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts); - /* apply timestamp with offset to buffer now */ - GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts); - GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts); + /* update the elapsed time when we need to check against the npt stop time. */ + update_estimated_eos (jitterbuffer, item); - /* update the elapsed time when we need to check against the npt stop time. */ - update_estimated_eos (jitterbuffer, item); - - priv->last_out_time = GST_BUFFER_PTS (outbuf); - } else { - outevent = item->data; - if (item->type == ITEM_TYPE_LOST) { + priv->last_out_time = GST_BUFFER_PTS (outbuf); + break; + case ITEM_TYPE_LOST: priv->discont = TRUE; if (!priv->do_lost) do_push = FALSE; - } + /* FALLTHROUGH */ + case ITEM_TYPE_EVENT: + outevent = item->data; + break; + case ITEM_TYPE_QUERY: + outquery = item->data; + break; } /* now we are ready to push the buffer. Save the seqnum and release the lock @@ -2464,28 +2500,45 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) item->data = NULL; free_item (item); - if (is_buffer) { - /* push buffer */ - if (percent != -1) - post_buffering_percent (jitterbuffer, percent); + switch (type) { + case ITEM_TYPE_BUFFER: + /* push buffer */ + if (percent != -1) + post_buffering_percent (jitterbuffer, percent); - GST_DEBUG_OBJECT (jitterbuffer, - "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT, - seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)), - GST_TIME_ARGS (GST_BUFFER_PTS (outbuf))); - result = gst_pad_push (priv->srcpad, outbuf); - } else { - GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum); + GST_DEBUG_OBJECT (jitterbuffer, + "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT, + seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)), + GST_TIME_ARGS (GST_BUFFER_PTS (outbuf))); + result = gst_pad_push (priv->srcpad, outbuf); - if (do_push) - gst_pad_push_event (priv->srcpad, outevent); - else - gst_event_unref (outevent); + JBUF_LOCK_CHECK (priv, out_flushing); + break; + case ITEM_TYPE_LOST: + case ITEM_TYPE_EVENT: + GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum); - result = GST_FLOW_OK; + if (do_push) + gst_pad_push_event (priv->srcpad, outevent); + else + gst_event_unref (outevent); + + result = GST_FLOW_OK; + + JBUF_LOCK_CHECK (priv, out_flushing); + break; + case ITEM_TYPE_QUERY: + { + gboolean res; + + res = gst_pad_peer_query (priv->srcpad, outquery); + + JBUF_LOCK_CHECK (priv, out_flushing); + GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res); + JBUF_SIGNAL_QUERY (priv, res); + break; + } } - JBUF_LOCK_CHECK (priv, out_flushing); - return result; /* ERRORS */ @@ -2904,7 +2957,7 @@ static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) { GstRtpJitterBufferPrivate *priv; - GstFlowReturn result; + GstFlowReturn result = GST_FLOW_OK; priv = jitterbuffer->priv; @@ -2920,8 +2973,6 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) while (result == GST_FLOW_OK); /* store result for upstream */ priv->srcresult = result; - JBUF_UNLOCK (priv); - /* if we get here we need to pause */ goto pause; @@ -2929,15 +2980,17 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) flushing: { result = priv->srcresult; - JBUF_UNLOCK (priv); goto pause; } pause: { - const gchar *reason = gst_flow_get_name (result); GstEvent *event; - GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason); + JBUF_SIGNAL_QUERY (priv, FALSE); + JBUF_UNLOCK (priv); + + GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", + gst_flow_get_name (result)); gst_pad_pause_task (priv->srcpad); if (result == GST_FLOW_EOS) { event = gst_event_new_eos (); @@ -3113,6 +3166,11 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) { gboolean res = FALSE; + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (parent); + priv = jitterbuffer->priv; switch (GST_QUERY_TYPE (query)) { case GST_QUERY_CAPS: @@ -3128,14 +3186,30 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent, } default: if (GST_QUERY_IS_SERIALIZED (query)) { - GST_WARNING_OBJECT (pad, "unhandled serialized query"); - res = FALSE; + RTPJitterBufferItem *item; + + JBUF_LOCK_CHECK (priv, out_flushing); + GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query"); + item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1); + rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL); + JBUF_SIGNAL_EVENT (priv); + JBUF_WAIT_QUERY (priv, out_flushing); + res = priv->last_query; + JBUF_UNLOCK (priv); } else { res = gst_pad_query_default (pad, parent, query); } break; } return res; + /* ERRORS */ +out_flushing: + { + GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); + JBUF_UNLOCK (priv); + return FALSE; + } + } static gboolean