From b76f9c8392c82a0be46d5d77d5b202eb655f9b7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 18 May 2023 13:59:10 +0300 Subject: [PATCH] ptp: Capture actual send/receive times in the helper process While this doesn't yet use any OS provided times from the actual network stack, this still gets rid of any IPC jitter between the helper process and the main process as part of the PTP time calculations and should improve accuracy. Part-of: --- .../gstreamer/libs/gst/helpers/ptp/clock.rs | 118 ++++++++++++++++++ .../gstreamer/libs/gst/helpers/ptp/ffi.rs | 118 ++++++++++++++++++ .../gstreamer/libs/gst/helpers/ptp/main.rs | 118 +++++++++++++++--- .../gstreamer/libs/gst/net/gstptpclock.c | 109 ++++++++++++++-- 4 files changed, 437 insertions(+), 26 deletions(-) create mode 100644 subprojects/gstreamer/libs/gst/helpers/ptp/clock.rs diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/clock.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/clock.rs new file mode 100644 index 0000000000..704d33cd4e --- /dev/null +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/clock.rs @@ -0,0 +1,118 @@ +// GStreamer +// +// Copyright (C) 2015-2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#[cfg(target_os = "macos")] +/// Returns the monotonic system clock in nanoseconds or 0 on error. +pub fn time() -> u64 { + use std::{ + mem, + sync::atomic::{self, AtomicU32}, + }; + + use crate::ffi::unix::macos::*; + + static TIMEBASE_N: AtomicU32 = AtomicU32::new(0); + static TIMEBASE_D: AtomicU32 = AtomicU32::new(0); + + let mut timebase_n = TIMEBASE_N.load(atomic::Ordering::Relaxed); + let mut timebase_d = TIMEBASE_D.load(atomic::Ordering::Relaxed); + if timebase_n == 0 || timebase_d == 0 { + // SAFETY: This is safe to call at any time and returns the timebase. Returns 0 on success. + unsafe { + let mut timebase = mem::MaybeUninit::uninit(); + if mach_timebase_info(timebase.as_mut_ptr()) != 0 { + return 0; + } + let timebase = timebase.assume_init(); + timebase_n = timebase.numer; + timebase_d = timebase.denom; + + TIMEBASE_N.store(timebase_n, atomic::Ordering::Relaxed); + TIMEBASE_D.store(timebase_d, atomic::Ordering::Relaxed); + } + } + + // SAFETY: This is safe to call at any time. + let time = unsafe { mach_absolute_time() }; + + (time as u128 * timebase_n as u128 / timebase_d as u128) as u64 +} + +#[cfg(target_os = "windows")] +/// Returns the monotonic system clock in nanoseconds or 0 on error. +pub fn time() -> u64 { + use std::{ + mem, + sync::atomic::{self, AtomicI64}, + }; + + use crate::ffi::windows::*; + + static FREQUENCY: AtomicI64 = AtomicI64::new(0); + + let mut freq = FREQUENCY.load(atomic::Ordering::Relaxed); + if freq == 0 { + // SAFETY: This is safe to call at any time and will never fail on Windows XP or newer. + unsafe { + QueryPerformanceFrequency(&mut freq); + } + FREQUENCY.store(freq, atomic::Ordering::Relaxed); + } + + // SAFETY: This is safe to call at any time and will never fail on Windows XP or newer. + let time = unsafe { + let mut time = mem::MaybeUninit::uninit(); + QueryPerformanceCounter(time.as_mut_ptr()); + time.assume_init() + }; + + (time as u128 * 1_000_000_000 / freq as u128) as u64 +} + +#[cfg(any( + target_os = "linux", + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", + target_os = "dragonfly", + target_os = "solaris", + target_os = "illumos", +))] +/// Returns the monotonic system clock in nanoseconds or 0 on error. +pub fn time() -> u64 { + use std::mem; + + use crate::ffi::unix::clock_gettime::*; + + // SAFETY: This is safe to call at any time. 0 will be returned on success, any other value on + // error. + unsafe { + let mut timespec = mem::MaybeUninit::uninit(); + let res = clock_gettime(CLOCK_MONOTONIC, timespec.as_mut_ptr()); + if res == 0 { + let timespec = timespec.assume_init(); + + timespec.tv_sec as u64 * 1_000_000_000 + timespec.tv_nsec as u64 + } else { + 0 + } + } +} + +#[cfg(test)] +mod test { + #[test] + fn test_clock() { + // Check that this doesn't return 0 as that's very likely an indication of things going + // wrong. + let now = super::time(); + assert_ne!(now, 0); + } +} diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs index 99d8e231d1..8e6cc04a75 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs @@ -485,6 +485,121 @@ pub mod unix { } } + #[cfg(target_os = "macos")] + pub mod macos { + pub use super::*; + + #[repr(C)] + pub struct mach_timebase_info { + pub numer: u32, + pub denom: u32, + } + + extern "C" { + pub fn mach_timebase_info(info: *mut mach_timebase_info) -> c_int; + pub fn mach_absolute_time() -> u64; + } + } + + #[cfg(not(target_os = "macos"))] + pub mod clock_gettime { + pub use super::*; + + #[cfg(any( + target_os = "linux", + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", + target_os = "solaris", + target_os = "illumos", + ))] + pub type clock_id_t = c_int; + + #[cfg(target_os = "dragonfly")] + pub type clock_id_t = c_ulong; + + #[cfg(any(target_os = "solaris", target_os = "illumos"))] + pub type time_t = c_long; + + #[cfg(any(target_os = "openbsd", target_os = "netbsd", target_os = "dragonfly"))] + pub type time_t = i64; + + #[cfg(all(target_os = "freebsd", target_arch = "x86"))] + pub type time_t = i32; + #[cfg(all(target_os = "freebsd", not(target_arch = "x86")))] + pub type time_t = i64; + + #[cfg(all(target_os = "linux", target_env = "gnu", target_arch = "riscv32"))] + pub type time_t = i64; + #[cfg(all( + target_os = "linux", + target_env = "gnu", + any( + target_arch = "x86", + target_arch = "arm", + target_arch = "m68k", + target_arch = "mips", + target_arch = "powerpc", + target_arch = "sparc", + all(target_arch = "aarch64", target_pointer_width = "32"), + ) + ))] + pub type time_t = i32; + #[cfg(all( + target_os = "linux", + target_env = "gnu", + any( + target_arch = "x86_64", + all(target_arch = "aarch64", target_pointer_width = "64"), + target_arch = "powerpc64", + target_arch = "mips64", + target_arch = "s390x", + target_arch = "sparc64", + target_arch = "riscv64", + target_arch = "loongarch64", + ) + ))] + pub type time_t = i64; + #[cfg(all(target_os = "linux", target_env = "musl"))] + pub type time_t = c_long; + + #[cfg(all(target_os = "linux", target_env = "uclibc", target_arch = "x86_64"))] + pub type time_t = c_int; + #[cfg(all( + target_os = "linux", + target_env = "uclibc", + not(target_arch = "x86_64"), + ))] + pub type time_t = c_long; + + #[repr(C)] + pub struct timespec { + pub tv_sec: time_t, + #[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))] + pub tv_nsec: i64, + #[cfg(not(all(target_arch = "x86_64", target_pointer_width = "32")))] + pub tv_nsec: c_long, + } + + #[cfg(any( + target_os = "freebsd", + target_os = "dragonfly", + target_os = "solaris", + target_os = "illumos", + ))] + pub const CLOCK_MONOTONIC: clock_id_t = 4; + + #[cfg(any(target_os = "openbsd", target_os = "netbsd",))] + pub const CLOCK_MONOTONIC: clock_id_t = 3; + + #[cfg(target_os = "linux")] + pub const CLOCK_MONOTONIC: clock_id_t = 1; + + extern "C" { + pub fn clock_gettime(clk_id: clock_id_t, tp: *mut timespec) -> c_int; + } + } + #[cfg(ptp_helper_permissions = "setcap")] pub mod setcaps { use super::*; @@ -623,6 +738,9 @@ pub mod windows { lppipeattributes: *mut c_void, nsize: u32, ) -> i32; + + pub fn QueryPerformanceFrequency(lpfrequence: *mut i64) -> i32; + pub fn QueryPerformanceCounter(lpperformancecount: *mut i64) -> i32; } pub const BCRYPT_USE_SYSTEM_PREFERRED_RNG: u32 = 0x00000002; diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs index 7daa7d4e75..76f7840b52 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs @@ -26,6 +26,7 @@ use std::{ mod log; mod args; +mod clock; mod error; mod ffi; mod io; @@ -51,6 +52,29 @@ const MSG_TYPE_EVENT: u8 = 0; const MSG_TYPE_GENERAL: u8 = 1; /// Clock ID message const MSG_TYPE_CLOCK_ID: u8 = 2; +/// Send time ACK message +const MSG_TYPE_SEND_TIME_ACK: u8 = 3; + +/// Reads a big endian 64 bit integer from a slice. +fn read_u64_be(s: &[u8]) -> u64 { + assert!(s.len() >= 8); + + (s[7] as u64) + | ((s[6] as u64) << 8) + | ((s[5] as u64) << 16) + | ((s[4] as u64) << 24) + | ((s[3] as u64) << 32) + | ((s[2] as u64) << 40) + | ((s[1] as u64) << 48) + | ((s[0] as u64) << 56) +} + +/// Reads a big endian 16 bit integer from a slice. +fn read_u16_be(s: &[u8]) -> u16 { + assert!(s.len() >= 2); + + (s[1] as u16) | ((s[0] as u16) << 8) +} /// Create a new `UdpSocket` for the given port and configure it for PTP. fn create_socket(port: u16) -> Result { @@ -173,7 +197,7 @@ fn run() -> Result<(), Error> { // We assume that stdout never blocks and stdin receives a complete valid packet whenever it is // ready and never blocks in the middle of a packet. let mut socket_buffer = [0u8; 8192]; - let mut stdinout_buffer = [0u8; 8192 + 3]; + let mut stdinout_buffer = [0u8; 8192 + 3 + 8]; loop { let poll_res = poll.poll().context("Failed polling")?; @@ -200,24 +224,36 @@ fn run() -> Result<(), Error> { bail!( source: err, "Failed reading from {} socket", - if idx == 0 { "event" } else { "general" } + if idx == MSG_TYPE_EVENT { + "event" + } else { + "general" + } ); } Ok((read, addr)) => { + let recv_time = clock::time(); if args.verbose { trace!( - "Received {} bytes from {} socket from {}", + "Received {} bytes from {} socket from {} at {}", read, - if idx == 0 { "event" } else { "general" }, - addr + if idx == MSG_TYPE_EVENT { + "event" + } else { + "general" + }, + addr, + recv_time, ); } - stdinout_buffer[0..2].copy_from_slice(&(read as u16).to_be_bytes()); + + stdinout_buffer[0..2].copy_from_slice(&(read as u16 + 8).to_be_bytes()); stdinout_buffer[2] = idx; - stdinout_buffer[3..][..read].copy_from_slice(&socket_buffer[..read]); + stdinout_buffer[3..][..8].copy_from_slice(&recv_time.to_be_bytes()); + stdinout_buffer[11..][..read].copy_from_slice(&socket_buffer[..read]); poll.stdout() - .write_all(&stdinout_buffer[..(read + 3)]) + .write_all(&stdinout_buffer[..(read + 3 + 8)]) .context("Failed writing to stdout")?; } } @@ -236,19 +272,38 @@ fn run() -> Result<(), Error> { } let type_ = stdinout_buffer[2]; - if args.verbose { - trace!( - "Received {} bytes for {} socket from stdin", - size, - if type_ == 0 { "event" } else { "general" }, - ); - } - poll.stdin() .read_exact(&mut stdinout_buffer[0..size as usize]) .context("Failed reading packet body from stdin")?; - let buf = &stdinout_buffer[0..size as usize]; + if type_ != MSG_TYPE_EVENT && type_ != MSG_TYPE_GENERAL { + warn!("Unexpected stdin message type {}", type_); + continue; + } + + if args.verbose { + trace!( + "Received {} bytes for {} socket from stdin", + size, + if type_ == MSG_TYPE_EVENT { + "event" + } else { + "general" + }, + ); + } + + if size < 8 + 32 { + bail!("Invalid packet body size"); + } + + let main_send_time = read_u64_be(&stdinout_buffer[..8]); + let buf = &stdinout_buffer[8..size as usize]; + let message_type = buf[0] & 0x0f; + let domain_number = buf[4]; + let seqnum = read_u16_be(&buf[30..][..2]); + + let send_time = clock::time(); match type_ { MSG_TYPE_EVENT => poll .event_socket() @@ -261,9 +316,36 @@ fn run() -> Result<(), Error> { .with_context(|| { format!( "Failed sending to {} socket", - if type_ == 0 { "event" } else { "general" } + if type_ == MSG_TYPE_EVENT { + "event" + } else { + "general" + } ) })?; + + if args.verbose { + trace!( + "Sending SEND_TIME_ACK for message type {}, domain number {}, seqnum {} received at {} at {}", + message_type, + domain_number, + seqnum, + main_send_time, + send_time, + ); + } + + stdinout_buffer[0..2].copy_from_slice(&12u16.to_be_bytes()); + stdinout_buffer[2] = MSG_TYPE_SEND_TIME_ACK; + let send_time_ack_msg = &mut stdinout_buffer[3..]; + send_time_ack_msg[..8].copy_from_slice(&send_time.to_be_bytes()); + send_time_ack_msg[8] = message_type; + send_time_ack_msg[9] = domain_number; + send_time_ack_msg[10..][..2].copy_from_slice(&seqnum.to_be_bytes()); + + poll.stdout() + .write_all(&stdinout_buffer[..(3 + 12)]) + .context("Failed writing to stdout")?; } } } diff --git a/subprojects/gstreamer/libs/gst/net/gstptpclock.c b/subprojects/gstreamer/libs/gst/net/gstptpclock.c index ad632d11f8..a673c32b11 100644 --- a/subprojects/gstreamer/libs/gst/net/gstptpclock.c +++ b/subprojects/gstreamer/libs/gst/net/gstptpclock.c @@ -239,9 +239,10 @@ typedef struct typedef enum { - TYPE_EVENT = 0, /* PTP message is payload */ - TYPE_GENERAL = 1, /* PTP message is payload */ + TYPE_EVENT = 0, /* 64-bit monotonic clock time and PTP message is payload */ + TYPE_GENERAL = 1, /* 64-bit monotonic clock time and PTP message is payload */ TYPE_CLOCK_ID = 2, /* 64-bit clock ID is payload */ + TYPE_SEND_TIME_ACK = 3, /* 64-bit monotonic clock time, 8-bit message type, 8-bit domain number and 16-bit sequence number is payload */ } StdIOMessageType; /* 2 byte BE payload size plus 1 byte message type */ @@ -990,16 +991,21 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time) static gboolean send_delay_req_timeout (PtpPendingSync * sync) { - guint8 message[STDIO_MESSAGE_HEADER_SIZE + 44] = { 0, }; + guint8 message[STDIO_MESSAGE_HEADER_SIZE + 8 + 44] = { 0, }; GstByteWriter writer; gsize written; GError *err = NULL; + GstClockTime send_time; GST_TRACE ("Sending delay_req to domain %u", sync->domain); + sync->delay_req_send_time_local = send_time = + gst_clock_get_time (observation_system_clock); + gst_byte_writer_init_with_data (&writer, message, sizeof (message), FALSE); - gst_byte_writer_put_uint16_be_unchecked (&writer, 44); + gst_byte_writer_put_uint16_be_unchecked (&writer, 8 + 44); gst_byte_writer_put_uint8_unchecked (&writer, TYPE_EVENT); + gst_byte_writer_put_uint64_be_unchecked (&writer, send_time); gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ); gst_byte_writer_put_uint8_unchecked (&writer, 2); gst_byte_writer_put_uint16_be_unchecked (&writer, 44); @@ -1017,9 +1023,6 @@ send_delay_req_timeout (PtpPendingSync * sync) gst_byte_writer_put_uint64_be_unchecked (&writer, 0); gst_byte_writer_put_uint16_be_unchecked (&writer, 0); - sync->delay_req_send_time_local = - gst_clock_get_time (observation_system_clock); - if (!g_output_stream_write_all (stdin_pipe, message, sizeof (message), &written, NULL, &err)) { if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) @@ -1835,6 +1838,74 @@ handle_ptp_message (PtpMessage * msg, GstClockTime receive_time) } } +static void +handle_send_time_ack (const guint8 * data, gsize size, + GstClockTime receive_time) +{ + GstByteReader breader; + GstClockTime helper_send_time; + guint8 message_type; + guint8 domain_number; + guint16 seqnum; + GList *l; + PtpDomainData *domain = NULL; + PtpPendingSync *sync = NULL; + + gst_byte_reader_init (&breader, data, size); + helper_send_time = gst_byte_reader_get_uint64_be_unchecked (&breader); + message_type = gst_byte_reader_get_uint8_unchecked (&breader); + domain_number = gst_byte_reader_get_uint8_unchecked (&breader); + seqnum = gst_byte_reader_get_uint16_be_unchecked (&breader); + + GST_TRACE + ("Received SEND_TIME_ACK for message type %d, domain number %d, seqnum %d with send time %" + GST_TIME_FORMAT " at receive_time %" GST_TIME_FORMAT, message_type, + domain_number, seqnum, GST_TIME_ARGS (helper_send_time), + GST_TIME_ARGS (receive_time)); + + if (message_type != PTP_MESSAGE_TYPE_DELAY_REQ) + return; + + for (l = domain_data; l; l = l->next) { + PtpDomainData *tmp = l->data; + + if (domain_number == tmp->domain) { + domain = tmp; + break; + } + } + + if (!domain) + return; + + /* Check if we know about this one */ + for (l = domain->pending_syncs.head; l; l = l->next) { + PtpPendingSync *tmp = l->data; + + if (tmp->delay_req_seqnum == seqnum) { + sync = tmp; + break; + } + } + + if (!sync) + return; + + /* Got a DELAY_RESP for this already */ + if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE) + return; + + if (helper_send_time != 0) { + GST_TRACE ("DELAY_REQ message took %" GST_STIME_FORMAT + " to helper process, SEND_TIME_ACK took %" GST_STIME_FORMAT + " from helper process", + GST_STIME_ARGS ((GstClockTimeDiff) (helper_send_time - + sync->delay_req_send_time_local)), + GST_STIME_ARGS (receive_time - helper_send_time)); + sync->delay_req_send_time_local = helper_send_time; + } +} + static void have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res, gpointer user_data); @@ -1870,11 +1941,20 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res, case TYPE_EVENT: case TYPE_GENERAL:{ GstClockTime receive_time = gst_clock_get_time (observation_system_clock); + GstClockTime helper_receive_time; PtpMessage msg; - if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer, + helper_receive_time = GST_READ_UINT64_BE (stdout_buffer); + + if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer + 8, CUR_STDIO_HEADER_SIZE)) { dump_ptp_message (&msg); + if (helper_receive_time != 0) { + GST_TRACE ("Message took %" GST_STIME_FORMAT " from helper process", + GST_STIME_ARGS ((GstClockTimeDiff) (receive_time - + helper_receive_time))); + receive_time = helper_receive_time; + } handle_ptp_message (&msg, receive_time); } break; @@ -1898,6 +1978,19 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res, g_mutex_unlock (&ptp_lock); break; } + case TYPE_SEND_TIME_ACK:{ + GstClockTime receive_time = gst_clock_get_time (observation_system_clock); + + if (CUR_STDIO_HEADER_SIZE != 12) { + GST_ERROR ("Unexpected send time ack size (%u != 12)", + CUR_STDIO_HEADER_SIZE); + g_main_loop_quit (main_loop); + return; + } + + handle_send_time_ack (stdout_buffer, CUR_STDIO_HEADER_SIZE, receive_time); + break; + } default: break; }