Port audio/video source to the safe NDI SDK wrappers

This also fixes various memory unsafety issues.

And add lots of FIXME comments for code that is currently wrong.
This commit is contained in:
Sebastian Dröge 2019-07-11 21:56:30 +03:00
parent 48f8c498fa
commit 861d216eed
4 changed files with 333 additions and 374 deletions

View file

@ -1,5 +1,3 @@
#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
#[macro_use]
extern crate glib;
#[macro_use]
@ -18,57 +16,59 @@ pub mod ndi;
mod ndiaudiosrc;
mod ndivideosrc;
// use gst_plugin::base_src::*;
use ndisys::*;
use std::ffi::{CStr, CString};
use ndi::*;
use std::collections::HashMap;
use std::sync::Mutex;
use gst::GstObjectExt;
use std::sync::atomic::{AtomicUsize, Ordering};
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
if !ndi::initialize() {
return Err(glib_bool_error!("Cannot initialize NDI"));
}
ndivideosrc::register(plugin)?;
ndiaudiosrc::register(plugin)?;
Ok(())
}
struct ndi_receiver_info {
struct ReceiverInfo {
id: usize,
stream_name: String,
ip: String,
video: bool,
audio: bool,
ndi_instance: NdiInstance,
ndi_instance: RecvInstance,
initial_timestamp: u64,
id: i8,
}
struct Ndi {
start_pts: gst::ClockTime,
}
static mut ndi_struct: Ndi = Ndi {
static mut NDI_STRUCT: Ndi = Ndi {
start_pts: gst::ClockTime(Some(0)),
};
lazy_static! {
static ref hashmap_receivers: Mutex<HashMap<i8, ndi_receiver_info>> = {
static ref HASHMAP_RECEIVERS: Mutex<HashMap<usize, ReceiverInfo>> = {
let m = HashMap::new();
Mutex::new(m)
};
}
static mut id_receiver: i8 = 0;
static ID_RECEIVER: AtomicUsize = AtomicUsize::new(0);
fn connect_ndi(
cat: gst::DebugCategory,
element: &gst_base::BaseSrc,
ip: &str,
stream_name: &str,
) -> i8 {
) -> Option<usize> {
gst_debug!(cat, obj: element, "Starting NDI connection...");
let mut receivers = hashmap_receivers.lock().unwrap();
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
let mut audio = false;
let mut video = false;
@ -92,143 +92,102 @@ fn connect_ndi(
} else {
val.audio = audio;
}
return val.id;
return Some(val.id);
}
}
}
unsafe {
if !NDIlib_initialize() {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_initialize error"]
);
return 0;
}
let NDI_find_create_desc: NDIlib_find_create_t = Default::default();
let pNDI_find = NDIlib_find_create_v2(&NDI_find_create_desc);
if pNDI_find.is_null() {
let mut find = match FindInstance::builder().build() {
None => {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_find_create_v2 error"]
);
return 0;
}
return None;
},
Some(find) => find,
};
let mut total_sources: u32 = 0;
let p_sources;
// TODO Sleep 1s to wait for all sources
find.wait_for_sources(2000);
// TODO Sleep 1s to wait for all sources
NDIlib_find_wait_for_sources(pNDI_find, 2000);
p_sources = NDIlib_find_get_current_sources(pNDI_find, &mut total_sources as *mut u32);
let sources = find.get_current_sources();
// We need at least one source
if p_sources.is_null() {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Error getting NDIlib_find_get_current_sources"]
);
return 0;
}
let mut no_source: isize = -1;
for i in 0..total_sources as isize {
if CStr::from_ptr((*p_sources.offset(i)).p_ndi_name)
.to_string_lossy()
.into_owned()
== stream_name
|| CStr::from_ptr((*p_sources.offset(i)).p_ip_address)
.to_string_lossy()
.into_owned()
== ip
{
no_source = i;
break;
}
}
if no_source == -1 {
gst_element_error!(element, gst::ResourceError::OpenRead, ["Stream not found"]);
return 0;
}
gst_debug!(
cat,
obj: element,
"Total sources in network {}: Connecting to NDI source with name '{}' and address '{}'",
total_sources,
CStr::from_ptr((*p_sources.offset(no_source)).p_ndi_name)
.to_string_lossy()
.into_owned(),
CStr::from_ptr((*p_sources.offset(no_source)).p_ip_address)
.to_string_lossy()
.into_owned()
// We need at least one source
if sources.is_empty() {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Error getting NDIlib_find_get_current_sources"]
);
return None;
}
let source = *p_sources.offset(no_source);
let source = sources.iter().find(|s| {
s.ndi_name() == stream_name || s.ip_address() == ip
});
let source_ip = CStr::from_ptr(source.p_ip_address)
.to_string_lossy()
.into_owned();
let source_name = CStr::from_ptr(source.p_ndi_name)
.to_string_lossy()
.into_owned();
let source = match source {
None => {
gst_element_error!(element, gst::ResourceError::OpenRead, ["Stream not found"]);
return None;
},
Some(source) => source,
};
let p_ndi_name = CString::new("Galicaster NDI Receiver").unwrap();
let NDI_recv_create_desc = NDIlib_recv_create_v3_t {
source_to_connect_to: source,
p_ndi_name: p_ndi_name.as_ptr(),
..Default::default()
};
gst_debug!(
cat,
obj: element,
"Total sources in network {}: Connecting to NDI source with name '{}' and address '{}'",
sources.len(),
source.ndi_name(),
source.ip_address(),
);
let pNDI_recv = NDIlib_recv_create_v3(&NDI_recv_create_desc);
if pNDI_recv.is_null() {
let recv = RecvInstance::builder(&source, "Galicaster NDI Receiver")
.bandwidth(NDIlib_recv_bandwidth_e::NDIlib_recv_bandwidth_highest)
.color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA)
.allow_video_fields(true)
.build();
let recv = match recv {
None => {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_recv_create_v3 error"]
);
return 0;
}
return None;
},
Some(recv) => recv,
};
NDIlib_find_destroy(pNDI_find);
recv.set_tally(&Tally::default());
let tally_state: NDIlib_tally_t = Default::default();
NDIlib_recv_set_tally(pNDI_recv, &tally_state);
let enable_hw_accel = MetadataFrame::new(0, Some("<ndi_hwaccel enabled=\"true\"/>"));
recv.send_metadata(&enable_hw_accel);
let data = CString::new("<ndi_hwaccel enabled=\"true\"/>").unwrap();
let enable_hw_accel = NDIlib_metadata_frame_t {
length: data.to_bytes().len() as i32,
timecode: 0,
p_data: data.as_ptr(),
};
let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst);
receivers.insert(
id_receiver,
ReceiverInfo {
stream_name: source.ndi_name().to_owned(),
ip: source.ip_address().to_owned(),
video,
audio,
ndi_instance: recv,
initial_timestamp: 0,
id: id_receiver,
},
);
NDIlib_recv_send_metadata(pNDI_recv, &enable_hw_accel);
id_receiver += 1;
receivers.insert(
id_receiver,
ndi_receiver_info {
stream_name: source_name.clone(),
ip: source_ip.clone(),
video,
audio,
ndi_instance: NdiInstance { recv: pNDI_recv },
initial_timestamp: 0,
id: id_receiver,
},
);
gst_debug!(cat, obj: element, "Started NDI connection");
id_receiver
}
gst_debug!(cat, obj: element, "Started NDI connection");
Some(id_receiver)
}
fn stop_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, id: i8) -> bool {
fn stop_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, id: usize) -> bool {
gst_debug!(cat, obj: element, "Closing NDI connection...");
let mut receivers = hashmap_receivers.lock().unwrap();
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
{
let val = receivers.get_mut(&id).unwrap();
if val.video && val.audio {
@ -239,14 +198,6 @@ fn stop_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, id: i8) -> boo
}
return true;
}
let recv = &val.ndi_instance;
let pNDI_recv = recv.recv;
unsafe {
NDIlib_recv_destroy(pNDI_recv);
// ndi_struct.recv = None;
NDIlib_destroy();
}
}
receivers.remove(&id);
gst_debug!(cat, obj: element, "Closed NDI connection");

View file

@ -7,6 +7,7 @@ pub fn initialize() -> bool {
unsafe { NDIlib_initialize() }
}
#[derive(Debug)]
pub struct FindBuilder<'a> {
show_local_sources: bool,
groups: Option<&'a str>,
@ -67,6 +68,7 @@ impl<'a> FindBuilder<'a> {
}
}
#[derive(Debug)]
pub struct FindInstance(ptr::NonNull<::std::os::raw::c_void>);
unsafe impl Send for FindInstance {}
@ -111,6 +113,7 @@ impl Drop for FindInstance {
}
}
#[derive(Debug)]
pub struct Source<'a>(ptr::NonNull<NDIlib_source_t>, &'a FindInstance);
unsafe impl<'a> Send for Source<'a> {}
@ -135,6 +138,7 @@ impl<'a> Source<'a> {
}
}
#[derive(Debug)]
pub struct RecvBuilder<'a> {
source_to_connect_to: &'a Source<'a>,
allow_video_fields: bool,
@ -185,6 +189,7 @@ impl<'a> RecvBuilder<'a> {
}
}
#[derive(Debug)]
pub struct RecvInstance(ptr::NonNull<::std::os::raw::c_void>);
unsafe impl Send for RecvInstance {}
@ -207,7 +212,7 @@ impl RecvInstance {
unsafe { NDIlib_recv_send_metadata(self.0.as_ptr(), metadata.as_ptr()) }
}
pub fn recv_capture(
pub fn capture(
&self,
video: bool,
audio: bool,
@ -268,6 +273,7 @@ impl Drop for RecvInstance {
}
}
#[derive(Debug)]
pub struct Tally(NDIlib_tally_t);
unsafe impl Send for Tally {}
@ -297,12 +303,14 @@ impl Tally {
}
}
#[derive(Debug)]
pub enum Frame<'a> {
Video(VideoFrame<'a>),
Audio(AudioFrame<'a>),
Metadata(MetadataFrame<'a>),
}
#[derive(Debug)]
pub enum VideoFrame<'a> {
//Owned(NDIlib_video_frame_v2_t, Option<ffi::CString>, Option<Vec<u8>>),
Borrowed(NDIlib_video_frame_v2_t, &'a RecvInstance),
@ -407,6 +415,7 @@ impl<'a> Drop for VideoFrame<'a> {
}
}
#[derive(Debug)]
pub enum AudioFrame<'a> {
//Owned(NDIlib_audio_frame_v2_t, Option<ffi::CString>, Option<Vec<u8>>),
Borrowed(NDIlib_audio_frame_v2_t, &'a RecvInstance),
@ -484,7 +493,7 @@ impl<'a> AudioFrame<'a> {
pub fn copy_to_interleaved_16s(&self, data: &mut [i16]) {
assert_eq!(
data.len(),
(self.no_samples() * self.no_channels() * 2) as usize
(self.no_samples() * self.no_channels()) as usize
);
let mut dst = NDIlib_audio_frame_interleaved_16s_t {
@ -513,6 +522,7 @@ impl<'a> Drop for AudioFrame<'a> {
}
}
#[derive(Debug)]
pub enum MetadataFrame<'a> {
Owned(NDIlib_metadata_frame_t, Option<ffi::CString>),
Borrowed(NDIlib_metadata_frame_t, &'a RecvInstance),

View file

@ -1,5 +1,3 @@
#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
use glib;
use glib::subclass;
use glib::subclass::prelude::*;
@ -14,22 +12,22 @@ use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use std::{i32, u32};
use std::ptr;
use connect_ndi;
use ndi_struct;
use ndisys::*;
use stop_ndi;
use ndi::*;
use NDI_STRUCT;
use HASHMAP_RECEIVERS;
use byte_slice_cast::AsMutSliceOf;
use hashmap_receivers;
#[derive(Debug, Clone)]
struct Settings {
stream_name: String,
ip: String,
loss_threshold: u32,
id_receiver: i8,
// FIXME: Should be in State
id_receiver: Option<usize>,
latency: Option<gst::ClockTime>,
}
@ -39,7 +37,7 @@ impl Default for Settings {
stream_name: String::from("Fixed ndi stream name"),
ip: String::from(""),
loss_threshold: 5,
id_receiver: 0,
id_receiver: None,
latency: None,
}
}
@ -248,25 +246,23 @@ impl ElementImpl for NdiAudioSrc {
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if transition == gst::StateChange::PausedToPlaying {
let mut receivers = hashmap_receivers.lock().unwrap();
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
let settings = self.settings.lock().unwrap();
let receiver = receivers.get_mut(&settings.id_receiver).unwrap();
let receiver = receivers.get_mut(&settings.id_receiver.unwrap()).unwrap();
let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv;
let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
// FIXME error handling, make interruptable
let audio_frame =
loop {
match recv.capture(false, true, false, 1000) {
Err(_) => unimplemented!(),
Ok(None) => continue,
Ok(Some(Frame::Audio(frame))) => break frame,
_ => unreachable!(),
}
};
unsafe {
while NDIlib_recv_capture_v2(
pNDI_recv,
ptr::null(),
&audio_frame,
ptr::null(),
1000,
) != NDIlib_frame_type_e::NDIlib_frame_type_audio
{}
}
gst_debug!(
self.cat,
obj: element,
@ -274,14 +270,13 @@ impl ElementImpl for NdiAudioSrc {
audio_frame
);
if receiver.initial_timestamp <= audio_frame.timestamp as u64
// FIXME handle unset timestamp
if receiver.initial_timestamp <= audio_frame.timestamp() as u64
|| receiver.initial_timestamp == 0
{
receiver.initial_timestamp = audio_frame.timestamp as u64;
}
unsafe {
NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
receiver.initial_timestamp = audio_frame.timestamp() as u64;
}
gst_debug!(
self.cat,
obj: element,
@ -330,7 +325,7 @@ impl BaseSrcImpl for NdiAudioSrc {
);
match settings.id_receiver {
0 => Err(gst_error_msg!(
None => Err(gst_error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
@ -342,7 +337,7 @@ impl BaseSrcImpl for NdiAudioSrc {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap();
stop_ndi(self.cat, element, settings.id_receiver);
stop_ndi(self.cat, element, settings.id_receiver.unwrap());
// Commented because when adding ndi destroy stopped in this line
//*self.state.lock().unwrap() = Default::default();
Ok(())
@ -372,24 +367,28 @@ impl BaseSrcImpl for NdiAudioSrc {
}
fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps {
let receivers = hashmap_receivers.lock().unwrap();
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
let mut settings = self.settings.lock().unwrap();
let receiver = receivers.get(&settings.id_receiver).unwrap();
let receiver = receivers.get(&settings.id_receiver.unwrap()).unwrap();
let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv;
let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
// FIXME: Should be done in create() and caps be updated as needed
unsafe {
while NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000)
!= NDIlib_frame_type_e::NDIlib_frame_type_audio
{}
}
let audio_frame =
loop {
match recv.capture(false, true, false, 1000) {
Err(_) => unimplemented!(),
Ok(None) => continue,
Ok(Some(Frame::Audio(frame))) => break frame,
_ => unreachable!(),
}
};
let no_samples = audio_frame.no_samples as u64;
let audio_rate = audio_frame.sample_rate;
// FIXME: Why?
let no_samples = audio_frame.no_samples() as u64;
let audio_rate = audio_frame.sample_rate();
settings.latency = gst::SECOND.mul_div_floor(no_samples, audio_rate as u64);
let mut caps = gst::Caps::truncate(caps);
@ -397,21 +396,18 @@ impl BaseSrcImpl for NdiAudioSrc {
let caps = caps.make_mut();
let s = caps.get_mut_structure(0).unwrap();
s.fixate_field_nearest_int("rate", audio_rate);
s.fixate_field_nearest_int("channels", audio_frame.no_channels);
s.fixate_field_nearest_int("channels", audio_frame.no_channels());
s.fixate_field_str("layout", "interleaved");
s.set_value(
"channel-mask",
gst::Bitmask::new(gst_audio::AudioChannelPosition::get_fallback_mask(
audio_frame.no_channels as u32,
audio_frame.no_channels() as u32,
))
.to_send_value(),
);
}
let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
unsafe {
NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
}
self.parent_fixate(element, caps)
}
@ -434,112 +430,113 @@ impl BaseSrcImpl for NdiAudioSrc {
}
Some(ref info) => info.clone(),
};
let receivers = hashmap_receivers.lock().unwrap();
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance;
let pNDI_recv = recv.recv;
let receiver = &receivers.get(&_settings.id_receiver.unwrap()).unwrap();
let recv = &receiver.ndi_instance;
let pts: u64;
let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
let time = receiver.initial_timestamp;
unsafe {
let time = receivers
.get(&_settings.id_receiver)
.unwrap()
.initial_timestamp;
let mut count_frame_none = 0;
let audio_frame =
loop {
// FIXME: make interruptable
let res =
loop {
match recv.capture(false, true, false, 1000) {
Err(_) => break Err(()),
Ok(None) => break Ok(None),
Ok(Some(Frame::Audio(frame))) => break Ok(Some(frame)),
_ => unreachable!(),
}
};
let mut skip_frame = true;
let mut count_frame_none = 0;
while skip_frame {
let frame_type =
NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000);
if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
&& _settings.loss_threshold != 0)
|| frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error
{
let audio_frame = match res {
Err(_) => {
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]);
return Err(gst::FlowError::Error);
},
Ok(None) if _settings.loss_threshold != 0 => {
if count_frame_none < _settings.loss_threshold {
count_frame_none += 1;
continue;
}
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]);
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]);
return Err(gst::FlowError::Error);
} else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
&& _settings.loss_threshold == 0
{
},
Ok(None) => {
gst_debug!(
self.cat,
obj: element,
"No audio frame received, sending empty buffer"
"No audio frame received, retry"
);
let buffer = gst::Buffer::with_size(0).unwrap();
return Ok(buffer);
}
count_frame_none += 1;
continue;
},
Ok(Some(frame)) => frame,
};
if time >= (audio_frame.timestamp as u64) {
NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp as u64), time);
} else {
skip_frame = false;
}
if time >= (audio_frame.timestamp() as u64) {
gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp() as u64), time);
} else {
break audio_frame;
}
};
gst_log!(
self.cat,
obj: element,
"NDI audio frame received: {:?}",
(audio_frame)
);
gst_log!(
self.cat,
obj: element,
"NDI audio frame received: {:?}",
audio_frame
);
pts = audio_frame.timestamp as u64 - time;
let pts = audio_frame.timestamp() as u64 - time;
gst_log!(
self.cat,
obj: element,
"Calculated pts for audio frame: {:?}",
(pts)
);
gst_log!(
self.cat,
obj: element,
"Calculated pts for audio frame: {:?}",
pts
);
// We multiply by 2 because is the size in bytes of an i16 variable
let buff_size = (audio_frame.no_samples * 2 * audio_frame.no_channels) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
{
if ndi_struct.start_pts == gst::ClockTime(Some(0)) {
ndi_struct.start_pts =
// We multiply by 2 because is the size in bytes of an i16 variable
let buff_size = (audio_frame.no_samples() * 2 * audio_frame.no_channels()) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
{
// FIXME don't use static mut, also this calculation is wrong
unsafe {
if NDI_STRUCT.start_pts == gst::ClockTime(Some(0)) {
NDI_STRUCT.start_pts =
element.get_clock().unwrap().get_time() - element.get_base_time();
}
let buffer = buffer.get_mut().unwrap();
// Newtek NDI yields times in 100ns intervals since the Unix Time
let pts: gst::ClockTime = (pts * 100).into();
buffer.set_pts(pts + ndi_struct.start_pts);
let duration: gst::ClockTime = (((f64::from(audio_frame.no_samples)
/ f64::from(audio_frame.sample_rate))
* 1_000_000_000.0) as u64)
.into();
buffer.set_duration(duration);
buffer.set_offset(timestamp_data.offset);
timestamp_data.offset += audio_frame.no_samples as u64;
buffer.set_offset_end(timestamp_data.offset);
let mut dst: NDIlib_audio_frame_interleaved_16s_t = Default::default();
dst.reference_level = 0;
dst.p_data = buffer
.map_writable()
.unwrap()
.as_mut_slice_of::<i16>()
.unwrap()
.as_mut_ptr();
NDIlib_util_audio_to_interleaved_16s_v2(&audio_frame, &mut dst);
NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
}
gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
let buffer = buffer.get_mut().unwrap();
Ok(buffer)
// Newtek NDI yields times in 100ns intervals since the Unix Time
let pts: gst::ClockTime = (pts * 100).into();
buffer.set_pts(pts + unsafe { NDI_STRUCT.start_pts });
let duration: gst::ClockTime = (((f64::from(audio_frame.no_samples())
/ f64::from(audio_frame.sample_rate()))
* 1_000_000_000.0) as u64)
.into();
buffer.set_duration(duration);
buffer.set_offset(timestamp_data.offset);
timestamp_data.offset += audio_frame.no_samples() as u64;
buffer.set_offset_end(timestamp_data.offset);
audio_frame.copy_to_interleaved_16s(buffer
.map_writable()
.unwrap()
.as_mut_slice_of::<i16>()
.unwrap());
}
gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
Ok(buffer)
}
}

View file

@ -1,5 +1,3 @@
#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
use glib;
use glib::subclass;
use glib::subclass::prelude::*;
@ -16,21 +14,21 @@ use gst_video;
use std::sync::Mutex;
use std::{i32, u32};
use std::{slice, ptr};
use ndi::*;
use connect_ndi;
use ndi_struct;
use ndisys::*;
use stop_ndi;
use hashmap_receivers;
use NDI_STRUCT;
use HASHMAP_RECEIVERS;
#[derive(Debug, Clone)]
struct Settings {
stream_name: String,
ip: String,
loss_threshold: u32,
id_receiver: i8,
// FIXME: should be in state
id_receiver: Option<usize>,
latency: Option<gst::ClockTime>,
}
@ -40,7 +38,7 @@ impl Default for Settings {
stream_name: String::from("Fixed ndi stream name"),
ip: String::from(""),
loss_threshold: 5,
id_receiver: 0,
id_receiver: None,
latency: None,
}
}
@ -256,25 +254,23 @@ impl ElementImpl for NdiVideoSrc {
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if transition == gst::StateChange::PausedToPlaying {
let mut receivers = hashmap_receivers.lock().unwrap();
let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
let settings = self.settings.lock().unwrap();
let receiver = receivers.get_mut(&settings.id_receiver).unwrap();
let receiver = receivers.get_mut(&settings.id_receiver.unwrap()).unwrap();
let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv;
let video_frame: NDIlib_video_frame_v2_t = Default::default();
// FIXME error handling, make interruptable
let video_frame =
loop {
match recv.capture(true, false, false, 1000) {
Err(_) => unimplemented!(),
Ok(None) => continue,
Ok(Some(Frame::Video(frame))) => break frame,
_ => unreachable!(),
}
};
unsafe {
while NDIlib_recv_capture_v2(
pNDI_recv,
&video_frame,
ptr::null(),
ptr::null(),
1000,
) != NDIlib_frame_type_e::NDIlib_frame_type_video
{}
}
gst_debug!(
self.cat,
obj: element,
@ -282,13 +278,12 @@ impl ElementImpl for NdiVideoSrc {
video_frame
);
if receiver.initial_timestamp <= video_frame.timestamp as u64
// FIXME handle unset timestamp
if receiver.initial_timestamp <= video_frame.timestamp() as u64
|| receiver.initial_timestamp == 0
{
receiver.initial_timestamp = video_frame.timestamp as u64;
}
unsafe {
NDIlib_recv_free_video_v2(pNDI_recv, &video_frame);
receiver.initial_timestamp = video_frame.timestamp() as u64;
}
gst_debug!(
self.cat,
@ -331,13 +326,13 @@ impl BaseSrcImpl for NdiVideoSrc {
settings.id_receiver = connect_ndi(
self.cat,
element,
&settings.ip.clone(),
&settings.stream_name.clone(),
&settings.ip,
&settings.stream_name,
);
// settings.id_receiver != 0
// settings.id_receiver exists
match settings.id_receiver {
0 => Err(gst_error_msg!(
None => Err(gst_error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
@ -349,7 +344,7 @@ impl BaseSrcImpl for NdiVideoSrc {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap();
stop_ndi(self.cat, element, settings.id_receiver);
stop_ndi(self.cat, element, settings.id_receiver.unwrap());
// Commented because when adding ndi destroy stopped in this line
//*self.state.lock().unwrap() = Default::default();
Ok(())
@ -379,39 +374,40 @@ impl BaseSrcImpl for NdiVideoSrc {
}
fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps {
let receivers = hashmap_receivers.lock().unwrap();
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
let mut settings = self.settings.lock().unwrap();
let receiver = receivers.get(&settings.id_receiver).unwrap();
let receiver = receivers.get(&settings.id_receiver.unwrap()).unwrap();
let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv;
let video_frame: NDIlib_video_frame_v2_t = Default::default();
// FIXME: Should be done in create() and caps be updated as needed
let video_frame =
loop {
match recv.capture(true, false, false, 1000) {
Err(_) => unimplemented!(),
Ok(None) => continue,
Ok(Some(Frame::Video(frame))) => break frame,
_ => unreachable!(),
}
};
unsafe {
while NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000)
!= NDIlib_frame_type_e::NDIlib_frame_type_video
{}
}
// FIXME: Why?
settings.latency = gst::SECOND.mul_div_floor(
video_frame.frame_rate_D as u64,
video_frame.frame_rate_N as u64,
video_frame.frame_rate().1 as u64,
video_frame.frame_rate().0 as u64,
);
let mut caps = gst::Caps::truncate(caps);
{
let caps = caps.make_mut();
let s = caps.get_mut_structure(0).unwrap();
s.fixate_field_nearest_int("width", video_frame.xres);
s.fixate_field_nearest_int("height", video_frame.yres);
s.fixate_field_nearest_int("width", video_frame.xres());
s.fixate_field_nearest_int("height", video_frame.yres());
s.fixate_field_nearest_fraction(
"framerate",
Fraction::new(video_frame.frame_rate_N, video_frame.frame_rate_D),
Fraction::new(video_frame.frame_rate().0, video_frame.frame_rate().1),
);
}
unsafe {
NDIlib_recv_free_video_v2(pNDI_recv, &video_frame);
}
let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
self.parent_fixate(element, caps)
}
@ -434,38 +430,41 @@ impl BaseSrcImpl for NdiVideoSrc {
}
Some(ref info) => info.clone(),
};
let receivers = hashmap_receivers.lock().unwrap();
let receivers = HASHMAP_RECEIVERS.lock().unwrap();
let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance;
let pNDI_recv = recv.recv;
let receiver = &receivers.get(&_settings.id_receiver.unwrap()).unwrap();
let recv = &receiver.ndi_instance;
let pts: u64;
let video_frame: NDIlib_video_frame_v2_t = Default::default();
let time = receiver.initial_timestamp;
unsafe {
let time = receivers
.get(&_settings.id_receiver)
.unwrap()
.initial_timestamp;
let mut count_frame_none = 0;
let video_frame =
loop {
// FIXME: make interruptable
let res =
loop {
match recv.capture(true, false, false, 1000) {
Err(_) => break Err(()),
Ok(None) => break Ok(None),
Ok(Some(Frame::Video(frame))) => break Ok(Some(frame)),
_ => unreachable!(),
}
};
let mut skip_frame = true;
let mut count_frame_none = 0;
while skip_frame {
let frame_type =
NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000);
if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
&& _settings.loss_threshold != 0)
|| frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error
{
let video_frame = match res {
Err(_) => {
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]);
return Err(gst::FlowError::Error);
},
Ok(None) if _settings.loss_threshold != 0 => {
if count_frame_none < _settings.loss_threshold {
count_frame_none += 1;
continue;
}
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]);
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]);
return Err(gst::FlowError::Error);
} else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
&& _settings.loss_threshold == 0
{
},
Ok(None) => {
gst_debug!(
self.cat,
obj: element,
@ -473,62 +472,64 @@ impl BaseSrcImpl for NdiVideoSrc {
);
count_frame_none += 1;
continue;
}
},
Ok(Some(frame)) => frame,
};
if time >= (video_frame.timestamp as u64) {
gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp as u64), time);
} else {
skip_frame = false;
}
if time >= (video_frame.timestamp() as u64) {
gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp() as u64), time);
} else {
break video_frame;
}
};
gst_log!(
self.cat,
obj: element,
"NDI video frame received: {:?}",
(video_frame)
);
gst_log!(
self.cat,
obj: element,
"NDI video frame received: {:?}",
video_frame
);
pts = video_frame.timestamp as u64 - time;
let pts = video_frame.timestamp() as u64 - time;
gst_log!(
self.cat,
obj: element,
"Calculated pts for video frame: {:?}",
(pts)
);
gst_log!(
self.cat,
obj: element,
"Calculated pts for video frame: {:?}",
pts
);
let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
{
let data = slice::from_raw_parts(video_frame.p_data as *mut u8, buff_size);
// Newtek NDI yields times in 100ns intervals since the Unix Time
let pts: gst::ClockTime = (pts * 100).into();
let buff_size = (video_frame.yres() * video_frame.line_stride_in_bytes()) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
{
// Newtek NDI yields times in 100ns intervals since the Unix Time
let pts: gst::ClockTime = (pts * 100).into();
let duration: gst::ClockTime = (((f64::from(video_frame.frame_rate_D)
/ f64::from(video_frame.frame_rate_N))
* 1_000_000_000.0) as u64)
.into();
let buffer = buffer.get_mut().unwrap();
let duration: gst::ClockTime = (((f64::from(video_frame.frame_rate().1)
/ f64::from(video_frame.frame_rate().0))
* 1_000_000_000.0) as u64)
.into();
let buffer = buffer.get_mut().unwrap();
if ndi_struct.start_pts == gst::ClockTime(Some(0)) {
ndi_struct.start_pts =
// FIXME don't use static mut, also this calculation is wrong
unsafe {
if NDI_STRUCT.start_pts == gst::ClockTime(Some(0)) {
NDI_STRUCT.start_pts =
element.get_clock().unwrap().get_time() - element.get_base_time();
}
buffer.set_pts(pts + ndi_struct.start_pts);
buffer.set_duration(duration);
buffer.set_offset(timestamp_data.offset);
timestamp_data.offset += 1;
buffer.set_offset_end(timestamp_data.offset);
buffer.copy_from_slice(0, data).unwrap();
buffer.set_pts(pts + NDI_STRUCT.start_pts);
}
gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
NDIlib_recv_free_video_v2(pNDI_recv, &video_frame);
Ok(buffer)
buffer.set_duration(duration);
buffer.set_offset(timestamp_data.offset);
timestamp_data.offset += 1;
buffer.set_offset_end(timestamp_data.offset);
buffer.copy_from_slice(0, video_frame.data()).unwrap();
}
gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
Ok(buffer)
}
}