From df247f091cbaafe8c395d1a3f03b2d0f424c0595 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 5 Jan 2016 16:15:16 +0200 Subject: [PATCH] rtpjitterbuffer: Add RFC7273 media clock handling https://bugzilla.gnome.org/show_bug.cgi?id=762259 --- gst/rtpmanager/gstrtpbin.c | 19 +- gst/rtpmanager/gstrtpbin.h | 1 + gst/rtpmanager/gstrtpjitterbuffer.c | 119 +++++++- gst/rtpmanager/rtpjitterbuffer.c | 408 ++++++++++++++++++++++------ gst/rtpmanager/rtpjitterbuffer.h | 18 +- gst/rtsp/gstrtspsrc.c | 21 +- gst/rtsp/gstrtspsrc.h | 1 + 7 files changed, 499 insertions(+), 88 deletions(-) diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index e6bacf2d2e..cbe6901831 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -298,6 +298,7 @@ enum #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000 #define DEFAULT_MAX_DROPOUT_TIME 60000 #define DEFAULT_MAX_MISORDER_TIME 2000 +#define DEFAULT_RFC7273_SYNC FALSE enum { @@ -320,7 +321,8 @@ enum PROP_RTCP_SYNC_SEND_TIME, PROP_MAX_RTCP_RTP_TIME_DIFF, PROP_MAX_DROPOUT_TIME, - PROP_MAX_MISORDER_TIME + PROP_MAX_MISORDER_TIME, + PROP_RFC7273_SYNC }; #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type()) @@ -1621,6 +1623,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) rtpbin->max_rtcp_rtp_time_diff, NULL); g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time, "max-misorder-time", rtpbin->max_misorder_time, NULL); + g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL); g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0, buffer, session->id, ssrc); @@ -2314,6 +2317,12 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC, + g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock", + "Synchronize received streams to the RFC7273 clock " + "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); @@ -2383,6 +2392,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin) rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF; rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME; rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME; + rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC; /* some default SDES entries */ cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ()); @@ -2599,6 +2609,11 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time", value); break; + case PROP_RFC7273_SYNC: + rtpbin->rfc7273_sync = g_value_get_boolean (value); + gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, + "rfc7273-sync", value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2681,6 +2696,8 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, case PROP_MAX_MISORDER_TIME: g_value_set_uint (value, rtpbin->max_misorder_time); break; + case PROP_RFC7273_SYNC: + g_value_set_boolean (value, rtpbin->rfc7273_sync); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 01539d24e3..15eb88a8db 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -73,6 +73,7 @@ struct _GstRtpBin { gint max_rtcp_rtp_time_diff; guint32 max_dropout_time; guint32 max_misorder_time; + gboolean rfc7273_sync; /* a list of session */ GSList *sessions; diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 65f7e149b3..ae2bf9da63 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -100,8 +100,10 @@ #endif #include +#include #include #include +#include #include "gstrtpjitterbuffer.h" #include "rtpjitterbuffer.h" @@ -141,6 +143,7 @@ enum #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000 #define DEFAULT_MAX_DROPOUT_TIME 60000 #define DEFAULT_MAX_MISORDER_TIME 2000 +#define DEFAULT_RFC7273_SYNC FALSE #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND) #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND) @@ -166,7 +169,8 @@ enum PROP_STATS, PROP_MAX_RTCP_RTP_TIME_DIFF, PROP_MAX_DROPOUT_TIME, - PROP_MAX_MISORDER_TIME + PROP_MAX_MISORDER_TIME, + PROP_RFC7273_SYNC }; #define JBUF_LOCK(priv) (g_mutex_lock (&(priv)->jbuf_lock)) @@ -413,6 +417,8 @@ static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element, static void gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad); static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element); +static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element, + GstClock * clock); /* pad overrides */ static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter); @@ -745,6 +751,12 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) DEFAULT_MAX_RTCP_RTP_TIME_DIFF, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC, + g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock", + "Synchronize received streams to the RFC7273 clock " + "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * GstRtpJitterBuffer::request-pt-map: * @buffer: the object which received the signal @@ -820,6 +832,8 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad); gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock); + gstelement_class->set_clock = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock); gst_element_class_add_static_pad_template (gstelement_class, &gst_rtp_jitter_buffer_src_template); @@ -1129,6 +1143,16 @@ gst_rtp_jitter_buffer_provide_clock (GstElement * element) return gst_system_clock_obtain (); } +static gboolean +gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock) +{ + GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element); + + rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock); + + return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock); +} + static void gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer) { @@ -1233,6 +1257,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, GstStructure *caps_struct; guint val; GstClockTime tval; + const gchar *ts_refclk, *mediaclk; priv = jitterbuffer->priv; @@ -1297,6 +1322,75 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT, GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop)); + if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) { + GstClock *clock = NULL; + guint64 clock_offset = -1; + + GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s", + ts_refclk); + + if (g_str_has_prefix (ts_refclk, "ntp=")) { + if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) { + GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks"); + } else { + const gchar *host, *portstr; + gchar *hostname; + guint port; + + host = ts_refclk + sizeof ("ntp=") - 1; + if (host[0] == '[') { + /* IPv6 */ + portstr = strchr (host, ']'); + if (portstr && portstr[1] == ':') + portstr = portstr + 1; + else + portstr = NULL; + } else { + portstr = strrchr (host, ':'); + } + + + if (!portstr || sscanf (portstr, ":%u", &port) != 1) + port = 123; + + if (portstr) + hostname = g_strndup (host, (portstr - host)); + else + hostname = g_strdup (host); + + clock = gst_ntp_clock_new (NULL, hostname, port, 0); + g_free (hostname); + } + } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) { + const gchar *domainstr = + ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1; + guint domain; + + if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1) + domain = 0; + + clock = gst_ptp_clock_new (NULL, domain); + } else { + GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock"); + } + + if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) { + GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk); + + if (!g_str_has_prefix (mediaclk, "direct=") + || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1) + GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock"); + if (strstr (mediaclk, "rate=") != NULL) { + GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported"); + clock_offset = -1; + } + } + + rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset); + } else { + rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1); + } + return TRUE; /* ERRORS */ @@ -1566,7 +1660,7 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event) GST_DEBUG_OBJECT (jitterbuffer, "adding event"); item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1); if (head) JBUF_SIGNAL_EVENT (priv); @@ -2625,7 +2719,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, RTPJitterBufferItem *item; item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1); } g_list_free (events); @@ -2741,7 +2835,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, - &head, &percent))) + &head, &percent, + gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer))))) goto duplicate; /* update timers */ @@ -3311,7 +3406,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, "retry", G_TYPE_UINT, num_rtx_retry, NULL)); item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1); /* remove timer now */ remove_timer (jitterbuffer, timer); @@ -3780,7 +3875,7 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent, RTP_JITTER_BUFFER_MODE_BUFFER) { GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query"); item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1); if (head) JBUF_SIGNAL_EVENT (priv); JBUF_WAIT_QUERY (priv, out_flushing); @@ -4018,6 +4113,12 @@ gst_rtp_jitter_buffer_set_property (GObject * object, priv->max_misorder_time = g_value_get_uint (value); JBUF_UNLOCK (priv); break; + case PROP_RFC7273_SYNC: + JBUF_LOCK (priv); + rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf, + g_value_get_boolean (value)); + JBUF_UNLOCK (priv); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -4138,6 +4239,12 @@ gst_rtp_jitter_buffer_get_property (GObject * object, g_value_set_uint (value, priv->max_misorder_time); JBUF_UNLOCK (priv); break; + case PROP_RFC7273_SYNC: + JBUF_LOCK (priv); + g_value_set_boolean (value, + rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf)); + JBUF_UNLOCK (priv); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index 80d82663c4..41aa8e6d06 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -85,6 +85,8 @@ rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass) static void rtp_jitter_buffer_init (RTPJitterBuffer * jbuf) { + g_mutex_init (&jbuf->clock_lock); + jbuf->packets = g_queue_new (); jbuf->mode = RTP_JITTER_BUFFER_MODE_SLAVE; @@ -98,8 +100,19 @@ rtp_jitter_buffer_finalize (GObject * object) jbuf = RTP_JITTER_BUFFER_CAST (object); + if (jbuf->media_clock_synced_id) + g_signal_handler_disconnect (jbuf->media_clock, + jbuf->media_clock_synced_id); + if (jbuf->media_clock) + gst_object_unref (jbuf->media_clock); + + if (jbuf->pipeline_clock) + gst_object_unref (jbuf->pipeline_clock); + g_queue_free (jbuf->packets); + g_mutex_clear (&jbuf->clock_lock); + G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object); } @@ -199,6 +212,110 @@ rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer * jbuf) return jbuf->clock_rate; } +static void +media_clock_synced_cb (GstClock * clock, gboolean synced, + RTPJitterBuffer * jbuf) +{ + GstClockTime internal, external; + + g_mutex_lock (&jbuf->clock_lock); + if (jbuf->pipeline_clock) { + internal = gst_clock_get_internal_time (jbuf->media_clock); + external = gst_clock_get_time (jbuf->pipeline_clock); + + gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1); + } + g_mutex_unlock (&jbuf->clock_lock); +} + +/** + * rtp_jitter_buffer_set_media_clock: + * @jbuf: an #RTPJitterBuffer + * @clock: (transfer full): media #GstClock + * @clock_offset: RTP time at clock epoch or -1 + * + * Sets the media clock for the media and the clock offset + * + */ +void +rtp_jitter_buffer_set_media_clock (RTPJitterBuffer * jbuf, GstClock * clock, + guint64 clock_offset) +{ + g_mutex_lock (&jbuf->clock_lock); + if (jbuf->media_clock) { + if (jbuf->media_clock_synced_id) + g_signal_handler_disconnect (jbuf->media_clock, + jbuf->media_clock_synced_id); + jbuf->media_clock_synced_id = 0; + gst_object_unref (jbuf->media_clock); + } + jbuf->media_clock = clock; + jbuf->media_clock_offset = clock_offset; + + if (jbuf->pipeline_clock && jbuf->media_clock && + jbuf->pipeline_clock != jbuf->media_clock) { + jbuf->media_clock_synced_id = + g_signal_connect (jbuf->media_clock, "synced", + G_CALLBACK (media_clock_synced_cb), jbuf); + if (gst_clock_is_synced (jbuf->media_clock)) { + GstClockTime internal, external; + + internal = gst_clock_get_internal_time (jbuf->media_clock); + external = gst_clock_get_time (jbuf->pipeline_clock); + + gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1); + } + + gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock); + } + g_mutex_unlock (&jbuf->clock_lock); +} + +/** + * rtp_jitter_buffer_set_pipeline_clock: + * @jbuf: an #RTPJitterBuffer + * @clock: pipeline #GstClock + * + * Sets the pipeline clock + * + */ +void +rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer * jbuf, GstClock * clock) +{ + g_mutex_lock (&jbuf->clock_lock); + if (jbuf->pipeline_clock) + gst_object_unref (jbuf->pipeline_clock); + jbuf->pipeline_clock = clock ? gst_object_ref (clock) : NULL; + + if (jbuf->pipeline_clock && jbuf->media_clock && + jbuf->pipeline_clock != jbuf->media_clock) { + if (gst_clock_is_synced (jbuf->media_clock)) { + GstClockTime internal, external; + + internal = gst_clock_get_internal_time (jbuf->media_clock); + external = gst_clock_get_time (jbuf->pipeline_clock); + + gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1); + } + + gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock); + } + g_mutex_unlock (&jbuf->clock_lock); +} + +gboolean +rtp_jitter_buffer_get_rfc7273_sync (RTPJitterBuffer * jbuf) +{ + return jbuf->rfc7273_sync; +} + +void +rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer * jbuf, + gboolean rfc7273_sync) +{ + jbuf->rfc7273_sync = rfc7273_sync; +} + /** * rtp_jitter_buffer_reset_skew: * @jbuf: an #RTPJitterBuffer @@ -211,6 +328,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf) jbuf->base_time = -1; jbuf->base_rtptime = -1; jbuf->base_extrtp = -1; + jbuf->media_clock_base_time = -1; jbuf->ext_rtptime = -1; jbuf->last_rtptime = -1; jbuf->window_pos = 0; @@ -220,6 +338,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf) jbuf->prev_send_diff = -1; jbuf->prev_out_time = -1; jbuf->need_resync = TRUE; + GST_DEBUG ("reset skew correction"); } @@ -241,6 +360,7 @@ rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time, GstClockTime gstrtptime, guint64 ext_rtptime, gboolean reset_skew) { jbuf->base_time = time; + jbuf->media_clock_base_time = -1; jbuf->base_rtptime = gstrtptime; jbuf->base_extrtp = ext_rtptime; jbuf->prev_out_time = -1; @@ -406,55 +526,18 @@ update_buffer_level (RTPJitterBuffer * jbuf, gint * percent) * Returns: @time adjusted with the clock skew. */ static GstClockTime -calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time) +calculate_skew (RTPJitterBuffer * jbuf, guint64 ext_rtptime, + GstClockTime gstrtptime, GstClockTime time) { - guint64 ext_rtptime; guint64 send_diff, recv_diff; gint64 delta; gint64 old; gint pos, i; - GstClockTime gstrtptime, out_time; + GstClockTime out_time; guint64 slope; - ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime); - - if (jbuf->last_rtptime != -1 && ext_rtptime == jbuf->last_rtptime) - return jbuf->prev_out_time; - - gstrtptime = - gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate); - - /* keep track of the last extended rtptime */ - jbuf->last_rtptime = ext_rtptime; - - send_diff = 0; - if (G_LIKELY (jbuf->base_rtptime != -1)) { - /* check elapsed time in RTP units */ - if (G_LIKELY (gstrtptime >= jbuf->base_rtptime)) { - send_diff = gstrtptime - jbuf->base_rtptime; - } else { - /* elapsed time at sender, timestamps can go backwards and thus be - * smaller than our base time, schedule to take a new base time in - * that case. */ - GST_WARNING ("backward timestamps at server, schedule resync"); - jbuf->need_resync = TRUE; - send_diff = 0; - } - } - - /* need resync, lock on to time and gstrtptime if we can, otherwise we - * do with the previous values */ - if (G_UNLIKELY (jbuf->need_resync && time != -1)) { - GST_INFO ("resync to time %" GST_TIME_FORMAT ", rtptime %" - GST_TIME_FORMAT, GST_TIME_ARGS (time), GST_TIME_ARGS (gstrtptime)); - rtp_jitter_buffer_resync (jbuf, time, gstrtptime, ext_rtptime, FALSE); - send_diff = 0; - } - - GST_DEBUG ("extrtp %" G_GUINT64_FORMAT ", gstrtp %" GST_TIME_FORMAT ", base %" - GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, ext_rtptime, - GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime), - GST_TIME_ARGS (send_diff)); + /* elapsed time at sender */ + send_diff = gstrtptime - jbuf->base_rtptime; /* we don't have an arrival timestamp so we can't do skew detection. we * should still apply a timestamp based on RTP timestamp and base_time */ @@ -574,40 +657,9 @@ no_skew: } else { out_time += jbuf->skew; } - /* check if timestamps are not going backwards, we can only check this if we - * have a previous out time and a previous send_diff */ - if (G_LIKELY (jbuf->prev_out_time != -1 && jbuf->prev_send_diff != -1)) { - /* now check for backwards timestamps */ - if (G_UNLIKELY ( - /* if the server timestamps went up and the out_time backwards */ - (send_diff > jbuf->prev_send_diff - && out_time < jbuf->prev_out_time) || - /* if the server timestamps went backwards and the out_time forwards */ - (send_diff < jbuf->prev_send_diff - && out_time > jbuf->prev_out_time) || - /* if the server timestamps did not change */ - send_diff == jbuf->prev_send_diff)) { - GST_DEBUG ("backwards timestamps, using previous time"); - out_time = jbuf->prev_out_time; - } - } - if (time != -1 && out_time + jbuf->delay < time) { - /* if we are going to produce a timestamp that is later than the input - * timestamp, we need to reset the jitterbuffer. Likely the server paused - * temporarily */ - GST_DEBUG ("out %" GST_TIME_FORMAT " + %" G_GUINT64_FORMAT " < time %" - GST_TIME_FORMAT ", reset jitterbuffer", GST_TIME_ARGS (out_time), - jbuf->delay, GST_TIME_ARGS (time)); - rtp_jitter_buffer_resync (jbuf, time, gstrtptime, ext_rtptime, TRUE); - out_time = time; - send_diff = 0; - } } else out_time = -1; - jbuf->prev_out_time = out_time; - jbuf->prev_send_diff = send_diff; - GST_DEBUG ("skew %" G_GINT64_FORMAT ", out %" GST_TIME_FORMAT, jbuf->skew, GST_TIME_ARGS (out_time)); @@ -642,6 +694,7 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item) * @item: an #RTPJitterBufferItem to insert * @head: TRUE when the head element changed. * @percent: the buffering percent after insertion + * @base_time: base time of the pipeline * * Inserts @item into the packet queue of @jbuf. The sequence number of the * packet will be used to sort the packets. This function takes ownerhip of @@ -655,12 +708,16 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item) */ gboolean rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item, - gboolean * head, gint * percent) + gboolean * head, gint * percent, GstClockTime base_time) { GList *list, *event = NULL; guint32 rtptime; + guint64 ext_rtptime; guint16 seqnum; - GstClockTime dts; + GstClockTime gstrtptime, dts; + GstClock *media_clock, *pipeline_clock; + guint64 media_clock_offset; + gboolean rfc7273_mode; g_return_val_if_fail (jbuf != NULL, FALSE); g_return_val_if_fail (item != NULL, FALSE); @@ -737,6 +794,37 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item, } } + /* Return the last time if we got the same RTP timestamp again */ + ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime); + if (jbuf->last_rtptime != -1 && ext_rtptime == jbuf->last_rtptime) { + item->pts = jbuf->prev_out_time; + goto append; + } + + /* keep track of the last extended rtptime */ + jbuf->last_rtptime = ext_rtptime; + + g_mutex_lock (&jbuf->clock_lock); + media_clock = jbuf->media_clock ? gst_object_ref (jbuf->media_clock) : NULL; + pipeline_clock = + jbuf->pipeline_clock ? gst_object_ref (jbuf->pipeline_clock) : NULL; + media_clock_offset = jbuf->media_clock_offset; + g_mutex_unlock (&jbuf->clock_lock); + + gstrtptime = + gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate); + + if (G_LIKELY (jbuf->base_rtptime != -1)) { + /* check elapsed time in RTP units */ + if (gstrtptime < jbuf->base_rtptime) { + /* elapsed time at sender, timestamps can go backwards and thus be + * smaller than our base time, schedule to take a new base time in + * that case. */ + GST_WARNING ("backward timestamps at server, schedule resync"); + jbuf->need_resync = TRUE; + } + } + switch (jbuf->mode) { case RTP_JITTER_BUFFER_MODE_NONE: case RTP_JITTER_BUFFER_MODE_BUFFER: @@ -752,16 +840,178 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item, case RTP_JITTER_BUFFER_MODE_SYNCED: /* synchronized clocks, take first timestamp as base, use RTP timestamps * to interpolate */ - if (jbuf->base_time != -1) + if (jbuf->base_time != -1 && !jbuf->need_resync) dts = -1; break; case RTP_JITTER_BUFFER_MODE_SLAVE: default: break; } - /* do skew calculation by measuring the difference between rtptime and the - * receive dts, this function will return the skew corrected rtptime. */ - item->pts = calculate_skew (jbuf, rtptime, dts); + + /* need resync, lock on to time and gstrtptime if we can, otherwise we + * do with the previous values */ + if (G_UNLIKELY (jbuf->need_resync && dts != -1)) { + GST_INFO ("resync to time %" GST_TIME_FORMAT ", rtptime %" + GST_TIME_FORMAT, GST_TIME_ARGS (time), GST_TIME_ARGS (gstrtptime)); + rtp_jitter_buffer_resync (jbuf, dts, gstrtptime, ext_rtptime, FALSE); + } + + GST_DEBUG ("extrtp %" G_GUINT64_FORMAT ", gstrtp %" GST_TIME_FORMAT ", base %" + GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, ext_rtptime, + GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime), + GST_TIME_ARGS (gstrtptime - jbuf->base_rtptime)); + + rfc7273_mode = media_clock && pipeline_clock + && gst_clock_is_synced (media_clock); + + if (rfc7273_mode && jbuf->mode == RTP_JITTER_BUFFER_MODE_SLAVE + && (media_clock_offset == -1 || !jbuf->rfc7273_sync)) { + GstClockTime internal, external; + GstClockTime rate_num, rate_denom; + GstClockTime nsrtptimediff, rtpntptime, rtpsystime; + + gst_clock_get_calibration (media_clock, &internal, &external, &rate_num, + &rate_denom); + + /* Slave to the RFC7273 media clock instead of trying to estimate it + * based on receive times and RTP timestamps */ + + if (jbuf->media_clock_base_time == -1) { + if (jbuf->base_time != -1) { + jbuf->media_clock_base_time = + gst_clock_unadjust_with_calibration (media_clock, + jbuf->base_time + base_time, internal, external, rate_num, + rate_denom); + } else { + if (dts != -1) + jbuf->media_clock_base_time = + gst_clock_unadjust_with_calibration (media_clock, dts + base_time, + internal, external, rate_num, rate_denom); + else + jbuf->media_clock_base_time = + gst_clock_get_internal_time (media_clock); + jbuf->base_rtptime = gstrtptime; + } + } + + if (gstrtptime > jbuf->base_rtptime) + nsrtptimediff = gstrtptime - jbuf->base_rtptime; + else + nsrtptimediff = 0; + + rtpntptime = nsrtptimediff + jbuf->media_clock_base_time; + + rtpsystime = + gst_clock_adjust_with_calibration (media_clock, rtpntptime, internal, + external, rate_num, rate_denom); + + if (rtpsystime > base_time) + item->pts = rtpsystime - base_time; + else + item->pts = 0; + + GST_DEBUG ("RFC7273 clock time %" GST_TIME_FORMAT ", out %" GST_TIME_FORMAT, + GST_TIME_ARGS (rtpsystime), GST_TIME_ARGS (item->pts)); + } else if (rfc7273_mode && (jbuf->mode == RTP_JITTER_BUFFER_MODE_SLAVE + || jbuf->mode == RTP_JITTER_BUFFER_MODE_SYNCED) + && media_clock_offset != -1 && jbuf->rfc7273_sync) { + GstClockTime ntptime, rtptime_tmp; + GstClockTime ntprtptime, rtpsystime; + GstClockTime internal, external; + GstClockTime rate_num, rate_denom; + + /* Don't do any of the dts related adjustments further down */ + dts = -1; + + /* Calculate the actual clock time on the sender side based on the + * RFC7273 clock and convert it to our pipeline clock + */ + + gst_clock_get_calibration (media_clock, &internal, &external, &rate_num, + &rate_denom); + + ntptime = gst_clock_get_internal_time (media_clock); + + ntprtptime = gst_util_uint64_scale (ntptime, jbuf->clock_rate, GST_SECOND); + ntprtptime += media_clock_offset; + ntprtptime &= 0xffffffff; + + rtptime_tmp = rtptime; + /* Check for wraparounds, we assume that the diff between current RTP + * timestamp and current media clock time can't be bigger than + * 2**31 clock units */ + if (ntprtptime > rtptime_tmp && ntprtptime - rtptime_tmp >= 0x80000000) + rtptime_tmp += G_GUINT64_CONSTANT (0x100000000); + else if (rtptime_tmp > ntprtptime && rtptime_tmp - ntprtptime >= 0x80000000) + ntprtptime += G_GUINT64_CONSTANT (0x100000000); + + if (ntprtptime > rtptime_tmp) + ntptime -= + gst_util_uint64_scale (ntprtptime - rtptime_tmp, jbuf->clock_rate, + GST_SECOND); + else + ntptime += + gst_util_uint64_scale (rtptime_tmp - ntprtptime, jbuf->clock_rate, + GST_SECOND); + + rtpsystime = + gst_clock_adjust_with_calibration (media_clock, ntptime, internal, + external, rate_num, rate_denom); + /* All this assumes that the pipeline has enough additional + * latency to cover for the network delay */ + if (rtpsystime > base_time) + item->pts = rtpsystime - base_time; + else + item->pts = 0; + + GST_DEBUG ("RFC7273 clock time %" GST_TIME_FORMAT ", out %" GST_TIME_FORMAT, + GST_TIME_ARGS (rtpsystime), GST_TIME_ARGS (item->pts)); + } else { + /* If we used the RFC7273 clock before and not anymore, + * we need to resync it later again */ + jbuf->media_clock_base_time = -1; + + /* do skew calculation by measuring the difference between rtptime and the + * receive dts, this function will return the skew corrected rtptime. */ + item->pts = calculate_skew (jbuf, ext_rtptime, gstrtptime, dts); + } + + /* check if timestamps are not going backwards, we can only check this if we + * have a previous out time and a previous send_diff */ + if (G_LIKELY (item->pts != -1 && jbuf->prev_out_time != -1 + && jbuf->prev_send_diff != -1)) { + /* now check for backwards timestamps */ + if (G_UNLIKELY ( + /* if the server timestamps went up and the out_time backwards */ + (gstrtptime - jbuf->base_rtptime > jbuf->prev_send_diff + && item->pts < jbuf->prev_out_time) || + /* if the server timestamps went backwards and the out_time forwards */ + (gstrtptime - jbuf->base_rtptime < jbuf->prev_send_diff + && item->pts > jbuf->prev_out_time) || + /* if the server timestamps did not change */ + gstrtptime - jbuf->base_rtptime == jbuf->prev_send_diff)) { + GST_DEBUG ("backwards timestamps, using previous time"); + item->pts = jbuf->prev_out_time; + } + } + if (dts != -1 && item->pts + jbuf->delay < dts) { + /* if we are going to produce a timestamp that is later than the input + * timestamp, we need to reset the jitterbuffer. Likely the server paused + * temporarily */ + GST_DEBUG ("out %" GST_TIME_FORMAT " + %" G_GUINT64_FORMAT " < time %" + GST_TIME_FORMAT ", reset jitterbuffer", GST_TIME_ARGS (item->pts), + jbuf->delay, GST_TIME_ARGS (time)); + rtp_jitter_buffer_resync (jbuf, dts, gstrtptime, ext_rtptime, TRUE); + item->pts = dts; + } + + jbuf->prev_out_time = item->pts; + jbuf->prev_send_diff = gstrtptime - jbuf->base_rtptime; + + if (media_clock) + gst_object_unref (media_clock); + if (pipeline_clock) + gst_object_unref (pipeline_clock); append: queue_do_insert (jbuf, list, (GList *) item); diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index ba8da6db25..c7115fe4dd 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -89,6 +89,7 @@ struct _RTPJitterBuffer { gboolean need_resync; GstClockTime base_time; GstClockTime base_rtptime; + GstClockTime media_clock_base_time; guint32 clock_rate; GstClockTime base_extrtp; GstClockTime prev_out_time; @@ -102,6 +103,14 @@ struct _RTPJitterBuffer { gint64 skew; gint64 prev_send_diff; gboolean buffering_disabled; + + GMutex clock_lock; + GstClock *pipeline_clock; + GstClock *media_clock; + gulong media_clock_synced_id; + guint64 media_clock_offset; + + gboolean rfc7273_sync; }; struct _RTPJitterBufferClass { @@ -150,11 +159,18 @@ void rtp_jitter_buffer_set_delay (RTPJitterBuffer *jbuf, void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, guint32 clock_rate); guint32 rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf); +void rtp_jitter_buffer_set_media_clock (RTPJitterBuffer *jbuf, GstClock * clock, guint64 clock_offset); +void rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer *jbuf, GstClock * clock); + +gboolean rtp_jitter_buffer_get_rfc7273_sync (RTPJitterBuffer *jbuf); +void rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer *jbuf, gboolean rfc7273_sync); + void rtp_jitter_buffer_reset_skew (RTPJitterBuffer *jbuf); gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, RTPJitterBufferItem *item, - gboolean *head, gint *percent); + gboolean *head, gint *percent, + GstClockTime base_time); void rtp_jitter_buffer_disable_buffering (RTPJitterBuffer *jbuf, gboolean disabled); diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index b41666d5bc..e2b3a410e1 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -227,6 +227,7 @@ gst_rtsp_src_ntp_time_source_get_type (void) #define DEFAULT_NTP_TIME_SOURCE NTP_TIME_SOURCE_NTP #define DEFAULT_USER_AGENT "GStreamer/" PACKAGE_VERSION #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000 +#define DEFAULT_RFC7273_SYNC FALSE enum { @@ -265,7 +266,8 @@ enum PROP_DO_RETRANSMISSION, PROP_NTP_TIME_SOURCE, PROP_USER_AGENT, - PROP_MAX_RTCP_RTP_TIME_DIFF + PROP_MAX_RTCP_RTP_TIME_DIFF, + PROP_RFC7273_SYNC }; #define GST_TYPE_RTSP_NAT_METHOD (gst_rtsp_nat_method_get_type()) @@ -732,6 +734,12 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass) DEFAULT_MAX_RTCP_RTP_TIME_DIFF, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC, + g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock", + "Synchronize received streams to the RFC7273 clock " + "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * GstRTSPSrc::handle-request: * @rtspsrc: a #GstRTSPSrc @@ -879,6 +887,7 @@ gst_rtspsrc_init (GstRTSPSrc * src) src->ntp_time_source = DEFAULT_NTP_TIME_SOURCE; src->user_agent = g_strdup (DEFAULT_USER_AGENT); src->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF; + src->rfc7273_sync = DEFAULT_RFC7273_SYNC; /* get a list of all extensions */ src->extensions = gst_rtsp_ext_list_get (); @@ -1158,6 +1167,9 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value, case PROP_MAX_RTCP_RTP_TIME_DIFF: rtspsrc->max_rtcp_rtp_time_diff = g_value_get_int (value); break; + case PROP_RFC7273_SYNC: + rtspsrc->rfc7273_sync = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1304,6 +1316,9 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value, case PROP_MAX_RTCP_RTP_TIME_DIFF: g_value_set_int (value, rtspsrc->max_rtcp_rtp_time_diff); break; + case PROP_RFC7273_SYNC: + g_value_set_boolean (value, rtspsrc->rfc7273_sync); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2988,6 +3003,10 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream, g_object_set (src->manager, "ntp-sync", src->ntp_sync, NULL); } + if (g_object_class_find_property (klass, "rfc7273-sync")) { + g_object_set (src->manager, "rfc7273-sync", src->rfc7273_sync, NULL); + } + if (src->use_pipeline_clock) { if (g_object_class_find_property (klass, "use-pipeline-clock")) { g_object_set (src->manager, "use-pipeline-clock", TRUE, NULL); diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index dc218dc115..df1741cfe2 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -239,6 +239,7 @@ struct _GstRTSPSrc { gint ntp_time_source; gchar *user_agent; GstClockTime max_rtcp_rtp_time_diff; + gboolean rfc7273_sync; /* state */ GstRTSPState state;