ptp: Get rid of struct padding in the messages with the helper process

Also remove the now unnecessary private header file.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4647>
This commit is contained in:
Sebastian Dröge 2023-05-15 17:44:31 +03:00 committed by GStreamer Marge Bot
parent 55c961a4dc
commit b74669a38a
3 changed files with 62 additions and 81 deletions

View file

@ -41,6 +41,14 @@ const PTP_EVENT_PORT: u16 = 319;
/// PTP General message port. /// PTP General message port.
const PTP_GENERAL_PORT: u16 = 320; const PTP_GENERAL_PORT: u16 = 320;
/// StdIO Message Types.
/// PTP message for the event socket.
const MSG_TYPE_EVENT: u8 = 0;
/// PTP message for the general socket.
const MSG_TYPE_GENERAL: u8 = 1;
/// Clock ID message
const MSG_TYPE_CLOCK_ID: u8 = 2;
/// Create a new `UdpSocket` for the given port and configure it for PTP. /// Create a new `UdpSocket` for the given port and configure it for PTP.
fn create_socket(port: u16) -> Result<UdpSocket, Error> { fn create_socket(port: u16) -> Result<UdpSocket, Error> {
let socket = UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, port))) let socket = UdpSocket::bind(SocketAddr::from((Ipv4Addr::UNSPECIFIED, port)))
@ -139,11 +147,10 @@ fn main() -> Result<(), Error> {
// Write clock ID first // Write clock ID first
{ {
let mut clock_id_data = [0u8; 4 + 8]; let mut clock_id_data = [0u8; 3 + 8];
clock_id_data[0..2].copy_from_slice(&8u16.to_le_bytes()); clock_id_data[0..2].copy_from_slice(&8u16.to_be_bytes());
clock_id_data[2] = 2; clock_id_data[2] = MSG_TYPE_CLOCK_ID;
clock_id_data[3] = 0; clock_id_data[3..].copy_from_slice(&clock_id);
clock_id_data[4..].copy_from_slice(&clock_id);
poll.stdout() poll.stdout()
.write_all(&clock_id_data) .write_all(&clock_id_data)
@ -154,8 +161,8 @@ fn main() -> Result<(), Error> {
// //
// We assume that stdout never blocks and stdin receives a complete valid packet whenever it is // We assume that stdout never blocks and stdin receives a complete valid packet whenever it is
// ready and never blocks in the middle of a packet. // ready and never blocks in the middle of a packet.
let mut socket_buffer = [0u8; 1500]; let mut socket_buffer = [0u8; 8192];
let mut stdinout_buffer = [0u8; 1504]; let mut stdinout_buffer = [0u8; 8192 + 3];
loop { loop {
let poll_res = poll.poll().context("Failed polling")?; let poll_res = poll.poll().context("Failed polling")?;
@ -167,9 +174,10 @@ fn main() -> Result<(), Error> {
.enumerate() .enumerate()
.filter_map(|(idx, r)| if *r { Some(idx) } else { None }) .filter_map(|(idx, r)| if *r { Some(idx) } else { None })
{ {
let idx = idx as u8;
let res = match idx { let res = match idx {
0 => poll.event_socket().recv(&mut socket_buffer), MSG_TYPE_EVENT => poll.event_socket().recv(&mut socket_buffer),
1 => poll.general_socket().recv(&mut socket_buffer), MSG_TYPE_GENERAL => poll.general_socket().recv(&mut socket_buffer),
_ => unreachable!(), _ => unreachable!(),
}; };
@ -185,13 +193,12 @@ fn main() -> Result<(), Error> {
); );
} }
Ok(read) => { Ok(read) => {
stdinout_buffer[0..2].copy_from_slice(&(read as u16).to_ne_bytes()); stdinout_buffer[0..2].copy_from_slice(&(read as u16).to_be_bytes());
stdinout_buffer[2] = idx as u8; stdinout_buffer[2] = idx;
stdinout_buffer[3] = 0; stdinout_buffer[3..][..read].copy_from_slice(&socket_buffer[..read]);
stdinout_buffer[4..][..read].copy_from_slice(&socket_buffer[..read]);
poll.stdout() poll.stdout()
.write_all(&stdinout_buffer[..(read + 4)]) .write_all(&stdinout_buffer[..(read + 3)])
.context("Failed writing to stdout")?; .context("Failed writing to stdout")?;
} }
} }
@ -201,10 +208,10 @@ fn main() -> Result<(), Error> {
// it to the corresponding socket. // it to the corresponding socket.
if poll_res.stdin { if poll_res.stdin {
poll.stdin() poll.stdin()
.read_exact(&mut stdinout_buffer[0..4]) .read_exact(&mut stdinout_buffer[0..3])
.context("Failed reading packet header from stdin")?; .context("Failed reading packet header from stdin")?;
let size = u16::from_ne_bytes([stdinout_buffer[0], stdinout_buffer[1]]); let size = u16::from_be_bytes([stdinout_buffer[0], stdinout_buffer[1]]);
if size as usize > stdinout_buffer.len() { if size as usize > stdinout_buffer.len() {
bail!("Invalid packet size on stdin {}", size); bail!("Invalid packet size on stdin {}", size);
} }
@ -216,10 +223,10 @@ fn main() -> Result<(), Error> {
let buf = &stdinout_buffer[0..size as usize]; let buf = &stdinout_buffer[0..size as usize];
match type_ { match type_ {
0 => poll MSG_TYPE_EVENT => poll
.event_socket() .event_socket()
.send_to(buf, (PTP_MULTICAST_ADDR, PTP_EVENT_PORT)), .send_to(buf, (PTP_MULTICAST_ADDR, PTP_EVENT_PORT)),
1 => poll MSG_TYPE_GENERAL => poll
.general_socket() .general_socket()
.send_to(buf, (PTP_MULTICAST_ADDR, PTP_GENERAL_PORT)), .send_to(buf, (PTP_MULTICAST_ADDR, PTP_GENERAL_PORT)),
_ => unreachable!(), _ => unreachable!(),

View file

@ -1,19 +0,0 @@
#ifndef __GST_PTP_PRIVATE_H__
#define __GST_PTP_PRIVATE_H__
#include <glib.h>
enum
{
TYPE_EVENT,
TYPE_GENERAL,
TYPE_CLOCK_ID
};
typedef struct
{
guint16 size;
guint8 type;
} StdIOHeader;
#endif /* __GST_PTP_PRIVATE_H__ */

View file

@ -55,7 +55,6 @@
#endif #endif
#include "gstptpclock.h" #include "gstptpclock.h"
#include "gstptp_private.h"
#include <gio/gio.h> #include <gio/gio.h>
@ -238,6 +237,16 @@ typedef struct
} message_specific; } message_specific;
} PtpMessage; } PtpMessage;
typedef enum
{
TYPE_EVENT = 0, /* PTP message is payload */
TYPE_GENERAL = 1, /* PTP message is payload */
TYPE_CLOCK_ID = 2, /* 64-bit clock ID is payload */
} StdIOMessageType;
/* 2 byte BE payload size plus 1 byte message type */
#define STDIO_MESSAGE_HEADER_SIZE (sizeof (guint16) + sizeof (guint8))
static GMutex ptp_lock; static GMutex ptp_lock;
static GCond ptp_cond; static GCond ptp_cond;
static gboolean initted = FALSE; static gboolean initted = FALSE;
@ -249,8 +258,8 @@ static gboolean supported = FALSE;
static GSubprocess *ptp_helper_process; static GSubprocess *ptp_helper_process;
static GInputStream *stdout_pipe; static GInputStream *stdout_pipe;
static GOutputStream *stdin_pipe; static GOutputStream *stdin_pipe;
static StdIOHeader stdio_header; /* buffer for reading */ static guint8 stdio_header[STDIO_MESSAGE_HEADER_SIZE]; /* buffer for reading the message header */
static gchar stdout_buffer[8192]; /* buffer for reading */ static guint8 stdout_buffer[8192]; /* buffer for reading the message payload */
static GThread *ptp_helper_thread; static GThread *ptp_helper_thread;
static GMainContext *main_context; static GMainContext *main_context;
static GMainLoop *main_loop; static GMainLoop *main_loop;
@ -258,6 +267,9 @@ static GRand *delay_req_rand;
static GstClock *observation_system_clock; static GstClock *observation_system_clock;
static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 }; static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
#define CUR_STDIO_HEADER_SIZE (GST_READ_UINT16_BE (stdio_header))
#define CUR_STDIO_HEADER_TYPE ((StdIOMessageType) stdio_header[2])
typedef struct typedef struct
{ {
GstClockTime receive_time; GstClockTime receive_time;
@ -947,18 +959,16 @@ handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
static gboolean static gboolean
send_delay_req_timeout (PtpPendingSync * sync) send_delay_req_timeout (PtpPendingSync * sync)
{ {
StdIOHeader header = { 0, }; guint8 message[STDIO_MESSAGE_HEADER_SIZE + 44] = { 0, };
guint8 delay_req[44];
GstByteWriter writer; GstByteWriter writer;
gsize written; gsize written;
GError *err = NULL; GError *err = NULL;
header.type = TYPE_EVENT;
header.size = 44;
GST_TRACE ("Sending delay_req to domain %u", sync->domain); GST_TRACE ("Sending delay_req to domain %u", sync->domain);
gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE); gst_byte_writer_init_with_data (&writer, message, sizeof (message), FALSE);
gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
gst_byte_writer_put_uint8_unchecked (&writer, TYPE_EVENT);
gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ); gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
gst_byte_writer_put_uint8_unchecked (&writer, 2); gst_byte_writer_put_uint8_unchecked (&writer, 2);
gst_byte_writer_put_uint16_be_unchecked (&writer, 44); gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
@ -976,29 +986,11 @@ send_delay_req_timeout (PtpPendingSync * sync)
gst_byte_writer_put_uint64_be_unchecked (&writer, 0); gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
gst_byte_writer_put_uint16_be_unchecked (&writer, 0); gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
if (!g_output_stream_write_all (stdin_pipe, &header, sizeof (StdIOHeader),
&written, NULL, &err)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)
|| g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
GST_ERROR ("Got EOF on stdout");
} else {
GST_ERROR ("Failed to write header to stdin: %s", err->message);
}
g_message ("EOF on stdout");
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
} else if (written != sizeof (StdIOHeader)) {
GST_ERROR ("Unexpected write size: %" G_GSIZE_FORMAT, written);
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
}
sync->delay_req_send_time_local = sync->delay_req_send_time_local =
gst_clock_get_time (observation_system_clock); gst_clock_get_time (observation_system_clock);
if (!g_output_stream_write_all (stdin_pipe, delay_req, 44, &written, NULL, if (!g_output_stream_write_all (stdin_pipe, message, sizeof (message),
&err)) { &written, NULL, &err)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)
|| g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) { || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
GST_ERROR ("Got EOF on stdout"); GST_ERROR ("Got EOF on stdout");
@ -1009,7 +1001,7 @@ send_delay_req_timeout (PtpPendingSync * sync)
g_message ("EOF on stdout"); g_message ("EOF on stdout");
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return G_SOURCE_REMOVE;
} else if (written != 44) { } else if (written != sizeof (message)) {
GST_ERROR ("Unexpected write size: %" G_GSIZE_FORMAT, written); GST_ERROR ("Unexpected write size: %" G_GSIZE_FORMAT, written);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return G_SOURCE_REMOVE;
@ -1817,20 +1809,20 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res,
GST_ERROR ("Got EOF on stdin"); GST_ERROR ("Got EOF on stdin");
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return; return;
} else if (read != stdio_header.size) { } else if (read != CUR_STDIO_HEADER_SIZE) {
GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return; return;
} }
switch (stdio_header.type) { switch (CUR_STDIO_HEADER_TYPE) {
case TYPE_EVENT: case TYPE_EVENT:
case TYPE_GENERAL:{ case TYPE_GENERAL:{
GstClockTime receive_time = gst_clock_get_time (observation_system_clock); GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
PtpMessage msg; PtpMessage msg;
if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer, if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer,
stdio_header.size)) { CUR_STDIO_HEADER_SIZE)) {
dump_ptp_message (&msg); dump_ptp_message (&msg);
handle_ptp_message (&msg, receive_time); handle_ptp_message (&msg, receive_time);
} }
@ -1838,8 +1830,8 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res,
} }
default: default:
case TYPE_CLOCK_ID:{ case TYPE_CLOCK_ID:{
if (stdio_header.size != 8) { if (CUR_STDIO_HEADER_SIZE != 8) {
GST_ERROR ("Unexpected clock id size (%u != 8)", stdio_header.size); GST_ERROR ("Unexpected clock id size (%u != 8)", CUR_STDIO_HEADER_SIZE);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return; return;
} }
@ -1859,9 +1851,9 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res,
} }
/* And read the next header */ /* And read the next header */
memset (&stdio_header, 0, sizeof (StdIOHeader)); memset (&stdio_header, 0, STDIO_MESSAGE_HEADER_SIZE);
g_input_stream_read_all_async (stdout_pipe, &stdio_header, g_input_stream_read_all_async (stdout_pipe, &stdio_header,
sizeof (StdIOHeader), G_PRIORITY_DEFAULT, NULL, STDIO_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stdout_header, NULL); (GAsyncReadyCallback) have_stdout_header, NULL);
} }
@ -1887,19 +1879,20 @@ have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res,
GST_ERROR ("Got EOF on stdin"); GST_ERROR ("Got EOF on stdin");
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return; return;
} else if (read != sizeof (StdIOHeader)) { } else if (read != STDIO_MESSAGE_HEADER_SIZE) {
GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return; return;
} else if (stdio_header.size > 8192) { } else if (CUR_STDIO_HEADER_SIZE > 8192) {
GST_ERROR ("Unexpected size: %u", stdio_header.size); GST_ERROR ("Unexpected size: %u", CUR_STDIO_HEADER_SIZE);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return; return;
} }
/* And now read the body */ /* And now read the body */
g_input_stream_read_all_async (stdout_pipe, stdout_buffer, stdio_header.size, g_input_stream_read_all_async (stdout_pipe, stdout_buffer,
G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback) have_stdout_body, NULL); CUR_STDIO_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stdout_body, NULL);
} }
/* Cleanup all announce messages and announce message senders /* Cleanup all announce messages and announce message senders
@ -2001,9 +1994,9 @@ ptp_helper_main (gpointer data)
g_main_context_push_thread_default (main_context); g_main_context_push_thread_default (main_context);
memset (&stdio_header, 0, sizeof (StdIOHeader)); memset (&stdio_header, 0, STDIO_MESSAGE_HEADER_SIZE);
g_input_stream_read_all_async (stdout_pipe, &stdio_header, g_input_stream_read_all_async (stdout_pipe, &stdio_header,
sizeof (StdIOHeader), G_PRIORITY_DEFAULT, NULL, STDIO_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stdout_header, NULL); (GAsyncReadyCallback) have_stdout_header, NULL);
/* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */ /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */