diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index ecb5081401..1ed91182bb 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -333,7 +333,9 @@ struct _GstRtpJitterBufferPrivate GstClockTime last_in_pts; guint32 next_in_seqnum; - GArray *timers; + /* "normal" timers */ + RtpTimerQueue *timers; + /* timers used for RTX statistics backlog */ RtpTimerQueue *rtx_stats_timers; /* start and stop ranges */ @@ -479,7 +481,6 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer, static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer); static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer); -static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer); static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer); @@ -956,7 +957,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) priv->last_rtptime = -1; priv->avg_jitter = 0; priv->segment_seqnum = GST_SEQNUM_INVALID; - priv->timers = g_array_new (FALSE, TRUE, sizeof (RtpTimer)); + priv->timers = rtp_timer_queue_new (); priv->rtx_stats_timers = rtp_timer_queue_new (); priv->jbuf = rtp_jitter_buffer_new (); g_mutex_init (&priv->jbuf_lock); @@ -1041,7 +1042,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object) jitterbuffer = GST_RTP_JITTER_BUFFER (object); priv = jitterbuffer->priv; - g_array_free (priv->timers, TRUE); + g_object_unref (priv->timers); g_object_unref (priv->rtx_stats_timers); g_mutex_clear (&priv->jbuf_lock); g_cond_clear (&priv->jbuf_queue); @@ -1533,7 +1534,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer) rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL); rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE); rtp_jitter_buffer_reset_skew (priv->jbuf); - remove_all_timers (jitterbuffer); + rtp_timer_queue_remove_all (priv->timers); g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL); g_queue_clear (&priv->gap_packets); JBUF_UNLOCK (priv); @@ -1980,22 +1981,11 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp) return timestamp; } -static RtpTimer * -find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) +static GstClockTimeDiff +timeout_offset (GstRtpJitterBuffer * jitterbuffer) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - RtpTimer *timer = NULL; - gint i, len; - - len = priv->timers->len; - for (i = 0; i < len; i++) { - RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i); - if (test->seqnum == seqnum) { - timer = test; - break; - } - } - return timer; + return priv->ts_offset + priv->out_offset + priv->latency_ns; } static void @@ -2011,172 +2001,43 @@ unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer) } static GstClockTime -get_timeout (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer) +get_pts_timeout (const RtpTimer * timer) { - GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstClockTime test_timeout; - - if ((test_timeout = timer->timeout) == -1) + if (timer->timeout == -1) return -1; - if (timer->type != RTP_TIMER_EXPECTED) { - /* add our latency and offset to get output times. */ - test_timeout = apply_offset (jitterbuffer, test_timeout); - test_timeout += priv->latency_ns; - } - return test_timeout; + return timer->timeout - timer->offset; } static void -recalculate_timer (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer) -{ - GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - - if (priv->clock_id) { - GstClockTime timeout = get_timeout (jitterbuffer, timer); - - GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT, - GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout)); - - if (timeout == -1 || timeout < priv->timer_timeout) - unschedule_current_timer (jitterbuffer); - } -} - -static RtpTimer * -add_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimerType type, - guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay, - GstClockTime duration) +update_current_timer (GstRtpJitterBuffer * jitterbuffer) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; RtpTimer *timer; - gint len; - GST_DEBUG_OBJECT (jitterbuffer, - "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %" - GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout), - GST_TIME_ARGS (delay)); + timer = rtp_timer_queue_peek_earliest (priv->timers); - len = priv->timers->len; - g_array_set_size (priv->timers, len + 1); - timer = &g_array_index (priv->timers, RtpTimer, len); - timer->idx = len; - timer->type = type; - timer->seqnum = seqnum; - timer->num = num; - timer->timeout = timeout + delay; - timer->duration = duration; - if (type == RTP_TIMER_EXPECTED) { - timer->rtx_base = timeout; - timer->rtx_delay = delay; - timer->rtx_retry = 0; - } - timer->rtx_last = GST_CLOCK_TIME_NONE; - timer->num_rtx_retry = 0; - timer->num_rtx_received = 0; - recalculate_timer (jitterbuffer, timer); - JBUF_SIGNAL_TIMER (priv); - - return timer; -} - -static void -reschedule_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer, - guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset) -{ - GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - gboolean seqchange, timechange; - guint16 oldseq; - GstClockTime new_timeout; - - oldseq = timer->seqnum; - new_timeout = timeout + delay; - seqchange = oldseq != seqnum; - timechange = timer->timeout != new_timeout; - - if (!seqchange && !timechange) { - GST_DEBUG_OBJECT (jitterbuffer, - "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT - "), skipping", oldseq, GST_TIME_ARGS (timer->timeout)); - return; - } - - GST_DEBUG_OBJECT (jitterbuffer, - "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT - "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum, - GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout)); - - timer->timeout = new_timeout; - timer->seqnum = seqnum; - if (reset) { - GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT - "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay), - GST_TIME_ARGS (delay)); - timer->rtx_base = timeout; - timer->rtx_delay = delay; - timer->rtx_retry = 0; - } - if (seqchange) { - timer->num_rtx_retry = 0; - timer->num_rtx_received = 0; - } - - if (priv->clock_id) { - /* we changed the seqnum and there is a timer currently waiting with this - * seqnum, unschedule it */ - if (seqchange && priv->timer_seqnum == oldseq) - unschedule_current_timer (jitterbuffer); - /* we changed the time, check if it is earlier than what we are waiting - * for and unschedule if so */ - else if (timechange) - recalculate_timer (jitterbuffer, timer); - } -} - -static RtpTimer * -set_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimerType type, - guint16 seqnum, GstClockTime timeout) -{ - RtpTimer *timer; - - /* find the seqnum timer */ - timer = find_timer (jitterbuffer, seqnum); + /* we never need to wakeup the timer thread when there is no more timers, if + * it was waiting on a clock id, it will simply do later and then wait on + * the conditions */ if (timer == NULL) { - timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1); - } else { - reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE); + GST_DEBUG_OBJECT (jitterbuffer, "no more timers"); + return; } - return timer; -} -static void -remove_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer) -{ - GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - guint idx; + GST_DEBUG_OBJECT (jitterbuffer, "waiting till %" GST_TIME_FORMAT + " and earliest timeout is at %" GST_TIME_FORMAT, + GST_TIME_ARGS (priv->timer_timeout), GST_TIME_ARGS (timer->timeout)); - if (timer->idx == -1) + /* wakeup the timer thread in case the timer queue was empty */ + JBUF_SIGNAL_TIMER (priv); + + /* no need to wait if the current wait is earlier or later */ + if (timer->timeout != -1 && timer->timeout >= priv->timer_timeout) return; - if (priv->clock_id && priv->timer_seqnum == timer->seqnum) - unschedule_current_timer (jitterbuffer); - - idx = timer->idx; - GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx); - g_array_remove_index_fast (priv->timers, idx); - timer->idx = idx; - - JBUF_SIGNAL_TIMER (priv); -} - -static void -remove_all_timers (GstRtpJitterBuffer * jitterbuffer) -{ - GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GST_DEBUG_OBJECT (jitterbuffer, "removed all timers"); - g_array_set_size (priv->timers, 0); + /* for other cases, force a reschedule of the timer thread */ unschedule_current_timer (jitterbuffer); - JBUF_SIGNAL_TIMER (priv); } /* get the extra delay to wait before sending RTX */ @@ -2216,14 +2077,13 @@ get_rtx_delay (GstRtpJitterBufferPrivate * priv) /* Check if packet with seqnum is already considered definitely lost by being * part of a "lost timer" for multiple packets */ static gboolean -already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) +already_lost (GstRtpJitterBuffer * jitterbuffer, GstClockTime pts, + guint16 seqnum) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - gint i, len; - len = priv->timers->len; - for (i = 0; i < len; i++) { - RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i); + RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers); + while (test && test->timeout <= pts) { gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum); if (test->num > 1 && test->type == RTP_TIMER_LOST && gap >= 0 && @@ -2232,6 +2092,8 @@ already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff); return TRUE; } + + test = rtp_timer_get_next (test); } return FALSE; @@ -2254,25 +2116,37 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - /* go through all timers and unschedule the ones with a large gap */ + /* schedule immediatly expected timer which exceed the maximum RTX delay + * reorder configuration */ if (priv->do_retransmission && priv->rtx_delay_reorder > 0) { - gint i, len; - len = priv->timers->len; - for (i = 0; i < len; i++) { - RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i); + RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers); + while (test) { gint gap; + /* filter the timer type to speed up this loop */ + if (test->type != RTP_TIMER_EXPECTED) { + test = rtp_timer_get_next (test); + continue; + } + gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum); GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", test->type, test->seqnum, seqnum, gap); - if (gap > priv->rtx_delay_reorder) { - /* max gap, we exceeded the max reorder distance and we don't expect the - * missing packet to be this reordered */ - if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED) - reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE); - } + /* if this expected packet have a smaller gap then the configured one, + * then earlier timer are not expected to have bigger gap as the timer + * queue is ordered */ + if (gap <= priv->rtx_delay_reorder) + break; + + /* max gap, we exceevded the max reorder distance and we don't expect the + * missing packet to be this reordered */ + if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED) + rtp_timer_queue_update_timer (priv->timers, test, test->seqnum, + -1, 0, 0, FALSE); + + test = rtp_timer_get_next (test); } } @@ -2320,16 +2194,17 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, if (timer) { timer->type = RTP_TIMER_EXPECTED; - reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected, - delay, TRUE); + rtp_timer_queue_update_timer (priv->timers, timer, priv->next_in_seqnum, + expected, delay, 0, TRUE); } else { - add_timer (jitterbuffer, RTP_TIMER_EXPECTED, priv->next_in_seqnum, 0, + rtp_timer_queue_set_expected (priv->timers, priv->next_in_seqnum, expected, delay, priv->packet_spacing); } } else if (timer && timer->type != RTP_TIMER_DEADLINE) { /* if we had a timer, remove it, we don't know when to expect the next * packet. */ - remove_timer (jitterbuffer, timer); + rtp_timer_queue_unschedule (priv->timers, timer); + rtp_timer_free (timer); } } @@ -2434,8 +2309,9 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, /* this timer will fire immediately and the lost event will be pushed from * the timer thread */ if (lost_packets > 0) { - add_timer (jitterbuffer, RTP_TIMER_LOST, expected, lost_packets, - priv->last_in_pts + duration, 0, gap_time); + rtp_timer_queue_set_lost (priv->timers, expected, lost_packets, + priv->last_in_pts + duration, gap_time, + timeout_offset (jitterbuffer)); expected += lost_packets; priv->last_in_pts += gap_time; } @@ -2451,7 +2327,7 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, } if (priv->do_retransmission) { - RtpTimer *timer = find_timer (jitterbuffer, expected); + RtpTimer *timer = rtp_timer_queue_find (priv->timers, expected); GstClockTime rtx_delay = get_rtx_delay (priv); /* if we had a timer for the first missing packet, update it. */ @@ -2461,8 +2337,8 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, timer->duration = duration; if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) { - reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts, - delay, TRUE); + rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum, + expected_pts, delay, 0, TRUE); } expected++; expected_pts += duration; @@ -2472,15 +2348,15 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, /* minimum delay the expected-timer has "waited" is the elapsed time * since expected arrival of the missing packet */ GstClockTime delay = MAX (rtx_delay, pts - expected_pts); - add_timer (jitterbuffer, RTP_TIMER_EXPECTED, expected, 0, expected_pts, + rtp_timer_queue_set_expected (priv->timers, expected, expected_pts, delay, duration); expected_pts += duration; expected++; } } else { while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) { - add_timer (jitterbuffer, RTP_TIMER_LOST, expected, 0, expected_pts, 0, - duration); + rtp_timer_queue_set_lost (priv->timers, expected, 0, expected_pts, + duration, timeout_offset (jitterbuffer)); expected_pts += duration; expected++; } @@ -2678,7 +2554,7 @@ gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer, rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item_and_retain_sticky_events, &events); rtp_jitter_buffer_reset_skew (priv->jbuf); - remove_all_timers (jitterbuffer); + rtp_timer_queue_remove_all (priv->timers); priv->discont = TRUE; priv->last_popped_seqnum = -1; @@ -2752,7 +2628,7 @@ gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer) if (!item) return FALSE; - timer = find_timer (jitterbuffer, item->seqnum); + timer = rtp_timer_queue_find (priv->timers, item->seqnum); if (!timer || timer->type != RTP_TIMER_DEADLINE) return FALSE; @@ -2761,6 +2637,7 @@ gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer) GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now", priv->faststart_min_packets); timer->timeout = -1; + rtp_timer_queue_reschedule (priv->timers, timer); return TRUE; } @@ -2907,14 +2784,22 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate, max_dropout, max_misorder); + timer = rtp_timer_queue_find (priv->timers, seqnum); + if (GST_BUFFER_IS_RETRANSMISSION (buffer)) { + if (G_UNLIKELY (!priv->do_retransmission)) + goto unsolicited_rtx; + + if (!timer) + timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum); + + /* If the first buffer is an (old) rtx, e.g. from before a reset, or + * already lost, ignore it */ + if (!timer || expected == -1) + goto unsolicited_rtx; + } + /* now check against our expected seqnum */ if (G_UNLIKELY (expected == -1)) { - if (G_UNLIKELY (GST_BUFFER_IS_RETRANSMISSION (buffer))) { - /* If the first buffer is an (old) rtx, e.g. from before a reset, - * ignore it. */ - goto unsolicited_rtx; - } - GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum); /* calculate a pts based on rtptime and arrival time (dts) */ @@ -2931,7 +2816,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, /* we don't know what the next_in_seqnum should be, wait for the last * possible moment to push this buffer, maybe we get an earlier seqnum * while we wait */ - set_timer (jitterbuffer, RTP_TIMER_DEADLINE, seqnum, pts); + rtp_timer_queue_set_deadline (priv->timers, seqnum, pts, + timeout_offset (jitterbuffer)); do_next_seqnum = TRUE; /* take rtptime and pts to calculate packet spacing */ @@ -2945,20 +2831,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d", expected, seqnum, gap); - if (G_UNLIKELY (GST_BUFFER_IS_RETRANSMISSION (buffer))) { - if (G_UNLIKELY (!priv->do_retransmission)) - goto unsolicited_rtx; - - /* If this packet is a rtx that we may have actually requested, - * make sure we actually did, or whether we still need it. */ - timer = find_timer (jitterbuffer, seqnum); - if (!timer) - timer = timer_queue_find (priv->rtx_stats_timers, seqnum); - if (!timer) - goto unsolicited_rtx; - } - - if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) { + if (G_UNLIKELY (gap > 0 && + rtp_timer_queue_length (priv->timers) >= max_dropout)) { /* If we have timers for more than RTP_MAX_DROPOUT packets * pending this means that we have a huge gap overall. We can * reset the jitterbuffer at this point because there's @@ -2966,7 +2840,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, * sensible with the past data. Just try again from the * next packet */ GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting", - priv->timers->len, max_dropout); + rtp_timer_queue_length (priv->timers), max_dropout); gst_buffer_unref (buffer); return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum); } @@ -3029,13 +2903,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, priv->next_in_seqnum = (seqnum + 1) & 0xffff; } - timer = find_timer (jitterbuffer, seqnum); - if (GST_BUFFER_IS_RETRANSMISSION (buffer)) { - if (!timer) - timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum); - if (timer) - timer->num_rtx_received++; - } + if (GST_BUFFER_IS_RETRANSMISSION (buffer)) + timer->num_rtx_received++; /* At 2^15, we would detect a seqnum rollover too early, therefore * limit the queue size. But let's not limit it to a number that is @@ -3043,6 +2912,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, * sequence number, let's allow at least 10k packets in any case. */ while (rtp_jitter_buffer_is_full (priv->jbuf) && priv->srcresult == GST_FLOW_OK) { + update_current_timer (jitterbuffer); JBUF_SIGNAL_EVENT (priv); JBUF_WAIT_QUEUE (priv); } @@ -3072,7 +2942,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, } } - if (already_lost (jitterbuffer, seqnum)) + if (already_lost (jitterbuffer, pts, seqnum)) goto already_lost; /* let's drop oldest packet if the queue is already full and drop-on-latency @@ -3155,6 +3025,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, msg = check_buffering_percent (jitterbuffer, percent); finished: + update_current_timer (jitterbuffer); JBUF_UNLOCK (priv); if (msg) @@ -3353,7 +3224,8 @@ update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated)); if (estimated != -1 && priv->estimated_eos != estimated) { - set_timer (jitterbuffer, RTP_TIMER_EOS, -1, estimated); + rtp_timer_queue_set_eos (priv->timers, estimated, + timeout_offset (jitterbuffer)); priv->estimated_eos = estimated; } } @@ -3442,7 +3314,7 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) if (type == ITEM_TYPE_EVENT && outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) { g_assert (priv->eos); - while (priv->timers->len > 0) { + while (rtp_timer_queue_length (priv->timers) > 0) { /* Stopping timers */ unschedule_current_timer (jitterbuffer); JBUF_WAIT_TIMER (priv); @@ -3737,6 +3609,7 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer, GstClockTime rtx_retry_period; GstClockTime rtx_retry_timeout; GstClock *clock; + GstClockTimeDiff offset = 0; GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %" GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now)); @@ -3780,23 +3653,26 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer, /* calculate the timeout for the next retransmission attempt */ timer->rtx_retry += rtx_retry_timeout; - GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %" + GST_DEBUG_OBJECT (jitterbuffer, "timer #%i base %" GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u", - GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay), - GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry); + timer->seqnum, GST_TIME_ARGS (timer->rtx_base), + GST_TIME_ARGS (timer->rtx_delay), GST_TIME_ARGS (timer->rtx_retry), + timer->num_rtx_retry); if ((priv->rtx_max_retries != -1 && timer->num_rtx_retry >= priv->rtx_max_retries) || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period) || (timer->rtx_base + rtx_retry_period < now)) { - GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer"); + GST_DEBUG_OBJECT (jitterbuffer, "reschedule #%i as LOST timer", + timer->seqnum); /* too many retransmission request, we now convert the timer * to a lost timer, leave the num_rtx_retry as it is for stats */ timer->type = RTP_TIMER_LOST; timer->rtx_delay = 0; timer->rtx_retry = 0; + offset = timeout_offset (jitterbuffer); } - reschedule_timer (jitterbuffer, timer, timer->seqnum, - timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE); + rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum, + timer->rtx_base + timer->rtx_retry, timer->rtx_delay, offset, FALSE); JBUF_UNLOCK (priv); gst_pad_push_event (priv->sinkpad, event); @@ -3835,7 +3711,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer, /* we now only accept seqnum bigger than this */ if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) { priv->next_in_seqnum = next_in_seqnum; - priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout); + priv->last_in_pts = apply_offset (jitterbuffer, get_pts_timeout (timer)); } /* Avoid creating events if we don't need it. Note that we still need to create @@ -3844,7 +3720,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer, if (priv->do_lost) { GstClockTime duration, timestamp; /* create paket lost event */ - timestamp = apply_offset (jitterbuffer, timer->timeout); + timestamp = apply_offset (jitterbuffer, get_pts_timeout (timer)); duration = timer->duration; if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0) duration = priv->packet_spacing; @@ -3862,14 +3738,13 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer, rtp_jitter_buffer_free_item (item); if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) { - RtpTimer *stats_timer = rtp_timer_dup (timer); /* Store info to update stats if the packet arrives too late */ - /* TODO this one could be zero-copy when the timers array is replaced */ - stats_timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND; - stats_timer->type = RTP_TIMER_LOST; - rtp_timer_queue_insert (priv->rtx_stats_timers, stats_timer); + timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND; + timer->type = RTP_TIMER_LOST; + rtp_timer_queue_insert (priv->rtx_stats_timers, timer); + } else { + rtp_timer_free (timer); } - remove_timer (jitterbuffer, timer); if (head) JBUF_SIGNAL_EVENT (priv); @@ -3884,7 +3759,7 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer, GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout"); - remove_timer (jitterbuffer, timer); + rtp_timer_free (timer); if (!priv->eos) { GstEvent *event; @@ -3911,7 +3786,7 @@ do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer, * only mess with current ongoing seqnum if still unknown */ if (priv->next_seqnum == -1) priv->next_seqnum = timer->seqnum; - remove_timer (jitterbuffer, timer); + rtp_timer_free (timer); JBUF_SIGNAL_EVENT (priv); return TRUE; @@ -3956,8 +3831,14 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) JBUF_LOCK (priv); while (priv->timer_running) { RtpTimer *timer = NULL; - GstClockTime timer_timeout = -1; - gint i, len; + GQueue timers = G_QUEUE_INIT; + + /* don't produce data in paused */ + while (priv->blocked) { + JBUF_WAIT_TIMER (priv); + if (!priv->timer_running) + goto stopping; + } /* If we have a clock, update "now" now with the very * latest running time we have. If timers are unscheduled below we @@ -3982,77 +3863,33 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) if (priv->do_retransmission) rtp_timer_queue_remove_until (priv->rtx_stats_timers, now); - /* Iterate "normal" timers */ - len = priv->timers->len; - for (i = 0; i < len;) { - RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i); - GstClockTime test_timeout = get_timeout (jitterbuffer, test); - gboolean save_best = FALSE; - - GST_DEBUG_OBJECT (jitterbuffer, - "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i, - test->type, test->seqnum, GST_TIME_ARGS (test_timeout), - GST_STIME_ARGS ((gint64) (test_timeout - now))); - - /* Weed out anything too late */ - if (test->type == RTP_TIMER_LOST && - (test_timeout == -1 || test_timeout <= now)) { - GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry"); - do_lost_timeout (jitterbuffer, test, now); - if (!priv->timer_running) - break; - /* We don't move the iterator forward since we just removed the current entry, - * but we update the termination condition */ - len = priv->timers->len; - } else { - /* find the smallest timeout */ - if (timer == NULL) { - save_best = TRUE; - } else if (timer_timeout == -1) { - /* we already have an immediate timeout, the new timer must be an - * immediate timer with smaller seqnum to become the best */ - if (test_timeout == -1 - && (gst_rtp_buffer_compare_seqnum (test->seqnum, - timer->seqnum) > 0)) - save_best = TRUE; - } else if (test_timeout == -1) { - /* first immediate timer */ - save_best = TRUE; - } else if (test_timeout < timer_timeout) { - /* earlier timer */ - save_best = TRUE; - } else if (test_timeout == timer_timeout - && (gst_rtp_buffer_compare_seqnum (test->seqnum, - timer->seqnum) > 0)) { - /* same timer, smaller seqnum */ - save_best = TRUE; + /* Iterate expired "normal" timers */ + while ((timer = rtp_timer_queue_pop_until (priv->timers, now))) { + do { + if (timer->type == RTP_TIMER_LOST) { + GST_DEBUG_OBJECT (jitterbuffer, "Weeding out expired lost timers"); + do_lost_timeout (jitterbuffer, timer, now); + } else { + g_queue_push_tail_link (&timers, (GList *) timer); } + } while ((timer = rtp_timer_queue_pop_until (priv->timers, now))); - if (save_best) { - GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i); - timer = test; - timer_timeout = test_timeout; - } - i++; - } + /* execetute the remaining timers */ + while ((timer = (RtpTimer *) g_queue_pop_head_link (&timers))) + do_timeout (jitterbuffer, timer, now); } - if (timer && !priv->blocked) { + + timer = rtp_timer_queue_peek_earliest (priv->timers); + if (timer) { GstClock *clock; GstClockTime sync_time; GstClockID id; GstClockReturn ret; GstClockTimeDiff clock_jitter; - if (timer_timeout == -1 || timer_timeout <= now || priv->eos) { - /* We have normally removed all lost timers in the loop above */ - g_assert (timer->type != RTP_TIMER_LOST); - - do_timeout (jitterbuffer, timer, now); - /* check here, do_timeout could have released the lock */ - if (!priv->timer_running) - break; - continue; - } + /* we poped all immediate and due timer, so this should just never + * happens */ + g_assert (GST_CLOCK_TIME_IS_VALID (timer->timeout)); GST_OBJECT_LOCK (jitterbuffer); clock = GST_ELEMENT_CLOCK (jitterbuffer); @@ -4060,22 +3897,22 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) GST_OBJECT_UNLOCK (jitterbuffer); /* let's just push if there is no clock */ GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away"); - now = timer_timeout; + now = timer->timeout; continue; } /* prepare for sync against clock */ - sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time; + sync_time = timer->timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time; /* add latency of peer to get input time */ sync_time += priv->peer_latency; - GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT - " with sync time %" GST_TIME_FORMAT, - GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time)); + GST_DEBUG_OBJECT (jitterbuffer, "timer #%i sync to timestamp %" + GST_TIME_FORMAT " with sync time %" GST_TIME_FORMAT, timer->seqnum, + GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (sync_time)); /* create an entry for the clock */ id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time); - priv->timer_timeout = timer_timeout; + priv->timer_timeout = timer->timeout; priv->timer_seqnum = timer->seqnum; GST_OBJECT_UNLOCK (jitterbuffer); @@ -4085,6 +3922,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) ret = gst_clock_id_wait (id, &clock_jitter); JBUF_LOCK (priv); + if (!priv->timer_running) { gst_clock_id_unref (id); priv->clock_id = NULL; @@ -4092,21 +3930,29 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) } if (ret != GST_CLOCK_UNSCHEDULED) { - now = timer_timeout + MAX (clock_jitter, 0); + now = timer->timeout + MAX (clock_jitter, 0); GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum, GST_STIME_ARGS (clock_jitter)); } else { GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled"); } + /* and free the entry */ gst_clock_id_unref (id); priv->clock_id = NULL; } else { + /* when draining the timers, the pusher thread will reuse our + * condition to wait for completion. Signal that thread before + * sleeping again here */ + if (priv->eos) + JBUF_SIGNAL_TIMER (priv); + /* no timers, wait for activity */ JBUF_WAIT_TIMER (priv); } } +stopping: JBUF_UNLOCK (priv); GST_DEBUG_OBJECT (jitterbuffer, "we are stopping"); diff --git a/gst/rtpmanager/rtptimerqueue.h b/gst/rtpmanager/rtptimerqueue.h index cd8f2847a1..b172125497 100644 --- a/gst/rtpmanager/rtptimerqueue.h +++ b/gst/rtpmanager/rtptimerqueue.h @@ -59,9 +59,6 @@ typedef struct GList list; gboolean queued; - /* FIXME remove */ - guint idx; - guint16 seqnum; guint num; RtpTimerType type;