From 4de919a17ad8468ee058f57de3d8fc94fa281348 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 16 Sep 2013 15:53:47 +0200 Subject: [PATCH] jitterbuffer: use separate thread for timeouts Use a separate thread for scheduling the timeouts instead of using the downstream streaming thread that might block at any time. --- gst/rtpmanager/gstrtpjitterbuffer.c | 341 +++++++++++++++------------- 1 file changed, 187 insertions(+), 154 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 6326476379..8391a136bd 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -117,17 +117,29 @@ enum if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ goto label; \ } G_STMT_END - #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock)) -#define JBUF_WAIT(priv) (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock)) -#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \ - JBUF_WAIT(priv); \ - if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ - goto label; \ +#define JBUF_WAIT_TIMER(priv) G_STMT_START { \ + (priv)->waiting_timer = TRUE; \ + g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \ + (priv)->waiting_timer = FALSE; \ +} G_STMT_END +#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \ + if (G_UNLIKELY ((priv)->waiting_timer)) \ + g_cond_signal (&(priv)->jbuf_timer); \ } G_STMT_END -#define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond)) +#define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \ + (priv)->waiting_event = TRUE; \ + g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \ + (priv)->waiting_event = FALSE; \ + if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ + goto label; \ +} G_STMT_END +#define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \ + if (G_UNLIKELY ((priv)->waiting_event)) \ + g_cond_signal (&(priv)->jbuf_event); \ +} G_STMT_END struct _GstRtpJitterBufferPrivate { @@ -136,13 +148,18 @@ struct _GstRtpJitterBufferPrivate RTPJitterBuffer *jbuf; GMutex jbuf_lock; - GCond jbuf_cond; - gboolean waiting; + gboolean waiting_timer; + GCond jbuf_timer; + gboolean waiting_event; + GCond jbuf_event; gboolean discont; gboolean ts_discont; gboolean active; guint64 out_offset; + gboolean timer_running; + GThread *timer_thread; + /* properties */ guint latency_ms; guint64 latency_ns; @@ -321,6 +338,8 @@ static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer); static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer); static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer); +static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer); + static void gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) { @@ -596,7 +615,8 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData)); priv->jbuf = rtp_jitter_buffer_new (); g_mutex_init (&priv->jbuf_lock); - g_cond_init (&priv->jbuf_cond); + g_cond_init (&priv->jbuf_timer); + g_cond_init (&priv->jbuf_event); /* reset skew detection initialy */ rtp_jitter_buffer_reset_skew (priv->jbuf); @@ -641,7 +661,8 @@ gst_rtp_jitter_buffer_finalize (GObject * object) g_array_free (jitterbuffer->priv->timers, TRUE); g_mutex_clear (&jitterbuffer->priv->jbuf_lock); - g_cond_clear (&jitterbuffer->priv->jbuf_cond); + g_cond_clear (&jitterbuffer->priv->jbuf_timer); + g_cond_clear (&jitterbuffer->priv->jbuf_event); g_object_unref (jitterbuffer->priv->jbuf); @@ -831,7 +852,7 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active, GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->out_offset)); priv->active = active; - JBUF_SIGNAL (priv); + JBUF_SIGNAL_EVENT (priv); } if (!active) { rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE); @@ -980,7 +1001,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer) priv->srcresult = GST_FLOW_FLUSHING; GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue"); /* this unblocks any waiting pops on the src pad task */ - JBUF_SIGNAL (priv); + JBUF_SIGNAL_EVENT (priv); /* unlock clock, we just unschedule, the entry will be released by the * locking streaming thread. */ unschedule_current_timer (jitterbuffer); @@ -1077,13 +1098,16 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, priv->last_pt = -1; /* block until we go to PLAYING */ priv->blocked = TRUE; + priv->timer_running = TRUE; + priv->timer_thread = + g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer); JBUF_UNLOCK (priv); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: JBUF_LOCK (priv); /* unblock to allow streaming in PLAYING */ priv->blocked = FALSE; - JBUF_SIGNAL (priv); + JBUF_SIGNAL_EVENT (priv); JBUF_UNLOCK (priv); break; default: @@ -1108,7 +1132,13 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_PAUSED_TO_READY: + JBUF_LOCK (priv); gst_buffer_replace (&priv->last_sr, NULL); + priv->timer_running = FALSE; + JBUF_SIGNAL_TIMER (priv); + JBUF_UNLOCK (priv); + g_thread_join (priv->timer_thread); + priv->timer_thread = NULL; break; case GST_STATE_CHANGE_READY_TO_NULL: break; @@ -1229,7 +1259,7 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, if (ret && !priv->eos) { GST_INFO_OBJECT (jitterbuffer, "queuing EOS"); priv->eos = TRUE; - JBUF_SIGNAL (priv); + JBUF_SIGNAL_EVENT (priv); } else if (priv->eos) { GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS"); } else { @@ -1479,6 +1509,7 @@ add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, timer->rtx_retry = 0; } recalculate_timer (jitterbuffer, timer); + JBUF_SIGNAL_TIMER (priv); return timer; } @@ -1599,6 +1630,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, continue; if (gap == 0) { + GST_DEBUG ("found timer for current seqnum"); /* the timer for the current seqnum */ timer = test; } else if (gap > priv->rtx_delay_reorder) { @@ -1624,6 +1656,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, /* if we had a timer, remove it, we don't know when to expect the next * packet. */ remove_timer (jitterbuffer, timer); + JBUF_SIGNAL_EVENT (priv); } } @@ -1676,8 +1709,9 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, expected_dts = priv->last_in_dts + duration; if (priv->do_retransmission) { - expected++; type = TIMER_TYPE_EXPECTED; + if (find_timer (jitterbuffer, type, expected)) + expected++; } else { type = TIMER_TYPE_LOST; } @@ -1894,8 +1928,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, do_handle_sync (jitterbuffer); /* signal addition of new buffer when the _loop is waiting. */ - if (priv->waiting && priv->active) - JBUF_SIGNAL (priv); + if (priv->active && priv->waiting_timer) + JBUF_SIGNAL_EVENT (priv); /* let's unschedule and unblock any waiting buffers. We only want to do this * when the tail buffer changed */ @@ -2194,7 +2228,7 @@ wait: } /* the timeout for when we expected a packet expired */ -static GstFlowReturn +static void do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, GstClockTimeDiff clock_jitter) { @@ -2228,12 +2262,10 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, } reschedule_timer (jitterbuffer, timer, timer->seqnum, timer->rtx_base + timer->rtx_retry); - - return priv->srcresult; } /* a packet is lost */ -static GstFlowReturn +static void do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, GstClockTimeDiff clock_jitter) { @@ -2286,6 +2318,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff; /* remove timer now */ remove_timer (jitterbuffer, timer); + JBUF_SIGNAL_EVENT (priv); if (priv->do_lost) { GstEvent *event; @@ -2299,25 +2332,31 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, "late", G_TYPE_BOOLEAN, lost_packets_late, NULL)); JBUF_UNLOCK (priv); gst_pad_push_event (priv->srcpad, event); - JBUF_LOCK_CHECK (priv, flushing); - } - return GST_FLOW_OK; - - /* ERRORS */ -flushing: - { - GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); - return priv->srcresult; + JBUF_LOCK (priv); } } -static GstFlowReturn +static void do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) { + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout"); remove_timer (jitterbuffer, timer); + JBUF_SIGNAL_EVENT (priv); +} - return GST_FLOW_EOS; +static void +do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, + GstClockTimeDiff clock_jitter) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + + GST_INFO_OBJECT (jitterbuffer, "got deadline timeout"); + + priv->next_seqnum = timer->seqnum; + remove_timer (jitterbuffer, timer); + JBUF_SIGNAL_EVENT (priv); } /* called when we need to wait for the next timeout. @@ -2327,137 +2366,127 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) * * If there are no timers, we wait on a gcond until something new happens. */ -static GstFlowReturn +static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstFlowReturn result = GST_FLOW_OK; - gint i, len; - TimerData *timer = NULL; - GstClockTime timer_timeout = -1; - gint timer_idx; - len = priv->timers->len; - for (i = 0; i < len; i++) { - TimerData *test = &g_array_index (priv->timers, TimerData, i); - GstClockTime test_timeout = get_timeout (jitterbuffer, test); + JBUF_LOCK (priv); + while (priv->timer_running) { + TimerData *timer = NULL; + GstClockTime timer_timeout = -1; + gint i, len; + gint timer_idx; - GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %" GST_TIME_FORMAT, - i, test->seqnum, GST_TIME_ARGS (test_timeout)); + len = priv->timers->len; + for (i = 0; i < len; i++) { + TimerData *test = &g_array_index (priv->timers, TimerData, i); + GstClockTime test_timeout = get_timeout (jitterbuffer, test); - /* find the smallest timeout */ - if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) { - timer = test; - timer_timeout = test_timeout; + GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT, + i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout)); + + /* find the smallest timeout */ + if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) { + timer = test; + timer_timeout = test_timeout; + if (timer_timeout == -1) + break; + } + } + if (timer) { + GstClock *clock; + GstClockTime sync_time; + GstClockID id; + GstClockReturn ret; + GstClockTimeDiff clock_jitter; + + /* no timestamp, timeout immeditately */ if (timer_timeout == -1) - break; - } - } - if (timer) { - GstClock *clock; - GstClockTime sync_time; - GstClockID id; - GstClockReturn ret; - GstClockTimeDiff clock_jitter; + goto do_timeout; - /* no timestamp, timeout immeditately */ - if (timer_timeout == -1) - goto do_timeout; + GST_OBJECT_LOCK (jitterbuffer); + clock = GST_ELEMENT_CLOCK (jitterbuffer); + if (!clock) { + GST_OBJECT_UNLOCK (jitterbuffer); + /* let's just push if there is no clock */ + GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away"); + goto do_timeout; + } - GST_OBJECT_LOCK (jitterbuffer); - clock = GST_ELEMENT_CLOCK (jitterbuffer); - if (!clock) { + /* prepare for sync against clock */ + sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time; + /* add latency of peer to get input time */ + sync_time += priv->peer_latency; + + GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT + " with sync time %" GST_TIME_FORMAT, + GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time)); + + /* create an entry for the clock */ + id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time); + priv->unscheduled = FALSE; + priv->timer_timeout = timer_timeout; + priv->timer_seqnum = timer->seqnum; + timer_idx = timer->idx; GST_OBJECT_UNLOCK (jitterbuffer); - /* let's just push if there is no clock */ - GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away"); - goto do_timeout; + + /* release the lock so that the other end can push stuff or unlock */ + JBUF_UNLOCK (priv); + + ret = gst_clock_id_wait (id, &clock_jitter); + + JBUF_LOCK (priv); + if (!priv->timer_running) + break; + + GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT, + ret, priv->timer_seqnum, clock_jitter); + /* and free the entry */ + gst_clock_id_unref (id); + priv->clock_id = NULL; + + if (priv->timers->len <= timer_idx) + continue; + + /* we released the lock, the array might have changed */ + timer = &g_array_index (priv->timers, TimerData, timer_idx); + /* if changed to timeout immediately, do so */ + if (timer->timeout == -1) + goto do_timeout; + + /* if we got unscheduled and we are not flushing, it's because a new tail + * element became available in the queue or we flushed the queue. + * Grab it and try to push or sync. */ + if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) { + GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled"); + continue; + } + + do_timeout: + switch (timer->type) { + case TIMER_TYPE_EXPECTED: + do_expected_timeout (jitterbuffer, timer, clock_jitter); + break; + case TIMER_TYPE_LOST: + do_lost_timeout (jitterbuffer, timer, clock_jitter); + break; + case TIMER_TYPE_DEADLINE: + do_deadline_timeout (jitterbuffer, timer, clock_jitter); + break; + case TIMER_TYPE_EOS: + do_eos_timeout (jitterbuffer, timer); + break; + } + } else { + /* no timers, wait for activity */ + GST_DEBUG_OBJECT (jitterbuffer, "waiting"); + JBUF_WAIT_TIMER (priv); + GST_DEBUG_OBJECT (jitterbuffer, "waiting done"); } - - /* prepare for sync against clock */ - sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time; - /* add latency of peer to get input time */ - sync_time += priv->peer_latency; - - GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT - " with sync time %" GST_TIME_FORMAT, - GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time)); - - /* create an entry for the clock */ - id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time); - priv->unscheduled = FALSE; - priv->timer_timeout = timer_timeout; - priv->timer_seqnum = timer->seqnum; - timer_idx = timer->idx; - GST_OBJECT_UNLOCK (jitterbuffer); - - /* release the lock so that the other end can push stuff or unlock */ - JBUF_UNLOCK (priv); - - ret = gst_clock_id_wait (id, &clock_jitter); - - JBUF_LOCK (priv); - GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT, - ret, priv->timer_seqnum, clock_jitter); - /* and free the entry */ - gst_clock_id_unref (id); - priv->clock_id = NULL; - - /* at this point, the clock could have been unlocked by a timeout, a new - * tail element was added to the queue or because we are shutting down. Check - * for shutdown first. */ - if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) - goto flushing; - - if (priv->timers->len <= timer_idx) - goto done; - - /* we released the lock, the array might have changed */ - timer = &g_array_index (priv->timers, TimerData, timer_idx); - /* if changed to timeout immediately, do so */ - if (timer->timeout == -1) - goto do_timeout; - - /* if we got unscheduled and we are not flushing, it's because a new tail - * element became available in the queue or we flushed the queue. - * Grab it and try to push or sync. */ - if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) { - GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled"); - goto done; - } - - do_timeout: - switch (timer->type) { - case TIMER_TYPE_EXPECTED: - result = do_expected_timeout (jitterbuffer, timer, clock_jitter); - break; - case TIMER_TYPE_LOST: - result = do_lost_timeout (jitterbuffer, timer, clock_jitter); - break; - case TIMER_TYPE_DEADLINE: - priv->next_seqnum = timer->seqnum; - remove_timer (jitterbuffer, timer); - break; - case TIMER_TYPE_EOS: - result = do_eos_timeout (jitterbuffer, timer); - break; - } - } else { - /* no timers, wait for activity */ - GST_DEBUG_OBJECT (jitterbuffer, "waiting"); - priv->waiting = TRUE; - JBUF_WAIT_CHECK (priv, flushing); - priv->waiting = FALSE; - GST_DEBUG_OBJECT (jitterbuffer, "waiting done"); - } - -done: - return result; - -flushing: - { - GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); - return priv->srcresult; } + GST_DEBUG_OBJECT (jitterbuffer, "we are stopping"); + return; } /* @@ -2477,9 +2506,13 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) JBUF_LOCK_CHECK (priv, flushing); do { result = handle_next_buffer (jitterbuffer); - if (G_LIKELY (result == GST_FLOW_WAIT)) + if (G_LIKELY (result == GST_FLOW_WAIT)) { + GST_DEBUG_OBJECT (jitterbuffer, "waiting for event"); /* now wait for the next event */ - result = wait_next_timeout (jitterbuffer); + JBUF_WAIT_EVENT (priv, flushing); + GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done"); + result = GST_FLOW_OK; + } } while (result == GST_FLOW_OK); JBUF_UNLOCK (priv);