mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-28 19:20:35 +00:00
gst/rtpmanager/: Remove complicated async queue and replace with more simple jitterbuffer code while also fixing some...
Original commit message from CVS: * gst/rtpmanager/Makefile.am: * gst/rtpmanager/async_jitter_queue.c: * gst/rtpmanager/async_jitter_queue.h: * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_class_init), (rtp_jitter_buffer_init), (rtp_jitter_buffer_finalize), (rtp_jitter_buffer_new), (compare_seqnum), (rtp_jitter_buffer_insert), (rtp_jitter_buffer_pop), (rtp_jitter_buffer_flush), (rtp_jitter_buffer_num_packets), (rtp_jitter_buffer_get_ts_diff): * gst/rtpmanager/rtpjitterbuffer.h: Remove complicated async queue and replace with more simple jitterbuffer code while also fixing some bugs. * gst/rtpmanager/gstrtpbin-marshal.list: * gst/rtpmanager/gstrtpbin.c: (on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout), (create_session), (gst_rtp_bin_class_init), (create_recv_rtp), (create_send_rtp): * gst/rtpmanager/gstrtpbin.h: * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_init), (gst_rtp_jitter_buffer_dispose), (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_flush_start), (gst_rtp_jitter_buffer_flush_stop), (gst_rtp_jitter_buffer_change_state), (gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_set_property): * gst/rtpmanager/gstrtpsession.c: (on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout), (gst_rtp_session_class_init), (gst_rtp_session_init): * gst/rtpmanager/gstrtpsession.h: * gst/rtpmanager/rtpsession.c: (on_bye_ssrc), (session_cleanup): Use new jitterbuffer code. Expose some new signals in preparation for handling EOS.
This commit is contained in:
parent
366a756552
commit
cdd82f2a95
12 changed files with 710 additions and 965 deletions
|
@ -13,10 +13,10 @@ BUILT_SOURCES = $(built_sources) $(built_headers)
|
|||
libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
|
||||
gstrtpbin.c \
|
||||
gstrtpclient.c \
|
||||
async_jitter_queue.c \
|
||||
gstrtpjitterbuffer.c \
|
||||
gstrtpptdemux.c \
|
||||
gstrtpssrcdemux.c \
|
||||
rtpjitterbuffer.c \
|
||||
rtpsession.c \
|
||||
rtpsource.c \
|
||||
rtpstats.c \
|
||||
|
@ -27,10 +27,10 @@ nodist_libgstrtpmanager_la_SOURCES = \
|
|||
|
||||
noinst_HEADERS = gstrtpbin.h \
|
||||
gstrtpclient.h \
|
||||
async_jitter_queue.h \
|
||||
gstrtpjitterbuffer.h \
|
||||
gstrtpptdemux.h \
|
||||
gstrtpssrcdemux.h \
|
||||
rtpjitterbuffer.h \
|
||||
rtpsession.h \
|
||||
rtpsource.h \
|
||||
rtpstats.h \
|
||||
|
|
|
@ -1,692 +0,0 @@
|
|||
/*
|
||||
* Async Jitter Queue based on g_async_queue
|
||||
* This code is GST RTP smart and deals with timestamps
|
||||
*
|
||||
* Farsight Voice+Video library
|
||||
* Copyright 2007 Collabora Ltd,
|
||||
* Copyright 2007 Nokia Corporation
|
||||
* @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
|
||||
*
|
||||
* This is an async queue that has a buffering mecanism based on the set low
|
||||
* and high threshold. When the lower threshold is reached, the queue will
|
||||
* fill itself up until the higher threshold is reached before allowing any
|
||||
* pops to occur. This allows a jitterbuffer of at least min threshold items
|
||||
* to be available.
|
||||
*/
|
||||
|
||||
/* GLIB - Library of useful routines for C programming
|
||||
* Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
|
||||
*
|
||||
* GAsyncQueue: asynchronous queue implementation, based on Gqueue.
|
||||
* Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
||||
* Boston, MA 02111-1307, USA.
|
||||
*/
|
||||
|
||||
/*
|
||||
* MT safe
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#include "async_jitter_queue.h"
|
||||
|
||||
#include <gst/gst.h>
|
||||
#include <gst/rtp/gstrtpbuffer.h>
|
||||
|
||||
#define DEFAULT_LOW_THRESHOLD 0.1
|
||||
#define DEFAULT_HIGH_THRESHOLD 0.9
|
||||
|
||||
struct _AsyncJitterQueue
|
||||
{
|
||||
GMutex *mutex;
|
||||
GCond *cond;
|
||||
GQueue *queue;
|
||||
guint waiting_threads;
|
||||
gint32 ref_count;
|
||||
gfloat low_threshold;
|
||||
gfloat high_threshold;
|
||||
guint32 max_queue_length;
|
||||
gboolean buffering;
|
||||
gboolean pop_flushing;
|
||||
gboolean pop_blocking;
|
||||
guint pops_remaining;
|
||||
guint32 tail_buffer_duration;
|
||||
};
|
||||
|
||||
/**
|
||||
* async_jitter_queue_new:
|
||||
*
|
||||
* Creates a new asynchronous queue with the initial reference count of 1.
|
||||
*
|
||||
* Return value: the new #AsyncJitterQueue.
|
||||
**/
|
||||
AsyncJitterQueue *
|
||||
async_jitter_queue_new (void)
|
||||
{
|
||||
AsyncJitterQueue *retval = g_new (AsyncJitterQueue, 1);
|
||||
|
||||
retval->mutex = g_mutex_new ();
|
||||
retval->cond = g_cond_new ();
|
||||
retval->queue = g_queue_new ();
|
||||
retval->waiting_threads = 0;
|
||||
retval->ref_count = 1;
|
||||
retval->low_threshold = DEFAULT_LOW_THRESHOLD;
|
||||
retval->high_threshold = DEFAULT_HIGH_THRESHOLD;
|
||||
retval->buffering = TRUE; /* we need to buffer initially */
|
||||
retval->pop_flushing = TRUE;
|
||||
retval->pop_blocking = TRUE;
|
||||
retval->pops_remaining = 0;
|
||||
retval->tail_buffer_duration = 0;
|
||||
return retval;
|
||||
}
|
||||
|
||||
/* checks buffering state and wakes up waiting pops */
|
||||
void
|
||||
signal_waiting_threads (AsyncJitterQueue * queue)
|
||||
{
|
||||
if (async_jitter_queue_length_ts_units_unlocked (queue) >=
|
||||
queue->high_threshold * queue->max_queue_length) {
|
||||
GST_DEBUG ("stop buffering");
|
||||
queue->buffering = FALSE;
|
||||
}
|
||||
|
||||
if (queue->waiting_threads > 0) {
|
||||
if (!queue->buffering) {
|
||||
g_cond_signal (queue->cond);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_ref:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Increases the reference count of the asynchronous @queue by 1. You
|
||||
* do not need to hold the lock to call this function.
|
||||
*
|
||||
* Returns: the @queue that was passed in (since 2.6)
|
||||
**/
|
||||
AsyncJitterQueue *
|
||||
async_jitter_queue_ref (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_val_if_fail (queue, NULL);
|
||||
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
|
||||
|
||||
g_atomic_int_inc (&queue->ref_count);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_ref_unlocked:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Increases the reference count of the asynchronous @queue by 1.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_ref_unlocked (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
g_atomic_int_inc (&queue->ref_count);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_set_low_threshold:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
* @threshold: the lower threshold (fraction of max size)
|
||||
*
|
||||
* Sets the low threshold on the queue. This threshold indicates the minimum
|
||||
* number of items allowed in the queue before we refill it up to the set
|
||||
* maximum threshold.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_set_low_threshold (AsyncJitterQueue * queue,
|
||||
gfloat threshold)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
queue->low_threshold = threshold;
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_set_max_threshold:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
* @threshold: the higher threshold (fraction of max size)
|
||||
*
|
||||
* Sets the high threshold on the queue. This threshold indicates the amount of
|
||||
* items to fill in the queue before releasing any blocking pop calls. This
|
||||
* blocking mecanism is only triggered when we reach the low threshold and must
|
||||
* refill the queue.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_set_high_threshold (AsyncJitterQueue * queue,
|
||||
gfloat threshold)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
queue->high_threshold = threshold;
|
||||
}
|
||||
|
||||
/* set the maximum queue length in RTP timestamp units */
|
||||
void
|
||||
async_jitter_queue_set_max_queue_length (AsyncJitterQueue * queue,
|
||||
guint32 max_length)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
queue->max_queue_length = max_length;
|
||||
}
|
||||
|
||||
GQueue *
|
||||
async_jitter_queue_get_g_queue (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_val_if_fail (queue, NULL);
|
||||
|
||||
return queue->queue;
|
||||
}
|
||||
|
||||
static guint32
|
||||
calculate_ts_diff (guint32 high_ts, guint32 low_ts)
|
||||
{
|
||||
/* it needs to work if ts wraps */
|
||||
if (high_ts >= low_ts) {
|
||||
return high_ts - low_ts;
|
||||
} else {
|
||||
return high_ts + G_MAXUINT32 + 1 - low_ts;
|
||||
}
|
||||
}
|
||||
|
||||
/* this function returns the length of the queue in timestamp units. It will
|
||||
* also add the duration of the last buffer in the queue */
|
||||
/* FIXME This function wrongly assumes that there are no missing packets inside
|
||||
* the buffer, in reality it needs to check for gaps and subsctract those from
|
||||
* the total */
|
||||
guint32
|
||||
async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue * queue)
|
||||
{
|
||||
guint32 tail_ts;
|
||||
guint32 head_ts;
|
||||
guint32 ret;
|
||||
GstBuffer *head;
|
||||
GstBuffer *tail;
|
||||
|
||||
g_return_val_if_fail (queue, 0);
|
||||
|
||||
if (queue->queue->length < 2) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
tail = g_queue_peek_tail (queue->queue);
|
||||
head = g_queue_peek_head (queue->queue);
|
||||
|
||||
if (!GST_IS_BUFFER (tail) || !GST_IS_BUFFER (head))
|
||||
return 0;
|
||||
|
||||
tail_ts = gst_rtp_buffer_get_timestamp (tail);
|
||||
head_ts = gst_rtp_buffer_get_timestamp (head);
|
||||
|
||||
ret = calculate_ts_diff (head_ts, tail_ts);
|
||||
|
||||
/* let's add the duration of the tail buffer */
|
||||
ret += queue->tail_buffer_duration;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_unref_and_unlock:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Decreases the reference count of the asynchronous @queue by 1 and
|
||||
* releases the lock. This function must be called while holding the
|
||||
* @queue's lock. If the reference count went to 0, the @queue will be
|
||||
* destroyed and the memory allocated will be freed.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_unref_and_unlock (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
g_mutex_unlock (queue->mutex);
|
||||
async_jitter_queue_unref (queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_unref:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Decreases the reference count of the asynchronous @queue by 1. If
|
||||
* the reference count went to 0, the @queue will be destroyed and the
|
||||
* memory allocated will be freed. So you are not allowed to use the
|
||||
* @queue afterwards, as it might have disappeared. You do not need to
|
||||
* hold the lock to call this function.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_unref (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
if (g_atomic_int_dec_and_test (&queue->ref_count)) {
|
||||
g_return_if_fail (queue->waiting_threads == 0);
|
||||
g_mutex_free (queue->mutex);
|
||||
if (queue->cond)
|
||||
g_cond_free (queue->cond);
|
||||
g_queue_free (queue->queue);
|
||||
g_free (queue);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_lock:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Acquires the @queue's lock. After that you can only call the
|
||||
* <function>async_jitter_queue_*_unlocked()</function> function variants on that
|
||||
* @queue. Otherwise it will deadlock.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_lock (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
g_mutex_lock (queue->mutex);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_unlock:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Releases the queue's lock.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_unlock (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
g_mutex_unlock (queue->mutex);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_push:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
* @data: @data to push into the @queue.
|
||||
*
|
||||
* Pushes the @data into the @queue. @data must not be %NULL.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_push (AsyncJitterQueue * queue, gpointer data)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
g_return_if_fail (data);
|
||||
|
||||
g_mutex_lock (queue->mutex);
|
||||
async_jitter_queue_push_unlocked (queue, data);
|
||||
g_mutex_unlock (queue->mutex);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_push_unlocked:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
* @data: @data to push into the @queue.
|
||||
*
|
||||
* Pushes the @data into the @queue. @data must not be %NULL. This
|
||||
* function must be called while holding the @queue's lock.
|
||||
**/
|
||||
void
|
||||
async_jitter_queue_push_unlocked (AsyncJitterQueue * queue, gpointer data)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
g_return_if_fail (data);
|
||||
|
||||
g_queue_push_head (queue->queue, data);
|
||||
|
||||
signal_waiting_threads (queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_push_sorted:
|
||||
* @queue: a #AsyncJitterQueue
|
||||
* @data: the @data to push into the @queue
|
||||
* @func: the #GCompareDataFunc is used to sort @queue. This function
|
||||
* is passed two elements of the @queue. The function should return
|
||||
* 0 if they are equal, a negative value if the first element
|
||||
* should be higher in the @queue or a positive value if the first
|
||||
* element should be lower in the @queue than the second element.
|
||||
* @user_data: user data passed to @func.
|
||||
*
|
||||
* Inserts @data into @queue using @func to determine the new
|
||||
* position.
|
||||
*
|
||||
* This function requires that the @queue is sorted before pushing on
|
||||
* new elements.
|
||||
*
|
||||
* This function will lock @queue before it sorts the queue and unlock
|
||||
* it when it is finished.
|
||||
*
|
||||
* For an example of @func see async_jitter_queue_sort().
|
||||
*
|
||||
* Since: 2.10
|
||||
**/
|
||||
gboolean
|
||||
async_jitter_queue_push_sorted (AsyncJitterQueue * queue,
|
||||
gpointer data, GCompareDataFunc func, gpointer user_data)
|
||||
{
|
||||
gboolean ret;
|
||||
|
||||
g_return_val_if_fail (queue != NULL, FALSE);
|
||||
|
||||
g_mutex_lock (queue->mutex);
|
||||
ret = async_jitter_queue_push_sorted_unlocked (queue, data, func, user_data);
|
||||
g_mutex_unlock (queue->mutex);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_push_sorted_unlocked:
|
||||
* @queue: a #AsyncJitterQueue
|
||||
* @data: the @data to push into the @queue
|
||||
* @func: the #GCompareDataFunc is used to sort @queue. This function
|
||||
* is passed two elements of the @queue. The function should return
|
||||
* 0 if they are equal, a negative value if the first element
|
||||
* should be higher in the @queue or a positive value if the first
|
||||
* element should be lower in the @queue than the second element.
|
||||
* @user_data: user data passed to @func.
|
||||
*
|
||||
* Inserts @data into @queue using @func to determine the new
|
||||
* position.
|
||||
*
|
||||
* This function requires that the @queue is sorted before pushing on
|
||||
* new elements.
|
||||
*
|
||||
* If @GCompareDataFunc returns 0, this function does not insert @data and
|
||||
* return FALSE.
|
||||
*
|
||||
* This function is called while holding the @queue's lock.
|
||||
*
|
||||
* For an example of @func see async_jitter_queue_sort().
|
||||
*
|
||||
* Since: 2.10
|
||||
**/
|
||||
gboolean
|
||||
async_jitter_queue_push_sorted_unlocked (AsyncJitterQueue * queue,
|
||||
gpointer data, GCompareDataFunc func, gpointer user_data)
|
||||
{
|
||||
GList *list;
|
||||
gint func_ret = TRUE;
|
||||
|
||||
g_return_val_if_fail (queue != NULL, FALSE);
|
||||
|
||||
list = queue->queue->head;
|
||||
while (list && (func_ret = func (list->data, data, user_data)) < 0)
|
||||
list = list->next;
|
||||
|
||||
if (func_ret == 0) {
|
||||
return FALSE;
|
||||
}
|
||||
if (list) {
|
||||
g_queue_insert_before (queue->queue, list, data);
|
||||
} else {
|
||||
g_queue_push_tail (queue->queue, data);
|
||||
}
|
||||
|
||||
signal_waiting_threads (queue);
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
void
|
||||
async_jitter_queue_insert_after_unlocked (AsyncJitterQueue * queue,
|
||||
GList * sibling, gpointer data)
|
||||
{
|
||||
g_return_if_fail (queue != NULL);
|
||||
|
||||
g_queue_insert_before (queue->queue, sibling, data);
|
||||
|
||||
signal_waiting_threads (queue);
|
||||
}
|
||||
|
||||
static gpointer
|
||||
async_jitter_queue_pop_intern_unlocked (AsyncJitterQueue * queue)
|
||||
{
|
||||
gpointer retval;
|
||||
GstBuffer *tail_buffer = NULL;
|
||||
guint tsunits;
|
||||
|
||||
if (queue->pop_flushing)
|
||||
return NULL;
|
||||
|
||||
while (queue->pop_blocking) {
|
||||
queue->waiting_threads++;
|
||||
g_cond_wait (queue->cond, queue->mutex);
|
||||
queue->waiting_threads--;
|
||||
if (queue->pop_flushing)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
tsunits = async_jitter_queue_length_ts_units_unlocked (queue);
|
||||
|
||||
GST_DEBUG ("tsunits %u, pops: %u, limit %d", tsunits, queue->pops_remaining,
|
||||
(int) (queue->low_threshold * queue->max_queue_length));
|
||||
|
||||
if (tsunits <= queue->low_threshold * queue->max_queue_length
|
||||
&& queue->pops_remaining == 0) {
|
||||
if (!queue->buffering) {
|
||||
GST_DEBUG ("start buffering");
|
||||
queue->buffering = TRUE;
|
||||
queue->pops_remaining = queue->queue->length;
|
||||
}
|
||||
|
||||
GST_DEBUG ("wait for data");
|
||||
while (!g_queue_peek_tail (queue->queue) || queue->pop_blocking) {
|
||||
queue->waiting_threads++;
|
||||
g_cond_wait (queue->cond, queue->mutex);
|
||||
queue->waiting_threads--;
|
||||
if (queue->pop_flushing)
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
retval = g_queue_pop_tail (queue->queue);
|
||||
if (queue->pops_remaining)
|
||||
queue->pops_remaining--;
|
||||
|
||||
tail_buffer = g_queue_peek_tail (queue->queue);
|
||||
if (tail_buffer) {
|
||||
if (!GST_IS_BUFFER (tail_buffer) || !GST_IS_BUFFER (retval)) {
|
||||
queue->tail_buffer_duration = 0;
|
||||
} else if (gst_rtp_buffer_get_seq (tail_buffer)
|
||||
- gst_rtp_buffer_get_seq (retval) == 1) {
|
||||
queue->tail_buffer_duration =
|
||||
calculate_ts_diff (gst_rtp_buffer_get_timestamp (tail_buffer),
|
||||
gst_rtp_buffer_get_timestamp (retval));
|
||||
} else {
|
||||
/* There is a sequence number gap -> we can't calculate the duration
|
||||
* let's just set it to 0 */
|
||||
queue->tail_buffer_duration = 0;
|
||||
}
|
||||
}
|
||||
|
||||
g_assert (retval);
|
||||
|
||||
return retval;
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_pop:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Pops data from the @queue. This function blocks until data become
|
||||
* available. If pop is disabled, tis function return NULL.
|
||||
*
|
||||
* Return value: data from the queue.
|
||||
**/
|
||||
gpointer
|
||||
async_jitter_queue_pop (AsyncJitterQueue * queue)
|
||||
{
|
||||
gpointer retval;
|
||||
|
||||
g_return_val_if_fail (queue, NULL);
|
||||
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
|
||||
|
||||
g_mutex_lock (queue->mutex);
|
||||
retval = async_jitter_queue_pop_intern_unlocked (queue);
|
||||
g_mutex_unlock (queue->mutex);
|
||||
|
||||
return retval;
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_pop_unlocked:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Pops data from the @queue. This function blocks until data become
|
||||
* available. This function must be called while holding the @queue's
|
||||
* lock.
|
||||
*
|
||||
* Return value: data from the queue.
|
||||
**/
|
||||
gpointer
|
||||
async_jitter_queue_pop_unlocked (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_val_if_fail (queue, NULL);
|
||||
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, NULL);
|
||||
|
||||
return async_jitter_queue_pop_intern_unlocked (queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_length:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Returns the length of the queue
|
||||
* Return value: the length of the @queue.
|
||||
**/
|
||||
gint
|
||||
async_jitter_queue_length (AsyncJitterQueue * queue)
|
||||
{
|
||||
gint retval;
|
||||
|
||||
g_return_val_if_fail (queue, 0);
|
||||
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
|
||||
|
||||
g_mutex_lock (queue->mutex);
|
||||
retval = queue->queue->length;
|
||||
g_mutex_unlock (queue->mutex);
|
||||
|
||||
return retval;
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_length_unlocked:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
*
|
||||
* Returns the length of the queue.
|
||||
*
|
||||
* Return value: the length of the @queue.
|
||||
**/
|
||||
gint
|
||||
async_jitter_queue_length_unlocked (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_val_if_fail (queue, 0);
|
||||
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
|
||||
|
||||
return queue->queue->length;
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_set_flushing_unlocked:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
* @free_func: a function to call to free the elements
|
||||
* @user_data: user data passed to @free_func
|
||||
*
|
||||
* This function is used to set/unset flushing. If flushing is set any
|
||||
* waiting/blocked pops will be unblocked. Any subsequent calls to pop will
|
||||
* return NULL. Flushing is set by default.
|
||||
*/
|
||||
void
|
||||
async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue * queue,
|
||||
GFunc free_func, gpointer user_data)
|
||||
{
|
||||
gpointer elem;
|
||||
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
queue->pop_flushing = TRUE;
|
||||
/* let's unblock any remaining pops */
|
||||
if (queue->waiting_threads > 0)
|
||||
g_cond_broadcast (queue->cond);
|
||||
/* free data from queue */
|
||||
while ((elem = g_queue_pop_head (queue->queue)))
|
||||
free_func (elem, user_data);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_unset_flushing_unlocked:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
* @free_func: a function to call to free the elements
|
||||
* @user_data: user data passed to @free_func
|
||||
*
|
||||
* This function is used to set/unset flushing. If flushing is set any
|
||||
* waiting/blocked pops will be unblocked. Any subsequent calls to pop will
|
||||
* return NULL. Flushing is set by default.
|
||||
*/
|
||||
void
|
||||
async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue * queue)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
queue->pop_flushing = FALSE;
|
||||
/* let's unblock any remaining pops */
|
||||
if (queue->waiting_threads > 0)
|
||||
g_cond_broadcast (queue->cond);
|
||||
}
|
||||
|
||||
/**
|
||||
* async_jitter_queue_set_blocking_unlocked:
|
||||
* @queue: a #AsyncJitterQueue.
|
||||
* @enabled: a boolean to enable/disable blocking
|
||||
*
|
||||
* This function is used to enable/disable blocking. If blocking is enabled any
|
||||
* pops will be blocked until the queue is unblocked. The queue is blocked by
|
||||
* default.
|
||||
*/
|
||||
void
|
||||
async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue * queue,
|
||||
gboolean blocking)
|
||||
{
|
||||
g_return_if_fail (queue);
|
||||
g_return_if_fail (g_atomic_int_get (&queue->ref_count) > 0);
|
||||
|
||||
queue->pop_blocking = blocking;
|
||||
/* let's unblock any remaining pops */
|
||||
if (queue->waiting_threads > 0)
|
||||
g_cond_broadcast (queue->cond);
|
||||
}
|
|
@ -1,130 +0,0 @@
|
|||
/* Async Jitter Queue based on g_async_queue
|
||||
*
|
||||
* Farsight Voice+Video library
|
||||
* Copyright 2007 Collabora Ltd,
|
||||
* Copyright 2007 Nokia Corporation
|
||||
* @author: Philippe Khalaf <philippe.khalaf@collabora.co.uk>.
|
||||
*/
|
||||
|
||||
/* GLIB - Library of useful routines for C programming
|
||||
* Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
||||
* Boston, MA 02111-1307, USA.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Modified by the GLib Team and others 1997-2000. See the AUTHORS
|
||||
* file for a list of people on the GLib Team. See the ChangeLog
|
||||
* files for a list of changes. These files are distributed with
|
||||
* GLib at ftp://ftp.gtk.org/pub/gtk/.
|
||||
*/
|
||||
|
||||
#ifndef __ASYNCJITTERQUEUE_H__
|
||||
#define __ASYNCJITTERQUEUE_H__
|
||||
|
||||
#include <glib.h>
|
||||
#include <glib/gthread.h>
|
||||
|
||||
G_BEGIN_DECLS
|
||||
|
||||
typedef struct _AsyncJitterQueue AsyncJitterQueue;
|
||||
|
||||
/* Asyncronous Queues, can be used to communicate between threads
|
||||
*/
|
||||
|
||||
/* Get a new AsyncJitterQueue with the ref_count 1 */
|
||||
AsyncJitterQueue* async_jitter_queue_new (void);
|
||||
|
||||
/* Lock and unlock a AsyncJitterQueue. All functions lock the queue for
|
||||
* themselves, but in certain cirumstances you want to hold the lock longer,
|
||||
* thus you lock the queue, call the *_unlocked functions and unlock it again.
|
||||
*/
|
||||
void async_jitter_queue_lock (AsyncJitterQueue *queue);
|
||||
void async_jitter_queue_unlock (AsyncJitterQueue *queue);
|
||||
|
||||
/* Ref and unref the AsyncJitterQueue. */
|
||||
AsyncJitterQueue* async_jitter_queue_ref (AsyncJitterQueue *queue);
|
||||
void async_jitter_queue_unref (AsyncJitterQueue *queue);
|
||||
#ifndef G_DISABLE_DEPRECATED
|
||||
/* You don't have to hold the lock for calling *_ref and *_unref anymore. */
|
||||
void async_jitter_queue_ref_unlocked (AsyncJitterQueue *queue);
|
||||
void async_jitter_queue_unref_and_unlock (AsyncJitterQueue *queue);
|
||||
#endif /* !G_DISABLE_DEPRECATED */
|
||||
|
||||
void async_jitter_queue_set_low_threshold (AsyncJitterQueue *queue,
|
||||
gfloat threshold);
|
||||
void async_jitter_queue_set_high_threshold (AsyncJitterQueue *queue,
|
||||
gfloat threshold);
|
||||
|
||||
void async_jitter_queue_set_max_queue_length (AsyncJitterQueue *queue,
|
||||
guint32 max_length);
|
||||
|
||||
/* Push data into the async queue. Must not be NULL. */
|
||||
void async_jitter_queue_push (AsyncJitterQueue *queue,
|
||||
gpointer data);
|
||||
void async_jitter_queue_push_unlocked (AsyncJitterQueue *queue,
|
||||
gpointer data);
|
||||
gboolean async_jitter_queue_push_sorted (AsyncJitterQueue *queue,
|
||||
gpointer data,
|
||||
GCompareDataFunc func,
|
||||
gpointer user_data);
|
||||
|
||||
void async_jitter_queue_insert_after_unlocked(AsyncJitterQueue *queue,
|
||||
GList *sibling,
|
||||
gpointer data);
|
||||
|
||||
gboolean async_jitter_queue_push_sorted_unlocked(AsyncJitterQueue *queue,
|
||||
gpointer data,
|
||||
GCompareDataFunc func,
|
||||
gpointer user_data);
|
||||
|
||||
/* Pop data from the async queue. When no data is there, the thread is blocked
|
||||
* until data arrives. */
|
||||
gpointer async_jitter_queue_pop (AsyncJitterQueue *queue);
|
||||
gpointer async_jitter_queue_pop_unlocked (AsyncJitterQueue *queue);
|
||||
|
||||
/* Try to pop data. NULL is returned in case of empty queue. */
|
||||
gpointer async_jitter_queue_try_pop (AsyncJitterQueue *queue);
|
||||
gpointer async_jitter_queue_try_pop_unlocked (AsyncJitterQueue *queue);
|
||||
|
||||
/* Wait for data until at maximum until end_time is reached. NULL is returned
|
||||
* in case of empty queue. */
|
||||
gpointer async_jitter_queue_timed_pop (AsyncJitterQueue *queue,
|
||||
GTimeVal *end_time);
|
||||
gpointer async_jitter_queue_timed_pop_unlocked (AsyncJitterQueue *queue,
|
||||
GTimeVal *end_time);
|
||||
|
||||
/* Return the length of the queue. Negative values mean that threads
|
||||
* are waiting, positve values mean that there are entries in the
|
||||
* queue. Actually this function returns the length of the queue minus
|
||||
* the number of waiting threads, async_jitter_queue_length == 0 could also
|
||||
* mean 'n' entries in the queue and 'n' thread waiting. Such can
|
||||
* happen due to locking of the queue or due to scheduling. */
|
||||
gint async_jitter_queue_length (AsyncJitterQueue *queue);
|
||||
gint async_jitter_queue_length_unlocked (AsyncJitterQueue *queue);
|
||||
|
||||
void async_jitter_queue_set_flushing_unlocked (AsyncJitterQueue* queue,
|
||||
GFunc free_func, gpointer user_data);
|
||||
void async_jitter_queue_unset_flushing_unlocked (AsyncJitterQueue* queue);
|
||||
void async_jitter_queue_set_blocking_unlocked (AsyncJitterQueue* queue,
|
||||
gboolean blocking);
|
||||
guint32
|
||||
async_jitter_queue_length_ts_units_unlocked (AsyncJitterQueue *queue);
|
||||
|
||||
G_END_DECLS
|
||||
|
||||
#endif /* __ASYNCJITTERQUEUE_H__ */
|
||||
|
|
@ -2,3 +2,4 @@ UINT:UINT
|
|||
BOXED:UINT
|
||||
BOXED:UINT,UINT
|
||||
VOID:UINT,OBJECT
|
||||
VOID:UINT,UINT
|
||||
|
|
|
@ -158,6 +158,13 @@ enum
|
|||
{
|
||||
SIGNAL_REQUEST_PT_MAP,
|
||||
SIGNAL_CLEAR_PT_MAP,
|
||||
|
||||
SIGNAL_ON_NEW_SSRC,
|
||||
SIGNAL_ON_SSRC_COLLISION,
|
||||
SIGNAL_ON_SSRC_VALIDATED,
|
||||
SIGNAL_ON_BYE_SSRC,
|
||||
SIGNAL_ON_BYE_TIMEOUT,
|
||||
SIGNAL_ON_TIMEOUT,
|
||||
LAST_SIGNAL
|
||||
};
|
||||
|
||||
|
@ -258,6 +265,48 @@ find_session_by_id (GstRTPBin * rtpbin, gint id)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void
|
||||
on_new_ssrc (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
|
||||
{
|
||||
g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
|
||||
sess->id, ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_ssrc_collision (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
|
||||
{
|
||||
g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
|
||||
sess->id, ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_ssrc_validated (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
|
||||
{
|
||||
g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
|
||||
sess->id, ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_bye_ssrc (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
|
||||
{
|
||||
g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
|
||||
sess->id, ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_bye_timeout (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
|
||||
{
|
||||
g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
|
||||
sess->id, ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_timeout (GstElement * session, guint32 ssrc, GstRTPBinSession * sess)
|
||||
{
|
||||
g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
|
||||
sess->id, ssrc);
|
||||
}
|
||||
|
||||
/* create a session with the given id. Must be called with RTP_BIN_LOCK */
|
||||
static GstRTPBinSession *
|
||||
create_session (GstRTPBin * rtpbin, gint id)
|
||||
|
@ -284,6 +333,18 @@ create_session (GstRTPBin * rtpbin, gint id)
|
|||
g_signal_connect (session, "request-pt-map",
|
||||
(GCallback) pt_map_requested, sess);
|
||||
|
||||
g_signal_connect (sess->session, "on-new-ssrc",
|
||||
(GCallback) on_new_ssrc, sess);
|
||||
g_signal_connect (sess->session, "on-ssrc-collision",
|
||||
(GCallback) on_ssrc_collision, sess);
|
||||
g_signal_connect (sess->session, "on-ssrc-validated",
|
||||
(GCallback) on_ssrc_validated, sess);
|
||||
g_signal_connect (sess->session, "on-bye-ssrc",
|
||||
(GCallback) on_bye_ssrc, sess);
|
||||
g_signal_connect (sess->session, "on-bye-timeout",
|
||||
(GCallback) on_bye_timeout, sess);
|
||||
g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
|
||||
|
||||
gst_bin_add (GST_BIN_CAST (rtpbin), session);
|
||||
gst_element_set_state (session, GST_STATE_PLAYING);
|
||||
gst_bin_add (GST_BIN_CAST (rtpbin), demux);
|
||||
|
@ -557,6 +618,86 @@ gst_rtp_bin_class_init (GstRTPBinClass * klass)
|
|||
G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTPBinClass, clear_pt_map),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
|
||||
|
||||
/**
|
||||
* GstRTPBin::on-new-ssrc:
|
||||
* @rtpbin: the object which received the signal
|
||||
* @session: the session
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of a new SSRC that entered @session.
|
||||
*/
|
||||
gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC] =
|
||||
g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_new_ssrc),
|
||||
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
|
||||
G_TYPE_UINT, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRTPBin::on-ssrc_collision:
|
||||
* @rtpbin: the object which received the signal
|
||||
* @session: the session
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify when we have an SSRC collision
|
||||
*/
|
||||
gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION] =
|
||||
g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_ssrc_collision),
|
||||
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
|
||||
G_TYPE_UINT, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRTPBin::on-ssrc_validated:
|
||||
* @rtpbin: the object which received the signal
|
||||
* @session: the session
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of a new SSRC that became validated.
|
||||
*/
|
||||
gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED] =
|
||||
g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_ssrc_validated),
|
||||
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
|
||||
G_TYPE_UINT, G_TYPE_UINT);
|
||||
|
||||
/**
|
||||
* GstRTPBin::on-bye-ssrc:
|
||||
* @rtpbin: the object which received the signal
|
||||
* @session: the session
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of an SSRC that became inactive because of a BYE packet.
|
||||
*/
|
||||
gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC] =
|
||||
g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_bye_ssrc),
|
||||
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
|
||||
G_TYPE_UINT, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRTPBin::on-bye-timeout:
|
||||
* @rtpbin: the object which received the signal
|
||||
* @session: the session
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of an SSRC that has timed out because of BYE
|
||||
*/
|
||||
gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT] =
|
||||
g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_bye_timeout),
|
||||
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
|
||||
G_TYPE_UINT, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRTPBin::on-timeout:
|
||||
* @rtpbin: the object which received the signal
|
||||
* @session: the session
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of an SSRC that has timed out
|
||||
*/
|
||||
gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT] =
|
||||
g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPBinClass, on_timeout),
|
||||
NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2,
|
||||
G_TYPE_UINT, G_TYPE_UINT);
|
||||
|
||||
gstelement_class->provide_clock =
|
||||
GST_DEBUG_FUNCPTR (gst_rtp_bin_provide_clock);
|
||||
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
|
||||
|
|
|
@ -57,8 +57,14 @@ struct _GstRTPBinClass {
|
|||
|
||||
/* get the caps for pt */
|
||||
GstCaps* (*request_pt_map) (GstRTPBin *rtpbin, guint session, guint pt);
|
||||
|
||||
void (*clear_pt_map) (GstRTPBin *rtpbin);
|
||||
|
||||
void (*on_new_ssrc) (GstRTPBin *rtpbin, guint session, guint32 ssrc);
|
||||
void (*on_ssrc_collision) (GstRTPBin *rtpbin, guint session, guint32 ssrc);
|
||||
void (*on_ssrc_validated) (GstRTPBin *rtpbin, guint session, guint32 ssrc);
|
||||
void (*on_bye_ssrc) (GstRTPBin *rtpbin, guint session, guint32 ssrc);
|
||||
void (*on_bye_timeout) (GstRTPBin *rtpbin, guint session, guint32 ssrc);
|
||||
void (*on_timeout) (GstRTPBin *rtpbin, guint session, guint32 ssrc);
|
||||
};
|
||||
|
||||
GType gst_rtp_bin_get_type (void);
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
* Copyright 2007 Collabora Ltd,
|
||||
* Copyright 2007 Nokia Corporation
|
||||
* @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
|
||||
* Copyright 2007 Wim Taymans <wim@fluendo.com>
|
||||
* Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
|
@ -72,7 +72,7 @@
|
|||
#include "gstrtpbin-marshal.h"
|
||||
|
||||
#include "gstrtpjitterbuffer.h"
|
||||
#include "async_jitter_queue.h"
|
||||
#include "rtpjitterbuffer.h"
|
||||
|
||||
GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
|
||||
#define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
|
||||
|
@ -87,7 +87,7 @@ GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
|
|||
"Filter/Network/RTP",
|
||||
"A buffer that deals with network jitter and other transmission faults",
|
||||
"Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
|
||||
"Wim Taymans <wim@fluendo.com>");
|
||||
"Wim Taymans <wim.taymans@gmail.com>");
|
||||
|
||||
/* RTPJitterBuffer signals and args */
|
||||
enum
|
||||
|
@ -107,11 +107,32 @@ enum
|
|||
PROP_DROP_ON_LATENCY
|
||||
};
|
||||
|
||||
#define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock))
|
||||
|
||||
#define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \
|
||||
JBUF_LOCK (priv); \
|
||||
if (priv->srcresult != GST_FLOW_OK) \
|
||||
goto label; \
|
||||
} G_STMT_END
|
||||
|
||||
#define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
|
||||
#define JBUF_WAIT(priv) (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
|
||||
|
||||
#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \
|
||||
JBUF_WAIT(priv); \
|
||||
if (priv->srcresult != GST_FLOW_OK) \
|
||||
goto label; \
|
||||
} G_STMT_END
|
||||
|
||||
#define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
|
||||
|
||||
struct _GstRTPJitterBufferPrivate
|
||||
{
|
||||
GstPad *sinkpad, *srcpad;
|
||||
|
||||
AsyncJitterQueue *queue;
|
||||
RTPJitterBuffer *jbuf;
|
||||
GMutex *jbuf_lock;
|
||||
GCond *jbuf_cond;
|
||||
|
||||
/* properties */
|
||||
guint latency_ms;
|
||||
|
@ -122,12 +143,16 @@ struct _GstRTPJitterBufferPrivate
|
|||
/* the next expected seqnum */
|
||||
guint32 next_seqnum;
|
||||
|
||||
/* state */
|
||||
gboolean eos;
|
||||
|
||||
/* clock rate and rtp timestamp offset */
|
||||
gint32 clock_rate;
|
||||
gint64 clock_base;
|
||||
|
||||
/* when we are shutting down */
|
||||
GstFlowReturn srcresult;
|
||||
gboolean blocked;
|
||||
|
||||
/* for sync */
|
||||
GstSegment segment;
|
||||
|
@ -292,9 +317,9 @@ gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer,
|
|||
priv->latency_ms = DEFAULT_LATENCY_MS;
|
||||
priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
|
||||
|
||||
priv->queue = async_jitter_queue_new ();
|
||||
async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD);
|
||||
async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD);
|
||||
priv->jbuf = rtp_jitter_buffer_new ();
|
||||
priv->jbuf_lock = g_mutex_new ();
|
||||
priv->jbuf_cond = g_cond_new ();
|
||||
|
||||
priv->waiting_seqnum = -1;
|
||||
|
||||
|
@ -332,9 +357,9 @@ gst_rtp_jitter_buffer_dispose (GObject * object)
|
|||
GstRTPJitterBuffer *jitterbuffer;
|
||||
|
||||
jitterbuffer = GST_RTP_JITTER_BUFFER (object);
|
||||
if (jitterbuffer->priv->queue) {
|
||||
async_jitter_queue_unref (jitterbuffer->priv->queue);
|
||||
jitterbuffer->priv->queue = NULL;
|
||||
if (jitterbuffer->priv->jbuf) {
|
||||
g_object_unref (jitterbuffer->priv->jbuf);
|
||||
jitterbuffer->priv->jbuf = NULL;
|
||||
}
|
||||
|
||||
G_OBJECT_CLASS (parent_class)->dispose (object);
|
||||
|
@ -430,9 +455,6 @@ gst_jitter_buffer_sink_parse_caps (GstRTPJitterBuffer * jitterbuffer,
|
|||
} else
|
||||
priv->next_seqnum = -1;
|
||||
|
||||
async_jitter_queue_set_max_queue_length (priv->queue,
|
||||
priv->latency_ms * priv->clock_rate / 1000);
|
||||
|
||||
return TRUE;
|
||||
|
||||
/* ERRORS */
|
||||
|
@ -469,15 +491,6 @@ gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
|
|||
return res;
|
||||
}
|
||||
|
||||
static void
|
||||
free_func (gpointer data, GstRTPJitterBuffer * user_data)
|
||||
{
|
||||
if (GST_IS_BUFFER (data))
|
||||
gst_buffer_unref (GST_BUFFER_CAST (data));
|
||||
else
|
||||
gst_event_unref (GST_EVENT_CAST (data));
|
||||
}
|
||||
|
||||
static void
|
||||
gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer)
|
||||
{
|
||||
|
@ -485,19 +498,18 @@ gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer)
|
|||
|
||||
priv = jitterbuffer->priv;
|
||||
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
JBUF_LOCK (priv);
|
||||
/* mark ourselves as flushing */
|
||||
priv->srcresult = GST_FLOW_WRONG_STATE;
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
|
||||
/* this unblocks any waiting pops on the src pad task */
|
||||
async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue,
|
||||
(GFunc) free_func, jitterbuffer);
|
||||
JBUF_SIGNAL (priv);
|
||||
rtp_jitter_buffer_flush (priv->jbuf);
|
||||
/* unlock clock, we just unschedule, the entry will be released by the
|
||||
* locking streaming thread. */
|
||||
if (priv->clock_id)
|
||||
gst_clock_id_unschedule (priv->clock_id);
|
||||
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
JBUF_UNLOCK (priv);
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -507,7 +519,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
|
|||
|
||||
priv = jitterbuffer->priv;
|
||||
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
JBUF_LOCK (priv);
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
|
||||
/* Mark as non flushing */
|
||||
priv->srcresult = GST_FLOW_OK;
|
||||
|
@ -515,9 +527,8 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
|
|||
priv->last_popped_seqnum = -1;
|
||||
priv->next_seqnum = -1;
|
||||
priv->clock_rate = -1;
|
||||
/* allow pops from the src pad task */
|
||||
async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue);
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
priv->eos = FALSE;
|
||||
JBUF_UNLOCK (priv);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
|
@ -566,21 +577,20 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
|
|||
case GST_STATE_CHANGE_NULL_TO_READY:
|
||||
break;
|
||||
case GST_STATE_CHANGE_READY_TO_PAUSED:
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
JBUF_LOCK (priv);
|
||||
/* reset negotiated values */
|
||||
priv->clock_rate = -1;
|
||||
priv->clock_base = -1;
|
||||
/* block until we go to PLAYING */
|
||||
async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
|
||||
TRUE);
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
priv->blocked = TRUE;
|
||||
JBUF_UNLOCK (priv);
|
||||
break;
|
||||
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
JBUF_LOCK (priv);
|
||||
/* unblock to allow streaming in PLAYING */
|
||||
async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
|
||||
FALSE);
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
priv->blocked = FALSE;
|
||||
JBUF_SIGNAL (priv);
|
||||
JBUF_UNLOCK (priv);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
@ -596,11 +606,10 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
|
|||
ret = GST_STATE_CHANGE_NO_PREROLL;
|
||||
break;
|
||||
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
JBUF_LOCK (priv);
|
||||
/* block to stop streaming when PAUSED */
|
||||
async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
|
||||
TRUE);
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
priv->blocked = TRUE;
|
||||
JBUF_UNLOCK (priv);
|
||||
if (ret != GST_STATE_CHANGE_FAILURE)
|
||||
ret = GST_STATE_CHANGE_NO_PREROLL;
|
||||
break;
|
||||
|
@ -630,30 +639,6 @@ priv_compare_rtp_seq_lt (guint16 a, guint16 b)
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* gets the seqnum from the buffers and compare them
|
||||
*/
|
||||
static gint
|
||||
compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b)
|
||||
{
|
||||
gint ret;
|
||||
|
||||
if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) {
|
||||
/* two buffers */
|
||||
ret = priv_compare_rtp_seq_lt
|
||||
(gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)),
|
||||
gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b)));
|
||||
} else {
|
||||
/* one of them is an event, the event always goes before the other element
|
||||
* so we return -1. */
|
||||
if (GST_IS_EVENT (a))
|
||||
ret = -1;
|
||||
else
|
||||
ret = 1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
|
||||
{
|
||||
|
@ -707,16 +692,20 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
|
|||
case GST_EVENT_EOS:
|
||||
{
|
||||
/* push EOS in queue. We always push it at the head */
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
|
||||
JBUF_LOCK (priv);
|
||||
/* check for flushing, we need to discard the event and return FALSE when
|
||||
* we are flushing */
|
||||
ret = priv->srcresult == GST_FLOW_OK;
|
||||
if (ret)
|
||||
async_jitter_queue_push_unlocked (priv->queue, event);
|
||||
else
|
||||
if (ret) {
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
|
||||
priv->eos = TRUE;
|
||||
JBUF_SIGNAL (priv);
|
||||
} else {
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
|
||||
gst_flow_get_name (priv->srcresult));
|
||||
gst_event_unref (event);
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
}
|
||||
JBUF_UNLOCK (priv);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -780,7 +769,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
GstRTPJitterBuffer *jitterbuffer;
|
||||
GstRTPJitterBufferPrivate *priv;
|
||||
guint16 seqnum;
|
||||
GstFlowReturn ret;
|
||||
GstFlowReturn ret = GST_FLOW_OK;
|
||||
|
||||
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
|
||||
|
||||
|
@ -803,10 +792,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
seqnum = gst_rtp_buffer_get_seq (buffer);
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
|
||||
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
ret = priv->srcresult;
|
||||
if (ret != GST_FLOW_OK)
|
||||
goto out_flushing;
|
||||
JBUF_LOCK_CHECK (priv, out_flushing);
|
||||
/* don't accept more data on EOS */
|
||||
if (priv->eos)
|
||||
goto have_eos;
|
||||
|
||||
/* let's check if this buffer is too late, we cannot accept packets with
|
||||
* bigger seqnum than the one we already pushed. */
|
||||
|
@ -818,14 +807,18 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
/* let's drop oldest packet if the queue is already full and drop-on-latency
|
||||
* is set. */
|
||||
if (priv->drop_on_latency) {
|
||||
if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >=
|
||||
priv->latency_ms * priv->clock_rate / 1000) {
|
||||
guint64 latency_ts;
|
||||
|
||||
latency_ts =
|
||||
gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
|
||||
|
||||
if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
|
||||
GstBuffer *old_buf;
|
||||
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
|
||||
seqnum);
|
||||
|
||||
old_buf = async_jitter_queue_pop_unlocked (priv->queue);
|
||||
old_buf = rtp_jitter_buffer_pop (priv->jbuf);
|
||||
gst_buffer_unref (old_buf);
|
||||
}
|
||||
}
|
||||
|
@ -833,10 +826,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
/* now insert the packet into the queue in sorted order. This function returns
|
||||
* FALSE if a packet with the same seqnum was already in the queue, meaning we
|
||||
* have a duplicate. */
|
||||
if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer,
|
||||
(GCompareDataFunc) compare_rtp_buffers_seq_num, NULL))
|
||||
if (!rtp_jitter_buffer_insert (priv->jbuf, buffer))
|
||||
goto duplicate;
|
||||
|
||||
/* signal addition of new buffer */
|
||||
JBUF_SIGNAL (priv);
|
||||
|
||||
/* let's unschedule and unblock any waiting buffers. We only want to do this
|
||||
* if there is a currently waiting newer (> seqnum) buffer */
|
||||
if (priv->clock_id) {
|
||||
|
@ -846,11 +841,11 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
|
|||
}
|
||||
}
|
||||
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d",
|
||||
seqnum, async_jitter_queue_length_unlocked (priv->queue));
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
|
||||
seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
|
||||
|
||||
finished:
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
JBUF_UNLOCK (priv);
|
||||
|
||||
gst_object_unref (jitterbuffer);
|
||||
|
||||
|
@ -875,10 +870,18 @@ not_negotiated:
|
|||
}
|
||||
out_flushing:
|
||||
{
|
||||
ret = priv->srcresult;
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
|
||||
gst_buffer_unref (buffer);
|
||||
goto finished;
|
||||
}
|
||||
have_eos:
|
||||
{
|
||||
ret = GST_FLOW_UNEXPECTED;
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
|
||||
gst_buffer_unref (buffer);
|
||||
goto finished;
|
||||
}
|
||||
too_late:
|
||||
{
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
|
||||
|
@ -908,8 +911,7 @@ static void
|
|||
gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
|
||||
{
|
||||
GstRTPJitterBufferPrivate *priv;
|
||||
gpointer elem;
|
||||
GstBuffer *outbuf;
|
||||
GstBuffer *outbuf = NULL;
|
||||
GstFlowReturn result;
|
||||
guint16 seqnum;
|
||||
guint32 rtp_time;
|
||||
|
@ -918,44 +920,24 @@ gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
|
|||
|
||||
priv = jitterbuffer->priv;
|
||||
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
JBUF_LOCK_CHECK (priv, flushing);
|
||||
again:
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
|
||||
/* pop a buffer, we will get NULL if the queue was shut down */
|
||||
elem = async_jitter_queue_pop_unlocked (priv->queue);
|
||||
if (!elem)
|
||||
goto no_elem;
|
||||
|
||||
/* special code for events */
|
||||
if (G_UNLIKELY (GST_IS_EVENT (elem))) {
|
||||
GstEvent *event = GST_EVENT_CAST (elem);
|
||||
|
||||
switch (GST_EVENT_TYPE (event)) {
|
||||
case GST_EVENT_EOS:
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue");
|
||||
/* we don't expect more data now, makes upstream perform EOS actions */
|
||||
priv->srcresult = GST_FLOW_UNEXPECTED;
|
||||
break;
|
||||
default:
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue",
|
||||
GST_EVENT_TYPE_NAME (event));
|
||||
break;
|
||||
}
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
|
||||
/* push event */
|
||||
gst_pad_push_event (priv->srcpad, event);
|
||||
return;
|
||||
/* wait if we are blocked or don't have a packet and eos */
|
||||
while (priv->blocked || !(rtp_jitter_buffer_num_packets (priv->jbuf)
|
||||
|| priv->eos)) {
|
||||
JBUF_WAIT_CHECK (priv, flushing);
|
||||
}
|
||||
if (priv->eos)
|
||||
goto do_eos;
|
||||
|
||||
/* we know it's a buffer now */
|
||||
outbuf = GST_BUFFER_CAST (elem);
|
||||
/* pop a buffer, we must have a buffer now */
|
||||
outbuf = rtp_jitter_buffer_pop (priv->jbuf);
|
||||
|
||||
seqnum = gst_rtp_buffer_get_seq (outbuf);
|
||||
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d",
|
||||
gst_rtp_buffer_get_seq (outbuf),
|
||||
async_jitter_queue_length_unlocked (priv->queue));
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d, now %d left",
|
||||
seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
|
||||
|
||||
/* If we don't know what the next seqnum should be (== -1) we have to wait
|
||||
* because it might be possible that we are not receiving this buffer in-order,
|
||||
|
@ -1032,11 +1014,11 @@ again:
|
|||
GST_OBJECT_UNLOCK (jitterbuffer);
|
||||
|
||||
/* release the lock so that the other end can push stuff or unlock */
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
JBUF_UNLOCK (priv);
|
||||
|
||||
ret = gst_clock_id_wait (id, &jitter);
|
||||
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
JBUF_LOCK (priv);
|
||||
/* and free the entry */
|
||||
gst_clock_id_unref (id);
|
||||
priv->clock_id = NULL;
|
||||
|
@ -1054,8 +1036,7 @@ again:
|
|||
GST_DEBUG_OBJECT (jitterbuffer,
|
||||
"Wait got unscheduled, will retry to push with new buffer");
|
||||
/* reinserting popped buffer into queue */
|
||||
if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf,
|
||||
(GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) {
|
||||
if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) {
|
||||
GST_DEBUG_OBJECT (jitterbuffer,
|
||||
"Duplicate packet #%d detected, dropping", seqnum);
|
||||
priv->num_duplicates++;
|
||||
|
@ -1087,7 +1068,7 @@ push_buffer:
|
|||
* so the other end can push stuff in the queue again. */
|
||||
priv->last_popped_seqnum = seqnum;
|
||||
priv->next_seqnum = (seqnum + 1) & 0xffff;
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
JBUF_UNLOCK (priv);
|
||||
|
||||
/* push buffer */
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum);
|
||||
|
@ -1098,20 +1079,22 @@ push_buffer:
|
|||
return;
|
||||
|
||||
/* ERRORS */
|
||||
no_elem:
|
||||
do_eos:
|
||||
{
|
||||
/* store result, we are flushing now */
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing");
|
||||
priv->srcresult = GST_FLOW_WRONG_STATE;
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
|
||||
priv->srcresult = GST_FLOW_UNEXPECTED;
|
||||
gst_pad_pause_task (priv->srcpad);
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
|
||||
JBUF_UNLOCK (priv);
|
||||
return;
|
||||
}
|
||||
flushing:
|
||||
{
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
|
||||
gst_buffer_unref (outbuf);
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
if (outbuf)
|
||||
gst_buffer_unref (outbuf);
|
||||
JBUF_UNLOCK (priv);
|
||||
return;
|
||||
}
|
||||
pause:
|
||||
|
@ -1120,13 +1103,13 @@ pause:
|
|||
|
||||
GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
|
||||
|
||||
async_jitter_queue_lock (priv->queue);
|
||||
JBUF_LOCK (priv);
|
||||
/* store result */
|
||||
priv->srcresult = result;
|
||||
/* we don't post errors or anything because upstream will do that for us
|
||||
* when we pass the return value upstream. */
|
||||
gst_pad_pause_task (priv->srcpad);
|
||||
async_jitter_queue_unlock (priv->queue);
|
||||
JBUF_UNLOCK (priv);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -1194,11 +1177,7 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
|
|||
old_latency = jitterbuffer->priv->latency_ms;
|
||||
|
||||
jitterbuffer->priv->latency_ms = new_latency;
|
||||
if (jitterbuffer->priv->clock_rate != -1) {
|
||||
async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue,
|
||||
gst_util_uint64_scale_int (new_latency,
|
||||
jitterbuffer->priv->clock_rate, 1000));
|
||||
}
|
||||
|
||||
/* post message if latency changed, this will infor the parent pipeline
|
||||
* that a latency reconfiguration is possible. */
|
||||
if (new_latency != old_latency) {
|
||||
|
|
|
@ -202,6 +202,13 @@ enum
|
|||
{
|
||||
SIGNAL_REQUEST_PT_MAP,
|
||||
SIGNAL_CLEAR_PT_MAP,
|
||||
|
||||
SIGNAL_ON_NEW_SSRC,
|
||||
SIGNAL_ON_SSRC_COLLISION,
|
||||
SIGNAL_ON_SSRC_VALIDATED,
|
||||
SIGNAL_ON_BYE_SSRC,
|
||||
SIGNAL_ON_BYE_TIMEOUT,
|
||||
SIGNAL_ON_TIMEOUT,
|
||||
LAST_SIGNAL
|
||||
};
|
||||
|
||||
|
@ -266,6 +273,48 @@ static void gst_rtp_session_clear_pt_map (GstRTPSession * rtpsession);
|
|||
|
||||
static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
|
||||
|
||||
static void
|
||||
on_new_ssrc (RTPSession * session, RTPSource * src, GstRTPSession * sess)
|
||||
{
|
||||
g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0,
|
||||
src->ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_ssrc_collision (RTPSession * session, RTPSource * src, GstRTPSession * sess)
|
||||
{
|
||||
g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0,
|
||||
src->ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_ssrc_validated (RTPSession * session, RTPSource * src, GstRTPSession * sess)
|
||||
{
|
||||
g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
|
||||
src->ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_bye_ssrc (RTPSession * session, RTPSource * src, GstRTPSession * sess)
|
||||
{
|
||||
g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0,
|
||||
src->ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_bye_timeout (RTPSession * session, RTPSource * src, GstRTPSession * sess)
|
||||
{
|
||||
g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
|
||||
src->ssrc);
|
||||
}
|
||||
|
||||
static void
|
||||
on_timeout (RTPSession * session, RTPSource * src, GstRTPSession * sess)
|
||||
{
|
||||
g_signal_emit (sess, gst_rtp_session_signals[SIGNAL_ON_TIMEOUT], 0,
|
||||
src->ssrc);
|
||||
}
|
||||
|
||||
GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
|
||||
|
||||
static void
|
||||
|
@ -332,6 +381,76 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
|
|||
G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRTPSessionClass, clear_pt_map),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
|
||||
|
||||
/**
|
||||
* GstRTPSession::on-new-ssrc:
|
||||
* @sess: the object which received the signal
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of a new SSRC that entered @session.
|
||||
*/
|
||||
gst_rtp_session_signals[SIGNAL_ON_NEW_SSRC] =
|
||||
g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_new_ssrc),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRTPSession::on-ssrc_collision:
|
||||
* @sess: the object which received the signal
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify when we have an SSRC collision
|
||||
*/
|
||||
gst_rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] =
|
||||
g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass,
|
||||
on_ssrc_collision), NULL, NULL, g_cclosure_marshal_VOID__UINT,
|
||||
G_TYPE_NONE, 1, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRTPSession::on-ssrc_validated:
|
||||
* @sess: the object which received the signal
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of a new SSRC that became validated.
|
||||
*/
|
||||
gst_rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] =
|
||||
g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass,
|
||||
on_ssrc_validated), NULL, NULL, g_cclosure_marshal_VOID__UINT,
|
||||
G_TYPE_NONE, 1, G_TYPE_UINT);
|
||||
|
||||
/**
|
||||
* GstRTPSession::on-bye-ssrc:
|
||||
* @sess: the object which received the signal
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of an SSRC that became inactive because of a BYE packet.
|
||||
*/
|
||||
gst_rtp_session_signals[SIGNAL_ON_BYE_SSRC] =
|
||||
g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_bye_ssrc),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRTPSession::on-bye-timeout:
|
||||
* @sess: the object which received the signal
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of an SSRC that has timed out because of BYE
|
||||
*/
|
||||
gst_rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
|
||||
g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_bye_timeout),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
|
||||
/**
|
||||
* GstRTPSession::on-timeout:
|
||||
* @sess: the object which received the signal
|
||||
* @ssrc: the SSRC
|
||||
*
|
||||
* Notify of an SSRC that has timed out
|
||||
*/
|
||||
gst_rtp_session_signals[SIGNAL_ON_TIMEOUT] =
|
||||
g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
|
||||
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, on_timeout),
|
||||
NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
|
||||
|
||||
gstelement_class->change_state =
|
||||
GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
|
||||
gstelement_class->request_new_pad =
|
||||
|
@ -353,6 +472,19 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
|
|||
rtpsession->priv->session = rtp_session_new ();
|
||||
/* configure callbacks */
|
||||
rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
|
||||
/* configure signals */
|
||||
g_signal_connect (rtpsession->priv->session, "on-new-ssrc",
|
||||
(GCallback) on_new_ssrc, rtpsession);
|
||||
g_signal_connect (rtpsession->priv->session, "on-ssrc-collision",
|
||||
(GCallback) on_ssrc_collision, rtpsession);
|
||||
g_signal_connect (rtpsession->priv->session, "on-ssrc-validated",
|
||||
(GCallback) on_ssrc_validated, rtpsession);
|
||||
g_signal_connect (rtpsession->priv->session, "on-bye-ssrc",
|
||||
(GCallback) on_bye_ssrc, rtpsession);
|
||||
g_signal_connect (rtpsession->priv->session, "on-bye-timeout",
|
||||
(GCallback) on_bye_timeout, rtpsession);
|
||||
g_signal_connect (rtpsession->priv->session, "on-timeout",
|
||||
(GCallback) on_timeout, rtpsession);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
|
@ -59,8 +59,14 @@ struct _GstRTPSessionClass {
|
|||
|
||||
/* signals */
|
||||
GstCaps* (*request_pt_map) (GstRTPSession *sess, guint pt);
|
||||
|
||||
void (*clear_pt_map) (GstRTPSession *sess);
|
||||
|
||||
void (*on_new_ssrc) (GstRTPSession *sess, guint32 ssrc);
|
||||
void (*on_ssrc_collision) (GstRTPSession *sess, guint32 ssrc);
|
||||
void (*on_ssrc_validated) (GstRTPSession *sess, guint32 ssrc);
|
||||
void (*on_bye_ssrc) (GstRTPSession *sess, guint32 ssrc);
|
||||
void (*on_bye_timeout) (GstRTPSession *sess, guint32 ssrc);
|
||||
void (*on_timeout) (GstRTPSession *sess, guint32 ssrc);
|
||||
};
|
||||
|
||||
GType gst_rtp_session_get_type (void);
|
||||
|
|
237
gst/rtpmanager/rtpjitterbuffer.c
Normal file
237
gst/rtpmanager/rtpjitterbuffer.c
Normal file
|
@ -0,0 +1,237 @@
|
|||
/* GStreamer
|
||||
* Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
||||
* Boston, MA 02111-1307, USA.
|
||||
*/
|
||||
#include <string.h>
|
||||
|
||||
#include <gst/rtp/gstrtpbuffer.h>
|
||||
#include <gst/rtp/gstrtcpbuffer.h>
|
||||
|
||||
#include "rtpjitterbuffer.h"
|
||||
|
||||
GST_DEBUG_CATEGORY_STATIC (rtp_jitter_buffer_debug);
|
||||
#define GST_CAT_DEFAULT rtp_jitter_buffer_debug
|
||||
|
||||
/* signals and args */
|
||||
enum
|
||||
{
|
||||
LAST_SIGNAL
|
||||
};
|
||||
|
||||
enum
|
||||
{
|
||||
PROP_0
|
||||
};
|
||||
|
||||
/* GObject vmethods */
|
||||
static void rtp_jitter_buffer_finalize (GObject * object);
|
||||
|
||||
/* static guint rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 }; */
|
||||
|
||||
G_DEFINE_TYPE (RTPJitterBuffer, rtp_jitter_buffer, G_TYPE_OBJECT);
|
||||
|
||||
static void
|
||||
rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass)
|
||||
{
|
||||
GObjectClass *gobject_class;
|
||||
|
||||
gobject_class = (GObjectClass *) klass;
|
||||
|
||||
gobject_class->finalize = rtp_jitter_buffer_finalize;
|
||||
|
||||
GST_DEBUG_CATEGORY_INIT (rtp_jitter_buffer_debug, "rtpjitterbuffer", 0,
|
||||
"RTP Jitter Buffer");
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_jitter_buffer_init (RTPJitterBuffer * jbuf)
|
||||
{
|
||||
jbuf->packets = g_queue_new ();
|
||||
}
|
||||
|
||||
static void
|
||||
rtp_jitter_buffer_finalize (GObject * object)
|
||||
{
|
||||
RTPJitterBuffer *jbuf;
|
||||
|
||||
jbuf = RTP_JITTER_BUFFER_CAST (object);
|
||||
|
||||
rtp_jitter_buffer_flush (jbuf);
|
||||
g_queue_free (jbuf->packets);
|
||||
|
||||
G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object);
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_jitter_buffer_new:
|
||||
*
|
||||
* Create an #RTPJitterBuffer.
|
||||
*
|
||||
* Returns: a new #RTPJitterBuffer. Use g_object_unref() after usage.
|
||||
*/
|
||||
RTPJitterBuffer *
|
||||
rtp_jitter_buffer_new (void)
|
||||
{
|
||||
RTPJitterBuffer *jbuf;
|
||||
|
||||
jbuf = g_object_new (RTP_TYPE_JITTER_BUFFER, NULL);
|
||||
|
||||
return jbuf;
|
||||
}
|
||||
|
||||
static gint
|
||||
compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf)
|
||||
{
|
||||
guint16 seq1, seq2;
|
||||
|
||||
seq1 = gst_rtp_buffer_get_seq (a);
|
||||
seq2 = gst_rtp_buffer_get_seq (b);
|
||||
|
||||
/* check if diff more than half of the 16bit range */
|
||||
if (abs (seq2 - seq1) > (1 << 15)) {
|
||||
/* one of a/b has wrapped */
|
||||
return seq1 - seq2;
|
||||
} else {
|
||||
return seq2 - seq1;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_jitter_buffer_insert:
|
||||
* @jbuf: an #RTPJitterBuffer
|
||||
* @buf: a buffer
|
||||
*
|
||||
* Inserts @buf into the packet queue of @jbuf. The sequence number of the
|
||||
* packet will be used to sort the packets. This function takes ownerhip of
|
||||
* @buf when the function returns %TRUE.
|
||||
*
|
||||
* Returns: %FALSE if a packet with the same number already existed.
|
||||
*/
|
||||
gboolean
|
||||
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf)
|
||||
{
|
||||
GList *list;
|
||||
gint func_ret = 1;
|
||||
|
||||
g_return_val_if_fail (jbuf != NULL, FALSE);
|
||||
g_return_val_if_fail (buf != NULL, FALSE);
|
||||
|
||||
/* loop the list to skip strictly smaller seqnum buffers */
|
||||
list = jbuf->packets->head;
|
||||
while (list
|
||||
&& (func_ret =
|
||||
compare_seqnum (GST_BUFFER_CAST (list->data), buf, jbuf)) < 0)
|
||||
list = list->next;
|
||||
|
||||
/* we hit a packet with the same seqnum, return FALSE to notify a duplicate */
|
||||
if (func_ret == 0)
|
||||
return FALSE;
|
||||
|
||||
if (list)
|
||||
g_queue_insert_before (jbuf->packets, list, buf);
|
||||
else
|
||||
g_queue_push_tail (jbuf->packets, buf);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_jitter_buffer_pop:
|
||||
* @jbuf: an #RTPJitterBuffer
|
||||
*
|
||||
* Pops the oldest buffer from the packet queue of @jbuf.
|
||||
*
|
||||
* Returns: a #GstBuffer or %NULL when there was no packet in the queue.
|
||||
*/
|
||||
GstBuffer *
|
||||
rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf)
|
||||
{
|
||||
GstBuffer *buf;
|
||||
|
||||
g_return_val_if_fail (jbuf != NULL, FALSE);
|
||||
|
||||
buf = g_queue_pop_tail (jbuf->packets);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_jitter_buffer_flush:
|
||||
* @jbuf: an #RTPJitterBuffer
|
||||
*
|
||||
* Flush all packets from the jitterbuffer.
|
||||
*/
|
||||
void
|
||||
rtp_jitter_buffer_flush (RTPJitterBuffer * jbuf)
|
||||
{
|
||||
GstBuffer *buffer;
|
||||
|
||||
g_return_if_fail (jbuf != NULL);
|
||||
|
||||
while ((buffer = g_queue_pop_head (jbuf->packets)))
|
||||
gst_buffer_unref (buffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_jitter_buffer_num_packets:
|
||||
* @jbuf: an #RTPJitterBuffer
|
||||
*
|
||||
* Get the number of packets currently in "jbuf.
|
||||
*
|
||||
* Returns: The number of packets in @jbuf.
|
||||
*/
|
||||
guint
|
||||
rtp_jitter_buffer_num_packets (RTPJitterBuffer * jbuf)
|
||||
{
|
||||
g_return_val_if_fail (jbuf != NULL, 0);
|
||||
|
||||
return jbuf->packets->length;
|
||||
}
|
||||
|
||||
/**
|
||||
* rtp_jitter_buffer_get_ts_diff:
|
||||
* @jbuf: an #RTPJitterBuffer
|
||||
*
|
||||
* Get the difference between the timestamps of first and last packet in the
|
||||
* jitterbuffer.
|
||||
*
|
||||
* Returns: The difference expressed in the timestamp units of the packets.
|
||||
*/
|
||||
guint32
|
||||
rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf)
|
||||
{
|
||||
guint32 high_ts, low_ts;
|
||||
GstBuffer *high_buf, *low_buf;
|
||||
|
||||
g_return_val_if_fail (jbuf != NULL, 0);
|
||||
|
||||
high_buf = g_queue_peek_head (jbuf->packets);
|
||||
low_buf = g_queue_peek_tail (jbuf->packets);
|
||||
|
||||
if (!high_buf || !low_buf || high_buf == low_buf)
|
||||
return 0;
|
||||
|
||||
high_ts = gst_rtp_buffer_get_timestamp (high_buf);
|
||||
low_ts = gst_rtp_buffer_get_timestamp (low_buf);
|
||||
|
||||
/* it needs to work if ts wraps */
|
||||
if (high_ts >= low_ts) {
|
||||
return high_ts - low_ts;
|
||||
} else {
|
||||
return high_ts + G_MAXUINT32 + 1 - low_ts;
|
||||
}
|
||||
}
|
67
gst/rtpmanager/rtpjitterbuffer.h
Normal file
67
gst/rtpmanager/rtpjitterbuffer.h
Normal file
|
@ -0,0 +1,67 @@
|
|||
/* GStreamer
|
||||
* Copyright (C) <2007> Wim Taymans <wim@fluendo.com>
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License as published by the Free Software Foundation; either
|
||||
* version 2 of the License, or (at your option) any later version.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the
|
||||
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
||||
* Boston, MA 02111-1307, USA.
|
||||
*/
|
||||
|
||||
#ifndef __RTP_JITTER_BUFFER_H__
|
||||
#define __RTP_JITTER_BUFFER_H__
|
||||
|
||||
#include <gst/gst.h>
|
||||
#include <gst/rtp/gstrtcpbuffer.h>
|
||||
#include <gst/netbuffer/gstnetbuffer.h>
|
||||
|
||||
typedef struct _RTPJitterBuffer RTPJitterBuffer;
|
||||
typedef struct _RTPJitterBufferClass RTPJitterBufferClass;
|
||||
|
||||
#define RTP_TYPE_JITTER_BUFFER (rtp_jitter_buffer_get_type())
|
||||
#define RTP_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_JITTER_BUFFER,RTPJitterBuffer))
|
||||
#define RTP_JITTER_BUFFER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_JITTER_BUFFER,RTPJitterBufferClass))
|
||||
#define RTP_IS_JITTER_BUFFER(src) (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_JITTER_BUFFER))
|
||||
#define RTP_IS_JITTER_BUFFER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_JITTER_BUFFER))
|
||||
#define RTP_JITTER_BUFFER_CAST(src) ((RTPJitterBuffer *)(src))
|
||||
|
||||
/**
|
||||
* RTPJitterBuffer:
|
||||
*
|
||||
* A JitterBuffer in the #RTPSession
|
||||
*/
|
||||
struct _RTPJitterBuffer {
|
||||
GObject object;
|
||||
|
||||
GQueue *packets;
|
||||
};
|
||||
|
||||
struct _RTPJitterBufferClass {
|
||||
GObjectClass parent_class;
|
||||
};
|
||||
|
||||
GType rtp_jitter_buffer_get_type (void);
|
||||
|
||||
/* managing lifetime */
|
||||
RTPJitterBuffer* rtp_jitter_buffer_new (void);
|
||||
|
||||
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf);
|
||||
GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf);
|
||||
|
||||
void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf);
|
||||
|
||||
guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf);
|
||||
guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf);
|
||||
|
||||
|
||||
|
||||
#endif /* __RTP_JITTER_BUFFER_H__ */
|
|
@ -271,7 +271,6 @@ on_ssrc_validated (RTPSession * sess, RTPSource * source)
|
|||
static void
|
||||
on_bye_ssrc (RTPSession * sess, RTPSource * source)
|
||||
{
|
||||
/* notify app that reconsideration should be performed */
|
||||
g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
|
||||
}
|
||||
|
||||
|
@ -1724,7 +1723,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
|
|||
on_bye_timeout (sess, source);
|
||||
else
|
||||
on_timeout (sess, source);
|
||||
|
||||
}
|
||||
return remove;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue