ptp: Listen with different sockets on individual interfaces

This allows us to portably know on which interface a multicast packet
arrived, and to send back any packets for that clock on the correct
interface.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/2728

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/5232>
This commit is contained in:
Sebastian Dröge 2023-08-23 19:21:37 +03:00 committed by Sebastian Dröge
parent c161eb973d
commit 967aa2abca
5 changed files with 603 additions and 240 deletions

View file

@ -41,6 +41,7 @@ pub mod unix {
pub const POLLNVAL: c_short = 0x20; pub const POLLNVAL: c_short = 0x20;
pub const IPPROTO_IP: c_int = 0; pub const IPPROTO_IP: c_int = 0;
#[cfg(any( #[cfg(any(
target_os = "freebsd", target_os = "freebsd",
target_os = "openbsd", target_os = "openbsd",
@ -54,6 +55,19 @@ pub mod unix {
#[cfg(any(target_os = "solaris", target_os = "illumos"))] #[cfg(any(target_os = "solaris", target_os = "illumos"))]
pub const IP_ADD_MEMBERSHIP: c_int = 19; pub const IP_ADD_MEMBERSHIP: c_int = 19;
#[cfg(any(
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly",
target_os = "macos",
))]
pub const IP_MULTICAST_IF: c_int = 9;
#[cfg(target_os = "linux")]
pub const IP_MULTICAST_IF: c_int = 32;
#[cfg(any(target_os = "solaris", target_os = "illumos"))]
pub const IP_MULTICAST_IF: c_int = 16;
#[cfg(any( #[cfg(any(
target_os = "solaris", target_os = "solaris",
target_os = "illumos", target_os = "illumos",
@ -822,7 +836,9 @@ pub mod windows {
} }
pub const IPPROTO_IP: u32 = 0u32; pub const IPPROTO_IP: u32 = 0u32;
pub const IP_ADD_MEMBERSHIP: u32 = 12u32; pub const IP_ADD_MEMBERSHIP: u32 = 12u32;
pub const IP_MULTICAST_IF: u32 = 9u32;
pub const SOL_SOCKET: u32 = 65535; pub const SOL_SOCKET: u32 = 65535;
pub const SO_REUSEADDR: u32 = 4; pub const SO_REUSEADDR: u32 = 4;

View file

@ -8,25 +8,59 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use std::net::UdpSocket;
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SocketType {
EventSocket,
GeneralSocket,
}
/// Result of polling the inputs of the `Poll`. /// Result of polling the inputs of the `Poll`.
/// ///
/// Any input that has data available for reading will be set to `true`, potentially multiple
/// at once.
///
/// Note that reading from the sockets is non-blocking but reading from stdin is blocking so /// Note that reading from the sockets is non-blocking but reading from stdin is blocking so
/// special care has to be taken to only read as much as is available. /// special care has to be taken to only read as much as is available.
pub struct PollResult { pub struct PollResult<'a> {
pub event_socket: bool, ready_sockets: &'a [(usize, SocketType, &'a UdpSocket)],
pub general_socket: bool, sockets: &'a [(UdpSocket, UdpSocket)],
pub stdin: bool, stdin: Option<&'a Stdin>,
stdout: &'a Stdout,
}
impl<'a> PollResult<'a> {
/// Returns the sockets that are currently ready for reading.
pub fn ready_sockets(&self) -> &[(usize, SocketType, &UdpSocket)] {
&self.ready_sockets
}
/// Returns the event socket.
pub fn event_socket(&self, idx: usize) -> &UdpSocket {
&self.sockets[idx].0
}
/// Returns the general socket.
pub fn general_socket(&self, idx: usize) -> &UdpSocket {
&self.sockets[idx].1
}
/// Returns standard input if there is data to read.
pub fn stdin(&self) -> Option<&Stdin> {
self.stdin
}
/// Returns standard output.
pub fn stdout(&self) -> &Stdout {
self.stdout
}
} }
#[cfg(unix)] #[cfg(unix)]
mod imp { mod imp {
use super::PollResult; use super::{PollResult, SocketType};
use std::{ use std::{
io::{self, Read, Write}, io::{self, Read, Write},
mem,
net::UdpSocket, net::UdpSocket,
os::unix::io::{AsRawFd, RawFd}, os::unix::io::{AsRawFd, RawFd},
}; };
@ -37,10 +71,11 @@ mod imp {
/// ///
/// This carries the event/general UDP socket and stdin/stdout. /// This carries the event/general UDP socket and stdin/stdout.
pub struct Poll { pub struct Poll {
event_socket: UdpSocket, sockets: Vec<(UdpSocket, UdpSocket)>,
general_socket: UdpSocket,
stdin: Stdin, stdin: Stdin,
stdout: Stdout, stdout: Stdout,
pollfd: Vec<pollfd>,
results_cache: Vec<(usize, SocketType, &'static UdpSocket)>,
} }
#[cfg(test)] #[cfg(test)]
@ -83,7 +118,7 @@ mod imp {
} }
#[cfg(test)] #[cfg(test)]
impl Read for Pipe { impl<'a> Read for &'a Pipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// SAFETY: read() requires a valid fd and a mutable buffer with the given size. // SAFETY: read() requires a valid fd and a mutable buffer with the given size.
// The fd is valid by construction as is the buffer. // The fd is valid by construction as is the buffer.
@ -122,25 +157,27 @@ mod imp {
impl Poll { impl Poll {
/// Name of the input based on the `struct pollfd` index. /// Name of the input based on the `struct pollfd` index.
fn fd_name(idx: usize) -> &'static str { fn fd_name(idx: usize, len: usize) -> &'static str {
match idx { match idx {
0 => "event socket", i if i == len - 1 => "stdin",
1 => "general socket", i if i % 2 == 0 => "event socket",
2 => "stdin", i if i % 2 == 1 => "general socket",
_ => unreachable!(), _ => unreachable!(),
} }
} }
/// Create a new `Poll` instance from the two sockets. /// Create a new `Poll` instance from the pairs of sockets.
pub fn new(event_socket: UdpSocket, general_socket: UdpSocket) -> Result<Self, Error> { pub fn new(sockets: Vec<(UdpSocket, UdpSocket)>) -> Result<Self, Error> {
let stdin = Stdin::acquire(); let stdin = Stdin::acquire();
let stdout = Stdout::acquire(); let stdout = Stdout::acquire();
let n_sockets = sockets.len();
Ok(Self { Ok(Self {
event_socket, sockets,
general_socket,
stdin, stdin,
stdout, stdout,
pollfd: Vec::with_capacity(n_sockets * 2 + 1),
results_cache: Vec::with_capacity(n_sockets * 2),
}) })
} }
@ -157,57 +194,63 @@ mod imp {
Ok(( Ok((
Self { Self {
event_socket, sockets: vec![(event_socket, general_socket)],
general_socket,
stdin: Stdin(stdin.read), stdin: Stdin(stdin.read),
stdout: Stdout(stdout.write), stdout: Stdout(stdout.write),
pollfd: Vec::with_capacity(3),
results_cache: Vec::with_capacity(2),
}, },
stdin, stdin,
stdout, stdout,
)) ))
} }
/// Mutable reference to the event socket. /// Reference to the event socket.
pub fn event_socket(&mut self) -> &mut UdpSocket { #[allow(unused)]
&mut self.event_socket pub fn event_socket(&self, iface: usize) -> &UdpSocket {
&self.sockets[iface].0
} }
/// Mutable reference to the general socket. /// Reference to the general socket.
pub fn general_socket(&mut self) -> &mut UdpSocket { #[allow(unused)]
&mut self.general_socket pub fn general_socket(&self, iface: usize) -> &UdpSocket {
&self.sockets[iface].1
} }
/// Mutable reference to stdin for reading. /// Reference to stdin for reading.
pub fn stdin(&mut self) -> &mut Stdin { #[allow(unused)]
&mut self.stdin pub fn stdin(&self) -> &Stdin {
&self.stdin
} }
/// Mutable reference to stdout for writing. /// Reference to stdout for writing.
pub fn stdout(&mut self) -> &mut Stdout { pub fn stdout(&self) -> &Stdout {
&mut self.stdout &self.stdout
} }
/// Poll the event socket, general socket and stdin for available data to read. /// Poll the event socket, general socket and stdin for available data to read.
/// ///
/// This blocks until at least one input has data available. /// This blocks until at least one input has data available.
pub fn poll(&mut self) -> Result<PollResult, Error> { pub fn poll<'a>(&'a mut self) -> Result<PollResult<'a>, Error> {
let mut pollfd = [ self.pollfd.clear();
pollfd { for (event_socket, general_socket) in &self.sockets {
fd: self.event_socket.as_raw_fd(), self.pollfd.push(pollfd {
fd: event_socket.as_raw_fd(),
events: POLLIN, events: POLLIN,
revents: 0, revents: 0,
}, });
pollfd { self.pollfd.push(pollfd {
fd: self.general_socket.as_raw_fd(), fd: general_socket.as_raw_fd(),
events: POLLIN, events: POLLIN,
revents: 0, revents: 0,
}, });
pollfd { }
self.pollfd.push(pollfd {
fd: self.stdin.0, fd: self.stdin.0,
events: POLLIN, events: POLLIN,
revents: 0, revents: 0,
}, });
];
// SAFETY: Polls the given pollfds above and requires a valid number to be passed. // SAFETY: Polls the given pollfds above and requires a valid number to be passed.
// A negative timeout means that it will wait until at least one of the pollfds is // A negative timeout means that it will wait until at least one of the pollfds is
@ -219,7 +262,7 @@ mod imp {
// On EINTR polling should be retried. // On EINTR polling should be retried.
unsafe { unsafe {
loop { loop {
let res = poll(pollfd[..].as_mut_ptr(), pollfd.len() as _, -1); let res = poll(self.pollfd[..].as_mut_ptr(), self.pollfd.len() as _, -1);
if res == -1 { if res == -1 {
let err = std::io::Error::last_os_error(); let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::Interrupted { if err.kind() == std::io::ErrorKind::Interrupted {
@ -233,20 +276,60 @@ mod imp {
} }
// Check for errors or hangup first // Check for errors or hangup first
for (idx, pfd) in pollfd.iter().enumerate() { for (idx, pfd) in self.pollfd.iter().enumerate() {
if pfd.revents & (POLLERR | POLLNVAL) != 0 { if pfd.revents & (POLLERR | POLLNVAL) != 0 {
bail!("Poll error on {}", Self::fd_name(idx)); bail!(
"Poll error on {} for interface {}",
Self::fd_name(idx, self.pollfd.len()),
idx / 2
);
} }
if pfd.revents & POLLHUP != 0 { if pfd.revents & POLLHUP != 0 {
bail!("Hang up during polling on {}", Self::fd_name(idx)); bail!(
"Hang up during polling on {} for interface {}",
Self::fd_name(idx, self.pollfd.len()),
idx / 2
);
}
}
self.results_cache.clear();
// SAFETY: References have the same memory representation independent of lifetime
let ready_sockets = unsafe {
mem::transmute::<
&mut Vec<(usize, SocketType, &'static UdpSocket)>,
&mut Vec<(usize, SocketType, &'a UdpSocket)>,
>(&mut self.results_cache)
};
for (idx, pfd) in self.pollfd.iter().enumerate() {
if pfd.revents & POLLIN != 0 {
if idx == self.pollfd.len() - 1 {
break;
}
if idx % 2 == 0 {
ready_sockets.push((
idx / 2,
SocketType::EventSocket,
&self.sockets[idx / 2].0,
));
} else {
ready_sockets.push((
idx / 2,
SocketType::GeneralSocket,
&self.sockets[idx / 2].1,
));
}
} }
} }
Ok(PollResult { Ok(PollResult {
event_socket: pollfd[0].revents & POLLIN != 0, ready_sockets: &*ready_sockets,
general_socket: pollfd[1].revents & POLLIN != 0, sockets: &self.sockets,
stdin: pollfd[2].revents & POLLIN != 0, stdin: (self.pollfd[self.pollfd.len() - 1].revents & POLLIN != 0)
.then_some(&self.stdin),
stdout: &self.stdout,
}) })
} }
} }
@ -263,6 +346,12 @@ mod imp {
} }
impl Read for Stdin { impl Read for Stdin {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
<&Stdin as Read>::read(&mut &*self, buf)
}
}
impl<'a> Read for &'a Stdin {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// SAFETY: read() requires a valid fd and a mutable buffer with the given size. // SAFETY: read() requires a valid fd and a mutable buffer with the given size.
// The fd is valid by construction as is the buffer. // The fd is valid by construction as is the buffer.
@ -290,6 +379,16 @@ mod imp {
} }
impl Write for Stdout { impl Write for Stdout {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<&Stdout as Write>::write(&mut &*self, buf)
}
fn flush(&mut self) -> io::Result<()> {
<&Stdout as Write>::flush(&mut &*self)
}
}
impl<'a> Write for &Stdout {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// SAFETY: write() requires a valid fd and a mutable buffer with the given size. // SAFETY: write() requires a valid fd and a mutable buffer with the given size.
// The fd is valid by construction as is the buffer. // The fd is valid by construction as is the buffer.
@ -326,6 +425,16 @@ mod imp {
} }
impl Write for Stderr { impl Write for Stderr {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<&Stderr as Write>::write(&mut &*self, buf)
}
fn flush(&mut self) -> io::Result<()> {
<&Stderr as Write>::flush(&mut &*self)
}
}
impl<'a> Write for &'a Stderr {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// SAFETY: write() requires a valid fd and a mutable buffer with the given size. // SAFETY: write() requires a valid fd and a mutable buffer with the given size.
// The fd is valid by construction as is the buffer. // The fd is valid by construction as is the buffer.
@ -348,7 +457,7 @@ mod imp {
#[cfg(windows)] #[cfg(windows)]
mod imp { mod imp {
use super::PollResult; use super::{PollResult, SocketType};
use std::{ use std::{
cmp, cmp,
@ -371,12 +480,12 @@ mod imp {
/// ///
/// This carries the event/general UDP socket and stdin/stdout. /// This carries the event/general UDP socket and stdin/stdout.
pub struct Poll { pub struct Poll {
event_socket: UdpSocket, sockets: Vec<(UdpSocket, UdpSocket)>,
event_socket_event: EventHandle, events: Vec<(EventHandle, EventHandle)>,
general_socket: UdpSocket,
general_socket_event: EventHandle,
stdin: Stdin, stdin: Stdin,
stdout: Stdout, stdout: Stdout,
handles: Vec<HANDLE>,
results_cache: Vec<(usize, SocketType, &'static UdpSocket)>,
} }
/// Helper struct for a WSA event. /// Helper struct for a WSA event.
@ -456,7 +565,7 @@ mod imp {
} }
#[cfg(test)] #[cfg(test)]
impl Read for Pipe { impl<'a> Read for &'a Pipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// SAFETY: Reads the given number of bytes into the buffer from the stdin handle. // SAFETY: Reads the given number of bytes into the buffer from the stdin handle.
unsafe { unsafe {
@ -510,21 +619,31 @@ mod imp {
impl Poll { impl Poll {
/// Internal constructor. /// Internal constructor.
pub fn new_internal( pub fn new_internal(
event_socket: UdpSocket, sockets: Vec<(UdpSocket, UdpSocket)>,
general_socket: UdpSocket,
stdin: Stdin, stdin: Stdin,
stdout: Stdout, stdout: Stdout,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
// Create event objects for the readability of the sockets. // Create event objects for the readability of the sockets.
let event_socket_event = EventHandle::new().context("Failed creating WSA event")?; let events = sockets
let general_socket_event = EventHandle::new().context("Failed creating WSA event")?; .iter()
.map(|(_event_socket, _general_socket)| -> Result<_, Error> {
Ok((
EventHandle::new().context("Failed creating WSA event")?,
EventHandle::new().context("Failed creating WSA event")?,
))
})
.collect::<Result<Vec<_>, Error>>()?;
for ((event_socket, general_socket), (event_socket_event, general_socket_event)) in
Iterator::zip(sockets.iter(), events.iter())
{
// SAFETY: WSAEventSelect() requires a valid socket and WSA event, which are both // SAFETY: WSAEventSelect() requires a valid socket and WSA event, which are both
// passed here, and the bitflag of events that should be selected for. // passed here, and the bitflag of events that should be selected for.
// //
// On error a non-zero value is returned. // On error a non-zero value is returned.
unsafe { unsafe {
if WSAEventSelect(event_socket.as_raw_socket(), event_socket_event.0, FD_READ) != 0 if WSAEventSelect(event_socket.as_raw_socket(), event_socket_event.0, FD_READ)
!= 0
{ {
bail!( bail!(
source: io::Error::from_raw_os_error(WSAGetLastError()), source: io::Error::from_raw_os_error(WSAGetLastError()),
@ -544,23 +663,26 @@ mod imp {
); );
} }
} }
}
let n_sockets = sockets.len();
Ok(Self { Ok(Self {
event_socket, sockets,
event_socket_event, events,
general_socket,
general_socket_event,
stdin, stdin,
stdout, stdout,
handles: Vec::with_capacity(n_sockets * 2 + 1),
results_cache: Vec::with_capacity(1),
}) })
} }
/// Create a new `Poll` instance from the two sockets. /// Create a new `Poll` instance from the two sockets.
pub fn new(event_socket: UdpSocket, general_socket: UdpSocket) -> Result<Self, Error> { pub fn new(sockets: Vec<(UdpSocket, UdpSocket)>) -> Result<Self, Error> {
let stdin = Stdin::acquire().context("Failure acquiring stdin handle")?; let stdin = Stdin::acquire().context("Failure acquiring stdin handle")?;
let stdout = Stdout::acquire().context("Failed acquiring stdout handle")?; let stdout = Stdout::acquire().context("Failed acquiring stdout handle")?;
Self::new_internal(event_socket, general_socket, stdin, stdout) Self::new_internal(sockets, stdin, stdout)
} }
#[cfg(test)] #[cfg(test)]
@ -579,45 +701,53 @@ mod imp {
let stdout = let stdout =
Stdout::from_handle(stdout_pipe.write).context("Failed acquiring stdout handle")?; Stdout::from_handle(stdout_pipe.write).context("Failed acquiring stdout handle")?;
let poll = Self::new_internal(event_socket, general_socket, stdin, stdout)?; let poll = Self::new_internal(vec![(event_socket, general_socket)], stdin, stdout)?;
Ok((poll, stdin_pipe, stdout_pipe)) Ok((poll, stdin_pipe, stdout_pipe))
} }
/// Mutable reference to the event socket. /// Reference to the event socket.
pub fn event_socket(&mut self) -> &mut UdpSocket { #[allow(unused)]
&mut self.event_socket pub fn event_socket(&self, iface: usize) -> &UdpSocket {
&self.sockets[iface].0
} }
/// Mutable reference to the general socket. /// Reference to the general socket.
pub fn general_socket(&mut self) -> &mut UdpSocket { #[allow(unused)]
&mut self.general_socket pub fn general_socket(&self, iface: usize) -> &UdpSocket {
&self.sockets[iface].1
} }
/// Mutable reference to stdin for reading. /// Reference to stdin for reading.
pub fn stdin(&mut self) -> &mut Stdin { #[allow(unused)]
&mut self.stdin pub fn stdin(&self) -> &Stdin {
&self.stdin
} }
/// Mutable reference to stdout for writing. /// Reference to stdout for writing.
pub fn stdout(&mut self) -> &mut Stdout { pub fn stdout(&self) -> &Stdout {
&mut self.stdout &self.stdout
} }
/// Poll the event socket, general socket and stdin for available data to read. /// Poll the event socket, general socket and stdin for available data to read.
/// ///
/// This blocks until at least one input has data available. /// This blocks until at least one input has data available.
pub fn poll(&mut self) -> Result<PollResult, Error> { pub fn poll<'a>(&'a mut self) -> Result<PollResult<'a>, Error> {
let handles = [ self.handles.clear();
self.event_socket_event.0,
self.general_socket_event.0, for (event_socket_event, general_socket_event) in &self.events {
self.handles.push(event_socket_event.0);
self.handles.push(general_socket_event.0);
}
self.handles.push(
// If stdin is a pipe then we use the signalling event, otherwise stdin itself. // If stdin is a pipe then we use the signalling event, otherwise stdin itself.
if let Some(ref thread_state) = self.stdin.thread_state { if let Some(ref thread_state) = self.stdin.thread_state {
thread_state.event thread_state.event
} else { } else {
self.stdin.handle self.stdin.handle
}, },
]; );
// If stdin is a pipe and currently no data is pending on it then signal // If stdin is a pipe and currently no data is pending on it then signal
// the reading thread to try reading one byte and blocking for that long. // the reading thread to try reading one byte and blocking for that long.
@ -641,8 +771,12 @@ mod imp {
// On error u32::MAX is returned, otherwise an index into the array of handles is // On error u32::MAX is returned, otherwise an index into the array of handles is
// returned for the handle that became ready. // returned for the handle that became ready.
let res = unsafe { let res = unsafe {
let res = let res = WaitForMultipleObjects(
WaitForMultipleObjects(handles.len() as _, handles[..].as_ptr(), 0, u32::MAX); self.handles.len() as _,
self.handles[..].as_ptr(),
0,
u32::MAX,
);
if res == u32::MAX { if res == u32::MAX {
bail!( bail!(
source: io::Error::from_raw_os_error(WSAGetLastError()), source: io::Error::from_raw_os_error(WSAGetLastError()),
@ -651,21 +785,30 @@ mod imp {
} }
assert!( assert!(
(0..=2).contains(&res), (0..self.handles.len()).contains(&(res as usize)),
"Unexpected WaitForMultipleObjects() return value {}", "Unexpected WaitForMultipleObjects() return value {}",
res, res,
); );
res res as usize
};
self.results_cache.clear();
// SAFETY: References have the same memory representation independent of lifetime
let ready_sockets = unsafe {
mem::transmute::<
&mut Vec<(usize, SocketType, &'static UdpSocket)>,
&mut Vec<(usize, SocketType, &'a UdpSocket)>,
>(&mut self.results_cache)
}; };
// For the sockets, enumerate the events that woke up the waiting, collect any errors // For the sockets, enumerate the events that woke up the waiting, collect any errors
// and reset the event objects. // and reset the event objects.
if (0..=1).contains(&res) { if res < self.handles.len() - 1 {
let (socket, event) = if res == 0 { let (socket, event) = if res % 2 == 0 {
(&self.event_socket, &self.event_socket_event) (&self.sockets[res / 2].0, &self.events[res / 2].0)
} else { } else {
(&self.general_socket, &self.general_socket_event) (&self.sockets[res / 2].1, &self.events[res / 2].1)
}; };
// SAFETY: Requires a valid socket and event, which is given by construction here. // SAFETY: Requires a valid socket and event, which is given by construction here.
@ -681,8 +824,9 @@ mod imp {
{ {
bail!( bail!(
source: io::Error::from_raw_os_error(WSAGetLastError()), source: io::Error::from_raw_os_error(WSAGetLastError()),
"Failed enumerating network events on {} socket", "Failed enumerating network events on {} socket for interface {}",
if res == 0 { "event" } else { "general" }, if res % 2 == 0 { "event" } else { "general" },
res / 2,
); );
} }
@ -692,8 +836,9 @@ mod imp {
if networkevents.ierrorcode[FD_READ_BIT] != 0 { if networkevents.ierrorcode[FD_READ_BIT] != 0 {
bail!( bail!(
source: io::Error::from_raw_os_error(networkevents.ierrorcode[FD_READ_BIT]), source: io::Error::from_raw_os_error(networkevents.ierrorcode[FD_READ_BIT]),
"Error on {} socket while waiting for events", "Error on {} socket for interface {} while waiting for events",
if res == 0 { "event" } else { "general" }, if res == 0 { "event" } else { "general" },
res / 2,
); );
} }
@ -707,12 +852,22 @@ mod imp {
res res
); );
} }
ready_sockets.push((
res / 2,
if res % 2 == 0 {
SocketType::EventSocket
} else {
SocketType::GeneralSocket
},
socket,
));
} }
Ok(PollResult { Ok(PollResult {
event_socket: res == 0, ready_sockets: &*ready_sockets,
general_socket: res == 1, sockets: &self.sockets,
stdin: res == 2, stdin: (res == self.handles.len() - 1).then_some(&self.stdin),
stdout: &self.stdout,
}) })
} }
} }
@ -918,6 +1073,12 @@ mod imp {
} }
impl Read for Stdin { impl Read for Stdin {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
<&Stdin as Read>::read(&mut &*self, buf)
}
}
impl<'a> Read for &'a Stdin {
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() { if buf.is_empty() {
return Ok(0); return Ok(0);
@ -926,7 +1087,7 @@ mod imp {
// If a read byte is pending from the readiness signalling thread then // If a read byte is pending from the readiness signalling thread then
// read that first here before reading any remaining data. // read that first here before reading any remaining data.
let mut already_read = 0; let mut already_read = 0;
if let Some(ref mut thread_state) = self.thread_state { if let Some(ref thread_state) = self.thread_state {
let mut guard = thread_state.buffer.lock().unwrap(); let mut guard = thread_state.buffer.lock().unwrap();
assert!(!guard.fill_buffer); assert!(!guard.fill_buffer);
if guard.buffer_filled { if guard.buffer_filled {
@ -1013,6 +1174,16 @@ mod imp {
} }
impl Write for Stdout { impl Write for Stdout {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<&Stdout as Write>::write(&mut &*self, buf)
}
fn flush(&mut self) -> io::Result<()> {
<&Stdout as Write>::flush(&mut &*self)
}
}
impl<'a> Write for &'a Stdout {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// SAFETY: Writes the given number of bytes to stdout or at most u32::MAX. On error // SAFETY: Writes the given number of bytes to stdout or at most u32::MAX. On error
// zero is returned, otherwise the number of bytes written is set accordingly and // zero is returned, otherwise the number of bytes written is set accordingly and
@ -1107,6 +1278,16 @@ mod imp {
} }
impl Write for Stderr { impl Write for Stderr {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<&Stderr as Write>::write(&mut &*self, buf)
}
fn flush(&mut self) -> io::Result<()> {
<&Stderr as Write>::flush(&mut &*self)
}
}
impl<'a> Write for &'a Stderr {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.0 == INVALID_HANDLE_VALUE { if self.0 == INVALID_HANDLE_VALUE {
return Ok(buf.len()); return Ok(buf.len());
@ -1145,6 +1326,8 @@ pub use self::imp::{Poll, Stderr, Stdin, Stdout};
mod test { mod test {
#[test] #[test]
fn test_poll() { fn test_poll() {
use super::{Poll, SocketType};
use std::io::prelude::*; use std::io::prelude::*;
let event_socket = std::net::UdpSocket::bind(std::net::SocketAddr::from(( let event_socket = std::net::UdpSocket::bind(std::net::SocketAddr::from((
@ -1167,8 +1350,7 @@ mod test {
))) )))
.unwrap(); .unwrap();
let (mut poll, mut stdin, _stdout) = let (mut poll, mut stdin, _stdout) = Poll::new_test(event_socket, general_socket).unwrap();
super::Poll::new_test(event_socket, general_socket).unwrap();
let mut buf = [0u8; 4]; let mut buf = [0u8; 4];
@ -1177,30 +1359,33 @@ mod test {
.send_to(&[1, 2, 3, 4], (std::net::Ipv4Addr::LOCALHOST, event_port)) .send_to(&[1, 2, 3, 4], (std::net::Ipv4Addr::LOCALHOST, event_port))
.unwrap(); .unwrap();
let res = poll.poll().unwrap(); let res = poll.poll().unwrap();
assert!(res.event_socket); assert_eq!(res.ready_sockets().len(), 1);
assert!(!res.general_socket); assert_eq!(res.ready_sockets()[0].0, 0);
assert!(!res.stdin); assert_eq!(res.ready_sockets()[0].1, SocketType::EventSocket);
assert_eq!(poll.event_socket().recv(&mut buf).unwrap(), 4); assert_eq!(res.ready_sockets()[0].2.recv(&mut buf).unwrap(), 4);
assert_eq!(buf, [1, 2, 3, 4]); assert_eq!(buf, [1, 2, 3, 4]);
send_socket send_socket
.send_to(&[1, 2, 3, 4], (std::net::Ipv4Addr::LOCALHOST, general_port)) .send_to(&[1, 2, 3, 4], (std::net::Ipv4Addr::LOCALHOST, general_port))
.unwrap(); .unwrap();
let res = poll.poll().unwrap(); let res = poll.poll().unwrap();
assert!(!res.event_socket);
assert!(res.general_socket); assert_eq!(res.ready_sockets().len(), 1);
assert!(!res.stdin); assert_eq!(res.ready_sockets()[0].0, 0);
assert_eq!(poll.general_socket().recv(&mut buf).unwrap(), 4); assert_eq!(res.ready_sockets()[0].1, SocketType::GeneralSocket);
assert_eq!(res.ready_sockets()[0].2.recv(&mut buf).unwrap(), 4);
assert_eq!(buf, [1, 2, 3, 4]); assert_eq!(buf, [1, 2, 3, 4]);
stdin.write_all(&[1, 2, 3, 4]).unwrap(); stdin.write_all(&[1, 2, 3, 4]).unwrap();
let res = poll.poll().unwrap(); let res = poll.poll().unwrap();
assert!(!res.event_socket); assert!(res.ready_sockets().is_empty());
assert!(!res.general_socket); {
assert!(res.stdin); let mut stdin = res.stdin();
poll.stdin().read_exact(&mut buf).unwrap(); let stdin = stdin.as_mut().unwrap();
stdin.read_exact(&mut buf).unwrap();
assert_eq!(buf, [1, 2, 3, 4]); assert_eq!(buf, [1, 2, 3, 4]);
} }
}
drop(poll); drop(poll);
} }

View file

@ -73,12 +73,8 @@ fn create_socket(port: u16) -> Result<UdpSocket, Error> {
Ok(socket) Ok(socket)
} }
/// Join the multicast groups for PTP on the configured interfaces. /// Retrieve the list of interfaces based on the available ones and the arguments.
fn join_multicast( fn list_interfaces(args: &args::Args) -> Result<Vec<net::InterfaceInfo>, Error> {
args: &args::Args,
event_socket: &UdpSocket,
general_socket: &UdpSocket,
) -> Result<[u8; 8], Error> {
let mut ifaces = net::query_interfaces().context("Failed to query network interfaces")?; let mut ifaces = net::query_interfaces().context("Failed to query network interfaces")?;
if ifaces.is_empty() { if ifaces.is_empty() {
bail!("No suitable network interfaces for PTP found"); bail!("No suitable network interfaces for PTP found");
@ -115,15 +111,28 @@ fn join_multicast(
} }
} }
Ok(ifaces)
}
fn run() -> Result<(), Error> {
let args = args::parse_args().context("Failed parsing commandline parameters")?;
let ifaces = list_interfaces(&args).context("Failed listing interfaces")?;
let mut sockets = vec![];
for iface in &ifaces { for iface in &ifaces {
info!("Binding to interface {}", iface.name); info!("Binding to interface {}", iface.name);
}
for socket in [&event_socket, &general_socket].iter() { let event_socket = create_socket(PTP_EVENT_PORT).context("Failed creating event socket")?;
for iface in &ifaces { let general_socket =
create_socket(PTP_GENERAL_PORT).context("Failed creating general socket")?;
for socket in [&event_socket, &general_socket] {
net::join_multicast_v4(socket, &PTP_MULTICAST_ADDR, iface) net::join_multicast_v4(socket, &PTP_MULTICAST_ADDR, iface)
.context("Failed to join multicast group")?; .context("Failed to join multicast group")?;
} }
sockets.push((event_socket, general_socket));
} }
let clock_id = if args.clock_id == 0 { let clock_id = if args.clock_id == 0 {
@ -140,27 +149,13 @@ fn join_multicast(
} else { } else {
args.clock_id.to_be_bytes() args.clock_id.to_be_bytes()
}; };
info!("Using clock ID {:?}", clock_id); info!("Using clock ID {:?}", clock_id);
Ok(clock_id)
}
fn run() -> Result<(), Error> {
let args = args::parse_args().context("Failed parsing commandline parameters")?;
let event_socket = create_socket(PTP_EVENT_PORT).context("Failed creating event socket")?;
let general_socket =
create_socket(PTP_GENERAL_PORT).context("Failed creating general socket")?;
thread::set_priority().context("Failed to set thread priority")?; thread::set_priority().context("Failed to set thread priority")?;
privileges::drop().context("Failed dropping privileges")?; privileges::drop().context("Failed dropping privileges")?;
let clock_id = join_multicast(&args, &event_socket, &general_socket) let mut poll = io::Poll::new(sockets).context("Failed creating poller")?;
.context("Failed joining multicast groups")?;
let mut poll = io::Poll::new(event_socket, general_socket).context("Failed creating poller")?;
// Write clock ID first // Write clock ID first
{ {
@ -184,27 +179,20 @@ fn run() -> Result<(), Error> {
// We assume that stdout never blocks and stdin receives a complete valid packet whenever it is // We assume that stdout never blocks and stdin receives a complete valid packet whenever it is
// ready and never blocks in the middle of a packet. // ready and never blocks in the middle of a packet.
let mut socket_buffer = [0u8; 8192]; let mut socket_buffer = [0u8; 8192];
let mut stdinout_buffer = [0u8; 8192 + 3 + 8]; let mut stdinout_buffer = [0u8; 8192 + 4 + 8];
loop { loop {
let poll_res = poll.poll().context("Failed polling")?; let poll_res = poll.poll().context("Failed polling")?;
// If any of the sockets are ready, continue reading packets from them until no more // If any of the sockets are ready, continue reading packets from them until no more
// packets are left and directly forward them to stdout. // packets are left and directly forward them to stdout.
'next_socket: for idx in [poll_res.event_socket, poll_res.general_socket] 'next_socket: for (idx, type_, socket) in poll_res.ready_sockets() {
.iter() let idx = *idx;
.enumerate() let type_ = *type_;
.filter_map(|(idx, r)| if *r { Some(idx) } else { None })
{
let idx = idx as u8;
// Read all available packets from the socket before going to the next socket. // Read all available packets from the socket before going to the next socket.
'next_packet: loop { 'next_packet: loop {
let res = match idx { let res = socket.recv_from(&mut socket_buffer);
MSG_TYPE_EVENT => poll.event_socket().recv_from(&mut socket_buffer),
MSG_TYPE_GENERAL => poll.general_socket().recv_from(&mut socket_buffer),
_ => unreachable!(),
};
let (read, addr) = match res { let (read, addr) = match res {
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
@ -213,12 +201,7 @@ fn run() -> Result<(), Error> {
Err(err) => { Err(err) => {
bail!( bail!(
source: err, source: err,
"Failed reading from {} socket", "Failed reading from {:?} socket for interface {}", type_, idx,
if idx == MSG_TYPE_EVENT {
"event"
} else {
"general"
}
); );
} }
Ok((read, addr)) => (read, addr), Ok((read, addr)) => (read, addr),
@ -227,13 +210,10 @@ fn run() -> Result<(), Error> {
let recv_time = clock::time(); let recv_time = clock::time();
if args.verbose { if args.verbose {
trace!( trace!(
"Received {} bytes from {} socket from {} at {}", "Received {} bytes from {:?} socket for interface {} from {} at {}",
read, read,
if idx == MSG_TYPE_EVENT { type_,
"event" idx,
} else {
"general"
},
addr, addr,
recv_time, recv_time,
); );
@ -275,18 +255,25 @@ fn run() -> Result<(), Error> {
} }
{ {
let mut buf = &mut stdinout_buffer[..(read + 3 + 8)]; let mut buf = &mut stdinout_buffer[..(read + 4 + 8)];
buf.write_u16be(read as u16 + 8) buf.write_u16be(read as u16 + 1 + 8)
.expect("Too small stdout buffer"); .expect("Too small stdout buffer");
buf.write_u8(idx).expect("Too small stdout buffer"); buf.write_u8(if type_ == io::SocketType::EventSocket {
MSG_TYPE_EVENT
} else {
MSG_TYPE_GENERAL
})
.expect("Too small stdout buffer");
buf.write_u8(idx as u8).expect("Too small stdout buffer");
buf.write_u64be(recv_time).expect("Too small stdout buffer"); buf.write_u64be(recv_time).expect("Too small stdout buffer");
buf.write_all(&socket_buffer[..read]) buf.write_all(&socket_buffer[..read])
.expect("Too small stdout buffer"); .expect("Too small stdout buffer");
assert!(buf.is_empty(), "Too big stdout buffer",); assert!(buf.is_empty(), "Too big stdout buffer",);
} }
let buf = &stdinout_buffer[..(read + 3 + 8)]; let buf = &stdinout_buffer[..(read + 4 + 8)];
poll.stdout() poll_res
.stdout()
.write_all(buf) .write_all(buf)
.context("Failed writing to stdout")?; .context("Failed writing to stdout")?;
} }
@ -294,8 +281,8 @@ fn run() -> Result<(), Error> {
// After handling the sockets check if a packet is available on stdin, read it and forward // After handling the sockets check if a packet is available on stdin, read it and forward
// it to the corresponding socket. // it to the corresponding socket.
if poll_res.stdin { if let Some(ref mut stdin) = poll_res.stdin() {
poll.stdin() stdin
.read_exact(&mut stdinout_buffer[0..3]) .read_exact(&mut stdinout_buffer[0..3])
.context("Failed reading packet header from stdin")?; .context("Failed reading packet header from stdin")?;
@ -305,7 +292,7 @@ fn run() -> Result<(), Error> {
} }
let type_ = stdinout_buffer[2]; let type_ = stdinout_buffer[2];
poll.stdin() stdin
.read_exact(&mut stdinout_buffer[0..size as usize]) .read_exact(&mut stdinout_buffer[0..size as usize])
.context("Failed reading packet body from stdin")?; .context("Failed reading packet body from stdin")?;
@ -314,23 +301,31 @@ fn run() -> Result<(), Error> {
continue; continue;
} }
if size < 1 + 8 + 34 {
bail!("Invalid packet body size");
}
let buf = &mut &stdinout_buffer[..(size as usize)];
let idx = buf.read_u8().expect("Too small stdin buffer");
if idx as usize >= ifaces.len() {
warn!("Unexpected stdin message interface index {}", idx);
continue;
}
if args.verbose { if args.verbose {
trace!( trace!(
"Received {} bytes for {} socket from stdin", "Received {} bytes for {} socket for interface {} from stdin",
size, size,
if type_ == MSG_TYPE_EVENT { if type_ == MSG_TYPE_EVENT {
"event" "event"
} else { } else {
"general" "general"
}, },
idx,
); );
} }
if size < 8 + 34 {
bail!("Invalid packet body size");
}
let buf = &mut &stdinout_buffer[..(size as usize)];
let main_send_time = buf.read_u64be().expect("Too small stdin buffer"); let main_send_time = buf.read_u64be().expect("Too small stdin buffer");
// We require that the main process only ever sends valid PTP messages with the clock // We require that the main process only ever sends valid PTP messages with the clock
@ -347,11 +342,11 @@ fn run() -> Result<(), Error> {
let send_time = clock::time(); let send_time = clock::time();
match type_ { match type_ {
MSG_TYPE_EVENT => poll MSG_TYPE_EVENT => poll_res
.event_socket() .event_socket(idx as usize)
.send_to(buf, (PTP_MULTICAST_ADDR, PTP_EVENT_PORT)), .send_to(buf, (PTP_MULTICAST_ADDR, PTP_EVENT_PORT)),
MSG_TYPE_GENERAL => poll MSG_TYPE_GENERAL => poll_res
.general_socket() .general_socket(idx as usize)
.send_to(buf, (PTP_MULTICAST_ADDR, PTP_GENERAL_PORT)), .send_to(buf, (PTP_MULTICAST_ADDR, PTP_GENERAL_PORT)),
_ => unreachable!(), _ => unreachable!(),
} }
@ -393,7 +388,8 @@ fn run() -> Result<(), Error> {
} }
let buf = &stdinout_buffer[..(3 + 12)]; let buf = &stdinout_buffer[..(3 + 12)];
poll.stdout() poll_res
.stdout()
.write_all(buf) .write_all(buf)
.context("Failed writing to stdout")?; .context("Failed writing to stdout")?;
} }

View file

@ -399,6 +399,93 @@ mod imp {
} }
} }
#[cfg(not(any(
target_os = "openbsd",
target_os = "dragonfly",
target_os = "netbsd",
target_os = "macos"
)))]
{
let mreqn = ip_mreqn {
imr_multiaddr: in_addr {
s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()),
},
imr_address: in_addr {
s_addr: u32::from_ne_bytes(Ipv4Addr::UNSPECIFIED.octets()),
},
imr_ifindex: iface.index as _,
};
// SAFETY: Requires a valid ip_mreq or ip_mreqn struct to be passed together
// with its size for checking which of the two it is. On errors a negative
// integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&mreqn as *const _ as *const _,
mem::size_of_val(&mreqn) as _,
) < 0
{
bail!(
source: io::Error::last_os_error(),
"Failed joining multicast group for interface {}",
iface.name,
);
}
}
}
#[cfg(any(target_os = "openbsd", target_os = "dragonfly", target_os = "macos"))]
{
let addr = in_addr {
s_addr: u32::from_ne_bytes(iface.ip_addr.octets()),
};
// SAFETY: Requires a valid in_addr struct to be passed together with its size for
// checking which of the two it is. On errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&addr as *const _ as *const _,
mem::size_of_val(&addr) as _,
) < 0
{
bail!(
source: io::Error::last_os_error(),
"Failed joining multicast group for interface {}",
iface.name,
);
}
}
}
#[cfg(target_os = "netbsd")]
{
let idx = (iface.index as u32).to_be();
// SAFETY: Requires a valid in_addr struct or interface index in network byte order
// to be passed together with its size for checking which of the two it is. On
// errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&idx as *const _ as *const _,
mem::size_of_val(&idx) as _,
) < 0
{
bail!(
source: io::Error::last_os_error(),
"Failed joining multicast group for interface {}",
iface.name,
);
}
}
}
Ok(()) Ok(())
} }
#[cfg(any(target_os = "solaris", target_os = "illumos"))] #[cfg(any(target_os = "solaris", target_os = "illumos"))]
@ -414,6 +501,29 @@ mod imp {
) )
})?; })?;
let addr = in_addr {
s_addr: u32::from_ne_bytes(iface.ip_addr.octets()),
};
// SAFETY: Requires a valid in_addr struct to be passed together with its size for
// checking which of the two it is. On errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_fd(),
IPPROTO_IP,
IP_MULTICAST_IF,
&addr as *const _ as *const _,
mem::size_of_val(&addr) as _,
) < 0
{
bail!(
source: io::Error::last_os_error(),
"Failed setting multicast interface {}",
iface.name,
);
}
}
Ok(()) Ok(())
} }
} }
@ -838,6 +948,31 @@ mod imp {
} }
} }
let addr = IN_ADDR {
S_un: IN_ADDR_0 {
S_addr: u32::from_ne_bytes(Ipv4Addr::new(0, 0, 0, iface.index as u8).octets()),
},
};
// SAFETY: Requires a valid IN_ADDR struct to be passed together with its size for checking
// which of the two it is. On errors a negative integer is returned.
unsafe {
if setsockopt(
socket.as_raw_socket(),
IPPROTO_IP as i32,
IP_MULTICAST_IF as i32,
&addr as *const _ as *const _,
mem::size_of_val(&addr) as _,
) < 0
{
bail!(
source: io::Error::last_os_error(),
"Failed joining multicast group for interface {}",
iface.name,
);
}
}
Ok(()) Ok(())
} }

View file

@ -239,8 +239,8 @@ typedef struct
typedef enum typedef enum
{ {
TYPE_EVENT = 0, /* 64-bit monotonic clock time and PTP message is payload */ TYPE_EVENT = 0, /* 8-bit interface index, 64-bit monotonic clock time and PTP message is payload */
TYPE_GENERAL = 1, /* 64-bit monotonic clock time and PTP message is payload */ TYPE_GENERAL = 1, /* 8-bit interface index, 64-bit monotonic clock time and PTP message is payload */
TYPE_CLOCK_ID = 2, /* 64-bit clock ID is payload */ TYPE_CLOCK_ID = 2, /* 64-bit clock ID is payload */
TYPE_SEND_TIME_ACK = 3, /* 64-bit monotonic clock time, 8-bit message type, 8-bit domain number and 16-bit sequence number is payload */ TYPE_SEND_TIME_ACK = 3, /* 64-bit monotonic clock time, 8-bit message type, 8-bit domain number and 16-bit sequence number is payload */
} StdIOMessageType; } StdIOMessageType;
@ -292,6 +292,7 @@ typedef struct
GstClockTime receive_time; GstClockTime receive_time;
PtpClockIdentity master_clock_identity; PtpClockIdentity master_clock_identity;
guint8 iface_idx;
guint8 grandmaster_priority_1; guint8 grandmaster_priority_1;
PtpClockQuality grandmaster_clock_quality; PtpClockQuality grandmaster_clock_quality;
@ -306,6 +307,7 @@ typedef struct
typedef struct typedef struct
{ {
PtpClockIdentity master_clock_identity; PtpClockIdentity master_clock_identity;
guint8 iface_idx;
GstClockTime announce_interval; /* last interval we received */ GstClockTime announce_interval; /* last interval we received */
GQueue announce_messages; GQueue announce_messages;
@ -322,6 +324,7 @@ typedef struct
GstClockTime follow_up_recv_time_local; GstClockTime follow_up_recv_time_local;
GSource *timeout_source; GSource *timeout_source;
guint8 iface_idx;
guint16 delay_req_seqnum; guint16 delay_req_seqnum;
GstClockTime delay_req_send_time_local; /* t3, -1 if we wait for FOLLOW_UP */ GstClockTime delay_req_send_time_local; /* t3, -1 if we wait for FOLLOW_UP */
GstClockTime delay_req_recv_time_remote; /* t4, -1 if we wait */ GstClockTime delay_req_recv_time_remote; /* t4, -1 if we wait */
@ -355,6 +358,7 @@ typedef struct
/* Last selected master clock */ /* Last selected master clock */
gboolean have_master_clock; gboolean have_master_clock;
PtpClockIdentity master_clock_identity; PtpClockIdentity master_clock_identity;
guint8 iface_idx;
guint64 grandmaster_identity; guint64 grandmaster_identity;
/* Last SYNC or FOLLOW_UP timestamp we received */ /* Last SYNC or FOLLOW_UP timestamp we received */
@ -722,8 +726,11 @@ compare_announce_message (const PtpAnnounceMessage * a,
else if (a->master_clock_identity.port_number > else if (a->master_clock_identity.port_number >
b->master_clock_identity.port_number) b->master_clock_identity.port_number)
return 1; return 1;
else
g_assert_not_reached (); if (a->iface_idx < b->iface_idx)
return -1;
else if (a->iface_idx > b->iface_idx)
return 1;
return 0; return 0;
} }
@ -819,13 +826,15 @@ select_best_master_clock (PtpDomainData * domain, GstClockTime now)
if (domain->have_master_clock if (domain->have_master_clock
&& compare_clock_identity (&domain->master_clock_identity, && compare_clock_identity (&domain->master_clock_identity,
&best->master_clock_identity) == 0) { &best->master_clock_identity) == 0
&& domain->iface_idx == best->iface_idx) {
GST_DEBUG ("Master clock in domain %u did not change", domain->domain); GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
} else { } else {
GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
"x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x", "x %u on interface %u with grandmaster clock 0x%016" G_GINT64_MODIFIER
domain->domain, best->master_clock_identity.clock_identity, "x", domain->domain, best->master_clock_identity.clock_identity,
best->master_clock_identity.port_number, best->grandmaster_identity); best->master_clock_identity.port_number, best->iface_idx,
best->grandmaster_identity);
domain->have_master_clock = TRUE; domain->have_master_clock = TRUE;
domain->grandmaster_identity = best->grandmaster_identity; domain->grandmaster_identity = best->grandmaster_identity;
@ -833,9 +842,11 @@ select_best_master_clock (PtpDomainData * domain, GstClockTime now)
/* Opportunistic master clock selection likely gave us the same master /* Opportunistic master clock selection likely gave us the same master
* clock before, no need to reset all statistics */ * clock before, no need to reset all statistics */
if (compare_clock_identity (&domain->master_clock_identity, if (compare_clock_identity (&domain->master_clock_identity,
&best->master_clock_identity) != 0) { &best->master_clock_identity) != 0
|| domain->iface_idx != best->iface_idx) {
memcpy (&domain->master_clock_identity, &best->master_clock_identity, memcpy (&domain->master_clock_identity, &best->master_clock_identity,
sizeof (PtpClockIdentity)); sizeof (PtpClockIdentity));
domain->iface_idx = best->iface_idx;
domain->mean_path_delay = 0; domain->mean_path_delay = 0;
domain->last_delay_req = 0; domain->last_delay_req = 0;
domain->last_path_delays_missing = 9; domain->last_path_delays_missing = 9;
@ -865,7 +876,8 @@ select_best_master_clock (PtpDomainData * domain, GstClockTime now)
} }
static void static void
handle_announce_message (PtpMessage * msg, GstClockTime receive_time) handle_announce_message (PtpMessage * msg, guint8 iface_idx,
GstClockTime receive_time)
{ {
GList *l; GList *l;
PtpDomainData *domain = NULL; PtpDomainData *domain = NULL;
@ -925,7 +937,7 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
PtpAnnounceSender *tmp = l->data; PtpAnnounceSender *tmp = l->data;
if (compare_clock_identity (&tmp->master_clock_identity, if (compare_clock_identity (&tmp->master_clock_identity,
&msg->source_port_identity) == 0) { &msg->source_port_identity) == 0 && tmp->iface_idx == iface_idx) {
sender = tmp; sender = tmp;
break; break;
} }
@ -936,6 +948,7 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
memcpy (&sender->master_clock_identity, &msg->source_port_identity, memcpy (&sender->master_clock_identity, &msg->source_port_identity,
sizeof (PtpClockIdentity)); sizeof (PtpClockIdentity));
sender->iface_idx = iface_idx;
g_queue_init (&sender->announce_messages); g_queue_init (&sender->announce_messages);
domain->announce_senders = domain->announce_senders =
g_list_prepend (domain->announce_senders, sender); g_list_prepend (domain->announce_senders, sender);
@ -968,6 +981,7 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
announce->sequence_id = msg->sequence_id; announce->sequence_id = msg->sequence_id;
memcpy (&announce->master_clock_identity, &msg->source_port_identity, memcpy (&announce->master_clock_identity, &msg->source_port_identity,
sizeof (PtpClockIdentity)); sizeof (PtpClockIdentity));
announce->iface_idx = iface_idx;
announce->grandmaster_identity = announce->grandmaster_identity =
msg->message_specific.announce.grandmaster_identity; msg->message_specific.announce.grandmaster_identity;
announce->grandmaster_priority_1 = announce->grandmaster_priority_1 =
@ -991,7 +1005,7 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
static gboolean static gboolean
send_delay_req_timeout (PtpPendingSync * sync) send_delay_req_timeout (PtpPendingSync * sync)
{ {
guint8 message[STDIO_MESSAGE_HEADER_SIZE + 8 + 44] = { 0, }; guint8 message[STDIO_MESSAGE_HEADER_SIZE + 1 + 8 + 44] = { 0, };
GstByteWriter writer; GstByteWriter writer;
gsize written; gsize written;
GError *err = NULL; GError *err = NULL;
@ -1003,8 +1017,9 @@ send_delay_req_timeout (PtpPendingSync * sync)
gst_clock_get_time (observation_system_clock); gst_clock_get_time (observation_system_clock);
gst_byte_writer_init_with_data (&writer, message, sizeof (message), FALSE); gst_byte_writer_init_with_data (&writer, message, sizeof (message), FALSE);
gst_byte_writer_put_uint16_be_unchecked (&writer, 8 + 44); gst_byte_writer_put_uint16_be_unchecked (&writer, 1 + 8 + 44);
gst_byte_writer_put_uint8_unchecked (&writer, TYPE_EVENT); gst_byte_writer_put_uint8_unchecked (&writer, TYPE_EVENT);
gst_byte_writer_put_uint8_unchecked (&writer, sync->iface_idx);
gst_byte_writer_put_uint64_be_unchecked (&writer, send_time); gst_byte_writer_put_uint64_be_unchecked (&writer, send_time);
gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ); gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
gst_byte_writer_put_uint8_unchecked (&writer, 2); gst_byte_writer_put_uint8_unchecked (&writer, 2);
@ -1058,6 +1073,7 @@ send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
} }
domain->last_delay_req = now; domain->last_delay_req = now;
sync->iface_idx = domain->iface_idx;
sync->delay_req_seqnum = domain->last_delay_req_seqnum++; sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
/* IEEE 1588 9.5.11.2 */ /* IEEE 1588 9.5.11.2 */
@ -1481,7 +1497,8 @@ out:
} }
static void static void
handle_sync_message (PtpMessage * msg, GstClockTime receive_time) handle_sync_message (PtpMessage * msg, guint8 iface_idx,
GstClockTime receive_time)
{ {
GList *l; GList *l;
PtpDomainData *domain = NULL; PtpDomainData *domain = NULL;
@ -1523,15 +1540,20 @@ handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
/* If we have a master clock, ignore this message if it's not coming from there */ /* If we have a master clock, ignore this message if it's not coming from there */
if (domain->have_master_clock if (domain->have_master_clock
&& compare_clock_identity (&domain->master_clock_identity, && (compare_clock_identity (&domain->master_clock_identity,
&msg->source_port_identity) != 0) &msg->source_port_identity) != 0
|| domain->iface_idx != iface_idx)) {
GST_TRACE ("SYNC msg not from current clock master. Ignoring");
return; return;
}
#ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION #ifdef USE_OPPORTUNISTIC_CLOCK_SELECTION
/* Opportunistic selection of master clock */ /* Opportunistic selection of master clock */
if (!domain->have_master_clock) if (!domain->have_master_clock) {
memcpy (&domain->master_clock_identity, &msg->source_port_identity, memcpy (&domain->master_clock_identity, &msg->source_port_identity,
sizeof (PtpClockIdentity)); sizeof (PtpClockIdentity));
domain->iface_idx = iface_idx;
}
#else #else
if (!domain->have_master_clock) if (!domain->have_master_clock)
return; return;
@ -1613,7 +1635,8 @@ handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
} }
static void static void
handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time) handle_follow_up_message (PtpMessage * msg, guint8 iface_idx,
GstClockTime receive_time)
{ {
GList *l; GList *l;
PtpDomainData *domain = NULL; PtpDomainData *domain = NULL;
@ -1643,8 +1666,9 @@ handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
/* If we have a master clock, ignore this message if it's not coming from there */ /* If we have a master clock, ignore this message if it's not coming from there */
if (domain->have_master_clock if (domain->have_master_clock
&& compare_clock_identity (&domain->master_clock_identity, && (compare_clock_identity (&domain->master_clock_identity,
&msg->source_port_identity) != 0) { &msg->source_port_identity) != 0
|| domain->iface_idx != iface_idx)) {
GST_TRACE ("FOLLOW_UP msg not from current clock master. Ignoring"); GST_TRACE ("FOLLOW_UP msg not from current clock master. Ignoring");
return; return;
} }
@ -1709,7 +1733,8 @@ handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
} }
static void static void
handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time) handle_delay_resp_message (PtpMessage * msg, guint8 iface_idx,
GstClockTime receive_time)
{ {
GList *l; GList *l;
PtpDomainData *domain = NULL; PtpDomainData *domain = NULL;
@ -1740,9 +1765,12 @@ handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
/* If we have a master clock, ignore this message if it's not coming from there */ /* If we have a master clock, ignore this message if it's not coming from there */
if (domain->have_master_clock if (domain->have_master_clock
&& compare_clock_identity (&domain->master_clock_identity, && (compare_clock_identity (&domain->master_clock_identity,
&msg->source_port_identity) != 0) &msg->source_port_identity) != 0
|| domain->iface_idx != iface_idx)) {
GST_TRACE ("DELAY_RESP msg not from current clock master. Ignoring");
return; return;
}
if (msg->log_message_interval == 0x7f) { if (msg->log_message_interval == 0x7f) {
domain->min_delay_req_interval = GST_SECOND; domain->min_delay_req_interval = GST_SECOND;
@ -1809,7 +1837,8 @@ handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
} }
static void static void
handle_ptp_message (PtpMessage * msg, GstClockTime receive_time) handle_ptp_message (PtpMessage * msg, guint8 iface_idx,
GstClockTime receive_time)
{ {
/* Ignore our own messages */ /* Ignore our own messages */
if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity && if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
@ -1818,20 +1847,20 @@ handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
return; return;
} }
GST_TRACE ("Message type %d receive_time %" GST_TIME_FORMAT, GST_TRACE ("Message type %d iface idx %d receive_time %" GST_TIME_FORMAT,
msg->message_type, GST_TIME_ARGS (receive_time)); msg->message_type, iface_idx, GST_TIME_ARGS (receive_time));
switch (msg->message_type) { switch (msg->message_type) {
case PTP_MESSAGE_TYPE_ANNOUNCE: case PTP_MESSAGE_TYPE_ANNOUNCE:
handle_announce_message (msg, receive_time); handle_announce_message (msg, iface_idx, receive_time);
break; break;
case PTP_MESSAGE_TYPE_SYNC: case PTP_MESSAGE_TYPE_SYNC:
handle_sync_message (msg, receive_time); handle_sync_message (msg, iface_idx, receive_time);
break; break;
case PTP_MESSAGE_TYPE_FOLLOW_UP: case PTP_MESSAGE_TYPE_FOLLOW_UP:
handle_follow_up_message (msg, receive_time); handle_follow_up_message (msg, iface_idx, receive_time);
break; break;
case PTP_MESSAGE_TYPE_DELAY_RESP: case PTP_MESSAGE_TYPE_DELAY_RESP:
handle_delay_resp_message (msg, receive_time); handle_delay_resp_message (msg, iface_idx, receive_time);
break; break;
default: default:
break; break;
@ -1941,12 +1970,14 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res,
case TYPE_EVENT: case TYPE_EVENT:
case TYPE_GENERAL:{ case TYPE_GENERAL:{
GstClockTime receive_time = gst_clock_get_time (observation_system_clock); GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
guint8 iface_idx;
GstClockTime helper_receive_time; GstClockTime helper_receive_time;
PtpMessage msg; PtpMessage msg;
helper_receive_time = GST_READ_UINT64_BE (stdout_buffer); iface_idx = GST_READ_UINT8 (stdout_buffer);
helper_receive_time = GST_READ_UINT64_BE (stdout_buffer + 1);
if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer + 8, if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer + 1 + 8,
CUR_STDIO_HEADER_SIZE)) { CUR_STDIO_HEADER_SIZE)) {
dump_ptp_message (&msg); dump_ptp_message (&msg);
if (helper_receive_time != 0) { if (helper_receive_time != 0) {
@ -1955,7 +1986,7 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res,
helper_receive_time))); helper_receive_time)));
receive_time = helper_receive_time; receive_time = helper_receive_time;
} }
handle_ptp_message (&msg, receive_time); handle_ptp_message (&msg, iface_idx, receive_time);
} }
break; break;
} }