rtpjitterbuffer: Keep JBUF lock while processing timers

Until now, do_expected_timeout() was shortly dropping the JBUF_LOCK in order
to push RTX event event without causing deadlock. As a side effect, some
CPU hung would happen as the timerqueue would get filled while looping over
the due timers. To mitigate this, we were processing the lost timer first and
placing into a queue the remainign to be processed later.

In the gap caused by an unlock, we could endup receiving one of the seqnum
present in the pending timers. In that case, the timer would not be found and
a new one was created. When we then update the expected timer, the seqnum
would already exist and the updated timer would be lost.

In this patch we remove the unlock from do_expected_timeout() and place all
pending RTX event into a queue (instead of pending timer). Then, as soon as
we have selected a timer to wait (or if there is no timer to wait for) we send
all the upstream RTX events. As we no longer unlock, we no longer need to pop
more then one timer from the queue, and we do so with the lock held, which
blocks any new colliding timers from being created.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/616>
This commit is contained in:
Nicolas Dufresne 2020-03-27 15:48:32 -04:00
parent 0594d2f981
commit b4f421e9aa

View file

@ -3774,7 +3774,7 @@ update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer,
/* the timeout for when we expected a packet expired */
static gboolean
do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
GstClockTime now)
GstClockTime now, GQueue * events)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstEvent *event;
@ -3812,6 +3812,7 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
"deadline", G_TYPE_UINT, rtx_deadline_ms,
"packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
"avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
g_queue_push_tail (events, event);
GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
priv->num_rtx_requests++;
@ -3849,10 +3850,6 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
timer->rtx_base + timer->rtx_retry, timer->rtx_delay, offset, FALSE);
JBUF_UNLOCK (priv);
gst_pad_push_event (priv->sinkpad, event);
JBUF_LOCK (priv);
return FALSE;
}
@ -3922,13 +3919,13 @@ do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
static gboolean
do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
GstClockTime now)
GstClockTime now, GQueue * events)
{
gboolean removed = FALSE;
switch (timer->type) {
case RTP_TIMER_EXPECTED:
removed = do_expected_timeout (jitterbuffer, timer, now);
removed = do_expected_timeout (jitterbuffer, timer, now, events);
break;
case RTP_TIMER_LOST:
removed = do_lost_timeout (jitterbuffer, timer, now);
@ -3943,6 +3940,35 @@ do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
return removed;
}
static void
push_rtx_events_unlocked (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstEvent *event;
while ((event = (GstEvent *) g_queue_pop_head (events)))
gst_pad_push_event (priv->sinkpad, event);
}
/* called with JBUF lock
*
* Pushes all events in @events queue.
*
* Returns: %TRUE if the timer thread is not longer running
*/
static void
push_rtx_events (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
if (events->length == 0)
return;
JBUF_UNLOCK (priv);
push_rtx_events_unlocked (jitterbuffer, events);
JBUF_LOCK (priv);
}
/* called when we need to wait for the next timeout.
*
* We loop over the array of recorded timeouts and wait for the earliest one.
@ -3959,7 +3985,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
JBUF_LOCK (priv);
while (priv->timer_running) {
RtpTimer *timer = NULL;
GQueue timers = G_QUEUE_INIT;
GQueue events = G_QUEUE_INIT;
/* don't produce data in paused */
while (priv->blocked) {
@ -3992,25 +4018,8 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
/* Iterate expired "normal" timers */
while ((timer = rtp_timer_queue_pop_until (priv->timers, now))) {
do {
if (timer->type == RTP_TIMER_LOST) {
GST_DEBUG_OBJECT (jitterbuffer, "Weeding out expired lost timers");
do_lost_timeout (jitterbuffer, timer, now);
} else {
g_queue_push_tail_link (&timers, (GList *) timer);
}
} while ((timer = rtp_timer_queue_pop_until (priv->timers, now)));
/* execute the remaining timers */
while ((timer = (RtpTimer *) g_queue_pop_head_link (&timers)))
do_timeout (jitterbuffer, timer, now);
/* do_expected_timeout(), called by do_timeout will drop the
* JBUF_LOCK, so we need to check if we are still running */
if (!priv->timer_running)
goto stopping;
}
while ((timer = rtp_timer_queue_pop_until (priv->timers, now)))
do_timeout (jitterbuffer, timer, now, &events);
timer = rtp_timer_queue_peek_earliest (priv->timers);
if (timer) {
@ -4031,6 +4040,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
/* let's just push if there is no clock */
GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
now = timer->timeout;
push_rtx_events (jitterbuffer, &events);
continue;
}
@ -4052,11 +4062,14 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
/* release the lock so that the other end can push stuff or unlock */
JBUF_UNLOCK (priv);
push_rtx_events_unlocked (jitterbuffer, &events);
ret = gst_clock_id_wait (id, &clock_jitter);
JBUF_LOCK (priv);
if (!priv->timer_running) {
g_queue_clear_full (&events, (GDestroyNotify) gst_event_unref);
gst_clock_id_unref (id);
priv->clock_id = NULL;
break;
@ -4075,6 +4088,8 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
gst_clock_id_unref (id);
priv->clock_id = NULL;
} else {
push_rtx_events_unlocked (jitterbuffer, &events);
/* when draining the timers, the pusher thread will reuse our
* condition to wait for completion. Signal that thread before
* sleeping again here */