mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-23 18:21:04 +00:00
gst/rtpmanager/gstrtpjitterbuffer.c: Only peek at the tail element instead of popping it off, which allows us to grea...
Original commit message from CVS: * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop): Only peek at the tail element instead of popping it off, which allows us to greatly simplify things when the tail element changes. * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_event_recv_rtp_sink): * gst/rtpmanager/gstrtpssrcdemux.c: (gst_rtp_ssrc_demux_sink_event): Forward FLUSH events instead of leaking them. * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew), (calculate_skew), (rtp_jitter_buffer_insert): * gst/rtpmanager/rtpjitterbuffer.h: Remove the tail-changed callback in favour of a simple boolean when we insert a buffer in the queue. Add method to peek the tail of the buffer.
This commit is contained in:
parent
b09507ab0c
commit
387f41e157
5 changed files with 35 additions and 46 deletions
|
@ -135,6 +135,7 @@ struct _GstRtpJitterBufferPrivate
|
||||||
RTPJitterBuffer *jbuf;
|
RTPJitterBuffer *jbuf;
|
||||||
GMutex *jbuf_lock;
|
GMutex *jbuf_lock;
|
||||||
GCond *jbuf_cond;
|
GCond *jbuf_cond;
|
||||||
|
gboolean waiting;
|
||||||
|
|
||||||
/* properties */
|
/* properties */
|
||||||
guint latency_ms;
|
guint latency_ms;
|
||||||
|
@ -798,6 +799,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
GstFlowReturn ret = GST_FLOW_OK;
|
GstFlowReturn ret = GST_FLOW_OK;
|
||||||
GstClockTime timestamp;
|
GstClockTime timestamp;
|
||||||
guint64 latency_ts;
|
guint64 latency_ts;
|
||||||
|
gboolean tail;
|
||||||
|
|
||||||
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
|
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
|
||||||
|
|
||||||
|
@ -868,19 +870,19 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
||||||
/* now insert the packet into the queue in sorted order. This function returns
|
/* now insert the packet into the queue in sorted order. This function returns
|
||||||
* FALSE if a packet with the same seqnum was already in the queue, meaning we
|
* FALSE if a packet with the same seqnum was already in the queue, meaning we
|
||||||
* have a duplicate. */
|
* have a duplicate. */
|
||||||
if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp))
|
if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp, &tail))
|
||||||
goto duplicate;
|
goto duplicate;
|
||||||
|
|
||||||
/* signal addition of new buffer */
|
/* signal addition of new buffer when the _loop is waiting. */
|
||||||
JBUF_SIGNAL (priv);
|
if (priv->waiting)
|
||||||
|
JBUF_SIGNAL (priv);
|
||||||
|
|
||||||
/* let's unschedule and unblock any waiting buffers. We only want to do this
|
/* let's unschedule and unblock any waiting buffers. We only want to do this
|
||||||
* if there is a currently waiting newer (> seqnum) buffer */
|
* when the tail buffer changed */
|
||||||
if (priv->clock_id) {
|
if (priv->clock_id && tail) {
|
||||||
if (priv->waiting_seqnum > seqnum) {
|
GST_DEBUG_OBJECT (jitterbuffer,
|
||||||
gst_clock_id_unschedule (priv->clock_id);
|
"Unscheduling waiting buffer, new tail buffer");
|
||||||
GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer");
|
gst_clock_id_unschedule (priv->clock_id);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
|
GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
|
||||||
|
@ -963,7 +965,7 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
|
||||||
*
|
*
|
||||||
* For each pushed buffer, the seqnum is recorded, if the next buffer B has a
|
* For each pushed buffer, the seqnum is recorded, if the next buffer B has a
|
||||||
* different seqnum (missing packets before B), this function will wait for the
|
* different seqnum (missing packets before B), this function will wait for the
|
||||||
* missing packet to arrive up to the rtp timestamp of buffer B.
|
* missing packet to arrive up to the timestamp of buffer B.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
|
gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
|
||||||
|
@ -978,7 +980,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
|
||||||
|
|
||||||
JBUF_LOCK_CHECK (priv, flushing);
|
JBUF_LOCK_CHECK (priv, flushing);
|
||||||
again:
|
again:
|
||||||
GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
|
GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
|
||||||
while (TRUE) {
|
while (TRUE) {
|
||||||
|
|
||||||
/* always wait if we are blocked */
|
/* always wait if we are blocked */
|
||||||
|
@ -991,11 +993,17 @@ again:
|
||||||
goto do_eos;
|
goto do_eos;
|
||||||
}
|
}
|
||||||
/* wait for packets or flushing now */
|
/* wait for packets or flushing now */
|
||||||
|
priv->waiting = TRUE;
|
||||||
JBUF_WAIT_CHECK (priv, flushing);
|
JBUF_WAIT_CHECK (priv, flushing);
|
||||||
|
priv->waiting = FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* pop a buffer, we must have a buffer now */
|
/* peek a buffer, we're just looking at the timestamp and the sequence number.
|
||||||
outbuf = rtp_jitter_buffer_pop (priv->jbuf);
|
* If all is fine, we'll pop and push it. If the sequence number is wrong we
|
||||||
|
* wait on the timestamp. In the chain function we will unlock the wait when a
|
||||||
|
* new buffer is available. The peeked buffer is valid for as long as we hold
|
||||||
|
* the jitterbuffer lock. */
|
||||||
|
outbuf = rtp_jitter_buffer_peek (priv->jbuf);
|
||||||
seqnum = gst_rtp_buffer_get_seq (outbuf);
|
seqnum = gst_rtp_buffer_get_seq (outbuf);
|
||||||
|
|
||||||
/* get the timestamp, this is already corrected for clock skew by the
|
/* get the timestamp, this is already corrected for clock skew by the
|
||||||
|
@ -1003,7 +1011,7 @@ again:
|
||||||
timestamp = GST_BUFFER_TIMESTAMP (outbuf);
|
timestamp = GST_BUFFER_TIMESTAMP (outbuf);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (jitterbuffer,
|
GST_DEBUG_OBJECT (jitterbuffer,
|
||||||
"Popped buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
|
"Peeked buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
|
||||||
seqnum, GST_TIME_ARGS (timestamp),
|
seqnum, GST_TIME_ARGS (timestamp),
|
||||||
rtp_jitter_buffer_num_packets (priv->jbuf));
|
rtp_jitter_buffer_num_packets (priv->jbuf));
|
||||||
|
|
||||||
|
@ -1082,20 +1090,15 @@ again:
|
||||||
if (ret == GST_CLOCK_UNSCHEDULED) {
|
if (ret == GST_CLOCK_UNSCHEDULED) {
|
||||||
GST_DEBUG_OBJECT (jitterbuffer,
|
GST_DEBUG_OBJECT (jitterbuffer,
|
||||||
"Wait got unscheduled, will retry to push with new buffer");
|
"Wait got unscheduled, will retry to push with new buffer");
|
||||||
/* reinsert popped buffer into queue, no need to recalculate skew, we do
|
|
||||||
* that when inserting the buffer in the chain function */
|
|
||||||
if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf, -1)) {
|
|
||||||
GST_DEBUG_OBJECT (jitterbuffer,
|
|
||||||
"Duplicate packet #%d detected, dropping", seqnum);
|
|
||||||
priv->num_duplicates++;
|
|
||||||
gst_buffer_unref (outbuf);
|
|
||||||
}
|
|
||||||
goto again;
|
goto again;
|
||||||
}
|
}
|
||||||
/* Get new timestamp, latency might have changed */
|
/* Get new timestamp, latency might have changed */
|
||||||
out_time = apply_offset (jitterbuffer, timestamp);
|
out_time = apply_offset (jitterbuffer, timestamp);
|
||||||
}
|
}
|
||||||
push_buffer:
|
push_buffer:
|
||||||
|
/* when we get here we are ready to pop and push the buffer */
|
||||||
|
outbuf = rtp_jitter_buffer_pop (priv->jbuf);
|
||||||
|
|
||||||
/* check if we are pushing something unexpected */
|
/* check if we are pushing something unexpected */
|
||||||
if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
|
if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
|
||||||
gint dropped;
|
gint dropped;
|
||||||
|
|
|
@ -1020,6 +1020,7 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
|
||||||
switch (GST_EVENT_TYPE (event)) {
|
switch (GST_EVENT_TYPE (event)) {
|
||||||
case GST_EVENT_FLUSH_STOP:
|
case GST_EVENT_FLUSH_STOP:
|
||||||
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
|
gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
|
||||||
|
ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
|
||||||
break;
|
break;
|
||||||
case GST_EVENT_NEWSEGMENT:
|
case GST_EVENT_NEWSEGMENT:
|
||||||
{
|
{
|
||||||
|
|
|
@ -344,7 +344,6 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
|
||||||
switch (GST_EVENT_TYPE (event)) {
|
switch (GST_EVENT_TYPE (event)) {
|
||||||
case GST_EVENT_FLUSH_STOP:
|
case GST_EVENT_FLUSH_STOP:
|
||||||
gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
|
gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
|
||||||
break;
|
|
||||||
case GST_EVENT_NEWSEGMENT:
|
case GST_EVENT_NEWSEGMENT:
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
|
|
|
@ -99,16 +99,6 @@ rtp_jitter_buffer_new (void)
|
||||||
return jbuf;
|
return jbuf;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
|
||||||
rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer * jbuf, RTPTailChanged func,
|
|
||||||
gpointer user_data)
|
|
||||||
{
|
|
||||||
g_return_if_fail (jbuf != NULL);
|
|
||||||
|
|
||||||
jbuf->tail_changed = func;
|
|
||||||
jbuf->user_data = user_data;
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer * jbuf, gint clock_rate)
|
rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer * jbuf, gint clock_rate)
|
||||||
{
|
{
|
||||||
|
@ -374,6 +364,7 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
|
||||||
* @jbuf: an #RTPJitterBuffer
|
* @jbuf: an #RTPJitterBuffer
|
||||||
* @buf: a buffer
|
* @buf: a buffer
|
||||||
* @time: a running_time when this buffer was received in nanoseconds
|
* @time: a running_time when this buffer was received in nanoseconds
|
||||||
|
* @tail: TRUE when the tail element changed.
|
||||||
*
|
*
|
||||||
* Inserts @buf into the packet queue of @jbuf. The sequence number of the
|
* Inserts @buf into the packet queue of @jbuf. The sequence number of the
|
||||||
* packet will be used to sort the packets. This function takes ownerhip of
|
* packet will be used to sort the packets. This function takes ownerhip of
|
||||||
|
@ -383,7 +374,7 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
|
||||||
*/
|
*/
|
||||||
gboolean
|
gboolean
|
||||||
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
|
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
|
||||||
GstClockTime time)
|
GstClockTime time, gboolean * tail)
|
||||||
{
|
{
|
||||||
GList *list;
|
GList *list;
|
||||||
gint func_ret = 1;
|
gint func_ret = 1;
|
||||||
|
@ -412,13 +403,12 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
|
||||||
|
|
||||||
if (list)
|
if (list)
|
||||||
g_queue_insert_before (jbuf->packets, list, buf);
|
g_queue_insert_before (jbuf->packets, list, buf);
|
||||||
else {
|
else
|
||||||
g_queue_push_tail (jbuf->packets, buf);
|
g_queue_push_tail (jbuf->packets, buf);
|
||||||
|
|
||||||
/* tail buffer changed, signal callback */
|
/* tail was changed when we did not find a previous packet */
|
||||||
if (jbuf->tail_changed)
|
if (tail)
|
||||||
jbuf->tail_changed (jbuf, jbuf->user_data);
|
*tail = (list == NULL);
|
||||||
}
|
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,9 +67,6 @@ struct _RTPJitterBuffer {
|
||||||
gint64 window_min;
|
gint64 window_min;
|
||||||
gint64 skew;
|
gint64 skew;
|
||||||
gint64 prev_send_diff;
|
gint64 prev_send_diff;
|
||||||
|
|
||||||
RTPTailChanged tail_changed;
|
|
||||||
gpointer user_data;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct _RTPJitterBufferClass {
|
struct _RTPJitterBufferClass {
|
||||||
|
@ -81,15 +78,14 @@ GType rtp_jitter_buffer_get_type (void);
|
||||||
/* managing lifetime */
|
/* managing lifetime */
|
||||||
RTPJitterBuffer* rtp_jitter_buffer_new (void);
|
RTPJitterBuffer* rtp_jitter_buffer_new (void);
|
||||||
|
|
||||||
void rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer *jbuf, RTPTailChanged func,
|
|
||||||
gpointer user_data);
|
|
||||||
|
|
||||||
void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, gint clock_rate);
|
void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, gint clock_rate);
|
||||||
gint rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf);
|
gint rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf);
|
||||||
|
|
||||||
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
|
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
|
||||||
|
|
||||||
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf, GstClockTime time);
|
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf,
|
||||||
|
GstClockTime time, gboolean *tail);
|
||||||
|
GstBuffer * rtp_jitter_buffer_peek (RTPJitterBuffer *jbuf);
|
||||||
GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf);
|
GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf);
|
||||||
|
|
||||||
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);
|
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);
|
||||||
|
|
Loading…
Reference in a new issue