diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/clock.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/clock.rs new file mode 100644 index 0000000000..704d33cd4e --- /dev/null +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/clock.rs @@ -0,0 +1,118 @@ +// GStreamer +// +// Copyright (C) 2015-2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#[cfg(target_os = "macos")] +/// Returns the monotonic system clock in nanoseconds or 0 on error. +pub fn time() -> u64 { + use std::{ + mem, + sync::atomic::{self, AtomicU32}, + }; + + use crate::ffi::unix::macos::*; + + static TIMEBASE_N: AtomicU32 = AtomicU32::new(0); + static TIMEBASE_D: AtomicU32 = AtomicU32::new(0); + + let mut timebase_n = TIMEBASE_N.load(atomic::Ordering::Relaxed); + let mut timebase_d = TIMEBASE_D.load(atomic::Ordering::Relaxed); + if timebase_n == 0 || timebase_d == 0 { + // SAFETY: This is safe to call at any time and returns the timebase. Returns 0 on success. + unsafe { + let mut timebase = mem::MaybeUninit::uninit(); + if mach_timebase_info(timebase.as_mut_ptr()) != 0 { + return 0; + } + let timebase = timebase.assume_init(); + timebase_n = timebase.numer; + timebase_d = timebase.denom; + + TIMEBASE_N.store(timebase_n, atomic::Ordering::Relaxed); + TIMEBASE_D.store(timebase_d, atomic::Ordering::Relaxed); + } + } + + // SAFETY: This is safe to call at any time. + let time = unsafe { mach_absolute_time() }; + + (time as u128 * timebase_n as u128 / timebase_d as u128) as u64 +} + +#[cfg(target_os = "windows")] +/// Returns the monotonic system clock in nanoseconds or 0 on error. +pub fn time() -> u64 { + use std::{ + mem, + sync::atomic::{self, AtomicI64}, + }; + + use crate::ffi::windows::*; + + static FREQUENCY: AtomicI64 = AtomicI64::new(0); + + let mut freq = FREQUENCY.load(atomic::Ordering::Relaxed); + if freq == 0 { + // SAFETY: This is safe to call at any time and will never fail on Windows XP or newer. + unsafe { + QueryPerformanceFrequency(&mut freq); + } + FREQUENCY.store(freq, atomic::Ordering::Relaxed); + } + + // SAFETY: This is safe to call at any time and will never fail on Windows XP or newer. + let time = unsafe { + let mut time = mem::MaybeUninit::uninit(); + QueryPerformanceCounter(time.as_mut_ptr()); + time.assume_init() + }; + + (time as u128 * 1_000_000_000 / freq as u128) as u64 +} + +#[cfg(any( + target_os = "linux", + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", + target_os = "dragonfly", + target_os = "solaris", + target_os = "illumos", +))] +/// Returns the monotonic system clock in nanoseconds or 0 on error. +pub fn time() -> u64 { + use std::mem; + + use crate::ffi::unix::clock_gettime::*; + + // SAFETY: This is safe to call at any time. 0 will be returned on success, any other value on + // error. + unsafe { + let mut timespec = mem::MaybeUninit::uninit(); + let res = clock_gettime(CLOCK_MONOTONIC, timespec.as_mut_ptr()); + if res == 0 { + let timespec = timespec.assume_init(); + + timespec.tv_sec as u64 * 1_000_000_000 + timespec.tv_nsec as u64 + } else { + 0 + } + } +} + +#[cfg(test)] +mod test { + #[test] + fn test_clock() { + // Check that this doesn't return 0 as that's very likely an indication of things going + // wrong. + let now = super::time(); + assert_ne!(now, 0); + } +} diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs index 99d8e231d1..8e6cc04a75 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs @@ -485,6 +485,121 @@ pub mod unix { } } + #[cfg(target_os = "macos")] + pub mod macos { + pub use super::*; + + #[repr(C)] + pub struct mach_timebase_info { + pub numer: u32, + pub denom: u32, + } + + extern "C" { + pub fn mach_timebase_info(info: *mut mach_timebase_info) -> c_int; + pub fn mach_absolute_time() -> u64; + } + } + + #[cfg(not(target_os = "macos"))] + pub mod clock_gettime { + pub use super::*; + + #[cfg(any( + target_os = "linux", + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", + target_os = "solaris", + target_os = "illumos", + ))] + pub type clock_id_t = c_int; + + #[cfg(target_os = "dragonfly")] + pub type clock_id_t = c_ulong; + + #[cfg(any(target_os = "solaris", target_os = "illumos"))] + pub type time_t = c_long; + + #[cfg(any(target_os = "openbsd", target_os = "netbsd", target_os = "dragonfly"))] + pub type time_t = i64; + + #[cfg(all(target_os = "freebsd", target_arch = "x86"))] + pub type time_t = i32; + #[cfg(all(target_os = "freebsd", not(target_arch = "x86")))] + pub type time_t = i64; + + #[cfg(all(target_os = "linux", target_env = "gnu", target_arch = "riscv32"))] + pub type time_t = i64; + #[cfg(all( + target_os = "linux", + target_env = "gnu", + any( + target_arch = "x86", + target_arch = "arm", + target_arch = "m68k", + target_arch = "mips", + target_arch = "powerpc", + target_arch = "sparc", + all(target_arch = "aarch64", target_pointer_width = "32"), + ) + ))] + pub type time_t = i32; + #[cfg(all( + target_os = "linux", + target_env = "gnu", + any( + target_arch = "x86_64", + all(target_arch = "aarch64", target_pointer_width = "64"), + target_arch = "powerpc64", + target_arch = "mips64", + target_arch = "s390x", + target_arch = "sparc64", + target_arch = "riscv64", + target_arch = "loongarch64", + ) + ))] + pub type time_t = i64; + #[cfg(all(target_os = "linux", target_env = "musl"))] + pub type time_t = c_long; + + #[cfg(all(target_os = "linux", target_env = "uclibc", target_arch = "x86_64"))] + pub type time_t = c_int; + #[cfg(all( + target_os = "linux", + target_env = "uclibc", + not(target_arch = "x86_64"), + ))] + pub type time_t = c_long; + + #[repr(C)] + pub struct timespec { + pub tv_sec: time_t, + #[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))] + pub tv_nsec: i64, + #[cfg(not(all(target_arch = "x86_64", target_pointer_width = "32")))] + pub tv_nsec: c_long, + } + + #[cfg(any( + target_os = "freebsd", + target_os = "dragonfly", + target_os = "solaris", + target_os = "illumos", + ))] + pub const CLOCK_MONOTONIC: clock_id_t = 4; + + #[cfg(any(target_os = "openbsd", target_os = "netbsd",))] + pub const CLOCK_MONOTONIC: clock_id_t = 3; + + #[cfg(target_os = "linux")] + pub const CLOCK_MONOTONIC: clock_id_t = 1; + + extern "C" { + pub fn clock_gettime(clk_id: clock_id_t, tp: *mut timespec) -> c_int; + } + } + #[cfg(ptp_helper_permissions = "setcap")] pub mod setcaps { use super::*; @@ -623,6 +738,9 @@ pub mod windows { lppipeattributes: *mut c_void, nsize: u32, ) -> i32; + + pub fn QueryPerformanceFrequency(lpfrequence: *mut i64) -> i32; + pub fn QueryPerformanceCounter(lpperformancecount: *mut i64) -> i32; } pub const BCRYPT_USE_SYSTEM_PREFERRED_RNG: u32 = 0x00000002; diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs index 7daa7d4e75..76f7840b52 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs @@ -26,6 +26,7 @@ use std::{ mod log; mod args; +mod clock; mod error; mod ffi; mod io; @@ -51,6 +52,29 @@ const MSG_TYPE_EVENT: u8 = 0; const MSG_TYPE_GENERAL: u8 = 1; /// Clock ID message const MSG_TYPE_CLOCK_ID: u8 = 2; +/// Send time ACK message +const MSG_TYPE_SEND_TIME_ACK: u8 = 3; + +/// Reads a big endian 64 bit integer from a slice. +fn read_u64_be(s: &[u8]) -> u64 { + assert!(s.len() >= 8); + + (s[7] as u64) + | ((s[6] as u64) << 8) + | ((s[5] as u64) << 16) + | ((s[4] as u64) << 24) + | ((s[3] as u64) << 32) + | ((s[2] as u64) << 40) + | ((s[1] as u64) << 48) + | ((s[0] as u64) << 56) +} + +/// Reads a big endian 16 bit integer from a slice. +fn read_u16_be(s: &[u8]) -> u16 { + assert!(s.len() >= 2); + + (s[1] as u16) | ((s[0] as u16) << 8) +} /// Create a new `UdpSocket` for the given port and configure it for PTP. fn create_socket(port: u16) -> Result { @@ -173,7 +197,7 @@ fn run() -> Result<(), Error> { // We assume that stdout never blocks and stdin receives a complete valid packet whenever it is // ready and never blocks in the middle of a packet. let mut socket_buffer = [0u8; 8192]; - let mut stdinout_buffer = [0u8; 8192 + 3]; + let mut stdinout_buffer = [0u8; 8192 + 3 + 8]; loop { let poll_res = poll.poll().context("Failed polling")?; @@ -200,24 +224,36 @@ fn run() -> Result<(), Error> { bail!( source: err, "Failed reading from {} socket", - if idx == 0 { "event" } else { "general" } + if idx == MSG_TYPE_EVENT { + "event" + } else { + "general" + } ); } Ok((read, addr)) => { + let recv_time = clock::time(); if args.verbose { trace!( - "Received {} bytes from {} socket from {}", + "Received {} bytes from {} socket from {} at {}", read, - if idx == 0 { "event" } else { "general" }, - addr + if idx == MSG_TYPE_EVENT { + "event" + } else { + "general" + }, + addr, + recv_time, ); } - stdinout_buffer[0..2].copy_from_slice(&(read as u16).to_be_bytes()); + + stdinout_buffer[0..2].copy_from_slice(&(read as u16 + 8).to_be_bytes()); stdinout_buffer[2] = idx; - stdinout_buffer[3..][..read].copy_from_slice(&socket_buffer[..read]); + stdinout_buffer[3..][..8].copy_from_slice(&recv_time.to_be_bytes()); + stdinout_buffer[11..][..read].copy_from_slice(&socket_buffer[..read]); poll.stdout() - .write_all(&stdinout_buffer[..(read + 3)]) + .write_all(&stdinout_buffer[..(read + 3 + 8)]) .context("Failed writing to stdout")?; } } @@ -236,19 +272,38 @@ fn run() -> Result<(), Error> { } let type_ = stdinout_buffer[2]; - if args.verbose { - trace!( - "Received {} bytes for {} socket from stdin", - size, - if type_ == 0 { "event" } else { "general" }, - ); - } - poll.stdin() .read_exact(&mut stdinout_buffer[0..size as usize]) .context("Failed reading packet body from stdin")?; - let buf = &stdinout_buffer[0..size as usize]; + if type_ != MSG_TYPE_EVENT && type_ != MSG_TYPE_GENERAL { + warn!("Unexpected stdin message type {}", type_); + continue; + } + + if args.verbose { + trace!( + "Received {} bytes for {} socket from stdin", + size, + if type_ == MSG_TYPE_EVENT { + "event" + } else { + "general" + }, + ); + } + + if size < 8 + 32 { + bail!("Invalid packet body size"); + } + + let main_send_time = read_u64_be(&stdinout_buffer[..8]); + let buf = &stdinout_buffer[8..size as usize]; + let message_type = buf[0] & 0x0f; + let domain_number = buf[4]; + let seqnum = read_u16_be(&buf[30..][..2]); + + let send_time = clock::time(); match type_ { MSG_TYPE_EVENT => poll .event_socket() @@ -261,9 +316,36 @@ fn run() -> Result<(), Error> { .with_context(|| { format!( "Failed sending to {} socket", - if type_ == 0 { "event" } else { "general" } + if type_ == MSG_TYPE_EVENT { + "event" + } else { + "general" + } ) })?; + + if args.verbose { + trace!( + "Sending SEND_TIME_ACK for message type {}, domain number {}, seqnum {} received at {} at {}", + message_type, + domain_number, + seqnum, + main_send_time, + send_time, + ); + } + + stdinout_buffer[0..2].copy_from_slice(&12u16.to_be_bytes()); + stdinout_buffer[2] = MSG_TYPE_SEND_TIME_ACK; + let send_time_ack_msg = &mut stdinout_buffer[3..]; + send_time_ack_msg[..8].copy_from_slice(&send_time.to_be_bytes()); + send_time_ack_msg[8] = message_type; + send_time_ack_msg[9] = domain_number; + send_time_ack_msg[10..][..2].copy_from_slice(&seqnum.to_be_bytes()); + + poll.stdout() + .write_all(&stdinout_buffer[..(3 + 12)]) + .context("Failed writing to stdout")?; } } } diff --git a/subprojects/gstreamer/libs/gst/net/gstptpclock.c b/subprojects/gstreamer/libs/gst/net/gstptpclock.c index ad632d11f8..a673c32b11 100644 --- a/subprojects/gstreamer/libs/gst/net/gstptpclock.c +++ b/subprojects/gstreamer/libs/gst/net/gstptpclock.c @@ -239,9 +239,10 @@ typedef struct typedef enum { - TYPE_EVENT = 0, /* PTP message is payload */ - TYPE_GENERAL = 1, /* PTP message is payload */ + TYPE_EVENT = 0, /* 64-bit monotonic clock time and PTP message is payload */ + TYPE_GENERAL = 1, /* 64-bit monotonic clock time and PTP message is payload */ TYPE_CLOCK_ID = 2, /* 64-bit clock ID is payload */ + TYPE_SEND_TIME_ACK = 3, /* 64-bit monotonic clock time, 8-bit message type, 8-bit domain number and 16-bit sequence number is payload */ } StdIOMessageType; /* 2 byte BE payload size plus 1 byte message type */ @@ -990,16 +991,21 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time) static gboolean send_delay_req_timeout (PtpPendingSync * sync) { - guint8 message[STDIO_MESSAGE_HEADER_SIZE + 44] = { 0, }; + guint8 message[STDIO_MESSAGE_HEADER_SIZE + 8 + 44] = { 0, }; GstByteWriter writer; gsize written; GError *err = NULL; + GstClockTime send_time; GST_TRACE ("Sending delay_req to domain %u", sync->domain); + sync->delay_req_send_time_local = send_time = + gst_clock_get_time (observation_system_clock); + gst_byte_writer_init_with_data (&writer, message, sizeof (message), FALSE); - gst_byte_writer_put_uint16_be_unchecked (&writer, 44); + gst_byte_writer_put_uint16_be_unchecked (&writer, 8 + 44); gst_byte_writer_put_uint8_unchecked (&writer, TYPE_EVENT); + gst_byte_writer_put_uint64_be_unchecked (&writer, send_time); gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ); gst_byte_writer_put_uint8_unchecked (&writer, 2); gst_byte_writer_put_uint16_be_unchecked (&writer, 44); @@ -1017,9 +1023,6 @@ send_delay_req_timeout (PtpPendingSync * sync) gst_byte_writer_put_uint64_be_unchecked (&writer, 0); gst_byte_writer_put_uint16_be_unchecked (&writer, 0); - sync->delay_req_send_time_local = - gst_clock_get_time (observation_system_clock); - if (!g_output_stream_write_all (stdin_pipe, message, sizeof (message), &written, NULL, &err)) { if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) @@ -1835,6 +1838,74 @@ handle_ptp_message (PtpMessage * msg, GstClockTime receive_time) } } +static void +handle_send_time_ack (const guint8 * data, gsize size, + GstClockTime receive_time) +{ + GstByteReader breader; + GstClockTime helper_send_time; + guint8 message_type; + guint8 domain_number; + guint16 seqnum; + GList *l; + PtpDomainData *domain = NULL; + PtpPendingSync *sync = NULL; + + gst_byte_reader_init (&breader, data, size); + helper_send_time = gst_byte_reader_get_uint64_be_unchecked (&breader); + message_type = gst_byte_reader_get_uint8_unchecked (&breader); + domain_number = gst_byte_reader_get_uint8_unchecked (&breader); + seqnum = gst_byte_reader_get_uint16_be_unchecked (&breader); + + GST_TRACE + ("Received SEND_TIME_ACK for message type %d, domain number %d, seqnum %d with send time %" + GST_TIME_FORMAT " at receive_time %" GST_TIME_FORMAT, message_type, + domain_number, seqnum, GST_TIME_ARGS (helper_send_time), + GST_TIME_ARGS (receive_time)); + + if (message_type != PTP_MESSAGE_TYPE_DELAY_REQ) + return; + + for (l = domain_data; l; l = l->next) { + PtpDomainData *tmp = l->data; + + if (domain_number == tmp->domain) { + domain = tmp; + break; + } + } + + if (!domain) + return; + + /* Check if we know about this one */ + for (l = domain->pending_syncs.head; l; l = l->next) { + PtpPendingSync *tmp = l->data; + + if (tmp->delay_req_seqnum == seqnum) { + sync = tmp; + break; + } + } + + if (!sync) + return; + + /* Got a DELAY_RESP for this already */ + if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE) + return; + + if (helper_send_time != 0) { + GST_TRACE ("DELAY_REQ message took %" GST_STIME_FORMAT + " to helper process, SEND_TIME_ACK took %" GST_STIME_FORMAT + " from helper process", + GST_STIME_ARGS ((GstClockTimeDiff) (helper_send_time - + sync->delay_req_send_time_local)), + GST_STIME_ARGS (receive_time - helper_send_time)); + sync->delay_req_send_time_local = helper_send_time; + } +} + static void have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res, gpointer user_data); @@ -1870,11 +1941,20 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res, case TYPE_EVENT: case TYPE_GENERAL:{ GstClockTime receive_time = gst_clock_get_time (observation_system_clock); + GstClockTime helper_receive_time; PtpMessage msg; - if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer, + helper_receive_time = GST_READ_UINT64_BE (stdout_buffer); + + if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer + 8, CUR_STDIO_HEADER_SIZE)) { dump_ptp_message (&msg); + if (helper_receive_time != 0) { + GST_TRACE ("Message took %" GST_STIME_FORMAT " from helper process", + GST_STIME_ARGS ((GstClockTimeDiff) (receive_time - + helper_receive_time))); + receive_time = helper_receive_time; + } handle_ptp_message (&msg, receive_time); } break; @@ -1898,6 +1978,19 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res, g_mutex_unlock (&ptp_lock); break; } + case TYPE_SEND_TIME_ACK:{ + GstClockTime receive_time = gst_clock_get_time (observation_system_clock); + + if (CUR_STDIO_HEADER_SIZE != 12) { + GST_ERROR ("Unexpected send time ack size (%u != 12)", + CUR_STDIO_HEADER_SIZE); + g_main_loop_quit (main_loop); + return; + } + + handle_send_time_ack (stdout_buffer, CUR_STDIO_HEADER_SIZE, receive_time); + break; + } default: break; }