diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am index 0defa28416..6221a9d187 100644 --- a/gst/rtpmanager/Makefile.am +++ b/gst/rtpmanager/Makefile.am @@ -14,6 +14,7 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \ rtpsession.c \ rtpsource.c \ rtpstats.c \ + rtptimerqueue.c \ gstrtpsession.c \ gstrtpfunnel.c @@ -30,6 +31,7 @@ noinst_HEADERS = gstrtpbin.h \ rtpsession.h \ rtpsource.h \ rtpstats.h \ + rtptimerqueue.h \ gstrtpsession.h \ gstrtpfunnel.h diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 4c06762746..8398a2fe24 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -111,6 +111,7 @@ #include "gstrtpjitterbuffer.h" #include "rtpjitterbuffer.h" #include "rtpstats.h" +#include "rtptimerqueue.h" #include diff --git a/gst/rtpmanager/meson.build b/gst/rtpmanager/meson.build index b800fecf9f..221c8e3e5f 100644 --- a/gst/rtpmanager/meson.build +++ b/gst/rtpmanager/meson.build @@ -13,6 +13,7 @@ rtpmanager_sources = [ 'rtpsession.c', 'rtpsource.c', 'rtpstats.c', + 'rtptimerqueue.c', 'gstrtpsession.c', 'gstrtpfunnel.c', ] diff --git a/gst/rtpmanager/rtptimerqueue.c b/gst/rtpmanager/rtptimerqueue.c new file mode 100644 index 0000000000..399a75f501 --- /dev/null +++ b/gst/rtpmanager/rtptimerqueue.c @@ -0,0 +1,660 @@ +/* GStreamer RTP Manager + * + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include + +#include "rtptimerqueue.h" + +struct _RtpTimerQueue +{ + GObject parent; + + GQueue timers; + GHashTable *hashtable; +}; + +G_DEFINE_TYPE (RtpTimerQueue, rtp_timer_queue, G_TYPE_OBJECT); + +/* some timer private helpers */ + +static RtpTimer * +rtp_timer_new (void) +{ + return g_slice_new0 (RtpTimer); +} + +static inline void +rtp_timer_set_next (RtpTimer * timer, RtpTimer * next) +{ + GList *list = (GList *) timer; + list->next = (GList *) next; +} + +static inline void +rtp_timer_set_prev (RtpTimer * timer, RtpTimer * prev) +{ + GList *list = (GList *) timer; + list->prev = (GList *) prev; +} + +static inline gboolean +rtp_timer_is_later (RtpTimer * timer, RtpTimer * next) +{ + if (next == NULL) + return FALSE; + + if (GST_CLOCK_TIME_IS_VALID (next->timeout)) { + if (!GST_CLOCK_TIME_IS_VALID (timer->timeout)) + return FALSE; + + if (timer->timeout > next->timeout) + return TRUE; + } + + if (timer->timeout == next->timeout && + gst_rtp_buffer_compare_seqnum (timer->seqnum, next->seqnum) < 0) + return TRUE; + + return FALSE; +} + +static inline gboolean +rtp_timer_is_sooner (RtpTimer * timer, RtpTimer * prev) +{ + if (prev == NULL) + return FALSE; + + if (GST_CLOCK_TIME_IS_VALID (prev->timeout)) { + if (!GST_CLOCK_TIME_IS_VALID (timer->timeout)) + return TRUE; + + if (timer->timeout < prev->timeout) + return TRUE; + } + + if (timer->timeout == prev->timeout && + gst_rtp_buffer_compare_seqnum (timer->seqnum, prev->seqnum) > 0) + return TRUE; + + return FALSE; +} + + +static inline RtpTimer * +rtp_timer_queue_get_tail (RtpTimerQueue * queue) +{ + return (RtpTimer *) queue->timers.tail; +} + +static inline void +rtp_timer_queue_set_tail (RtpTimerQueue * queue, RtpTimer * timer) +{ + queue->timers.tail = (GList *) timer; + g_assert (queue->timers.tail->next == NULL); +} + +static inline RtpTimer * +rtp_timer_queue_get_head (RtpTimerQueue * queue) +{ + return (RtpTimer *) queue->timers.head; +} + +static inline void +rtp_timer_queue_set_head (RtpTimerQueue * queue, RtpTimer * timer) +{ + queue->timers.head = (GList *) timer; + g_assert (queue->timers.head->prev == NULL); +} + +static void +rtp_timer_queue_insert_before (RtpTimerQueue * queue, RtpTimer * sibling, + RtpTimer * timer) +{ + if (sibling == rtp_timer_queue_get_head (queue)) { + rtp_timer_queue_set_head (queue, timer); + } else { + rtp_timer_set_prev (timer, rtp_timer_get_prev (sibling)); + rtp_timer_set_next (rtp_timer_get_prev (sibling), timer); + } + + rtp_timer_set_next (timer, sibling); + rtp_timer_set_prev (sibling, timer); + + queue->timers.length++; +} + +static void +rtp_timer_queue_insert_after (RtpTimerQueue * queue, RtpTimer * sibling, + RtpTimer * timer) +{ + if (sibling == rtp_timer_queue_get_tail (queue)) { + rtp_timer_queue_set_tail (queue, timer); + } else { + rtp_timer_set_next (timer, rtp_timer_get_next (sibling)); + rtp_timer_set_prev (rtp_timer_get_next (sibling), timer); + } + + rtp_timer_set_prev (timer, sibling); + rtp_timer_set_next (sibling, timer); + + queue->timers.length++; +} + +static void +rtp_timer_queue_insert_tail (RtpTimerQueue * queue, RtpTimer * timer) +{ + RtpTimer *it = rtp_timer_queue_get_tail (queue); + + while (it) { + if (!GST_CLOCK_TIME_IS_VALID (it->timeout)) + break; + + if (timer->timeout > it->timeout) + break; + + if (timer->timeout == it->timeout && + gst_rtp_buffer_compare_seqnum (timer->seqnum, it->seqnum) < 0) + break; + + it = rtp_timer_get_prev (it); + } + + /* the queue is empty, or this is the earliest timeout */ + if (it == NULL) + g_queue_push_head_link (&queue->timers, (GList *) timer); + else + rtp_timer_queue_insert_after (queue, it, timer); +} + +static void +rtp_timer_queue_insert_head (RtpTimerQueue * queue, RtpTimer * timer) +{ + RtpTimer *it = rtp_timer_queue_get_head (queue); + + while (it) { + if (GST_CLOCK_TIME_IS_VALID (it->timeout)) { + if (!GST_CLOCK_TIME_IS_VALID (timer->timeout)) + break; + + if (timer->timeout < it->timeout) + break; + } + + if (timer->timeout == it->timeout && + gst_rtp_buffer_compare_seqnum (timer->seqnum, it->seqnum) > 0) + break; + + it = rtp_timer_get_next (it); + } + + /* the queue is empty, or this is the oldest */ + if (it == NULL) + g_queue_push_tail_link (&queue->timers, (GList *) timer); + else + rtp_timer_queue_insert_before (queue, it, timer); +} + +static void +rtp_timer_queue_init (RtpTimerQueue * queue) +{ + queue->hashtable = g_hash_table_new (NULL, NULL); +} + +static void +rtp_timer_queue_finalize (GObject * object) +{ + RtpTimerQueue *queue = RTP_TIMER_QUEUE (object); + RtpTimer *timer; + + while ((timer = rtp_timer_queue_pop_until (queue, GST_CLOCK_TIME_NONE))) + rtp_timer_free (timer); + g_hash_table_unref (queue->hashtable); + g_assert (queue->timers.length == 0); +} + +static void +rtp_timer_queue_class_init (RtpTimerQueueClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + gobject_class->finalize = rtp_timer_queue_finalize; +} + +/** + * rtp_timer_free: + * + * Free a #RtpTimer structure. This should be used after a timer has been + * poped or unscheduled. The timer must be queued. + */ +void +rtp_timer_free (RtpTimer * timer) +{ + g_return_if_fail (timer); + g_return_if_fail (timer->queued == FALSE); + g_return_if_fail (timer->list.next == NULL); + g_return_if_fail (timer->list.prev == NULL); + + g_slice_free (RtpTimer, timer); +} + +/** + * rtp_timer_dup: + * @timer: a #RtpTimer + * + * This allow cloning a #RtpTimer structure. + * + * Returns: a copy of @timer + */ +RtpTimer * +rtp_timer_dup (const RtpTimer * timer) +{ + RtpTimer *copy = g_slice_new (RtpTimer); + memcpy (copy, timer, sizeof (RtpTimer)); + memset (©->list, 0, sizeof (GList)); + copy->queued = FALSE; + return copy; +} + +/** + * rtp_timer_queue_find: + * @queue: the #RtpTimerQueue object + * @seqnum: the sequence number of the #RtpTimer + * + * Lookup for a timer with @seqnum. Only one timer per seqnum exist withing + * the #RtpTimerQueue. This operation is o(1). + * + * Rerturn: the matching #RtpTimer or %NULL + */ +RtpTimer * +rtp_timer_queue_find (RtpTimerQueue * queue, guint seqnum) +{ + return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum)); +} + +/** + * rtp_timer_queue_peek_earliest: + * @queue: the #RtpTimerQueue object + * + * Rerturns: the #RtpTimer with earliest timeout value + */ +RtpTimer * +rtp_timer_queue_peek_earliest (RtpTimerQueue * queue) +{ + return rtp_timer_queue_get_head (queue); +} + + +/** + * rtp_timer_queue_new: + * + * Returns: a freshly allocated #RtpTimerQueue + */ +RtpTimerQueue * +rtp_timer_queue_new (void) +{ + return g_object_new (RTP_TYPE_TIMER_QUEUE, NULL); +} + +/** + * rtp_timer_queue_insert: + * @queue: the #RtpTimerQueue object + * @timer: the #RtpTimer to insert + * + * Insert a timer into the queue. Earliest timer are at the head and then + * timer are sorted by seqnum (smaller seqnum first). This function is o(n) + * but it is expected that most timers added are schedule later, in which case + * the insertion will be faster. + * + * Returns: %FLASE is a timer with the same seqnum already existed + */ +gboolean +rtp_timer_queue_insert (RtpTimerQueue * queue, RtpTimer * timer) +{ + g_return_val_if_fail (timer->queued == FALSE, FALSE); + + if (rtp_timer_queue_find (queue, timer->seqnum)) + return FALSE; + + if (timer->timeout == -1) + rtp_timer_queue_insert_head (queue, timer); + else + rtp_timer_queue_insert_tail (queue, timer); + + g_hash_table_insert (queue->hashtable, + GINT_TO_POINTER (timer->seqnum), timer); + timer->queued = TRUE; + + return TRUE; +} + +/** + * rtp_timer_queue_reschedule: + * @queue: the #RtpTimerQueue object + * @timer: the #RtpTimer to reschedule + * + * This function moves @timer inside the queue to put it back to it's new + * location. This function is o(n) but it is assumed that nearby modification + * of the timeout will occure. + * + * Returns: %TRUE if the timer was moved + */ +gboolean +rtp_timer_queue_reschedule (RtpTimerQueue * queue, RtpTimer * timer) +{ + RtpTimer *it = timer; + + g_return_val_if_fail (timer->queued == TRUE, FALSE); + + while (rtp_timer_is_sooner (timer, rtp_timer_get_prev (it))) + it = rtp_timer_get_prev (it); + + if (it != timer) { + g_queue_unlink (&queue->timers, (GList *) timer); + rtp_timer_queue_insert_before (queue, it, timer); + return TRUE; + } + + while (rtp_timer_is_later (timer, rtp_timer_get_next (it))) + it = rtp_timer_get_next (it); + + if (it != timer) { + g_queue_unlink (&queue->timers, (GList *) timer); + rtp_timer_queue_insert_after (queue, it, timer); + return TRUE; + } + + return FALSE; +} + +/** + * rtp_timer_queue_unschedule: + * @queue: the #RtpTimerQueue + * @timer: the #RtpTimer to unschedule + * + * This removes a timer from the queue. The timer structure can be reused, + * or freed using rtp_timer_free(). This function is o(1). + */ +void +rtp_timer_queue_unschedule (RtpTimerQueue * queue, RtpTimer * timer) +{ + g_return_if_fail (timer->queued == TRUE); + + g_queue_unlink (&queue->timers, (GList *) timer); + g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (timer->seqnum)); + timer->queued = FALSE; +} + +/** + * rtp_timer_queue_pop_until: + * @queue: the #RtpTimerQueue + * @timeout: Time at witch timers expired + * + * Unschdedule and return the earliest packet that has a timeout smaller or + * equal to @timeout. The returns #RtpTimer must be freed with + * rtp_timer_free(). This function is o(1). + * + * Returns: an expired timer according to @timeout, or %NULL. + */ +RtpTimer * +rtp_timer_queue_pop_until (RtpTimerQueue * queue, GstClockTime timeout) +{ + RtpTimer *timer; + + timer = (RtpTimer *) g_queue_peek_head_link (&queue->timers); + if (!timer) + return NULL; + + if (!GST_CLOCK_TIME_IS_VALID (timer->timeout) || timer->timeout <= timeout) { + rtp_timer_queue_unschedule (queue, timer); + return timer; + } + + return NULL; +} + +/** + * rtp_timer_queue_remove_until: + * @queue: the #RtpTimerQueue + * @timeout: Time at witch timers expired + * + * Unschedule and free all timers that has a timeout smaller or equal to + * @timeout. + */ +void +rtp_timer_queue_remove_until (RtpTimerQueue * queue, GstClockTime timeout) +{ + RtpTimer *timer; + + while ((timer = rtp_timer_queue_pop_until (queue, timeout))) { + GST_LOG ("Removing expired timer #%d, %" GST_TIME_FORMAT " < %" + GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (timer->timeout), + GST_TIME_ARGS (timeout)); + rtp_timer_free (timer); + } +} + +/** + * rtp_timer_queue_remove_all: + * @queue: the #RtpTimerQueue + * + * Unschedule and free all timers from the queue. + */ +void +rtp_timer_queue_remove_all (RtpTimerQueue * queue) +{ + rtp_timer_queue_remove_until (queue, GST_CLOCK_TIME_NONE); +} + +/** + * rtp_timer_queue_set_timer: + * @queue: the #RtpTimerQueue + * @type: the #RtpTimerType + * @senum: the timer seqnum + * @num: the number of seqnum in the range (partially supported) + * @timeout: the timer timeout + * @delay: the additional delay (will be added to @timeout) + * @duration: the duration of the event related to the timer + * @offset: offset that can be used to convert the timeout to timestamp + * + * If there exist a timer with this seqnum it will be updated other a new + * timer is created and inserted into the queue. This function is o(n) except + * that it's optimized for later timer insertion. + */ +void +rtp_timer_queue_set_timer (RtpTimerQueue * queue, RtpTimerType type, + guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay, + GstClockTime duration, GstClockTimeDiff offset) +{ + RtpTimer *timer; + + timer = rtp_timer_queue_find (queue, seqnum); + if (!timer) + timer = rtp_timer_new (); + + /* for new timers or on seqnum change reset the RTX data */ + if (!timer->queued || timer->seqnum != seqnum) { + if (type == RTP_TIMER_EXPECTED) { + timer->rtx_base = timeout; + timer->rtx_delay = delay; + timer->rtx_retry = 0; + } + + timer->rtx_last = GST_CLOCK_TIME_NONE; + timer->num_rtx_retry = 0; + timer->num_rtx_received = 0; + } + + timer->type = type; + timer->seqnum = seqnum; + timer->num = num; + + if (timeout == -1) + timer->timeout = -1; + else + timer->timeout = timeout + delay + offset; + + timer->offset = offset; + timer->duration = duration; + + if (timer->queued) + rtp_timer_queue_reschedule (queue, timer); + else + rtp_timer_queue_insert (queue, timer); +} + +/** + * rtp_timer_queue_set_expected: + * @queue: the #RtpTimerQueue + * @senum: the timer seqnum + * @timeout: the timer timeout + * @delay: the additional delay (will be added to @timeout) + * @duration: the duration of the event related to the timer + * + * Specialized version of rtp_timer_queue_set_timer() that creates or updates a + * timer with type %RTP_TIMER_EXPECTED. Expected timers do not carry + * a timestamp, hence have no offset. + */ +void +rtp_timer_queue_set_expected (RtpTimerQueue * queue, guint16 seqnum, + GstClockTime timeout, GstClockTime delay, GstClockTime duration) +{ + rtp_timer_queue_set_timer (queue, RTP_TIMER_EXPECTED, seqnum, 0, timeout, + delay, duration, 0); +} + +/** + * rtp_timer_queue_set_lost: + * @queue: the #RtpTimerQueue + * @senum: the timer seqnum + * @num: the number of seqnum in the range (partially supported) + * @timeout: the timer timeout + * @duration: the duration of the event related to the timer + * @offset: offset that can be used to convert the timeout to timestamp + * + * Specialized version of rtp_timer_queue_set_timer() that creates or updates a + * timer with type %RTP_TIMER_LOST. + */ +void +rtp_timer_queue_set_lost (RtpTimerQueue * queue, guint16 seqnum, + guint num, GstClockTime timeout, GstClockTime duration, + GstClockTimeDiff offset) +{ + rtp_timer_queue_set_timer (queue, RTP_TIMER_LOST, seqnum, num, timeout, 0, + duration, offset); +} + +/** + * rtp_timer_queue_set_eos: + * @queue: the #RtpTimerQueue + * @timeout: the timer timeout + * @offset: offset that can be used to convert the timeout to timestamp + * + * Specialized version of rtp_timer_queue_set_timer() that creates or updates a + * timer with type %RTP_TIMER_EOS. There is only one such a timer and it has + * the special seqnum value -1 (FIXME this is not an invalid seqnum,). + */ +void +rtp_timer_queue_set_eos (RtpTimerQueue * queue, GstClockTime timeout, + GstClockTimeDiff offset) +{ + rtp_timer_queue_set_timer (queue, RTP_TIMER_EOS, -1, 0, timeout, 0, 0, + offset); +} + +/** + * rtp_timer_queue_set_deadline: + * @queue: the #RtpTimerQueue + * @senum: the timer seqnum + * @timeout: the timer timeout + * @offset: offset that can be used to convert the timeout to timestamp + * + * Specialized version of rtp_timer_queue_set_timer() that creates or updates a + * timer with type %RTP_TIMER_DEADLINE. There should be only one such a timer, + * its seqnum matches the first packet to be output. + */ +void +rtp_timer_queue_set_deadline (RtpTimerQueue * queue, guint16 seqnum, + GstClockTime timeout, GstClockTimeDiff offset) +{ + rtp_timer_queue_set_timer (queue, RTP_TIMER_DEADLINE, seqnum, 0, timeout, 0, + 0, offset); +} + +/** + * rtp_timer_queue_update_timer: + * @queue: the #RtpTimerQueue + * @senum: the timer seqnum + * @timeout: the timer timeout + * @delay: the additional delay (will be added to @timeout) + * @offset: offset that can be used to convert the timeout to timestamp + * @reset: if the RTX statistics should be reset + * + * A utility to update an already queued timer. + */ +void +rtp_timer_queue_update_timer (RtpTimerQueue * queue, RtpTimer * timer, + guint16 seqnum, GstClockTime timeout, GstClockTime delay, + GstClockTimeDiff offset, gboolean reset) +{ + g_return_if_fail (timer != NULL); + + if (reset) { + GST_DEBUG ("reset rtx delay %" GST_TIME_FORMAT "->%" GST_TIME_FORMAT, + GST_TIME_ARGS (timer->rtx_delay), GST_TIME_ARGS (delay)); + timer->rtx_base = timeout; + timer->rtx_delay = delay; + timer->rtx_retry = 0; + } + + if (timer->seqnum != seqnum) { + timer->num_rtx_retry = 0; + timer->num_rtx_received = 0; + + if (timer->queued) { + g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (timer->seqnum)); + g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (seqnum), timer); + } + } + + if (timeout == -1) + timer->timeout = -1; + else + timer->timeout = timeout + delay + offset; + + timer->seqnum = seqnum; + timer->offset = offset; + + if (timer->queued) + rtp_timer_queue_reschedule (queue, timer); + else + rtp_timer_queue_insert (queue, timer); +} + +/** + * rtp_timer_queue_length: + * @queue: the #RtpTimerQueue + * + * Returns: the number of timers in the #RtpTimerQueue + */ +guint +rtp_timer_queue_length (RtpTimerQueue * queue) +{ + return queue->timers.length; +} diff --git a/gst/rtpmanager/rtptimerqueue.h b/gst/rtpmanager/rtptimerqueue.h new file mode 100644 index 0000000000..b172125497 --- /dev/null +++ b/gst/rtpmanager/rtptimerqueue.h @@ -0,0 +1,128 @@ +/* GStreamer RTP Manager + * + * Copyright 2007 Collabora Ltd, + * Copyright 2007 Nokia Corporation + * @author: Philippe Kalaf . + * Copyright 2007 Wim Taymans + * Copyright 2015 Kurento (http://kurento.org/) + * @author: Miguel ParĂ­s + * Copyright 2016 Pexip AS + * @author: Havard Graff + * @author: Stian Selnes + * Copyright (C) 2019 Net Insight AB + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include + +#ifndef __RTP_TIMER_QUEUE_H__ +#define __RTP_TIMER_QUEUE_H__ + +#define RTP_TYPE_TIMER_QUEUE rtp_timer_queue_get_type() +G_DECLARE_FINAL_TYPE (RtpTimerQueue, rtp_timer_queue, RTP_TIMER, QUEUE, GObject); + +/** + * RtpTimerType: + * @RTP_TIMER_EXPECTED: This is used to track when to emit retranmission + * requests. They may be converted into %RTP_TIMER_LOST + * timer if the number of retry has been exhausted. + * @RTP_TIMER_LOST: This is used to track when a packet is considered lost. + * @RTP_TIMER_DEADLINE: This is used to track when the jitterbuffer should + * start pushing buffers. + * @RTP_TIMER_EOS: This is used to track when end of stream is reached. + */ +typedef enum +{ + RTP_TIMER_EXPECTED, + RTP_TIMER_LOST, + RTP_TIMER_DEADLINE, + RTP_TIMER_EOS +} RtpTimerType; + +typedef struct +{ + GList list; + gboolean queued; + + guint16 seqnum; + guint num; + RtpTimerType type; + GstClockTime timeout; + GstClockTimeDiff offset; + GstClockTime duration; + GstClockTime rtx_base; + GstClockTime rtx_delay; + GstClockTime rtx_retry; + GstClockTime rtx_last; + guint num_rtx_retry; + guint num_rtx_received; +} RtpTimer; + +void rtp_timer_free (RtpTimer * timer); +RtpTimer * rtp_timer_dup (const RtpTimer * timer); + +static inline RtpTimer * rtp_timer_get_next (RtpTimer * timer) +{ + GList *list = (GList *) timer; + return (RtpTimer *) list->next; +} + +static inline RtpTimer * rtp_timer_get_prev (RtpTimer * timer) +{ + GList *list = (GList *) timer; + return (RtpTimer *) list->prev; +} + +RtpTimerQueue * rtp_timer_queue_new (void); + +RtpTimer * rtp_timer_queue_find (RtpTimerQueue * queue, guint seqnum); + +RtpTimer * rtp_timer_queue_peek_earliest (RtpTimerQueue * queue); + +gboolean rtp_timer_queue_insert (RtpTimerQueue * queue, RtpTimer * timer); + +gboolean rtp_timer_queue_reschedule (RtpTimerQueue * queue, RtpTimer * timer); + +void rtp_timer_queue_unschedule (RtpTimerQueue * queue, RtpTimer * timer); + +RtpTimer * rtp_timer_queue_pop_until (RtpTimerQueue * queue, GstClockTime timeout); + +void rtp_timer_queue_remove_until (RtpTimerQueue * queue, GstClockTime timeout); + +void rtp_timer_queue_remove_all (RtpTimerQueue * queue); + +void rtp_timer_queue_set_timer (RtpTimerQueue * queue, RtpTimerType type, + guint16 seqnum, guint num, GstClockTime timeout, + GstClockTime delay, GstClockTime duration, + GstClockTimeDiff offset); +void rtp_timer_queue_set_expected (RtpTimerQueue * queue, guint16 seqnum, + GstClockTime timeout, GstClockTime delay, + GstClockTime duration); +void rtp_timer_queue_set_lost (RtpTimerQueue * queue, guint16 seqnum, + guint num, GstClockTime timeout, + GstClockTime duration, GstClockTimeDiff offset); +void rtp_timer_queue_set_eos (RtpTimerQueue * queue, GstClockTime timeout, + GstClockTimeDiff offset); +void rtp_timer_queue_set_deadline (RtpTimerQueue * queue, guint16 seqnum, + GstClockTime timeout, GstClockTimeDiff offset); +void rtp_timer_queue_update_timer (RtpTimerQueue * queue, RtpTimer * timer, guint16 seqnum, + GstClockTime timeout, GstClockTime delay, + GstClockTimeDiff offset, gboolean reset); +guint rtp_timer_queue_length (RtpTimerQueue * queue); + +#endif diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am index c046a4c989..37b3929995 100644 --- a/tests/check/Makefile.am +++ b/tests/check/Makefile.am @@ -600,8 +600,14 @@ elements_videocrop_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(CFLA elements_videofilter_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) elements_videofilter_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstvideo-$(GST_API_VERSION) $(LDADD) -elements_rtpjitterbuffer_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) -elements_rtpjitterbuffer_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(LDADD) +elements_rtpjitterbuffer_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_NET_CFLAGS) $(GIO_CFLAGS) $(CFLAGS) $(AM_CFLAGS) +elements_rtpjitterbuffer_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(GST_NET_LIBS) $(GIO_LIBS) $(LDADD) +elements_rtpjitterbuffer_SOURCES = \ + elements/rtpjitterbuffer.c \ + ../../gst/rtpmanager/gstrtpjitterbuffer.c \ + ../../gst/rtpmanager/rtpjitterbuffer.c \ + ../../gst/rtpmanager/rtpstats.c \ + ../../gst/rtpmanager/rtptimerqueue.c elements_rtprtx_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) elements_rtprtx_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(LDADD) diff --git a/tests/check/elements/rtpjitterbuffer.c b/tests/check/elements/rtpjitterbuffer.c index e471cd5f73..6e07b4d4f8 100644 --- a/tests/check/elements/rtpjitterbuffer.c +++ b/tests/check/elements/rtpjitterbuffer.c @@ -31,6 +31,9 @@ #include +#include "gst/rtpmanager/gstrtpjitterbuffer.h" +#include "gst/rtpmanager/rtptimerqueue.h" + /* For ease of programming we use globals to keep refs for our floating * src and sink pads we create; otherwise we always have to do get_pad, * get_peer, and then remove references in every test function */ @@ -2752,13 +2755,313 @@ GST_START_TEST (test_rtx_does_not_affect_pts_calculation) GST_END_TEST; +GST_START_TEST (test_timer_queue_set_timer) +{ + RtpTimerQueue *queue = rtp_timer_queue_new (); + RtpTimer *timer10, *timer0; + + rtp_timer_queue_set_timer (queue, RTP_TIMER_EXPECTED, 10, 0, + 1 * GST_SECOND, 2 * GST_SECOND, 5 * GST_SECOND, 0); + timer10 = rtp_timer_queue_find (queue, 10); + fail_unless (timer10); + fail_unless_equals_int (10, timer10->seqnum); + fail_unless_equals_int (0, timer10->num); + fail_unless_equals_int (RTP_TIMER_EXPECTED, timer10->type); + /* timer10->timeout = timerout + delay */ + fail_unless_equals_uint64 (3 * GST_SECOND, timer10->timeout); + fail_unless_equals_uint64 (5 * GST_SECOND, timer10->duration); + fail_unless_equals_uint64 (1 * GST_SECOND, timer10->rtx_base); + fail_unless_equals_uint64 (2 * GST_SECOND, timer10->rtx_delay); + fail_unless_equals_uint64 (0, timer10->rtx_retry); + fail_unless_equals_uint64 (GST_CLOCK_TIME_NONE, timer10->rtx_last); + fail_unless_equals_int (0, timer10->num_rtx_retry); + fail_unless_equals_int (0, timer10->num_rtx_received); + + rtp_timer_queue_set_timer (queue, RTP_TIMER_LOST, 0, 10, + 0 * GST_SECOND, 2 * GST_SECOND, 0, 0); + timer0 = rtp_timer_queue_find (queue, 0); + fail_unless (timer0); + fail_unless_equals_int (0, timer0->seqnum); + fail_unless_equals_int (10, timer0->num); + fail_unless_equals_int (RTP_TIMER_LOST, timer0->type); + fail_unless_equals_uint64 (2 * GST_SECOND, timer0->timeout); + fail_unless_equals_uint64 (0, timer0->duration); + fail_unless_equals_uint64 (0, timer0->rtx_base); + fail_unless_equals_uint64 (0, timer0->rtx_delay); + fail_unless_equals_uint64 (0, timer0->rtx_retry); + fail_unless_equals_uint64 (GST_CLOCK_TIME_NONE, timer0->rtx_last); + fail_unless_equals_int (0, timer0->num_rtx_retry); + fail_unless_equals_int (0, timer0->num_rtx_received); + + /* also check order while at it */ + fail_unless (timer10->list.next == NULL); + fail_unless (timer10->list.prev == (GList *) timer0); + fail_unless (timer0->list.next == (GList *) timer10); + fail_unless (timer0->list.prev == NULL); + + g_object_unref (queue); +} + +GST_END_TEST; + +GST_START_TEST (test_timer_queue_insert_head) +{ + RtpTimerQueue *queue = rtp_timer_queue_new (); + RtpTimer *timer, *next, *prev; + + rtp_timer_queue_set_deadline (queue, 1, -1, 0); + rtp_timer_queue_set_deadline (queue, 3, -1, 0); + rtp_timer_queue_set_deadline (queue, 2, -1, 0); + rtp_timer_queue_set_deadline (queue, 0, -1, 0); + + timer = rtp_timer_queue_find (queue, 0); + fail_if (timer == NULL); + fail_unless_equals_int (0, timer->seqnum); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_unless (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (1, next->seqnum); + + timer = rtp_timer_queue_find (queue, 3); + fail_if (timer == NULL); + fail_unless_equals_int (3, timer->seqnum); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_unless_equals_int (2, prev->seqnum); + fail_unless (next == NULL); + + timer = rtp_timer_queue_find (queue, 2); + fail_if (timer == NULL); + fail_unless_equals_int (2, timer->seqnum); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (1, prev->seqnum); + fail_unless_equals_int (3, next->seqnum); + + timer = rtp_timer_queue_find (queue, 1); + fail_if (timer == NULL); + fail_unless_equals_int (1, timer->seqnum); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (0, prev->seqnum); + fail_unless_equals_int (2, next->seqnum); + + g_object_unref (queue); +} + +GST_END_TEST; + +GST_START_TEST (test_timer_queue_reschedule) +{ + RtpTimerQueue *queue = rtp_timer_queue_new (); + RtpTimer *timer, *next, *prev; + + rtp_timer_queue_set_deadline (queue, 3, 1 * GST_SECOND, 0); + rtp_timer_queue_set_deadline (queue, 1, 2 * GST_SECOND, 0); + rtp_timer_queue_set_deadline (queue, 2, 3 * GST_SECOND, 0); + rtp_timer_queue_set_deadline (queue, 0, 4 * GST_SECOND, 0); + + timer = rtp_timer_queue_find (queue, 1); + fail_if (timer == NULL); + + /* move to head, making sure seqnum order is respected */ + rtp_timer_queue_set_deadline (queue, 1, 1 * GST_SECOND, 0); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_unless (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (3, next->seqnum); + + /* move head back */ + rtp_timer_queue_set_deadline (queue, 1, 2 * GST_SECOND, 0); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (3, prev->seqnum); + fail_unless_equals_int (2, next->seqnum); + + /* move to tail */ + timer = rtp_timer_queue_find (queue, 2); + fail_if (timer == NULL); + rtp_timer_queue_set_deadline (queue, 2, 4 * GST_SECOND, 0); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_unless (next == NULL); + fail_unless_equals_int (0, prev->seqnum); + + /* move tail back */ + rtp_timer_queue_set_deadline (queue, 2, 3 * GST_SECOND, 0); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (1, prev->seqnum); + fail_unless_equals_int (0, next->seqnum); + + /* not moving toward head */ + rtp_timer_queue_set_deadline (queue, 2, 2 * GST_SECOND, 0); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (1, prev->seqnum); + fail_unless_equals_int (0, next->seqnum); + + /* not moving toward tail */ + rtp_timer_queue_set_deadline (queue, 2, 3 * GST_SECOND, 0); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (1, prev->seqnum); + fail_unless_equals_int (0, next->seqnum); + + /* inner move toward head */ + rtp_timer_queue_set_deadline (queue, 2, GST_SECOND + GST_SECOND / 2, 0); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (3, prev->seqnum); + fail_unless_equals_int (1, next->seqnum); + + /* inner move toward tail */ + rtp_timer_queue_set_deadline (queue, 2, 3 * GST_SECOND, 0); + next = (RtpTimer *) timer->list.next; + prev = (RtpTimer *) timer->list.prev; + fail_if (prev == NULL); + fail_if (next == NULL); + fail_unless_equals_int (1, prev->seqnum); + fail_unless_equals_int (0, next->seqnum); + + g_object_unref (queue); +} + +GST_END_TEST; + +GST_START_TEST (test_timer_queue_pop_until) +{ + RtpTimerQueue *queue = rtp_timer_queue_new (); + RtpTimer *timer; + + rtp_timer_queue_set_deadline (queue, 2, 2 * GST_SECOND, 0); + rtp_timer_queue_set_deadline (queue, 1, 1 * GST_SECOND, 0); + rtp_timer_queue_set_deadline (queue, 0, -1, 0); + + timer = rtp_timer_queue_pop_until (queue, 1 * GST_SECOND); + fail_if (timer == NULL); + fail_unless_equals_int (0, timer->seqnum); + rtp_timer_free (timer); + + timer = rtp_timer_queue_pop_until (queue, 1 * GST_SECOND); + fail_if (timer == NULL); + fail_unless_equals_int (1, timer->seqnum); + rtp_timer_free (timer); + + timer = rtp_timer_queue_pop_until (queue, 1 * GST_SECOND); + fail_unless (timer == NULL); + + g_object_unref (queue); +} + +GST_END_TEST; + +GST_START_TEST (test_timer_queue_update_timer_seqnum) +{ + RtpTimerQueue *queue = rtp_timer_queue_new (); + RtpTimer *timer; + + rtp_timer_queue_set_deadline (queue, 2, 2 * GST_SECOND, 0); + + timer = rtp_timer_queue_find (queue, 2); + fail_if (timer == NULL); + + rtp_timer_queue_update_timer (queue, timer, 3, 3 * GST_SECOND, 0, 0, FALSE); + + timer = rtp_timer_queue_find (queue, 2); + fail_unless (timer == NULL); + timer = rtp_timer_queue_find (queue, 3); + fail_if (timer == NULL); + + fail_unless_equals_int (1, rtp_timer_queue_length (queue)); + + g_object_unref (queue); +} + +GST_END_TEST; + +GST_START_TEST (test_timer_queue_dup_timer) +{ + RtpTimerQueue *queue = rtp_timer_queue_new (); + RtpTimer *timer; + + rtp_timer_queue_set_deadline (queue, 2, 2 * GST_SECOND, 0); + + timer = rtp_timer_queue_find (queue, 2); + fail_if (timer == NULL); + + timer = rtp_timer_dup (timer); + timer->seqnum = 3; + rtp_timer_queue_insert (queue, timer); + + fail_unless_equals_int (2, rtp_timer_queue_length (queue)); + + g_object_unref (queue); +} + +GST_END_TEST; + +GST_START_TEST (test_timer_queue_timer_offset) +{ + RtpTimerQueue *queue = rtp_timer_queue_new (); + RtpTimer *timer; + + rtp_timer_queue_set_timer (queue, RTP_TIMER_EXPECTED, 2, 0, 2 * GST_SECOND, + GST_MSECOND, 0, GST_USECOND); + + timer = rtp_timer_queue_find (queue, 2); + fail_if (timer == NULL); + fail_unless_equals_uint64 (2 * GST_SECOND + GST_MSECOND + GST_USECOND, + timer->timeout); + fail_unless_equals_int64 (GST_USECOND, timer->offset); + + rtp_timer_queue_update_timer (queue, timer, 2, 3 * GST_SECOND, + 2 * GST_MSECOND, 2 * GST_USECOND, FALSE); + fail_unless_equals_uint64 (3 * GST_SECOND + 2 * GST_MSECOND + + 2 * GST_USECOND, timer->timeout); + fail_unless_equals_int64 (2 * GST_USECOND, timer->offset); + + g_object_unref (queue); +} + +GST_END_TEST; + static Suite * rtpjitterbuffer_suite (void) { Suite *s = suite_create ("rtpjitterbuffer"); TCase *tc_chain = tcase_create ("general"); + gst_element_register (NULL, "rtpjitterbuffer", GST_RANK_NONE, + GST_TYPE_RTP_JITTER_BUFFER); + suite_add_tcase (s, tc_chain); + tcase_add_test (tc_chain, test_timer_queue_set_timer); + tcase_add_test (tc_chain, test_timer_queue_insert_head); + tcase_add_test (tc_chain, test_timer_queue_reschedule); + tcase_add_test (tc_chain, test_timer_queue_pop_until); + tcase_add_test (tc_chain, test_timer_queue_update_timer_seqnum); + tcase_add_test (tc_chain, test_timer_queue_dup_timer); + tcase_add_test (tc_chain, test_timer_queue_timer_offset); + tcase_add_test (tc_chain, test_push_forward_seq); tcase_add_test (tc_chain, test_push_backward_seq); tcase_add_test (tc_chain, test_push_unordered); diff --git a/tests/check/meson.build b/tests/check/meson.build index 3f644e2bd1..78cf84dc87 100644 --- a/tests/check/meson.build +++ b/tests/check/meson.build @@ -68,7 +68,11 @@ good_tests = [ [ 'elements/rtpbin_buffer_list' ], [ 'elements/rtpcollision' ], [ 'elements/rtpfunnel' ], - [ 'elements/rtpjitterbuffer' ], + [ 'elements/rtpjitterbuffer', false, [gstrtp_dep], + ['../../gst/rtpmanager/gstrtpjitterbuffer.c', + '../../gst/rtpmanager/rtpjitterbuffer.c', + '../../gst/rtpmanager/rtpstats.c', + '../../gst/rtpmanager/rtptimerqueue.c']], [ 'elements/rtpmux' ], [ 'elements/rtprtx' ], [ 'elements/rtpsession' ], @@ -188,7 +192,7 @@ foreach t : good_tests env.set('GST_REGISTRY', join_paths(meson.current_build_dir(), '@0@.registry'.format(test_name))) exe = executable(test_name, fname, extra_sources, - include_directories : [configinc], + include_directories : [configinc, libsinc], c_args : ['-DHAVE_CONFIG_H=1' ] + test_defines, dependencies : [libm] + test_deps + extra_deps, )