From 87ca02bee7d3d38feabee1cda7d13509b74621df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 16 May 2023 14:53:24 +0300 Subject: [PATCH] ptp: Add logging between the helper process and the main process via stderr Part-of: --- .../gstreamer/libs/gst/helpers/ptp/args.rs | 4 +- .../gstreamer/libs/gst/helpers/ptp/ffi.rs | 6 + .../gstreamer/libs/gst/helpers/ptp/io.rs | 142 ++++++++++++- .../gstreamer/libs/gst/helpers/ptp/log.rs | 109 ++++++++++ .../gstreamer/libs/gst/helpers/ptp/main.rs | 48 ++++- .../gstreamer/libs/gst/helpers/ptp/net.rs | 65 ++++-- .../gstreamer/libs/gst/net/gstptpclock.c | 187 +++++++++++++++++- 7 files changed, 527 insertions(+), 34 deletions(-) create mode 100644 subprojects/gstreamer/libs/gst/helpers/ptp/log.rs diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/args.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/args.rs index 75868b9989..25849834ef 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/args.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/args.rs @@ -58,9 +58,7 @@ pub fn parse_args() -> Result { clock_id, }; - if verbose { - eprintln!("Running with arguments {:#?}", args); - } + info!("Running with arguments {:#?}", args); Ok(args) } diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs index 785a964cc2..99d8e231d1 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/ffi.rs @@ -31,6 +31,8 @@ pub mod unix { // XXX: Once meson has cargo subproject support all of the below can be replaced with the libc crate. pub const STDIN_FILENO: RawFd = 0; pub const STDOUT_FILENO: RawFd = 1; + #[cfg(not(test))] + pub const STDERR_FILENO: RawFd = 2; pub const O_RDONLY: c_int = 0; pub const POLLIN: c_short = 0x1; @@ -547,8 +549,12 @@ pub mod windows { // corresponding C headers, MSDN and related documentation. // // XXX: Once meson has cargo subproject support all of the below can be replaced with the windows-sys crate. + pub const INVALID_HANDLE_VALUE: HANDLE = (-1 as isize as usize) as HANDLE; + pub const STD_INPUT_HANDLE: i32 = -10; pub const STD_OUTPUT_HANDLE: i32 = -11; + #[cfg(not(test))] + pub const STD_ERROR_HANDLE: i32 = -12; pub const FILE_TYPE_CHAR: u32 = 0x0002; pub const FILE_TYPE_PIPE: u32 = 0x0003; diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/io.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/io.rs index d46572437d..1baefd6d2e 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/io.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/io.rs @@ -308,6 +308,42 @@ mod imp { Ok(()) } } + + /// Raw, unbuffered handle to `stderr`. + /// + /// This implements the `Write` trait for writing and is implemented as a singleton to allow + /// usage from everywhere at any time for logging purposes. + /// + /// This does not implement any locking so usage from multiple threads at once will likely + /// cause interleaved output. + pub struct Stderr(RawFd); + + impl Stderr { + #[cfg(not(test))] + pub fn acquire() -> Self { + Stderr(STDERR_FILENO) + } + } + + impl Write for Stderr { + fn write(&mut self, buf: &[u8]) -> io::Result { + // SAFETY: write() requires a valid fd and a mutable buffer with the given size. + // The fd is valid by construction as is the buffer. + // + // write() will return the number of bytes written or a negative value on errors. + let res = unsafe { write(self.0, buf.as_ptr(), buf.len()) }; + + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res as usize) + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } } #[cfg(windows)] @@ -352,7 +388,7 @@ mod imp { // that has to be closed again later. unsafe { let event = WSACreateEvent(); - if event.is_null() || event as isize == -1 { + if event.is_null() || event == INVALID_HANDLE_VALUE { Err(io::Error::from_raw_os_error(WSAGetLastError())) } else { Ok(EventHandle(event)) @@ -729,7 +765,7 @@ mod imp { let handle = GetStdHandle(STD_INPUT_HANDLE); if handle.is_null() { bail!("No stdin handle set"); - } else if handle as isize == -1 { + } else if handle == INVALID_HANDLE_VALUE { bail!(source: io::Error::last_os_error(), "Can't get stdin handle"); } @@ -932,7 +968,7 @@ mod imp { let handle = GetStdHandle(STD_OUTPUT_HANDLE); if handle.is_null() { bail!("No stdout handle set"); - } else if handle as isize == -1 { + } else if handle == INVALID_HANDLE_VALUE { bail!( source: io::Error::last_os_error(), "Can't get stdout handle" @@ -993,9 +1029,107 @@ mod imp { Ok(()) } } + + /// Raw, unbuffered handle to `stderr`. + /// + /// This implements the `Write` trait for writing and is implemented as a singleton to allow + /// usage from everywhere at any time for logging purposes. + /// + /// This does not implement any locking so usage from multiple threads at once will likely + /// cause interleaved output. + pub struct Stderr(HANDLE); + + impl Stderr { + #[cfg(not(test))] + pub fn acquire() -> Self { + use std::sync::Once; + + struct SyncHandle(HANDLE); + // SAFETY: This is a single-threaded application and even otherwise writing from + // multiple threads at once to a pipe is safe and will only cause interleaved output. + unsafe impl Send for SyncHandle {} + unsafe impl Sync for SyncHandle {} + + static mut STDERR: SyncHandle = SyncHandle(INVALID_HANDLE_VALUE); + static STDERR_ONCE: Once = Once::new(); + + STDERR_ONCE.call_once(|| { + // SAFETY: GetStdHandle returns a borrowed handle, or 0 if none is set or -1 if an + // error has happened. + let handle = unsafe { + let handle = GetStdHandle(STD_ERROR_HANDLE); + if handle.is_null() { + return; + } else if handle == INVALID_HANDLE_VALUE { + return; + } + + handle + }; + + // SAFETY: GetFileType() is safe to call on any valid handle. + let type_ = unsafe { GetFileType(handle) }; + + if type_ == FILE_TYPE_CHAR { + // Set the console to raw mode. + // + // SAFETY: Calling this on non-console handles will cause an error but otherwise + // have no negative effects. We can safely change the console mode here as nothing + // else is accessing the console. + unsafe { + let _ = SetConsoleMode(handle, 0); + } + } else if type_ != FILE_TYPE_PIPE { + return; + } + + // SAFETY: Only accessed in this function and multiple mutable accesses are + // prevented by the `Once`. + unsafe { + STDERR.0 = handle; + } + }); + + // SAFETY: Only accesses immutably here and all mutable accesses are serialized above + // by the `Once`. + Stderr(unsafe { STDERR.0 }) + } + } + + impl Write for Stderr { + fn write(&mut self, buf: &[u8]) -> io::Result { + if self.0 == INVALID_HANDLE_VALUE { + return Ok(buf.len()); + } + + // SAFETY: Writes the given number of bytes to stderr or at most u32::MAX. On error + // zero is returned, otherwise the number of bytes written is set accordingly and + // returned. + unsafe { + let mut lpnumberofbyteswritten = mem::MaybeUninit::uninit(); + let res = WriteFile( + self.0, + buf.as_ptr(), + cmp::min(buf.len() as u32, u32::MAX) as u32, + lpnumberofbyteswritten.as_mut_ptr(), + ptr::null_mut(), + ); + + if res == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(lpnumberofbyteswritten.assume_init() as usize) + } + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } } -pub use self::imp::{Poll, Stdin, Stdout}; +pub use self::imp::{Poll, Stderr, Stdin, Stdout}; #[cfg(test)] mod test { diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/log.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/log.rs new file mode 100644 index 0000000000..fbc33b4401 --- /dev/null +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/log.rs @@ -0,0 +1,109 @@ +// GStreamer +// +// Copyright (C) 2015-2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#[cfg(not(test))] +use crate::io::Stderr; +#[cfg(not(test))] +use std::io::{self, Cursor, Write}; + +#[derive(Copy, Clone, PartialEq, Eq)] +#[repr(u8)] +#[allow(dead_code)] +pub enum LogLevel { + Error = 1, + Warning = 2, + _Fixme = 3, + Info = 4, + Debug = 5, + _Log = 6, + Trace = 7, +} + +/* + * - 1 byte GstDebugLevel + * - 2 byte BE filename length + * - filename UTF-8 string + * - 2 byte BE module path length + * - module path UTF-8 string + * - 4 byte BE line number + * - remainder is UTF-8 string +*/ +#[cfg(test)] +pub fn log( + _level: LogLevel, + _file: &str, + _module_path: &str, + _line: u32, + _args: std::fmt::Arguments, +) { +} + +#[cfg(not(test))] +pub fn log(level: LogLevel, file: &str, module_path: &str, line: u32, args: std::fmt::Arguments) { + let mut stderr = Stderr::acquire(); + let mut buffer = [0u8; 8192]; + let mut cursor = Cursor::new(&mut buffer[..]); + + // Silently ignore errors. What was written to the buffer was written and if there's more data + // than fits it will simply be cut off in the end. + let _ = (|| -> Result<(), io::Error> { + cursor.write_all(&[0u8, 0u8])?; + cursor.write_all(&[level as u8])?; + cursor.write_all(&(file.len() as u16).to_be_bytes())?; + cursor.write_all(file.as_bytes())?; + cursor.write_all(&(module_path.len() as u16).to_be_bytes())?; + cursor.write_all(module_path.as_bytes())?; + cursor.write_all(&line.to_be_bytes())?; + cursor.write_fmt(args)?; + + Ok(()) + })(); + + let pos = cursor.position() as u16; + if pos < 2 { + return; + } + + cursor.set_position(0); + let _ = cursor.write_all(&(pos - 2).to_be_bytes()); + + let _ = stderr.write_all(&buffer[..pos as usize]); +} + +#[allow(unused_macros)] +macro_rules! error { + ($format:expr $(, $arg:expr)* $(,)?) => {{ + $crate::log::log($crate::log::LogLevel::Error, file!(), module_path!(), line!(), format_args!($format, $($arg),*)); + }}; +} +#[allow(unused_macros)] +macro_rules! warn { + ($format:expr $(, $arg:expr)* $(,)?) => {{ + $crate::log::log($crate::log::LogLevel::Warning, file!(), module_path!(), line!(), format_args!($format, $($arg),*)); + }}; +} +#[allow(unused_macros)] +macro_rules! info { + ($format:expr $(, $arg:expr)* $(,)?) => {{ + $crate::log::log($crate::log::LogLevel::Info, file!(), module_path!(), line!(), format_args!($format, $($arg),*)); + }}; +} +#[allow(unused_macros)] +macro_rules! debug { + ($format:expr $(, $arg:expr)* $(,)?) => {{ + $crate::log::log($crate::log::LogLevel::Debug, file!(), module_path!(), line!(), format_args!($format, $($arg),*)); + }}; +} +#[allow(unused_macros)] +macro_rules! trace { + ($format:expr $(, $arg:expr)* $(,)?) => {{ + $crate::log::log($crate::log::LogLevel::Trace, file!(), module_path!(), line!(), format_args!($format, $($arg),*)); + }}; +} diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs index 311f5b9a60..7daa7d4e75 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/main.rs @@ -22,6 +22,9 @@ use std::{ net::{Ipv4Addr, SocketAddr, UdpSocket}, }; +#[macro_use] +mod log; + mod args; mod error; mod ffi; @@ -93,6 +96,8 @@ fn join_multicast( } } + info!("Interface {} filtered out", iface.name); + false }); @@ -104,6 +109,10 @@ fn join_multicast( } } + for iface in &ifaces { + info!("Binding to interface {}", iface.name); + } + for socket in [&event_socket, &general_socket].iter() { for iface in &ifaces { net::join_multicast_v4(socket, &PTP_MULTICAST_ADDR, iface) @@ -126,10 +135,12 @@ fn join_multicast( args.clock_id.to_be_bytes() }; + info!("Using clock ID {:?}", clock_id); + Ok(clock_id) } -fn main() -> Result<(), Error> { +fn run() -> Result<(), Error> { let args = args::parse_args().context("Failed parsing commandline parameters")?; let event_socket = create_socket(PTP_EVENT_PORT).context("Failed creating event socket")?; @@ -176,8 +187,8 @@ fn main() -> Result<(), Error> { { let idx = idx as u8; let res = match idx { - MSG_TYPE_EVENT => poll.event_socket().recv(&mut socket_buffer), - MSG_TYPE_GENERAL => poll.general_socket().recv(&mut socket_buffer), + MSG_TYPE_EVENT => poll.event_socket().recv_from(&mut socket_buffer), + MSG_TYPE_GENERAL => poll.general_socket().recv_from(&mut socket_buffer), _ => unreachable!(), }; @@ -192,7 +203,15 @@ fn main() -> Result<(), Error> { if idx == 0 { "event" } else { "general" } ); } - Ok(read) => { + Ok((read, addr)) => { + if args.verbose { + trace!( + "Received {} bytes from {} socket from {}", + read, + if idx == 0 { "event" } else { "general" }, + addr + ); + } stdinout_buffer[0..2].copy_from_slice(&(read as u16).to_be_bytes()); stdinout_buffer[2] = idx; stdinout_buffer[3..][..read].copy_from_slice(&socket_buffer[..read]); @@ -217,6 +236,14 @@ fn main() -> Result<(), Error> { } let type_ = stdinout_buffer[2]; + if args.verbose { + trace!( + "Received {} bytes for {} socket from stdin", + size, + if type_ == 0 { "event" } else { "general" }, + ); + } + poll.stdin() .read_exact(&mut stdinout_buffer[0..size as usize]) .context("Failed reading packet body from stdin")?; @@ -240,3 +267,16 @@ fn main() -> Result<(), Error> { } } } + +/// Custom panic hook so we can print them to stderr in a format the main process understands +fn panic_hook(info: &std::panic::PanicInfo) { + error!("Panicked. {}", info); +} + +fn main() { + std::panic::set_hook(Box::new(panic_hook)); + + if let Err(err) = run() { + error!("Exited with error: {:?}", err); + } +} diff --git a/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs b/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs index fc9b263072..b6d4db6fb1 100644 --- a/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs +++ b/subprojects/gstreamer/libs/gst/helpers/ptp/net.rs @@ -116,16 +116,23 @@ mod imp { // Skip loopback interfaces, interfaces that are not up and interfaces that can't do // multicast. These are all unusable for PTP purposes. let flags = ifaddr.ifa_flags; - if (flags & IFF_LOOPBACK as u32 != 0) - || (flags & IFF_UP as u32 == 0) - || (flags & IFF_MULTICAST as u32 == 0) - { + if flags & IFF_LOOPBACK as u32 != 0 { + debug!("Interface {} is loopback interface", name); + continue; + } + if flags & IFF_UP as u32 == 0 { + debug!("Interface {} is not up", name); + continue; + } + if flags & IFF_MULTICAST as u32 == 0 { + debug!("Interface {} does not support multicast", name); continue; } // If the interface has no address then skip it. Only interfaces with IPv4 addresses // are usable for PTP purposes. if ifaddr.ifa_addr.is_null() { + debug!("Interface {} has no IPv4 address", name); continue; } @@ -147,6 +154,7 @@ mod imp { break index; }; if index == 0 { + debug!("Interface {} has no valid interface index", name); continue; } @@ -164,6 +172,8 @@ mod imp { let addr = unsafe { &*(ifaddr.ifa_addr as *const sockaddr_in) }; let ip_addr = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + debug!("Interface {} has IPv4 address {}", name, ip_addr); + if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) { if if_info.ip_addr.is_broadcast() { if_info.ip_addr = ip_addr; @@ -188,6 +198,9 @@ mod imp { if addr.sll_halen == 6 { let mut hw_addr = [0u8; 6]; hw_addr.copy_from_slice(&addr.sll_addr[0..6]); + + debug!("Interface {} has MAC address {:?}", name, hw_addr); + if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) { if if_info.hw_addr.is_none() { if_info.hw_addr = Some(hw_addr); @@ -224,6 +237,8 @@ mod imp { hw_addr.copy_from_slice(sdl_addr); } + debug!("Interface {} has MAC address {:?}", name, hw_addr); + if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) { if if_info.hw_addr.is_none() { if_info.hw_addr = Some(hw_addr); @@ -315,13 +330,16 @@ mod imp { // lost other than the ability to run multiple processes at once. unsafe { let v = 1i32; - let _ = setsockopt( + let res = setsockopt( socket.as_raw_fd(), SOL_SOCKET, SO_REUSEADDR, &v as *const _ as *const _, mem::size_of_val(&v) as u32, ); + if res < 0 { + warn!("Failed to set SO_REUSEADDR on socket"); + } } #[cfg(not(any(target_os = "solaris", target_os = "illumos")))] @@ -333,13 +351,17 @@ mod imp { // lost other than the ability to run multiple processes at once. unsafe { let v = 1i32; - let _ = setsockopt( + let res = setsockopt( socket.as_raw_fd(), SOL_SOCKET, SO_REUSEPORT, &v as *const _ as *const _, mem::size_of_val(&v) as u32, ); + + if res < 0 { + warn!("Failed to set SO_REUSEPORT on socket"); + } } } } @@ -515,17 +537,26 @@ mod imp { // Skip adapters that are receive-only, can't do multicast or don't have IPv4 support // as they're not usable in a PTP context. - if address.flags & ADAPTER_FLAG_RECEIVE_ONLY != 0 - || address.flags & ADAPTER_FLAG_NO_MULTICAST != 0 - || address.flags & ADAPTER_FLAG_IPV4_ENABLED == 0 - { + if address.flags & ADAPTER_FLAG_RECEIVE_ONLY != 0 { + debug!("Interface {} is receive-only interface", adaptername); + continue; + } + if address.flags & ADAPTER_FLAG_NO_MULTICAST != 0 { + debug!("Interface {} does not support multicast", adaptername); + continue; + } + if address.flags & ADAPTER_FLAG_IPV4_ENABLED == 0 { + debug!("Interface {} has no IPv4 address", adaptername); continue; } // Skip adapters that are loopback or not up. - if address.iftype == IF_TYPE_SOFTWARE_LOOPBACK - || address.operstatus != IF_OPER_STATUS_UP - { + if address.iftype == IF_TYPE_SOFTWARE_LOOPBACK { + debug!("Interface {} is loopback interface", adaptername); + continue; + } + if address.operstatus != IF_OPER_STATUS_UP { + debug!("Interface {} is not up", adaptername); continue; } @@ -534,6 +565,7 @@ mod imp { // Skip adapters that have no valid interface index as they can't be used to join the // PTP multicast group reliably for this interface only. if index == 0 { + debug!("Interface {} has no valid interface index", adaptername); continue; } @@ -595,6 +627,8 @@ mod imp { ip_addr, hw_addr, }); + } else { + debug!("Interface {} has no IPv4 address", adaptername); } } @@ -653,13 +687,16 @@ mod imp { // lost other than the ability to run multiple processes at once. unsafe { let v = 1i32; - let _ = setsockopt( + let res = setsockopt( socket.as_raw_socket(), SOL_SOCKET as i32, SO_REUSEADDR as i32, &v as *const _ as *const _, mem::size_of_val(&v) as _, ); + if res < 0 { + warn!("Failed to set SO_REUSEADDR on socket"); + } } } } diff --git a/subprojects/gstreamer/libs/gst/net/gstptpclock.c b/subprojects/gstreamer/libs/gst/net/gstptpclock.c index b620731143..00a01fa465 100644 --- a/subprojects/gstreamer/libs/gst/net/gstptpclock.c +++ b/subprojects/gstreamer/libs/gst/net/gstptpclock.c @@ -245,7 +245,18 @@ typedef enum } StdIOMessageType; /* 2 byte BE payload size plus 1 byte message type */ -#define STDIO_MESSAGE_HEADER_SIZE (sizeof (guint16) + sizeof (guint8)) +#define STDIO_MESSAGE_HEADER_SIZE (3) + +/* 2 byte BE payload size. Payload format: + * - 1 byte GstDebugLevel + * - 2 byte BE filename length + * - filename UTF-8 string + * - 2 byte BE module path length + * - module path UTF-8 string + * - 4 byte BE line number + * - remainder is UTF-8 string + */ +#define STDERR_MESSAGE_HEADER_SIZE (2) static GMutex ptp_lock; static GCond ptp_cond; @@ -257,9 +268,12 @@ static gboolean supported = FALSE; #endif static GSubprocess *ptp_helper_process; static GInputStream *stdout_pipe; +static GInputStream *stderr_pipe; static GOutputStream *stdin_pipe; -static guint8 stdio_header[STDIO_MESSAGE_HEADER_SIZE]; /* buffer for reading the message header */ -static guint8 stdout_buffer[8192]; /* buffer for reading the message payload */ +static guint8 stdio_header[STDIO_MESSAGE_HEADER_SIZE]; /* buffer for reading the message header */ +static guint8 stdout_buffer[8192]; /* buffer for reading the message payload */ +static guint8 stderr_header[STDERR_MESSAGE_HEADER_SIZE]; /* buffer for reading the message header */ +static guint8 stderr_buffer[8192]; /* buffer for reading the message payload */ static GThread *ptp_helper_thread; static GMainContext *main_context; static GMainLoop *main_loop; @@ -270,6 +284,8 @@ static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 }; #define CUR_STDIO_HEADER_SIZE (GST_READ_UINT16_BE (stdio_header)) #define CUR_STDIO_HEADER_TYPE ((StdIOMessageType) stdio_header[2]) +#define CUR_STDERR_HEADER_SIZE (GST_READ_UINT16_BE (stderr_header)) + typedef struct { GstClockTime receive_time; @@ -1853,7 +1869,7 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res, /* And read the next header */ memset (&stdio_header, 0, STDIO_MESSAGE_HEADER_SIZE); - g_input_stream_read_all_async (stdout_pipe, &stdio_header, + g_input_stream_read_all_async (stdout_pipe, stdio_header, STDIO_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback) have_stdout_header, NULL); } @@ -1878,7 +1894,6 @@ have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res, return; } else if (read == 0) { GST_ERROR ("Got EOF on stdin"); - g_main_loop_quit (main_loop); return; } else if (read != STDIO_MESSAGE_HEADER_SIZE) { GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); @@ -1896,6 +1911,143 @@ have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res, (GAsyncReadyCallback) have_stdout_body, NULL); } +static void have_stderr_header (GInputStream * stderr_pipe, GAsyncResult * res, + gpointer user_data); + +static void +have_stderr_body (GInputStream * stderr_pipe, GAsyncResult * res, + gpointer user_data) +{ + GError *err = NULL; + gsize read; +#ifndef GST_DISABLE_GST_DEBUG + GstByteReader breader; + GstDebugLevel level; + guint16 filename_length; + gchar *filename = NULL; + guint16 module_path_length; + gchar *module_path = NULL; + guint32 line_number; + gchar *message = NULL; + guint16 message_length; + guint8 b; +#endif + + /* Finish reading the body */ + if (!g_input_stream_read_all_finish (stderr_pipe, res, &read, &err)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) || + g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) { + GST_ERROR ("Got EOF on stderr"); + } else { + GST_ERROR ("Failed to read header from stderr: %s", err->message); + } + g_clear_error (&err); + g_main_loop_quit (main_loop); + return; + } else if (read == 0) { + GST_ERROR ("Got EOF on stderr"); + g_main_loop_quit (main_loop); + return; + } else if (read != CUR_STDERR_HEADER_SIZE) { + GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); + g_main_loop_quit (main_loop); + return; + } + +#ifndef GST_DISABLE_GST_DEBUG + gst_byte_reader_init (&breader, stderr_buffer, CUR_STDERR_HEADER_SIZE); + + if (!gst_byte_reader_get_uint8 (&breader, &b) || b > GST_LEVEL_MAX) + goto err; + level = (GstDebugLevel) b; + if (!gst_byte_reader_get_uint16_be (&breader, &filename_length) + || filename_length > gst_byte_reader_get_remaining (&breader)) + goto err; + filename = + g_strndup ((const gchar *) gst_byte_reader_get_data_unchecked (&breader, + filename_length), filename_length); + + if (!gst_byte_reader_get_uint16_be (&breader, &module_path_length) + || module_path_length > gst_byte_reader_get_remaining (&breader)) + goto err; + module_path = + g_strndup ((const gchar *) gst_byte_reader_get_data_unchecked (&breader, + module_path_length), module_path_length); + + if (!gst_byte_reader_get_uint32_be (&breader, &line_number)) + goto err; + + message_length = gst_byte_reader_get_remaining (&breader); + message = + g_strndup ((const gchar *) gst_byte_reader_get_data_unchecked (&breader, + message_length), message_length); + + gst_debug_log_literal (GST_CAT_DEFAULT, level, filename, module_path, + line_number, NULL, message); + + g_clear_pointer (&filename, g_free); + g_clear_pointer (&module_path, g_free); + g_clear_pointer (&message, g_free); +#endif + + /* And read the next header */ + memset (&stderr_header, 0, STDERR_MESSAGE_HEADER_SIZE); + g_input_stream_read_all_async (stderr_pipe, stderr_header, + STDERR_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL, + (GAsyncReadyCallback) have_stderr_header, NULL); + return; + +#ifndef GST_DISABLE_GST_DEBUG +err: + { + GST_ERROR ("Unexpected stderr data"); + g_clear_pointer (&filename, g_free); + g_clear_pointer (&module_path, g_free); + g_clear_pointer (&message, g_free); + g_main_loop_quit (main_loop); + return; + } +#endif +} + +static void +have_stderr_header (GInputStream * stderr_pipe, GAsyncResult * res, + gpointer user_data) +{ + GError *err = NULL; + gsize read; + + /* Finish reading the header */ + if (!g_input_stream_read_all_finish (stderr_pipe, res, &read, &err)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) || + g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) { + GST_ERROR ("Got EOF on stderr"); + } else { + GST_ERROR ("Failed to read header from stderr: %s", err->message); + } + g_clear_error (&err); + g_main_loop_quit (main_loop); + return; + } else if (read == 0) { + GST_ERROR ("Got EOF on stderr"); + g_main_loop_quit (main_loop); + return; + } else if (read != STDERR_MESSAGE_HEADER_SIZE) { + GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); + g_main_loop_quit (main_loop); + return; + } else if (CUR_STDERR_HEADER_SIZE > 8192 || CUR_STDERR_HEADER_SIZE < 9) { + GST_ERROR ("Unexpected size: %u", CUR_STDERR_HEADER_SIZE); + g_main_loop_quit (main_loop); + return; + } + + /* And now read the body */ + g_input_stream_read_all_async (stderr_pipe, stderr_buffer, + CUR_STDERR_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL, + (GAsyncReadyCallback) have_stderr_body, NULL); +} + /* Cleanup all announce messages and announce message senders * that are timed out by now, and clean up all pending syncs * that are missing their FOLLOW_UP or DELAY_RESP */ @@ -1996,10 +2148,15 @@ ptp_helper_main (gpointer data) g_main_context_push_thread_default (main_context); memset (&stdio_header, 0, STDIO_MESSAGE_HEADER_SIZE); - g_input_stream_read_all_async (stdout_pipe, &stdio_header, + g_input_stream_read_all_async (stdout_pipe, stdio_header, STDIO_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback) have_stdout_header, NULL); + memset (&stderr_header, 0, STDERR_MESSAGE_HEADER_SIZE); + g_input_stream_read_all_async (stderr_pipe, stderr_header, + STDERR_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL, + (GAsyncReadyCallback) have_stderr_header, NULL); + /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */ cleanup_source = g_timeout_source_new_seconds (5); g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT); @@ -2259,7 +2416,7 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces) if (interfaces != NULL) argc += 2 * g_strv_length (interfaces); - argv = g_new0 (gchar *, argc + 2); + argv = g_new0 (gchar *, argc + 3); argc_c = 0; /* Find the gst-ptp-helper */ @@ -2338,9 +2495,16 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces) } } + /* Check if the helper process should be verbose */ + env = g_getenv ("GST_PTP_HELPER_VERBOSE"); + if (env && g_ascii_strcasecmp (env, "no") != 0) { + argv[argc_c++] = g_strdup ("-v"); + } + ptp_helper_process = g_subprocess_newv ((const gchar * const *) argv, - G_SUBPROCESS_FLAGS_STDIN_PIPE | G_SUBPROCESS_FLAGS_STDOUT_PIPE, &err); + G_SUBPROCESS_FLAGS_STDIN_PIPE | G_SUBPROCESS_FLAGS_STDOUT_PIPE | + G_SUBPROCESS_FLAGS_STDERR_PIPE, &err); if (!ptp_helper_process) { GST_ERROR ("Failed to start ptp helper process: %s", err->message); g_clear_error (&err); @@ -2355,7 +2519,10 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces) stdout_pipe = g_subprocess_get_stdout_pipe (ptp_helper_process); if (stdout_pipe) g_object_ref (stdout_pipe); - if (!stdin_pipe || !stdout_pipe) { + stderr_pipe = g_subprocess_get_stderr_pipe (ptp_helper_process); + if (stderr_pipe) + g_object_ref (stderr_pipe); + if (!stdin_pipe || !stdout_pipe || !stderr_pipe) { GST_ERROR ("Failed to get ptp helper process pipes"); ret = FALSE; supported = FALSE; @@ -2404,6 +2571,7 @@ done: if (ptp_helper_process) { g_clear_object (&stdin_pipe); g_clear_object (&stdout_pipe); + g_clear_object (&stderr_pipe); g_subprocess_force_exit (ptp_helper_process); g_clear_object (&ptp_helper_process); } @@ -2453,6 +2621,7 @@ gst_ptp_deinit (void) if (ptp_helper_process) { g_clear_object (&stdin_pipe); g_clear_object (&stdout_pipe); + g_clear_object (&stderr_pipe); g_subprocess_force_exit (ptp_helper_process); g_clear_object (&ptp_helper_process); }