diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 8389d8c812..5425771578 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -838,7 +838,7 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active, { GstRtpJitterBufferPrivate *priv; GstClockTime last_out; - GstBuffer *head; + RTPJitterBufferItem *item; priv = jbuf->priv; @@ -859,9 +859,9 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active, if (!active) { rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE); } - if ((head = rtp_jitter_buffer_peek (priv->jbuf))) { + if ((item = rtp_jitter_buffer_peek (priv->jbuf))) { /* head buffer timestamp and offset gives our output time */ - last_out = GST_BUFFER_DTS (head) + priv->ts_offset; + last_out = item->dts + priv->ts_offset; } else { /* use last known time when the buffer is empty */ last_out = priv->last_out_time; @@ -1809,6 +1809,20 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, } } +static RTPJitterBufferItem * +alloc_item (void) +{ + return g_slice_new (RTPJitterBufferItem); +} + +static void +free_item (RTPJitterBufferItem * item) +{ + if (item->data) + gst_mini_object_unref (item->data); + g_slice_free (RTPJitterBufferItem, item); +} + static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) @@ -1825,6 +1839,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, guint8 pt; GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; gboolean do_next_seqnum = FALSE; + RTPJitterBufferItem *item; jitterbuffer = GST_RTP_JITTER_BUFFER (parent); @@ -1986,27 +2001,28 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000); if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) { - GstBuffer *old_buf; - - old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent); + RTPJitterBufferItem *old_item; + old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent); GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p", - old_buf); - - gst_buffer_unref (old_buf); + old_item); + free_item (old_item); } } - /* we need to make the metadata writable before pushing it in the jitterbuffer - * because the jitterbuffer will update the PTS */ - buffer = gst_buffer_make_writable (buffer); - GST_BUFFER_DTS (buffer) = dts; - GST_BUFFER_PTS (buffer) = pts; + item = alloc_item (); + item->data = buffer; + item->next = NULL; + item->prev = NULL; + item->dts = dts; + item->pts = pts; + item->seqnum = seqnum; + item->rtptime = rtptime; /* now insert the packet into the queue in sorted order. This function returns * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ - if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts, + if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &tail, &percent))) goto duplicate; @@ -2084,23 +2100,20 @@ duplicate: GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", seqnum); priv->num_duplicates++; - gst_buffer_unref (buffer); + free_item (item); goto finished; } } static GstClockTime -compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf) +compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item) { guint64 ext_time, elapsed; guint32 rtp_time; GstRtpJitterBufferPrivate *priv; - GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; priv = jitterbuffer->priv; - gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp); - rtp_time = gst_rtp_buffer_get_timestamp (&rtp); - gst_rtp_buffer_unmap (&rtp); + rtp_time = item->rtptime; GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %" G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp); @@ -2121,7 +2134,8 @@ compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf) } static void -update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf) +update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, + RTPJitterBufferItem * item) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; @@ -2129,7 +2143,7 @@ update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf) && priv->clock_base != -1 && priv->clock_rate > 0) { guint64 elapsed, estimated; - elapsed = compute_elapsed (jitterbuffer, outbuf); + elapsed = compute_elapsed (jitterbuffer, item); if (elapsed > priv->last_elapsed || !priv->last_elapsed) { guint64 left; @@ -2141,7 +2155,7 @@ update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf) GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT, GST_TIME_ARGS (left)); - out_time = GST_BUFFER_DTS (outbuf); + out_time = item->dts; if (elapsed > 0) estimated = gst_util_uint64_scale (out_time, left, elapsed); @@ -2171,15 +2185,20 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstFlowReturn result; + RTPJitterBufferItem *item; GstBuffer *outbuf; GstClockTime dts, pts; gint percent = -1; /* when we get here we are ready to pop and push the buffer */ - outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent); + item = rtp_jitter_buffer_pop (priv->jbuf, &percent); check_buffering_percent (jitterbuffer, &percent); + /* we need to make writable to change the flags and timestamps */ + outbuf = gst_buffer_make_writable (item->data); + item->data = NULL; + if (G_UNLIKELY (priv->discont)) { /* set DISCONT flag when we missed a packet. We pushed the buffer writable * into the jitterbuffer so we can modify now. */ @@ -2192,18 +2211,15 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) priv->ts_discont = FALSE; } - dts = GST_BUFFER_DTS (outbuf); - pts = GST_BUFFER_PTS (outbuf); - - dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts); - pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts); + dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts); + pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts); /* apply timestamp with offset to buffer now */ GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts); GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts); /* update the elapsed time when we need to check against the npt stop time. */ - update_estimated_eos (jitterbuffer, outbuf); + update_estimated_eos (jitterbuffer, item); /* now we are ready to push the buffer. Save the seqnum and release the lock * so the other end can push stuff in the queue again. */ @@ -2212,6 +2228,8 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) priv->next_seqnum = (seqnum + 1) & 0xffff; JBUF_UNLOCK (priv); + free_item (item); + if (percent != -1) post_buffering_percent (jitterbuffer, percent); @@ -2245,11 +2263,10 @@ handle_next_buffer (GstRtpJitterBuffer * jitterbuffer) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstFlowReturn result = GST_FLOW_OK; - GstBuffer *outbuf; + RTPJitterBufferItem *item; guint16 seqnum; guint32 next_seqnum; gint gap; - GstRTPBuffer rtp = { NULL, }; /* only push buffers when PLAYING and active and not buffering */ if (priv->blocked || !priv->active || @@ -2261,14 +2278,12 @@ again: * If all is fine, we'll pop and push it. If the sequence number is wrong we * wait for a timeout or something to change. * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */ - outbuf = rtp_jitter_buffer_peek (priv->jbuf); - if (outbuf == NULL) + item = rtp_jitter_buffer_peek (priv->jbuf); + if (item == NULL) goto wait; /* get the seqnum and the next expected seqnum */ - gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp); - seqnum = gst_rtp_buffer_get_seq (&rtp); - gst_rtp_buffer_unmap (&rtp); + seqnum = item->seqnum; next_seqnum = priv->next_seqnum; @@ -2288,12 +2303,13 @@ again: /* no missing packet, pop and push */ result = pop_and_push_next (jitterbuffer, seqnum); } else if (G_UNLIKELY (gap < 0)) { + RTPJitterBufferItem *item; /* if we have a packet that we already pushed or considered dropped, pop it * off and get the next packet */ GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping", seqnum, next_seqnum); - outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL); - gst_buffer_unref (outbuf); + item = rtp_jitter_buffer_pop (priv->jbuf, NULL); + free_item (item); goto again; } else { /* the chain function has scheduled timers to request retransmission or diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index dce01394bc..3641f4cf50 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -250,29 +250,24 @@ rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time, static guint64 get_buffer_level (RTPJitterBuffer * jbuf) { - GstBuffer *high_buf = NULL, *low_buf = NULL; + RTPJitterBufferItem *high_buf = NULL, *low_buf = NULL; guint64 level; - GList *find; /* first first buffer with timestamp */ - find = g_queue_peek_head_link (jbuf->packets); - while (find) { - high_buf = find->data; - if (GST_BUFFER_TIMESTAMP (high_buf) != -1) + high_buf = (RTPJitterBufferItem *) g_queue_peek_head_link (jbuf->packets); + while (high_buf) { + if (high_buf->dts != -1) break; - high_buf = NULL; - find = g_list_next (find); + high_buf = (RTPJitterBufferItem *) g_list_next (high_buf); } - find = g_queue_peek_tail_link (jbuf->packets); - while (find) { - low_buf = find->data; - if (GST_BUFFER_TIMESTAMP (low_buf) != -1) + low_buf = (RTPJitterBufferItem *) g_queue_peek_tail_link (jbuf->packets); + while (low_buf) { + if (low_buf->dts != -1) break; - low_buf = NULL; - find = g_list_previous (find); + low_buf = (RTPJitterBufferItem *) g_list_previous (low_buf); } if (!high_buf || !low_buf || high_buf == low_buf) { @@ -280,8 +275,8 @@ get_buffer_level (RTPJitterBuffer * jbuf) } else { guint64 high_ts, low_ts; - high_ts = GST_BUFFER_TIMESTAMP (high_buf); - low_ts = GST_BUFFER_TIMESTAMP (low_buf); + high_ts = high_buf->dts; + low_ts = low_buf->dts; if (high_ts > low_ts) level = high_ts - low_ts; @@ -611,46 +606,73 @@ no_skew: return out_time; } +static void +queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item) +{ + GQueue *queue = jbuf->packets; + GList *walk; + + /* It's more likely that the packet was inserted in the front of the buffer */ + if (G_LIKELY (list)) { + item->prev = list->prev; + item->next = list; + list->prev = item; + if (item->prev) { + item->prev->next = item; + } else { + queue->head = item; + } + } else { + queue->tail = g_list_concat (queue->tail, item); + if (queue->tail->next) + queue->tail = queue->tail->next; + else + queue->head = queue->tail; + } + queue->length++; + + GST_DEBUG ("head %p, tail %p", queue->head, queue->tail); + for (walk = queue->head; walk; walk = walk->next) { + RTPJitterBufferItem *item = (RTPJitterBufferItem *) walk; + GST_DEBUG ("item %p, next %p, prev %p, #%d", + item, item->next, item->prev, item->seqnum); + } +} + /** * rtp_jitter_buffer_insert: * @jbuf: an #RTPJitterBuffer - * @buf: a buffer - * @time: a running_time when this buffer was received in nanoseconds - * @max_delay: the maximum lateness of @buf + * @item: an #RTPJitterBufferItem to insert * @tail: TRUE when the tail element changed. + * @percent: the buffering percent after insertion * - * Inserts @buf into the packet queue of @jbuf. The sequence number of the + * Inserts @item into the packet queue of @jbuf. The sequence number of the * packet will be used to sort the packets. This function takes ownerhip of * @buf when the function returns %TRUE. - * @buf should have writable metadata when calling this function. * * Returns: %FALSE if a packet with the same number already existed. */ gboolean -rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, - GstClockTime time, gboolean * tail, gint * percent) +rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item, + gboolean * tail, gint * percent) { GList *list; guint32 rtptime; guint16 seqnum; - GstRTPBuffer rtp = { NULL }; + GstClockTime dts; g_return_val_if_fail (jbuf != NULL, FALSE); - g_return_val_if_fail (buf != NULL, FALSE); + g_return_val_if_fail (item != NULL, FALSE); - gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp); - - seqnum = gst_rtp_buffer_get_seq (&rtp); + seqnum = item->seqnum; /* loop the list to skip strictly smaller seqnum buffers */ for (list = jbuf->packets->head; list; list = g_list_next (list)) { guint16 qseq; gint gap; - GstRTPBuffer rtpb = { NULL }; + RTPJitterBufferItem *qitem = (RTPJitterBufferItem *) list; - gst_rtp_buffer_map (GST_BUFFER_CAST (list->data), GST_MAP_READ, &rtpb); - qseq = gst_rtp_buffer_get_seq (&rtpb); - gst_rtp_buffer_unmap (&rtpb); + qseq = qitem->seqnum; /* compare the new seqnum to the one in the buffer */ gap = gst_rtp_buffer_compare_seqnum (seqnum, qseq); @@ -664,13 +686,15 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, break; } - rtptime = gst_rtp_buffer_get_timestamp (&rtp); + dts = item->dts; + rtptime = item->rtptime; + /* rtp time jumps are checked for during skew calculation, but bypassed * in other mode, so mind those here and reset jb if needed. * Only reset if valid input time, which is likely for UDP input * where we expect this might happen due to async thread effects * (in seek and state change cycles), but not so much for TCP input */ - if (GST_CLOCK_TIME_IS_VALID (time) && + if (GST_CLOCK_TIME_IS_VALID (dts) && jbuf->mode != RTP_JITTER_BUFFER_MODE_SLAVE && jbuf->base_time != -1 && jbuf->last_rtptime != -1) { GstClockTime ext_rtptime = jbuf->ext_rtptime; @@ -693,25 +717,19 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, * mode we will adjust the outgoing timestamps according to the amount of * time we spent buffering. */ if (jbuf->base_time == -1) - time = 0; + dts = 0; else - time = -1; + dts = -1; break; case RTP_JITTER_BUFFER_MODE_SLAVE: default: break; } /* do skew calculation by measuring the difference between rtptime and the - * receive time, this function will retimestamp @buf with the skew corrected - * running time. */ - time = calculate_skew (jbuf, rtptime, time); - GST_BUFFER_PTS (buf) = time; + * receive dts, this function will return the skew corrected rtptime. */ + item->pts = calculate_skew (jbuf, rtptime, dts); - /* It's more likely that the packet was inserted in the front of the buffer */ - if (G_LIKELY (list)) - g_queue_insert_before (jbuf->packets, list, buf); - else - g_queue_push_tail (jbuf->packets, buf); + queue_do_insert (jbuf, list, (GList *) item); /* buffering mode, update buffer stats */ if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER) @@ -724,14 +742,11 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, if (G_LIKELY (tail)) *tail = (list == NULL); - gst_rtp_buffer_unmap (&rtp); - return TRUE; /* ERRORS */ duplicate: { - gst_rtp_buffer_unmap (&rtp); GST_WARNING ("duplicate packet %d found", (gint) seqnum); return FALSE; } @@ -748,14 +763,25 @@ duplicate: * * Returns: a #GstBuffer or %NULL when there was no packet in the queue. */ -GstBuffer * +RTPJitterBufferItem * rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent) { - GstBuffer *buf; + GList *item = NULL; + GQueue *queue; g_return_val_if_fail (jbuf != NULL, NULL); - buf = g_queue_pop_tail (jbuf->packets); + queue = jbuf->packets; + + item = queue->tail; + if (item) { + queue->tail = item->prev; + if (queue->tail) + queue->tail->next = NULL; + else + queue->head = NULL; + queue->length--; + } /* buffering mode, update buffer stats */ if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER) @@ -763,7 +789,7 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent) else if (percent) *percent = -1; - return buf; + return (RTPJitterBufferItem *) item; } /** @@ -776,16 +802,12 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent) * * Returns: a #GstBuffer or %NULL when there was no packet in the queue. */ -GstBuffer * +RTPJitterBufferItem * rtp_jitter_buffer_peek (RTPJitterBuffer * jbuf) { - GstBuffer *buf; - g_return_val_if_fail (jbuf != NULL, NULL); - buf = g_queue_peek_tail (jbuf->packets); - - return buf; + return (RTPJitterBufferItem *) jbuf->packets->tail; } /** diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index ec4cad4ef9..7b47b78f81 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -25,6 +25,7 @@ typedef struct _RTPJitterBuffer RTPJitterBuffer; typedef struct _RTPJitterBufferClass RTPJitterBufferClass; +typedef struct _RTPJitterBufferItem RTPJitterBufferItem; #define RTP_TYPE_JITTER_BUFFER (rtp_jitter_buffer_get_type()) #define RTP_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_JITTER_BUFFER,RTPJitterBuffer)) @@ -99,6 +100,28 @@ struct _RTPJitterBufferClass { GObjectClass parent_class; }; +/** + * RTPJitterBufferItem: + * @data: the data of the item + * @next: pointer to next item + * @prev: pointer to previous item + * @dts: input DTS + * @pts: output PTS + * @seqnum: seqnum + * @rtptime: rtp timestamp + * + * An object containing an RTP packet or event. + */ +struct _RTPJitterBufferItem { + gpointer data; + GList *next; + GList *prev; + GstClockTime dts; + GstClockTime pts; + guint seqnum; + guint rtptime; +}; + GType rtp_jitter_buffer_get_type (void); /* managing lifetime */ @@ -115,11 +138,11 @@ guint32 rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf) void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf); -gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf, - GstClockTime time, +gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, + RTPJitterBufferItem *item, gboolean *tail, gint *percent); -GstBuffer * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf); -GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent); +RTPJitterBufferItem * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf); +RTPJitterBufferItem * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent); void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);