From 7ffc830959a1b388ad386685465e8e1b2cc93e2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 6 Apr 2022 15:39:14 +0300 Subject: [PATCH] rtpsession: Update 64-bit NTP header extensions with the actual NTP time in senders Part-of: --- .../gst/rtpmanager/gstrtpsession.c | 79 ++++++- .../gst/rtpmanager/rtpsession.c | 192 +++++++++++++++++- .../gst/rtpmanager/rtpsession.h | 6 +- .../gst/rtpmanager/rtpstats.h | 5 + 4 files changed, 278 insertions(+), 4 deletions(-) diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpsession.c b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpsession.c index cc9d59f7a4..0f018aa336 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpsession.c +++ b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpsession.c @@ -2402,6 +2402,8 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession, GstFlowReturn ret; GstClockTime timestamp, running_time; GstClockTime current_time; + guint64 ntpnstime; + GstClock *clock; priv = rtpsession->priv; @@ -2435,8 +2437,83 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession, } current_time = gst_clock_get_time (priv->sysclock); + + /* Calculate the NTP time of this packet based on the session configuration + * and the running time from above */ + GST_OBJECT_LOCK (rtpsession); + if (running_time != -1 && (clock = GST_ELEMENT_CLOCK (rtpsession))) { + GstClockTime base_time; + base_time = GST_ELEMENT_CAST (rtpsession)->base_time; + gst_object_ref (clock); + GST_OBJECT_UNLOCK (rtpsession); + + if (rtpsession->priv->use_pipeline_clock) { + ntpnstime = running_time; + /* add constant to convert from 1970 based time to 1900 based time */ + ntpnstime += (2208988800LL * GST_SECOND); + } else { + switch (rtpsession->priv->ntp_time_source) { + case GST_RTP_NTP_TIME_SOURCE_NTP: + case GST_RTP_NTP_TIME_SOURCE_UNIX:{ + GstClockTime wallclock_now, pipeline_now; + + /* pipeline clock time for this packet */ + ntpnstime = running_time + base_time; + + /* get current wallclock and pipeline clock time */ + wallclock_now = g_get_real_time () * GST_USECOND; + pipeline_now = gst_clock_get_time (clock); + + /* adjust pipeline clock time by the current diff. + * Note that this will include some jitter for each packet */ + if (wallclock_now > pipeline_now) { + GstClockTime diff = wallclock_now - pipeline_now; + + ntpnstime += diff; + } else { + GstClockTime diff = pipeline_now - wallclock_now; + + if (diff > ntpnstime) { + /* This can't really happen unless the clock configuration is + * broken */ + ntpnstime = GST_CLOCK_TIME_NONE; + } else { + ntpnstime -= diff; + } + } + + /* add constant to convert from 1970 based time to 1900 based time */ + if (ntpnstime != GST_CLOCK_TIME_NONE + && rtpsession->priv->ntp_time_source == + GST_RTP_NTP_TIME_SOURCE_NTP) + ntpnstime += (2208988800LL * GST_SECOND); + break; + } + case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME: + ntpnstime = running_time; + break; + case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME: + ntpnstime = running_time + base_time; + break; + default: + ntpnstime = -1; + g_assert_not_reached (); + break; + } + } + + gst_object_unref (clock); + } else { + if (!GST_ELEMENT_CLOCK (rtpsession)) { + GST_WARNING_OBJECT (rtpsession, + "Don't have a clock yet and can't determine NTP time for this packet"); + } + GST_OBJECT_UNLOCK (rtpsession); + ntpnstime = GST_CLOCK_TIME_NONE; + } + ret = rtp_session_send_rtp (priv->session, data, is_list, current_time, - running_time); + running_time, ntpnstime); if (ret != GST_FLOW_OK) goto push_error; diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.c b/subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.c index 80b788a3fc..fdb17bed82 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.c +++ b/subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.c @@ -2118,6 +2118,25 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo) pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp, &pinfo->header_ext_bit_pattern); } + + if (pinfo->ntp64_ext_id != 0 && pinfo->send && !pinfo->have_ntp64_ext) { + guint8 *data; + guint size; + + /* Remember here that there is a 64-bit NTP header extension on this buffer + * or any of the other buffers in the buffer list. + * Later we update this after making the buffer(list) writable. + */ + if ((gst_rtp_buffer_get_extension_onebyte_header (&rtp, + pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size) + && size == 8) + || (gst_rtp_buffer_get_extension_twobytes_header (&rtp, NULL, + pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size) + && size == 8)) { + pinfo->have_ntp64_ext = TRUE; + } + } + gst_rtp_buffer_unmap (&rtp); } @@ -2166,6 +2185,8 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo, pinfo->payload_len = 0; pinfo->packets = 0; pinfo->marker = FALSE; + pinfo->ntp64_ext_id = send ? sess->send_ntp64_ext_id : 0; + pinfo->have_ntp64_ext = FALSE; if (is_list) { GstBufferList *list = GST_BUFFER_LIST_CAST (data); @@ -3125,6 +3146,29 @@ invalid_packet: } } +static guint8 +_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name) +{ + guint i; + guint8 extmap_id = 0; + guint n_fields = gst_structure_n_fields (s); + + for (i = 0; i < n_fields; i++) { + const gchar *field_name = gst_structure_nth_field_name (s, i); + if (g_str_has_prefix (field_name, "extmap-")) { + const gchar *str = gst_structure_get_string (s, field_name); + if (str && g_strcmp0 (str, ext_name) == 0) { + gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10); + if (id > 0 && id < 15) { + extmap_id = id; + break; + } + } + } + } + return extmap_id; +} + /** * rtp_session_update_send_caps: * @sess: an #RTPSession @@ -3180,9 +3224,151 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) sess->internal_ssrc_from_caps_or_property = FALSE; } + sess->send_ntp64_ext_id = + _get_extmap_id_for_attribute (s, + GST_RTP_HDREXT_BASE GST_RTP_HDREXT_NTP_64); + rtp_twcc_manager_parse_send_ext_id (sess->twcc, s); } +static void +update_ntp64_header_ext_data (RTPPacketInfo * pinfo, GstBuffer * buffer) +{ + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + if (gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtp)) { + guint16 bits; + guint8 *data; + guint wordlen; + + if (gst_rtp_buffer_get_extension_data (&rtp, &bits, (gpointer *) & data, + &wordlen)) { + gsize len = wordlen * 4; + + /* One-byte header */ + if (bits == 0xBEDE) { + /* One-byte header extension */ + while (TRUE) { + guint8 ext_id, ext_len; + + if (len < 1) + break; + + ext_id = GST_READ_UINT8 (data) >> 4; + ext_len = (GST_READ_UINT8 (data) & 0xF) + 1; + data += 1; + len -= 1; + if (ext_id == 0) { + /* Skip padding */ + continue; + } else if (ext_id == 15) { + /* Stop parsing */ + break; + } + + /* extension doesn't fit into the header */ + if (ext_len > len) + break; + + if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) { + if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) { + guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime, + G_GUINT64_CONSTANT (1) << 32, + GST_SECOND); + + GST_WRITE_UINT64_BE (data, ntptime); + } else { + /* Replace extension with padding */ + memset (data - 1, 0, 1 + ext_len); + } + } + + /* skip to the next extension */ + data += ext_len; + len -= ext_len; + } + } else if ((bits >> 4) == 0x100) { + /* Two-byte header extension */ + + while (TRUE) { + guint8 ext_id, ext_len; + + if (len < 1) + break; + + ext_id = GST_READ_UINT8 (data); + data += 1; + len -= 1; + if (ext_id == 0) { + /* Skip padding */ + continue; + } + + ext_len = GST_READ_UINT8 (data); + data += 1; + len -= 1; + + /* extension doesn't fit into the header */ + if (ext_len > len) + break; + + if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) { + if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) { + guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime, + G_GUINT64_CONSTANT (1) << 32, + GST_SECOND); + + GST_WRITE_UINT64_BE (data, ntptime); + } else { + /* Replace extension with padding */ + memset (data - 2, 0, 2 + ext_len); + } + } + + /* skip to the next extension */ + data += ext_len; + len -= ext_len; + } + } + } + gst_rtp_buffer_unmap (&rtp); + } +} + +static void +update_ntp64_header_ext (RTPPacketInfo * pinfo) +{ + /* Early return if we don't know the header extension id or the packets + * don't contain the header extension */ + if (pinfo->ntp64_ext_id == 0 || !pinfo->have_ntp64_ext) + return; + + /* If no NTP time is known then the header extension will be replaced with + * padding, otherwise it will be updated */ + GST_TRACE + ("Updating NTP-64 header extension for SSRC %08x packet with RTP time %u and running time %" + GST_TIME_FORMAT " to %" GST_TIME_FORMAT, pinfo->ssrc, pinfo->rtptime, + GST_TIME_ARGS (pinfo->running_time), GST_TIME_ARGS (pinfo->ntpnstime)); + + if (GST_IS_BUFFER_LIST (pinfo->data)) { + GstBufferList *list; + guint i = 0; + + pinfo->data = gst_buffer_list_make_writable (pinfo->data); + + list = GST_BUFFER_LIST (pinfo->data); + + for (i = 0; i < gst_buffer_list_length (list); i++) { + GstBuffer *buffer = gst_buffer_list_get_writable (list, i); + + update_ntp64_header_ext_data (pinfo, buffer); + } + } else { + pinfo->data = gst_buffer_make_writable (pinfo->data); + update_ntp64_header_ext_data (pinfo, pinfo->data); + } +} + /** * rtp_session_send_rtp: * @sess: an #RTPSession @@ -3198,7 +3384,7 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps) */ GstFlowReturn rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, - GstClockTime current_time, GstClockTime running_time) + GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime) { GstFlowReturn result; RTPSource *source; @@ -3214,9 +3400,11 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, RTP_SESSION_LOCK (sess); if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data, - current_time, running_time, -1)) + current_time, running_time, ntpnstime)) goto invalid_packet; + /* Update any 64-bit NTP header extensions with the actual NTP time here */ + update_ntp64_header_ext (&pinfo); rtp_twcc_manager_send_packet (sess->twcc, &pinfo); source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time); diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.h b/subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.h index 4c6d18c580..d68aa3c112 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.h +++ b/subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.h @@ -309,6 +309,9 @@ struct _RTPSession { gboolean timestamp_sender_reports; + /* RFC6051 64-bit NTP header extension */ + guint8 send_ntp64_ext_id; + /* Transport-wide cc-extension */ RTPTWCCManager *twcc; RTPTWCCStats *twcc_stats; @@ -410,7 +413,8 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer /* processing packets for sending */ void rtp_session_update_send_caps (RTPSession *sess, GstCaps *caps); GstFlowReturn rtp_session_send_rtp (RTPSession *sess, gpointer data, gboolean is_list, - GstClockTime current_time, GstClockTime running_time); + GstClockTime current_time, GstClockTime running_time, + guint64 ntpnstime); /* scheduling bye */ void rtp_session_mark_all_bye (RTPSession *sess, const gchar *reason); diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/rtpstats.h b/subprojects/gst-plugins-good/gst/rtpmanager/rtpstats.h index 27f80cd530..45ad377ee4 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/rtpstats.h +++ b/subprojects/gst-plugins-good/gst/rtpmanager/rtpstats.h @@ -83,6 +83,9 @@ typedef struct { * @csrcs: CSRCs * @header_ext: Header extension data * @header_ext_bit_pattern: Header extension bit pattern + * @ntp64_ext_id: Extension header ID for RFC6051 64-bit NTP timestamp. + * @have_ntp64_ext: If there is at least one 64-bit NTP timestamp header + * extension. * * Structure holding information about the packet. */ @@ -109,6 +112,8 @@ typedef struct { guint32 csrcs[16]; GBytes *header_ext; guint16 header_ext_bit_pattern; + guint8 ntp64_ext_id; + gboolean have_ntp64_ext; } RTPPacketInfo; /**