gst/rtpmanager/gstrtpjitterbuffer.c: Only peek at the tail element instead of popping it off, which allows us to grea...

Original commit message from CVS:
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop):
Only peek at the tail element instead of popping it off, which allows
us to greatly simplify things when the tail element changes.
* gst/rtpmanager/gstrtpsession.c:
(gst_rtp_session_event_recv_rtp_sink):
* gst/rtpmanager/gstrtpssrcdemux.c:
(gst_rtp_ssrc_demux_sink_event):
Forward FLUSH events instead of leaking them.
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew),
(calculate_skew), (rtp_jitter_buffer_insert):
* gst/rtpmanager/rtpjitterbuffer.h:
Remove the tail-changed callback in favour of a simple boolean when we
insert a buffer in the queue.
Add method to peek the tail of the buffer.
This commit is contained in:
Wim Taymans 2007-10-05 12:07:37 +00:00
parent 69c24a99e4
commit 6e5d23b3d7
6 changed files with 55 additions and 46 deletions

View file

@ -1,3 +1,23 @@
2007-10-05 Wim Taymans <wim.taymans@gmail.com>
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop):
Only peek at the tail element instead of popping it off, which allows
us to greatly simplify things when the tail element changes.
* gst/rtpmanager/gstrtpsession.c:
(gst_rtp_session_event_recv_rtp_sink):
* gst/rtpmanager/gstrtpssrcdemux.c:
(gst_rtp_ssrc_demux_sink_event):
Forward FLUSH events instead of leaking them.
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew),
(calculate_skew), (rtp_jitter_buffer_insert):
* gst/rtpmanager/rtpjitterbuffer.h:
Remove the tail-changed callback in favour of a simple boolean when we
insert a buffer in the queue.
Add method to peek the tail of the buffer.
2007-10-05 Sebastian Dröge <slomo@circular-chaos.org> 2007-10-05 Sebastian Dröge <slomo@circular-chaos.org>
Patch by: Gautier Portet <kassoulet at gmail dot com> Patch by: Gautier Portet <kassoulet at gmail dot com>

View file

@ -135,6 +135,7 @@ struct _GstRtpJitterBufferPrivate
RTPJitterBuffer *jbuf; RTPJitterBuffer *jbuf;
GMutex *jbuf_lock; GMutex *jbuf_lock;
GCond *jbuf_cond; GCond *jbuf_cond;
gboolean waiting;
/* properties */ /* properties */
guint latency_ms; guint latency_ms;
@ -798,6 +799,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
GstFlowReturn ret = GST_FLOW_OK; GstFlowReturn ret = GST_FLOW_OK;
GstClockTime timestamp; GstClockTime timestamp;
guint64 latency_ts; guint64 latency_ts;
gboolean tail;
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
@ -868,19 +870,19 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
/* now insert the packet into the queue in sorted order. This function returns /* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we * FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */ * have a duplicate. */
if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp)) if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp, &tail))
goto duplicate; goto duplicate;
/* signal addition of new buffer */ /* signal addition of new buffer when the _loop is waiting. */
JBUF_SIGNAL (priv); if (priv->waiting)
JBUF_SIGNAL (priv);
/* let's unschedule and unblock any waiting buffers. We only want to do this /* let's unschedule and unblock any waiting buffers. We only want to do this
* if there is a currently waiting newer (> seqnum) buffer */ * when the tail buffer changed */
if (priv->clock_id) { if (priv->clock_id && tail) {
if (priv->waiting_seqnum > seqnum) { GST_DEBUG_OBJECT (jitterbuffer,
gst_clock_id_unschedule (priv->clock_id); "Unscheduling waiting buffer, new tail buffer");
GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer"); gst_clock_id_unschedule (priv->clock_id);
}
} }
GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets", GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
@ -963,7 +965,7 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
* *
* For each pushed buffer, the seqnum is recorded, if the next buffer B has a * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
* different seqnum (missing packets before B), this function will wait for the * different seqnum (missing packets before B), this function will wait for the
* missing packet to arrive up to the rtp timestamp of buffer B. * missing packet to arrive up to the timestamp of buffer B.
*/ */
static void static void
gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
@ -978,7 +980,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
JBUF_LOCK_CHECK (priv, flushing); JBUF_LOCK_CHECK (priv, flushing);
again: again:
GST_DEBUG_OBJECT (jitterbuffer, "Popping item"); GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
while (TRUE) { while (TRUE) {
/* always wait if we are blocked */ /* always wait if we are blocked */
@ -991,11 +993,17 @@ again:
goto do_eos; goto do_eos;
} }
/* wait for packets or flushing now */ /* wait for packets or flushing now */
priv->waiting = TRUE;
JBUF_WAIT_CHECK (priv, flushing); JBUF_WAIT_CHECK (priv, flushing);
priv->waiting = FALSE;
} }
/* pop a buffer, we must have a buffer now */ /* peek a buffer, we're just looking at the timestamp and the sequence number.
outbuf = rtp_jitter_buffer_pop (priv->jbuf); * If all is fine, we'll pop and push it. If the sequence number is wrong we
* wait on the timestamp. In the chain function we will unlock the wait when a
* new buffer is available. The peeked buffer is valid for as long as we hold
* the jitterbuffer lock. */
outbuf = rtp_jitter_buffer_peek (priv->jbuf);
seqnum = gst_rtp_buffer_get_seq (outbuf); seqnum = gst_rtp_buffer_get_seq (outbuf);
/* get the timestamp, this is already corrected for clock skew by the /* get the timestamp, this is already corrected for clock skew by the
@ -1003,7 +1011,7 @@ again:
timestamp = GST_BUFFER_TIMESTAMP (outbuf); timestamp = GST_BUFFER_TIMESTAMP (outbuf);
GST_DEBUG_OBJECT (jitterbuffer, GST_DEBUG_OBJECT (jitterbuffer,
"Popped buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left", "Peeked buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
seqnum, GST_TIME_ARGS (timestamp), seqnum, GST_TIME_ARGS (timestamp),
rtp_jitter_buffer_num_packets (priv->jbuf)); rtp_jitter_buffer_num_packets (priv->jbuf));
@ -1082,20 +1090,15 @@ again:
if (ret == GST_CLOCK_UNSCHEDULED) { if (ret == GST_CLOCK_UNSCHEDULED) {
GST_DEBUG_OBJECT (jitterbuffer, GST_DEBUG_OBJECT (jitterbuffer,
"Wait got unscheduled, will retry to push with new buffer"); "Wait got unscheduled, will retry to push with new buffer");
/* reinsert popped buffer into queue, no need to recalculate skew, we do
* that when inserting the buffer in the chain function */
if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf, -1)) {
GST_DEBUG_OBJECT (jitterbuffer,
"Duplicate packet #%d detected, dropping", seqnum);
priv->num_duplicates++;
gst_buffer_unref (outbuf);
}
goto again; goto again;
} }
/* Get new timestamp, latency might have changed */ /* Get new timestamp, latency might have changed */
out_time = apply_offset (jitterbuffer, timestamp); out_time = apply_offset (jitterbuffer, timestamp);
} }
push_buffer: push_buffer:
/* when we get here we are ready to pop and push the buffer */
outbuf = rtp_jitter_buffer_pop (priv->jbuf);
/* check if we are pushing something unexpected */ /* check if we are pushing something unexpected */
if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) { if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
gint dropped; gint dropped;

View file

@ -1020,6 +1020,7 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED); gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
break; break;
case GST_EVENT_NEWSEGMENT: case GST_EVENT_NEWSEGMENT:
{ {

View file

@ -344,7 +344,6 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED); gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
break;
case GST_EVENT_NEWSEGMENT: case GST_EVENT_NEWSEGMENT:
default: default:
{ {

View file

@ -99,16 +99,6 @@ rtp_jitter_buffer_new (void)
return jbuf; return jbuf;
} }
void
rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer * jbuf, RTPTailChanged func,
gpointer user_data)
{
g_return_if_fail (jbuf != NULL);
jbuf->tail_changed = func;
jbuf->user_data = user_data;
}
void void
rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer * jbuf, gint clock_rate) rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer * jbuf, gint clock_rate)
{ {
@ -374,6 +364,7 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
* @jbuf: an #RTPJitterBuffer * @jbuf: an #RTPJitterBuffer
* @buf: a buffer * @buf: a buffer
* @time: a running_time when this buffer was received in nanoseconds * @time: a running_time when this buffer was received in nanoseconds
* @tail: TRUE when the tail element changed.
* *
* Inserts @buf into the packet queue of @jbuf. The sequence number of the * Inserts @buf into the packet queue of @jbuf. The sequence number of the
* packet will be used to sort the packets. This function takes ownerhip of * packet will be used to sort the packets. This function takes ownerhip of
@ -383,7 +374,7 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
*/ */
gboolean gboolean
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
GstClockTime time) GstClockTime time, gboolean * tail)
{ {
GList *list; GList *list;
gint func_ret = 1; gint func_ret = 1;
@ -412,13 +403,12 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
if (list) if (list)
g_queue_insert_before (jbuf->packets, list, buf); g_queue_insert_before (jbuf->packets, list, buf);
else { else
g_queue_push_tail (jbuf->packets, buf); g_queue_push_tail (jbuf->packets, buf);
/* tail buffer changed, signal callback */ /* tail was changed when we did not find a previous packet */
if (jbuf->tail_changed) if (tail)
jbuf->tail_changed (jbuf, jbuf->user_data); *tail = (list == NULL);
}
return TRUE; return TRUE;
} }

View file

@ -67,9 +67,6 @@ struct _RTPJitterBuffer {
gint64 window_min; gint64 window_min;
gint64 skew; gint64 skew;
gint64 prev_send_diff; gint64 prev_send_diff;
RTPTailChanged tail_changed;
gpointer user_data;
}; };
struct _RTPJitterBufferClass { struct _RTPJitterBufferClass {
@ -81,15 +78,14 @@ GType rtp_jitter_buffer_get_type (void);
/* managing lifetime */ /* managing lifetime */
RTPJitterBuffer* rtp_jitter_buffer_new (void); RTPJitterBuffer* rtp_jitter_buffer_new (void);
void rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer *jbuf, RTPTailChanged func,
gpointer user_data);
void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, gint clock_rate); void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, gint clock_rate);
gint rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf); gint rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf); void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf, GstClockTime time); gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf,
GstClockTime time, gboolean *tail);
GstBuffer * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf); GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf); void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);