Fix to use multiple NDI streams in the same pipeline

It's not possible to connect to the same stream twice. For example to audio and video from the same stream.
This commit is contained in:
Daniel Vilar 2018-08-14 15:45:13 +02:00
parent 0baf7ee839
commit 8bf4f8f935
3 changed files with 79 additions and 70 deletions

View file

@ -19,6 +19,8 @@ extern crate gstreamer_video as gst_video;
extern crate byte_slice_cast; extern crate byte_slice_cast;
extern crate num_traits; extern crate num_traits;
#[macro_use]
extern crate lazy_static;
mod ndivideosrc; mod ndivideosrc;
mod ndiaudiosrc; mod ndiaudiosrc;
@ -31,6 +33,8 @@ use std::ffi::{CStr, CString};
use ndilib::*; use ndilib::*;
use gst_plugin::base_src::*; use gst_plugin::base_src::*;
use std::collections::HashMap;
use std::sync::Mutex;
// Plugin entry point that should register all elements provided by this plugin, // Plugin entry point that should register all elements provided by this plugin,
// and everything else that this plugin might provide (e.g. typefinders or device providers). // and everything else that this plugin might provide (e.g. typefinders or device providers).
@ -40,7 +44,6 @@ fn plugin_init(plugin: &gst::Plugin) -> bool {
true true
} }
struct Ndi{ struct Ndi{
recv: Option<NdiInstance>, recv: Option<NdiInstance>,
start_pts: u64, start_pts: u64,
@ -51,16 +54,23 @@ static mut ndi_struct: Ndi = Ndi{
start_pts: 0, start_pts: 0,
}; };
lazy_static! {
static ref hashmap_receivers: Mutex<HashMap<String, NdiInstance>> = {
let mut m = HashMap::new();
Mutex::new(m)
};
}
fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream_name: String) -> bool{ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream_name: String) -> bool{
unsafe { unsafe {
gst_debug!(cat, obj: element, "Starting NDI connection..."); gst_debug!(cat, obj: element, "Starting NDI connection...");
match ndi_struct.recv {
None => { let mut map = hashmap_receivers.lock().unwrap();
//gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]); if (map.contains_key(&stream_name) || map.contains_key(&ip)){
//return true; println!("Already connected to {}{}", ip, stream_name);
return false;
} }
_ => return true,
};
if !NDIlib_initialize() { if !NDIlib_initialize() {
gst_element_error!(element, gst::CoreError::Negotiation, ["Cannot run NDI: NDIlib_initialize error"]); gst_element_error!(element, gst::CoreError::Negotiation, ["Cannot run NDI: NDIlib_initialize error"]);
return false; return false;
@ -73,7 +83,6 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
let NDI_find_create_desc: NDIlib_find_create_t = Default::default(); let NDI_find_create_desc: NDIlib_find_create_t = Default::default();
let pNDI_find = NDIlib_find_create_v2(&NDI_find_create_desc); let pNDI_find = NDIlib_find_create_v2(&NDI_find_create_desc);
let ip_ptr = CString::new(ip.clone()).unwrap(); let ip_ptr = CString::new(ip.clone()).unwrap();
if ip_ptr == CString::new("").unwrap(){
if pNDI_find.is_null() { if pNDI_find.is_null() {
gst_element_error!(element, gst::CoreError::Negotiation, ["Cannot run NDI: NDIlib_find_create_v2 error"]); gst_element_error!(element, gst::CoreError::Negotiation, ["Cannot run NDI: NDIlib_find_create_v2 error"]);
return false; return false;
@ -94,9 +103,8 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
let mut no_source: isize = -1; let mut no_source: isize = -1;
for i in 0..total_sources as isize{ for i in 0..total_sources as isize{
if CStr::from_ptr((*p_sources.offset(i)).p_ndi_name) if (CStr::from_ptr((*p_sources.offset(i)).p_ndi_name).to_string_lossy().into_owned() == stream_name ||
.to_string_lossy() CStr::from_ptr((*p_sources.offset(i)).p_ip_address).to_string_lossy().into_owned() == ip){
.into_owned() == stream_name{
no_source = i; no_source = i;
break; break;
} }
@ -115,14 +123,9 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
.into_owned()); .into_owned());
source = *p_sources.offset(no_source).clone(); source = *p_sources.offset(no_source).clone();
}
else{ let source_ip = CStr::from_ptr(source.p_ip_address).to_string_lossy().into_owned();
source.p_ip_address = ip_ptr.as_ptr(); let source_name = CStr::from_ptr(source.p_ndi_name).to_string_lossy().into_owned();
gst_debug!(cat, obj: element, "Connecting to NDI source with address '{}'", CStr::from_ptr(source.p_ip_address)
.to_string_lossy()
.into_owned()
);
}
// We now have at least one source, so we create a receiver to look at it. // We now have at least one source, so we create a receiver to look at it.
// We tell it that we prefer YCbCr video since it is more efficient for us. If the source has an alpha channel // We tell it that we prefer YCbCr video since it is more efficient for us. If the source has an alpha channel
@ -159,7 +162,9 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
}; };
NDIlib_recv_send_metadata(pNDI_recv, &enable_hw_accel); NDIlib_recv_send_metadata(pNDI_recv, &enable_hw_accel);
ndi_struct.recv = Some(NdiInstance{recv: pNDI_recv});
map.insert(source_name.clone(), NdiInstance{recv: pNDI_recv});
map.insert(source_ip.clone(), NdiInstance{recv: pNDI_recv});
// let start = SystemTime::now(); // let start = SystemTime::now();
// let since_the_epoch = start.duration_since(UNIX_EPOCH) // let since_the_epoch = start.duration_since(UNIX_EPOCH)

View file

@ -18,9 +18,11 @@ use std::ptr;
use ndilib::*; use ndilib::*;
use connect_ndi; use connect_ndi;
use ndi_struct; // use ndi_struct;
use stop_ndi; use stop_ndi;
use hashmap_receivers;
// Property value storage // Property value storage
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Settings { struct Settings {
@ -318,26 +320,27 @@ impl NdiAudioSrc {
fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps { fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
//We need to set the correct caps resolution and framerate //We need to set the correct caps resolution and framerate
unsafe{ unsafe{
let recv = match ndi_struct.recv{ let map = hashmap_receivers.lock().unwrap();
None => { let settings = self.settings.lock().unwrap();
//TODO Update gst_element_error with one more descriptive
//println!("pNDI_recv no encontrado"); let mut id = &settings.stream_name;
gst_element_error!(element, gst::CoreError::Negotiation, ["No encontramos ndi recv"]); if (&settings.ip != ""){
return caps; id = &settings.ip;
} }
Some(ref recv) => recv.clone(), let recv = map.get(id).unwrap();
};
let pNDI_recv = recv.recv; let pNDI_recv = recv.recv;
let mut timestamp_data = self.timestamp_data.lock().unwrap(); let mut timestamp_data = self.timestamp_data.lock().unwrap();
let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none; let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_audio{ while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_audio{
frame_type = NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000); frame_type = NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000);
} }
ndi_struct.start_pts = audio_frame.timecode as u64;
//timestamp_data.pts = audio_frame.timecode as u64; //ndi_struct.start_pts = audio_frame.timecode as u64;
timestamp_data.pts = audio_frame.timecode as u64;
let mut caps = gst::Caps::truncate(caps); let mut caps = gst::Caps::truncate(caps);
{ {
@ -377,22 +380,22 @@ impl NdiAudioSrc {
Some(ref info) => info.clone(), Some(ref info) => info.clone(),
}; };
unsafe{ unsafe{
let recv = match ndi_struct.recv{ let map = hashmap_receivers.lock().unwrap();
None => { let mut id = &_settings.stream_name;
//TODO Update gst_element_error with one more descriptive
//println!("pNDI_recv no encontrado");
gst_element_error!(element, gst::CoreError::Negotiation, ["No encontramos ndi recv"]);
return Err(gst::FlowReturn::NotNegotiated);
}
Some(ref recv) => recv.clone(),
};
let pNDI_recv = recv.recv;
let pts: u64;
if (&_settings.ip != ""){
id = &_settings.ip;
}
let recv = map.get(id).unwrap();
let pNDI_recv = recv.recv;
let pts: u64;
let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000,); NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000,);
//pts = (audio_frame.timecode as u64) - timestamp_data.pts;
pts = (audio_frame.timecode as u64) - ndi_struct.start_pts; pts = (audio_frame.timecode as u64) - timestamp_data.pts;
//pts = (audio_frame.timecode as u64) - ndi_struct.start_pts;
let buff_size = ((audio_frame.channel_stride_in_bytes)) as usize; let buff_size = ((audio_frame.channel_stride_in_bytes)) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); let mut buffer = gst::Buffer::with_size(buff_size).unwrap();

View file

@ -19,9 +19,11 @@ use std::ptr;
use ndilib::*; use ndilib::*;
use connect_ndi; use connect_ndi;
use ndi_struct; // use ndi_struct;
use stop_ndi; use stop_ndi;
use hashmap_receivers;
// Property value storage // Property value storage
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Settings { struct Settings {
@ -324,16 +326,14 @@ impl NdiVideoSrc {
fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps { fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
//We need to set the correct caps resolution and framerate //We need to set the correct caps resolution and framerate
unsafe{ unsafe{
let recv = match ndi_struct.recv{ let map = hashmap_receivers.lock().unwrap();
None => { let settings = self.settings.lock().unwrap();
//TODO Update gst_element_error with one more descriptive
//println!("pNDI_recv no encontrado"); let mut id = &settings.stream_name;
gst_element_error!(element, gst::CoreError::Negotiation, ["No encontramos ndi recv"]); if (&settings.ip != ""){
//TODO if none not return anything id = &settings.ip;
return caps;
} }
Some(ref recv) => recv.clone(), let recv = map.get(id).unwrap();
};
let pNDI_recv = recv.recv; let pNDI_recv = recv.recv;
let mut timestamp_data = self.timestamp_data.lock().unwrap(); let mut timestamp_data = self.timestamp_data.lock().unwrap();
@ -345,8 +345,9 @@ impl NdiVideoSrc {
frame_type = NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000); frame_type = NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000);
} }
//timestamp_data.pts = video_frame.timecode as u64; //TODO Check that this is working
ndi_struct.start_pts = video_frame.timecode as u64; timestamp_data.pts = video_frame.timecode as u64;
//ndi_struct.start_pts = video_frame.timecode as u64;
let mut caps = gst::Caps::truncate(caps); let mut caps = gst::Caps::truncate(caps);
{ {
@ -386,23 +387,23 @@ impl NdiVideoSrc {
Some(ref info) => info.clone(), Some(ref info) => info.clone(),
}; };
unsafe{ unsafe{
let recv = match ndi_struct.recv{ let map = hashmap_receivers.lock().unwrap();
None => { let mut id = &_settings.stream_name;
//TODO Update gst_element_error with one more descriptive
//println!("pNDI_recv no encontrado"); if (&_settings.ip != ""){
gst_element_error!(element, gst::CoreError::Negotiation, ["No encontramos ndi recv"]); id = &_settings.ip;
return Err(gst::FlowReturn::NotNegotiated);
} }
Some(ref recv) => recv.clone(), let recv = map.get(id).unwrap();
};
let pNDI_recv = recv.recv; let pNDI_recv = recv.recv;
let pts: u64; let pts: u64;
let video_frame: NDIlib_video_frame_v2_t = Default::default(); let video_frame: NDIlib_video_frame_v2_t = Default::default();
NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000,); NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000,);
//pts = (video_frame.timecode as u64) - timestamp_data.pts;
pts = (video_frame.timecode as u64) - ndi_struct.start_pts; pts = (video_frame.timecode as u64) - timestamp_data.pts;
//pts = (video_frame.timecode as u64) - ndi_struct.start_pts;
let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize; let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize;
//println!("{:?}", buff_size); //println!("{:?}", buff_size);
let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); let mut buffer = gst::Buffer::with_size(buff_size).unwrap();