rtptimerqueue: Consolidate a data structure for timers

Implement a single timer queue for all timers. The goal is to always use
ordered queues for storing timers. This way, extracting timers for
execution becomes O(1). This also allow separating the clock wait
scheduling from the timer itself and ensure that we only wake up the
timer thread when strictly needed.

The knew data structure is still O(n) on insertions and reschedule,
but we now use proximity optimization so that normal cases should be
really fast. The GList structure is also embeded intot he RtpTimer
structure to reduce the number of allocations.
This commit is contained in:
Nicolas Dufresne 2019-06-12 09:59:31 -04:00 committed by Olivier Crête
parent a53ffb6e11
commit 37742cd36d
8 changed files with 1109 additions and 4 deletions

View file

@ -14,6 +14,7 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
rtpsession.c \ rtpsession.c \
rtpsource.c \ rtpsource.c \
rtpstats.c \ rtpstats.c \
rtptimerqueue.c \
gstrtpsession.c \ gstrtpsession.c \
gstrtpfunnel.c gstrtpfunnel.c
@ -30,6 +31,7 @@ noinst_HEADERS = gstrtpbin.h \
rtpsession.h \ rtpsession.h \
rtpsource.h \ rtpsource.h \
rtpstats.h \ rtpstats.h \
rtptimerqueue.h \
gstrtpsession.h \ gstrtpsession.h \
gstrtpfunnel.h gstrtpfunnel.h

View file

@ -111,6 +111,7 @@
#include "gstrtpjitterbuffer.h" #include "gstrtpjitterbuffer.h"
#include "rtpjitterbuffer.h" #include "rtpjitterbuffer.h"
#include "rtpstats.h" #include "rtpstats.h"
#include "rtptimerqueue.h"
#include <gst/glib-compat-private.h> #include <gst/glib-compat-private.h>

View file

@ -13,6 +13,7 @@ rtpmanager_sources = [
'rtpsession.c', 'rtpsession.c',
'rtpsource.c', 'rtpsource.c',
'rtpstats.c', 'rtpstats.c',
'rtptimerqueue.c',
'gstrtpsession.c', 'gstrtpsession.c',
'gstrtpfunnel.c', 'gstrtpfunnel.c',
] ]

View file

@ -0,0 +1,660 @@
/* GStreamer RTP Manager
*
* Copyright (C) 2019 Net Insight AB
* Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <gst/rtp/gstrtpbuffer.h>
#include "rtptimerqueue.h"
struct _RtpTimerQueue
{
GObject parent;
GQueue timers;
GHashTable *hashtable;
};
G_DEFINE_TYPE (RtpTimerQueue, rtp_timer_queue, G_TYPE_OBJECT);
/* some timer private helpers */
static RtpTimer *
rtp_timer_new (void)
{
return g_slice_new0 (RtpTimer);
}
static inline void
rtp_timer_set_next (RtpTimer * timer, RtpTimer * next)
{
GList *list = (GList *) timer;
list->next = (GList *) next;
}
static inline void
rtp_timer_set_prev (RtpTimer * timer, RtpTimer * prev)
{
GList *list = (GList *) timer;
list->prev = (GList *) prev;
}
static inline gboolean
rtp_timer_is_later (RtpTimer * timer, RtpTimer * next)
{
if (next == NULL)
return FALSE;
if (GST_CLOCK_TIME_IS_VALID (next->timeout)) {
if (!GST_CLOCK_TIME_IS_VALID (timer->timeout))
return FALSE;
if (timer->timeout > next->timeout)
return TRUE;
}
if (timer->timeout == next->timeout &&
gst_rtp_buffer_compare_seqnum (timer->seqnum, next->seqnum) < 0)
return TRUE;
return FALSE;
}
static inline gboolean
rtp_timer_is_sooner (RtpTimer * timer, RtpTimer * prev)
{
if (prev == NULL)
return FALSE;
if (GST_CLOCK_TIME_IS_VALID (prev->timeout)) {
if (!GST_CLOCK_TIME_IS_VALID (timer->timeout))
return TRUE;
if (timer->timeout < prev->timeout)
return TRUE;
}
if (timer->timeout == prev->timeout &&
gst_rtp_buffer_compare_seqnum (timer->seqnum, prev->seqnum) > 0)
return TRUE;
return FALSE;
}
static inline RtpTimer *
rtp_timer_queue_get_tail (RtpTimerQueue * queue)
{
return (RtpTimer *) queue->timers.tail;
}
static inline void
rtp_timer_queue_set_tail (RtpTimerQueue * queue, RtpTimer * timer)
{
queue->timers.tail = (GList *) timer;
g_assert (queue->timers.tail->next == NULL);
}
static inline RtpTimer *
rtp_timer_queue_get_head (RtpTimerQueue * queue)
{
return (RtpTimer *) queue->timers.head;
}
static inline void
rtp_timer_queue_set_head (RtpTimerQueue * queue, RtpTimer * timer)
{
queue->timers.head = (GList *) timer;
g_assert (queue->timers.head->prev == NULL);
}
static void
rtp_timer_queue_insert_before (RtpTimerQueue * queue, RtpTimer * sibling,
RtpTimer * timer)
{
if (sibling == rtp_timer_queue_get_head (queue)) {
rtp_timer_queue_set_head (queue, timer);
} else {
rtp_timer_set_prev (timer, rtp_timer_get_prev (sibling));
rtp_timer_set_next (rtp_timer_get_prev (sibling), timer);
}
rtp_timer_set_next (timer, sibling);
rtp_timer_set_prev (sibling, timer);
queue->timers.length++;
}
static void
rtp_timer_queue_insert_after (RtpTimerQueue * queue, RtpTimer * sibling,
RtpTimer * timer)
{
if (sibling == rtp_timer_queue_get_tail (queue)) {
rtp_timer_queue_set_tail (queue, timer);
} else {
rtp_timer_set_next (timer, rtp_timer_get_next (sibling));
rtp_timer_set_prev (rtp_timer_get_next (sibling), timer);
}
rtp_timer_set_prev (timer, sibling);
rtp_timer_set_next (sibling, timer);
queue->timers.length++;
}
static void
rtp_timer_queue_insert_tail (RtpTimerQueue * queue, RtpTimer * timer)
{
RtpTimer *it = rtp_timer_queue_get_tail (queue);
while (it) {
if (!GST_CLOCK_TIME_IS_VALID (it->timeout))
break;
if (timer->timeout > it->timeout)
break;
if (timer->timeout == it->timeout &&
gst_rtp_buffer_compare_seqnum (timer->seqnum, it->seqnum) < 0)
break;
it = rtp_timer_get_prev (it);
}
/* the queue is empty, or this is the earliest timeout */
if (it == NULL)
g_queue_push_head_link (&queue->timers, (GList *) timer);
else
rtp_timer_queue_insert_after (queue, it, timer);
}
static void
rtp_timer_queue_insert_head (RtpTimerQueue * queue, RtpTimer * timer)
{
RtpTimer *it = rtp_timer_queue_get_head (queue);
while (it) {
if (GST_CLOCK_TIME_IS_VALID (it->timeout)) {
if (!GST_CLOCK_TIME_IS_VALID (timer->timeout))
break;
if (timer->timeout < it->timeout)
break;
}
if (timer->timeout == it->timeout &&
gst_rtp_buffer_compare_seqnum (timer->seqnum, it->seqnum) > 0)
break;
it = rtp_timer_get_next (it);
}
/* the queue is empty, or this is the oldest */
if (it == NULL)
g_queue_push_tail_link (&queue->timers, (GList *) timer);
else
rtp_timer_queue_insert_before (queue, it, timer);
}
static void
rtp_timer_queue_init (RtpTimerQueue * queue)
{
queue->hashtable = g_hash_table_new (NULL, NULL);
}
static void
rtp_timer_queue_finalize (GObject * object)
{
RtpTimerQueue *queue = RTP_TIMER_QUEUE (object);
RtpTimer *timer;
while ((timer = rtp_timer_queue_pop_until (queue, GST_CLOCK_TIME_NONE)))
rtp_timer_free (timer);
g_hash_table_unref (queue->hashtable);
g_assert (queue->timers.length == 0);
}
static void
rtp_timer_queue_class_init (RtpTimerQueueClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gobject_class->finalize = rtp_timer_queue_finalize;
}
/**
* rtp_timer_free:
*
* Free a #RtpTimer structure. This should be used after a timer has been
* poped or unscheduled. The timer must be queued.
*/
void
rtp_timer_free (RtpTimer * timer)
{
g_return_if_fail (timer);
g_return_if_fail (timer->queued == FALSE);
g_return_if_fail (timer->list.next == NULL);
g_return_if_fail (timer->list.prev == NULL);
g_slice_free (RtpTimer, timer);
}
/**
* rtp_timer_dup:
* @timer: a #RtpTimer
*
* This allow cloning a #RtpTimer structure.
*
* Returns: a copy of @timer
*/
RtpTimer *
rtp_timer_dup (const RtpTimer * timer)
{
RtpTimer *copy = g_slice_new (RtpTimer);
memcpy (copy, timer, sizeof (RtpTimer));
memset (&copy->list, 0, sizeof (GList));
copy->queued = FALSE;
return copy;
}
/**
* rtp_timer_queue_find:
* @queue: the #RtpTimerQueue object
* @seqnum: the sequence number of the #RtpTimer
*
* Lookup for a timer with @seqnum. Only one timer per seqnum exist withing
* the #RtpTimerQueue. This operation is o(1).
*
* Rerturn: the matching #RtpTimer or %NULL
*/
RtpTimer *
rtp_timer_queue_find (RtpTimerQueue * queue, guint seqnum)
{
return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
}
/**
* rtp_timer_queue_peek_earliest:
* @queue: the #RtpTimerQueue object
*
* Rerturns: the #RtpTimer with earliest timeout value
*/
RtpTimer *
rtp_timer_queue_peek_earliest (RtpTimerQueue * queue)
{
return rtp_timer_queue_get_head (queue);
}
/**
* rtp_timer_queue_new:
*
* Returns: a freshly allocated #RtpTimerQueue
*/
RtpTimerQueue *
rtp_timer_queue_new (void)
{
return g_object_new (RTP_TYPE_TIMER_QUEUE, NULL);
}
/**
* rtp_timer_queue_insert:
* @queue: the #RtpTimerQueue object
* @timer: the #RtpTimer to insert
*
* Insert a timer into the queue. Earliest timer are at the head and then
* timer are sorted by seqnum (smaller seqnum first). This function is o(n)
* but it is expected that most timers added are schedule later, in which case
* the insertion will be faster.
*
* Returns: %FLASE is a timer with the same seqnum already existed
*/
gboolean
rtp_timer_queue_insert (RtpTimerQueue * queue, RtpTimer * timer)
{
g_return_val_if_fail (timer->queued == FALSE, FALSE);
if (rtp_timer_queue_find (queue, timer->seqnum))
return FALSE;
if (timer->timeout == -1)
rtp_timer_queue_insert_head (queue, timer);
else
rtp_timer_queue_insert_tail (queue, timer);
g_hash_table_insert (queue->hashtable,
GINT_TO_POINTER (timer->seqnum), timer);
timer->queued = TRUE;
return TRUE;
}
/**
* rtp_timer_queue_reschedule:
* @queue: the #RtpTimerQueue object
* @timer: the #RtpTimer to reschedule
*
* This function moves @timer inside the queue to put it back to it's new
* location. This function is o(n) but it is assumed that nearby modification
* of the timeout will occure.
*
* Returns: %TRUE if the timer was moved
*/
gboolean
rtp_timer_queue_reschedule (RtpTimerQueue * queue, RtpTimer * timer)
{
RtpTimer *it = timer;
g_return_val_if_fail (timer->queued == TRUE, FALSE);
while (rtp_timer_is_sooner (timer, rtp_timer_get_prev (it)))
it = rtp_timer_get_prev (it);
if (it != timer) {
g_queue_unlink (&queue->timers, (GList *) timer);
rtp_timer_queue_insert_before (queue, it, timer);
return TRUE;
}
while (rtp_timer_is_later (timer, rtp_timer_get_next (it)))
it = rtp_timer_get_next (it);
if (it != timer) {
g_queue_unlink (&queue->timers, (GList *) timer);
rtp_timer_queue_insert_after (queue, it, timer);
return TRUE;
}
return FALSE;
}
/**
* rtp_timer_queue_unschedule:
* @queue: the #RtpTimerQueue
* @timer: the #RtpTimer to unschedule
*
* This removes a timer from the queue. The timer structure can be reused,
* or freed using rtp_timer_free(). This function is o(1).
*/
void
rtp_timer_queue_unschedule (RtpTimerQueue * queue, RtpTimer * timer)
{
g_return_if_fail (timer->queued == TRUE);
g_queue_unlink (&queue->timers, (GList *) timer);
g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (timer->seqnum));
timer->queued = FALSE;
}
/**
* rtp_timer_queue_pop_until:
* @queue: the #RtpTimerQueue
* @timeout: Time at witch timers expired
*
* Unschdedule and return the earliest packet that has a timeout smaller or
* equal to @timeout. The returns #RtpTimer must be freed with
* rtp_timer_free(). This function is o(1).
*
* Returns: an expired timer according to @timeout, or %NULL.
*/
RtpTimer *
rtp_timer_queue_pop_until (RtpTimerQueue * queue, GstClockTime timeout)
{
RtpTimer *timer;
timer = (RtpTimer *) g_queue_peek_head_link (&queue->timers);
if (!timer)
return NULL;
if (!GST_CLOCK_TIME_IS_VALID (timer->timeout) || timer->timeout <= timeout) {
rtp_timer_queue_unschedule (queue, timer);
return timer;
}
return NULL;
}
/**
* rtp_timer_queue_remove_until:
* @queue: the #RtpTimerQueue
* @timeout: Time at witch timers expired
*
* Unschedule and free all timers that has a timeout smaller or equal to
* @timeout.
*/
void
rtp_timer_queue_remove_until (RtpTimerQueue * queue, GstClockTime timeout)
{
RtpTimer *timer;
while ((timer = rtp_timer_queue_pop_until (queue, timeout))) {
GST_LOG ("Removing expired timer #%d, %" GST_TIME_FORMAT " < %"
GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (timer->timeout),
GST_TIME_ARGS (timeout));
rtp_timer_free (timer);
}
}
/**
* rtp_timer_queue_remove_all:
* @queue: the #RtpTimerQueue
*
* Unschedule and free all timers from the queue.
*/
void
rtp_timer_queue_remove_all (RtpTimerQueue * queue)
{
rtp_timer_queue_remove_until (queue, GST_CLOCK_TIME_NONE);
}
/**
* rtp_timer_queue_set_timer:
* @queue: the #RtpTimerQueue
* @type: the #RtpTimerType
* @senum: the timer seqnum
* @num: the number of seqnum in the range (partially supported)
* @timeout: the timer timeout
* @delay: the additional delay (will be added to @timeout)
* @duration: the duration of the event related to the timer
* @offset: offset that can be used to convert the timeout to timestamp
*
* If there exist a timer with this seqnum it will be updated other a new
* timer is created and inserted into the queue. This function is o(n) except
* that it's optimized for later timer insertion.
*/
void
rtp_timer_queue_set_timer (RtpTimerQueue * queue, RtpTimerType type,
guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
GstClockTime duration, GstClockTimeDiff offset)
{
RtpTimer *timer;
timer = rtp_timer_queue_find (queue, seqnum);
if (!timer)
timer = rtp_timer_new ();
/* for new timers or on seqnum change reset the RTX data */
if (!timer->queued || timer->seqnum != seqnum) {
if (type == RTP_TIMER_EXPECTED) {
timer->rtx_base = timeout;
timer->rtx_delay = delay;
timer->rtx_retry = 0;
}
timer->rtx_last = GST_CLOCK_TIME_NONE;
timer->num_rtx_retry = 0;
timer->num_rtx_received = 0;
}
timer->type = type;
timer->seqnum = seqnum;
timer->num = num;
if (timeout == -1)
timer->timeout = -1;
else
timer->timeout = timeout + delay + offset;
timer->offset = offset;
timer->duration = duration;
if (timer->queued)
rtp_timer_queue_reschedule (queue, timer);
else
rtp_timer_queue_insert (queue, timer);
}
/**
* rtp_timer_queue_set_expected:
* @queue: the #RtpTimerQueue
* @senum: the timer seqnum
* @timeout: the timer timeout
* @delay: the additional delay (will be added to @timeout)
* @duration: the duration of the event related to the timer
*
* Specialized version of rtp_timer_queue_set_timer() that creates or updates a
* timer with type %RTP_TIMER_EXPECTED. Expected timers do not carry
* a timestamp, hence have no offset.
*/
void
rtp_timer_queue_set_expected (RtpTimerQueue * queue, guint16 seqnum,
GstClockTime timeout, GstClockTime delay, GstClockTime duration)
{
rtp_timer_queue_set_timer (queue, RTP_TIMER_EXPECTED, seqnum, 0, timeout,
delay, duration, 0);
}
/**
* rtp_timer_queue_set_lost:
* @queue: the #RtpTimerQueue
* @senum: the timer seqnum
* @num: the number of seqnum in the range (partially supported)
* @timeout: the timer timeout
* @duration: the duration of the event related to the timer
* @offset: offset that can be used to convert the timeout to timestamp
*
* Specialized version of rtp_timer_queue_set_timer() that creates or updates a
* timer with type %RTP_TIMER_LOST.
*/
void
rtp_timer_queue_set_lost (RtpTimerQueue * queue, guint16 seqnum,
guint num, GstClockTime timeout, GstClockTime duration,
GstClockTimeDiff offset)
{
rtp_timer_queue_set_timer (queue, RTP_TIMER_LOST, seqnum, num, timeout, 0,
duration, offset);
}
/**
* rtp_timer_queue_set_eos:
* @queue: the #RtpTimerQueue
* @timeout: the timer timeout
* @offset: offset that can be used to convert the timeout to timestamp
*
* Specialized version of rtp_timer_queue_set_timer() that creates or updates a
* timer with type %RTP_TIMER_EOS. There is only one such a timer and it has
* the special seqnum value -1 (FIXME this is not an invalid seqnum,).
*/
void
rtp_timer_queue_set_eos (RtpTimerQueue * queue, GstClockTime timeout,
GstClockTimeDiff offset)
{
rtp_timer_queue_set_timer (queue, RTP_TIMER_EOS, -1, 0, timeout, 0, 0,
offset);
}
/**
* rtp_timer_queue_set_deadline:
* @queue: the #RtpTimerQueue
* @senum: the timer seqnum
* @timeout: the timer timeout
* @offset: offset that can be used to convert the timeout to timestamp
*
* Specialized version of rtp_timer_queue_set_timer() that creates or updates a
* timer with type %RTP_TIMER_DEADLINE. There should be only one such a timer,
* its seqnum matches the first packet to be output.
*/
void
rtp_timer_queue_set_deadline (RtpTimerQueue * queue, guint16 seqnum,
GstClockTime timeout, GstClockTimeDiff offset)
{
rtp_timer_queue_set_timer (queue, RTP_TIMER_DEADLINE, seqnum, 0, timeout, 0,
0, offset);
}
/**
* rtp_timer_queue_update_timer:
* @queue: the #RtpTimerQueue
* @senum: the timer seqnum
* @timeout: the timer timeout
* @delay: the additional delay (will be added to @timeout)
* @offset: offset that can be used to convert the timeout to timestamp
* @reset: if the RTX statistics should be reset
*
* A utility to update an already queued timer.
*/
void
rtp_timer_queue_update_timer (RtpTimerQueue * queue, RtpTimer * timer,
guint16 seqnum, GstClockTime timeout, GstClockTime delay,
GstClockTimeDiff offset, gboolean reset)
{
g_return_if_fail (timer != NULL);
if (reset) {
GST_DEBUG ("reset rtx delay %" GST_TIME_FORMAT "->%" GST_TIME_FORMAT,
GST_TIME_ARGS (timer->rtx_delay), GST_TIME_ARGS (delay));
timer->rtx_base = timeout;
timer->rtx_delay = delay;
timer->rtx_retry = 0;
}
if (timer->seqnum != seqnum) {
timer->num_rtx_retry = 0;
timer->num_rtx_received = 0;
if (timer->queued) {
g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (timer->seqnum));
g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (seqnum), timer);
}
}
if (timeout == -1)
timer->timeout = -1;
else
timer->timeout = timeout + delay + offset;
timer->seqnum = seqnum;
timer->offset = offset;
if (timer->queued)
rtp_timer_queue_reschedule (queue, timer);
else
rtp_timer_queue_insert (queue, timer);
}
/**
* rtp_timer_queue_length:
* @queue: the #RtpTimerQueue
*
* Returns: the number of timers in the #RtpTimerQueue
*/
guint
rtp_timer_queue_length (RtpTimerQueue * queue)
{
return queue->timers.length;
}

View file

@ -0,0 +1,128 @@
/* GStreamer RTP Manager
*
* Copyright 2007 Collabora Ltd,
* Copyright 2007 Nokia Corporation
* @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
* Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
* Copyright 2015 Kurento (http://kurento.org/)
* @author: Miguel París <mparisdiaz@gmail.com>
* Copyright 2016 Pexip AS
* @author: Havard Graff <havard@pexip.com>
* @author: Stian Selnes <stian@pexip.com>
* Copyright (C) 2019 Net Insight AB
* Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <gst/gst.h>
#ifndef __RTP_TIMER_QUEUE_H__
#define __RTP_TIMER_QUEUE_H__
#define RTP_TYPE_TIMER_QUEUE rtp_timer_queue_get_type()
G_DECLARE_FINAL_TYPE (RtpTimerQueue, rtp_timer_queue, RTP_TIMER, QUEUE, GObject);
/**
* RtpTimerType:
* @RTP_TIMER_EXPECTED: This is used to track when to emit retranmission
* requests. They may be converted into %RTP_TIMER_LOST
* timer if the number of retry has been exhausted.
* @RTP_TIMER_LOST: This is used to track when a packet is considered lost.
* @RTP_TIMER_DEADLINE: This is used to track when the jitterbuffer should
* start pushing buffers.
* @RTP_TIMER_EOS: This is used to track when end of stream is reached.
*/
typedef enum
{
RTP_TIMER_EXPECTED,
RTP_TIMER_LOST,
RTP_TIMER_DEADLINE,
RTP_TIMER_EOS
} RtpTimerType;
typedef struct
{
GList list;
gboolean queued;
guint16 seqnum;
guint num;
RtpTimerType type;
GstClockTime timeout;
GstClockTimeDiff offset;
GstClockTime duration;
GstClockTime rtx_base;
GstClockTime rtx_delay;
GstClockTime rtx_retry;
GstClockTime rtx_last;
guint num_rtx_retry;
guint num_rtx_received;
} RtpTimer;
void rtp_timer_free (RtpTimer * timer);
RtpTimer * rtp_timer_dup (const RtpTimer * timer);
static inline RtpTimer * rtp_timer_get_next (RtpTimer * timer)
{
GList *list = (GList *) timer;
return (RtpTimer *) list->next;
}
static inline RtpTimer * rtp_timer_get_prev (RtpTimer * timer)
{
GList *list = (GList *) timer;
return (RtpTimer *) list->prev;
}
RtpTimerQueue * rtp_timer_queue_new (void);
RtpTimer * rtp_timer_queue_find (RtpTimerQueue * queue, guint seqnum);
RtpTimer * rtp_timer_queue_peek_earliest (RtpTimerQueue * queue);
gboolean rtp_timer_queue_insert (RtpTimerQueue * queue, RtpTimer * timer);
gboolean rtp_timer_queue_reschedule (RtpTimerQueue * queue, RtpTimer * timer);
void rtp_timer_queue_unschedule (RtpTimerQueue * queue, RtpTimer * timer);
RtpTimer * rtp_timer_queue_pop_until (RtpTimerQueue * queue, GstClockTime timeout);
void rtp_timer_queue_remove_until (RtpTimerQueue * queue, GstClockTime timeout);
void rtp_timer_queue_remove_all (RtpTimerQueue * queue);
void rtp_timer_queue_set_timer (RtpTimerQueue * queue, RtpTimerType type,
guint16 seqnum, guint num, GstClockTime timeout,
GstClockTime delay, GstClockTime duration,
GstClockTimeDiff offset);
void rtp_timer_queue_set_expected (RtpTimerQueue * queue, guint16 seqnum,
GstClockTime timeout, GstClockTime delay,
GstClockTime duration);
void rtp_timer_queue_set_lost (RtpTimerQueue * queue, guint16 seqnum,
guint num, GstClockTime timeout,
GstClockTime duration, GstClockTimeDiff offset);
void rtp_timer_queue_set_eos (RtpTimerQueue * queue, GstClockTime timeout,
GstClockTimeDiff offset);
void rtp_timer_queue_set_deadline (RtpTimerQueue * queue, guint16 seqnum,
GstClockTime timeout, GstClockTimeDiff offset);
void rtp_timer_queue_update_timer (RtpTimerQueue * queue, RtpTimer * timer, guint16 seqnum,
GstClockTime timeout, GstClockTime delay,
GstClockTimeDiff offset, gboolean reset);
guint rtp_timer_queue_length (RtpTimerQueue * queue);
#endif

View file

@ -600,8 +600,14 @@ elements_videocrop_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(CFLA
elements_videofilter_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) elements_videofilter_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS)
elements_videofilter_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstvideo-$(GST_API_VERSION) $(LDADD) elements_videofilter_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstvideo-$(GST_API_VERSION) $(LDADD)
elements_rtpjitterbuffer_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) elements_rtpjitterbuffer_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_NET_CFLAGS) $(GIO_CFLAGS) $(CFLAGS) $(AM_CFLAGS)
elements_rtpjitterbuffer_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(LDADD) elements_rtpjitterbuffer_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(GST_NET_LIBS) $(GIO_LIBS) $(LDADD)
elements_rtpjitterbuffer_SOURCES = \
elements/rtpjitterbuffer.c \
../../gst/rtpmanager/gstrtpjitterbuffer.c \
../../gst/rtpmanager/rtpjitterbuffer.c \
../../gst/rtpmanager/rtpstats.c \
../../gst/rtpmanager/rtptimerqueue.c
elements_rtprtx_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) elements_rtprtx_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS)
elements_rtprtx_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(LDADD) elements_rtprtx_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(LDADD)

View file

@ -31,6 +31,9 @@
#include <gst/rtp/gstrtpbuffer.h> #include <gst/rtp/gstrtpbuffer.h>
#include "gst/rtpmanager/gstrtpjitterbuffer.h"
#include "gst/rtpmanager/rtptimerqueue.h"
/* For ease of programming we use globals to keep refs for our floating /* For ease of programming we use globals to keep refs for our floating
* src and sink pads we create; otherwise we always have to do get_pad, * src and sink pads we create; otherwise we always have to do get_pad,
* get_peer, and then remove references in every test function */ * get_peer, and then remove references in every test function */
@ -2752,13 +2755,313 @@ GST_START_TEST (test_rtx_does_not_affect_pts_calculation)
GST_END_TEST; GST_END_TEST;
GST_START_TEST (test_timer_queue_set_timer)
{
RtpTimerQueue *queue = rtp_timer_queue_new ();
RtpTimer *timer10, *timer0;
rtp_timer_queue_set_timer (queue, RTP_TIMER_EXPECTED, 10, 0,
1 * GST_SECOND, 2 * GST_SECOND, 5 * GST_SECOND, 0);
timer10 = rtp_timer_queue_find (queue, 10);
fail_unless (timer10);
fail_unless_equals_int (10, timer10->seqnum);
fail_unless_equals_int (0, timer10->num);
fail_unless_equals_int (RTP_TIMER_EXPECTED, timer10->type);
/* timer10->timeout = timerout + delay */
fail_unless_equals_uint64 (3 * GST_SECOND, timer10->timeout);
fail_unless_equals_uint64 (5 * GST_SECOND, timer10->duration);
fail_unless_equals_uint64 (1 * GST_SECOND, timer10->rtx_base);
fail_unless_equals_uint64 (2 * GST_SECOND, timer10->rtx_delay);
fail_unless_equals_uint64 (0, timer10->rtx_retry);
fail_unless_equals_uint64 (GST_CLOCK_TIME_NONE, timer10->rtx_last);
fail_unless_equals_int (0, timer10->num_rtx_retry);
fail_unless_equals_int (0, timer10->num_rtx_received);
rtp_timer_queue_set_timer (queue, RTP_TIMER_LOST, 0, 10,
0 * GST_SECOND, 2 * GST_SECOND, 0, 0);
timer0 = rtp_timer_queue_find (queue, 0);
fail_unless (timer0);
fail_unless_equals_int (0, timer0->seqnum);
fail_unless_equals_int (10, timer0->num);
fail_unless_equals_int (RTP_TIMER_LOST, timer0->type);
fail_unless_equals_uint64 (2 * GST_SECOND, timer0->timeout);
fail_unless_equals_uint64 (0, timer0->duration);
fail_unless_equals_uint64 (0, timer0->rtx_base);
fail_unless_equals_uint64 (0, timer0->rtx_delay);
fail_unless_equals_uint64 (0, timer0->rtx_retry);
fail_unless_equals_uint64 (GST_CLOCK_TIME_NONE, timer0->rtx_last);
fail_unless_equals_int (0, timer0->num_rtx_retry);
fail_unless_equals_int (0, timer0->num_rtx_received);
/* also check order while at it */
fail_unless (timer10->list.next == NULL);
fail_unless (timer10->list.prev == (GList *) timer0);
fail_unless (timer0->list.next == (GList *) timer10);
fail_unless (timer0->list.prev == NULL);
g_object_unref (queue);
}
GST_END_TEST;
GST_START_TEST (test_timer_queue_insert_head)
{
RtpTimerQueue *queue = rtp_timer_queue_new ();
RtpTimer *timer, *next, *prev;
rtp_timer_queue_set_deadline (queue, 1, -1, 0);
rtp_timer_queue_set_deadline (queue, 3, -1, 0);
rtp_timer_queue_set_deadline (queue, 2, -1, 0);
rtp_timer_queue_set_deadline (queue, 0, -1, 0);
timer = rtp_timer_queue_find (queue, 0);
fail_if (timer == NULL);
fail_unless_equals_int (0, timer->seqnum);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_unless (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (1, next->seqnum);
timer = rtp_timer_queue_find (queue, 3);
fail_if (timer == NULL);
fail_unless_equals_int (3, timer->seqnum);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_unless_equals_int (2, prev->seqnum);
fail_unless (next == NULL);
timer = rtp_timer_queue_find (queue, 2);
fail_if (timer == NULL);
fail_unless_equals_int (2, timer->seqnum);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (1, prev->seqnum);
fail_unless_equals_int (3, next->seqnum);
timer = rtp_timer_queue_find (queue, 1);
fail_if (timer == NULL);
fail_unless_equals_int (1, timer->seqnum);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (0, prev->seqnum);
fail_unless_equals_int (2, next->seqnum);
g_object_unref (queue);
}
GST_END_TEST;
GST_START_TEST (test_timer_queue_reschedule)
{
RtpTimerQueue *queue = rtp_timer_queue_new ();
RtpTimer *timer, *next, *prev;
rtp_timer_queue_set_deadline (queue, 3, 1 * GST_SECOND, 0);
rtp_timer_queue_set_deadline (queue, 1, 2 * GST_SECOND, 0);
rtp_timer_queue_set_deadline (queue, 2, 3 * GST_SECOND, 0);
rtp_timer_queue_set_deadline (queue, 0, 4 * GST_SECOND, 0);
timer = rtp_timer_queue_find (queue, 1);
fail_if (timer == NULL);
/* move to head, making sure seqnum order is respected */
rtp_timer_queue_set_deadline (queue, 1, 1 * GST_SECOND, 0);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_unless (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (3, next->seqnum);
/* move head back */
rtp_timer_queue_set_deadline (queue, 1, 2 * GST_SECOND, 0);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (3, prev->seqnum);
fail_unless_equals_int (2, next->seqnum);
/* move to tail */
timer = rtp_timer_queue_find (queue, 2);
fail_if (timer == NULL);
rtp_timer_queue_set_deadline (queue, 2, 4 * GST_SECOND, 0);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_unless (next == NULL);
fail_unless_equals_int (0, prev->seqnum);
/* move tail back */
rtp_timer_queue_set_deadline (queue, 2, 3 * GST_SECOND, 0);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (1, prev->seqnum);
fail_unless_equals_int (0, next->seqnum);
/* not moving toward head */
rtp_timer_queue_set_deadline (queue, 2, 2 * GST_SECOND, 0);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (1, prev->seqnum);
fail_unless_equals_int (0, next->seqnum);
/* not moving toward tail */
rtp_timer_queue_set_deadline (queue, 2, 3 * GST_SECOND, 0);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (1, prev->seqnum);
fail_unless_equals_int (0, next->seqnum);
/* inner move toward head */
rtp_timer_queue_set_deadline (queue, 2, GST_SECOND + GST_SECOND / 2, 0);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (3, prev->seqnum);
fail_unless_equals_int (1, next->seqnum);
/* inner move toward tail */
rtp_timer_queue_set_deadline (queue, 2, 3 * GST_SECOND, 0);
next = (RtpTimer *) timer->list.next;
prev = (RtpTimer *) timer->list.prev;
fail_if (prev == NULL);
fail_if (next == NULL);
fail_unless_equals_int (1, prev->seqnum);
fail_unless_equals_int (0, next->seqnum);
g_object_unref (queue);
}
GST_END_TEST;
GST_START_TEST (test_timer_queue_pop_until)
{
RtpTimerQueue *queue = rtp_timer_queue_new ();
RtpTimer *timer;
rtp_timer_queue_set_deadline (queue, 2, 2 * GST_SECOND, 0);
rtp_timer_queue_set_deadline (queue, 1, 1 * GST_SECOND, 0);
rtp_timer_queue_set_deadline (queue, 0, -1, 0);
timer = rtp_timer_queue_pop_until (queue, 1 * GST_SECOND);
fail_if (timer == NULL);
fail_unless_equals_int (0, timer->seqnum);
rtp_timer_free (timer);
timer = rtp_timer_queue_pop_until (queue, 1 * GST_SECOND);
fail_if (timer == NULL);
fail_unless_equals_int (1, timer->seqnum);
rtp_timer_free (timer);
timer = rtp_timer_queue_pop_until (queue, 1 * GST_SECOND);
fail_unless (timer == NULL);
g_object_unref (queue);
}
GST_END_TEST;
GST_START_TEST (test_timer_queue_update_timer_seqnum)
{
RtpTimerQueue *queue = rtp_timer_queue_new ();
RtpTimer *timer;
rtp_timer_queue_set_deadline (queue, 2, 2 * GST_SECOND, 0);
timer = rtp_timer_queue_find (queue, 2);
fail_if (timer == NULL);
rtp_timer_queue_update_timer (queue, timer, 3, 3 * GST_SECOND, 0, 0, FALSE);
timer = rtp_timer_queue_find (queue, 2);
fail_unless (timer == NULL);
timer = rtp_timer_queue_find (queue, 3);
fail_if (timer == NULL);
fail_unless_equals_int (1, rtp_timer_queue_length (queue));
g_object_unref (queue);
}
GST_END_TEST;
GST_START_TEST (test_timer_queue_dup_timer)
{
RtpTimerQueue *queue = rtp_timer_queue_new ();
RtpTimer *timer;
rtp_timer_queue_set_deadline (queue, 2, 2 * GST_SECOND, 0);
timer = rtp_timer_queue_find (queue, 2);
fail_if (timer == NULL);
timer = rtp_timer_dup (timer);
timer->seqnum = 3;
rtp_timer_queue_insert (queue, timer);
fail_unless_equals_int (2, rtp_timer_queue_length (queue));
g_object_unref (queue);
}
GST_END_TEST;
GST_START_TEST (test_timer_queue_timer_offset)
{
RtpTimerQueue *queue = rtp_timer_queue_new ();
RtpTimer *timer;
rtp_timer_queue_set_timer (queue, RTP_TIMER_EXPECTED, 2, 0, 2 * GST_SECOND,
GST_MSECOND, 0, GST_USECOND);
timer = rtp_timer_queue_find (queue, 2);
fail_if (timer == NULL);
fail_unless_equals_uint64 (2 * GST_SECOND + GST_MSECOND + GST_USECOND,
timer->timeout);
fail_unless_equals_int64 (GST_USECOND, timer->offset);
rtp_timer_queue_update_timer (queue, timer, 2, 3 * GST_SECOND,
2 * GST_MSECOND, 2 * GST_USECOND, FALSE);
fail_unless_equals_uint64 (3 * GST_SECOND + 2 * GST_MSECOND +
2 * GST_USECOND, timer->timeout);
fail_unless_equals_int64 (2 * GST_USECOND, timer->offset);
g_object_unref (queue);
}
GST_END_TEST;
static Suite * static Suite *
rtpjitterbuffer_suite (void) rtpjitterbuffer_suite (void)
{ {
Suite *s = suite_create ("rtpjitterbuffer"); Suite *s = suite_create ("rtpjitterbuffer");
TCase *tc_chain = tcase_create ("general"); TCase *tc_chain = tcase_create ("general");
gst_element_register (NULL, "rtpjitterbuffer", GST_RANK_NONE,
GST_TYPE_RTP_JITTER_BUFFER);
suite_add_tcase (s, tc_chain); suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_timer_queue_set_timer);
tcase_add_test (tc_chain, test_timer_queue_insert_head);
tcase_add_test (tc_chain, test_timer_queue_reschedule);
tcase_add_test (tc_chain, test_timer_queue_pop_until);
tcase_add_test (tc_chain, test_timer_queue_update_timer_seqnum);
tcase_add_test (tc_chain, test_timer_queue_dup_timer);
tcase_add_test (tc_chain, test_timer_queue_timer_offset);
tcase_add_test (tc_chain, test_push_forward_seq); tcase_add_test (tc_chain, test_push_forward_seq);
tcase_add_test (tc_chain, test_push_backward_seq); tcase_add_test (tc_chain, test_push_backward_seq);
tcase_add_test (tc_chain, test_push_unordered); tcase_add_test (tc_chain, test_push_unordered);

View file

@ -68,7 +68,11 @@ good_tests = [
[ 'elements/rtpbin_buffer_list' ], [ 'elements/rtpbin_buffer_list' ],
[ 'elements/rtpcollision' ], [ 'elements/rtpcollision' ],
[ 'elements/rtpfunnel' ], [ 'elements/rtpfunnel' ],
[ 'elements/rtpjitterbuffer' ], [ 'elements/rtpjitterbuffer', false, [gstrtp_dep],
['../../gst/rtpmanager/gstrtpjitterbuffer.c',
'../../gst/rtpmanager/rtpjitterbuffer.c',
'../../gst/rtpmanager/rtpstats.c',
'../../gst/rtpmanager/rtptimerqueue.c']],
[ 'elements/rtpmux' ], [ 'elements/rtpmux' ],
[ 'elements/rtprtx' ], [ 'elements/rtprtx' ],
[ 'elements/rtpsession' ], [ 'elements/rtpsession' ],
@ -188,7 +192,7 @@ foreach t : good_tests
env.set('GST_REGISTRY', join_paths(meson.current_build_dir(), '@0@.registry'.format(test_name))) env.set('GST_REGISTRY', join_paths(meson.current_build_dir(), '@0@.registry'.format(test_name)))
exe = executable(test_name, fname, extra_sources, exe = executable(test_name, fname, extra_sources,
include_directories : [configinc], include_directories : [configinc, libsinc],
c_args : ['-DHAVE_CONFIG_H=1' ] + test_defines, c_args : ['-DHAVE_CONFIG_H=1' ] + test_defines,
dependencies : [libm] + test_deps + extra_deps, dependencies : [libm] + test_deps + extra_deps,
) )