From 967aa2abca383a96728b8185446f3c2b2b46f4ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 23 Aug 2023 19:21:37 +0300 Subject: [PATCH] ptp: Listen with different sockets on individual interfaces This allows us to portably know on which interface a multicast packet arrived, and to send back any packets for that clock on the correct interface. Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/2728 Part-of: --- .../gstreamer/libs/gst/helpers/ptp/ffi.rs | 16 + .../gstreamer/libs/gst/helpers/ptp/io.rs | 465 ++++++++++++------ .../gstreamer/libs/gst/helpers/ptp/main.rs | 130 +++-- .../gstreamer/libs/gst/helpers/ptp/net.rs | 135 +++++ .../gstreamer/libs/gst/net/gstptpclock.c | 97 ++-- 5 files changed, 603 insertions(+), 240 deletions(-) diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs index 451f2dd48d..c452a1d7d2 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs @@ -41,6 +41,7 @@ pub mod unix { pub const POLLNVAL: c_short = 0x20; pub const IPPROTO_IP: c_int = 0; + #[cfg(any( target_os = "freebsd", target_os = "openbsd", @@ -54,6 +55,19 @@ pub mod unix { #[cfg(any(target_os = "solaris", target_os = "illumos"))] pub const IP_ADD_MEMBERSHIP: c_int = 19; + #[cfg(any( + target_os = "freebsd", + target_os = "openbsd", + target_os = "netbsd", + target_os = "dragonfly", + target_os = "macos", + ))] + pub const IP_MULTICAST_IF: c_int = 9; + #[cfg(target_os = "linux")] + pub const IP_MULTICAST_IF: c_int = 32; + #[cfg(any(target_os = "solaris", target_os = "illumos"))] + pub const IP_MULTICAST_IF: c_int = 16; + #[cfg(any( target_os = "solaris", target_os = "illumos", @@ -822,7 +836,9 @@ pub mod windows { } pub const IPPROTO_IP: u32 = 0u32; + pub const IP_ADD_MEMBERSHIP: u32 = 12u32; + pub const IP_MULTICAST_IF: u32 = 9u32; pub const SOL_SOCKET: u32 = 65535; pub const SO_REUSEADDR: u32 = 4; diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/io.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/io.rs index f42eaad07c..6474ee34ea 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/io.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/io.rs @@ -8,25 +8,59 @@ // // SPDX-License-Identifier: MPL-2.0 +use std::net::UdpSocket; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum SocketType { + EventSocket, + GeneralSocket, +} + /// Result of polling the inputs of the `Poll`. /// -/// Any input that has data available for reading will be set to `true`, potentially multiple -/// at once. -/// /// Note that reading from the sockets is non-blocking but reading from stdin is blocking so /// special care has to be taken to only read as much as is available. -pub struct PollResult { - pub event_socket: bool, - pub general_socket: bool, - pub stdin: bool, +pub struct PollResult<'a> { + ready_sockets: &'a [(usize, SocketType, &'a UdpSocket)], + sockets: &'a [(UdpSocket, UdpSocket)], + stdin: Option<&'a Stdin>, + stdout: &'a Stdout, +} + +impl<'a> PollResult<'a> { + /// Returns the sockets that are currently ready for reading. + pub fn ready_sockets(&self) -> &[(usize, SocketType, &UdpSocket)] { + &self.ready_sockets + } + + /// Returns the event socket. + pub fn event_socket(&self, idx: usize) -> &UdpSocket { + &self.sockets[idx].0 + } + + /// Returns the general socket. + pub fn general_socket(&self, idx: usize) -> &UdpSocket { + &self.sockets[idx].1 + } + + /// Returns standard input if there is data to read. + pub fn stdin(&self) -> Option<&Stdin> { + self.stdin + } + + /// Returns standard output. + pub fn stdout(&self) -> &Stdout { + self.stdout + } } #[cfg(unix)] mod imp { - use super::PollResult; + use super::{PollResult, SocketType}; use std::{ io::{self, Read, Write}, + mem, net::UdpSocket, os::unix::io::{AsRawFd, RawFd}, }; @@ -37,10 +71,11 @@ mod imp { /// /// This carries the event/general UDP socket and stdin/stdout. pub struct Poll { - event_socket: UdpSocket, - general_socket: UdpSocket, + sockets: Vec<(UdpSocket, UdpSocket)>, stdin: Stdin, stdout: Stdout, + pollfd: Vec, + results_cache: Vec<(usize, SocketType, &'static UdpSocket)>, } #[cfg(test)] @@ -83,7 +118,7 @@ mod imp { } #[cfg(test)] - impl Read for Pipe { + impl<'a> Read for &'a Pipe { fn read(&mut self, buf: &mut [u8]) -> io::Result { // SAFETY: read() requires a valid fd and a mutable buffer with the given size. // The fd is valid by construction as is the buffer. @@ -122,25 +157,27 @@ mod imp { impl Poll { /// Name of the input based on the `struct pollfd` index. - fn fd_name(idx: usize) -> &'static str { + fn fd_name(idx: usize, len: usize) -> &'static str { match idx { - 0 => "event socket", - 1 => "general socket", - 2 => "stdin", + i if i == len - 1 => "stdin", + i if i % 2 == 0 => "event socket", + i if i % 2 == 1 => "general socket", _ => unreachable!(), } } - /// Create a new `Poll` instance from the two sockets. - pub fn new(event_socket: UdpSocket, general_socket: UdpSocket) -> Result { + /// Create a new `Poll` instance from the pairs of sockets. + pub fn new(sockets: Vec<(UdpSocket, UdpSocket)>) -> Result { let stdin = Stdin::acquire(); let stdout = Stdout::acquire(); + let n_sockets = sockets.len(); Ok(Self { - event_socket, - general_socket, + sockets, stdin, stdout, + pollfd: Vec::with_capacity(n_sockets * 2 + 1), + results_cache: Vec::with_capacity(n_sockets * 2), }) } @@ -157,57 +194,63 @@ mod imp { Ok(( Self { - event_socket, - general_socket, + sockets: vec![(event_socket, general_socket)], stdin: Stdin(stdin.read), stdout: Stdout(stdout.write), + pollfd: Vec::with_capacity(3), + results_cache: Vec::with_capacity(2), }, stdin, stdout, )) } - /// Mutable reference to the event socket. - pub fn event_socket(&mut self) -> &mut UdpSocket { - &mut self.event_socket + /// Reference to the event socket. + #[allow(unused)] + pub fn event_socket(&self, iface: usize) -> &UdpSocket { + &self.sockets[iface].0 } - /// Mutable reference to the general socket. - pub fn general_socket(&mut self) -> &mut UdpSocket { - &mut self.general_socket + /// Reference to the general socket. + #[allow(unused)] + pub fn general_socket(&self, iface: usize) -> &UdpSocket { + &self.sockets[iface].1 } - /// Mutable reference to stdin for reading. - pub fn stdin(&mut self) -> &mut Stdin { - &mut self.stdin + /// Reference to stdin for reading. + #[allow(unused)] + pub fn stdin(&self) -> &Stdin { + &self.stdin } - /// Mutable reference to stdout for writing. - pub fn stdout(&mut self) -> &mut Stdout { - &mut self.stdout + /// Reference to stdout for writing. + pub fn stdout(&self) -> &Stdout { + &self.stdout } /// Poll the event socket, general socket and stdin for available data to read. /// /// This blocks until at least one input has data available. - pub fn poll(&mut self) -> Result { - let mut pollfd = [ - pollfd { - fd: self.event_socket.as_raw_fd(), + pub fn poll<'a>(&'a mut self) -> Result, Error> { + self.pollfd.clear(); + for (event_socket, general_socket) in &self.sockets { + self.pollfd.push(pollfd { + fd: event_socket.as_raw_fd(), events: POLLIN, revents: 0, - }, - pollfd { - fd: self.general_socket.as_raw_fd(), + }); + self.pollfd.push(pollfd { + fd: general_socket.as_raw_fd(), events: POLLIN, revents: 0, - }, - pollfd { - fd: self.stdin.0, - events: POLLIN, - revents: 0, - }, - ]; + }); + } + + self.pollfd.push(pollfd { + fd: self.stdin.0, + events: POLLIN, + revents: 0, + }); // SAFETY: Polls the given pollfds above and requires a valid number to be passed. // A negative timeout means that it will wait until at least one of the pollfds is @@ -219,7 +262,7 @@ mod imp { // On EINTR polling should be retried. unsafe { loop { - let res = poll(pollfd[..].as_mut_ptr(), pollfd.len() as _, -1); + let res = poll(self.pollfd[..].as_mut_ptr(), self.pollfd.len() as _, -1); if res == -1 { let err = std::io::Error::last_os_error(); if err.kind() == std::io::ErrorKind::Interrupted { @@ -233,20 +276,60 @@ mod imp { } // Check for errors or hangup first - for (idx, pfd) in pollfd.iter().enumerate() { + for (idx, pfd) in self.pollfd.iter().enumerate() { if pfd.revents & (POLLERR | POLLNVAL) != 0 { - bail!("Poll error on {}", Self::fd_name(idx)); + bail!( + "Poll error on {} for interface {}", + Self::fd_name(idx, self.pollfd.len()), + idx / 2 + ); } if pfd.revents & POLLHUP != 0 { - bail!("Hang up during polling on {}", Self::fd_name(idx)); + bail!( + "Hang up during polling on {} for interface {}", + Self::fd_name(idx, self.pollfd.len()), + idx / 2 + ); + } + } + + self.results_cache.clear(); + // SAFETY: References have the same memory representation independent of lifetime + let ready_sockets = unsafe { + mem::transmute::< + &mut Vec<(usize, SocketType, &'static UdpSocket)>, + &mut Vec<(usize, SocketType, &'a UdpSocket)>, + >(&mut self.results_cache) + }; + + for (idx, pfd) in self.pollfd.iter().enumerate() { + if pfd.revents & POLLIN != 0 { + if idx == self.pollfd.len() - 1 { + break; + } + if idx % 2 == 0 { + ready_sockets.push(( + idx / 2, + SocketType::EventSocket, + &self.sockets[idx / 2].0, + )); + } else { + ready_sockets.push(( + idx / 2, + SocketType::GeneralSocket, + &self.sockets[idx / 2].1, + )); + } } } Ok(PollResult { - event_socket: pollfd[0].revents & POLLIN != 0, - general_socket: pollfd[1].revents & POLLIN != 0, - stdin: pollfd[2].revents & POLLIN != 0, + ready_sockets: &*ready_sockets, + sockets: &self.sockets, + stdin: (self.pollfd[self.pollfd.len() - 1].revents & POLLIN != 0) + .then_some(&self.stdin), + stdout: &self.stdout, }) } } @@ -263,6 +346,12 @@ mod imp { } impl Read for Stdin { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + <&Stdin as Read>::read(&mut &*self, buf) + } + } + + impl<'a> Read for &'a Stdin { fn read(&mut self, buf: &mut [u8]) -> io::Result { // SAFETY: read() requires a valid fd and a mutable buffer with the given size. // The fd is valid by construction as is the buffer. @@ -290,6 +379,16 @@ mod imp { } impl Write for Stdout { + fn write(&mut self, buf: &[u8]) -> io::Result { + <&Stdout as Write>::write(&mut &*self, buf) + } + + fn flush(&mut self) -> io::Result<()> { + <&Stdout as Write>::flush(&mut &*self) + } + } + + impl<'a> Write for &Stdout { fn write(&mut self, buf: &[u8]) -> io::Result { // SAFETY: write() requires a valid fd and a mutable buffer with the given size. // The fd is valid by construction as is the buffer. @@ -326,6 +425,16 @@ mod imp { } impl Write for Stderr { + fn write(&mut self, buf: &[u8]) -> io::Result { + <&Stderr as Write>::write(&mut &*self, buf) + } + + fn flush(&mut self) -> io::Result<()> { + <&Stderr as Write>::flush(&mut &*self) + } + } + + impl<'a> Write for &'a Stderr { fn write(&mut self, buf: &[u8]) -> io::Result { // SAFETY: write() requires a valid fd and a mutable buffer with the given size. // The fd is valid by construction as is the buffer. @@ -348,7 +457,7 @@ mod imp { #[cfg(windows)] mod imp { - use super::PollResult; + use super::{PollResult, SocketType}; use std::{ cmp, @@ -371,12 +480,12 @@ mod imp { /// /// This carries the event/general UDP socket and stdin/stdout. pub struct Poll { - event_socket: UdpSocket, - event_socket_event: EventHandle, - general_socket: UdpSocket, - general_socket_event: EventHandle, + sockets: Vec<(UdpSocket, UdpSocket)>, + events: Vec<(EventHandle, EventHandle)>, stdin: Stdin, stdout: Stdout, + handles: Vec, + results_cache: Vec<(usize, SocketType, &'static UdpSocket)>, } /// Helper struct for a WSA event. @@ -456,7 +565,7 @@ mod imp { } #[cfg(test)] - impl Read for Pipe { + impl<'a> Read for &'a Pipe { fn read(&mut self, buf: &mut [u8]) -> io::Result { // SAFETY: Reads the given number of bytes into the buffer from the stdin handle. unsafe { @@ -510,57 +619,70 @@ mod imp { impl Poll { /// Internal constructor. pub fn new_internal( - event_socket: UdpSocket, - general_socket: UdpSocket, + sockets: Vec<(UdpSocket, UdpSocket)>, stdin: Stdin, stdout: Stdout, ) -> Result { // Create event objects for the readability of the sockets. - let event_socket_event = EventHandle::new().context("Failed creating WSA event")?; - let general_socket_event = EventHandle::new().context("Failed creating WSA event")?; + let events = sockets + .iter() + .map(|(_event_socket, _general_socket)| -> Result<_, Error> { + Ok(( + EventHandle::new().context("Failed creating WSA event")?, + EventHandle::new().context("Failed creating WSA event")?, + )) + }) + .collect::, Error>>()?; - // SAFETY: WSAEventSelect() requires a valid socket and WSA event, which are both - // passed here, and the bitflag of events that should be selected for. - // - // On error a non-zero value is returned. - unsafe { - if WSAEventSelect(event_socket.as_raw_socket(), event_socket_event.0, FD_READ) != 0 - { - bail!( - source: io::Error::from_raw_os_error(WSAGetLastError()), - "Failed selecting for read events on event socket" - ); - } + for ((event_socket, general_socket), (event_socket_event, general_socket_event)) in + Iterator::zip(sockets.iter(), events.iter()) + { + // SAFETY: WSAEventSelect() requires a valid socket and WSA event, which are both + // passed here, and the bitflag of events that should be selected for. + // + // On error a non-zero value is returned. + unsafe { + if WSAEventSelect(event_socket.as_raw_socket(), event_socket_event.0, FD_READ) + != 0 + { + bail!( + source: io::Error::from_raw_os_error(WSAGetLastError()), + "Failed selecting for read events on event socket" + ); + } - if WSAEventSelect( - general_socket.as_raw_socket(), - general_socket_event.0, - FD_READ, - ) != 0 - { - bail!( - source: io::Error::from_raw_os_error(WSAGetLastError()), - "Failed selecting for read events on general socket" - ); + if WSAEventSelect( + general_socket.as_raw_socket(), + general_socket_event.0, + FD_READ, + ) != 0 + { + bail!( + source: io::Error::from_raw_os_error(WSAGetLastError()), + "Failed selecting for read events on general socket" + ); + } } } + let n_sockets = sockets.len(); + Ok(Self { - event_socket, - event_socket_event, - general_socket, - general_socket_event, + sockets, + events, stdin, stdout, + handles: Vec::with_capacity(n_sockets * 2 + 1), + results_cache: Vec::with_capacity(1), }) } /// Create a new `Poll` instance from the two sockets. - pub fn new(event_socket: UdpSocket, general_socket: UdpSocket) -> Result { + pub fn new(sockets: Vec<(UdpSocket, UdpSocket)>) -> Result { let stdin = Stdin::acquire().context("Failure acquiring stdin handle")?; let stdout = Stdout::acquire().context("Failed acquiring stdout handle")?; - Self::new_internal(event_socket, general_socket, stdin, stdout) + Self::new_internal(sockets, stdin, stdout) } #[cfg(test)] @@ -579,45 +701,53 @@ mod imp { let stdout = Stdout::from_handle(stdout_pipe.write).context("Failed acquiring stdout handle")?; - let poll = Self::new_internal(event_socket, general_socket, stdin, stdout)?; + let poll = Self::new_internal(vec![(event_socket, general_socket)], stdin, stdout)?; Ok((poll, stdin_pipe, stdout_pipe)) } - /// Mutable reference to the event socket. - pub fn event_socket(&mut self) -> &mut UdpSocket { - &mut self.event_socket + /// Reference to the event socket. + #[allow(unused)] + pub fn event_socket(&self, iface: usize) -> &UdpSocket { + &self.sockets[iface].0 } - /// Mutable reference to the general socket. - pub fn general_socket(&mut self) -> &mut UdpSocket { - &mut self.general_socket + /// Reference to the general socket. + #[allow(unused)] + pub fn general_socket(&self, iface: usize) -> &UdpSocket { + &self.sockets[iface].1 } - /// Mutable reference to stdin for reading. - pub fn stdin(&mut self) -> &mut Stdin { - &mut self.stdin + /// Reference to stdin for reading. + #[allow(unused)] + pub fn stdin(&self) -> &Stdin { + &self.stdin } - /// Mutable reference to stdout for writing. - pub fn stdout(&mut self) -> &mut Stdout { - &mut self.stdout + /// Reference to stdout for writing. + pub fn stdout(&self) -> &Stdout { + &self.stdout } /// Poll the event socket, general socket and stdin for available data to read. /// /// This blocks until at least one input has data available. - pub fn poll(&mut self) -> Result { - let handles = [ - self.event_socket_event.0, - self.general_socket_event.0, + pub fn poll<'a>(&'a mut self) -> Result, Error> { + self.handles.clear(); + + for (event_socket_event, general_socket_event) in &self.events { + self.handles.push(event_socket_event.0); + self.handles.push(general_socket_event.0); + } + + self.handles.push( // If stdin is a pipe then we use the signalling event, otherwise stdin itself. if let Some(ref thread_state) = self.stdin.thread_state { thread_state.event } else { self.stdin.handle }, - ]; + ); // If stdin is a pipe and currently no data is pending on it then signal // the reading thread to try reading one byte and blocking for that long. @@ -641,8 +771,12 @@ mod imp { // On error u32::MAX is returned, otherwise an index into the array of handles is // returned for the handle that became ready. let res = unsafe { - let res = - WaitForMultipleObjects(handles.len() as _, handles[..].as_ptr(), 0, u32::MAX); + let res = WaitForMultipleObjects( + self.handles.len() as _, + self.handles[..].as_ptr(), + 0, + u32::MAX, + ); if res == u32::MAX { bail!( source: io::Error::from_raw_os_error(WSAGetLastError()), @@ -651,21 +785,30 @@ mod imp { } assert!( - (0..=2).contains(&res), + (0..self.handles.len()).contains(&(res as usize)), "Unexpected WaitForMultipleObjects() return value {}", res, ); - res + res as usize + }; + + self.results_cache.clear(); + // SAFETY: References have the same memory representation independent of lifetime + let ready_sockets = unsafe { + mem::transmute::< + &mut Vec<(usize, SocketType, &'static UdpSocket)>, + &mut Vec<(usize, SocketType, &'a UdpSocket)>, + >(&mut self.results_cache) }; // For the sockets, enumerate the events that woke up the waiting, collect any errors // and reset the event objects. - if (0..=1).contains(&res) { - let (socket, event) = if res == 0 { - (&self.event_socket, &self.event_socket_event) + if res < self.handles.len() - 1 { + let (socket, event) = if res % 2 == 0 { + (&self.sockets[res / 2].0, &self.events[res / 2].0) } else { - (&self.general_socket, &self.general_socket_event) + (&self.sockets[res / 2].1, &self.events[res / 2].1) }; // SAFETY: Requires a valid socket and event, which is given by construction here. @@ -681,8 +824,9 @@ mod imp { { bail!( source: io::Error::from_raw_os_error(WSAGetLastError()), - "Failed enumerating network events on {} socket", - if res == 0 { "event" } else { "general" }, + "Failed enumerating network events on {} socket for interface {}", + if res % 2 == 0 { "event" } else { "general" }, + res / 2, ); } @@ -692,8 +836,9 @@ mod imp { if networkevents.ierrorcode[FD_READ_BIT] != 0 { bail!( source: io::Error::from_raw_os_error(networkevents.ierrorcode[FD_READ_BIT]), - "Error on {} socket while waiting for events", + "Error on {} socket for interface {} while waiting for events", if res == 0 { "event" } else { "general" }, + res / 2, ); } @@ -707,12 +852,22 @@ mod imp { res ); } + ready_sockets.push(( + res / 2, + if res % 2 == 0 { + SocketType::EventSocket + } else { + SocketType::GeneralSocket + }, + socket, + )); } Ok(PollResult { - event_socket: res == 0, - general_socket: res == 1, - stdin: res == 2, + ready_sockets: &*ready_sockets, + sockets: &self.sockets, + stdin: (res == self.handles.len() - 1).then_some(&self.stdin), + stdout: &self.stdout, }) } } @@ -918,6 +1073,12 @@ mod imp { } impl Read for Stdin { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + <&Stdin as Read>::read(&mut &*self, buf) + } + } + + impl<'a> Read for &'a Stdin { fn read(&mut self, mut buf: &mut [u8]) -> io::Result { if buf.is_empty() { return Ok(0); @@ -926,7 +1087,7 @@ mod imp { // If a read byte is pending from the readiness signalling thread then // read that first here before reading any remaining data. let mut already_read = 0; - if let Some(ref mut thread_state) = self.thread_state { + if let Some(ref thread_state) = self.thread_state { let mut guard = thread_state.buffer.lock().unwrap(); assert!(!guard.fill_buffer); if guard.buffer_filled { @@ -1013,6 +1174,16 @@ mod imp { } impl Write for Stdout { + fn write(&mut self, buf: &[u8]) -> io::Result { + <&Stdout as Write>::write(&mut &*self, buf) + } + + fn flush(&mut self) -> io::Result<()> { + <&Stdout as Write>::flush(&mut &*self) + } + } + + impl<'a> Write for &'a Stdout { fn write(&mut self, buf: &[u8]) -> io::Result { // SAFETY: Writes the given number of bytes to stdout or at most u32::MAX. On error // zero is returned, otherwise the number of bytes written is set accordingly and @@ -1107,6 +1278,16 @@ mod imp { } impl Write for Stderr { + fn write(&mut self, buf: &[u8]) -> io::Result { + <&Stderr as Write>::write(&mut &*self, buf) + } + + fn flush(&mut self) -> io::Result<()> { + <&Stderr as Write>::flush(&mut &*self) + } + } + + impl<'a> Write for &'a Stderr { fn write(&mut self, buf: &[u8]) -> io::Result { if self.0 == INVALID_HANDLE_VALUE { return Ok(buf.len()); @@ -1145,6 +1326,8 @@ pub use self::imp::{Poll, Stderr, Stdin, Stdout}; mod test { #[test] fn test_poll() { + use super::{Poll, SocketType}; + use std::io::prelude::*; let event_socket = std::net::UdpSocket::bind(std::net::SocketAddr::from(( @@ -1167,8 +1350,7 @@ mod test { ))) .unwrap(); - let (mut poll, mut stdin, _stdout) = - super::Poll::new_test(event_socket, general_socket).unwrap(); + let (mut poll, mut stdin, _stdout) = Poll::new_test(event_socket, general_socket).unwrap(); let mut buf = [0u8; 4]; @@ -1177,29 +1359,32 @@ mod test { .send_to(&[1, 2, 3, 4], (std::net::Ipv4Addr::LOCALHOST, event_port)) .unwrap(); let res = poll.poll().unwrap(); - assert!(res.event_socket); - assert!(!res.general_socket); - assert!(!res.stdin); - assert_eq!(poll.event_socket().recv(&mut buf).unwrap(), 4); + assert_eq!(res.ready_sockets().len(), 1); + assert_eq!(res.ready_sockets()[0].0, 0); + assert_eq!(res.ready_sockets()[0].1, SocketType::EventSocket); + assert_eq!(res.ready_sockets()[0].2.recv(&mut buf).unwrap(), 4); assert_eq!(buf, [1, 2, 3, 4]); send_socket .send_to(&[1, 2, 3, 4], (std::net::Ipv4Addr::LOCALHOST, general_port)) .unwrap(); let res = poll.poll().unwrap(); - assert!(!res.event_socket); - assert!(res.general_socket); - assert!(!res.stdin); - assert_eq!(poll.general_socket().recv(&mut buf).unwrap(), 4); + + assert_eq!(res.ready_sockets().len(), 1); + assert_eq!(res.ready_sockets()[0].0, 0); + assert_eq!(res.ready_sockets()[0].1, SocketType::GeneralSocket); + assert_eq!(res.ready_sockets()[0].2.recv(&mut buf).unwrap(), 4); assert_eq!(buf, [1, 2, 3, 4]); stdin.write_all(&[1, 2, 3, 4]).unwrap(); let res = poll.poll().unwrap(); - assert!(!res.event_socket); - assert!(!res.general_socket); - assert!(res.stdin); - poll.stdin().read_exact(&mut buf).unwrap(); - assert_eq!(buf, [1, 2, 3, 4]); + assert!(res.ready_sockets().is_empty()); + { + let mut stdin = res.stdin(); + let stdin = stdin.as_mut().unwrap(); + stdin.read_exact(&mut buf).unwrap(); + assert_eq!(buf, [1, 2, 3, 4]); + } } drop(poll); diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs index 8ce182345f..e568ed4962 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs @@ -73,12 +73,8 @@ fn create_socket(port: u16) -> Result { Ok(socket) } -/// Join the multicast groups for PTP on the configured interfaces. -fn join_multicast( - args: &args::Args, - event_socket: &UdpSocket, - general_socket: &UdpSocket, -) -> Result<[u8; 8], Error> { +/// Retrieve the list of interfaces based on the available ones and the arguments. +fn list_interfaces(args: &args::Args) -> Result, Error> { let mut ifaces = net::query_interfaces().context("Failed to query network interfaces")?; if ifaces.is_empty() { bail!("No suitable network interfaces for PTP found"); @@ -115,15 +111,28 @@ fn join_multicast( } } + Ok(ifaces) +} + +fn run() -> Result<(), Error> { + let args = args::parse_args().context("Failed parsing commandline parameters")?; + + let ifaces = list_interfaces(&args).context("Failed listing interfaces")?; + + let mut sockets = vec![]; for iface in &ifaces { info!("Binding to interface {}", iface.name); - } - for socket in [&event_socket, &general_socket].iter() { - for iface in &ifaces { + let event_socket = create_socket(PTP_EVENT_PORT).context("Failed creating event socket")?; + let general_socket = + create_socket(PTP_GENERAL_PORT).context("Failed creating general socket")?; + + for socket in [&event_socket, &general_socket] { net::join_multicast_v4(socket, &PTP_MULTICAST_ADDR, iface) .context("Failed to join multicast group")?; } + + sockets.push((event_socket, general_socket)); } let clock_id = if args.clock_id == 0 { @@ -140,27 +149,13 @@ fn join_multicast( } else { args.clock_id.to_be_bytes() }; - info!("Using clock ID {:?}", clock_id); - Ok(clock_id) -} - -fn run() -> Result<(), Error> { - let args = args::parse_args().context("Failed parsing commandline parameters")?; - - let event_socket = create_socket(PTP_EVENT_PORT).context("Failed creating event socket")?; - let general_socket = - create_socket(PTP_GENERAL_PORT).context("Failed creating general socket")?; - thread::set_priority().context("Failed to set thread priority")?; privileges::drop().context("Failed dropping privileges")?; - let clock_id = join_multicast(&args, &event_socket, &general_socket) - .context("Failed joining multicast groups")?; - - let mut poll = io::Poll::new(event_socket, general_socket).context("Failed creating poller")?; + let mut poll = io::Poll::new(sockets).context("Failed creating poller")?; // Write clock ID first { @@ -184,27 +179,20 @@ fn run() -> Result<(), Error> { // We assume that stdout never blocks and stdin receives a complete valid packet whenever it is // ready and never blocks in the middle of a packet. let mut socket_buffer = [0u8; 8192]; - let mut stdinout_buffer = [0u8; 8192 + 3 + 8]; + let mut stdinout_buffer = [0u8; 8192 + 4 + 8]; loop { let poll_res = poll.poll().context("Failed polling")?; // If any of the sockets are ready, continue reading packets from them until no more // packets are left and directly forward them to stdout. - 'next_socket: for idx in [poll_res.event_socket, poll_res.general_socket] - .iter() - .enumerate() - .filter_map(|(idx, r)| if *r { Some(idx) } else { None }) - { - let idx = idx as u8; + 'next_socket: for (idx, type_, socket) in poll_res.ready_sockets() { + let idx = *idx; + let type_ = *type_; // Read all available packets from the socket before going to the next socket. 'next_packet: loop { - let res = match idx { - MSG_TYPE_EVENT => poll.event_socket().recv_from(&mut socket_buffer), - MSG_TYPE_GENERAL => poll.general_socket().recv_from(&mut socket_buffer), - _ => unreachable!(), - }; + let res = socket.recv_from(&mut socket_buffer); let (read, addr) = match res { Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { @@ -213,12 +201,7 @@ fn run() -> Result<(), Error> { Err(err) => { bail!( source: err, - "Failed reading from {} socket", - if idx == MSG_TYPE_EVENT { - "event" - } else { - "general" - } + "Failed reading from {:?} socket for interface {}", type_, idx, ); } Ok((read, addr)) => (read, addr), @@ -227,13 +210,10 @@ fn run() -> Result<(), Error> { let recv_time = clock::time(); if args.verbose { trace!( - "Received {} bytes from {} socket from {} at {}", + "Received {} bytes from {:?} socket for interface {} from {} at {}", read, - if idx == MSG_TYPE_EVENT { - "event" - } else { - "general" - }, + type_, + idx, addr, recv_time, ); @@ -275,18 +255,25 @@ fn run() -> Result<(), Error> { } { - let mut buf = &mut stdinout_buffer[..(read + 3 + 8)]; - buf.write_u16be(read as u16 + 8) + let mut buf = &mut stdinout_buffer[..(read + 4 + 8)]; + buf.write_u16be(read as u16 + 1 + 8) .expect("Too small stdout buffer"); - buf.write_u8(idx).expect("Too small stdout buffer"); + buf.write_u8(if type_ == io::SocketType::EventSocket { + MSG_TYPE_EVENT + } else { + MSG_TYPE_GENERAL + }) + .expect("Too small stdout buffer"); + buf.write_u8(idx as u8).expect("Too small stdout buffer"); buf.write_u64be(recv_time).expect("Too small stdout buffer"); buf.write_all(&socket_buffer[..read]) .expect("Too small stdout buffer"); assert!(buf.is_empty(), "Too big stdout buffer",); } - let buf = &stdinout_buffer[..(read + 3 + 8)]; - poll.stdout() + let buf = &stdinout_buffer[..(read + 4 + 8)]; + poll_res + .stdout() .write_all(buf) .context("Failed writing to stdout")?; } @@ -294,8 +281,8 @@ fn run() -> Result<(), Error> { // After handling the sockets check if a packet is available on stdin, read it and forward // it to the corresponding socket. - if poll_res.stdin { - poll.stdin() + if let Some(ref mut stdin) = poll_res.stdin() { + stdin .read_exact(&mut stdinout_buffer[0..3]) .context("Failed reading packet header from stdin")?; @@ -305,7 +292,7 @@ fn run() -> Result<(), Error> { } let type_ = stdinout_buffer[2]; - poll.stdin() + stdin .read_exact(&mut stdinout_buffer[0..size as usize]) .context("Failed reading packet body from stdin")?; @@ -314,23 +301,31 @@ fn run() -> Result<(), Error> { continue; } + if size < 1 + 8 + 34 { + bail!("Invalid packet body size"); + } + + let buf = &mut &stdinout_buffer[..(size as usize)]; + + let idx = buf.read_u8().expect("Too small stdin buffer"); + if idx as usize >= ifaces.len() { + warn!("Unexpected stdin message interface index {}", idx); + continue; + } + if args.verbose { trace!( - "Received {} bytes for {} socket from stdin", + "Received {} bytes for {} socket for interface {} from stdin", size, if type_ == MSG_TYPE_EVENT { "event" } else { "general" }, + idx, ); } - if size < 8 + 34 { - bail!("Invalid packet body size"); - } - - let buf = &mut &stdinout_buffer[..(size as usize)]; let main_send_time = buf.read_u64be().expect("Too small stdin buffer"); // We require that the main process only ever sends valid PTP messages with the clock @@ -347,11 +342,11 @@ fn run() -> Result<(), Error> { let send_time = clock::time(); match type_ { - MSG_TYPE_EVENT => poll - .event_socket() + MSG_TYPE_EVENT => poll_res + .event_socket(idx as usize) .send_to(buf, (PTP_MULTICAST_ADDR, PTP_EVENT_PORT)), - MSG_TYPE_GENERAL => poll - .general_socket() + MSG_TYPE_GENERAL => poll_res + .general_socket(idx as usize) .send_to(buf, (PTP_MULTICAST_ADDR, PTP_GENERAL_PORT)), _ => unreachable!(), } @@ -393,7 +388,8 @@ fn run() -> Result<(), Error> { } let buf = &stdinout_buffer[..(3 + 12)]; - poll.stdout() + poll_res + .stdout() .write_all(buf) .context("Failed writing to stdout")?; } diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs index 4f83e85c27..8f04988405 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs @@ -399,6 +399,93 @@ mod imp { } } + #[cfg(not(any( + target_os = "openbsd", + target_os = "dragonfly", + target_os = "netbsd", + target_os = "macos" + )))] + { + let mreqn = ip_mreqn { + imr_multiaddr: in_addr { + s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()), + }, + imr_address: in_addr { + s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()), + }, + imr_ifindex: iface.index as _, + }; + + // SAFETY: Requires a valid ip_mreq or ip_mreqn struct to be passed together + // with its size for checking which of the two it is. On errors a negative + // integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &mreqn as *const _ as *const _, + mem::size_of_val(&mreqn) as _, + ) < 0 + { + bail!( + source: io::Error::last_os_error(), + "Failed joining multicast group for interface {}", + iface.name, + ); + } + } + } + #[cfg(any(target_os = "openbsd", target_os = "dragonfly", target_os = "macos"))] + { + let addr = in_addr { + s_addr: u32::from_ne_bytes(iface.ip_addr.octets()), + }; + + // SAFETY: Requires a valid in_addr struct to be passed together with its size for + // checking which of the two it is. On errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &addr as *const _ as *const _, + mem::size_of_val(&addr) as _, + ) < 0 + { + bail!( + source: io::Error::last_os_error(), + "Failed joining multicast group for interface {}", + iface.name, + ); + } + } + } + #[cfg(target_os = "netbsd")] + { + let idx = (iface.index as u32).to_be(); + + // SAFETY: Requires a valid in_addr struct or interface index in network byte order + // to be passed together with its size for checking which of the two it is. On + // errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &idx as *const _ as *const _, + mem::size_of_val(&idx) as _, + ) < 0 + { + bail!( + source: io::Error::last_os_error(), + "Failed joining multicast group for interface {}", + iface.name, + ); + } + } + } + Ok(()) } #[cfg(any(target_os = "solaris", target_os = "illumos"))] @@ -414,6 +501,29 @@ mod imp { ) })?; + let addr = in_addr { + s_addr: u32::from_ne_bytes(iface.ip_addr.octets()), + }; + + // SAFETY: Requires a valid in_addr struct to be passed together with its size for + // checking which of the two it is. On errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_fd(), + IPPROTO_IP, + IP_MULTICAST_IF, + &addr as *const _ as *const _, + mem::size_of_val(&addr) as _, + ) < 0 + { + bail!( + source: io::Error::last_os_error(), + "Failed setting multicast interface {}", + iface.name, + ); + } + } + Ok(()) } } @@ -838,6 +948,31 @@ mod imp { } } + let addr = IN_ADDR { + S_un: IN_ADDR_0 { + S_addr: u32::from_ne_bytes(Ipv4Addr::new(0, 0, 0, iface.index as u8).octets()), + }, + }; + + // SAFETY: Requires a valid IN_ADDR struct to be passed together with its size for checking + // which of the two it is. On errors a negative integer is returned. + unsafe { + if setsockopt( + socket.as_raw_socket(), + IPPROTO_IP as i32, + IP_MULTICAST_IF as i32, + &addr as *const _ as *const _, + mem::size_of_val(&addr) as _, + ) < 0 + { + bail!( + source: io::Error::last_os_error(), + "Failed joining multicast group for interface {}", + iface.name, + ); + } + } + Ok(()) } diff --git a/subprojects/gstreamer/libs/gst/net/gstptpclock.c b/subprojects/gstreamer/libs/gst/net/gstptpclock.c index b9f1ecfeb9..b166954343 100644 --- a/subprojects/gstreamer/libs/gst/net/gstptpclock.c +++ b/subprojects/gstreamer/libs/gst/net/gstptpclock.c @@ -239,8 +239,8 @@ typedef struct typedef enum { - TYPE_EVENT = 0, /* 64-bit monotonic clock time and PTP message is payload */ - TYPE_GENERAL = 1, /* 64-bit monotonic clock time and PTP message is payload */ + TYPE_EVENT = 0, /* 8-bit interface index, 64-bit monotonic clock time and PTP message is payload */ + TYPE_GENERAL = 1, /* 8-bit interface index, 64-bit monotonic clock time and PTP message is payload */ TYPE_CLOCK_ID = 2, /* 64-bit clock ID is payload */ TYPE_SEND_TIME_ACK = 3, /* 64-bit monotonic clock time, 8-bit message type, 8-bit domain number and 16-bit sequence number is payload */ } StdIOMessageType; @@ -292,6 +292,7 @@ typedef struct GstClockTime receive_time; PtpClockIdentity master_clock_identity; + guint8 iface_idx; guint8 grandmaster_priority_1; PtpClockQuality grandmaster_clock_quality; @@ -306,6 +307,7 @@ typedef struct typedef struct { PtpClockIdentity master_clock_identity; + guint8 iface_idx; GstClockTime announce_interval; /* last interval we received */ GQueue announce_messages; @@ -322,6 +324,7 @@ typedef struct GstClockTime follow_up_recv_time_local; GSource *timeout_source; + guint8 iface_idx; guint16 delay_req_seqnum; GstClockTime delay_req_send_time_local; /* t3, -1 if we wait for FOLLOW_UP */ GstClockTime delay_req_recv_time_remote; /* t4, -1 if we wait */ @@ -355,6 +358,7 @@ typedef struct /* Last selected master clock */ gboolean have_master_clock; PtpClockIdentity master_clock_identity; + guint8 iface_idx; guint64 grandmaster_identity; /* Last SYNC or FOLLOW_UP timestamp we received */ @@ -722,8 +726,11 @@ compare_announce_message (const PtpAnnounceMessage * a, else if (a->master_clock_identity.port_number > b->master_clock_identity.port_number) return 1; - else - g_assert_not_reached (); + + if (a->iface_idx < b->iface_idx) + return -1; + else if (a->iface_idx > b->iface_idx) + return 1; return 0; } @@ -819,13 +826,15 @@ select_best_master_clock (PtpDomainData * domain, GstClockTime now) if (domain->have_master_clock && compare_clock_identity (&domain->master_clock_identity, - &best->master_clock_identity) == 0) { + &best->master_clock_identity) == 0 + && domain->iface_idx == best->iface_idx) { GST_DEBUG ("Master clock in domain %u did not change", domain->domain); } else { GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER - "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x", - domain->domain, best->master_clock_identity.clock_identity, - best->master_clock_identity.port_number, best->grandmaster_identity); + "x %u on interface %u with grandmaster clock 0x%016" G_GINT64_MODIFIER + "x", domain->domain, best->master_clock_identity.clock_identity, + best->master_clock_identity.port_number, best->iface_idx, + best->grandmaster_identity); domain->have_master_clock = TRUE; domain->grandmaster_identity = best->grandmaster_identity; @@ -833,9 +842,11 @@ select_best_master_clock (PtpDomainData * domain, GstClockTime now) /* Opportunistic master clock selection likely gave us the same master * clock before, no need to reset all statistics */ if (compare_clock_identity (&domain->master_clock_identity, - &best->master_clock_identity) != 0) { + &best->master_clock_identity) != 0 + || domain->iface_idx != best->iface_idx) { memcpy (&domain->master_clock_identity, &best->master_clock_identity, sizeof (PtpClockIdentity)); + domain->iface_idx = best->iface_idx; domain->mean_path_delay = 0; domain->last_delay_req = 0; domain->last_path_delays_missing = 9; @@ -865,7 +876,8 @@ select_best_master_clock (PtpDomainData * domain, GstClockTime now) } static void -handle_announce_message (PtpMessage * msg, GstClockTime receive_time) +handle_announce_message (PtpMessage * msg, guint8 iface_idx, + GstClockTime receive_time) { GList *l; PtpDomainData *domain = NULL; @@ -925,7 +937,7 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time) PtpAnnounceSender *tmp = l->data; if (compare_clock_identity (&tmp->master_clock_identity, - &msg->source_port_identity) == 0) { + &msg->source_port_identity) == 0 && tmp->iface_idx == iface_idx) { sender = tmp; break; } @@ -936,6 +948,7 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time) memcpy (&sender->master_clock_identity, &msg->source_port_identity, sizeof (PtpClockIdentity)); + sender->iface_idx = iface_idx; g_queue_init (&sender->announce_messages); domain->announce_senders = g_list_prepend (domain->announce_senders, sender); @@ -968,6 +981,7 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time) announce->sequence_id = msg->sequence_id; memcpy (&announce->master_clock_identity, &msg->source_port_identity, sizeof (PtpClockIdentity)); + announce->iface_idx = iface_idx; announce->grandmaster_identity = msg->message_specific.announce.grandmaster_identity; announce->grandmaster_priority_1 = @@ -991,7 +1005,7 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time) static gboolean send_delay_req_timeout (PtpPendingSync * sync) { - guint8 message[STDIO_MESSAGE_HEADER_SIZE + 8 + 44] = { 0, }; + guint8 message[STDIO_MESSAGE_HEADER_SIZE + 1 + 8 + 44] = { 0, }; GstByteWriter writer; gsize written; GError *err = NULL; @@ -1003,8 +1017,9 @@ send_delay_req_timeout (PtpPendingSync * sync) gst_clock_get_time (observation_system_clock); gst_byte_writer_init_with_data (&writer, message, sizeof (message), FALSE); - gst_byte_writer_put_uint16_be_unchecked (&writer, 8 + 44); + gst_byte_writer_put_uint16_be_unchecked (&writer, 1 + 8 + 44); gst_byte_writer_put_uint8_unchecked (&writer, TYPE_EVENT); + gst_byte_writer_put_uint8_unchecked (&writer, sync->iface_idx); gst_byte_writer_put_uint64_be_unchecked (&writer, send_time); gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ); gst_byte_writer_put_uint8_unchecked (&writer, 2); @@ -1058,6 +1073,7 @@ send_delay_req (PtpDomainData * domain, PtpPendingSync * sync) } domain->last_delay_req = now; + sync->iface_idx = domain->iface_idx; sync->delay_req_seqnum = domain->last_delay_req_seqnum++; /* IEEE 1588 9.5.11.2 */ @@ -1481,7 +1497,8 @@ out: } static void -handle_sync_message (PtpMessage * msg, GstClockTime receive_time) +handle_sync_message (PtpMessage * msg, guint8 iface_idx, + GstClockTime receive_time) { GList *l; PtpDomainData *domain = NULL; @@ -1523,15 +1540,20 @@ handle_sync_message (PtpMessage * msg, GstClockTime receive_time) /* If we have a master clock, ignore this message if it's not coming from there */ if (domain->have_master_clock - && compare_clock_identity (&domain->master_clock_identity, - &msg->source_port_identity) != 0) + && (compare_clock_identity (&domain->master_clock_identity, + &msg->source_port_identity) != 0 + || domain->iface_idx != iface_idx)) { + GST_TRACE ("SYNC msg not from current clock master. Ignoring"); return; + } #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION /* Opportunistic selection of master clock */ - if (!domain->have_master_clock) + if (!domain->have_master_clock) { memcpy (&domain->master_clock_identity, &msg->source_port_identity, sizeof (PtpClockIdentity)); + domain->iface_idx = iface_idx; + } #else if (!domain->have_master_clock) return; @@ -1613,7 +1635,8 @@ handle_sync_message (PtpMessage * msg, GstClockTime receive_time) } static void -handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time) +handle_follow_up_message (PtpMessage * msg, guint8 iface_idx, + GstClockTime receive_time) { GList *l; PtpDomainData *domain = NULL; @@ -1643,8 +1666,9 @@ handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time) /* If we have a master clock, ignore this message if it's not coming from there */ if (domain->have_master_clock - && compare_clock_identity (&domain->master_clock_identity, - &msg->source_port_identity) != 0) { + && (compare_clock_identity (&domain->master_clock_identity, + &msg->source_port_identity) != 0 + || domain->iface_idx != iface_idx)) { GST_TRACE ("FOLLOW_UP msg not from current clock master. Ignoring"); return; } @@ -1709,7 +1733,8 @@ handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time) } static void -handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time) +handle_delay_resp_message (PtpMessage * msg, guint8 iface_idx, + GstClockTime receive_time) { GList *l; PtpDomainData *domain = NULL; @@ -1740,9 +1765,12 @@ handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time) /* If we have a master clock, ignore this message if it's not coming from there */ if (domain->have_master_clock - && compare_clock_identity (&domain->master_clock_identity, - &msg->source_port_identity) != 0) + && (compare_clock_identity (&domain->master_clock_identity, + &msg->source_port_identity) != 0 + || domain->iface_idx != iface_idx)) { + GST_TRACE ("DELAY_RESP msg not from current clock master. Ignoring"); return; + } if (msg->log_message_interval == 0x7f) { domain->min_delay_req_interval = GST_SECOND; @@ -1809,7 +1837,8 @@ handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time) } static void -handle_ptp_message (PtpMessage * msg, GstClockTime receive_time) +handle_ptp_message (PtpMessage * msg, guint8 iface_idx, + GstClockTime receive_time) { /* Ignore our own messages */ if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity && @@ -1818,20 +1847,20 @@ handle_ptp_message (PtpMessage * msg, GstClockTime receive_time) return; } - GST_TRACE ("Message type %d receive_time %" GST_TIME_FORMAT, - msg->message_type, GST_TIME_ARGS (receive_time)); + GST_TRACE ("Message type %d iface idx %d receive_time %" GST_TIME_FORMAT, + msg->message_type, iface_idx, GST_TIME_ARGS (receive_time)); switch (msg->message_type) { case PTP_MESSAGE_TYPE_ANNOUNCE: - handle_announce_message (msg, receive_time); + handle_announce_message (msg, iface_idx, receive_time); break; case PTP_MESSAGE_TYPE_SYNC: - handle_sync_message (msg, receive_time); + handle_sync_message (msg, iface_idx, receive_time); break; case PTP_MESSAGE_TYPE_FOLLOW_UP: - handle_follow_up_message (msg, receive_time); + handle_follow_up_message (msg, iface_idx, receive_time); break; case PTP_MESSAGE_TYPE_DELAY_RESP: - handle_delay_resp_message (msg, receive_time); + handle_delay_resp_message (msg, iface_idx, receive_time); break; default: break; @@ -1941,12 +1970,14 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res, case TYPE_EVENT: case TYPE_GENERAL:{ GstClockTime receive_time = gst_clock_get_time (observation_system_clock); + guint8 iface_idx; GstClockTime helper_receive_time; PtpMessage msg; - helper_receive_time = GST_READ_UINT64_BE (stdout_buffer); + iface_idx = GST_READ_UINT8 (stdout_buffer); + helper_receive_time = GST_READ_UINT64_BE (stdout_buffer + 1); - if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer + 8, + if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer + 1 + 8, CUR_STDIO_HEADER_SIZE)) { dump_ptp_message (&msg); if (helper_receive_time != 0) { @@ -1955,7 +1986,7 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res, helper_receive_time))); receive_time = helper_receive_time; } - handle_ptp_message (&msg, receive_time); + handle_ptp_message (&msg, iface_idx, receive_time); } break; }