rtpjitterbuffer: schedule lost event differently

Schedule the lost event by placing it inside the jitterbuffer with the seqnum
that was lost so that the pushing thread can interleave and push it properly.
This commit is contained in:
Wim Taymans 2013-09-23 11:18:46 +02:00
parent f40d6689f2
commit 9f3345fcc2

View file

@ -1724,6 +1724,7 @@ send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
gboolean late)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
guint next_seqnum;
/* we had a gap and thus we lost some packets. Create an event for this. */
if (lost_packets > 1)
@ -1733,10 +1734,12 @@ send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
priv->num_late += lost_packets;
priv->discont = TRUE;
next_seqnum = seqnum + lost_packets - 1;
if (priv->do_lost) {
GstEvent *event;
RTPJitterBufferItem *item;
/* create paket lost event */
event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
@ -1745,14 +1748,15 @@ send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
"timestamp", G_TYPE_UINT64, timestamp,
"duration", G_TYPE_UINT64, duration,
"late", G_TYPE_BOOLEAN, late, NULL));
JBUF_UNLOCK (priv);
gst_pad_push_event (priv->srcpad, event);
JBUF_LOCK (priv);
item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, next_seqnum, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
}
/* update our expected next packet but make sure the seqnum increases */
if (seqnum + lost_packets > priv->next_seqnum) {
priv->next_seqnum = (seqnum + lost_packets) & 0xffff;
priv->last_popped_seqnum = seqnum;
if (seqnum == priv->next_seqnum) {
GST_DEBUG_OBJECT (jitterbuffer, "lost seqnum %d == %d next_seqnum -> %d",
seqnum, priv->next_seqnum, next_seqnum);
priv->next_seqnum = next_seqnum & 0xffff;
priv->last_popped_seqnum = next_seqnum;
priv->last_out_time = timestamp;
}
}
@ -2199,60 +2203,76 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
GstFlowReturn result;
RTPJitterBufferItem *item;
GstBuffer *outbuf;
GstEvent *outevent;
GstClockTime dts, pts;
gint percent = -1;
gboolean is_buffer;
/* when we get here we are ready to pop and push the buffer */
item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
check_buffering_percent (jitterbuffer, &percent);
is_buffer = GST_IS_BUFFER (item->data);
/* we need to make writable to change the flags and timestamps */
outbuf = gst_buffer_make_writable (item->data);
item->data = NULL;
if (is_buffer) {
check_buffering_percent (jitterbuffer, &percent);
if (G_UNLIKELY (priv->discont)) {
/* set DISCONT flag when we missed a packet. We pushed the buffer writable
* into the jitterbuffer so we can modify now. */
GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
priv->discont = FALSE;
/* we need to make writable to change the flags and timestamps */
outbuf = gst_buffer_make_writable (item->data);
if (G_UNLIKELY (priv->discont)) {
/* set DISCONT flag when we missed a packet. We pushed the buffer writable
* into the jitterbuffer so we can modify now. */
GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
priv->discont = FALSE;
}
if (G_UNLIKELY (priv->ts_discont)) {
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
priv->ts_discont = FALSE;
}
dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
/* apply timestamp with offset to buffer now */
GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
/* update the elapsed time when we need to check against the npt stop time. */
update_estimated_eos (jitterbuffer, item);
priv->last_out_time = GST_BUFFER_PTS (outbuf);
} else {
outevent = item->data;
if (item->type == ITEM_TYPE_LOST)
priv->discont = TRUE;
}
if (G_UNLIKELY (priv->ts_discont)) {
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
priv->ts_discont = FALSE;
}
dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
/* apply timestamp with offset to buffer now */
GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
/* update the elapsed time when we need to check against the npt stop time. */
update_estimated_eos (jitterbuffer, item);
/* now we are ready to push the buffer. Save the seqnum and release the lock
* so the other end can push stuff in the queue again. */
priv->last_popped_seqnum = seqnum;
priv->last_out_time = GST_BUFFER_PTS (outbuf);
priv->next_seqnum = (seqnum + 1) & 0xffff;
JBUF_UNLOCK (priv);
item->data = NULL;
free_item (item);
if (percent != -1)
post_buffering_percent (jitterbuffer, percent);
if (is_buffer) {
/* push buffer */
if (percent != -1)
post_buffering_percent (jitterbuffer, percent);
/* push buffer */
GST_DEBUG_OBJECT (jitterbuffer,
"Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
result = gst_pad_push (priv->srcpad, outbuf);
GST_DEBUG_OBJECT (jitterbuffer,
"Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
result = gst_pad_push (priv->srcpad, outbuf);
} else {
GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
gst_pad_push_event (priv->srcpad, outevent);
result = GST_FLOW_OK;
}
JBUF_LOCK_CHECK (priv, out_flushing);
return result;