gstreamer/gst/rtpmanager/async_jitter_queue.c
Wim Taymans 1e1c12e87e gst/rtpmanager/async_jitter_queue.c: Fix leak when flushing.
Original commit message from CVS:
* gst/rtpmanager/async_jitter_queue.c:
(async_jitter_queue_set_flushing_unlocked):
Fix leak when flushing.
* gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_clear_pt_map),
(gst_rtp_bin_class_init):
* gst/rtpmanager/gstrtpbin.h:
Add clear-pt-map signal.
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_flush_stop),
(gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_loop):
Init clock-rate to -1 to mark unknow clock rate.
Fix flushing.
2007-05-14 15:28:36 +00:00

691 lines
19 KiB
C

/*
* Async Jitter Queue based on g_async_queue
* This code is GST RTP smart and deals with timestamps
*
* Farsight Voice+Video library
* Copyright 2007 Collabora Ltd,
* Copyright 2007 Nokia Corporation
* @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
*
* This is an async queue that has a buffering mechanism based on the set low
* and high threshold. When the lower threshold is reached, the queue will
* fill itself up until the higher threshold is reached before allowing any
* pops to occur. This allows a jitterbuffer of at least min threshold items
* to be available.
*/
/* GLIB - Library of useful routines for C programming
* Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
*
* GAsyncQueue: asynchronous queue implementation, based on Gqueue.
* Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
/*
* MT safe
*/
#include "config.h"
#include "async_jitter_queue.h"
#include <gst/gst.h>
#include <gst/rtp/gstrtpbuffer.h>
#define DEFAULT_LOW_THRESHOLD 0.1
#define DEFAULT_HIGH_THRESHOLD 0.9
/* Internal state of an #AsyncJitterQueue. */
struct _AsyncJitterQueue
{
GMutex *mutex; /* lock taken by the locking variants of the API */
GCond *cond; /* condition that waiting pops block on */
GQueue *queue; /* the stored items; pops take them from the tail */
guint waiting_threads; /* number of pops currently waiting on cond */
gint32 ref_count; /* reference count, accessed with g_atomic_int_*() */
gfloat low_threshold; /* fraction of max_queue_length at or below which a pop starts buffering */
gfloat high_threshold; /* fraction of max_queue_length at which buffering stops */
guint32 max_queue_length; /* maximum queue length in RTP timestamp units */
gboolean buffering; /* TRUE while the queue is filling up to the high threshold */
gboolean pop_flushing; /* TRUE while flushing; pops return NULL */
gboolean pop_blocking; /* TRUE while pops are held back */
guint pops_remaining; /* set to the queue length when buffering starts, decremented on each pop */
guint32 tail_buffer_duration; /* timestamp difference between the last popped buffer and the new tail, or 0 */
};
/**
* async_jitter_queue_new:
*
* Creates a new asynchronous queue with the initial reference count of 1.
*
* Return value: the new #AsyncJitterQueue.
**/
AsyncJitterQueue *
async_jitter_queue_new (void)
{
AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
retval->mutex = g_mutex_new ();
retval->cond = g_cond_new ();
retval->queue = g_queue_new ();
retval->waiting_threads = 0;
retval->ref_count = 1;
retval->low_threshold = DEFAULT_LOW_THRESHOLD;
retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
retval->buffering = TRUE; /* we need to buffer initially */
retval->pop_flushing = TRUE;
retval->pop_blocking = TRUE;
retval->pops_remaining = 0;
retval->tail_buffer_duration = 0;
return retval;
}
/* Re-evaluates the buffering state after the queue content changed: once the
 * queue level reaches the high threshold buffering is switched off, and a
 * waiting pop is woken up if the queue is not buffering. */
void
signal_waiting_threads (AsyncJitterQueue * queue)
{
  guint32 level = async_jitter_queue_length_ts_units_unlocked (queue);

  if (level >= queue->high_threshold * queue->max_queue_length) {
    GST_DEBUG ("stop buffering");
    queue->buffering = FALSE;
  }

  /* only wake somebody up when there is a waiter and we are done buffering */
  if (queue->waiting_threads > 0 && !queue->buffering)
    g_cond_signal (queue->cond);
}
/**
* async_jitter_queue_ref:
* @queue: a #AsyncJitterQueue.
*
* Atomically increases the reference count of the asynchronous @queue by 1.
* You do not need to hold the lock to call this function.
*
* Returns: the @queue that was passed in, or %NULL when the precondition
* checks reject it (@queue is %NULL or its reference count is not positive).
**/
AsyncJitterQueue *
async_jitter_queue_ref (AsyncJitterQueue * queue)
{
g_return_val_if_fail (queue, NULL);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
g_atomic_int_inc (&queue->ref_count);
return queue;
}
/**
* async_jitter_queue_ref_unlocked:
* @queue: a #AsyncJitterQueue.
*
* Atomically increases the reference count of the asynchronous @queue by 1.
* Performs the same increment as async_jitter_queue_ref() but returns nothing.
**/
void
async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
g_atomic_int_inc (&queue->ref_count);
}
/**
* async_jitter_queue_set_low_threshold:
* @queue: a #AsyncJitterQueue.
* @threshold: the lower threshold (fraction of max size)
*
* Sets the low threshold on the queue. When a pop finds the queue level at or
* below this fraction of the maximum queue length, the queue starts buffering
* again until the high threshold is reached.
*
* The value is stored directly; this function does not take the @queue's lock.
**/
void
async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
gfloat threshold)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
queue->low_threshold = threshold;
}
/**
* async_jitter_queue_set_high_threshold:
* @queue: a #AsyncJitterQueue.
* @threshold: the higher threshold (fraction of max size)
*
* Sets the high threshold on the queue. This threshold indicates how far the
* queue has to fill up before buffering stops and waiting pop calls are
* released. This blocking mechanism is only triggered when we reach the low
* threshold and must refill the queue.
*
* The value is stored directly; this function does not take the @queue's lock.
**/
void
async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
gfloat threshold)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
queue->high_threshold = threshold;
}
/**
* async_jitter_queue_set_max_queue_length:
* @queue: a #AsyncJitterQueue.
* @max_length: the maximum queue length in RTP timestamp units
*
* Sets the maximum queue length in RTP timestamp units. The low and high
* thresholds are applied as fractions of this value.
*
* The value is stored directly; this function does not take the @queue's lock.
**/
void
async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
guint32 max_length)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
queue->max_queue_length = max_length;
}
/**
* async_jitter_queue_get_g_queue:
* @queue: a #AsyncJitterQueue.
*
* Gives direct access to the #GQueue that holds the items of @queue. The lock
* is not taken here and no copy is made.
*
* Returns: the internal #GQueue, or %NULL if @queue is %NULL.
**/
GQueue *
async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
{
g_return_val_if_fail (queue, NULL);
return queue->queue;
}
/* Returns the distance from low_ts up to high_ts in timestamp units.
 * The subtraction is done in 32-bit unsigned arithmetic, so the result is
 * also right when the timestamp wrapped around between low_ts and high_ts. */
static guint32
calculate_ts_diff (guint32 high_ts, guint32 low_ts)
{
  /* modulo-2^32 subtraction covers both the plain and the wrapped case */
  return (guint32) (high_ts - low_ts);
}
/* Returns the length of the queue in timestamp units: the timestamp distance
 * between the buffer at the tail and the buffer at the head, plus the stored
 * duration of the tail buffer. Returns 0 when fewer than two items are queued
 * or when the head or tail item is not a GstBuffer. */
/* FIXME This function wrongly assumes that there are no missing packets inside
 * the buffer, in reality it needs to check for gaps and subtract those from
 * the total */
guint32
async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
{
  GstBuffer *tail_buf;
  GstBuffer *head_buf;
  guint32 first_ts;
  guint32 last_ts;
  guint32 span;

  g_return_val_if_fail (queue, 0);

  /* a distance needs two distinct buffers */
  if (queue->queue->length < 2)
    return 0;

  tail_buf = g_queue_peek_tail (queue->queue);
  head_buf = g_queue_peek_head (queue->queue);

  if (!GST_IS_BUFFER (tail_buf) || !GST_IS_BUFFER (head_buf))
    return 0;

  first_ts = gst_rtp_buffer_get_timestamp (tail_buf);
  last_ts = gst_rtp_buffer_get_timestamp (head_buf);

  /* timestamp span of the queue, extended by the duration of the tail buffer */
  span = calculate_ts_diff (last_ts, first_ts);
  span += queue->tail_buffer_duration;

  return span;
}
/**
* async_jitter_queue_unref_and_unlock:
* @queue: a #AsyncJitterQueue.
*
* Releases the @queue's lock and then decreases the reference count of the
* asynchronous @queue by 1. This function must be called while holding the
* @queue's lock. If the reference count went to 0, the @queue will be
* destroyed and the memory allocated will be freed.
**/
void
async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
/* unlock first: the unref below may free the mutex */
g_mutex_unlock (queue->mutex);
async_jitter_queue_unref (queue);
}
/**
* async_jitter_queue_unref:
* @queue: a #AsyncJitterQueue.
*
* Decreases the reference count of the asynchronous @queue by 1. If
* the reference count went to 0, the @queue will be destroyed and the
* memory allocated will be freed. So you are not allowed to use the
* @queue afterwards, as it might have disappeared. You do not need to
* hold the lock to call this function.
*
* Only the queue structures themselves are released here; no free function
* is called on items that are still stored in the queue.
**/
void
async_jitter_queue_unref (AsyncJitterQueue * queue)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
if (g_atomic_int_dec_and_test (&queue->ref_count)) {
/* dropping the last reference while pops are still waiting is a caller
* error; in that case nothing is freed */
g_return_if_fail (queue->waiting_threads == 0);
g_mutex_free (queue->mutex);
if (queue->cond)
g_cond_free (queue->cond);
g_queue_free (queue->queue);
g_free (queue);
}
}
/**
* async_jitter_queue_lock:
* @queue: a #AsyncJitterQueue.
*
* Acquires the @queue's lock. After that you can only call the
* <function>async_jitter_queue_*_unlocked()</function> function variants on that
* @queue. Otherwise it will deadlock. Release the lock again with
* async_jitter_queue_unlock().
**/
void
async_jitter_queue_lock (AsyncJitterQueue * queue)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
g_mutex_lock (queue->mutex);
}
/**
* async_jitter_queue_unlock:
* @queue: a #AsyncJitterQueue.
*
* Releases the @queue's lock, previously acquired with
* async_jitter_queue_lock().
**/
void
async_jitter_queue_unlock (AsyncJitterQueue * queue)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
g_mutex_unlock (queue->mutex);
}
/**
* async_jitter_queue_push:
* @queue: a #AsyncJitterQueue.
* @data: @data to push into the @queue.
*
* Pushes the @data into the @queue. @data must not be %NULL. This function
* takes the @queue's lock for the duration of the push.
**/
void
async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
g_return_if_fail (data);
g_mutex_lock (queue->mutex);
async_jitter_queue_push_unlocked (queue, data);
g_mutex_unlock (queue->mutex);
}
/**
* async_jitter_queue_push_unlocked:
* @queue: a #AsyncJitterQueue.
* @data: @data to push into the @queue.
*
* Pushes the @data onto the head of the @queue and re-evaluates the buffering
* state, which may wake up a waiting pop. @data must not be %NULL. This
* function must be called while holding the @queue's lock.
**/
void
async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
g_return_if_fail (data);
g_queue_push_head (queue->queue, data);
signal_waiting_threads (queue);
}
/**
 * async_jitter_queue_push_sorted:
 * @queue: a #AsyncJitterQueue
 * @data: the @data to push into the @queue
 * @func: the #GCompareDataFunc used to find the position of @data. It is
 *     passed an element of the @queue and @data. It should return 0 if they
 *     are equal, a negative value if the queue element should be higher in
 *     the @queue than @data, or a positive value if it should be lower.
 * @user_data: user data passed to @func.
 *
 * Locked variant of async_jitter_queue_push_sorted_unlocked(): takes the
 * @queue's lock, inserts @data at the position determined by @func and
 * releases the lock again.
 *
 * The @queue must already be sorted according to @func.
 *
 * Returns: %TRUE if @data was inserted, %FALSE if @func reported an equal
 * element (nothing is inserted then) or @queue is %NULL.
 **/
gboolean
async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
    gpointer data, GCompareDataFunc func, gpointer user_data)
{
  gboolean inserted;

  g_return_val_if_fail (queue != NULL, FALSE);

  g_mutex_lock (queue->mutex);
  inserted =
      async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
  g_mutex_unlock (queue->mutex);

  return inserted;
}
/**
 * async_jitter_queue_push_sorted_unlocked:
 * @queue: a #AsyncJitterQueue
 * @data: the @data to push into the @queue
 * @func: the #GCompareDataFunc used to find the position of @data. It is
 *     called as func (element, @data, @user_data) for the elements of the
 *     @queue, starting at the head, for as long as it returns a negative
 *     value.
 * @user_data: user data passed to @func.
 *
 * Inserts @data in front of the first element for which @func does not
 * return a negative value, or at the tail when there is no such element.
 * The buffering state is re-evaluated afterwards, which may wake up a
 * waiting pop.
 *
 * The @queue must already be sorted according to @func, and this function
 * must be called while holding the @queue's lock.
 *
 * Returns: %TRUE if @data was inserted, %FALSE if @func returned 0 for an
 * element (nothing is inserted then) or @queue is %NULL.
 **/
gboolean
async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
    gpointer data, GCompareDataFunc func, gpointer user_data)
{
  GList *link;
  /* stays non-zero when @func is never called (empty queue) */
  gint order = 1;

  g_return_val_if_fail (queue != NULL, FALSE);

  /* walk from the head until an element that does not sort before @data */
  for (link = queue->queue->head; link != NULL; link = link->next) {
    order = func (link->data, data, user_data);
    if (order >= 0)
      break;
  }

  /* an equal element is already queued */
  if (order == 0)
    return FALSE;

  if (link != NULL)
    g_queue_insert_before (queue->queue, link, data);
  else
    g_queue_push_tail (queue->queue, data);

  signal_waiting_threads (queue);

  return TRUE;
}
/**
* async_jitter_queue_insert_after_unlocked:
* @queue: a #AsyncJitterQueue
* @sibling: the link of the underlying #GQueue to insert next to
* @data: the data to insert
*
* Inserts @data into the underlying #GQueue in front of @sibling, i.e. on the
* head side of it. Because pops take items from the tail, @data is handed out
* after @sibling. The buffering state is re-evaluated afterwards, which may
* wake up a waiting pop.
*
* NOTE(review): presumably meant to be called while holding the @queue's
* lock, as the name suggests; no lock is taken here.
**/
void
async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
GList * sibling, gpointer data)
{
g_return_if_fail (queue != NULL);
g_queue_insert_before (queue->queue, sibling, data);
signal_waiting_threads (queue);
}
/* Pops the item at the tail of @queue. Must be called with the lock held;
 * the lock is released while waiting on the condition.
 *
 * Returns NULL right away when the queue is flushing, and also when flushing
 * is set while waiting. Otherwise it waits for as long as pops are blocked.
 * When the queue level is at or below the low threshold and no pops are left
 * over from the moment buffering started, buffering is (re)started and the
 * call waits until an item is available. After the pop the duration of the
 * tail buffer is recalculated from the popped buffer and the new tail. */
static gpointer
async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
{
  gpointer retval;
  GstBuffer *tail_buffer = NULL;
  guint tsunits;

  if (queue->pop_flushing)
    return NULL;

  /* hold the pop back for as long as the queue is blocked */
  while (queue->pop_blocking) {
    queue->waiting_threads++;
    g_cond_wait (queue->cond, queue->mutex);
    queue->waiting_threads--;
    if (queue->pop_flushing)
      return NULL;
  }

  tsunits = async_jitter_queue_length_ts_units_unlocked (queue);

  GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining,
      (int) (queue->low_threshold * queue->max_queue_length));

  if (tsunits <= queue->low_threshold * queue->max_queue_length
      && queue->pops_remaining == 0) {
    if (!queue->buffering) {
      GST_DEBUG ("start buffering");
      queue->buffering = TRUE;
      /* the items that are queued right now may still be popped */
      queue->pops_remaining = queue->queue->length;
    }

    GST_DEBUG ("wait for data");
    while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
      queue->waiting_threads++;
      g_cond_wait (queue->cond, queue->mutex);
      queue->waiting_threads--;
      if (queue->pop_flushing)
        return NULL;
    }
  }

  retval = g_queue_pop_tail (queue->queue);
  if (queue->pops_remaining)
    queue->pops_remaining--;

  tail_buffer = g_queue_peek_tail (queue->queue);
  if (tail_buffer) {
    if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
      queue->tail_buffer_duration = 0;
    } else if ((guint16) (gst_rtp_buffer_get_seq (tail_buffer)
            - gst_rtp_buffer_get_seq (retval)) == 1) {
      /* consecutive packets; the difference is truncated to 16 bits so that
       * the 65535 -> 0 sequence number wrap also counts as consecutive */
      queue->tail_buffer_duration =
          calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
          gst_rtp_buffer_get_timestamp (retval));
    } else {
      /* There is a sequence number gap -> we can't calculate the duration
       * let's just set it to 0 */
      queue->tail_buffer_duration = 0;
    }
  }

  g_assert (retval);

  return retval;
}
/**
* async_jitter_queue_pop:
* @queue: a #AsyncJitterQueue.
*
* Pops data from the @queue. This function blocks until data becomes
* available. If the @queue is flushing, this function returns %NULL. This
* function takes the @queue's lock for the duration of the pop.
*
* Return value: data from the queue, or %NULL when flushing.
**/
gpointer
async_jitter_queue_pop (AsyncJitterQueue * queue)
{
gpointer retval;
g_return_val_if_fail (queue, NULL);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
g_mutex_lock (queue->mutex);
retval = async_jitter_queue_pop_intern_unlocked (queue);
g_mutex_unlock (queue->mutex);
return retval;
}
/**
* async_jitter_queue_pop_unlocked:
* @queue: a #AsyncJitterQueue.
*
* Pops data from the @queue. This function blocks until data becomes
* available. If the @queue is flushing, this function returns %NULL. This
* function must be called while holding the @queue's lock.
*
* Return value: data from the queue, or %NULL when flushing.
**/
gpointer
async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
{
g_return_val_if_fail (queue, NULL);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
return async_jitter_queue_pop_intern_unlocked (queue);
}
/**
* async_jitter_queue_length:
* @queue: a #AsyncJitterQueue.
*
* Returns the number of items in the @queue. This function takes the
* @queue's lock while reading the length.
*
* Return value: the length of the @queue.
**/
gint
async_jitter_queue_length (AsyncJitterQueue * queue)
{
gint retval;
g_return_val_if_fail (queue, 0);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
g_mutex_lock (queue->mutex);
retval = queue->queue->length;
g_mutex_unlock (queue->mutex);
return retval;
}
/**
* async_jitter_queue_length_unlocked:
* @queue: a #AsyncJitterQueue.
*
* Returns the number of items in the @queue without taking the @queue's
* lock.
*
* Return value: the length of the @queue.
**/
gint
async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
{
g_return_val_if_fail (queue, 0);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
return queue->queue->length;
}
/**
 * async_jitter_queue_set_flushing_unlocked:
 * @queue: a #AsyncJitterQueue.
 * @free_func: a function to call to free the elements
 * @user_data: user data passed to @free_func
 *
 * Puts the @queue into flushing mode. Any waiting/blocked pops are woken up
 * and return NULL, as do all subsequent calls to pop until flushing is unset
 * again. All elements still in the @queue are removed and handed to
 * @free_func. Flushing is set by default.
 *
 * This function must be called while holding the @queue's lock.
 */
void
async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
    GFunc free_func, gpointer user_data)
{
  gpointer elem;

  g_return_if_fail (queue);
  g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);

  queue->pop_flushing = TRUE;

  /* let's unblock any remaining pops */
  if (queue->waiting_threads > 0)
    g_cond_broadcast (queue->cond);

  /* free data from queue */
  while ((elem = g_queue_pop_head (queue->queue)))
    free_func (elem, user_data);

  /* The queue is empty now, so the bookkeeping that described its old content
   * must go as well. A stale pops_remaining would make the next pop skip
   * waiting for data and pop from an empty queue. */
  queue->pops_remaining = 0;
  queue->tail_buffer_duration = 0;
}
/**
* async_jitter_queue_unset_flushing_unlocked:
* @queue: a #AsyncJitterQueue.
*
* Takes the @queue out of flushing mode, so that pops no longer return NULL
* because of flushing. Any threads waiting on the @queue's condition are
* woken up.
*/
void
async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
queue->pop_flushing = FALSE;
/* let's unblock any remaining pops */
if (queue->waiting_threads > 0)
g_cond_broadcast (queue->cond);
}
/**
* async_jitter_queue_set_blocking_unlocked:
* @queue: a #AsyncJitterQueue.
* @blocking: a boolean to enable/disable blocking
*
* This function is used to enable/disable blocking. If blocking is enabled any
* pops will be blocked until the queue is unblocked. The queue is blocked by
* default. Any threads waiting on the @queue's condition are woken up so that
* they re-check the new state.
*/
void
async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
gboolean blocking)
{
g_return_if_fail (queue);
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
queue->pop_blocking = blocking;
/* let's unblock any remaining pops */
if (queue->waiting_threads > 0)
g_cond_broadcast (queue->cond);
}