rtpjitterbuffer: Add RFC7273 media clock handling

https://bugzilla.gnome.org/show_bug.cgi?id=762259
This commit is contained in:
Sebastian Dröge 2016-01-05 16:15:16 +02:00
parent fd7964e746
commit df247f091c
7 changed files with 499 additions and 88 deletions

View file

@ -298,6 +298,7 @@ enum
#define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
#define DEFAULT_MAX_DROPOUT_TIME 60000
#define DEFAULT_MAX_MISORDER_TIME 2000
#define DEFAULT_RFC7273_SYNC FALSE
enum
{
@ -320,7 +321,8 @@ enum
PROP_RTCP_SYNC_SEND_TIME,
PROP_MAX_RTCP_RTP_TIME_DIFF,
PROP_MAX_DROPOUT_TIME,
PROP_MAX_MISORDER_TIME
PROP_MAX_MISORDER_TIME,
PROP_RFC7273_SYNC
};
#define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
@ -1621,6 +1623,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
rtpbin->max_rtcp_rtp_time_diff, NULL);
g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time,
"max-misorder-time", rtpbin->max_misorder_time, NULL);
g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
buffer, session->id, ssrc);
@ -2314,6 +2317,12 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
"Synchronize received streams to the RFC7273 clock "
"(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
@ -2383,6 +2392,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin)
rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
/* some default SDES entries */
cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
@ -2599,6 +2609,11 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id,
gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time",
value);
break;
case PROP_RFC7273_SYNC:
rtpbin->rfc7273_sync = g_value_get_boolean (value);
gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
"rfc7273-sync", value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -2681,6 +2696,8 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id,
case PROP_MAX_MISORDER_TIME:
g_value_set_uint (value, rtpbin->max_misorder_time);
break;
case PROP_RFC7273_SYNC:
g_value_set_boolean (value, rtpbin->rfc7273_sync);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);

View file

@ -73,6 +73,7 @@ struct _GstRtpBin {
gint max_rtcp_rtp_time_diff;
guint32 max_dropout_time;
guint32 max_misorder_time;
gboolean rfc7273_sync;
/* a list of session */
GSList *sessions;

View file

@ -100,8 +100,10 @@
#endif
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/net/net.h>
#include "gstrtpjitterbuffer.h"
#include "rtpjitterbuffer.h"
@ -141,6 +143,7 @@ enum
#define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
#define DEFAULT_MAX_DROPOUT_TIME 60000
#define DEFAULT_MAX_MISORDER_TIME 2000
#define DEFAULT_RFC7273_SYNC FALSE
#define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
#define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
@ -166,7 +169,8 @@ enum
PROP_STATS,
PROP_MAX_RTCP_RTP_TIME_DIFF,
PROP_MAX_DROPOUT_TIME,
PROP_MAX_MISORDER_TIME
PROP_MAX_MISORDER_TIME,
PROP_RFC7273_SYNC
};
#define JBUF_LOCK(priv) (g_mutex_lock (&(priv)->jbuf_lock))
@ -413,6 +417,8 @@ static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
GstPad * pad);
static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
GstClock * clock);
/* pad overrides */
static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
@ -745,6 +751,12 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
"Synchronize received streams to the RFC7273 clock "
"(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRtpJitterBuffer::request-pt-map:
* @buffer: the object which received the signal
@ -820,6 +832,8 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
gstelement_class->provide_clock =
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
gstelement_class->set_clock =
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
gst_element_class_add_static_pad_template (gstelement_class,
&gst_rtp_jitter_buffer_src_template);
@ -1129,6 +1143,16 @@ gst_rtp_jitter_buffer_provide_clock (GstElement * element)
return gst_system_clock_obtain ();
}
static gboolean
gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
{
GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
}
static void
gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
{
@ -1233,6 +1257,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
GstStructure *caps_struct;
guint val;
GstClockTime tval;
const gchar *ts_refclk, *mediaclk;
priv = jitterbuffer->priv;
@ -1297,6 +1322,75 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
"npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
GstClock *clock = NULL;
guint64 clock_offset = -1;
GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
ts_refclk);
if (g_str_has_prefix (ts_refclk, "ntp=")) {
if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
} else {
const gchar *host, *portstr;
gchar *hostname;
guint port;
host = ts_refclk + sizeof ("ntp=") - 1;
if (host[0] == '[') {
/* IPv6 */
portstr = strchr (host, ']');
if (portstr && portstr[1] == ':')
portstr = portstr + 1;
else
portstr = NULL;
} else {
portstr = strrchr (host, ':');
}
if (!portstr || sscanf (portstr, ":%u", &port) != 1)
port = 123;
if (portstr)
hostname = g_strndup (host, (portstr - host));
else
hostname = g_strdup (host);
clock = gst_ntp_clock_new (NULL, hostname, port, 0);
g_free (hostname);
}
} else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
const gchar *domainstr =
ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
guint domain;
if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
domain = 0;
clock = gst_ptp_clock_new (NULL, domain);
} else {
GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
}
if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
if (!g_str_has_prefix (mediaclk, "direct=")
|| sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
if (strstr (mediaclk, "rate=") != NULL) {
GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
clock_offset = -1;
}
}
rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
} else {
rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
}
return TRUE;
/* ERRORS */
@ -1566,7 +1660,7 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
GST_DEBUG_OBJECT (jitterbuffer, "adding event");
item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
if (head)
JBUF_SIGNAL_EVENT (priv);
@ -2625,7 +2719,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
RTPJitterBufferItem *item;
item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
}
g_list_free (events);
@ -2741,7 +2835,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
&head, &percent)))
&head, &percent,
gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)))))
goto duplicate;
/* update timers */
@ -3311,7 +3406,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
"retry", G_TYPE_UINT, num_rtx_retry, NULL));
item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
/* remove timer now */
remove_timer (jitterbuffer, timer);
@ -3780,7 +3875,7 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
RTP_JITTER_BUFFER_MODE_BUFFER) {
GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
if (head)
JBUF_SIGNAL_EVENT (priv);
JBUF_WAIT_QUERY (priv, out_flushing);
@ -4018,6 +4113,12 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
priv->max_misorder_time = g_value_get_uint (value);
JBUF_UNLOCK (priv);
break;
case PROP_RFC7273_SYNC:
JBUF_LOCK (priv);
rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
g_value_get_boolean (value));
JBUF_UNLOCK (priv);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -4138,6 +4239,12 @@ gst_rtp_jitter_buffer_get_property (GObject * object,
g_value_set_uint (value, priv->max_misorder_time);
JBUF_UNLOCK (priv);
break;
case PROP_RFC7273_SYNC:
JBUF_LOCK (priv);
g_value_set_boolean (value,
rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
JBUF_UNLOCK (priv);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;

View file

@ -85,6 +85,8 @@ rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass)
static void
rtp_jitter_buffer_init (RTPJitterBuffer * jbuf)
{
g_mutex_init (&jbuf->clock_lock);
jbuf->packets = g_queue_new ();
jbuf->mode = RTP_JITTER_BUFFER_MODE_SLAVE;
@ -98,8 +100,19 @@ rtp_jitter_buffer_finalize (GObject * object)
jbuf = RTP_JITTER_BUFFER_CAST (object);
if (jbuf->media_clock_synced_id)
g_signal_handler_disconnect (jbuf->media_clock,
jbuf->media_clock_synced_id);
if (jbuf->media_clock)
gst_object_unref (jbuf->media_clock);
if (jbuf->pipeline_clock)
gst_object_unref (jbuf->pipeline_clock);
g_queue_free (jbuf->packets);
g_mutex_clear (&jbuf->clock_lock);
G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object);
}
@ -199,6 +212,110 @@ rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer * jbuf)
return jbuf->clock_rate;
}
static void
media_clock_synced_cb (GstClock * clock, gboolean synced,
RTPJitterBuffer * jbuf)
{
GstClockTime internal, external;
g_mutex_lock (&jbuf->clock_lock);
if (jbuf->pipeline_clock) {
internal = gst_clock_get_internal_time (jbuf->media_clock);
external = gst_clock_get_time (jbuf->pipeline_clock);
gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
}
g_mutex_unlock (&jbuf->clock_lock);
}
/**
* rtp_jitter_buffer_set_media_clock:
* @jbuf: an #RTPJitterBuffer
* @clock: (transfer full): media #GstClock
* @clock_offset: RTP time at clock epoch or -1
*
* Sets the media clock for the media and the clock offset
*
*/
void
rtp_jitter_buffer_set_media_clock (RTPJitterBuffer * jbuf, GstClock * clock,
guint64 clock_offset)
{
g_mutex_lock (&jbuf->clock_lock);
if (jbuf->media_clock) {
if (jbuf->media_clock_synced_id)
g_signal_handler_disconnect (jbuf->media_clock,
jbuf->media_clock_synced_id);
jbuf->media_clock_synced_id = 0;
gst_object_unref (jbuf->media_clock);
}
jbuf->media_clock = clock;
jbuf->media_clock_offset = clock_offset;
if (jbuf->pipeline_clock && jbuf->media_clock &&
jbuf->pipeline_clock != jbuf->media_clock) {
jbuf->media_clock_synced_id =
g_signal_connect (jbuf->media_clock, "synced",
G_CALLBACK (media_clock_synced_cb), jbuf);
if (gst_clock_is_synced (jbuf->media_clock)) {
GstClockTime internal, external;
internal = gst_clock_get_internal_time (jbuf->media_clock);
external = gst_clock_get_time (jbuf->pipeline_clock);
gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
}
gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock);
}
g_mutex_unlock (&jbuf->clock_lock);
}
/**
* rtp_jitter_buffer_set_pipeline_clock:
* @jbuf: an #RTPJitterBuffer
* @clock: pipeline #GstClock
*
* Sets the pipeline clock
*
*/
void
rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer * jbuf, GstClock * clock)
{
g_mutex_lock (&jbuf->clock_lock);
if (jbuf->pipeline_clock)
gst_object_unref (jbuf->pipeline_clock);
jbuf->pipeline_clock = clock ? gst_object_ref (clock) : NULL;
if (jbuf->pipeline_clock && jbuf->media_clock &&
jbuf->pipeline_clock != jbuf->media_clock) {
if (gst_clock_is_synced (jbuf->media_clock)) {
GstClockTime internal, external;
internal = gst_clock_get_internal_time (jbuf->media_clock);
external = gst_clock_get_time (jbuf->pipeline_clock);
gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
}
gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock);
}
g_mutex_unlock (&jbuf->clock_lock);
}
gboolean
rtp_jitter_buffer_get_rfc7273_sync (RTPJitterBuffer * jbuf)
{
return jbuf->rfc7273_sync;
}
void
rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer * jbuf,
gboolean rfc7273_sync)
{
jbuf->rfc7273_sync = rfc7273_sync;
}
/**
* rtp_jitter_buffer_reset_skew:
* @jbuf: an #RTPJitterBuffer
@ -211,6 +328,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
jbuf->base_time = -1;
jbuf->base_rtptime = -1;
jbuf->base_extrtp = -1;
jbuf->media_clock_base_time = -1;
jbuf->ext_rtptime = -1;
jbuf->last_rtptime = -1;
jbuf->window_pos = 0;
@ -220,6 +338,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
jbuf->prev_send_diff = -1;
jbuf->prev_out_time = -1;
jbuf->need_resync = TRUE;
GST_DEBUG ("reset skew correction");
}
@ -241,6 +360,7 @@ rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time,
GstClockTime gstrtptime, guint64 ext_rtptime, gboolean reset_skew)
{
jbuf->base_time = time;
jbuf->media_clock_base_time = -1;
jbuf->base_rtptime = gstrtptime;
jbuf->base_extrtp = ext_rtptime;
jbuf->prev_out_time = -1;
@ -406,55 +526,18 @@ update_buffer_level (RTPJitterBuffer * jbuf, gint * percent)
* Returns: @time adjusted with the clock skew.
*/
static GstClockTime
calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time)
calculate_skew (RTPJitterBuffer * jbuf, guint64 ext_rtptime,
GstClockTime gstrtptime, GstClockTime time)
{
guint64 ext_rtptime;
guint64 send_diff, recv_diff;
gint64 delta;
gint64 old;
gint pos, i;
GstClockTime gstrtptime, out_time;
GstClockTime out_time;
guint64 slope;
ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime);
if (jbuf->last_rtptime != -1 && ext_rtptime == jbuf->last_rtptime)
return jbuf->prev_out_time;
gstrtptime =
gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate);
/* keep track of the last extended rtptime */
jbuf->last_rtptime = ext_rtptime;
send_diff = 0;
if (G_LIKELY (jbuf->base_rtptime != -1)) {
/* check elapsed time in RTP units */
if (G_LIKELY (gstrtptime >= jbuf->base_rtptime)) {
/* elapsed time at sender */
send_diff = gstrtptime - jbuf->base_rtptime;
} else {
/* elapsed time at sender, timestamps can go backwards and thus be
* smaller than our base time, schedule to take a new base time in
* that case. */
GST_WARNING ("backward timestamps at server, schedule resync");
jbuf->need_resync = TRUE;
send_diff = 0;
}
}
/* need resync, lock on to time and gstrtptime if we can, otherwise we
* do with the previous values */
if (G_UNLIKELY (jbuf->need_resync && time != -1)) {
GST_INFO ("resync to time %" GST_TIME_FORMAT ", rtptime %"
GST_TIME_FORMAT, GST_TIME_ARGS (time), GST_TIME_ARGS (gstrtptime));
rtp_jitter_buffer_resync (jbuf, time, gstrtptime, ext_rtptime, FALSE);
send_diff = 0;
}
GST_DEBUG ("extrtp %" G_GUINT64_FORMAT ", gstrtp %" GST_TIME_FORMAT ", base %"
GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, ext_rtptime,
GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime),
GST_TIME_ARGS (send_diff));
/* we don't have an arrival timestamp so we can't do skew detection. we
* should still apply a timestamp based on RTP timestamp and base_time */
@ -574,40 +657,9 @@ no_skew:
} else {
out_time += jbuf->skew;
}
/* check if timestamps are not going backwards, we can only check this if we
* have a previous out time and a previous send_diff */
if (G_LIKELY (jbuf->prev_out_time != -1 && jbuf->prev_send_diff != -1)) {
/* now check for backwards timestamps */
if (G_UNLIKELY (
/* if the server timestamps went up and the out_time backwards */
(send_diff > jbuf->prev_send_diff
&& out_time < jbuf->prev_out_time) ||
/* if the server timestamps went backwards and the out_time forwards */
(send_diff < jbuf->prev_send_diff
&& out_time > jbuf->prev_out_time) ||
/* if the server timestamps did not change */
send_diff == jbuf->prev_send_diff)) {
GST_DEBUG ("backwards timestamps, using previous time");
out_time = jbuf->prev_out_time;
}
}
if (time != -1 && out_time + jbuf->delay < time) {
/* if we are going to produce a timestamp that is later than the input
* timestamp, we need to reset the jitterbuffer. Likely the server paused
* temporarily */
GST_DEBUG ("out %" GST_TIME_FORMAT " + %" G_GUINT64_FORMAT " < time %"
GST_TIME_FORMAT ", reset jitterbuffer", GST_TIME_ARGS (out_time),
jbuf->delay, GST_TIME_ARGS (time));
rtp_jitter_buffer_resync (jbuf, time, gstrtptime, ext_rtptime, TRUE);
out_time = time;
send_diff = 0;
}
} else
out_time = -1;
jbuf->prev_out_time = out_time;
jbuf->prev_send_diff = send_diff;
GST_DEBUG ("skew %" G_GINT64_FORMAT ", out %" GST_TIME_FORMAT,
jbuf->skew, GST_TIME_ARGS (out_time));
@ -642,6 +694,7 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
* @item: an #RTPJitterBufferItem to insert
* @head: TRUE when the head element changed.
* @percent: the buffering percent after insertion
* @base_time: base time of the pipeline
*
* Inserts @item into the packet queue of @jbuf. The sequence number of the
* packet will be used to sort the packets. This function takes ownerhip of
@ -655,12 +708,16 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
*/
gboolean
rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
gboolean * head, gint * percent)
gboolean * head, gint * percent, GstClockTime base_time)
{
GList *list, *event = NULL;
guint32 rtptime;
guint64 ext_rtptime;
guint16 seqnum;
GstClockTime dts;
GstClockTime gstrtptime, dts;
GstClock *media_clock, *pipeline_clock;
guint64 media_clock_offset;
gboolean rfc7273_mode;
g_return_val_if_fail (jbuf != NULL, FALSE);
g_return_val_if_fail (item != NULL, FALSE);
@ -737,6 +794,37 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
}
}
/* Return the last time if we got the same RTP timestamp again */
ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime);
if (jbuf->last_rtptime != -1 && ext_rtptime == jbuf->last_rtptime) {
item->pts = jbuf->prev_out_time;
goto append;
}
/* keep track of the last extended rtptime */
jbuf->last_rtptime = ext_rtptime;
g_mutex_lock (&jbuf->clock_lock);
media_clock = jbuf->media_clock ? gst_object_ref (jbuf->media_clock) : NULL;
pipeline_clock =
jbuf->pipeline_clock ? gst_object_ref (jbuf->pipeline_clock) : NULL;
media_clock_offset = jbuf->media_clock_offset;
g_mutex_unlock (&jbuf->clock_lock);
gstrtptime =
gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate);
if (G_LIKELY (jbuf->base_rtptime != -1)) {
/* check elapsed time in RTP units */
if (gstrtptime < jbuf->base_rtptime) {
/* elapsed time at sender, timestamps can go backwards and thus be
* smaller than our base time, schedule to take a new base time in
* that case. */
GST_WARNING ("backward timestamps at server, schedule resync");
jbuf->need_resync = TRUE;
}
}
switch (jbuf->mode) {
case RTP_JITTER_BUFFER_MODE_NONE:
case RTP_JITTER_BUFFER_MODE_BUFFER:
@ -752,16 +840,178 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
case RTP_JITTER_BUFFER_MODE_SYNCED:
/* synchronized clocks, take first timestamp as base, use RTP timestamps
* to interpolate */
if (jbuf->base_time != -1)
if (jbuf->base_time != -1 && !jbuf->need_resync)
dts = -1;
break;
case RTP_JITTER_BUFFER_MODE_SLAVE:
default:
break;
}
/* need resync, lock on to time and gstrtptime if we can, otherwise we
* do with the previous values */
if (G_UNLIKELY (jbuf->need_resync && dts != -1)) {
GST_INFO ("resync to time %" GST_TIME_FORMAT ", rtptime %"
GST_TIME_FORMAT, GST_TIME_ARGS (time), GST_TIME_ARGS (gstrtptime));
rtp_jitter_buffer_resync (jbuf, dts, gstrtptime, ext_rtptime, FALSE);
}
GST_DEBUG ("extrtp %" G_GUINT64_FORMAT ", gstrtp %" GST_TIME_FORMAT ", base %"
GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, ext_rtptime,
GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime),
GST_TIME_ARGS (gstrtptime - jbuf->base_rtptime));
rfc7273_mode = media_clock && pipeline_clock
&& gst_clock_is_synced (media_clock);
if (rfc7273_mode && jbuf->mode == RTP_JITTER_BUFFER_MODE_SLAVE
&& (media_clock_offset == -1 || !jbuf->rfc7273_sync)) {
GstClockTime internal, external;
GstClockTime rate_num, rate_denom;
GstClockTime nsrtptimediff, rtpntptime, rtpsystime;
gst_clock_get_calibration (media_clock, &internal, &external, &rate_num,
&rate_denom);
/* Slave to the RFC7273 media clock instead of trying to estimate it
* based on receive times and RTP timestamps */
if (jbuf->media_clock_base_time == -1) {
if (jbuf->base_time != -1) {
jbuf->media_clock_base_time =
gst_clock_unadjust_with_calibration (media_clock,
jbuf->base_time + base_time, internal, external, rate_num,
rate_denom);
} else {
if (dts != -1)
jbuf->media_clock_base_time =
gst_clock_unadjust_with_calibration (media_clock, dts + base_time,
internal, external, rate_num, rate_denom);
else
jbuf->media_clock_base_time =
gst_clock_get_internal_time (media_clock);
jbuf->base_rtptime = gstrtptime;
}
}
if (gstrtptime > jbuf->base_rtptime)
nsrtptimediff = gstrtptime - jbuf->base_rtptime;
else
nsrtptimediff = 0;
rtpntptime = nsrtptimediff + jbuf->media_clock_base_time;
rtpsystime =
gst_clock_adjust_with_calibration (media_clock, rtpntptime, internal,
external, rate_num, rate_denom);
if (rtpsystime > base_time)
item->pts = rtpsystime - base_time;
else
item->pts = 0;
GST_DEBUG ("RFC7273 clock time %" GST_TIME_FORMAT ", out %" GST_TIME_FORMAT,
GST_TIME_ARGS (rtpsystime), GST_TIME_ARGS (item->pts));
} else if (rfc7273_mode && (jbuf->mode == RTP_JITTER_BUFFER_MODE_SLAVE
|| jbuf->mode == RTP_JITTER_BUFFER_MODE_SYNCED)
&& media_clock_offset != -1 && jbuf->rfc7273_sync) {
GstClockTime ntptime, rtptime_tmp;
GstClockTime ntprtptime, rtpsystime;
GstClockTime internal, external;
GstClockTime rate_num, rate_denom;
/* Don't do any of the dts related adjustments further down */
dts = -1;
/* Calculate the actual clock time on the sender side based on the
* RFC7273 clock and convert it to our pipeline clock
*/
gst_clock_get_calibration (media_clock, &internal, &external, &rate_num,
&rate_denom);
ntptime = gst_clock_get_internal_time (media_clock);
ntprtptime = gst_util_uint64_scale (ntptime, jbuf->clock_rate, GST_SECOND);
ntprtptime += media_clock_offset;
ntprtptime &= 0xffffffff;
rtptime_tmp = rtptime;
/* Check for wraparounds, we assume that the diff between current RTP
* timestamp and current media clock time can't be bigger than
* 2**31 clock units */
if (ntprtptime > rtptime_tmp && ntprtptime - rtptime_tmp >= 0x80000000)
rtptime_tmp += G_GUINT64_CONSTANT (0x100000000);
else if (rtptime_tmp > ntprtptime && rtptime_tmp - ntprtptime >= 0x80000000)
ntprtptime += G_GUINT64_CONSTANT (0x100000000);
if (ntprtptime > rtptime_tmp)
ntptime -=
gst_util_uint64_scale (ntprtptime - rtptime_tmp, jbuf->clock_rate,
GST_SECOND);
else
ntptime +=
gst_util_uint64_scale (rtptime_tmp - ntprtptime, jbuf->clock_rate,
GST_SECOND);
rtpsystime =
gst_clock_adjust_with_calibration (media_clock, ntptime, internal,
external, rate_num, rate_denom);
/* All this assumes that the pipeline has enough additional
* latency to cover for the network delay */
if (rtpsystime > base_time)
item->pts = rtpsystime - base_time;
else
item->pts = 0;
GST_DEBUG ("RFC7273 clock time %" GST_TIME_FORMAT ", out %" GST_TIME_FORMAT,
GST_TIME_ARGS (rtpsystime), GST_TIME_ARGS (item->pts));
} else {
/* If we used the RFC7273 clock before and not anymore,
* we need to resync it later again */
jbuf->media_clock_base_time = -1;
/* do skew calculation by measuring the difference between rtptime and the
* receive dts, this function will return the skew corrected rtptime. */
item->pts = calculate_skew (jbuf, rtptime, dts);
item->pts = calculate_skew (jbuf, ext_rtptime, gstrtptime, dts);
}
/* check if timestamps are not going backwards, we can only check this if we
* have a previous out time and a previous send_diff */
if (G_LIKELY (item->pts != -1 && jbuf->prev_out_time != -1
&& jbuf->prev_send_diff != -1)) {
/* now check for backwards timestamps */
if (G_UNLIKELY (
/* if the server timestamps went up and the out_time backwards */
(gstrtptime - jbuf->base_rtptime > jbuf->prev_send_diff
&& item->pts < jbuf->prev_out_time) ||
/* if the server timestamps went backwards and the out_time forwards */
(gstrtptime - jbuf->base_rtptime < jbuf->prev_send_diff
&& item->pts > jbuf->prev_out_time) ||
/* if the server timestamps did not change */
gstrtptime - jbuf->base_rtptime == jbuf->prev_send_diff)) {
GST_DEBUG ("backwards timestamps, using previous time");
item->pts = jbuf->prev_out_time;
}
}
if (dts != -1 && item->pts + jbuf->delay < dts) {
/* if we are going to produce a timestamp that is later than the input
* timestamp, we need to reset the jitterbuffer. Likely the server paused
* temporarily */
GST_DEBUG ("out %" GST_TIME_FORMAT " + %" G_GUINT64_FORMAT " < time %"
GST_TIME_FORMAT ", reset jitterbuffer", GST_TIME_ARGS (item->pts),
jbuf->delay, GST_TIME_ARGS (time));
rtp_jitter_buffer_resync (jbuf, dts, gstrtptime, ext_rtptime, TRUE);
item->pts = dts;
}
jbuf->prev_out_time = item->pts;
jbuf->prev_send_diff = gstrtptime - jbuf->base_rtptime;
if (media_clock)
gst_object_unref (media_clock);
if (pipeline_clock)
gst_object_unref (pipeline_clock);
append:
queue_do_insert (jbuf, list, (GList *) item);

View file

@ -89,6 +89,7 @@ struct _RTPJitterBuffer {
gboolean need_resync;
GstClockTime base_time;
GstClockTime base_rtptime;
GstClockTime media_clock_base_time;
guint32 clock_rate;
GstClockTime base_extrtp;
GstClockTime prev_out_time;
@ -102,6 +103,14 @@ struct _RTPJitterBuffer {
gint64 skew;
gint64 prev_send_diff;
gboolean buffering_disabled;
GMutex clock_lock;
GstClock *pipeline_clock;
GstClock *media_clock;
gulong media_clock_synced_id;
guint64 media_clock_offset;
gboolean rfc7273_sync;
};
struct _RTPJitterBufferClass {
@ -150,11 +159,18 @@ void rtp_jitter_buffer_set_delay (RTPJitterBuffer *jbuf,
void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, guint32 clock_rate);
guint32 rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_set_media_clock (RTPJitterBuffer *jbuf, GstClock * clock, guint64 clock_offset);
void rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer *jbuf, GstClock * clock);
gboolean rtp_jitter_buffer_get_rfc7273_sync (RTPJitterBuffer *jbuf);
void rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer *jbuf, gboolean rfc7273_sync);
void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf);
gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf,
RTPJitterBufferItem *item,
gboolean *head, gint *percent);
gboolean *head, gint *percent,
GstClockTime base_time);
void rtp_jitter_buffer_disable_buffering (RTPJitterBuffer *jbuf, gboolean disabled);

View file

@ -227,6 +227,7 @@ gst_rtsp_src_ntp_time_source_get_type (void)
#define DEFAULT_NTP_TIME_SOURCE NTP_TIME_SOURCE_NTP
#define DEFAULT_USER_AGENT "GStreamer/" PACKAGE_VERSION
#define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
#define DEFAULT_RFC7273_SYNC FALSE
enum
{
@ -265,7 +266,8 @@ enum
PROP_DO_RETRANSMISSION,
PROP_NTP_TIME_SOURCE,
PROP_USER_AGENT,
PROP_MAX_RTCP_RTP_TIME_DIFF
PROP_MAX_RTCP_RTP_TIME_DIFF,
PROP_RFC7273_SYNC
};
#define GST_TYPE_RTSP_NAT_METHOD (gst_rtsp_nat_method_get_type())
@ -732,6 +734,12 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
"Synchronize received streams to the RFC7273 clock "
"(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRTSPSrc::handle-request:
* @rtspsrc: a #GstRTSPSrc
@ -879,6 +887,7 @@ gst_rtspsrc_init (GstRTSPSrc * src)
src->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
src->user_agent = g_strdup (DEFAULT_USER_AGENT);
src->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
src->rfc7273_sync = DEFAULT_RFC7273_SYNC;
/* get a list of all extensions */
src->extensions = gst_rtsp_ext_list_get ();
@ -1158,6 +1167,9 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value,
case PROP_MAX_RTCP_RTP_TIME_DIFF:
rtspsrc->max_rtcp_rtp_time_diff = g_value_get_int (value);
break;
case PROP_RFC7273_SYNC:
rtspsrc->rfc7273_sync = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -1304,6 +1316,9 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_MAX_RTCP_RTP_TIME_DIFF:
g_value_set_int (value, rtspsrc->max_rtcp_rtp_time_diff);
break;
case PROP_RFC7273_SYNC:
g_value_set_boolean (value, rtspsrc->rfc7273_sync);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -2988,6 +3003,10 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
g_object_set (src->manager, "ntp-sync", src->ntp_sync, NULL);
}
if (g_object_class_find_property (klass, "rfc7273-sync")) {
g_object_set (src->manager, "rfc7273-sync", src->rfc7273_sync, NULL);
}
if (src->use_pipeline_clock) {
if (g_object_class_find_property (klass, "use-pipeline-clock")) {
g_object_set (src->manager, "use-pipeline-clock", TRUE, NULL);

View file

@ -239,6 +239,7 @@ struct _GstRTSPSrc {
gint ntp_time_source;
gchar *user_agent;
GstClockTime max_rtcp_rtp_time_diff;
gboolean rfc7273_sync;
/* state */
GstRTSPState state;