Merge pull request #55 from sdroege/sink

Add initial version of NDI sink
Pablo Nieto 2021-03-15 13:01:43 +01:00 committed by GitHub
commit 50548c8e6a
10 changed files with 1503 additions and 42 deletions
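
For context, a minimal sketch of how the new elements can be driven from Rust (not part of the diff). It assumes the plugin was built with the "sink" feature and is visible to the GStreamer registry; the test sources, caps and NDI name are illustrative, and S16LE assumes a little-endian host.

use gst::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    gst::init()?;

    // ndisinkcombiner muxes raw video and audio into a single stream which
    // ndisink then publishes on the local network under the given NDI name.
    let pipeline = gst::parse_launch(
        "videotestsrc is-live=true ! video/x-raw,format=UYVY,framerate=30/1 ! combiner.video \
         audiotestsrc is-live=true ! audio/x-raw,format=S16LE,layout=interleaved ! combiner.audio \
         ndisinkcombiner name=combiner ! ndisink ndi-name=\"GStreamer NDI Test\"",
    )?;

    pipeline.set_state(gst::State::Playing)?;

    // Run until EOS or an error.
    let bus = pipeline.get_bus().unwrap();
    for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
        use gst::MessageView;
        match msg.view() {
            MessageView::Eos(..) | MessageView::Error(..) => break,
            _ => (),
        }
    }

    pipeline.set_state(gst::State::Null)?;
    Ok(())
}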

Cargo.toml

@@ -20,9 +20,10 @@ once_cell = "1.0"
gst-plugin-version-helper = "0.2"
[features]
-default = ["interlaced-fields", "reference-timestamps"]
+default = ["interlaced-fields", "reference-timestamps", "sink"]
interlaced-fields = ["gst/v1_16", "gst-video/v1_16"]
reference-timestamps = ["gst/v1_14"]
+sink = ["gst/v1_18", "gst-base/v1_18"]
[lib]
name = "gstndi"

src/lib.rs

@@ -3,6 +3,12 @@ use glib::prelude::*;
mod device_provider;
pub mod ndi;
mod ndiaudiosrc;
#[cfg(feature = "sink")]
mod ndisink;
#[cfg(feature = "sink")]
mod ndisinkcombiner;
#[cfg(feature = "sink")]
pub mod ndisinkmeta;
pub mod ndisys;
mod ndivideosrc;
pub mod receiver;
@@ -20,12 +26,14 @@ use once_cell::sync::Lazy;
#[repr(u32)]
#[genum(type_name = "GstNdiTimestampMode")]
pub enum TimestampMode {
-#[genum(name = "Receive Time", nick = "receive-time")]
-ReceiveTime = 0,
+#[genum(name = "Receive Time / Timecode", nick = "receive-time-vs-timecode")]
+ReceiveTimeTimecode = 0,
+#[genum(name = "Receive Time / Timestamp", nick = "receive-time-vs-timestamp")]
+ReceiveTimeTimestamp = 1,
#[genum(name = "NDI Timecode", nick = "timecode")]
-Timecode = 1,
+Timecode = 2,
#[genum(name = "NDI Timestamp", nick = "timestamp")]
-Timestamp = 2,
+Timestamp = 3,
}
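// Note: the old ReceiveTime mode is split in two here. ReceiveTimeTimecode
// correlates receive times with the sender's NDI timecode and
// ReceiveTimeTimestamp with its NDI timestamp (see the receiver.rs hunk at
// the end of this diff), which is why the Timecode and Timestamp
// discriminants shift by one.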
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
@@ -33,9 +41,15 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
return Err(glib::glib_bool_error!("Cannot initialize NDI"));
}
+device_provider::register(plugin)?;
ndivideosrc::register(plugin)?;
ndiaudiosrc::register(plugin)?;
-device_provider::register(plugin)?;
+#[cfg(feature = "sink")]
+{
+ndisinkcombiner::register(plugin)?;
+ndisink::register(plugin)?;
+}
Ok(())
}

src/ndi.rs

@@ -5,6 +5,8 @@ use std::mem;
use std::ptr;
use std::sync::{Arc, Mutex};
use byte_slice_cast::*;
pub fn initialize() -> bool {
unsafe { NDIlib_initialize() }
}
@@ -339,11 +341,17 @@ impl RecvInstance {
match res {
NDIlib_frame_type_e::NDIlib_frame_type_audio => {
assert!(audio);
-Ok(Some(Frame::Audio(AudioFrame::Borrowed(audio_frame, self))))
+Ok(Some(Frame::Audio(AudioFrame::BorrowedRecv(
+audio_frame,
+self,
+))))
}
NDIlib_frame_type_e::NDIlib_frame_type_video => {
assert!(video);
-Ok(Some(Frame::Video(VideoFrame::Borrowed(video_frame, self))))
+Ok(Some(Frame::Video(VideoFrame::BorrowedRecv(
+video_frame,
+self,
+))))
}
NDIlib_frame_type_e::NDIlib_frame_type_metadata => {
assert!(metadata);
@@ -365,6 +373,80 @@ impl Drop for RecvInstanceInner {
}
}
#[derive(Debug)]
pub struct SendBuilder<'a> {
ndi_name: &'a str,
clock_audio: bool,
clock_video: bool,
}
impl<'a> SendBuilder<'a> {
pub fn clock_audio(self) -> Self {
Self {
clock_audio: true,
..self
}
}
pub fn clock_video(self) -> Self {
Self {
clock_video: true,
..self
}
}
pub fn build(self) -> Option<SendInstance> {
unsafe {
let ndi_name = ffi::CString::new(self.ndi_name).unwrap();
let ptr = NDIlib_send_create(&NDIlib_send_create_t {
p_ndi_name: ndi_name.as_ptr(),
clock_video: self.clock_video,
clock_audio: self.clock_audio,
p_groups: ptr::null(),
});
if ptr.is_null() {
None
} else {
Some(SendInstance(ptr::NonNull::new_unchecked(ptr)))
}
}
}
}
#[derive(Debug)]
pub struct SendInstance(ptr::NonNull<::std::os::raw::c_void>);
unsafe impl Send for SendInstance {}
impl SendInstance {
pub fn builder<'a>(ndi_name: &'a str) -> SendBuilder<'a> {
SendBuilder {
ndi_name,
clock_video: false,
clock_audio: false,
}
}
pub fn send_video(&mut self, frame: &VideoFrame) {
unsafe {
NDIlib_send_send_video_v2(self.0.as_ptr(), frame.as_ptr());
}
}
pub fn send_audio(&mut self, frame: &AudioFrame) {
unsafe {
NDIlib_send_send_audio_v2(self.0.as_ptr(), frame.as_ptr());
}
}
}
impl Drop for SendInstance {
fn drop(&mut self) {
unsafe { NDIlib_send_destroy(self.0.as_ptr() as *mut _) }
}
}
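// Hypothetical usage sketch of the wrapper above (names illustrative; assumes
// initialize() has succeeded):
//
// let mut send = SendInstance::builder("My NDI Source")
//     .clock_video()
//     .build()
//     .expect("failed to create NDI send instance");
// send.send_video(&video_frame);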
#[derive(Debug)]
pub struct Tally(NDIlib_tally_t);
unsafe impl Send for Tally {}
@@ -405,49 +487,67 @@ pub enum Frame<'a> {
#[derive(Debug)]
pub enum VideoFrame<'a> {
//Owned(NDIlib_video_frame_v2_t, Option<ffi::CString>, Option<Vec<u8>>),
-Borrowed(NDIlib_video_frame_v2_t, &'a RecvInstance),
+BorrowedRecv(NDIlib_video_frame_v2_t, &'a RecvInstance),
+BorrowedGst(
+NDIlib_video_frame_v2_t,
+&'a gst_video::VideoFrameRef<&'a gst::BufferRef>,
+),
}
impl<'a> VideoFrame<'a> {
pub fn xres(&self) -> i32 {
match self {
-VideoFrame::Borrowed(ref frame, _) => frame.xres,
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
+frame.xres
+}
}
}
pub fn yres(&self) -> i32 {
match self {
-VideoFrame::Borrowed(ref frame, _) => frame.yres,
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
+frame.yres
+}
}
}
pub fn fourcc(&self) -> NDIlib_FourCC_video_type_e {
match self {
-VideoFrame::Borrowed(ref frame, _) => frame.FourCC,
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
+frame.FourCC
+}
}
}
pub fn frame_rate(&self) -> (i32, i32) {
match self {
-VideoFrame::Borrowed(ref frame, _) => (frame.frame_rate_N, frame.frame_rate_D),
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
+(frame.frame_rate_N, frame.frame_rate_D)
+}
}
}
pub fn picture_aspect_ratio(&self) -> f32 {
match self {
-VideoFrame::Borrowed(ref frame, _) => frame.picture_aspect_ratio,
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
+frame.picture_aspect_ratio
+}
}
}
pub fn frame_format_type(&self) -> NDIlib_frame_format_type_e {
match self {
-VideoFrame::Borrowed(ref frame, _) => frame.frame_format_type,
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
+frame.frame_format_type
+}
}
}
pub fn timecode(&self) -> i64 {
match self {
-VideoFrame::Borrowed(ref frame, _) => frame.timecode,
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
+frame.timecode
+}
}
}
@@ -467,7 +567,7 @@ impl<'a> VideoFrame<'a> {
unsafe {
use std::slice;
match self {
-VideoFrame::Borrowed(ref frame, _) => {
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
slice::from_raw_parts(frame.p_data as *const u8, frame_size as usize)
}
}
@@ -476,7 +576,7 @@ impl<'a> VideoFrame<'a> {
pub fn line_stride_or_data_size_in_bytes(&self) -> i32 {
match self {
-VideoFrame::Borrowed(ref frame, _) => {
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
let stride = frame.line_stride_or_data_size_in_bytes;
if stride != 0 {
@@ -506,7 +606,7 @@ impl<'a> VideoFrame<'a> {
pub fn metadata(&self) -> Option<&str> {
unsafe {
match self {
-VideoFrame::Borrowed(ref frame, _) => {
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
if frame.p_metadata.is_null() {
None
} else {
@@ -519,21 +619,137 @@ impl<'a> VideoFrame<'a> {
pub fn timestamp(&self) -> i64 {
match self {
-VideoFrame::Borrowed(ref frame, _) => frame.timestamp,
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => {
+frame.timestamp
+}
}
}
pub fn as_ptr(&self) -> *const NDIlib_video_frame_v2_t {
match self {
-VideoFrame::Borrowed(ref frame, _) => frame,
+VideoFrame::BorrowedRecv(ref frame, _) | VideoFrame::BorrowedGst(ref frame, _) => frame,
}
}
pub fn try_from_video_frame(
frame: &'a gst_video::VideoFrameRef<&'a gst::BufferRef>,
timecode: i64,
) -> Result<Self, ()> {
// Planar formats must be in contiguous memory
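// (NDI takes a single base pointer plus one stride per frame, so each plane
// must start exactly where the previous one ends: the luma plane occupies
// height * stride[0] bytes and, for 4:2:0, each chroma plane occupies
// (height + 1) / 2 rows of its stride. The pointer arithmetic below checks
// exactly that.)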
let format = match frame.format() {
gst_video::VideoFormat::Uyvy => ndisys::NDIlib_FourCC_video_type_UYVY,
gst_video::VideoFormat::I420 => {
if (frame.plane_data(1).unwrap().as_ptr() as usize)
.checked_sub(frame.plane_data(0).unwrap().as_ptr() as usize)
!= Some(frame.height() as usize * frame.plane_stride()[0] as usize)
{
return Err(());
}
if (frame.plane_data(2).unwrap().as_ptr() as usize)
.checked_sub(frame.plane_data(1).unwrap().as_ptr() as usize)
!= Some((frame.height() as usize + 1) / 2 * frame.plane_stride()[1] as usize)
{
return Err(());
}
ndisys::NDIlib_FourCC_video_type_I420
}
gst_video::VideoFormat::Nv12 => {
if (frame.plane_data(1).unwrap().as_ptr() as usize)
.checked_sub(frame.plane_data(0).unwrap().as_ptr() as usize)
!= Some(frame.height() as usize * frame.plane_stride()[0] as usize)
{
return Err(());
}
ndisys::NDIlib_FourCC_video_type_NV12
}
gst_video::VideoFormat::Nv21 => {
if (frame.plane_data(1).unwrap().as_ptr() as usize)
.checked_sub(frame.plane_data(0).unwrap().as_ptr() as usize)
!= Some(frame.height() as usize * frame.plane_stride()[0] as usize)
{
return Err(());
}
ndisys::NDIlib_FourCC_video_type_NV12
}
gst_video::VideoFormat::Yv12 => {
if (frame.plane_data(1).unwrap().as_ptr() as usize)
.checked_sub(frame.plane_data(0).unwrap().as_ptr() as usize)
!= Some(frame.height() as usize * frame.plane_stride()[0] as usize)
{
return Err(());
}
if (frame.plane_data(2).unwrap().as_ptr() as usize)
.checked_sub(frame.plane_data(1).unwrap().as_ptr() as usize)
!= Some((frame.height() as usize + 1) / 2 * frame.plane_stride()[1] as usize)
{
return Err(());
}
ndisys::NDIlib_FourCC_video_type_YV12
}
gst_video::VideoFormat::Bgra => ndisys::NDIlib_FourCC_video_type_BGRA,
gst_video::VideoFormat::Bgrx => ndisys::NDIlib_FourCC_video_type_BGRX,
gst_video::VideoFormat::Rgba => ndisys::NDIlib_FourCC_video_type_RGBA,
gst_video::VideoFormat::Rgbx => ndisys::NDIlib_FourCC_video_type_RGBX,
_ => return Err(()),
};
let frame_format_type = match frame.info().interlace_mode() {
gst_video::VideoInterlaceMode::Progressive => {
NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive
}
gst_video::VideoInterlaceMode::Interleaved => {
NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
}
// FIXME: Is this correct?
#[cfg(feature = "interlaced-fields")]
gst_video::VideoInterlaceMode::Alternate
if frame.flags().contains(gst_video::VideoFrameFlags::TFF) =>
{
NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0
}
#[cfg(feature = "interlaced-fields")]
gst_video::VideoInterlaceMode::Alternate
if !frame.flags().contains(gst_video::VideoFrameFlags::TFF) =>
{
NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_1
}
_ => return Err(()),
};
let picture_aspect_ratio =
frame.info().par() * gst::Fraction::new(frame.width() as i32, frame.height() as i32);
let picture_aspect_ratio =
*picture_aspect_ratio.numer() as f32 / *picture_aspect_ratio.denom() as f32;
let ndi_frame = NDIlib_video_frame_v2_t {
xres: frame.width() as i32,
yres: frame.height() as i32,
FourCC: format,
frame_rate_N: *frame.info().fps().numer(),
frame_rate_D: *frame.info().fps().denom(),
picture_aspect_ratio,
frame_format_type,
timecode,
p_data: frame.plane_data(0).unwrap().as_ptr() as *const i8,
line_stride_or_data_size_in_bytes: frame.plane_stride()[0],
p_metadata: ptr::null(),
timestamp: 0,
};
Ok(VideoFrame::BorrowedGst(ndi_frame, frame))
}
}
impl<'a> Drop for VideoFrame<'a> {
#[allow(irrefutable_let_patterns)]
fn drop(&mut self) {
-if let VideoFrame::Borrowed(ref mut frame, ref recv) = *self {
+if let VideoFrame::BorrowedRecv(ref mut frame, ref recv) = *self {
unsafe {
NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
}
@@ -543,32 +759,44 @@ impl<'a> Drop for VideoFrame<'a> {
#[derive(Debug)]
pub enum AudioFrame<'a> {
//Owned(NDIlib_audio_frame_v2_t, Option<ffi::CString>, Option<Vec<u8>>),
-Borrowed(NDIlib_audio_frame_v2_t, &'a RecvInstance),
+Owned(
+NDIlib_audio_frame_v2_t,
+Option<ffi::CString>,
+Option<Vec<f32>>,
+),
+BorrowedRecv(NDIlib_audio_frame_v2_t, &'a RecvInstance),
}
impl<'a> AudioFrame<'a> {
pub fn sample_rate(&self) -> i32 {
match self {
-AudioFrame::Borrowed(ref frame, _) => frame.sample_rate,
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => {
+frame.sample_rate
+}
}
}
pub fn no_channels(&self) -> i32 {
match self {
-AudioFrame::Borrowed(ref frame, _) => frame.no_channels,
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => {
+frame.no_channels
+}
}
}
pub fn no_samples(&self) -> i32 {
match self {
-AudioFrame::Borrowed(ref frame, _) => frame.no_samples,
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => {
+frame.no_samples
+}
}
}
pub fn timecode(&self) -> i64 {
match self {
-AudioFrame::Borrowed(ref frame, _) => frame.timecode,
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => {
+frame.timecode
+}
}
}
@@ -576,24 +804,28 @@ impl<'a> AudioFrame<'a> {
unsafe {
use std::slice;
match self {
-AudioFrame::Borrowed(ref frame, _) => slice::from_raw_parts(
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => {
+slice::from_raw_parts(
frame.p_data as *const u8,
(frame.no_samples * frame.channel_stride_in_bytes) as usize,
-),
+)
+}
}
}
}
pub fn channel_stride_in_bytes(&self) -> i32 {
match self {
-AudioFrame::Borrowed(ref frame, _) => frame.channel_stride_in_bytes,
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => {
+frame.channel_stride_in_bytes
+}
}
}
pub fn metadata(&self) -> Option<&str> {
unsafe {
match self {
-AudioFrame::Borrowed(ref frame, _) => {
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => {
if frame.p_metadata.is_null() {
None
} else {
@@ -606,13 +838,15 @@ impl<'a> AudioFrame<'a> {
pub fn timestamp(&self) -> i64 {
match self {
-AudioFrame::Borrowed(ref frame, _) => frame.timestamp,
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => {
+frame.timestamp
+}
}
}
pub fn as_ptr(&self) -> *const NDIlib_audio_frame_v2_t {
match self {
-AudioFrame::Borrowed(ref frame, _) => frame,
+AudioFrame::BorrowedRecv(ref frame, _) | AudioFrame::Owned(ref frame, _, _) => frame,
}
}
@@ -635,12 +869,56 @@ impl<'a> AudioFrame<'a> {
NDIlib_util_audio_to_interleaved_16s_v2(self.as_ptr(), &mut dst);
}
}
pub fn try_from_interleaved_16s(
info: &gst_audio::AudioInfo,
buffer: &gst::BufferRef,
timecode: i64,
) -> Result<Self, ()> {
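// Converts interleaved S16 samples into NDI's planar f32 layout via the SDK's
// NDIlib_util_audio_from_interleaved_16s_v2() utility; the resulting Owned
// frame keeps the converted samples alive in the Vec<f32>.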
if info.format() != gst_audio::AUDIO_FORMAT_S16 {
return Err(());
}
let map = buffer.map_readable().map_err(|_| ())?;
let src_data = map.as_slice_of::<i16>().map_err(|_| ())?;
let src = NDIlib_audio_frame_interleaved_16s_t {
sample_rate: info.rate() as i32,
no_channels: info.channels() as i32,
no_samples: src_data.len() as i32 / info.channels() as i32,
timecode,
reference_level: 0,
p_data: src_data.as_ptr() as *mut i16,
};
let channel_stride_in_bytes = src.no_samples * mem::size_of::<f32>() as i32;
let mut dest_data =
Vec::with_capacity(channel_stride_in_bytes as usize * info.channels() as usize);
let mut dest = NDIlib_audio_frame_v2_t {
sample_rate: src.sample_rate,
no_channels: src.no_channels,
no_samples: src.no_samples,
timecode: src.timecode,
p_data: dest_data.as_mut_ptr(),
channel_stride_in_bytes,
p_metadata: ptr::null(),
timestamp: 0,
};
unsafe {
NDIlib_util_audio_from_interleaved_16s_v2(&src, &mut dest);
dest_data.set_len(dest_data.capacity());
}
Ok(AudioFrame::Owned(dest, None, Some(dest_data)))
}
}
impl<'a> Drop for AudioFrame<'a> {
#[allow(irrefutable_let_patterns)]
fn drop(&mut self) {
-if let AudioFrame::Borrowed(ref mut frame, ref recv) = *self {
+if let AudioFrame::BorrowedRecv(ref mut frame, ref recv) = *self {
unsafe {
NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
}

src/ndiaudiosrc.rs

@@ -39,7 +39,7 @@ impl Default for Settings {
connect_timeout: 10000,
timeout: 5000,
bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
-timestamp_mode: TimestampMode::ReceiveTime,
+timestamp_mode: TimestampMode::ReceiveTimeTimecode,
}
}
}
@@ -111,7 +111,7 @@ static PROPERTIES: [subclass::Property; 7] = [
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
-TimestampMode::ReceiveTime as i32,
+TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
)
}),

src/ndisink.rs (new file, 358 lines)

@@ -0,0 +1,358 @@
use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_error_msg, gst_info, gst_loggable_error, gst_trace};
use gst_base::{subclass::prelude::*, BaseSinkExtManual};
use std::sync::Mutex;
use once_cell::sync::Lazy;
use super::ndi::SendInstance;
static DEFAULT_SENDER_NDI_NAME: Lazy<String> = Lazy::new(|| {
format!(
"GStreamer NDI Sink {}-{}",
env!("CARGO_PKG_VERSION"),
env!("COMMIT_ID")
)
});
#[derive(Debug)]
struct Settings {
ndi_name: String,
}
impl Default for Settings {
fn default() -> Self {
Settings {
ndi_name: DEFAULT_SENDER_NDI_NAME.clone(),
}
}
}
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
"NDI Name",
"NDI Name to use",
Some(DEFAULT_SENDER_NDI_NAME.as_ref()),
glib::ParamFlags::READWRITE,
)
})];
struct State {
send: SendInstance,
video_info: Option<gst_video::VideoInfo>,
audio_info: Option<gst_audio::AudioInfo>,
}
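// At most one of video_info / audio_info is set at any time; set_caps() below
// clears the other one on renegotiation.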
pub struct NdiSink {
settings: Mutex<Settings>,
state: Mutex<Option<State>>,
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("ndisink", gst::DebugColorFlags::empty(), Some("NDI Sink"))
});
impl ObjectSubclass for NdiSink {
const NAME: &'static str = "NdiSink";
type ParentType = gst_base::BaseSink;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self {
Self {
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"NDI Sink",
"Sink/Audio/Video",
"Render as an NDI stream",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::builder_full()
.structure(
gst::Structure::builder("video/x-raw")
.field(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_str(),
&gst_video::VideoFormat::I420.to_str(),
&gst_video::VideoFormat::Nv12.to_str(),
&gst_video::VideoFormat::Nv21.to_str(),
&gst_video::VideoFormat::Yv12.to_str(),
&gst_video::VideoFormat::Bgra.to_str(),
&gst_video::VideoFormat::Bgrx.to_str(),
&gst_video::VideoFormat::Rgba.to_str(),
&gst_video::VideoFormat::Rgbx.to_str(),
]),
)
.field("width", &gst::IntRange::<i32>::new(1, std::i32::MAX))
.field("height", &gst::IntRange::<i32>::new(1, std::i32::MAX))
.field(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(0, 1),
gst::Fraction::new(std::i32::MAX, 1),
),
)
.build(),
)
.structure(
gst::Structure::builder("audio/x-raw")
.field("format", &gst_audio::AUDIO_FORMAT_S16.to_str())
.field("rate", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("channels", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("layout", &"interleaved")
.build(),
)
.build();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
klass.add_pad_template(sink_pad_template);
klass.install_properties(&PROPERTIES);
}
}
impl ObjectImpl for NdiSink {
glib::glib_object_impl!();
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.ndi_name = value
.get::<String>()
.unwrap()
.unwrap_or_else(|| DEFAULT_SENDER_NDI_NAME.clone());
}
_ => unimplemented!(),
};
}
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value())
}
_ => unimplemented!(),
}
}
}
impl ElementImpl for NdiSink {}
impl BaseSinkImpl for NdiSink {
fn start(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
let send = SendInstance::builder(&settings.ndi_name)
.build()
.ok_or_else(|| {
gst_error_msg!(
gst::ResourceError::OpenWrite,
["Could not create send instance"]
)
})?;
let state = State {
send,
video_info: None,
audio_info: None,
};
*state_storage = Some(state);
gst_info!(CAT, obj: element, "Started");
Ok(())
}
fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap();
*state_storage = None;
gst_info!(CAT, obj: element, "Stopped");
Ok(())
}
fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
Ok(())
}
fn unlock_stop(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
Ok(())
}
fn set_caps(
&self,
element: &gst_base::BaseSink,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
gst_debug!(CAT, obj: element, "Setting caps {}", caps);
let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage {
None => return Err(gst_loggable_error!(CAT, "Sink not started yet")),
Some(ref mut state) => state,
};
let s = caps.get_structure(0).unwrap();
if s.get_name() == "video/x-raw" {
let info = gst_video::VideoInfo::from_caps(caps)
.map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?;
state.video_info = Some(info);
state.audio_info = None;
} else {
let info = gst_audio::AudioInfo::from_caps(caps)
.map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?;
state.audio_info = Some(info);
state.video_info = None;
}
Ok(())
}
fn render(
&self,
element: &gst_base::BaseSink,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage {
None => return Err(gst::FlowError::Error),
Some(ref mut state) => state,
};
if let Some(ref info) = state.video_info {
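// Audio combined by ndisinkcombiner travels attached to the video buffer as
// an NdiSinkAudioMeta; send those audio frames first.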
if let Some(audio_meta) = buffer.get_meta::<crate::ndisinkmeta::NdiSinkAudioMeta>() {
for (buffer, info, timecode) in audio_meta.buffers() {
let frame =
crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, *timecode)
.map_err(|_| {
gst_error!(CAT, obj: element, "Unsupported audio frame");
gst::FlowError::NotNegotiated
})?;
gst_trace!(
CAT,
obj: element,
"Sending audio buffer {:?} with timecode {} and format {:?}",
buffer,
if *timecode < 0 {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime::from(*timecode as u64 * 100)
},
info,
);
state.send.send_audio(&frame);
}
}
// Skip empty/gap buffers from ndisinkcombiner
if buffer.get_size() != 0 {
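// Map the buffer's running time to an NDI timecode in units of 100 ns, or
// fall back to letting NDI synthesize one if no valid time is available.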
let timecode = element
.get_segment()
.downcast::<gst::ClockTime>()
.ok()
.and_then(|segment| {
*(segment.to_running_time(buffer.get_pts()) + element.get_base_time())
})
.map(|time| (time / 100) as i64)
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info)
.map_err(|_| {
gst_error!(CAT, obj: element, "Failed to map buffer");
gst::FlowError::Error
})?;
let frame = crate::ndi::VideoFrame::try_from_video_frame(&frame, timecode)
.map_err(|_| {
gst_error!(CAT, obj: element, "Unsupported video frame");
gst::FlowError::NotNegotiated
})?;
gst_trace!(
CAT,
obj: element,
"Sending video buffer {:?} with timecode {} and format {:?}",
buffer,
if timecode < 0 {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime::from(timecode as u64 * 100)
},
info
);
state.send.send_video(&frame);
}
} else if let Some(ref info) = state.audio_info {
let timecode = element
.get_segment()
.downcast::<gst::ClockTime>()
.ok()
.and_then(|segment| {
*(segment.to_running_time(buffer.get_pts()) + element.get_base_time())
})
.map(|time| (time / 100) as i64)
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
let frame = crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, timecode)
.map_err(|_| {
gst_error!(CAT, obj: element, "Unsupported audio frame");
gst::FlowError::NotNegotiated
})?;
gst_trace!(
CAT,
obj: element,
"Sending audio buffer {:?} with timecode {} and format {:?}",
buffer,
if timecode < 0 {
gst::CLOCK_TIME_NONE
} else {
gst::ClockTime::from(timecode as u64 * 100)
},
info,
);
state.send.send_audio(&frame);
} else {
return Err(gst::FlowError::Error);
}
Ok(gst::FlowSuccess::Ok)
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisink",
gst::Rank::None,
NdiSink::get_type(),
)
}

src/ndisinkcombiner.rs (new file, 637 lines)

@@ -0,0 +1,637 @@
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_trace, gst_warning};
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use std::mem;
use std::sync::Mutex;
static CAT: once_cell::sync::Lazy<gst::DebugCategory> = once_cell::sync::Lazy::new(|| {
gst::DebugCategory::new(
"ndisinkcombiner",
gst::DebugColorFlags::empty(),
Some("NDI sink audio/video combiner"),
)
});
struct State {
// Note that this applies to the currently pending buffer on the pad and *not*
// to the current_video_buffer below!
video_info: Option<gst_video::VideoInfo>,
audio_info: Option<gst_audio::AudioInfo>,
current_video_buffer: Option<(gst::Buffer, gst::ClockTime)>,
current_audio_buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
}
struct NdiSinkCombiner {
video_pad: gst_base::AggregatorPad,
audio_pad: Mutex<Option<gst_base::AggregatorPad>>,
state: Mutex<Option<State>>,
}
impl ObjectSubclass for NdiSinkCombiner {
const NAME: &'static str = "NdiSinkCombiner";
type ParentType = gst_base::Aggregator;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"NDI Sink Combiner",
"Combiner/Audio/Video",
"NDI sink audio/video combiner",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::builder("video/x-raw")
.field(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_str(),
&gst_video::VideoFormat::I420.to_str(),
&gst_video::VideoFormat::Nv12.to_str(),
&gst_video::VideoFormat::Nv21.to_str(),
&gst_video::VideoFormat::Yv12.to_str(),
&gst_video::VideoFormat::Bgra.to_str(),
&gst_video::VideoFormat::Bgrx.to_str(),
&gst_video::VideoFormat::Rgba.to_str(),
&gst_video::VideoFormat::Rgbx.to_str(),
]),
)
.field("width", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("height", &gst::IntRange::<i32>::new(1, i32::MAX))
.field(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(1, i32::MAX),
gst::Fraction::new(i32::MAX, 1),
),
)
.build();
let src_pad_template = gst::PadTemplate::with_gtype(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
let sink_pad_template = gst::PadTemplate::with_gtype(
"video",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(sink_pad_template);
let caps = gst::Caps::builder("audio/x-raw")
.field("format", &gst_audio::AUDIO_FORMAT_S16.to_str())
.field("rate", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("channels", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("layout", &"interleaved")
.build();
let sink_pad_template = gst::PadTemplate::with_gtype(
"audio",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(sink_pad_template);
}
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("video").unwrap();
let video_pad =
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("video"))
.build();
Self {
video_pad,
audio_pad: Mutex::new(None),
state: Mutex::new(None),
}
}
}
impl ObjectImpl for NdiSinkCombiner {
glib::glib_object_impl!();
fn constructed(&self, obj: &glib::Object) {
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.video_pad).unwrap();
self.parent_constructed(obj);
}
}
impl ElementImpl for NdiSinkCombiner {
fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
let mut audio_pad_storage = self.audio_pad.lock().unwrap();
if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) {
gst_debug!(CAT, obj: element, "Release audio pad");
self.parent_release_pad(element, pad);
*audio_pad_storage = None;
}
}
}
impl AggregatorImpl for NdiSinkCombiner {
fn create_new_pad(
&self,
agg: &gst_base::Aggregator,
templ: &gst::PadTemplate,
_req_name: Option<&str>,
_caps: Option<&gst::Caps>,
) -> Option<gst_base::AggregatorPad> {
let mut audio_pad_storage = self.audio_pad.lock().unwrap();
if audio_pad_storage.is_some() {
gst_error!(CAT, obj: agg, "Audio pad already requested");
return None;
}
let sink_templ = agg.get_pad_template("audio").unwrap();
if templ != &sink_templ {
gst_error!(CAT, obj: agg, "Wrong pad template");
return None;
}
let pad =
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(templ, Some("audio")).build();
*audio_pad_storage = Some(pad.clone());
gst_debug!(CAT, obj: agg, "Requested audio pad");
Some(pad)
}
fn start(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap();
*state_storage = Some(State {
audio_info: None,
video_info: None,
current_video_buffer: None,
current_audio_buffers: Vec::new(),
});
gst_debug!(CAT, obj: agg, "Started");
Ok(())
}
fn stop(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> {
// Drop our state now
let _ = self.state.lock().unwrap().take();
gst_debug!(CAT, obj: agg, "Stopped");
Ok(())
}
fn get_next_time(&self, _agg: &gst_base::Aggregator) -> gst::ClockTime {
// FIXME: What to do here? We don't really know when the next buffer is expected
gst::CLOCK_TIME_NONE
}
fn clip(
&self,
agg: &gst_base::Aggregator,
agg_pad: &gst_base::AggregatorPad,
mut buffer: gst::Buffer,
) -> Option<gst::Buffer> {
let segment = match agg_pad.get_segment().downcast::<gst::ClockTime>() {
Ok(segment) => segment,
Err(_) => {
gst_error!(CAT, obj: agg, "Only TIME segments supported");
return Some(buffer);
}
};
let pts = buffer.get_pts();
if pts.is_none() {
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
return Some(buffer);
}
let duration = if buffer.get_duration().is_some() {
buffer.get_duration()
} else {
gst::CLOCK_TIME_NONE
};
gst_trace!(
CAT,
obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}",
buffer,
pts,
duration
);
let state_storage = self.state.lock().unwrap();
let state = match &*state_storage {
Some(ref state) => state,
None => return None,
};
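// If the buffer itself carries no duration, derive one from the audio rate or
// the video framerate so that clipping still has a defined end time.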
let duration = if buffer.get_duration().is_some() {
buffer.get_duration()
} else if let Some(ref audio_info) = state.audio_info {
gst::SECOND
.mul_div_floor(
buffer.get_size() as u64,
audio_info.rate() as u64 * audio_info.bpf() as u64,
)
.unwrap()
} else if let Some(ref video_info) = state.video_info {
if *video_info.fps().numer() > 0 {
gst::SECOND
.mul_div_floor(
*video_info.fps().denom() as u64,
*video_info.fps().numer() as u64,
)
.unwrap()
} else {
gst::CLOCK_TIME_NONE
}
} else {
unreachable!()
};
gst_debug!(
CAT,
obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}",
buffer,
pts,
duration
);
if agg_pad == &self.video_pad {
segment.clip(pts, pts + duration).map(|(start, stop)| {
{
let buffer = buffer.make_mut();
buffer.set_pts(start);
if duration.is_some() {
buffer.set_duration(stop - start);
}
}
buffer
})
} else if let Some(ref audio_info) = state.audio_info {
gst_audio::audio_buffer_clip(
buffer,
segment.upcast_ref(),
audio_info.rate(),
audio_info.bpf(),
)
} else {
// Can't really have audio buffers without caps
unreachable!();
}
}
fn aggregate(
&self,
agg: &gst_base::Aggregator,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
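// Overall strategy: hold back one video buffer, accumulate all audio whose
// running time ends before the *next* video buffer, attach it to the held
// buffer as NdiSinkAudioMeta, and only then push that buffer downstream.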
// FIXME: Can't really happen because we always return NONE from get_next_time() but that
// should be improved!
assert!(!timeout);
// Because peek_buffer() can call into clip() and that would take the state lock again,
// first try getting buffers from both pads here
let video_buffer_and_segment = match self.video_pad.peek_buffer() {
Some(video_buffer) => {
let video_segment = self.video_pad.get_segment();
let video_segment = match video_segment.downcast::<gst::ClockTime>() {
Ok(video_segment) => video_segment,
Err(video_segment) => {
gst_error!(
CAT,
obj: agg,
"Video segment of wrong format {:?}",
video_segment.get_format()
);
return Err(gst::FlowError::Error);
}
};
Some((video_buffer, video_segment))
}
None if !self.video_pad.is_eos() => {
gst_trace!(CAT, obj: agg, "Waiting for video buffer");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
None => None,
};
let audio_buffer_segment_and_pad;
if let Some(audio_pad) = self.audio_pad.lock().unwrap().clone() {
audio_buffer_segment_and_pad = match audio_pad.peek_buffer() {
Some(audio_buffer) if audio_buffer.get_size() == 0 => {
// Skip empty/gap audio buffer
audio_pad.drop_buffer();
gst_trace!(CAT, obj: agg, "Empty audio buffer, waiting for next");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
Some(audio_buffer) => {
let audio_segment = audio_pad.get_segment();
let audio_segment = match audio_segment.downcast::<gst::ClockTime>() {
Ok(audio_segment) => audio_segment,
Err(audio_segment) => {
gst_error!(
CAT,
obj: agg,
"Audio segment of wrong format {:?}",
audio_segment.get_format()
);
return Err(gst::FlowError::Error);
}
};
Some((audio_buffer, audio_segment, audio_pad))
}
None if !audio_pad.is_eos() => {
gst_trace!(CAT, obj: agg, "Waiting for audio buffer");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
None => None,
};
} else {
audio_buffer_segment_and_pad = None;
}
let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage {
Some(ref mut state) => state,
None => return Err(gst::FlowError::Flushing),
};
let (mut current_video_buffer, current_video_running_time_end, next_video_buffer) =
if let Some((video_buffer, video_segment)) = video_buffer_and_segment {
let video_running_time = video_segment.to_running_time(video_buffer.get_pts());
assert!(video_running_time.is_some());
match state.current_video_buffer {
None => {
gst_trace!(CAT, obj: agg, "First video buffer, waiting for second");
state.current_video_buffer = Some((video_buffer, video_running_time));
drop(state_storage);
self.video_pad.drop_buffer();
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
Some((ref buffer, _)) => (
buffer.clone(),
video_running_time,
Some((video_buffer, video_running_time)),
),
}
} else {
match (&state.current_video_buffer, &audio_buffer_segment_and_pad) {
(None, None) => {
gst_trace!(
CAT,
obj: agg,
"All pads are EOS and no buffers are queued, finishing"
);
return Err(gst::FlowError::Eos);
}
(None, Some((ref audio_buffer, ref audio_segment, _))) => {
// Create an empty dummy buffer for attaching the audio. This is going to
// be dropped by the sink later.
let audio_running_time =
audio_segment.to_running_time(audio_buffer.get_pts());
assert!(audio_running_time.is_some());
let video_segment = self.video_pad.get_segment();
let video_segment = match video_segment.downcast::<gst::ClockTime>() {
Ok(video_segment) => video_segment,
Err(video_segment) => {
gst_error!(
CAT,
obj: agg,
"Video segment of wrong format {:?}",
video_segment.get_format()
);
return Err(gst::FlowError::Error);
}
};
let video_pts =
video_segment.position_from_running_time(audio_running_time);
if video_pts.is_none() {
gst_warning!(CAT, obj: agg, "Can't output more audio after video EOS");
return Err(gst::FlowError::Eos);
}
let mut buffer = gst::Buffer::new();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(video_pts);
}
(buffer, gst::CLOCK_TIME_NONE, None)
}
(Some((ref buffer, _)), _) => (buffer.clone(), gst::CLOCK_TIME_NONE, None),
}
};
if let Some((audio_buffer, audio_segment, audio_pad)) = audio_buffer_segment_and_pad {
let audio_info = match state.audio_info {
Some(ref audio_info) => audio_info,
None => {
gst_error!(CAT, obj: agg, "Have no audio caps");
return Err(gst::FlowError::NotNegotiated);
}
};
let audio_running_time = audio_segment.to_running_time(audio_buffer.get_pts());
assert!(audio_running_time.is_some());
let duration = gst::SECOND
.mul_div_floor(
audio_buffer.get_size() as u64 / audio_info.bpf() as u64,
audio_info.rate() as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE);
let audio_running_time_end = audio_running_time + duration;
assert!(audio_running_time_end.is_some());
if audio_running_time_end <= current_video_running_time_end
|| current_video_running_time_end.is_none()
{
let timecode = (audio_running_time + agg.get_base_time())
.map(|t| (t / 100) as i64)
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
gst_trace!(
CAT,
obj: agg,
"Including audio buffer {:?} with timecode {}: {} <= {}",
audio_buffer,
timecode,
audio_running_time_end,
current_video_running_time_end,
);
state
.current_audio_buffers
.push((audio_buffer, audio_info.clone(), timecode));
audio_pad.drop_buffer();
// If there is still video data, wait for the next audio buffer or EOS,
// otherwise just output the dummy video buffer directly.
if current_video_running_time_end.is_some() {
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
}
// Otherwise finish this video buffer with all audio that has accumulated so
// far
}
let audio_buffers = mem::replace(&mut state.current_audio_buffers, Vec::new());
if !audio_buffers.is_empty() {
let current_video_buffer = current_video_buffer.make_mut();
crate::ndisinkmeta::NdiSinkAudioMeta::add(current_video_buffer, audio_buffers);
}
if let Some((video_buffer, video_running_time)) = next_video_buffer {
state.current_video_buffer = Some((video_buffer, video_running_time));
drop(state_storage);
self.video_pad.drop_buffer();
} else {
state.current_video_buffer = None;
drop(state_storage);
}
gst_trace!(
CAT,
obj: agg,
"Finishing video buffer {:?}",
current_video_buffer
);
agg.finish_buffer(current_video_buffer)
}
fn sink_event(
&self,
agg: &gst_base::Aggregator,
pad: &gst_base::AggregatorPad,
event: gst::Event,
) -> bool {
use gst::EventView;
match event.view() {
EventView::Caps(caps) => {
let caps = caps.get_caps_owned();
let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage {
Some(ref mut state) => state,
None => return false,
};
if pad == &self.video_pad {
let info = match gst_video::VideoInfo::from_caps(&caps) {
Ok(info) => info,
Err(_) => {
gst_error!(CAT, obj: pad, "Failed to parse caps {:?}", caps);
return false;
}
};
// 2 frames latency because we queue 1 frame and wait until audio
// up to the end of that frame has arrived.
let latency = if *info.fps().numer() > 0 {
gst::SECOND
.mul_div_floor(
2 * *info.fps().denom() as u64,
*info.fps().numer() as u64,
)
.unwrap_or(80 * gst::MSECOND)
} else {
// let's assume 25fps and 2 frames latency
80 * gst::MSECOND
};
state.video_info = Some(info);
drop(state_storage);
agg.set_latency(latency, gst::CLOCK_TIME_NONE);
// The video caps are passed through as the audio is included only in a meta
agg.set_src_caps(&caps);
} else {
let info = match gst_audio::AudioInfo::from_caps(&caps) {
Ok(info) => info,
Err(_) => {
gst_error!(CAT, obj: pad, "Failed to parse caps {:?}", caps);
return false;
}
};
state.audio_info = Some(info);
}
}
// The video segment is passed through as-is and the video timestamps are preserved
EventView::Segment(segment) if pad == &self.video_pad => {
let segment = segment.get_segment();
gst_debug!(CAT, obj: agg, "Updating segment {:?}", segment);
agg.update_segment(segment);
}
_ => (),
}
self.parent_sink_event(agg, pad, event)
}
fn sink_query(
&self,
agg: &gst_base::Aggregator,
pad: &gst_base::AggregatorPad,
query: &mut gst::QueryRef,
) -> bool {
use gst::QueryView;
match query.view_mut() {
QueryView::Caps(_) if pad == &self.video_pad => {
// Directly forward caps queries
let srcpad = agg.get_static_pad("src").unwrap();
return srcpad.peer_query(query);
}
_ => (),
}
self.parent_sink_query(agg, pad, query)
}
fn negotiate(&self, _agg: &gst_base::Aggregator) -> bool {
// No negotiation needed as the video caps are just passed through
true
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisinkcombiner",
gst::Rank::None,
NdiSinkCombiner::get_type(),
)
}

src/ndisinkmeta.rs (new file, 145 lines)

@@ -0,0 +1,145 @@
use gst::gst_sys;
use gst::prelude::*;
use std::fmt;
use std::mem;
#[repr(transparent)]
pub struct NdiSinkAudioMeta(imp::NdiSinkAudioMeta);
unsafe impl Send for NdiSinkAudioMeta {}
unsafe impl Sync for NdiSinkAudioMeta {}
impl NdiSinkAudioMeta {
pub fn add(
buffer: &mut gst::BufferRef,
buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
) -> gst::MetaRefMut<Self, gst::meta::Standalone> {
unsafe {
// Manually dropping because gst_buffer_add_meta() takes ownership of the
// content of the struct
let mut params = mem::ManuallyDrop::new(imp::NdiSinkAudioMetaParams { buffers });
let meta = gst_sys::gst_buffer_add_meta(
buffer.as_mut_ptr(),
imp::ndi_sink_audio_meta_get_info(),
&mut *params as *mut imp::NdiSinkAudioMetaParams as glib::glib_sys::gpointer,
) as *mut imp::NdiSinkAudioMeta;
Self::from_mut_ptr(buffer, meta)
}
}
pub fn buffers(&self) -> &[(gst::Buffer, gst_audio::AudioInfo, i64)] {
&self.0.buffers
}
}
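// Hypothetical usage sketch (mirroring what ndisinkcombiner and ndisink above
// actually do with this meta):
//
// let mut buffer = gst::Buffer::new();
// NdiSinkAudioMeta::add(buffer.get_mut().unwrap(), audio_buffers);
// if let Some(meta) = buffer.get_meta::<NdiSinkAudioMeta>() {
//     for (audio_buffer, info, timecode) in meta.buffers() {
//         // forward each queued audio buffer to the sender
//     }
// }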
unsafe impl MetaAPI for NdiSinkAudioMeta {
type GstType = imp::NdiSinkAudioMeta;
fn get_meta_api() -> glib::Type {
imp::ndi_sink_audio_meta_api_get_type()
}
}
impl fmt::Debug for NdiSinkAudioMeta {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("NdiSinkAudioMeta")
.field("buffers", &self.buffers())
.finish()
}
}
mod imp {
use glib::glib_sys;
use glib::translate::*;
use gst::gst_sys;
use once_cell::sync::Lazy;
use std::mem;
use std::ptr;
pub(super) struct NdiSinkAudioMetaParams {
pub buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
}
#[repr(C)]
pub struct NdiSinkAudioMeta {
parent: gst_sys::GstMeta,
pub(super) buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
}
pub(super) fn ndi_sink_audio_meta_api_get_type() -> glib::Type {
static TYPE: Lazy<glib::Type> = Lazy::new(|| unsafe {
let t = from_glib(gst_sys::gst_meta_api_type_register(
b"GstNdiSinkAudioMetaAPI\0".as_ptr() as *const _,
[ptr::null::<std::os::raw::c_char>()].as_ptr() as *mut *const _,
));
assert_ne!(t, glib::Type::Invalid);
t
});
*TYPE
}
unsafe extern "C" fn ndi_sink_audio_meta_init(
meta: *mut gst_sys::GstMeta,
params: glib_sys::gpointer,
_buffer: *mut gst_sys::GstBuffer,
) -> glib_sys::gboolean {
assert!(!params.is_null());
let meta = &mut *(meta as *mut NdiSinkAudioMeta);
let params = ptr::read(params as *const NdiSinkAudioMetaParams);
ptr::write(&mut meta.buffers, params.buffers);
true.to_glib()
}
unsafe extern "C" fn ndi_sink_audio_meta_free(
meta: *mut gst_sys::GstMeta,
_buffer: *mut gst_sys::GstBuffer,
) {
let meta = &mut *(meta as *mut NdiSinkAudioMeta);
ptr::drop_in_place(&mut meta.buffers);
}
unsafe extern "C" fn ndi_sink_audio_meta_transform(
dest: *mut gst_sys::GstBuffer,
meta: *mut gst_sys::GstMeta,
_buffer: *mut gst_sys::GstBuffer,
_type_: glib_sys::GQuark,
_data: glib_sys::gpointer,
) -> glib_sys::gboolean {
let meta = &*(meta as *mut NdiSinkAudioMeta);
super::NdiSinkAudioMeta::add(gst::BufferRef::from_mut_ptr(dest), meta.buffers.clone());
true.to_glib()
}
pub(super) fn ndi_sink_audio_meta_get_info() -> *const gst_sys::GstMetaInfo {
struct MetaInfo(ptr::NonNull<gst_sys::GstMetaInfo>);
unsafe impl Send for MetaInfo {}
unsafe impl Sync for MetaInfo {}
static META_INFO: Lazy<MetaInfo> = Lazy::new(|| unsafe {
MetaInfo(
ptr::NonNull::new(gst_sys::gst_meta_register(
ndi_sink_audio_meta_api_get_type().to_glib(),
b"GstNdiSinkAudioMeta\0".as_ptr() as *const _,
mem::size_of::<NdiSinkAudioMeta>(),
Some(ndi_sink_audio_meta_init),
Some(ndi_sink_audio_meta_free),
Some(ndi_sink_audio_meta_transform),
) as *mut gst_sys::GstMetaInfo)
.expect("Failed to register meta API"),
)
});
META_INFO.0.as_ptr()
}
}

src/ndisys.rs

@@ -62,6 +62,18 @@ extern "C" {
p_instance: NDIlib_recv_instance_t,
p_total: *mut NDIlib_recv_queue_t,
);
pub fn NDIlib_send_create(
p_create_settings: *const NDIlib_send_create_t,
) -> NDIlib_send_instance_t;
pub fn NDIlib_send_destroy(p_instance: NDIlib_send_instance_t);
pub fn NDIlib_send_send_video_v2(
p_instance: NDIlib_send_instance_t,
p_video_data: *const NDIlib_video_frame_v2_t,
);
pub fn NDIlib_send_send_audio_v2(
p_instance: NDIlib_send_instance_t,
p_audio_data: *const NDIlib_audio_frame_v2_t,
);
}
pub type NDIlib_find_instance_t = *mut ::std::os::raw::c_void;
@@ -147,6 +159,17 @@ pub struct NDIlib_recv_create_v3_t {
pub type NDIlib_recv_instance_t = *mut ::std::os::raw::c_void;
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_send_create_t {
pub p_ndi_name: *const ::std::os::raw::c_char,
pub p_groups: *const ::std::os::raw::c_char,
pub clock_video: bool,
pub clock_audio: bool,
}
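// Field order and types must match NDIlib_send_create_t in the NDI SDK's
// Processing.NDI.Send.h, since this #[repr(C)] struct is passed by pointer
// across the FFI boundary.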
pub type NDIlib_send_instance_t = *mut ::std::os::raw::c_void;
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct NDIlib_tally_t {

src/ndivideosrc.rs

@@ -40,7 +40,7 @@ impl Default for Settings {
connect_timeout: 10000,
timeout: 5000,
bandwidth: ndisys::NDIlib_recv_bandwidth_highest,
-timestamp_mode: TimestampMode::ReceiveTime,
+timestamp_mode: TimestampMode::ReceiveTimeTimecode,
}
}
}
@@ -112,7 +112,7 @@ static PROPERTIES: [subclass::Property; 7] = [
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
-TimestampMode::ReceiveTime as i32,
+TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
)
}),

src/receiver.rs

@@ -812,7 +812,12 @@ impl<T: ReceiverType> Receiver<T> {
);
let (pts, duration) = match self.0.timestamp_mode {
-TimestampMode::ReceiveTime => self.0.observations.process(
+TimestampMode::ReceiveTimeTimecode => {
+self.0
+.observations
+.process(self.0.cat, element, (timecode, receive_time), duration)
+}
+TimestampMode::ReceiveTimeTimestamp => self.0.observations.process(
self.0.cat,
element,
(timestamp, receive_time),