Merge pull request #34 from sdroege/safe-ndi-wrapper

Lots of improvements
This commit is contained in:
Ruben Gonzalez 2019-07-23 07:19:04 +02:00 committed by GitHub
commit a0c5b24fb8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 3325 additions and 945 deletions

View file

@@ -8,16 +8,23 @@ description = "NewTek NDI Plugin"
[dependencies]
glib = { version = "0.8.0", features = ["subclassing"] }
gstreamer = { version = "0.14.0", features = ["subclassing"] }
gobject-sys = "0.9"
gstreamer = { version = "0.14.3", features = ["subclassing", "v1_12"] }
gstreamer-base = { version = "0.14.0", features = ["subclassing"] }
gstreamer-audio = "0.14.0"
gstreamer-video = "0.14.0"
gstreamer-video = { version = "0.14.3", features = ["v1_12"] }
gstreamer-sys = "0.8"
lazy_static = "1.1.0"
byte-slice-cast = "0.2.0"
[build-dependencies]
gst-plugin-version-helper = "0.1"
[features]
default = ["interlaced-fields", "reference-timestamps"]
interlaced-fields = ["gstreamer/v1_16", "gstreamer-video/v1_16"]
reference-timestamps = ["gstreamer/v1_14"]
[lib]
name = "gstndi"
crate-type = ["cdylib"]

View file

@@ -16,12 +16,12 @@ gst-inspect-1.0 ndivideosrc
gst-inspect-1.0 ndiaudiosrc
#Video pipeline
gst-launch-1.0 ndivideosrc stream-name="GC-DEV2 (OBS)" ! autovideosink
gst-launch-1.0 ndivideosrc ndi-name="GC-DEV2 (OBS)" ! autovideosink
#Audio pipeline
gst-launch-1.0 ndiaudiosrc stream-name="GC-DEV2 (OBS)" ! autoaudiosink
gst-launch-1.0 ndiaudiosrc ndi-name="GC-DEV2 (OBS)" ! autoaudiosink
#Video and audio pipeline
gst-launch-1.0 ndivideosrc stream-name="GC-DEV2 (OBS)" ! autovideosink ndiaudiosrc stream-name="GC-DEV2 (OBS)" ! autoaudiosink
gst-launch-1.0 ndivideosrc ndi-name="GC-DEV2 (OBS)" ! autovideosink ndiaudiosrc ndi-name="GC-DEV2 (OBS)" ! autoaudiosink
```
Feel free to contribute to this project. Some ways you can contribute are:
@@ -63,11 +63,8 @@ gst-inspect-1.0 ndi
More info about GStreamer plugins written in Rust:
----------------------------------
https://github.com/sdroege/gstreamer-rs
https://github.com/sdroege/gst-plugin-rs
https://coaxion.net/blog/2018/01/how-to-write-gstreamer-elements-in-rust-part-1-a-video-filter-for-converting-rgb-to-grayscale/
https://coaxion.net/blog/2018/02/how-to-write-gstreamer-elements-in-rust-part-2-a-raw-audio-sine-wave-source/
https://gitlab.freedesktop.org/gstreamer/gstreamer-rs
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs
License

View file

@@ -1,255 +1,156 @@
#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
#[macro_use]
extern crate glib;
use glib::prelude::*;
#[macro_use]
extern crate gstreamer as gst;
use gst::prelude::*;
extern crate gstreamer_audio as gst_audio;
extern crate gstreamer_base as gst_base;
extern crate gstreamer_sys as gst_sys;
extern crate gstreamer_video as gst_video;
#[macro_use]
extern crate lazy_static;
extern crate byte_slice_cast;
pub mod ndi;
mod ndiaudiosrc;
pub mod ndisys;
mod ndivideosrc;
pub mod receiver;
// use gst_plugin::base_src::*;
use ndi::*;
use ndisys::*;
use std::ffi::{CStr, CString};
use receiver::*;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time;
use gst::GstObjectExt;
/// Selects which timing information is used for outgoing buffer PTS
/// (exposed as the "timestamp-mode" GObject property).
///
/// `#[repr(u32)]` keeps the discriminants stable because they are registered
/// as a GEnum via `timestamp_mode_get_type()`.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
#[repr(u32)]
pub enum TimestampMode {
    ReceiveTime = 0,
    Timecode = 1,
    Timestamp = 2,
}
/// Plugin entry point: brings up the NDI runtime, then registers the video
/// and audio source elements.
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
    if ndi::initialize() {
        ndivideosrc::register(plugin)?;
        ndiaudiosrc::register(plugin)?;
        Ok(())
    } else {
        Err(glib_bool_error!("Cannot initialize NDI"))
    }
}
/// Bookkeeping for one active NDI connection, keyed by `id` in the global
/// receiver map. One connection can be shared by an audio and a video source
/// element at the same time (tracked by the `audio`/`video` flags).
struct ndi_receiver_info {
    stream_name: String,
    ip: String,
    video: bool, // a video source element currently uses this connection
    audio: bool, // an audio source element currently uses this connection
    ndi_instance: NdiInstance,
    initial_timestamp: u64,
    id: i8,
}

/// Global plugin state; currently only the starting PTS.
struct Ndi {
    start_pts: gst::ClockTime,
}

// NOTE(review): mutable static with no synchronization — relies on callers
// serializing access; consider wrapping in a Mutex.
static mut ndi_struct: Ndi = Ndi {
    start_pts: gst::ClockTime(Some(0)),
};
lazy_static! {
static ref hashmap_receivers: Mutex<HashMap<i8, ndi_receiver_info>> = {
let m = HashMap::new();
Mutex::new(m)
static ref DEFAULT_RECEIVER_NDI_NAME: String = {
format!(
"GStreamer NDI Source {}-{}",
env!("CARGO_PKG_VERSION"),
env!("COMMIT_ID")
)
};
}
// Monotonically increasing id handed out for each new receiver connection.
// NOTE(review): a `static mut` — in the visible code it is only touched while
// the receiver-map mutex is held; confirm that holds for all accesses.
static mut id_receiver: i8 = 0;
// Caps used to tag NDI timecode/timestamp reference-timestamp metadata on
// outgoing buffers. Gated on "reference-timestamps" (requires GStreamer 1.14
// per Cargo.toml).
#[cfg(feature = "reference-timestamps")]
lazy_static! {
    static ref TIMECODE_CAPS: gst::Caps =
        { gst::Caps::new_simple("timestamp/x-ndi-timecode", &[]) };
    static ref TIMESTAMP_CAPS: gst::Caps =
        { gst::Caps::new_simple("timestamp/x-ndi-timestamp", &[]) };
}
fn connect_ndi(
cat: gst::DebugCategory,
element: &gst_base::BaseSrc,
ip: &str,
stream_name: &str,
) -> i8 {
gst_debug!(cat, obj: element, "Starting NDI connection...");
impl glib::translate::ToGlib for TimestampMode {
type GlibType = i32;
let mut receivers = hashmap_receivers.lock().unwrap();
let mut audio = false;
let mut video = false;
if element
.get_factory()
.map(|f| f.get_name() == "ndiaudiosrc")
.unwrap_or(false)
{
audio = true;
} else {
video = true;
}
for val in receivers.values_mut() {
if val.ip == ip || val.stream_name == stream_name {
if (val.audio && val.video) || (val.audio && audio) || (val.video && video) {
continue;
} else {
if video {
val.video = video;
} else {
val.audio = audio;
}
return val.id;
}
}
}
unsafe {
if !NDIlib_initialize() {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_initialize error"]
);
return 0;
}
let NDI_find_create_desc: NDIlib_find_create_t = Default::default();
let pNDI_find = NDIlib_find_create_v2(&NDI_find_create_desc);
if pNDI_find.is_null() {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_find_create_v2 error"]
);
return 0;
}
let mut total_sources: u32 = 0;
let p_sources;
// TODO Sleep 1s to wait for all sources
NDIlib_find_wait_for_sources(pNDI_find, 2000);
p_sources = NDIlib_find_get_current_sources(pNDI_find, &mut total_sources as *mut u32);
// We need at least one source
if p_sources.is_null() {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Error getting NDIlib_find_get_current_sources"]
);
return 0;
}
let mut no_source: isize = -1;
for i in 0..total_sources as isize {
if CStr::from_ptr((*p_sources.offset(i)).p_ndi_name)
.to_string_lossy()
.into_owned()
== stream_name
|| CStr::from_ptr((*p_sources.offset(i)).p_ip_address)
.to_string_lossy()
.into_owned()
== ip
{
no_source = i;
break;
}
}
if no_source == -1 {
gst_element_error!(element, gst::ResourceError::OpenRead, ["Stream not found"]);
return 0;
}
gst_debug!(
cat,
obj: element,
"Total sources in network {}: Connecting to NDI source with name '{}' and address '{}'",
total_sources,
CStr::from_ptr((*p_sources.offset(no_source)).p_ndi_name)
.to_string_lossy()
.into_owned(),
CStr::from_ptr((*p_sources.offset(no_source)).p_ip_address)
.to_string_lossy()
.into_owned()
);
let source = *p_sources.offset(no_source);
let source_ip = CStr::from_ptr(source.p_ip_address)
.to_string_lossy()
.into_owned();
let source_name = CStr::from_ptr(source.p_ndi_name)
.to_string_lossy()
.into_owned();
let p_ndi_name = CString::new("Galicaster NDI Receiver").unwrap();
let NDI_recv_create_desc = NDIlib_recv_create_v3_t {
source_to_connect_to: source,
p_ndi_name: p_ndi_name.as_ptr(),
..Default::default()
};
let pNDI_recv = NDIlib_recv_create_v3(&NDI_recv_create_desc);
if pNDI_recv.is_null() {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Cannot run NDI: NDIlib_recv_create_v3 error"]
);
return 0;
}
NDIlib_find_destroy(pNDI_find);
let tally_state: NDIlib_tally_t = Default::default();
NDIlib_recv_set_tally(pNDI_recv, &tally_state);
let data = CString::new("<ndi_hwaccel enabled=\"true\"/>").unwrap();
let enable_hw_accel = NDIlib_metadata_frame_t {
length: data.to_bytes().len() as i32,
timecode: 0,
p_data: data.as_ptr(),
};
NDIlib_recv_send_metadata(pNDI_recv, &enable_hw_accel);
id_receiver += 1;
receivers.insert(
id_receiver,
ndi_receiver_info {
stream_name: source_name.clone(),
ip: source_ip.clone(),
video,
audio,
ndi_instance: NdiInstance { recv: pNDI_recv },
initial_timestamp: 0,
id: id_receiver,
},
);
gst_debug!(cat, obj: element, "Started NDI connection");
id_receiver
fn to_glib(&self) -> i32 {
*self as i32
}
}
fn stop_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, id: i8) -> bool {
gst_debug!(cat, obj: element, "Closing NDI connection...");
let mut receivers = hashmap_receivers.lock().unwrap();
{
let val = receivers.get_mut(&id).unwrap();
if val.video && val.audio {
if element.get_name().contains("audiosrc") {
val.audio = false;
} else {
val.video = false;
}
return true;
}
let recv = &val.ndi_instance;
let pNDI_recv = recv.recv;
unsafe {
NDIlib_recv_destroy(pNDI_recv);
// ndi_struct.recv = None;
NDIlib_destroy();
impl glib::translate::FromGlib<i32> for TimestampMode {
fn from_glib(value: i32) -> Self {
match value {
0 => TimestampMode::ReceiveTime,
1 => TimestampMode::Timecode,
2 => TimestampMode::Timestamp,
_ => unreachable!(),
}
}
receivers.remove(&id);
gst_debug!(cat, obj: element, "Closed NDI connection");
true
}
// GType/GValue plumbing so TimestampMode can be used as the value of the
// "timestamp-mode" GObject property.

impl StaticType for TimestampMode {
    fn static_type() -> glib::Type {
        // Registers the GEnum type on first use.
        timestamp_mode_get_type()
    }
}

impl<'a> glib::value::FromValueOptional<'a> for TimestampMode {
    unsafe fn from_value_optional(value: &glib::Value) -> Option<Self> {
        Some(glib::value::FromValue::from_value(value))
    }
}

impl<'a> glib::value::FromValue<'a> for TimestampMode {
    unsafe fn from_value(value: &glib::Value) -> Self {
        use glib::translate::ToGlibPtr;

        // Reads the raw enum value out of the GValue without a type check;
        // the unsafe contract is that `value` holds our registered enum type.
        glib::translate::from_glib(gobject_sys::g_value_get_enum(value.to_glib_none().0))
    }
}

impl glib::value::SetValue for TimestampMode {
    unsafe fn set_value(value: &mut glib::Value, this: &Self) {
        use glib::translate::{ToGlib, ToGlibPtrMut};

        gobject_sys::g_value_set_enum(value.to_glib_none_mut().0, this.to_glib())
    }
}
/// Lazily registers the `GstNdiTimestampMode` GEnum type on first call and
/// returns the cached GType afterwards.
fn timestamp_mode_get_type() -> glib::Type {
    use std::sync::Once;

    static ONCE: Once = Once::new();
    static mut TYPE: glib::Type = glib::Type::Invalid;

    ONCE.call_once(|| {
        use std::ffi;
        use std::ptr;

        // GEnum value table; must be terminated by a zeroed entry.
        static mut VALUES: [gobject_sys::GEnumValue; 4] = [
            gobject_sys::GEnumValue {
                value: TimestampMode::ReceiveTime as i32,
                value_name: b"Receive Time\0" as *const _ as *const _,
                value_nick: b"receive-time\0" as *const _ as *const _,
            },
            gobject_sys::GEnumValue {
                value: TimestampMode::Timecode as i32,
                value_name: b"NDI Timecode\0" as *const _ as *const _,
                value_nick: b"timecode\0" as *const _ as *const _,
            },
            gobject_sys::GEnumValue {
                value: TimestampMode::Timestamp as i32,
                value_name: b"NDI Timestamp\0" as *const _ as *const _,
                value_nick: b"timestamp\0" as *const _ as *const _,
            },
            gobject_sys::GEnumValue {
                value: 0,
                value_name: ptr::null(),
                value_nick: ptr::null(),
            },
        ];

        let name = ffi::CString::new("GstNdiTimestampMode").unwrap();
        unsafe {
            // SAFETY: guarded by ONCE, so this runs at most once; TYPE is
            // only written here.
            let type_ = gobject_sys::g_enum_register_static(name.as_ptr(), VALUES.as_ptr());
            TYPE = glib::translate::from_glib(type_);
        }
    });

    unsafe {
        // SAFETY: TYPE was initialized inside the Once block above.
        assert_ne!(TYPE, glib::Type::Invalid);
        TYPE
    }
}
gst_plugin_define!(

722
src/ndi.rs Normal file
View file

@@ -0,0 +1,722 @@
use ndisys::*;
use std::ffi;
use std::mem;
use std::ptr;
use std::sync::{Arc, Mutex};
/// Safe wrapper around `NDIlib_initialize()`; returns `false` on failure.
pub fn initialize() -> bool {
    unsafe { NDIlib_initialize() }
}
/// Builder for a `FindInstance` (NDI source discovery).
/// The fields mirror `NDIlib_find_create_t`.
#[derive(Debug)]
pub struct FindBuilder<'a> {
    show_local_sources: bool,
    groups: Option<&'a str>,    // passed through as p_groups, if set
    extra_ips: Option<&'a str>, // passed through as p_extra_ips, if set
}
impl<'a> Default for FindBuilder<'a> {
fn default() -> Self {
Self {
show_local_sources: true,
groups: None,
extra_ips: None,
}
}
}
impl<'a> FindBuilder<'a> {
    /// Sets whether sources on this machine are reported.
    pub fn show_local_sources(self, show_local_sources: bool) -> Self {
        Self {
            show_local_sources,
            ..self
        }
    }

    /// Sets the group filter (forwarded as `p_groups`).
    pub fn groups(self, groups: &'a str) -> Self {
        Self {
            groups: Some(groups),
            ..self
        }
    }

    /// Sets additional IP addresses to query (forwarded as `p_extra_ips`).
    pub fn extra_ips(self, extra_ips: &'a str) -> Self {
        Self {
            extra_ips: Some(extra_ips),
            ..self
        }
    }

    /// Creates the finder; returns `None` if `NDIlib_find_create_v2` fails.
    pub fn build(self) -> Option<FindInstance> {
        // The CStrings must outlive the FFI call below, hence the bindings.
        let groups = self.groups.map(|s| ffi::CString::new(s).unwrap());
        let extra_ips = self.extra_ips.map(|s| ffi::CString::new(s).unwrap());

        unsafe {
            let ptr = NDIlib_find_create_v2(&NDIlib_find_create_t {
                show_local_sources: self.show_local_sources,
                p_groups: groups.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null()),
                p_extra_ips: extra_ips
                    .as_ref()
                    .map(|s| s.as_ptr())
                    .unwrap_or(ptr::null()),
            });
            if ptr.is_null() {
                None
            } else {
                // SAFETY: just checked for NULL.
                Some(FindInstance(ptr::NonNull::new_unchecked(ptr)))
            }
        }
    }
}
/// Owned handle to an NDI finder instance.
#[derive(Debug)]
pub struct FindInstance(ptr::NonNull<::std::os::raw::c_void>);

// NOTE(review): assumes the NDI find instance may be used from a different
// thread than the one that created it — confirm against the SDK docs.
unsafe impl Send for FindInstance {}

impl FindInstance {
    pub fn builder<'a>() -> FindBuilder<'a> {
        FindBuilder::default()
    }

    /// Blocks up to `timeout_in_ms` waiting for the source list to change.
    pub fn wait_for_sources(&mut self, timeout_in_ms: u32) -> bool {
        unsafe { NDIlib_find_wait_for_sources(self.0.as_ptr(), timeout_in_ms) }
    }

    /// Snapshot of the currently known sources. The returned `Source`s borrow
    /// from `self` and cannot outlive this finder.
    pub fn get_current_sources(&mut self) -> Vec<Source> {
        unsafe {
            let mut no_sources = mem::MaybeUninit::uninit();
            let sources_ptr =
                NDIlib_find_get_current_sources(self.0.as_ptr(), no_sources.as_mut_ptr());
            // SAFETY: the FFI call wrote the source count.
            let no_sources = no_sources.assume_init();

            if sources_ptr.is_null() || no_sources == 0 {
                return vec![];
            }

            let mut sources = vec![];
            for i in 0..no_sources {
                sources.push(Source::Borrowed(
                    // SAFETY: i < no_sources, so the element pointer is valid
                    // and non-NULL.
                    ptr::NonNull::new_unchecked(sources_ptr.add(i as usize) as *mut _),
                    self,
                ));
            }

            sources
        }
    }
}

impl Drop for FindInstance {
    fn drop(&mut self) {
        unsafe {
            NDIlib_find_destroy(self.0.as_mut());
        }
    }
}
/// An NDI source: either borrowed out of a `FindInstance`'s current list or an
/// owned copy holding its own name/address strings.
#[derive(Debug)]
pub enum Source<'a> {
    Borrowed(ptr::NonNull<NDIlib_source_t>, &'a FindInstance),
    // The CStrings own the buffers that the raw struct's pointers refer to.
    Owned(NDIlib_source_t, ffi::CString, ffi::CString),
}

// NOTE(review): Borrowed keeps a pointer into the finder's list; the 'a
// borrow ties its lifetime to the finder. Cross-thread use of that list is
// assumed safe — confirm against the SDK's threading rules.
unsafe impl<'a> Send for Source<'a> {}

impl<'a> Source<'a> {
    /// The sender's NDI name. Panics if it is not valid UTF-8.
    pub fn ndi_name(&self) -> &str {
        unsafe {
            let ptr = match *self {
                Source::Borrowed(ptr, _) => &*ptr.as_ptr(),
                Source::Owned(ref source, _, _) => source,
            };
            assert!(!ptr.p_ndi_name.is_null());
            ffi::CStr::from_ptr(ptr.p_ndi_name).to_str().unwrap()
        }
    }

    /// The sender's IP address string. Panics if it is not valid UTF-8.
    pub fn ip_address(&self) -> &str {
        unsafe {
            let ptr = match *self {
                Source::Borrowed(ptr, _) => &*ptr.as_ptr(),
                Source::Owned(ref source, _, _) => source,
            };
            assert!(!ptr.p_ip_address.is_null());
            ffi::CStr::from_ptr(ptr.p_ip_address).to_str().unwrap()
        }
    }

    // Raw name pointer, used when filling in the receiver create struct.
    fn ndi_name_ptr(&self) -> *const ::std::os::raw::c_char {
        unsafe {
            match *self {
                Source::Borrowed(ptr, _) => ptr.as_ref().p_ndi_name,
                Source::Owned(_, ref ndi_name, _) => ndi_name.as_ptr(),
            }
        }
    }

    // Raw address pointer, used when filling in the receiver create struct.
    fn ip_address_ptr(&self) -> *const ::std::os::raw::c_char {
        unsafe {
            match *self {
                Source::Borrowed(ptr, _) => ptr.as_ref().p_ip_address,
                Source::Owned(_, _, ref ip_address) => ip_address.as_ptr(),
            }
        }
    }

    /// Deep-copies the source so it no longer borrows from a `FindInstance`.
    pub fn to_owned<'b>(&self) -> Source<'b> {
        unsafe {
            let (ndi_name, ip_address) = match *self {
                Source::Borrowed(ptr, _) => (ptr.as_ref().p_ndi_name, ptr.as_ref().p_ip_address),
                Source::Owned(_, ref ndi_name, ref ip_address) => {
                    (ndi_name.as_ptr(), ip_address.as_ptr())
                }
            };

            let ndi_name = ffi::CString::new(ffi::CStr::from_ptr(ndi_name).to_bytes()).unwrap();
            let ip_address = ffi::CString::new(ffi::CStr::from_ptr(ip_address).to_bytes()).unwrap();

            // The raw struct points into the CStrings stored next to it.
            // CString's buffer is heap-allocated, so moving the returned
            // Source does not invalidate these pointers.
            Source::Owned(
                NDIlib_source_t {
                    p_ndi_name: ndi_name.as_ptr(),
                    p_ip_address: ip_address.as_ptr(),
                },
                ndi_name,
                ip_address,
            )
        }
    }
}
/// Builder for a `RecvInstance`; mirrors `NDIlib_recv_create_v3_t`.
#[derive(Debug)]
pub struct RecvBuilder<'a> {
    source_to_connect_to: &'a Source<'a>,
    allow_video_fields: bool,
    bandwidth: NDIlib_recv_bandwidth_e,
    color_format: NDIlib_recv_color_format_e,
    ndi_name: &'a str, // name this receiver announces itself with
}

impl<'a> RecvBuilder<'a> {
    /// Sets `allow_video_fields` on the create struct.
    pub fn allow_video_fields(self, allow_video_fields: bool) -> Self {
        Self {
            allow_video_fields,
            ..self
        }
    }

    /// Sets the requested bandwidth (an `NDIlib_recv_bandwidth_e` constant).
    pub fn bandwidth(self, bandwidth: NDIlib_recv_bandwidth_e) -> Self {
        Self { bandwidth, ..self }
    }

    /// Sets the requested color format for delivered video frames.
    pub fn color_format(self, color_format: NDIlib_recv_color_format_e) -> Self {
        Self {
            color_format,
            ..self
        }
    }

    /// Creates the receiver; returns `None` if `NDIlib_recv_create_v3` fails.
    pub fn build(self) -> Option<RecvInstance> {
        unsafe {
            // Must stay alive until after the FFI call.
            let ndi_name = ffi::CString::new(self.ndi_name).unwrap();
            let ptr = NDIlib_recv_create_v3(&NDIlib_recv_create_v3_t {
                source_to_connect_to: NDIlib_source_t {
                    p_ndi_name: self.source_to_connect_to.ndi_name_ptr(),
                    p_ip_address: self.source_to_connect_to.ip_address_ptr(),
                },
                allow_video_fields: self.allow_video_fields,
                bandwidth: self.bandwidth,
                color_format: self.color_format,
                p_ndi_name: ndi_name.as_ptr(),
            });
            if ptr.is_null() {
                None
            } else {
                // SAFETY: just checked for NULL. The mutex serializes all
                // non-capture accesses to the instance.
                Some(RecvInstance(Arc::new((
                    RecvInstanceInner(ptr::NonNull::new_unchecked(ptr)),
                    Mutex::new(()),
                ))))
            }
        }
    }
}
// Any access to the RecvInstanceInner apart from calling the capture function must be protected by
// the mutex

/// Cheaply clonable (Arc) handle to an NDI receiver. The `Mutex<()>` guards
/// every operation except `capture()`.
#[derive(Debug, Clone)]
pub struct RecvInstance(Arc<(RecvInstanceInner, Mutex<()>)>);

#[derive(Debug)]
struct RecvInstanceInner(ptr::NonNull<::std::os::raw::c_void>);

unsafe impl Send for RecvInstanceInner {}

// Not 100% true but we ensure safety with the mutex. The documentation says that only the
// capturing itself can be performed from multiple threads at once safely.
unsafe impl Sync for RecvInstanceInner {}
impl RecvInstance {
    /// Starts building a receiver for `source_to_connect_to` that announces
    /// itself under `ndi_name`.
    pub fn builder<'a>(source_to_connect_to: &'a Source, ndi_name: &'a str) -> RecvBuilder<'a> {
        RecvBuilder {
            source_to_connect_to,
            allow_video_fields: true,
            bandwidth: NDIlib_recv_bandwidth_highest,
            color_format: NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA,
            ndi_name,
        }
    }

    /// Reports this receiver's tally state to the sender.
    pub fn set_tally(&self, tally: &Tally) -> bool {
        unsafe {
            // Locked: only capture() may run without the mutex (see type docs).
            let _lock = (self.0).1.lock().unwrap();
            NDIlib_recv_set_tally(((self.0).0).0.as_ptr(), &tally.0)
        }
    }

    /// Sends a metadata frame upstream to the sender.
    pub fn send_metadata(&self, metadata: &MetadataFrame) -> bool {
        unsafe {
            let _lock = (self.0).1.lock().unwrap();
            NDIlib_recv_send_metadata(((self.0).0).0.as_ptr(), metadata.as_ptr())
        }
    }

    /// Queries how many frames of each kind are currently queued.
    pub fn get_queue(&self) -> Queue {
        unsafe {
            let _lock = (self.0).1.lock().unwrap();
            let mut queue = mem::MaybeUninit::uninit();
            NDIlib_recv_get_queue(((self.0).0).0.as_ptr(), queue.as_mut_ptr());
            // SAFETY: the FFI call filled in the queue struct.
            Queue(queue.assume_init())
        }
    }

    /// Waits up to `timeout_in_ms` for a frame of one of the enabled kinds.
    ///
    /// Returns `Ok(Some(frame))` on success, `Ok(None)` on timeout or any
    /// other non-frame result, and `Err(())` if the SDK reports an error.
    pub fn capture(
        &self,
        video: bool,
        audio: bool,
        metadata: bool,
        timeout_in_ms: u32,
    ) -> Result<Option<Frame>, ()> {
        unsafe {
            // Capturing from multiple threads at once is safe according to the documentation
            let ptr = ((self.0).0).0.as_ptr();

            // Zeroed structs are handed to the SDK; it fills in whichever one
            // matches the frame type it returns.
            let mut video_frame = mem::zeroed();
            let mut audio_frame = mem::zeroed();
            let mut metadata_frame = mem::zeroed();

            let res = NDIlib_recv_capture_v2(
                ptr,
                if video {
                    &mut video_frame
                } else {
                    ptr::null_mut()
                },
                if audio {
                    &mut audio_frame
                } else {
                    ptr::null_mut()
                },
                if metadata {
                    &mut metadata_frame
                } else {
                    ptr::null_mut()
                },
                timeout_in_ms,
            );

            // The Borrowed wrappers free the frame back to the SDK on drop.
            match res {
                NDIlib_frame_type_e::NDIlib_frame_type_audio => {
                    assert!(audio);
                    Ok(Some(Frame::Audio(AudioFrame::Borrowed(audio_frame, self))))
                }
                NDIlib_frame_type_e::NDIlib_frame_type_video => {
                    assert!(video);
                    Ok(Some(Frame::Video(VideoFrame::Borrowed(video_frame, self))))
                }
                NDIlib_frame_type_e::NDIlib_frame_type_metadata => {
                    assert!(metadata);
                    Ok(Some(Frame::Metadata(MetadataFrame::Borrowed(
                        metadata_frame,
                        self,
                    ))))
                }
                NDIlib_frame_type_e::NDIlib_frame_type_error => Err(()),
                _ => Ok(None),
            }
        }
    }
}
impl Drop for RecvInstanceInner {
    fn drop(&mut self) {
        // Runs once the last RecvInstance clone is dropped (Arc refcount).
        unsafe { NDIlib_recv_destroy(self.0.as_ptr() as *mut _) }
    }
}
/// Tally (program/preview) state reported back to a sender.
#[derive(Debug)]
pub struct Tally(NDIlib_tally_t);

unsafe impl Send for Tally {}
impl Default for Tally {
fn default() -> Self {
Self(NDIlib_tally_t {
on_program: true,
on_preview: false,
})
}
}
impl Tally {
pub fn new(on_program: bool, on_preview: bool) -> Self {
Self(NDIlib_tally_t {
on_program,
on_preview,
})
}
pub fn on_program(&self) -> bool {
self.0.on_program
}
pub fn on_preview(&self) -> bool {
self.0.on_preview
}
}
/// A frame delivered by `RecvInstance::capture()` — exactly one of the three
/// kinds per call.
#[derive(Debug)]
pub enum Frame<'a> {
    Video(VideoFrame<'a>),
    Audio(AudioFrame<'a>),
    Metadata(MetadataFrame<'a>),
}
/// A video frame borrowed from (and freed back to) a `RecvInstance`.
#[derive(Debug)]
pub enum VideoFrame<'a> {
    //Owned(NDIlib_video_frame_v2_t, Option<ffi::CString>, Option<Vec<u8>>),
    Borrowed(NDIlib_video_frame_v2_t, &'a RecvInstance),
}
impl<'a> VideoFrame<'a> {
    /// Frame width in pixels.
    pub fn xres(&self) -> i32 {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame.xres,
        }
    }

    /// Frame height in pixels.
    pub fn yres(&self) -> i32 {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame.yres,
        }
    }

    /// Pixel format FourCC as reported by the SDK.
    pub fn fourcc(&self) -> NDIlib_FourCC_type_e {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame.FourCC,
        }
    }

    /// Frame rate as a (numerator, denominator) pair.
    pub fn frame_rate(&self) -> (i32, i32) {
        match self {
            VideoFrame::Borrowed(ref frame, _) => (frame.frame_rate_N, frame.frame_rate_D),
        }
    }

    pub fn picture_aspect_ratio(&self) -> f32 {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame.picture_aspect_ratio,
        }
    }

    /// Progressive or field-based frame type.
    pub fn frame_format_type(&self) -> NDIlib_frame_format_type_e {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame.frame_format_type,
        }
    }

    /// Sender-provided timecode.
    pub fn timecode(&self) -> i64 {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame.timecode,
        }
    }

    /// Raw frame data, sized as yres * line_stride (halved for single-field
    /// frames, which carry only every other line).
    pub fn data(&self) -> &[u8] {
        // FIXME: Unclear if this is correct. Needs to be validated against an actual
        // interlaced stream
        let frame_size = if self.frame_format_type()
            == NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0
            || self.frame_format_type()
                == NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_1
        {
            self.yres() * self.line_stride_in_bytes() / 2
        } else {
            self.yres() * self.line_stride_in_bytes()
        };

        unsafe {
            use std::slice;
            match self {
                VideoFrame::Borrowed(ref frame, _) => {
                    slice::from_raw_parts(frame.p_data as *const u8, frame_size as usize)
                }
            }
        }
    }

    /// Bytes per video line, including any padding.
    pub fn line_stride_in_bytes(&self) -> i32 {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame.line_stride_in_bytes,
        }
    }

    /// Per-frame metadata string, if the sender attached one.
    pub fn metadata(&self) -> Option<&str> {
        unsafe {
            match self {
                VideoFrame::Borrowed(ref frame, _) => {
                    if frame.p_metadata.is_null() {
                        None
                    } else {
                        Some(ffi::CStr::from_ptr(frame.p_metadata).to_str().unwrap())
                    }
                }
            }
        }
    }

    /// Timestamp as filled in by the SDK.
    pub fn timestamp(&self) -> i64 {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame.timestamp,
        }
    }

    /// Raw pointer to the underlying frame struct (for FFI calls).
    pub fn as_ptr(&self) -> *const NDIlib_video_frame_v2_t {
        match self {
            VideoFrame::Borrowed(ref frame, _) => frame,
        }
    }
}
impl<'a> Drop for VideoFrame<'a> {
    // VideoFrame currently has a single variant, so the `if let` below is
    // irrefutable; the allow keeps the pattern future-proof for an Owned variant.
    #[allow(irrefutable_let_patterns)]
    fn drop(&mut self) {
        if let VideoFrame::Borrowed(ref mut frame, ref recv) = *self {
            unsafe {
                // Returns the SDK-allocated frame buffer to the receiver.
                NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
            }
        }
    }
}
/// An audio frame borrowed from (and freed back to) a `RecvInstance`.
#[derive(Debug)]
pub enum AudioFrame<'a> {
    //Owned(NDIlib_audio_frame_v2_t, Option<ffi::CString>, Option<Vec<u8>>),
    Borrowed(NDIlib_audio_frame_v2_t, &'a RecvInstance),
}
impl<'a> AudioFrame<'a> {
    /// Sample rate in Hz.
    pub fn sample_rate(&self) -> i32 {
        match self {
            AudioFrame::Borrowed(ref frame, _) => frame.sample_rate,
        }
    }

    /// Number of audio channels.
    pub fn no_channels(&self) -> i32 {
        match self {
            AudioFrame::Borrowed(ref frame, _) => frame.no_channels,
        }
    }

    /// Number of samples per channel.
    pub fn no_samples(&self) -> i32 {
        match self {
            AudioFrame::Borrowed(ref frame, _) => frame.no_samples,
        }
    }

    /// Sender-provided timecode.
    pub fn timecode(&self) -> i64 {
        match self {
            AudioFrame::Borrowed(ref frame, _) => frame.timecode,
        }
    }

    /// Raw (planar) audio data.
    ///
    /// NDI v2 audio frames are planar: `p_data` holds `no_channels` planes
    /// spaced `channel_stride_in_bytes` apart (per the NDI SDK), so the total
    /// buffer size is `no_channels * channel_stride_in_bytes`. Sizing it with
    /// `no_samples` instead would overstate the slice length whenever
    /// `no_samples != no_channels` and allow out-of-bounds reads.
    pub fn data(&self) -> &[u8] {
        unsafe {
            use std::slice;
            match self {
                AudioFrame::Borrowed(ref frame, _) => slice::from_raw_parts(
                    frame.p_data as *const u8,
                    (frame.no_channels * frame.channel_stride_in_bytes) as usize,
                ),
            }
        }
    }

    /// Bytes between the start of consecutive channel planes.
    pub fn channel_stride_in_bytes(&self) -> i32 {
        match self {
            AudioFrame::Borrowed(ref frame, _) => frame.channel_stride_in_bytes,
        }
    }

    /// Per-frame metadata string, if the sender attached one.
    pub fn metadata(&self) -> Option<&str> {
        unsafe {
            match self {
                AudioFrame::Borrowed(ref frame, _) => {
                    if frame.p_metadata.is_null() {
                        None
                    } else {
                        Some(ffi::CStr::from_ptr(frame.p_metadata).to_str().unwrap())
                    }
                }
            }
        }
    }

    /// Timestamp as filled in by the SDK.
    pub fn timestamp(&self) -> i64 {
        match self {
            AudioFrame::Borrowed(ref frame, _) => frame.timestamp,
        }
    }

    /// Raw pointer to the underlying frame struct (for FFI calls).
    pub fn as_ptr(&self) -> *const NDIlib_audio_frame_v2_t {
        match self {
            AudioFrame::Borrowed(ref frame, _) => frame,
        }
    }

    /// Converts the planar float audio into interleaved S16 via the SDK's
    /// utility function.
    ///
    /// # Panics
    /// Panics if `data` is not exactly `no_samples * no_channels` long.
    pub fn copy_to_interleaved_16s(&self, data: &mut [i16]) {
        assert_eq!(
            data.len(),
            (self.no_samples() * self.no_channels()) as usize
        );

        let mut dst = NDIlib_audio_frame_interleaved_16s_t {
            sample_rate: self.sample_rate(),
            no_channels: self.no_channels(),
            no_samples: self.no_samples(),
            timecode: self.timecode(),
            reference_level: 0,
            p_data: data.as_mut_ptr(),
        };

        unsafe {
            NDIlib_util_audio_to_interleaved_16s_v2(self.as_ptr(), &mut dst);
        }
    }
}
impl<'a> Drop for AudioFrame<'a> {
    // AudioFrame currently has a single variant, so the `if let` below is
    // irrefutable; the allow keeps the pattern future-proof for an Owned variant.
    #[allow(irrefutable_let_patterns)]
    fn drop(&mut self) {
        if let AudioFrame::Borrowed(ref mut frame, ref recv) = *self {
            unsafe {
                // Returns the SDK-allocated audio buffer to the receiver.
                NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
            }
        }
    }
}
/// A metadata frame: either one we built ourselves (`Owned`, the `CString`
/// backs the raw payload pointer) or one received from — and freed back to —
/// the SDK (`Borrowed`).
#[derive(Debug)]
pub enum MetadataFrame<'a> {
    Owned(NDIlib_metadata_frame_t, Option<ffi::CString>),
    Borrowed(NDIlib_metadata_frame_t, &'a RecvInstance),
}
impl<'a> MetadataFrame<'a> {
    /// Creates an owned metadata frame with an optional UTF-8 payload.
    ///
    /// The `CString` copy of `data` is stored alongside the raw struct so the
    /// `p_data` pointer stays valid for the lifetime of the frame.
    pub fn new(timecode: i64, data: Option<&str>) -> Self {
        let data = data.map(|s| ffi::CString::new(s).unwrap());

        MetadataFrame::Owned(
            NDIlib_metadata_frame_t {
                // Payload length in bytes, excluding the NUL terminator.
                length: data
                    .as_ref()
                    .map(|s| s.to_str().unwrap().len())
                    .unwrap_or(0) as i32,
                timecode,
                p_data: data
                    .as_ref()
                    .map(|s| s.as_ptr() as *mut _)
                    .unwrap_or(ptr::null_mut()),
            },
            data,
        )
    }

    pub fn timecode(&self) -> i64 {
        match self {
            MetadataFrame::Owned(ref frame, _) => frame.timecode,
            MetadataFrame::Borrowed(ref frame, _) => frame.timecode,
        }
    }

    /// The metadata payload as a string, if any.
    ///
    /// Note: the original code had `if … length == 0 { None } else if
    /// length != 0 { … } else { … }` — the final `else` (a `CStr::from_ptr`
    /// fallback) was unreachable dead code, so it has been removed.
    pub fn metadata(&self) -> Option<&str> {
        unsafe {
            match self {
                MetadataFrame::Owned(_, ref metadata) => {
                    metadata.as_ref().map(|s| s.to_str().unwrap())
                }
                MetadataFrame::Borrowed(ref frame, _) => {
                    if frame.p_data.is_null() || frame.length == 0 {
                        None
                    } else {
                        use std::slice;

                        // NOTE(review): assumes `length` as reported by the
                        // SDK includes the NUL terminator — confirm against
                        // the NDI SDK docs.
                        Some(
                            ffi::CStr::from_bytes_with_nul_unchecked(slice::from_raw_parts(
                                frame.p_data as *const u8,
                                frame.length as usize,
                            ))
                            .to_str()
                            .unwrap(),
                        )
                    }
                }
            }
        }
    }

    /// Raw pointer to the underlying frame struct (for FFI calls).
    pub fn as_ptr(&self) -> *const NDIlib_metadata_frame_t {
        match self {
            MetadataFrame::Owned(ref frame, _) => frame,
            MetadataFrame::Borrowed(ref frame, _) => frame,
        }
    }
}
impl<'a> Default for MetadataFrame<'a> {
    /// An empty owned metadata frame: no payload, timecode 0.
    fn default() -> Self {
        let frame = NDIlib_metadata_frame_t {
            length: 0,
            timecode: 0, //NDIlib_send_timecode_synthesize,
            p_data: ptr::null(),
        };
        MetadataFrame::Owned(frame, None)
    }
}
impl<'a> Drop for MetadataFrame<'a> {
    fn drop(&mut self) {
        // Only SDK-allocated (Borrowed) frames need an explicit free; Owned
        // frames release their CString automatically.
        if let MetadataFrame::Borrowed(ref mut frame, ref recv) = *self {
            unsafe {
                NDIlib_recv_free_metadata(((recv.0).0).0.as_ptr() as *mut _, frame);
            }
        }
    }
}
/// Snapshot of the receiver's queue depths (from `NDIlib_recv_get_queue`).
#[derive(Debug, Clone)]
pub struct Queue(NDIlib_recv_queue_t);
impl Queue {
pub fn audio_frames(&self) -> i32 {
self.0.audio_frames
}
pub fn video_frames(&self) -> i32 {
self.0.video_frames
}
pub fn metadata_frames(&self) -> i32 {
self.0.metadata_frames
}
}

View file

@@ -1,5 +1,3 @@
#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
use glib;
use glib::subclass;
use glib::subclass::prelude::*;
@@ -14,64 +12,109 @@ use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use std::{i32, u32};
use std::ptr;
use connect_ndi;
use ndi_struct;
use ndisys::*;
use stop_ndi;
use ndisys;
use byte_slice_cast::AsMutSliceOf;
use hashmap_receivers;
use AudioReceiver;
use Receiver;
use ReceiverControlHandle;
use ReceiverItem;
use TimestampMode;
use DEFAULT_RECEIVER_NDI_NAME;
#[derive(Debug, Clone)]
struct Settings {
stream_name: String,
ip: String,
loss_threshold: u32,
id_receiver: i8,
latency: Option<gst::ClockTime>,
ndi_name: Option<String>,
ip_address: Option<String>,
connect_timeout: u32,
timeout: u32,
receiver_ndi_name: String,
bandwidth: ndisys::NDIlib_recv_bandwidth_e,
timestamp_mode: TimestampMode,
}
impl Default for Settings {
fn default() -> Self {
Settings {
stream_name: String::from("Fixed ndi stream name"),
ip: String::from(""),
loss_threshold: 5,
id_receiver: 0,
latency: None,
ndi_name: None,
ip_address: None,
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
connect_timeout: 10000,
timeout: 5000,
bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
timestamp_mode: TimestampMode::ReceiveTime,
}
}
}
static PROPERTIES: [subclass::Property; 3] = [
subclass::Property("stream-name", |_| {
static PROPERTIES: [subclass::Property; 7] = [
subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
"stream-name",
"Sream Name",
"Name of the streaming device",
name,
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("ip", |_| {
subclass::Property("ip-address", |name| {
glib::ParamSpec::string(
"ip",
"Stream IP",
"IP of the streaming device. Ex: 127.0.0.1:5961",
name,
"IP Address",
"IP address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("loss-threshold", |_| {
subclass::Property("receiver-ndi-name", |name| {
glib::ParamSpec::string(
name,
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("connect-timeout", |name| {
glib::ParamSpec::uint(
"loss-threshold",
"Loss threshold",
"Loss threshold",
name,
"Connect Timeout",
"Connection timeout in ms",
0,
60,
5,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timeout", |name| {
glib::ParamSpec::uint(
name,
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("bandwidth", |name| {
glib::ParamSpec::int(
name,
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timestamp-mode", |name| {
glib::ParamSpec::enum_(
name,
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTime as i32,
glib::ParamFlags::READWRITE,
)
}),
@@ -79,23 +122,25 @@ static PROPERTIES: [subclass::Property; 3] = [
struct State {
info: Option<gst_audio::AudioInfo>,
receiver: Option<Receiver<AudioReceiver>>,
current_latency: gst::ClockTime,
}
impl Default for State {
fn default() -> State {
State { info: None }
State {
info: None,
receiver: None,
current_latency: gst::CLOCK_TIME_NONE,
}
}
}
struct TimestampData {
offset: u64,
}
struct NdiAudioSrc {
pub(crate) struct NdiAudioSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
timestamp_data: Mutex<TimestampData>,
receiver_controller: Mutex<Option<ReceiverControlHandle<AudioReceiver>>>,
}
impl ObjectSubclass for NdiAudioSrc {
@@ -115,7 +160,7 @@ impl ObjectSubclass for NdiAudioSrc {
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
timestamp_data: Mutex::new(TimestampData { offset: 0 }),
receiver_controller: Mutex::new(None),
}
}
@@ -124,7 +169,7 @@ impl ObjectSubclass for NdiAudioSrc {
"NewTek NDI Audio Source",
"Source",
"NewTek NDI audio source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_simple(
@@ -132,17 +177,11 @@ impl ObjectSubclass for NdiAudioSrc {
&[
(
"format",
&gst::List::new(&[
//TODO add more formats?
//&gst_audio::AUDIO_FORMAT_F32.to_string(),
//&gst_audio::AUDIO_FORMAT_F64.to_string(),
&gst_audio::AUDIO_FORMAT_S16.to_string(),
]),
&gst::List::new(&[&gst_audio::AUDIO_FORMAT_S16.to_string()]),
),
("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
("layout", &"interleaved"),
("channel-mask", &gst::Bitmask::new(0)),
],
);
@@ -177,44 +216,94 @@ impl ObjectImpl for NdiAudioSrc {
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
match *prop {
subclass::Property("stream-name", ..) => {
subclass::Property("ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let stream_name = value.get().unwrap();
let ndi_name = value.get();
gst_debug!(
self.cat,
obj: basesrc,
"Changing stream-name from {} to {}",
settings.stream_name,
stream_name
"Changing ndi-name from {:?} to {:?}",
settings.ndi_name,
ndi_name,
);
settings.stream_name = stream_name;
drop(settings);
settings.ndi_name = ndi_name;
}
subclass::Property("ip", ..) => {
subclass::Property("ip-address", ..) => {
let mut settings = self.settings.lock().unwrap();
let ip = value.get().unwrap();
let ip_address = value.get();
gst_debug!(
self.cat,
obj: basesrc,
"Changing ip from {} to {}",
settings.ip,
ip
"Changing ip from {:?} to {:?}",
settings.ip_address,
ip_address,
);
settings.ip = ip;
drop(settings);
settings.ip_address = ip_address;
}
subclass::Property("loss-threshold", ..) => {
subclass::Property("receiver-ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let loss_threshold = value.get().unwrap();
let receiver_ndi_name = value.get();
gst_debug!(
self.cat,
obj: basesrc,
"Changing loss threshold from {} to {}",
settings.loss_threshold,
loss_threshold
"Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name,
receiver_ndi_name,
);
settings.loss_threshold = loss_threshold;
drop(settings);
settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
}
subclass::Property("connect-timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing connect-timeout from {} to {}",
settings.connect_timeout,
connect_timeout,
);
settings.connect_timeout = connect_timeout;
}
subclass::Property("timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing timeout from {} to {}",
settings.timeout,
timeout,
);
settings.timeout = timeout;
}
subclass::Property("bandwidth", ..) => {
let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing bandwidth from {} to {}",
settings.bandwidth,
bandwidth,
);
settings.bandwidth = bandwidth;
}
subclass::Property("timestamp-mode", ..) => {
let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode,
timestamp_mode
);
if settings.timestamp_mode != timestamp_mode {
let _ = basesrc
.post_message(&gst::Message::new_latency().src(Some(basesrc)).build());
}
settings.timestamp_mode = timestamp_mode;
}
_ => unimplemented!(),
}
@ -224,17 +313,33 @@ impl ObjectImpl for NdiAudioSrc {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("stream-name", ..) => {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.stream_name.to_value())
Ok(settings.ndi_name.to_value())
}
subclass::Property("ip", ..) => {
subclass::Property("ip-address", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.ip.to_value())
Ok(settings.ip_address.to_value())
}
subclass::Property("loss-threshold", ..) => {
subclass::Property("receiver-ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.loss_threshold.to_value())
Ok(settings.receiver_ndi_name.to_value())
}
subclass::Property("connect-timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.connect_timeout.to_value())
}
subclass::Property("timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value())
}
subclass::Property("bandwidth", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.bandwidth.to_value())
}
subclass::Property("timestamp-mode", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timestamp_mode.to_value())
}
_ => unimplemented!(),
}
@ -247,170 +352,143 @@ impl ElementImpl for NdiAudioSrc {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if transition == gst::StateChange::PausedToPlaying {
let mut receivers = hashmap_receivers.lock().unwrap();
let settings = self.settings.lock().unwrap();
let receiver = receivers.get_mut(&settings.id_receiver).unwrap();
let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv;
let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
unsafe {
while NDIlib_recv_capture_v2(
pNDI_recv,
ptr::null(),
&audio_frame,
ptr::null(),
1000,
) != NDIlib_frame_type_e::NDIlib_frame_type_audio
{}
match transition {
gst::StateChange::PausedToPlaying => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(true);
}
}
gst_debug!(
self.cat,
obj: element,
"NDI audio frame received: {:?}",
audio_frame
);
if receiver.initial_timestamp <= audio_frame.timestamp as u64
|| receiver.initial_timestamp == 0
{
receiver.initial_timestamp = audio_frame.timestamp as u64;
gst::StateChange::PlayingToPaused => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(false);
}
}
unsafe {
NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
gst::StateChange::PausedToReady => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.shutdown();
}
}
gst_debug!(
self.cat,
obj: element,
"Setting initial timestamp to {}",
receiver.initial_timestamp
);
_ => (),
}
self.parent_change_state(element, transition)
}
}
impl BaseSrcImpl for NdiAudioSrc {
fn set_caps(
fn unlock(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true);
}
Ok(())
}
fn unlock_stop(
&self,
element: &gst_base::BaseSrc,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
let info = match gst_audio::AudioInfo::from_caps(caps) {
None => {
return Err(gst_loggable_error!(
self.cat,
"Failed to build `AudioInfo` from caps {}",
caps
));
}
Some(info) => info,
};
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
let mut state = self.state.lock().unwrap();
state.info = Some(info);
) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false);
}
Ok(())
}
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone();
let mut settings = self.settings.lock().unwrap();
settings.id_receiver = connect_ndi(
if settings.ip_address.is_none() && settings.ndi_name.is_none() {
return Err(gst_error_msg!(
gst::LibraryError::Settings,
["No IP address or NDI name given"]
));
}
let receiver = connect_ndi(
self.cat,
element,
&settings.ip.clone(),
&settings.stream_name.clone(),
settings.ip_address.as_ref().map(String::as_str),
settings.ndi_name.as_ref().map(String::as_str),
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,
settings.timestamp_mode,
settings.timeout,
);
match settings.id_receiver {
0 => Err(gst_error_msg!(
// settings.id_receiver exists
match receiver {
None => Err(gst_error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
_ => Ok(()),
Some(receiver) => {
*self.receiver_controller.lock().unwrap() =
Some(receiver.receiver_control_handle());
let mut state = self.state.lock().unwrap();
state.receiver = Some(receiver);
Ok(())
}
}
}
fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap();
stop_ndi(self.cat, element, settings.id_receiver);
// Commented because when adding ndi destroy stopped in this line
//*self.state.lock().unwrap() = Default::default();
fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown();
}
*self.state.lock().unwrap() = State::default();
Ok(())
}
fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
if let QueryView::Scheduling(ref mut q) = query.view_mut() {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
return true;
}
if let QueryView::Latency(ref mut q) = query.view_mut() {
let settings = &*self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
if let Some(ref _info) = state.info {
let latency = settings.latency.unwrap();
gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
q.set(true, latency, gst::CLOCK_TIME_NONE);
return true;
} else {
return false;
match query.view_mut() {
QueryView::Scheduling(ref mut q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
true
}
QueryView::Latency(ref mut q) => {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if state.current_latency.is_some() {
let min = if settings.timestamp_mode != TimestampMode::Timecode {
state.current_latency
} else {
0.into()
};
let max = 5 * state.current_latency;
gst_debug!(
self.cat,
obj: element,
"Returning latency min {} max {}",
min,
max
);
q.set(true, min, max);
true
} else {
false
}
}
_ => BaseSrcImplExt::parent_query(self, element, query),
}
BaseSrcImplExt::parent_query(self, element, query)
}
fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps {
let receivers = hashmap_receivers.lock().unwrap();
let mut settings = self.settings.lock().unwrap();
let receiver = receivers.get(&settings.id_receiver).unwrap();
let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv;
let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
unsafe {
while NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000)
!= NDIlib_frame_type_e::NDIlib_frame_type_audio
{}
}
let no_samples = audio_frame.no_samples as u64;
let audio_rate = audio_frame.sample_rate;
settings.latency = gst::SECOND.mul_div_floor(no_samples, audio_rate as u64);
let mut caps = gst::Caps::truncate(caps);
{
let caps = caps.make_mut();
let s = caps.get_mut_structure(0).unwrap();
s.fixate_field_nearest_int("rate", audio_rate);
s.fixate_field_nearest_int("channels", audio_frame.no_channels);
s.fixate_field_str("layout", "interleaved");
s.set_value(
"channel-mask",
gst::Bitmask::new(gst_audio::AudioChannelPosition::get_fallback_mask(
audio_frame.no_channels as u32,
))
.to_send_value(),
);
}
let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
unsafe {
NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
s.fixate_field_nearest_int("rate", 48_000);
s.fixate_field_nearest_int("channels", 2);
}
self.parent_fixate(element, caps)
@ -422,123 +500,52 @@ impl BaseSrcImpl for NdiAudioSrc {
_offset: u64,
_length: u32,
) -> Result<gst::Buffer, gst::FlowError> {
let _settings = &*self.settings.lock().unwrap();
let mut timestamp_data = self.timestamp_data.lock().unwrap();
let state = self.state.lock().unwrap();
let _info = match state.info {
None => {
gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
return Err(gst::FlowError::NotNegotiated);
}
Some(ref info) => info.clone(),
};
let receivers = hashmap_receivers.lock().unwrap();
let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance;
let pNDI_recv = recv.recv;
let pts: u64;
let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
unsafe {
let time = receivers
.get(&_settings.id_receiver)
.unwrap()
.initial_timestamp;
let mut skip_frame = true;
let mut count_frame_none = 0;
while skip_frame {
let frame_type =
NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000);
if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
&& _settings.loss_threshold != 0)
|| frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error
{
if count_frame_none < _settings.loss_threshold {
count_frame_none += 1;
continue;
}
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]);
let recv = {
let mut state = self.state.lock().unwrap();
match state.receiver.take() {
Some(recv) => recv,
None => {
gst_error!(self.cat, obj: element, "Have no receiver");
return Err(gst::FlowError::Error);
} else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
&& _settings.loss_threshold == 0
{
gst_debug!(
self.cat,
obj: element,
"No audio frame received, sending empty buffer"
);
let buffer = gst::Buffer::with_size(0).unwrap();
return Ok(buffer);
}
if time >= (audio_frame.timestamp as u64) {
NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp as u64), time);
} else {
skip_frame = false;
}
}
};
gst_log!(
self.cat,
obj: element,
"NDI audio frame received: {:?}",
(audio_frame)
);
match recv.capture() {
ReceiverItem::Buffer(buffer, info) => {
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().ok_or_else(|| {
gst_element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
);
gst::FlowError::NotNegotiated
})?;
state.info = Some(info.clone());
state.current_latency = buffer.get_duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_caps(&caps).map_err(|_| {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps]
);
gst::FlowError::NotNegotiated
})?;
pts = audio_frame.timestamp as u64 - time;
gst_log!(
self.cat,
obj: element,
"Calculated pts for audio frame: {:?}",
(pts)
);
// We multiply by 2 because is the size in bytes of an i16 variable
let buff_size = (audio_frame.no_samples * 2 * audio_frame.no_channels) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
{
if ndi_struct.start_pts == gst::ClockTime(Some(0)) {
ndi_struct.start_pts =
element.get_clock().unwrap().get_time() - element.get_base_time();
let _ = element
.post_message(&gst::Message::new_latency().src(Some(element)).build());
}
let buffer = buffer.get_mut().unwrap();
// Newtek NDI yields times in 100ns intervals since the Unix Time
let pts: gst::ClockTime = (pts * 100).into();
buffer.set_pts(pts + ndi_struct.start_pts);
let duration: gst::ClockTime = (((f64::from(audio_frame.no_samples)
/ f64::from(audio_frame.sample_rate))
* 1_000_000_000.0) as u64)
.into();
buffer.set_duration(duration);
buffer.set_offset(timestamp_data.offset);
timestamp_data.offset += audio_frame.no_samples as u64;
buffer.set_offset_end(timestamp_data.offset);
let mut dst: NDIlib_audio_frame_interleaved_16s_t = Default::default();
dst.reference_level = 0;
dst.p_data = buffer
.map_writable()
.unwrap()
.as_mut_slice_of::<i16>()
.unwrap()
.as_mut_ptr();
NDIlib_util_audio_to_interleaved_16s_v2(&audio_frame, &mut dst);
NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
Ok(buffer)
}
gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
Ok(buffer)
ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Error(err) => Err(err),
}
}
}

View file

@ -1,10 +1,17 @@
#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
use std::ptr;
#[cfg_attr(all(target_arch = "x86_64", target_os = "windows"), link(name = "Processing.NDI.Lib.x64"))]
#[cfg_attr(all(target_arch = "x86", target_os = "windows"), link(name = "Processing.NDI.Lib.x86"))]
#[cfg_attr(not(any(target_os = "windows", target_os = "macos")), link(name = "ndi"))]
#[cfg_attr(
all(target_arch = "x86_64", target_os = "windows"),
link(name = "Processing.NDI.Lib.x64")
)]
#[cfg_attr(
all(target_arch = "x86", target_os = "windows"),
link(name = "Processing.NDI.Lib.x86")
)]
#[cfg_attr(
not(any(target_os = "windows", target_os = "macos")),
link(name = "ndi")
)]
extern "C" {
pub fn NDIlib_initialize() -> bool;
pub fn NDIlib_find_create_v2(
@ -21,7 +28,7 @@ extern "C" {
pub fn NDIlib_recv_create_v3(
p_create_settings: *const NDIlib_recv_create_v3_t,
) -> NDIlib_recv_instance_t;
pub fn NDIlib_find_destroy(p_instance: NDIlib_recv_instance_t);
pub fn NDIlib_find_destroy(p_instance: NDIlib_find_instance_t);
pub fn NDIlib_recv_destroy(p_instance: NDIlib_recv_instance_t);
pub fn NDIlib_destroy();
pub fn NDIlib_recv_set_tally(
@ -34,18 +41,26 @@ extern "C" {
) -> bool;
pub fn NDIlib_recv_capture_v2(
p_instance: NDIlib_recv_instance_t,
p_video_data: *const NDIlib_video_frame_v2_t,
p_audio_data: *const NDIlib_audio_frame_v2_t,
p_metadata: *const NDIlib_metadata_frame_t,
p_video_data: *mut NDIlib_video_frame_v2_t,
p_audio_data: *mut NDIlib_audio_frame_v2_t,
p_metadata: *mut NDIlib_metadata_frame_t,
timeout_in_ms: u32,
) -> NDIlib_frame_type_e;
pub fn NDIlib_recv_free_video_v2(
p_instance: NDIlib_recv_instance_t,
p_video_data: *const NDIlib_video_frame_v2_t,
p_video_data: *mut NDIlib_video_frame_v2_t,
);
pub fn NDIlib_recv_free_audio_v2(
p_instance: NDIlib_recv_instance_t,
p_audio_data: *const NDIlib_audio_frame_v2_t,
p_audio_data: *mut NDIlib_audio_frame_v2_t,
);
pub fn NDIlib_recv_free_metadata(
p_instance: NDIlib_recv_instance_t,
p_metadata: *mut NDIlib_metadata_frame_t,
);
pub fn NDIlib_recv_get_queue(
p_instance: NDIlib_recv_instance_t,
p_total: *mut NDIlib_recv_queue_t,
);
}
@ -59,16 +74,6 @@ pub struct NDIlib_find_create_t {
pub p_extra_ips: *const ::std::os::raw::c_char,
}
impl Default for NDIlib_find_create_t {
fn default() -> Self {
NDIlib_find_create_t {
show_local_sources: true,
p_groups: ptr::null(),
p_extra_ips: ptr::null(),
}
}
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_source_t {
@ -76,15 +81,6 @@ pub struct NDIlib_source_t {
pub p_ip_address: *const ::std::os::raw::c_char,
}
impl Default for NDIlib_source_t {
fn default() -> Self {
NDIlib_source_t {
p_ndi_name: ptr::null(),
p_ip_address: ptr::null(),
}
}
}
#[repr(i32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum NDIlib_frame_type_e {
@ -96,14 +92,12 @@ pub enum NDIlib_frame_type_e {
NDIlib_frame_type_status_change = 100,
}
#[repr(i32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum NDIlib_recv_bandwidth_e {
NDIlib_recv_bandwidth_metadata_only = -10,
NDIlib_recv_bandwidth_audio_only = 10,
NDIlib_recv_bandwidth_lowest = 0,
NDIlib_recv_bandwidth_highest = 100,
}
pub type NDIlib_recv_bandwidth_e = i32;
pub const NDIlib_recv_bandwidth_metadata_only: NDIlib_recv_bandwidth_e = -10;
pub const NDIlib_recv_bandwidth_audio_only: NDIlib_recv_bandwidth_e = 10;
pub const NDIlib_recv_bandwidth_lowest: NDIlib_recv_bandwidth_e = 0;
pub const NDIlib_recv_bandwidth_highest: NDIlib_recv_bandwidth_e = 100;
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
@ -118,12 +112,15 @@ pub enum NDIlib_recv_color_format_e {
#[repr(u32)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum NDIlib_FourCC_type_e {
NDIlib_FourCC_type_UYVY = 1_498_831_189,
NDIlib_FourCC_type_BGRA = 1_095_911_234,
NDIlib_FourCC_type_BGRX = 1_481_787_202,
NDIlib_FourCC_type_RGBA = 1_094_862_674,
NDIlib_FourCC_type_RGBX = 1_480_738_642,
NDIlib_FourCC_type_UYVA = 1_096_178_005,
NDIlib_FourCC_type_UYVY = 0x59_56_59_55,
NDIlib_FourCC_type_YV12 = 0x32_31_56_59,
NDIlib_FourCC_type_NV12 = 0x32_31_56_4e,
NDIlib_FourCC_type_I420 = 0x30_32_34_49,
NDIlib_FourCC_type_BGRA = 0x41_52_47_42,
NDIlib_FourCC_type_BGRX = 0x58_52_47_42,
NDIlib_FourCC_type_RGBA = 0x41_42_47_52,
NDIlib_FourCC_type_RGBX = 0x58_42_47_52,
NDIlib_FourCC_type_UYVA = 0x41_56_56_55,
}
#[repr(u32)]
@ -149,28 +146,8 @@ pub struct NDIlib_recv_create_v3_t {
pub p_ndi_name: *const ::std::os::raw::c_char,
}
impl Default for NDIlib_recv_create_v3_t {
fn default() -> Self {
NDIlib_recv_create_v3_t {
source_to_connect_to: Default::default(),
allow_video_fields: true,
bandwidth: NDIlib_recv_bandwidth_e::NDIlib_recv_bandwidth_highest,
color_format: NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA,
p_ndi_name: ptr::null(),
}
}
}
pub type NDIlib_recv_instance_t = *mut ::std::os::raw::c_void;
//Rust wrapper around *mut ::std::os::raw::c_void
pub struct NdiInstance {
pub recv: NDIlib_recv_instance_t,
// pub audio: bool,
}
unsafe impl ::std::marker::Send for NdiInstance {}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_tally_t {
@ -178,13 +155,12 @@ pub struct NDIlib_tally_t {
pub on_preview: bool,
}
impl Default for NDIlib_tally_t {
fn default() -> Self {
NDIlib_tally_t {
on_program: false,
on_preview: false,
}
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_recv_queue_t {
pub video_frames: i32,
pub audio_frames: i32,
pub metadata_frames: i32,
}
#[repr(C)]
@ -195,16 +171,6 @@ pub struct NDIlib_metadata_frame_t {
pub p_data: *const ::std::os::raw::c_char,
}
impl Default for NDIlib_metadata_frame_t {
fn default() -> Self {
NDIlib_metadata_frame_t {
length: 0,
timecode: 0, //NDIlib_send_timecode_synthesize,
p_data: ptr::null(),
}
}
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_video_frame_v2_t {
@ -222,25 +188,6 @@ pub struct NDIlib_video_frame_v2_t {
pub timestamp: i64,
}
impl Default for NDIlib_video_frame_v2_t {
fn default() -> Self {
NDIlib_video_frame_v2_t {
xres: 0,
yres: 0,
FourCC: NDIlib_FourCC_type_e::NDIlib_FourCC_type_UYVY,
frame_rate_N: 30000,
frame_rate_D: 1001,
picture_aspect_ratio: 0.0,
frame_format_type: NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive,
timecode: NDIlib_send_timecode_synthesize,
p_data: ptr::null(),
line_stride_in_bytes: 0,
p_metadata: ptr::null(),
timestamp: NDIlib_send_timecode_empty,
}
}
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_audio_frame_v2_t {
@ -254,21 +201,6 @@ pub struct NDIlib_audio_frame_v2_t {
pub timestamp: i64,
}
impl Default for NDIlib_audio_frame_v2_t {
fn default() -> Self {
NDIlib_audio_frame_v2_t {
sample_rate: 48000,
no_channels: 2,
no_samples: 0,
timecode: NDIlib_send_timecode_synthesize,
p_data: ptr::null(),
channel_stride_in_bytes: 0,
p_metadata: ptr::null(),
timestamp: NDIlib_send_timecode_empty,
}
}
}
extern "C" {
pub fn NDIlib_util_audio_to_interleaved_16s_v2(
p_src: *const NDIlib_audio_frame_v2_t,
@ -291,16 +223,3 @@ pub struct NDIlib_audio_frame_interleaved_16s_t {
pub reference_level: ::std::os::raw::c_int,
pub p_data: *mut ::std::os::raw::c_short,
}
impl Default for NDIlib_audio_frame_interleaved_16s_t {
fn default() -> Self {
NDIlib_audio_frame_interleaved_16s_t {
sample_rate: 48000,
no_channels: 2,
no_samples: 0,
timecode: NDIlib_send_timecode_synthesize,
reference_level: 0,
p_data: ptr::null_mut(),
}
}
}

View file

@ -1,5 +1,3 @@
#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
use glib;
use glib::subclass;
use glib::subclass::prelude::*;
@ -10,69 +8,115 @@ use gst_base;
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use gst::Fraction;
use gst_video;
use std::sync::Mutex;
use std::{i32, u32};
use std::{slice, ptr};
use ndisys;
use connect_ndi;
use ndi_struct;
use ndisys::*;
use stop_ndi;
use hashmap_receivers;
use Receiver;
use ReceiverControlHandle;
use ReceiverItem;
use TimestampMode;
use VideoReceiver;
use DEFAULT_RECEIVER_NDI_NAME;
#[derive(Debug, Clone)]
struct Settings {
stream_name: String,
ip: String,
loss_threshold: u32,
id_receiver: i8,
latency: Option<gst::ClockTime>,
ndi_name: Option<String>,
ip_address: Option<String>,
connect_timeout: u32,
timeout: u32,
receiver_ndi_name: String,
bandwidth: ndisys::NDIlib_recv_bandwidth_e,
timestamp_mode: TimestampMode,
}
impl Default for Settings {
fn default() -> Self {
Settings {
stream_name: String::from("Fixed ndi stream name"),
ip: String::from(""),
loss_threshold: 5,
id_receiver: 0,
latency: None,
ndi_name: None,
ip_address: None,
receiver_ndi_name: DEFAULT_RECEIVER_NDI_NAME.clone(),
connect_timeout: 10000,
timeout: 5000,
bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
timestamp_mode: TimestampMode::ReceiveTime,
}
}
}
static PROPERTIES: [subclass::Property; 3] = [
subclass::Property("stream-name", |_| {
static PROPERTIES: [subclass::Property; 7] = [
subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
"stream-name",
"Stream Name",
"Name of the streaming device",
name,
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("ip", |_| {
subclass::Property("ip-address", |name| {
glib::ParamSpec::string(
"ip",
"Stream IP",
"IP of the streaming device. Ex: 127.0.0.1:5961",
name,
"IP Address",
"IP address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("loss-threshold", |_| {
subclass::Property("receiver-ndi-name", |name| {
glib::ParamSpec::string(
name,
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("connect-timeout", |name| {
glib::ParamSpec::uint(
"loss-threshold",
"Loss threshold",
"Loss threshold",
name,
"Connect Timeout",
"Connection timeout in ms",
0,
60,
5,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timeout", |name| {
glib::ParamSpec::uint(
name,
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("bandwidth", |name| {
glib::ParamSpec::int(
name,
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timestamp-mode", |name| {
glib::ParamSpec::enum_(
name,
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTime as i32,
glib::ParamFlags::READWRITE,
)
}),
@ -80,23 +124,25 @@ static PROPERTIES: [subclass::Property; 3] = [
struct State {
info: Option<gst_video::VideoInfo>,
current_latency: gst::ClockTime,
receiver: Option<Receiver<VideoReceiver>>,
}
impl Default for State {
fn default() -> State {
State { info: None }
State {
info: None,
current_latency: gst::CLOCK_TIME_NONE,
receiver: None,
}
}
}
struct TimestampData {
offset: u64,
}
struct NdiVideoSrc {
pub(crate) struct NdiVideoSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
timestamp_data: Mutex<TimestampData>,
receiver_controller: Mutex<Option<ReceiverControlHandle<VideoReceiver>>>,
}
impl ObjectSubclass for NdiVideoSrc {
@ -116,7 +162,7 @@ impl ObjectSubclass for NdiVideoSrc {
),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
timestamp_data: Mutex::new(TimestampData { offset: 0 }),
receiver_controller: Mutex::new(None),
}
}
@ -125,7 +171,7 @@ impl ObjectSubclass for NdiVideoSrc {
"NewTek NDI Video Source",
"Source",
"NewTek NDI video source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
);
// On the src pad, we can produce F32/F64 with any sample rate
@ -136,10 +182,14 @@ impl ObjectSubclass for NdiVideoSrc {
(
"format",
&gst::List::new(&[
//TODO add all formats
&gst_video::VideoFormat::Uyvy.to_string(),
//&gst_video::VideoFormat::Rgb.to_string(),
//&gst_video::VideoFormat::Gray8.to_string(),
&gst_video::VideoFormat::Yv12.to_string(),
&gst_video::VideoFormat::Nv12.to_string(),
&gst_video::VideoFormat::I420.to_string(),
&gst_video::VideoFormat::Bgra.to_string(),
&gst_video::VideoFormat::Bgrx.to_string(),
&gst_video::VideoFormat::Rgba.to_string(),
&gst_video::VideoFormat::Rgbx.to_string(),
]),
),
("width", &gst::IntRange::<i32>::new(0, i32::MAX)),
@ -154,6 +204,23 @@ impl ObjectSubclass for NdiVideoSrc {
],
);
#[cfg(feature = "interlaced-fields")]
let caps = {
let mut tmp = caps.copy();
{
let tmp = tmp.get_mut().unwrap();
tmp.set_features_simple(Some(gst::CapsFeatures::new(&["format:Interlaced"])));
}
let mut caps = caps;
{
let caps = caps.get_mut().unwrap();
caps.append(tmp);
}
caps
};
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
@ -185,44 +252,94 @@ impl ObjectImpl for NdiVideoSrc {
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
match *prop {
subclass::Property("stream-name", ..) => {
subclass::Property("ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let stream_name = value.get().unwrap();
let ndi_name = value.get();
gst_debug!(
self.cat,
obj: basesrc,
"Changing stream-name from {} to {}",
settings.stream_name,
stream_name
"Changing ndi-name from {:?} to {:?}",
settings.ndi_name,
ndi_name,
);
settings.stream_name = stream_name;
drop(settings);
settings.ndi_name = ndi_name;
}
subclass::Property("ip", ..) => {
subclass::Property("ip-address", ..) => {
let mut settings = self.settings.lock().unwrap();
let ip = value.get().unwrap();
let ip_address = value.get();
gst_debug!(
self.cat,
obj: basesrc,
"Changing ip from {} to {}",
settings.ip,
ip
"Changing ip from {:?} to {:?}",
settings.ip_address,
ip_address,
);
settings.ip = ip;
drop(settings);
settings.ip_address = ip_address;
}
subclass::Property("loss-threshold", ..) => {
subclass::Property("receiver-ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let loss_threshold = value.get().unwrap();
let receiver_ndi_name = value.get();
gst_debug!(
self.cat,
obj: basesrc,
"Changing loss threshold from {} to {}",
settings.loss_threshold,
loss_threshold
"Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name,
receiver_ndi_name,
);
settings.loss_threshold = loss_threshold;
drop(settings);
settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
}
subclass::Property("connect-timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing connect-timeout from {} to {}",
settings.connect_timeout,
connect_timeout,
);
settings.connect_timeout = connect_timeout;
}
subclass::Property("timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing timeout from {} to {}",
settings.timeout,
timeout,
);
settings.timeout = timeout;
}
subclass::Property("bandwidth", ..) => {
let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing bandwidth from {} to {}",
settings.bandwidth,
bandwidth,
);
settings.bandwidth = bandwidth;
}
subclass::Property("timestamp-mode", ..) => {
let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode,
timestamp_mode
);
if settings.timestamp_mode != timestamp_mode {
let _ = basesrc
.post_message(&gst::Message::new_latency().src(Some(basesrc)).build());
}
settings.timestamp_mode = timestamp_mode;
}
_ => unimplemented!(),
}
@ -232,17 +349,33 @@ impl ObjectImpl for NdiVideoSrc {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("stream-name", ..) => {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.stream_name.to_value())
Ok(settings.ndi_name.to_value())
}
subclass::Property("ip", ..) => {
subclass::Property("ip-address", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.ip.to_value())
Ok(settings.ip_address.to_value())
}
subclass::Property("loss-threshold", ..) => {
subclass::Property("receiver-ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.loss_threshold.to_value())
Ok(settings.receiver_ndi_name.to_value())
}
subclass::Property("connect-timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.connect_timeout.to_value())
}
subclass::Property("timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value())
}
subclass::Property("bandwidth", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.bandwidth.to_value())
}
subclass::Property("timestamp-mode", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timestamp_mode.to_value())
}
_ => unimplemented!(),
}
@ -255,164 +388,148 @@ impl ElementImpl for NdiVideoSrc {
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if transition == gst::StateChange::PausedToPlaying {
let mut receivers = hashmap_receivers.lock().unwrap();
let settings = self.settings.lock().unwrap();
let receiver = receivers.get_mut(&settings.id_receiver).unwrap();
let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv;
let video_frame: NDIlib_video_frame_v2_t = Default::default();
unsafe {
while NDIlib_recv_capture_v2(
pNDI_recv,
&video_frame,
ptr::null(),
ptr::null(),
1000,
) != NDIlib_frame_type_e::NDIlib_frame_type_video
{}
match transition {
gst::StateChange::PausedToPlaying => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(true);
}
}
gst_debug!(
self.cat,
obj: element,
"NDI video frame received: {:?}",
video_frame
);
if receiver.initial_timestamp <= video_frame.timestamp as u64
|| receiver.initial_timestamp == 0
{
receiver.initial_timestamp = video_frame.timestamp as u64;
gst::StateChange::PlayingToPaused => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_playing(false);
}
}
unsafe {
NDIlib_recv_free_video_v2(pNDI_recv, &video_frame);
gst::StateChange::PausedToReady => {
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.shutdown();
}
}
gst_debug!(
self.cat,
obj: element,
"Setting initial timestamp to {}",
receiver.initial_timestamp
);
_ => (),
}
self.parent_change_state(element, transition)
}
}
impl BaseSrcImpl for NdiVideoSrc {
fn set_caps(
fn unlock(&self, element: &gst_base::BaseSrc) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true);
}
Ok(())
}
fn unlock_stop(
&self,
element: &gst_base::BaseSrc,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
let info = match gst_video::VideoInfo::from_caps(caps) {
None => {
return Err(gst_loggable_error!(
self.cat,
"Failed to build `VideoInfo` from caps {}",
caps
));
}
Some(info) => info,
};
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
let mut state = self.state.lock().unwrap();
state.info = Some(info);
let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
) -> std::result::Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false);
}
Ok(())
}
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let mut settings = self.settings.lock().unwrap();
settings.id_receiver = connect_ndi(
let settings = self.settings.lock().unwrap().clone();
if settings.ip_address.is_none() && settings.ndi_name.is_none() {
return Err(gst_error_msg!(
gst::LibraryError::Settings,
["No IP address or NDI name given"]
));
}
let receiver = connect_ndi(
self.cat,
element,
&settings.ip.clone(),
&settings.stream_name.clone(),
settings.ip_address.as_ref().map(String::as_str),
settings.ndi_name.as_ref().map(String::as_str),
&settings.receiver_ndi_name,
settings.connect_timeout,
settings.bandwidth,
settings.timestamp_mode,
settings.timeout,
);
// settings.id_receiver != 0
match settings.id_receiver {
0 => Err(gst_error_msg!(
// settings.id_receiver exists
match receiver {
None => Err(gst_error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
_ => Ok(()),
Some(receiver) => {
*self.receiver_controller.lock().unwrap() =
Some(receiver.receiver_control_handle());
let mut state = self.state.lock().unwrap();
state.receiver = Some(receiver);
Ok(())
}
}
}
fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap();
stop_ndi(self.cat, element, settings.id_receiver);
// Commented because when adding ndi destroy stopped in this line
//*self.state.lock().unwrap() = Default::default();
fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown();
}
*self.state.lock().unwrap() = State::default();
Ok(())
}
fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
if let QueryView::Scheduling(ref mut q) = query.view_mut() {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
return true;
}
if let QueryView::Latency(ref mut q) = query.view_mut() {
let settings = &*self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
if let Some(ref _info) = state.info {
let latency = settings.latency.unwrap();
gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
q.set(true, latency, gst::CLOCK_TIME_NONE);
return true;
} else {
return false;
match query.view_mut() {
QueryView::Scheduling(ref mut q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
true
}
QueryView::Latency(ref mut q) => {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if state.current_latency.is_some() {
let min = if settings.timestamp_mode != TimestampMode::Timecode {
state.current_latency
} else {
0.into()
};
let max = 5 * state.current_latency;
gst_debug!(
self.cat,
obj: element,
"Returning latency min {} max {}",
min,
max
);
q.set(true, min, max);
true
} else {
false
}
}
_ => BaseSrcImplExt::parent_query(self, element, query),
}
BaseSrcImplExt::parent_query(self, element, query)
}
fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps {
let receivers = hashmap_receivers.lock().unwrap();
let mut settings = self.settings.lock().unwrap();
let receiver = receivers.get(&settings.id_receiver).unwrap();
let recv = &receiver.ndi_instance;
let pNDI_recv = recv.recv;
let video_frame: NDIlib_video_frame_v2_t = Default::default();
unsafe {
while NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000)
!= NDIlib_frame_type_e::NDIlib_frame_type_video
{}
}
settings.latency = gst::SECOND.mul_div_floor(
video_frame.frame_rate_D as u64,
video_frame.frame_rate_N as u64,
);
let mut caps = gst::Caps::truncate(caps);
{
let caps = caps.make_mut();
let s = caps.get_mut_structure(0).unwrap();
s.fixate_field_nearest_int("width", video_frame.xres);
s.fixate_field_nearest_int("height", video_frame.yres);
s.fixate_field_nearest_fraction(
"framerate",
Fraction::new(video_frame.frame_rate_N, video_frame.frame_rate_D),
);
s.fixate_field_nearest_int("width", 1920);
s.fixate_field_nearest_int("height", 1080);
if s.has_field("pixel-aspect-ratio") {
s.fixate_field_nearest_fraction("pixel-aspect-ratio", gst::Fraction::new(1, 1));
}
}
unsafe {
NDIlib_recv_free_video_v2(pNDI_recv, &video_frame);
}
let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
self.parent_fixate(element, caps)
}
@ -423,111 +540,52 @@ impl BaseSrcImpl for NdiVideoSrc {
_offset: u64,
_length: u32,
) -> Result<gst::Buffer, gst::FlowError> {
let _settings = &*self.settings.lock().unwrap();
let mut timestamp_data = self.timestamp_data.lock().unwrap();
let state = self.state.lock().unwrap();
let _info = match state.info {
None => {
gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
return Err(gst::FlowError::NotNegotiated);
}
Some(ref info) => info.clone(),
};
let receivers = hashmap_receivers.lock().unwrap();
let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance;
let pNDI_recv = recv.recv;
let pts: u64;
let video_frame: NDIlib_video_frame_v2_t = Default::default();
unsafe {
let time = receivers
.get(&_settings.id_receiver)
.unwrap()
.initial_timestamp;
let mut skip_frame = true;
let mut count_frame_none = 0;
while skip_frame {
let frame_type =
NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000);
if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
&& _settings.loss_threshold != 0)
|| frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error
{
if count_frame_none < _settings.loss_threshold {
count_frame_none += 1;
continue;
}
gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]);
let recv = {
let mut state = self.state.lock().unwrap();
match state.receiver.take() {
Some(recv) => recv,
None => {
gst_error!(self.cat, obj: element, "Have no receiver");
return Err(gst::FlowError::Error);
} else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
&& _settings.loss_threshold == 0
{
gst_debug!(
self.cat,
obj: element,
"No video frame received, sending empty buffer"
);
let buffer = gst::Buffer::with_size(0).unwrap();
return Ok(buffer);
}
if time >= (video_frame.timestamp as u64) {
gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp as u64), time);
} else {
skip_frame = false;
}
}
};
gst_log!(
self.cat,
obj: element,
"NDI video frame received: {:?}",
(video_frame)
);
match recv.capture() {
ReceiverItem::Buffer(buffer, info) => {
let mut state = self.state.lock().unwrap();
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().ok_or_else(|| {
gst_element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
);
gst::FlowError::NotNegotiated
})?;
state.info = Some(info.clone());
state.current_latency = buffer.get_duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_caps(&caps).map_err(|_| {
gst_element_error!(
element,
gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps]
);
gst::FlowError::NotNegotiated
})?;
pts = video_frame.timestamp as u64 - time;
gst_log!(
self.cat,
obj: element,
"Calculated pts for video frame: {:?}",
(pts)
);
let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize;
let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
{
let data = slice::from_raw_parts(video_frame.p_data as *mut u8, buff_size);
// Newtek NDI yields times in 100ns intervals since the Unix Time
let pts: gst::ClockTime = (pts * 100).into();
let duration: gst::ClockTime = (((f64::from(video_frame.frame_rate_D)
/ f64::from(video_frame.frame_rate_N))
* 1_000_000_000.0) as u64)
.into();
let buffer = buffer.get_mut().unwrap();
if ndi_struct.start_pts == gst::ClockTime(Some(0)) {
ndi_struct.start_pts =
element.get_clock().unwrap().get_time() - element.get_base_time();
let _ = element
.post_message(&gst::Message::new_latency().src(Some(element)).build());
}
buffer.set_pts(pts + ndi_struct.start_pts);
buffer.set_duration(duration);
buffer.set_offset(timestamp_data.offset);
timestamp_data.offset += 1;
buffer.set_offset_end(timestamp_data.offset);
buffer.copy_from_slice(0, data).unwrap();
Ok(buffer)
}
gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
NDIlib_recv_free_video_v2(pNDI_recv, &video_frame);
Ok(buffer)
ReceiverItem::Timeout => Err(gst::FlowError::Eos),
ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
ReceiverItem::Error(err) => Err(err),
}
}
}

1769
src/receiver.rs Normal file

File diff suppressed because it is too large Load diff