From 9f3345fcc21be9fb785cf0bc8d74d0e560173f01 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 23 Sep 2013 11:18:46 +0200 Subject: [PATCH] rtpjitterbuffer: schedule lost event differently Schedule the lost event by placing it inside the jitterbuffer with the seqnum that was lost so that the pushing thread can interleave and push it properly. --- gst/rtpmanager/gstrtpjitterbuffer.c | 104 +++++++++++++++++----------- 1 file changed, 62 insertions(+), 42 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index d069087683..b347381a7f 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -1724,6 +1724,7 @@ send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum, gboolean late) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + guint next_seqnum; /* we had a gap and thus we lost some packets. Create an event for this. */ if (lost_packets > 1) @@ -1733,10 +1734,12 @@ send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum, GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum); priv->num_late += lost_packets; - priv->discont = TRUE; + + next_seqnum = seqnum + lost_packets - 1; if (priv->do_lost) { GstEvent *event; + RTPJitterBufferItem *item; /* create paket lost event */ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, @@ -1745,14 +1748,15 @@ send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum, "timestamp", G_TYPE_UINT64, timestamp, "duration", G_TYPE_UINT64, duration, "late", G_TYPE_BOOLEAN, late, NULL)); - JBUF_UNLOCK (priv); - gst_pad_push_event (priv->srcpad, event); - JBUF_LOCK (priv); + + item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, next_seqnum, -1); + rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL); } - /* update our expected next packet but make sure the seqnum increases */ - if (seqnum + lost_packets > priv->next_seqnum) { - priv->next_seqnum = (seqnum + lost_packets) & 0xffff; - priv->last_popped_seqnum = seqnum; + if (seqnum == priv->next_seqnum) { + GST_DEBUG_OBJECT (jitterbuffer, "lost seqnum %d == %d next_seqnum -> %d", + seqnum, priv->next_seqnum, next_seqnum); + priv->next_seqnum = next_seqnum & 0xffff; + priv->last_popped_seqnum = next_seqnum; priv->last_out_time = timestamp; } } @@ -2199,60 +2203,76 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) GstFlowReturn result; RTPJitterBufferItem *item; GstBuffer *outbuf; + GstEvent *outevent; GstClockTime dts, pts; gint percent = -1; + gboolean is_buffer; /* when we get here we are ready to pop and push the buffer */ item = rtp_jitter_buffer_pop (priv->jbuf, &percent); - check_buffering_percent (jitterbuffer, &percent); + is_buffer = GST_IS_BUFFER (item->data); - /* we need to make writable to change the flags and timestamps */ - outbuf = gst_buffer_make_writable (item->data); - item->data = NULL; + if (is_buffer) { + check_buffering_percent (jitterbuffer, &percent); - if (G_UNLIKELY (priv->discont)) { - /* set DISCONT flag when we missed a packet. We pushed the buffer writable - * into the jitterbuffer so we can modify now. */ - GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont"); - GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); - priv->discont = FALSE; + /* we need to make writable to change the flags and timestamps */ + outbuf = gst_buffer_make_writable (item->data); + + if (G_UNLIKELY (priv->discont)) { + /* set DISCONT flag when we missed a packet. We pushed the buffer writable + * into the jitterbuffer so we can modify now. */ + GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont"); + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + priv->discont = FALSE; + } + if (G_UNLIKELY (priv->ts_discont)) { + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC); + priv->ts_discont = FALSE; + } + + dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts); + pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts); + + /* apply timestamp with offset to buffer now */ + GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts); + GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts); + + /* update the elapsed time when we need to check against the npt stop time. */ + update_estimated_eos (jitterbuffer, item); + + priv->last_out_time = GST_BUFFER_PTS (outbuf); + } else { + outevent = item->data; + if (item->type == ITEM_TYPE_LOST) + priv->discont = TRUE; } - if (G_UNLIKELY (priv->ts_discont)) { - GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC); - priv->ts_discont = FALSE; - } - - dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts); - pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts); - - /* apply timestamp with offset to buffer now */ - GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts); - GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts); - - /* update the elapsed time when we need to check against the npt stop time. */ - update_estimated_eos (jitterbuffer, item); /* now we are ready to push the buffer. Save the seqnum and release the lock * so the other end can push stuff in the queue again. */ priv->last_popped_seqnum = seqnum; - priv->last_out_time = GST_BUFFER_PTS (outbuf); priv->next_seqnum = (seqnum + 1) & 0xffff; JBUF_UNLOCK (priv); + item->data = NULL; free_item (item); - if (percent != -1) - post_buffering_percent (jitterbuffer, percent); + if (is_buffer) { + /* push buffer */ + if (percent != -1) + post_buffering_percent (jitterbuffer, percent); - /* push buffer */ - GST_DEBUG_OBJECT (jitterbuffer, - "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT, - seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)), - GST_TIME_ARGS (GST_BUFFER_PTS (outbuf))); - - result = gst_pad_push (priv->srcpad, outbuf); + GST_DEBUG_OBJECT (jitterbuffer, + "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT, + seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)), + GST_TIME_ARGS (GST_BUFFER_PTS (outbuf))); + result = gst_pad_push (priv->srcpad, outbuf); + } else { + GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum); + gst_pad_push_event (priv->srcpad, outevent); + result = GST_FLOW_OK; + } JBUF_LOCK_CHECK (priv, out_flushing); return result;