ptp: Add logging between the helper process and the main process via stderr

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4647>
This commit is contained in:
Sebastian Dröge 2023-05-16 14:53:24 +03:00 committed by GStreamer Marge Bot
parent ebc050d133
commit 87ca02bee7
7 changed files with 527 additions and 34 deletions

View file

@ -58,9 +58,7 @@ pub fn parse_args() -> Result<Args, Error> {
clock_id, clock_id,
}; };
if verbose { info!("Running with arguments {:#?}", args);
eprintln!("Running with arguments {:#?}", args);
}
Ok(args) Ok(args)
} }

View file

@ -31,6 +31,8 @@ pub mod unix {
// XXX: Once meson has cargo subproject support all of the below can be replaced with the libc crate. // XXX: Once meson has cargo subproject support all of the below can be replaced with the libc crate.
pub const STDIN_FILENO: RawFd = 0; pub const STDIN_FILENO: RawFd = 0;
pub const STDOUT_FILENO: RawFd = 1; pub const STDOUT_FILENO: RawFd = 1;
#[cfg(not(test))]
pub const STDERR_FILENO: RawFd = 2;
pub const O_RDONLY: c_int = 0; pub const O_RDONLY: c_int = 0;
pub const POLLIN: c_short = 0x1; pub const POLLIN: c_short = 0x1;
@ -547,8 +549,12 @@ pub mod windows {
// corresponding C headers, MSDN and related documentation. // corresponding C headers, MSDN and related documentation.
// //
// XXX: Once meson has cargo subproject support all of the below can be replaced with the windows-sys crate. // XXX: Once meson has cargo subproject support all of the below can be replaced with the windows-sys crate.
pub const INVALID_HANDLE_VALUE: HANDLE = (-1 as isize as usize) as HANDLE;
pub const STD_INPUT_HANDLE: i32 = -10; pub const STD_INPUT_HANDLE: i32 = -10;
pub const STD_OUTPUT_HANDLE: i32 = -11; pub const STD_OUTPUT_HANDLE: i32 = -11;
#[cfg(not(test))]
pub const STD_ERROR_HANDLE: i32 = -12;
pub const FILE_TYPE_CHAR: u32 = 0x0002; pub const FILE_TYPE_CHAR: u32 = 0x0002;
pub const FILE_TYPE_PIPE: u32 = 0x0003; pub const FILE_TYPE_PIPE: u32 = 0x0003;

View file

@ -308,6 +308,42 @@ mod imp {
Ok(()) Ok(())
} }
} }
/// Raw, unbuffered handle to `stderr`.
///
/// This implements the `Write` trait for writing and is implemented as a singleton to allow
/// usage from everywhere at any time for logging purposes.
///
/// This does not implement any locking so usage from multiple threads at once will likely
/// cause interleaved output.
pub struct Stderr(RawFd);
impl Stderr {
#[cfg(not(test))]
pub fn acquire() -> Self {
Stderr(STDERR_FILENO)
}
}
impl Write for Stderr {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// SAFETY: write() requires a valid fd and a mutable buffer with the given size.
// The fd is valid by construction as is the buffer.
//
// write() will return the number of bytes written or a negative value on errors.
let res = unsafe { write(self.0, buf.as_ptr(), buf.len()) };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res as usize)
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
} }
#[cfg(windows)] #[cfg(windows)]
@ -352,7 +388,7 @@ mod imp {
// that has to be closed again later. // that has to be closed again later.
unsafe { unsafe {
let event = WSACreateEvent(); let event = WSACreateEvent();
if event.is_null() || event as isize == -1 { if event.is_null() || event == INVALID_HANDLE_VALUE {
Err(io::Error::from_raw_os_error(WSAGetLastError())) Err(io::Error::from_raw_os_error(WSAGetLastError()))
} else { } else {
Ok(EventHandle(event)) Ok(EventHandle(event))
@ -729,7 +765,7 @@ mod imp {
let handle = GetStdHandle(STD_INPUT_HANDLE); let handle = GetStdHandle(STD_INPUT_HANDLE);
if handle.is_null() { if handle.is_null() {
bail!("No stdin handle set"); bail!("No stdin handle set");
} else if handle as isize == -1 { } else if handle == INVALID_HANDLE_VALUE {
bail!(source: io::Error::last_os_error(), "Can't get stdin handle"); bail!(source: io::Error::last_os_error(), "Can't get stdin handle");
} }
@ -932,7 +968,7 @@ mod imp {
let handle = GetStdHandle(STD_OUTPUT_HANDLE); let handle = GetStdHandle(STD_OUTPUT_HANDLE);
if handle.is_null() { if handle.is_null() {
bail!("No stdout handle set"); bail!("No stdout handle set");
} else if handle as isize == -1 { } else if handle == INVALID_HANDLE_VALUE {
bail!( bail!(
source: io::Error::last_os_error(), source: io::Error::last_os_error(),
"Can't get stdout handle" "Can't get stdout handle"
@ -993,9 +1029,107 @@ mod imp {
Ok(()) Ok(())
} }
} }
/// Raw, unbuffered handle to `stderr`.
///
/// This implements the `Write` trait for writing and is implemented as a singleton to allow
/// usage from everywhere at any time for logging purposes.
///
/// This does not implement any locking so usage from multiple threads at once will likely
/// cause interleaved output.
pub struct Stderr(HANDLE);
impl Stderr {
#[cfg(not(test))]
pub fn acquire() -> Self {
use std::sync::Once;
struct SyncHandle(HANDLE);
// SAFETY: This is a single-threaded application and even otherwise writing from
// multiple threads at once to a pipe is safe and will only cause interleaved output.
unsafe impl Send for SyncHandle {}
unsafe impl Sync for SyncHandle {}
static mut STDERR: SyncHandle = SyncHandle(INVALID_HANDLE_VALUE);
static STDERR_ONCE: Once = Once::new();
STDERR_ONCE.call_once(|| {
// SAFETY: GetStdHandle returns a borrowed handle, or 0 if none is set or -1 if an
// error has happened.
let handle = unsafe {
let handle = GetStdHandle(STD_ERROR_HANDLE);
if handle.is_null() {
return;
} else if handle == INVALID_HANDLE_VALUE {
return;
}
handle
};
// SAFETY: GetFileType() is safe to call on any valid handle.
let type_ = unsafe { GetFileType(handle) };
if type_ == FILE_TYPE_CHAR {
// Set the console to raw mode.
//
// SAFETY: Calling this on non-console handles will cause an error but otherwise
// have no negative effects. We can safely change the console mode here as nothing
// else is accessing the console.
unsafe {
let _ = SetConsoleMode(handle, 0);
}
} else if type_ != FILE_TYPE_PIPE {
return;
}
// SAFETY: Only accessed in this function and multiple mutable accesses are
// prevented by the `Once`.
unsafe {
STDERR.0 = handle;
}
});
// SAFETY: Only accesses immutably here and all mutable accesses are serialized above
// by the `Once`.
Stderr(unsafe { STDERR.0 })
}
}
impl Write for Stderr {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.0 == INVALID_HANDLE_VALUE {
return Ok(buf.len());
}
// SAFETY: Writes the given number of bytes to stderr or at most u32::MAX. On error
// zero is returned, otherwise the number of bytes written is set accordingly and
// returned.
unsafe {
let mut lpnumberofbyteswritten = mem::MaybeUninit::uninit();
let res = WriteFile(
self.0,
buf.as_ptr(),
cmp::min(buf.len() as u32, u32::MAX) as u32,
lpnumberofbyteswritten.as_mut_ptr(),
ptr::null_mut(),
);
if res == 0 {
Err(io::Error::last_os_error())
} else {
Ok(lpnumberofbyteswritten.assume_init() as usize)
}
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
} }
pub use self::imp::{Poll, Stdin, Stdout}; pub use self::imp::{Poll, Stderr, Stdin, Stdout};
#[cfg(test)] #[cfg(test)]
mod test { mod test {

View file

@ -0,0 +1,109 @@
// GStreamer
//
// Copyright (C) 2015-2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
#[cfg(not(test))]
use crate::io::Stderr;
#[cfg(not(test))]
use std::io::{self, Cursor, Write};
#[derive(Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
#[allow(dead_code)]
pub enum LogLevel {
Error = 1,
Warning = 2,
_Fixme = 3,
Info = 4,
Debug = 5,
_Log = 6,
Trace = 7,
}
/*
* - 1 byte GstDebugLevel
* - 2 byte BE filename length
* - filename UTF-8 string
* - 2 byte BE module path length
* - module path UTF-8 string
* - 4 byte BE line number
* - remainder is UTF-8 string
*/
#[cfg(test)]
pub fn log(
_level: LogLevel,
_file: &str,
_module_path: &str,
_line: u32,
_args: std::fmt::Arguments,
) {
}
#[cfg(not(test))]
pub fn log(level: LogLevel, file: &str, module_path: &str, line: u32, args: std::fmt::Arguments) {
let mut stderr = Stderr::acquire();
let mut buffer = [0u8; 8192];
let mut cursor = Cursor::new(&mut buffer[..]);
// Silently ignore errors. What was written to the buffer was written and if there's more data
// than fits it will simply be cut off in the end.
let _ = (|| -> Result<(), io::Error> {
cursor.write_all(&[0u8, 0u8])?;
cursor.write_all(&[level as u8])?;
cursor.write_all(&(file.len() as u16).to_be_bytes())?;
cursor.write_all(file.as_bytes())?;
cursor.write_all(&(module_path.len() as u16).to_be_bytes())?;
cursor.write_all(module_path.as_bytes())?;
cursor.write_all(&line.to_be_bytes())?;
cursor.write_fmt(args)?;
Ok(())
})();
let pos = cursor.position() as u16;
if pos < 2 {
return;
}
cursor.set_position(0);
let _ = cursor.write_all(&(pos - 2).to_be_bytes());
let _ = stderr.write_all(&buffer[..pos as usize]);
}
#[allow(unused_macros)]
macro_rules! error {
($format:expr $(, $arg:expr)* $(,)?) => {{
$crate::log::log($crate::log::LogLevel::Error, file!(), module_path!(), line!(), format_args!($format, $($arg),*));
}};
}
#[allow(unused_macros)]
macro_rules! warn {
($format:expr $(, $arg:expr)* $(,)?) => {{
$crate::log::log($crate::log::LogLevel::Warning, file!(), module_path!(), line!(), format_args!($format, $($arg),*));
}};
}
#[allow(unused_macros)]
macro_rules! info {
($format:expr $(, $arg:expr)* $(,)?) => {{
$crate::log::log($crate::log::LogLevel::Info, file!(), module_path!(), line!(), format_args!($format, $($arg),*));
}};
}
#[allow(unused_macros)]
macro_rules! debug {
($format:expr $(, $arg:expr)* $(,)?) => {{
$crate::log::log($crate::log::LogLevel::Debug, file!(), module_path!(), line!(), format_args!($format, $($arg),*));
}};
}
#[allow(unused_macros)]
macro_rules! trace {
($format:expr $(, $arg:expr)* $(,)?) => {{
$crate::log::log($crate::log::LogLevel::Trace, file!(), module_path!(), line!(), format_args!($format, $($arg),*));
}};
}

View file

@ -22,6 +22,9 @@ use std::{
net::{Ipv4Addr, SocketAddr, UdpSocket}, net::{Ipv4Addr, SocketAddr, UdpSocket},
}; };
#[macro_use]
mod log;
mod args; mod args;
mod error; mod error;
mod ffi; mod ffi;
@ -93,6 +96,8 @@ fn join_multicast(
} }
} }
info!("Interface {} filtered out", iface.name);
false false
}); });
@ -104,6 +109,10 @@ fn join_multicast(
} }
} }
for iface in &ifaces {
info!("Binding to interface {}", iface.name);
}
for socket in [&event_socket, &general_socket].iter() { for socket in [&event_socket, &general_socket].iter() {
for iface in &ifaces { for iface in &ifaces {
net::join_multicast_v4(socket, &PTP_MULTICAST_ADDR, iface) net::join_multicast_v4(socket, &PTP_MULTICAST_ADDR, iface)
@ -126,10 +135,12 @@ fn join_multicast(
args.clock_id.to_be_bytes() args.clock_id.to_be_bytes()
}; };
info!("Using clock ID {:?}", clock_id);
Ok(clock_id) Ok(clock_id)
} }
fn main() -> Result<(), Error> { fn run() -> Result<(), Error> {
let args = args::parse_args().context("Failed parsing commandline parameters")?; let args = args::parse_args().context("Failed parsing commandline parameters")?;
let event_socket = create_socket(PTP_EVENT_PORT).context("Failed creating event socket")?; let event_socket = create_socket(PTP_EVENT_PORT).context("Failed creating event socket")?;
@ -176,8 +187,8 @@ fn main() -> Result<(), Error> {
{ {
let idx = idx as u8; let idx = idx as u8;
let res = match idx { let res = match idx {
MSG_TYPE_EVENT => poll.event_socket().recv(&mut socket_buffer), MSG_TYPE_EVENT => poll.event_socket().recv_from(&mut socket_buffer),
MSG_TYPE_GENERAL => poll.general_socket().recv(&mut socket_buffer), MSG_TYPE_GENERAL => poll.general_socket().recv_from(&mut socket_buffer),
_ => unreachable!(), _ => unreachable!(),
}; };
@ -192,7 +203,15 @@ fn main() -> Result<(), Error> {
if idx == 0 { "event" } else { "general" } if idx == 0 { "event" } else { "general" }
); );
} }
Ok(read) => { Ok((read, addr)) => {
if args.verbose {
trace!(
"Received {} bytes from {} socket from {}",
read,
if idx == 0 { "event" } else { "general" },
addr
);
}
stdinout_buffer[0..2].copy_from_slice(&(read as u16).to_be_bytes()); stdinout_buffer[0..2].copy_from_slice(&(read as u16).to_be_bytes());
stdinout_buffer[2] = idx; stdinout_buffer[2] = idx;
stdinout_buffer[3..][..read].copy_from_slice(&socket_buffer[..read]); stdinout_buffer[3..][..read].copy_from_slice(&socket_buffer[..read]);
@ -217,6 +236,14 @@ fn main() -> Result<(), Error> {
} }
let type_ = stdinout_buffer[2]; let type_ = stdinout_buffer[2];
if args.verbose {
trace!(
"Received {} bytes for {} socket from stdin",
size,
if type_ == 0 { "event" } else { "general" },
);
}
poll.stdin() poll.stdin()
.read_exact(&mut stdinout_buffer[0..size as usize]) .read_exact(&mut stdinout_buffer[0..size as usize])
.context("Failed reading packet body from stdin")?; .context("Failed reading packet body from stdin")?;
@ -240,3 +267,16 @@ fn main() -> Result<(), Error> {
} }
} }
} }
/// Custom panic hook so we can print them to stderr in a format the main process understands
fn panic_hook(info: &std::panic::PanicInfo) {
error!("Panicked. {}", info);
}
fn main() {
std::panic::set_hook(Box::new(panic_hook));
if let Err(err) = run() {
error!("Exited with error: {:?}", err);
}
}

View file

@ -116,16 +116,23 @@ mod imp {
// Skip loopback interfaces, interfaces that are not up and interfaces that can't do // Skip loopback interfaces, interfaces that are not up and interfaces that can't do
// multicast. These are all unusable for PTP purposes. // multicast. These are all unusable for PTP purposes.
let flags = ifaddr.ifa_flags; let flags = ifaddr.ifa_flags;
if (flags & IFF_LOOPBACK as u32 != 0) if flags & IFF_LOOPBACK as u32 != 0 {
|| (flags & IFF_UP as u32 == 0) debug!("Interface {} is loopback interface", name);
|| (flags & IFF_MULTICAST as u32 == 0) continue;
{ }
if flags & IFF_UP as u32 == 0 {
debug!("Interface {} is not up", name);
continue;
}
if flags & IFF_MULTICAST as u32 == 0 {
debug!("Interface {} does not support multicast", name);
continue; continue;
} }
// If the interface has no address then skip it. Only interfaces with IPv4 addresses // If the interface has no address then skip it. Only interfaces with IPv4 addresses
// are usable for PTP purposes. // are usable for PTP purposes.
if ifaddr.ifa_addr.is_null() { if ifaddr.ifa_addr.is_null() {
debug!("Interface {} has no IPv4 address", name);
continue; continue;
} }
@ -147,6 +154,7 @@ mod imp {
break index; break index;
}; };
if index == 0 { if index == 0 {
debug!("Interface {} has no valid interface index", name);
continue; continue;
} }
@ -164,6 +172,8 @@ mod imp {
let addr = unsafe { &*(ifaddr.ifa_addr as *const sockaddr_in) }; let addr = unsafe { &*(ifaddr.ifa_addr as *const sockaddr_in) };
let ip_addr = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); let ip_addr = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes());
debug!("Interface {} has IPv4 address {}", name, ip_addr);
if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) { if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) {
if if_info.ip_addr.is_broadcast() { if if_info.ip_addr.is_broadcast() {
if_info.ip_addr = ip_addr; if_info.ip_addr = ip_addr;
@ -188,6 +198,9 @@ mod imp {
if addr.sll_halen == 6 { if addr.sll_halen == 6 {
let mut hw_addr = [0u8; 6]; let mut hw_addr = [0u8; 6];
hw_addr.copy_from_slice(&addr.sll_addr[0..6]); hw_addr.copy_from_slice(&addr.sll_addr[0..6]);
debug!("Interface {} has MAC address {:?}", name, hw_addr);
if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) { if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) {
if if_info.hw_addr.is_none() { if if_info.hw_addr.is_none() {
if_info.hw_addr = Some(hw_addr); if_info.hw_addr = Some(hw_addr);
@ -224,6 +237,8 @@ mod imp {
hw_addr.copy_from_slice(sdl_addr); hw_addr.copy_from_slice(sdl_addr);
} }
debug!("Interface {} has MAC address {:?}", name, hw_addr);
if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) { if let Some(if_info) = if_infos.iter_mut().find(|info| info.name == name) {
if if_info.hw_addr.is_none() { if if_info.hw_addr.is_none() {
if_info.hw_addr = Some(hw_addr); if_info.hw_addr = Some(hw_addr);
@ -315,13 +330,16 @@ mod imp {
// lost other than the ability to run multiple processes at once. // lost other than the ability to run multiple processes at once.
unsafe { unsafe {
let v = 1i32; let v = 1i32;
let _ = setsockopt( let res = setsockopt(
socket.as_raw_fd(), socket.as_raw_fd(),
SOL_SOCKET, SOL_SOCKET,
SO_REUSEADDR, SO_REUSEADDR,
&v as *const _ as *const _, &v as *const _ as *const _,
mem::size_of_val(&v) as u32, mem::size_of_val(&v) as u32,
); );
if res < 0 {
warn!("Failed to set SO_REUSEADDR on socket");
}
} }
#[cfg(not(any(target_os = "solaris", target_os = "illumos")))] #[cfg(not(any(target_os = "solaris", target_os = "illumos")))]
@ -333,13 +351,17 @@ mod imp {
// lost other than the ability to run multiple processes at once. // lost other than the ability to run multiple processes at once.
unsafe { unsafe {
let v = 1i32; let v = 1i32;
let _ = setsockopt( let res = setsockopt(
socket.as_raw_fd(), socket.as_raw_fd(),
SOL_SOCKET, SOL_SOCKET,
SO_REUSEPORT, SO_REUSEPORT,
&v as *const _ as *const _, &v as *const _ as *const _,
mem::size_of_val(&v) as u32, mem::size_of_val(&v) as u32,
); );
if res < 0 {
warn!("Failed to set SO_REUSEPORT on socket");
}
} }
} }
} }
@ -515,17 +537,26 @@ mod imp {
// Skip adapters that are receive-only, can't do multicast or don't have IPv4 support // Skip adapters that are receive-only, can't do multicast or don't have IPv4 support
// as they're not usable in a PTP context. // as they're not usable in a PTP context.
if address.flags & ADAPTER_FLAG_RECEIVE_ONLY != 0 if address.flags & ADAPTER_FLAG_RECEIVE_ONLY != 0 {
|| address.flags & ADAPTER_FLAG_NO_MULTICAST != 0 debug!("Interface {} is receive-only interface", adaptername);
|| address.flags & ADAPTER_FLAG_IPV4_ENABLED == 0 continue;
{ }
if address.flags & ADAPTER_FLAG_NO_MULTICAST != 0 {
debug!("Interface {} does not support multicast", adaptername);
continue;
}
if address.flags & ADAPTER_FLAG_IPV4_ENABLED == 0 {
debug!("Interface {} has no IPv4 address", adaptername);
continue; continue;
} }
// Skip adapters that are loopback or not up. // Skip adapters that are loopback or not up.
if address.iftype == IF_TYPE_SOFTWARE_LOOPBACK if address.iftype == IF_TYPE_SOFTWARE_LOOPBACK {
|| address.operstatus != IF_OPER_STATUS_UP debug!("Interface {} is loopback interface", adaptername);
{ continue;
}
if address.operstatus != IF_OPER_STATUS_UP {
debug!("Interface {} is not up", adaptername);
continue; continue;
} }
@ -534,6 +565,7 @@ mod imp {
// Skip adapters that have no valid interface index as they can't be used to join the // Skip adapters that have no valid interface index as they can't be used to join the
// PTP multicast group reliably for this interface only. // PTP multicast group reliably for this interface only.
if index == 0 { if index == 0 {
debug!("Interface {} has no valid interface index", adaptername);
continue; continue;
} }
@ -595,6 +627,8 @@ mod imp {
ip_addr, ip_addr,
hw_addr, hw_addr,
}); });
} else {
debug!("Interface {} has no IPv4 address", adaptername);
} }
} }
@ -653,13 +687,16 @@ mod imp {
// lost other than the ability to run multiple processes at once. // lost other than the ability to run multiple processes at once.
unsafe { unsafe {
let v = 1i32; let v = 1i32;
let _ = setsockopt( let res = setsockopt(
socket.as_raw_socket(), socket.as_raw_socket(),
SOL_SOCKET as i32, SOL_SOCKET as i32,
SO_REUSEADDR as i32, SO_REUSEADDR as i32,
&v as *const _ as *const _, &v as *const _ as *const _,
mem::size_of_val(&v) as _, mem::size_of_val(&v) as _,
); );
if res < 0 {
warn!("Failed to set SO_REUSEADDR on socket");
}
} }
} }
} }

View file

@ -245,7 +245,18 @@ typedef enum
} StdIOMessageType; } StdIOMessageType;
/* 2 byte BE payload size plus 1 byte message type */ /* 2 byte BE payload size plus 1 byte message type */
#define STDIO_MESSAGE_HEADER_SIZE (sizeof (guint16) + sizeof (guint8)) #define STDIO_MESSAGE_HEADER_SIZE (3)
/* 2 byte BE payload size. Payload format:
* - 1 byte GstDebugLevel
* - 2 byte BE filename length
* - filename UTF-8 string
* - 2 byte BE module path length
* - module path UTF-8 string
* - 4 byte BE line number
* - remainder is UTF-8 string
*/
#define STDERR_MESSAGE_HEADER_SIZE (2)
static GMutex ptp_lock; static GMutex ptp_lock;
static GCond ptp_cond; static GCond ptp_cond;
@ -257,9 +268,12 @@ static gboolean supported = FALSE;
#endif #endif
static GSubprocess *ptp_helper_process; static GSubprocess *ptp_helper_process;
static GInputStream *stdout_pipe; static GInputStream *stdout_pipe;
static GInputStream *stderr_pipe;
static GOutputStream *stdin_pipe; static GOutputStream *stdin_pipe;
static guint8 stdio_header[STDIO_MESSAGE_HEADER_SIZE]; /* buffer for reading the message header */ static guint8 stdio_header[STDIO_MESSAGE_HEADER_SIZE]; /* buffer for reading the message header */
static guint8 stdout_buffer[8192]; /* buffer for reading the message payload */ static guint8 stdout_buffer[8192]; /* buffer for reading the message payload */
static guint8 stderr_header[STDERR_MESSAGE_HEADER_SIZE]; /* buffer for reading the message header */
static guint8 stderr_buffer[8192]; /* buffer for reading the message payload */
static GThread *ptp_helper_thread; static GThread *ptp_helper_thread;
static GMainContext *main_context; static GMainContext *main_context;
static GMainLoop *main_loop; static GMainLoop *main_loop;
@ -270,6 +284,8 @@ static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
#define CUR_STDIO_HEADER_SIZE (GST_READ_UINT16_BE (stdio_header)) #define CUR_STDIO_HEADER_SIZE (GST_READ_UINT16_BE (stdio_header))
#define CUR_STDIO_HEADER_TYPE ((StdIOMessageType) stdio_header[2]) #define CUR_STDIO_HEADER_TYPE ((StdIOMessageType) stdio_header[2])
#define CUR_STDERR_HEADER_SIZE (GST_READ_UINT16_BE (stderr_header))
typedef struct typedef struct
{ {
GstClockTime receive_time; GstClockTime receive_time;
@ -1853,7 +1869,7 @@ have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res,
/* And read the next header */ /* And read the next header */
memset (&stdio_header, 0, STDIO_MESSAGE_HEADER_SIZE); memset (&stdio_header, 0, STDIO_MESSAGE_HEADER_SIZE);
g_input_stream_read_all_async (stdout_pipe, &stdio_header, g_input_stream_read_all_async (stdout_pipe, stdio_header,
STDIO_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL, STDIO_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stdout_header, NULL); (GAsyncReadyCallback) have_stdout_header, NULL);
} }
@ -1878,7 +1894,6 @@ have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res,
return; return;
} else if (read == 0) { } else if (read == 0) {
GST_ERROR ("Got EOF on stdin"); GST_ERROR ("Got EOF on stdin");
g_main_loop_quit (main_loop);
return; return;
} else if (read != STDIO_MESSAGE_HEADER_SIZE) { } else if (read != STDIO_MESSAGE_HEADER_SIZE) {
GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
@ -1896,6 +1911,143 @@ have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res,
(GAsyncReadyCallback) have_stdout_body, NULL); (GAsyncReadyCallback) have_stdout_body, NULL);
} }
static void have_stderr_header (GInputStream * stderr_pipe, GAsyncResult * res,
gpointer user_data);
static void
have_stderr_body (GInputStream * stderr_pipe, GAsyncResult * res,
gpointer user_data)
{
GError *err = NULL;
gsize read;
#ifndef GST_DISABLE_GST_DEBUG
GstByteReader breader;
GstDebugLevel level;
guint16 filename_length;
gchar *filename = NULL;
guint16 module_path_length;
gchar *module_path = NULL;
guint32 line_number;
gchar *message = NULL;
guint16 message_length;
guint8 b;
#endif
/* Finish reading the body */
if (!g_input_stream_read_all_finish (stderr_pipe, res, &read, &err)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) ||
g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
GST_ERROR ("Got EOF on stderr");
} else {
GST_ERROR ("Failed to read header from stderr: %s", err->message);
}
g_clear_error (&err);
g_main_loop_quit (main_loop);
return;
} else if (read == 0) {
GST_ERROR ("Got EOF on stderr");
g_main_loop_quit (main_loop);
return;
} else if (read != CUR_STDERR_HEADER_SIZE) {
GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
g_main_loop_quit (main_loop);
return;
}
#ifndef GST_DISABLE_GST_DEBUG
gst_byte_reader_init (&breader, stderr_buffer, CUR_STDERR_HEADER_SIZE);
if (!gst_byte_reader_get_uint8 (&breader, &b) || b > GST_LEVEL_MAX)
goto err;
level = (GstDebugLevel) b;
if (!gst_byte_reader_get_uint16_be (&breader, &filename_length)
|| filename_length > gst_byte_reader_get_remaining (&breader))
goto err;
filename =
g_strndup ((const gchar *) gst_byte_reader_get_data_unchecked (&breader,
filename_length), filename_length);
if (!gst_byte_reader_get_uint16_be (&breader, &module_path_length)
|| module_path_length > gst_byte_reader_get_remaining (&breader))
goto err;
module_path =
g_strndup ((const gchar *) gst_byte_reader_get_data_unchecked (&breader,
module_path_length), module_path_length);
if (!gst_byte_reader_get_uint32_be (&breader, &line_number))
goto err;
message_length = gst_byte_reader_get_remaining (&breader);
message =
g_strndup ((const gchar *) gst_byte_reader_get_data_unchecked (&breader,
message_length), message_length);
gst_debug_log_literal (GST_CAT_DEFAULT, level, filename, module_path,
line_number, NULL, message);
g_clear_pointer (&filename, g_free);
g_clear_pointer (&module_path, g_free);
g_clear_pointer (&message, g_free);
#endif
/* And read the next header */
memset (&stderr_header, 0, STDERR_MESSAGE_HEADER_SIZE);
g_input_stream_read_all_async (stderr_pipe, stderr_header,
STDERR_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stderr_header, NULL);
return;
#ifndef GST_DISABLE_GST_DEBUG
err:
{
GST_ERROR ("Unexpected stderr data");
g_clear_pointer (&filename, g_free);
g_clear_pointer (&module_path, g_free);
g_clear_pointer (&message, g_free);
g_main_loop_quit (main_loop);
return;
}
#endif
}
static void
have_stderr_header (GInputStream * stderr_pipe, GAsyncResult * res,
gpointer user_data)
{
GError *err = NULL;
gsize read;
/* Finish reading the header */
if (!g_input_stream_read_all_finish (stderr_pipe, res, &read, &err)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) ||
g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
GST_ERROR ("Got EOF on stderr");
} else {
GST_ERROR ("Failed to read header from stderr: %s", err->message);
}
g_clear_error (&err);
g_main_loop_quit (main_loop);
return;
} else if (read == 0) {
GST_ERROR ("Got EOF on stderr");
g_main_loop_quit (main_loop);
return;
} else if (read != STDERR_MESSAGE_HEADER_SIZE) {
GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
g_main_loop_quit (main_loop);
return;
} else if (CUR_STDERR_HEADER_SIZE > 8192 || CUR_STDERR_HEADER_SIZE < 9) {
GST_ERROR ("Unexpected size: %u", CUR_STDERR_HEADER_SIZE);
g_main_loop_quit (main_loop);
return;
}
/* And now read the body */
g_input_stream_read_all_async (stderr_pipe, stderr_buffer,
CUR_STDERR_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stderr_body, NULL);
}
/* Cleanup all announce messages and announce message senders /* Cleanup all announce messages and announce message senders
* that are timed out by now, and clean up all pending syncs * that are timed out by now, and clean up all pending syncs
* that are missing their FOLLOW_UP or DELAY_RESP */ * that are missing their FOLLOW_UP or DELAY_RESP */
@ -1996,10 +2148,15 @@ ptp_helper_main (gpointer data)
g_main_context_push_thread_default (main_context); g_main_context_push_thread_default (main_context);
memset (&stdio_header, 0, STDIO_MESSAGE_HEADER_SIZE); memset (&stdio_header, 0, STDIO_MESSAGE_HEADER_SIZE);
g_input_stream_read_all_async (stdout_pipe, &stdio_header, g_input_stream_read_all_async (stdout_pipe, stdio_header,
STDIO_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL, STDIO_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stdout_header, NULL); (GAsyncReadyCallback) have_stdout_header, NULL);
memset (&stderr_header, 0, STDERR_MESSAGE_HEADER_SIZE);
g_input_stream_read_all_async (stderr_pipe, stderr_header,
STDERR_MESSAGE_HEADER_SIZE, G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stderr_header, NULL);
/* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */ /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
cleanup_source = g_timeout_source_new_seconds (5); cleanup_source = g_timeout_source_new_seconds (5);
g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT); g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
@ -2259,7 +2416,7 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces)
if (interfaces != NULL) if (interfaces != NULL)
argc += 2 * g_strv_length (interfaces); argc += 2 * g_strv_length (interfaces);
argv = g_new0 (gchar *, argc + 2); argv = g_new0 (gchar *, argc + 3);
argc_c = 0; argc_c = 0;
/* Find the gst-ptp-helper */ /* Find the gst-ptp-helper */
@ -2338,9 +2495,16 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces)
} }
} }
/* Check if the helper process should be verbose */
env = g_getenv ("GST_PTP_HELPER_VERBOSE");
if (env && g_ascii_strcasecmp (env, "no") != 0) {
argv[argc_c++] = g_strdup ("-v");
}
ptp_helper_process = ptp_helper_process =
g_subprocess_newv ((const gchar * const *) argv, g_subprocess_newv ((const gchar * const *) argv,
G_SUBPROCESS_FLAGS_STDIN_PIPE | G_SUBPROCESS_FLAGS_STDOUT_PIPE, &err); G_SUBPROCESS_FLAGS_STDIN_PIPE | G_SUBPROCESS_FLAGS_STDOUT_PIPE |
G_SUBPROCESS_FLAGS_STDERR_PIPE, &err);
if (!ptp_helper_process) { if (!ptp_helper_process) {
GST_ERROR ("Failed to start ptp helper process: %s", err->message); GST_ERROR ("Failed to start ptp helper process: %s", err->message);
g_clear_error (&err); g_clear_error (&err);
@ -2355,7 +2519,10 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces)
stdout_pipe = g_subprocess_get_stdout_pipe (ptp_helper_process); stdout_pipe = g_subprocess_get_stdout_pipe (ptp_helper_process);
if (stdout_pipe) if (stdout_pipe)
g_object_ref (stdout_pipe); g_object_ref (stdout_pipe);
if (!stdin_pipe || !stdout_pipe) { stderr_pipe = g_subprocess_get_stderr_pipe (ptp_helper_process);
if (stderr_pipe)
g_object_ref (stderr_pipe);
if (!stdin_pipe || !stdout_pipe || !stderr_pipe) {
GST_ERROR ("Failed to get ptp helper process pipes"); GST_ERROR ("Failed to get ptp helper process pipes");
ret = FALSE; ret = FALSE;
supported = FALSE; supported = FALSE;
@ -2404,6 +2571,7 @@ done:
if (ptp_helper_process) { if (ptp_helper_process) {
g_clear_object (&stdin_pipe); g_clear_object (&stdin_pipe);
g_clear_object (&stdout_pipe); g_clear_object (&stdout_pipe);
g_clear_object (&stderr_pipe);
g_subprocess_force_exit (ptp_helper_process); g_subprocess_force_exit (ptp_helper_process);
g_clear_object (&ptp_helper_process); g_clear_object (&ptp_helper_process);
} }
@ -2453,6 +2621,7 @@ gst_ptp_deinit (void)
if (ptp_helper_process) { if (ptp_helper_process) {
g_clear_object (&stdin_pipe); g_clear_object (&stdin_pipe);
g_clear_object (&stdout_pipe); g_clear_object (&stdout_pipe);
g_clear_object (&stderr_pipe);
g_subprocess_force_exit (ptp_helper_process); g_subprocess_force_exit (ptp_helper_process);
g_clear_object (&ptp_helper_process); g_clear_object (&ptp_helper_process);
} }