rtpjitterbuffer: use structure to hold packet information

Make the jitterbuffer operate on a structure containing all the packet
information. This avoids mapping the buffer multiple times just to get the RTP
information. It will also make it possible to store other miniobjects such as
events later.
This commit is contained in:
Wim Taymans 2013-09-20 23:48:20 +02:00
parent 1760817005
commit 39a2ba669d
3 changed files with 163 additions and 102 deletions

View file

@ -838,7 +838,7 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
{
GstRtpJitterBufferPrivate *priv;
GstClockTime last_out;
GstBuffer *head;
RTPJitterBufferItem *item;
priv = jbuf->priv;
@ -859,9 +859,9 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
if (!active) {
rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
}
if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
/* head buffer timestamp and offset gives our output time */
last_out = GST_BUFFER_DTS (head) + priv->ts_offset;
last_out = item->dts + priv->ts_offset;
} else {
/* use last known time when the buffer is empty */
last_out = priv->last_out_time;
@ -1809,6 +1809,20 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
}
}
static RTPJitterBufferItem *
alloc_item (void)
{
return g_slice_new (RTPJitterBufferItem);
}
static void
free_item (RTPJitterBufferItem * item)
{
if (item->data)
gst_mini_object_unref (item->data);
g_slice_free (RTPJitterBufferItem, item);
}
static GstFlowReturn
gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
@ -1825,6 +1839,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
guint8 pt;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
gboolean do_next_seqnum = FALSE;
RTPJitterBufferItem *item;
jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
@ -1986,27 +2001,28 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
GstBuffer *old_buf;
old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
RTPJitterBufferItem *old_item;
old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
old_buf);
gst_buffer_unref (old_buf);
old_item);
free_item (old_item);
}
}
/* we need to make the metadata writable before pushing it in the jitterbuffer
* because the jitterbuffer will update the PTS */
buffer = gst_buffer_make_writable (buffer);
GST_BUFFER_DTS (buffer) = dts;
GST_BUFFER_PTS (buffer) = pts;
item = alloc_item ();
item->data = buffer;
item->next = NULL;
item->prev = NULL;
item->dts = dts;
item->pts = pts;
item->seqnum = seqnum;
item->rtptime = rtptime;
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts,
if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
&tail, &percent)))
goto duplicate;
@ -2084,23 +2100,20 @@ duplicate:
GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
seqnum);
priv->num_duplicates++;
gst_buffer_unref (buffer);
free_item (item);
goto finished;
}
}
static GstClockTime
compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
{
guint64 ext_time, elapsed;
guint32 rtp_time;
GstRtpJitterBufferPrivate *priv;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
priv = jitterbuffer->priv;
gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
gst_rtp_buffer_unmap (&rtp);
rtp_time = item->rtptime;
GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
@ -2121,7 +2134,8 @@ compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
}
static void
update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
RTPJitterBufferItem * item)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
@ -2129,7 +2143,7 @@ update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
&& priv->clock_base != -1 && priv->clock_rate > 0) {
guint64 elapsed, estimated;
elapsed = compute_elapsed (jitterbuffer, outbuf);
elapsed = compute_elapsed (jitterbuffer, item);
if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
guint64 left;
@ -2141,7 +2155,7 @@ update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
GST_TIME_ARGS (left));
out_time = GST_BUFFER_DTS (outbuf);
out_time = item->dts;
if (elapsed > 0)
estimated = gst_util_uint64_scale (out_time, left, elapsed);
@ -2171,15 +2185,20 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result;
RTPJitterBufferItem *item;
GstBuffer *outbuf;
GstClockTime dts, pts;
gint percent = -1;
/* when we get here we are ready to pop and push the buffer */
outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
check_buffering_percent (jitterbuffer, &percent);
/* we need to make writable to change the flags and timestamps */
outbuf = gst_buffer_make_writable (item->data);
item->data = NULL;
if (G_UNLIKELY (priv->discont)) {
/* set DISCONT flag when we missed a packet. We pushed the buffer writable
* into the jitterbuffer so we can modify now. */
@ -2192,18 +2211,15 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
priv->ts_discont = FALSE;
}
dts = GST_BUFFER_DTS (outbuf);
pts = GST_BUFFER_PTS (outbuf);
dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts);
pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts);
dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
/* apply timestamp with offset to buffer now */
GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
/* update the elapsed time when we need to check against the npt stop time. */
update_estimated_eos (jitterbuffer, outbuf);
update_estimated_eos (jitterbuffer, item);
/* now we are ready to push the buffer. Save the seqnum and release the lock
* so the other end can push stuff in the queue again. */
@ -2212,6 +2228,8 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
priv->next_seqnum = (seqnum + 1) & 0xffff;
JBUF_UNLOCK (priv);
free_item (item);
if (percent != -1)
post_buffering_percent (jitterbuffer, percent);
@ -2245,11 +2263,10 @@ handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result = GST_FLOW_OK;
GstBuffer *outbuf;
RTPJitterBufferItem *item;
guint16 seqnum;
guint32 next_seqnum;
gint gap;
GstRTPBuffer rtp = { NULL, };
/* only push buffers when PLAYING and active and not buffering */
if (priv->blocked || !priv->active ||
@ -2261,14 +2278,12 @@ again:
* If all is fine, we'll pop and push it. If the sequence number is wrong we
* wait for a timeout or something to change.
* The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
outbuf = rtp_jitter_buffer_peek (priv->jbuf);
if (outbuf == NULL)
item = rtp_jitter_buffer_peek (priv->jbuf);
if (item == NULL)
goto wait;
/* get the seqnum and the next expected seqnum */
gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
seqnum = gst_rtp_buffer_get_seq (&rtp);
gst_rtp_buffer_unmap (&rtp);
seqnum = item->seqnum;
next_seqnum = priv->next_seqnum;
@ -2288,12 +2303,13 @@ again:
/* no missing packet, pop and push */
result = pop_and_push_next (jitterbuffer, seqnum);
} else if (G_UNLIKELY (gap < 0)) {
RTPJitterBufferItem *item;
/* if we have a packet that we already pushed or considered dropped, pop it
* off and get the next packet */
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
gst_buffer_unref (outbuf);
item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
free_item (item);
goto again;
} else {
/* the chain function has scheduled timers to request retransmission or

View file

@ -250,29 +250,24 @@ rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time,
static guint64
get_buffer_level (RTPJitterBuffer * jbuf)
{
GstBuffer *high_buf = NULL, *low_buf = NULL;
RTPJitterBufferItem *high_buf = NULL, *low_buf = NULL;
guint64 level;
GList *find;
/* first first buffer with timestamp */
find = g_queue_peek_head_link (jbuf->packets);
while (find) {
high_buf = find->data;
if (GST_BUFFER_TIMESTAMP (high_buf) != -1)
high_buf = (RTPJitterBufferItem *) g_queue_peek_head_link (jbuf->packets);
while (high_buf) {
if (high_buf->dts != -1)
break;
high_buf = NULL;
find = g_list_next (find);
high_buf = (RTPJitterBufferItem *) g_list_next (high_buf);
}
find = g_queue_peek_tail_link (jbuf->packets);
while (find) {
low_buf = find->data;
if (GST_BUFFER_TIMESTAMP (low_buf) != -1)
low_buf = (RTPJitterBufferItem *) g_queue_peek_tail_link (jbuf->packets);
while (low_buf) {
if (low_buf->dts != -1)
break;
low_buf = NULL;
find = g_list_previous (find);
low_buf = (RTPJitterBufferItem *) g_list_previous (low_buf);
}
if (!high_buf || !low_buf || high_buf == low_buf) {
@ -280,8 +275,8 @@ get_buffer_level (RTPJitterBuffer * jbuf)
} else {
guint64 high_ts, low_ts;
high_ts = GST_BUFFER_TIMESTAMP (high_buf);
low_ts = GST_BUFFER_TIMESTAMP (low_buf);
high_ts = high_buf->dts;
low_ts = low_buf->dts;
if (high_ts > low_ts)
level = high_ts - low_ts;
@ -611,46 +606,73 @@ no_skew:
return out_time;
}
static void
queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
{
GQueue *queue = jbuf->packets;
GList *walk;
/* It's more likely that the packet was inserted in the front of the buffer */
if (G_LIKELY (list)) {
item->prev = list->prev;
item->next = list;
list->prev = item;
if (item->prev) {
item->prev->next = item;
} else {
queue->head = item;
}
} else {
queue->tail = g_list_concat (queue->tail, item);
if (queue->tail->next)
queue->tail = queue->tail->next;
else
queue->head = queue->tail;
}
queue->length++;
GST_DEBUG ("head %p, tail %p", queue->head, queue->tail);
for (walk = queue->head; walk; walk = walk->next) {
RTPJitterBufferItem *item = (RTPJitterBufferItem *) walk;
GST_DEBUG ("item %p, next %p, prev %p, #%d",
item, item->next, item->prev, item->seqnum);
}
}
/**
* rtp_jitter_buffer_insert:
* @jbuf: an #RTPJitterBuffer
* @buf: a buffer
* @time: a running_time when this buffer was received in nanoseconds
* @max_delay: the maximum lateness of @buf
* @item: an #RTPJitterBufferItem to insert
* @tail: TRUE when the tail element changed.
* @percent: the buffering percent after insertion
*
* Inserts @buf into the packet queue of @jbuf. The sequence number of the
* Inserts @item into the packet queue of @jbuf. The sequence number of the
* packet will be used to sort the packets. This function takes ownerhip of
* @buf when the function returns %TRUE.
* @buf should have writable metadata when calling this function.
*
* Returns: %FALSE if a packet with the same number already existed.
*/
gboolean
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
GstClockTime time, gboolean * tail, gint * percent)
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
gboolean * tail, gint * percent)
{
GList *list;
guint32 rtptime;
guint16 seqnum;
GstRTPBuffer rtp = { NULL };
GstClockTime dts;
g_return_val_if_fail (jbuf != NULL, FALSE);
g_return_val_if_fail (buf != NULL, FALSE);
g_return_val_if_fail (item != NULL, FALSE);
gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp);
seqnum = gst_rtp_buffer_get_seq (&rtp);
seqnum = item->seqnum;
/* loop the list to skip strictly smaller seqnum buffers */
for (list = jbuf->packets->head; list; list = g_list_next (list)) {
guint16 qseq;
gint gap;
GstRTPBuffer rtpb = { NULL };
RTPJitterBufferItem *qitem = (RTPJitterBufferItem *) list;
gst_rtp_buffer_map (GST_BUFFER_CAST (list->data), GST_MAP_READ, &rtpb);
qseq = gst_rtp_buffer_get_seq (&rtpb);
gst_rtp_buffer_unmap (&rtpb);
qseq = qitem->seqnum;
/* compare the new seqnum to the one in the buffer */
gap = gst_rtp_buffer_compare_seqnum (seqnum, qseq);
@ -664,13 +686,15 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
break;
}
rtptime = gst_rtp_buffer_get_timestamp (&rtp);
dts = item->dts;
rtptime = item->rtptime;
/* rtp time jumps are checked for during skew calculation, but bypassed
* in other mode, so mind those here and reset jb if needed.
* Only reset if valid input time, which is likely for UDP input
* where we expect this might happen due to async thread effects
* (in seek and state change cycles), but not so much for TCP input */
if (GST_CLOCK_TIME_IS_VALID (time) &&
if (GST_CLOCK_TIME_IS_VALID (dts) &&
jbuf->mode != RTP_JITTER_BUFFER_MODE_SLAVE &&
jbuf->base_time != -1 && jbuf->last_rtptime != -1) {
GstClockTime ext_rtptime = jbuf->ext_rtptime;
@ -693,25 +717,19 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
* mode we will adjust the outgoing timestamps according to the amount of
* time we spent buffering. */
if (jbuf->base_time == -1)
time = 0;
dts = 0;
else
time = -1;
dts = -1;
break;
case RTP_JITTER_BUFFER_MODE_SLAVE:
default:
break;
}
/* do skew calculation by measuring the difference between rtptime and the
* receive time, this function will retimestamp @buf with the skew corrected
* running time. */
time = calculate_skew (jbuf, rtptime, time);
GST_BUFFER_PTS (buf) = time;
* receive dts, this function will return the skew corrected rtptime. */
item->pts = calculate_skew (jbuf, rtptime, dts);
/* It's more likely that the packet was inserted in the front of the buffer */
if (G_LIKELY (list))
g_queue_insert_before (jbuf->packets, list, buf);
else
g_queue_push_tail (jbuf->packets, buf);
queue_do_insert (jbuf, list, (GList *) item);
/* buffering mode, update buffer stats */
if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
@ -724,14 +742,11 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
if (G_LIKELY (tail))
*tail = (list == NULL);
gst_rtp_buffer_unmap (&rtp);
return TRUE;
/* ERRORS */
duplicate:
{
gst_rtp_buffer_unmap (&rtp);
GST_WARNING ("duplicate packet %d found", (gint) seqnum);
return FALSE;
}
@ -748,14 +763,25 @@ duplicate:
*
* Returns: a #GstBuffer or %NULL when there was no packet in the queue.
*/
GstBuffer *
RTPJitterBufferItem *
rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
{
GstBuffer *buf;
GList *item = NULL;
GQueue *queue;
g_return_val_if_fail (jbuf != NULL, NULL);
buf = g_queue_pop_tail (jbuf->packets);
queue = jbuf->packets;
item = queue->tail;
if (item) {
queue->tail = item->prev;
if (queue->tail)
queue->tail->next = NULL;
else
queue->head = NULL;
queue->length--;
}
/* buffering mode, update buffer stats */
if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
@ -763,7 +789,7 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
else if (percent)
*percent = -1;
return buf;
return (RTPJitterBufferItem *) item;
}
/**
@ -776,16 +802,12 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
*
* Returns: a #GstBuffer or %NULL when there was no packet in the queue.
*/
GstBuffer *
RTPJitterBufferItem *
rtp_jitter_buffer_peek (RTPJitterBuffer * jbuf)
{
GstBuffer *buf;
g_return_val_if_fail (jbuf != NULL, NULL);
buf = g_queue_peek_tail (jbuf->packets);
return buf;
return (RTPJitterBufferItem *) jbuf->packets->tail;
}
/**

View file

@ -25,6 +25,7 @@
typedef struct _RTPJitterBuffer RTPJitterBuffer;
typedef struct _RTPJitterBufferClass RTPJitterBufferClass;
typedef struct _RTPJitterBufferItem RTPJitterBufferItem;
#define RTP_TYPE_JITTER_BUFFER (rtp_jitter_buffer_get_type())
#define RTP_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_JITTER_BUFFER,RTPJitterBuffer))
@ -99,6 +100,28 @@ struct _RTPJitterBufferClass {
GObjectClass parent_class;
};
/**
* RTPJitterBufferItem:
* @data: the data of the item
* @next: pointer to next item
* @prev: pointer to previous item
* @dts: input DTS
* @pts: output PTS
* @seqnum: seqnum
* @rtptime: rtp timestamp
*
* An object containing an RTP packet or event.
*/
struct _RTPJitterBufferItem {
gpointer data;
GList *next;
GList *prev;
GstClockTime dts;
GstClockTime pts;
guint seqnum;
guint rtptime;
};
GType rtp_jitter_buffer_get_type (void);
/* managing lifetime */
@ -115,11 +138,11 @@ guint32 rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf)
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf,
GstClockTime time,
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf,
RTPJitterBufferItem *item,
gboolean *tail, gint *percent);
GstBuffer * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent);
RTPJitterBufferItem * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
RTPJitterBufferItem * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf, gint *percent);
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);