Improved timestamps calculation

Due to the possibility to connect to two or more streams simultaneously with different clocks synchronization It's necessary to improve the timestamps calculation to detect this.

Prior to this commit, we saved the first timestamp that arrive and use it to calculate the running time of the stream for the rest of frames (pts field in gstreamer buffer) in all of the streams. This lead to problems when connecting to multiple streams in multiple computers and the clocks were not correctly synchronized.

To fix this, now we save a different initial timestamp for each stream.
This commit is contained in:
Daniel Vilar 2018-12-10 13:27:23 +01:00
parent f06ac8c035
commit 678c5876de
3 changed files with 41 additions and 16 deletions

View file

@ -41,16 +41,15 @@ struct ndi_receiver_info {
video: bool, video: bool,
audio: bool, audio: bool,
ndi_instance: NdiInstance, ndi_instance: NdiInstance,
initial_timestamp: u64,
id: i8, id: i8,
} }
struct Ndi { struct Ndi {
initial_timestamp: u64,
start_pts: gst::ClockTime, start_pts: gst::ClockTime,
} }
static mut ndi_struct: Ndi = Ndi { static mut ndi_struct: Ndi = Ndi {
initial_timestamp: 0,
start_pts: gst::ClockTime(Some(0)), start_pts: gst::ClockTime(Some(0)),
}; };
@ -214,6 +213,7 @@ fn connect_ndi(cat: gst::DebugCategory, element: &BaseSrc, ip: &str, stream_name
video, video,
audio, audio,
ndi_instance: NdiInstance { recv: pNDI_recv }, ndi_instance: NdiInstance { recv: pNDI_recv },
initial_timestamp: 0,
id: id_receiver, id: id_receiver,
}, },
); );

View file

@ -80,6 +80,7 @@ impl Default for State {
struct TimestampData { struct TimestampData {
offset: u64, offset: u64,
initial_timestamp: u64,
} }
struct NdiAudioSrc { struct NdiAudioSrc {
@ -102,7 +103,7 @@ impl NdiAudioSrc {
), ),
settings: Mutex::new(Default::default()), settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()), state: Mutex::new(Default::default()),
timestamp_data: Mutex::new(TimestampData { offset: 0 }), timestamp_data: Mutex::new(TimestampData { offset: 0 , initial_timestamp: 0}),
}) })
} }
@ -228,10 +229,10 @@ impl ElementImpl<BaseSrc> for NdiAudioSrc {
transition: gst::StateChange, transition: gst::StateChange,
) -> gst::StateChangeReturn { ) -> gst::StateChangeReturn {
if transition == gst::StateChange::PausedToPlaying { if transition == gst::StateChange::PausedToPlaying {
let receivers = hashmap_receivers.lock().unwrap(); let mut receivers = hashmap_receivers.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let receiver = receivers.get(&settings.id_receiver).unwrap(); let receiver = receivers.get_mut(&settings.id_receiver).unwrap();
let recv = &receiver.ndi_instance; let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv; let pNDI_recv = recv.recv;
@ -249,10 +250,13 @@ impl ElementImpl<BaseSrc> for NdiAudioSrc {
); );
} }
if ndi_struct.initial_timestamp <= audio_frame.timestamp as u64 let mut timestamp_data = self.timestamp_data.lock().unwrap();
|| ndi_struct.initial_timestamp == 0 timestamp_data.initial_timestamp = receiver.initial_timestamp;
if receiver.initial_timestamp <= audio_frame.timestamp as u64
|| receiver.initial_timestamp == 0
{ {
ndi_struct.initial_timestamp = audio_frame.timestamp as u64; receiver.initial_timestamp = audio_frame.timestamp as u64;
timestamp_data.initial_timestamp = audio_frame.timestamp as u64;
} }
} }
} }
@ -386,7 +390,7 @@ impl BaseSrcImpl<BaseSrc> for NdiAudioSrc {
let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
unsafe { unsafe {
let time = ndi_struct.initial_timestamp; let time = timestamp_data.initial_timestamp;
let mut skip_frame = true; let mut skip_frame = true;
let mut count_frame_none = 0; let mut count_frame_none = 0;
@ -403,6 +407,12 @@ impl BaseSrcImpl<BaseSrc> for NdiAudioSrc {
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]); gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]);
return Err(gst::FlowReturn::CustomError); return Err(gst::FlowReturn::CustomError);
} }
else{
if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none && _settings.loss_threshold == 0{
let buffer = gst::Buffer::with_size(0).unwrap();
return Ok(buffer)
}
}
if time >= (audio_frame.timestamp as u64) { if time >= (audio_frame.timestamp as u64) {
gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp as u64), time); gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp as u64), time);
} else { } else {

View file

@ -81,6 +81,7 @@ impl Default for State {
struct TimestampData { struct TimestampData {
offset: u64, offset: u64,
initial_timestamp: u64,
} }
struct NdiVideoSrc { struct NdiVideoSrc {
@ -103,7 +104,7 @@ impl NdiVideoSrc {
), ),
settings: Mutex::new(Default::default()), settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()), state: Mutex::new(Default::default()),
timestamp_data: Mutex::new(TimestampData { offset: 0 }), timestamp_data: Mutex::new(TimestampData { offset: 0 , initial_timestamp: 0}),
}) })
} }
@ -230,10 +231,10 @@ impl ElementImpl<BaseSrc> for NdiVideoSrc {
transition: gst::StateChange, transition: gst::StateChange,
) -> gst::StateChangeReturn { ) -> gst::StateChangeReturn {
if transition == gst::StateChange::PausedToPlaying { if transition == gst::StateChange::PausedToPlaying {
let receivers = hashmap_receivers.lock().unwrap(); let mut receivers = hashmap_receivers.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let receiver = receivers.get(&settings.id_receiver).unwrap(); let receiver = receivers.get_mut(&settings.id_receiver).unwrap();
let recv = &receiver.ndi_instance; let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv; let pNDI_recv = recv.recv;
@ -251,11 +252,15 @@ impl ElementImpl<BaseSrc> for NdiVideoSrc {
); );
} }
if ndi_struct.initial_timestamp <= video_frame.timestamp as u64 let mut timestamp_data = self.timestamp_data.lock().unwrap();
|| ndi_struct.initial_timestamp == 0 timestamp_data.initial_timestamp = receiver.initial_timestamp;
if receiver.initial_timestamp <= video_frame.timestamp as u64
|| receiver.initial_timestamp == 0
{ {
ndi_struct.initial_timestamp = video_frame.timestamp as u64; receiver.initial_timestamp = video_frame.timestamp as u64;
timestamp_data.initial_timestamp = video_frame.timestamp as u64;
} }
} }
} }
element.parent_change_state(transition) element.parent_change_state(transition)
@ -388,7 +393,7 @@ impl BaseSrcImpl<BaseSrc> for NdiVideoSrc {
let video_frame: NDIlib_video_frame_v2_t = Default::default(); let video_frame: NDIlib_video_frame_v2_t = Default::default();
unsafe { unsafe {
let time = ndi_struct.initial_timestamp; let time = timestamp_data.initial_timestamp;
let mut skip_frame = true; let mut skip_frame = true;
let mut count_frame_none = 0; let mut count_frame_none = 0;
@ -405,6 +410,12 @@ impl BaseSrcImpl<BaseSrc> for NdiVideoSrc {
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]); gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]);
return Err(gst::FlowReturn::CustomError); return Err(gst::FlowReturn::CustomError);
} }
else{
if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none && _settings.loss_threshold == 0{
let buffer = gst::Buffer::with_size(0).unwrap();
return Ok(buffer)
}
}
if time >= (video_frame.timestamp as u64) { if time >= (video_frame.timestamp as u64) {
gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp as u64), time); gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp as u64), time);
} else { } else {
@ -412,8 +423,12 @@ impl BaseSrcImpl<BaseSrc> for NdiVideoSrc {
} }
} }
gst_warning!(self.cat, obj: element, "NDI video frame received: {:?}", (video_frame));
pts = video_frame.timestamp as u64 - time; pts = video_frame.timestamp as u64 - time;
gst_warning!(self.cat, obj: element, "Calculated pts for video frame: {:?}", (pts));
let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize; let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
{ {