Update to gstreamer-rs 0.17

This commit is contained in:
Sebastian Dröge 2021-09-13 12:26:56 +03:00
parent 160571e251
commit 291d951b01
16 changed files with 1036 additions and 992 deletions

View file

@ -8,16 +8,16 @@ description = "NewTek NDI Plugin"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
glib = "0.10" glib = "0.14"
gst = { package = "gstreamer", version = "0.16", features = ["v1_12"] } gst = { package = "gstreamer", version = "0.17", features = ["v1_12"] }
gst-base = { package = "gstreamer-base", version = "0.16" } gst-base = { package = "gstreamer-base", version = "0.17" }
gst-audio = { package = "gstreamer-audio", version = "0.16" } gst-audio = { package = "gstreamer-audio", version = "0.17" }
gst-video = { package = "gstreamer-video", version = "0.16", features = ["v1_12"] } gst-video = { package = "gstreamer-video", version = "0.17", features = ["v1_12"] }
byte-slice-cast = "1" byte-slice-cast = "1"
once_cell = "1.0" once_cell = "1.0"
[build-dependencies] [build-dependencies]
gst-plugin-version-helper = "0.2" gst-plugin-version-helper = "0.7"
[features] [features]
default = ["interlaced-fields", "reference-timestamps", "sink"] default = ["interlaced-fields", "reference-timestamps", "sink"]

View file

@ -1,5 +1,3 @@
extern crate gst_plugin_version_helper;
fn main() { fn main() {
gst_plugin_version_helper::get_info() gst_plugin_version_helper::info()
} }

View file

@ -1,4 +1,3 @@
use glib::subclass;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_error, gst_log, gst_trace}; use gst::{gst_error, gst_log, gst_trace};
@ -9,24 +8,24 @@ use std::sync::atomic;
use std::sync::Mutex; use std::sync::Mutex;
use std::thread; use std::thread;
use once_cell::sync::Lazy;
use crate::ndi; use crate::ndi;
#[derive(Debug)] #[derive(Debug)]
struct DeviceProvider { pub struct DeviceProvider {
cat: gst::DebugCategory, cat: gst::DebugCategory,
thread: Mutex<Option<thread::JoinHandle<()>>>, thread: Mutex<Option<thread::JoinHandle<()>>>,
current_devices: Mutex<Vec<gst::Device>>, current_devices: Mutex<Vec<super::Device>>,
find: Mutex<Option<ndi::FindInstance>>, find: Mutex<Option<ndi::FindInstance>>,
is_running: atomic::AtomicBool, is_running: atomic::AtomicBool,
} }
#[glib::object_subclass]
impl ObjectSubclass for DeviceProvider { impl ObjectSubclass for DeviceProvider {
const NAME: &'static str = "NdiDeviceProvider"; const NAME: &'static str = "NdiDeviceProvider";
type Type = super::DeviceProvider;
type ParentType = gst::DeviceProvider; type ParentType = gst::DeviceProvider;
type Instance = subclass::simple::InstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self { fn new() -> Self {
Self { Self {
@ -41,26 +40,32 @@ impl ObjectSubclass for DeviceProvider {
is_running: atomic::AtomicBool::new(false), is_running: atomic::AtomicBool::new(false),
} }
} }
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"NewTek NDI Device Provider",
"Source/Audio/Video/Network",
"NewTek NDI Device Provider",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
);
}
} }
impl ObjectImpl for DeviceProvider { impl ObjectImpl for DeviceProvider {}
glib::glib_object_impl!();
}
impl DeviceProviderImpl for DeviceProvider { impl DeviceProviderImpl for DeviceProvider {
fn probe(&self, _device_provider: &gst::DeviceProvider) -> Vec<gst::Device> { fn metadata() -> Option<&'static gst::subclass::DeviceProviderMetadata> {
self.current_devices.lock().unwrap().clone() static METADATA: Lazy<gst::subclass::DeviceProviderMetadata> = Lazy::new(|| {
gst::subclass::DeviceProviderMetadata::new("NewTek NDI Device Provider",
"Source/Audio/Video/Network",
"NewTek NDI Device Provider",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>")
});
Some(&*METADATA)
} }
fn start(&self, device_provider: &gst::DeviceProvider) -> Result<(), gst::LoggableError> {
fn probe(&self, _device_provider: &Self::Type) -> Vec<gst::Device> {
self.current_devices
.lock()
.unwrap()
.iter()
.map(|d| d.clone().upcast())
.collect()
}
fn start(&self, device_provider: &Self::Type) -> Result<(), gst::LoggableError> {
let mut thread_guard = self.thread.lock().unwrap(); let mut thread_guard = self.thread.lock().unwrap();
if thread_guard.is_some() { if thread_guard.is_some() {
gst_log!( gst_log!(
@ -121,7 +126,8 @@ impl DeviceProviderImpl for DeviceProvider {
Ok(()) Ok(())
} }
fn stop(&self, _device_provider: &gst::DeviceProvider) {
fn stop(&self, _device_provider: &Self::Type) {
if let Some(_thread) = self.thread.lock().unwrap().take() { if let Some(_thread) = self.thread.lock().unwrap().take() {
self.is_running.store(false, atomic::Ordering::SeqCst); self.is_running.store(false, atomic::Ordering::SeqCst);
// Don't actually join because that might take a while // Don't actually join because that might take a while
@ -130,7 +136,7 @@ impl DeviceProviderImpl for DeviceProvider {
} }
impl DeviceProvider { impl DeviceProvider {
fn poll(&self, device_provider: &gst::DeviceProvider, first: bool) { fn poll(&self, device_provider: &super::DeviceProvider, first: bool) {
let mut find_guard = self.find.lock().unwrap(); let mut find_guard = self.find.lock().unwrap();
let find = match *find_guard { let find = match *find_guard {
None => return, None => return,
@ -189,11 +195,11 @@ impl DeviceProvider {
source source
); );
// Add once for audio, another time for video // Add once for audio, another time for video
let device = Device::new(&source, true); let device = super::Device::new(&source, true);
device_provider.device_add(&device); device_provider.device_add(&device);
current_devices_guard.push(device); current_devices_guard.push(device);
let device = Device::new(&source, false); let device = super::Device::new(&source, false);
device_provider.device_add(&device); device_provider.device_add(&device);
current_devices_guard.push(device); current_devices_guard.push(device);
} }
@ -201,18 +207,16 @@ impl DeviceProvider {
} }
#[derive(Debug)] #[derive(Debug)]
struct Device { pub struct Device {
cat: gst::DebugCategory, cat: gst::DebugCategory,
source: OnceCell<(ndi::Source<'static>, glib::Type)>, source: OnceCell<(ndi::Source<'static>, glib::Type)>,
} }
#[glib::object_subclass]
impl ObjectSubclass for Device { impl ObjectSubclass for Device {
const NAME: &'static str = "NdiDevice"; const NAME: &'static str = "NdiDevice";
type Type = super::Device;
type ParentType = gst::Device; type ParentType = gst::Device;
type Instance = subclass::simple::InstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self { fn new() -> Self {
Self { Self {
@ -226,18 +230,16 @@ impl ObjectSubclass for Device {
} }
} }
impl ObjectImpl for Device { impl ObjectImpl for Device {}
glib::glib_object_impl!();
}
impl DeviceImpl for Device { impl DeviceImpl for Device {
fn create_element( fn create_element(
&self, &self,
_device: &gst::Device, _device: &Self::Type,
name: Option<&str>, name: Option<&str>,
) -> Result<gst::Element, gst::LoggableError> { ) -> Result<gst::Element, gst::LoggableError> {
let source_info = self.source.get().unwrap(); let source_info = self.source.get().unwrap();
let element = glib::Object::new( let element = glib::Object::with_type(
source_info.1, source_info.1,
&[ &[
("name", &name), ("name", &name),
@ -253,8 +255,8 @@ impl DeviceImpl for Device {
} }
} }
impl Device { impl super::Device {
fn new(source: &ndi::Source<'_>, is_audio: bool) -> gst::Device { fn new(source: &ndi::Source<'_>, is_audio: bool) -> super::Device {
let display_name = format!( let display_name = format!(
"{} ({})", "{} ({})",
source.ndi_name(), source.ndi_name(),
@ -267,13 +269,13 @@ impl Device {
// Get the caps from the template caps of the corresponding source element // Get the caps from the template caps of the corresponding source element
let element_type = if is_audio { let element_type = if is_audio {
crate::ndiaudiosrc::NdiAudioSrc::get_type() crate::ndiaudiosrc::NdiAudioSrc::static_type()
} else { } else {
crate::ndivideosrc::NdiVideoSrc::get_type() crate::ndivideosrc::NdiVideoSrc::static_type()
}; };
let element_class = gst::ElementClass::from_type(element_type).unwrap(); let element_class = glib::Class::<gst::Element>::from_type(element_type).unwrap();
let templ = element_class.get_pad_template("src").unwrap(); let templ = element_class.pad_template("src").unwrap();
let caps = templ.get_caps().unwrap(); let caps = templ.caps();
// Put the url-address into the extra properties // Put the url-address into the extra properties
let extra_properties = gst::Structure::builder("properties") let extra_properties = gst::Structure::builder("properties")
@ -281,17 +283,12 @@ impl Device {
.field("url-address", &source.url_address()) .field("url-address", &source.url_address())
.build(); .build();
let device = glib::Object::new( let device = glib::Object::new::<super::Device>(&[
Device::get_type(), ("caps", &caps),
&[ ("display-name", &display_name),
("caps", &caps), ("device-class", &device_class),
("display-name", &display_name), ("properties", &extra_properties),
("device-class", &device_class), ])
("properties", &extra_properties),
],
)
.unwrap()
.dynamic_cast::<gst::Device>()
.unwrap(); .unwrap();
let device_impl = Device::from_instance(&device); let device_impl = Device::from_instance(&device);
@ -303,12 +300,3 @@ impl Device {
device device
} }
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::DeviceProvider::register(
Some(plugin),
"ndideviceprovider",
gst::Rank::Primary,
DeviceProvider::get_type(),
)
}

View file

@ -0,0 +1,26 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct DeviceProvider(ObjectSubclass<imp::DeviceProvider>) @extends gst::DeviceProvider, gst::Object;
}
unsafe impl Send for DeviceProvider {}
unsafe impl Sync for DeviceProvider {}
glib::wrapper! {
pub struct Device(ObjectSubclass<imp::Device>) @extends gst::Device, gst::Object;
}
unsafe impl Send for Device {}
unsafe impl Sync for Device {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::DeviceProvider::register(
Some(plugin),
"ndideviceprovider",
gst::Rank::Primary,
DeviceProvider::static_type(),
)
}

View file

@ -1,5 +1,3 @@
use glib::prelude::*;
mod device_provider; mod device_provider;
pub mod ndi; pub mod ndi;
mod ndiaudiosrc; mod ndiaudiosrc;
@ -38,7 +36,7 @@ pub enum TimestampMode {
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
if !ndi::initialize() { if !ndi::initialize() {
return Err(glib::glib_bool_error!("Cannot initialize NDI")); return Err(glib::bool_error!("Cannot initialize NDI"));
} }
device_provider::register(plugin)?; device_provider::register(plugin)?;
@ -68,7 +66,7 @@ static TIMECODE_CAPS: Lazy<gst::Caps> =
static TIMESTAMP_CAPS: Lazy<gst::Caps> = static TIMESTAMP_CAPS: Lazy<gst::Caps> =
Lazy::new(|| gst::Caps::new_simple("timestamp/x-ndi-timestamp", &[])); Lazy::new(|| gst::Caps::new_simple("timestamp/x-ndi-timestamp", &[]));
gst::gst_plugin_define!( gst::plugin_define!(
ndi, ndi,
env!("CARGO_PKG_DESCRIPTION"), env!("CARGO_PKG_DESCRIPTION"),
plugin_init, plugin_init,

View file

@ -420,7 +420,7 @@ pub struct SendInstance(ptr::NonNull<::std::os::raw::c_void>);
unsafe impl Send for SendInstance {} unsafe impl Send for SendInstance {}
impl SendInstance { impl SendInstance {
pub fn builder<'a>(ndi_name: &'a str) -> SendBuilder<'a> { pub fn builder(ndi_name: &str) -> SendBuilder {
SendBuilder { SendBuilder {
ndi_name, ndi_name,
clock_video: false, clock_video: false,
@ -749,7 +749,7 @@ impl<'a> VideoFrame<'a> {
impl<'a> Drop for VideoFrame<'a> { impl<'a> Drop for VideoFrame<'a> {
#[allow(irrefutable_let_patterns)] #[allow(irrefutable_let_patterns)]
fn drop(&mut self) { fn drop(&mut self) {
if let VideoFrame::BorrowedRecv(ref mut frame, ref recv) = *self { if let VideoFrame::BorrowedRecv(ref mut frame, recv) = *self {
unsafe { unsafe {
NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame); NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
} }
@ -918,7 +918,7 @@ impl<'a> AudioFrame<'a> {
impl<'a> Drop for AudioFrame<'a> { impl<'a> Drop for AudioFrame<'a> {
#[allow(irrefutable_let_patterns)] #[allow(irrefutable_let_patterns)]
fn drop(&mut self) { fn drop(&mut self) {
if let AudioFrame::BorrowedRecv(ref mut frame, ref recv) = *self { if let AudioFrame::BorrowedRecv(ref mut frame, recv) = *self {
unsafe { unsafe {
NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame); NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
} }
@ -1010,7 +1010,7 @@ impl<'a> Default for MetadataFrame<'a> {
impl<'a> Drop for MetadataFrame<'a> { impl<'a> Drop for MetadataFrame<'a> {
fn drop(&mut self) { fn drop(&mut self) {
if let MetadataFrame::Borrowed(ref mut frame, ref recv) = *self { if let MetadataFrame::Borrowed(ref mut frame, recv) = *self {
unsafe { unsafe {
NDIlib_recv_free_metadata(((recv.0).0).0.as_ptr() as *mut _, frame); NDIlib_recv_free_metadata(((recv.0).0).0.as_ptr() as *mut _, frame);
} }

View file

@ -1,7 +1,6 @@
use glib::subclass;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg}; use gst::{gst_debug, gst_error};
use gst_base::prelude::*; use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
@ -9,6 +8,8 @@ use gst_base::subclass::prelude::*;
use std::sync::Mutex; use std::sync::Mutex;
use std::{i32, u32}; use std::{i32, u32};
use once_cell::sync::Lazy;
use crate::connect_ndi; use crate::connect_ndi;
use crate::ndisys; use crate::ndisys;
@ -46,94 +47,10 @@ impl Default for Settings {
} }
} }
static PROPERTIES: [subclass::Property; 8] = [
subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("url-address", |name| {
glib::ParamSpec::string(
name,
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("receiver-ndi-name", |name| {
glib::ParamSpec::string(
name,
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("connect-timeout", |name| {
glib::ParamSpec::uint(
name,
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timeout", |name| {
glib::ParamSpec::uint(
name,
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-queue-length", |name| {
glib::ParamSpec::uint(
name,
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("bandwidth", |name| {
glib::ParamSpec::int(
name,
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timestamp-mode", |name| {
glib::ParamSpec::enum_(
name,
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
)
}),
];
struct State { struct State {
info: Option<gst_audio::AudioInfo>, info: Option<gst_audio::AudioInfo>,
receiver: Option<Receiver<AudioReceiver>>, receiver: Option<Receiver<AudioReceiver>>,
current_latency: gst::ClockTime, current_latency: Option<gst::ClockTime>,
} }
impl Default for State { impl Default for State {
@ -141,25 +58,23 @@ impl Default for State {
State { State {
info: None, info: None,
receiver: None, receiver: None,
current_latency: gst::CLOCK_TIME_NONE, current_latency: gst::ClockTime::NONE,
} }
} }
} }
pub(crate) struct NdiAudioSrc { pub struct NdiAudioSrc {
cat: gst::DebugCategory, cat: gst::DebugCategory,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
receiver_controller: Mutex<Option<ReceiverControlHandle<AudioReceiver>>>, receiver_controller: Mutex<Option<ReceiverControlHandle<AudioReceiver>>>,
} }
#[glib::object_subclass]
impl ObjectSubclass for NdiAudioSrc { impl ObjectSubclass for NdiAudioSrc {
const NAME: &'static str = "NdiAudioSrc"; const NAME: &'static str = "NdiAudioSrc";
type Type = super::NdiAudioSrc;
type ParentType = gst_base::BaseSrc; type ParentType = gst_base::BaseSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self { fn new() -> Self {
Self { Self {
@ -173,89 +88,130 @@ impl ObjectSubclass for NdiAudioSrc {
receiver_controller: Mutex::new(None), receiver_controller: Mutex::new(None),
} }
} }
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"NewTek NDI Audio Source",
"Source",
"NewTek NDI audio source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_simple(
"audio/x-raw",
&[
(
"format",
&gst::List::new(&[&gst_audio::AUDIO_FORMAT_S16.to_string()]),
),
("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
("layout", &"interleaved"),
],
);
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
} }
impl ObjectImpl for NdiAudioSrc { impl ObjectImpl for NdiAudioSrc {
glib::glib_object_impl!(); fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpec::new_string(
"ndi-name",
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"url-address",
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"receiver-ndi-name",
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"connect-timeout",
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"timeout",
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"max-queue-length",
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_int(
"bandwidth",
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_enum(
"timestamp-mode",
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
),
]
});
fn constructed(&self, obj: &glib::Object) { PROPERTIES.as_ref()
self.parent_constructed(obj);
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
basesrc.set_live(true);
basesrc.set_format(gst::Format::Time);
} }
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { fn constructed(&self, obj: &Self::Type) {
let prop = &PROPERTIES[id]; self.parent_constructed(obj);
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
match *prop { // Initialize live-ness and notify the base class that
subclass::Property("ndi-name", ..) => { // we'd like to operate in Time format
obj.set_live(true);
obj.set_format(gst::Format::Time);
}
fn set_property(
&self,
obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"ndi-name" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap(); let ndi_name = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing ndi-name from {:?} to {:?}", "Changing ndi-name from {:?} to {:?}",
settings.ndi_name, settings.ndi_name,
ndi_name, ndi_name,
); );
settings.ndi_name = ndi_name; settings.ndi_name = ndi_name;
} }
subclass::Property("url-address", ..) => { "url-address" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap(); let url_address = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing url-address from {:?} to {:?}", "Changing url-address from {:?} to {:?}",
settings.url_address, settings.url_address,
url_address, url_address,
); );
settings.url_address = url_address; settings.url_address = url_address;
} }
subclass::Property("receiver-ndi-name", ..) => { "receiver-ndi-name" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get().unwrap(); let receiver_ndi_name = value.get::<Option<String>>().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing receiver-ndi-name from {:?} to {:?}", "Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name, settings.receiver_ndi_name,
receiver_ndi_name, receiver_ndi_name,
@ -263,67 +219,66 @@ impl ObjectImpl for NdiAudioSrc {
settings.receiver_ndi_name = settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone()); receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
} }
subclass::Property("connect-timeout", ..) => { "connect-timeout" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get_some().unwrap(); let connect_timeout = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing connect-timeout from {} to {}", "Changing connect-timeout from {} to {}",
settings.connect_timeout, settings.connect_timeout,
connect_timeout, connect_timeout,
); );
settings.connect_timeout = connect_timeout; settings.connect_timeout = connect_timeout;
} }
subclass::Property("timeout", ..) => { "timeout" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let timeout = value.get_some().unwrap(); let timeout = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing timeout from {} to {}", "Changing timeout from {} to {}",
settings.timeout, settings.timeout,
timeout, timeout,
); );
settings.timeout = timeout; settings.timeout = timeout;
} }
subclass::Property("max-queue-length", ..) => { "max-queue-length" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get_some().unwrap(); let max_queue_length = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing max-queue-length from {} to {}", "Changing max-queue-length from {} to {}",
settings.max_queue_length, settings.max_queue_length,
max_queue_length, max_queue_length,
); );
settings.max_queue_length = max_queue_length; settings.max_queue_length = max_queue_length;
} }
subclass::Property("bandwidth", ..) => { "bandwidth" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get_some().unwrap(); let bandwidth = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing bandwidth from {} to {}", "Changing bandwidth from {} to {}",
settings.bandwidth, settings.bandwidth,
bandwidth, bandwidth,
); );
settings.bandwidth = bandwidth; settings.bandwidth = bandwidth;
} }
subclass::Property("timestamp-mode", ..) => { "timestamp-mode" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get_some().unwrap(); let timestamp_mode = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing timestamp mode from {:?} to {:?}", "Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode, settings.timestamp_mode,
timestamp_mode timestamp_mode
); );
if settings.timestamp_mode != timestamp_mode { if settings.timestamp_mode != timestamp_mode {
let _ = let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
basesrc.post_message(gst::message::Latency::builder().src(basesrc).build());
} }
settings.timestamp_mode = timestamp_mode; settings.timestamp_mode = timestamp_mode;
} }
@ -331,41 +286,39 @@ impl ObjectImpl for NdiAudioSrc {
} }
} }
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> { fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let prop = &PROPERTIES[id]; match pspec.name() {
"ndi-name" => {
match *prop {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value()) settings.ndi_name.to_value()
} }
subclass::Property("url-address", ..) => { "url-address" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.url_address.to_value()) settings.url_address.to_value()
} }
subclass::Property("receiver-ndi-name", ..) => { "receiver-ndi-name" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.receiver_ndi_name.to_value()) settings.receiver_ndi_name.to_value()
} }
subclass::Property("connect-timeout", ..) => { "connect-timeout" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.connect_timeout.to_value()) settings.connect_timeout.to_value()
} }
subclass::Property("timeout", ..) => { "timeout" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value()) settings.timeout.to_value()
} }
subclass::Property("max-queue-length", ..) => { "max-queue-length" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.max_queue_length.to_value()) settings.max_queue_length.to_value()
} }
subclass::Property("bandwidth", ..) => { "bandwidth" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.bandwidth.to_value()) settings.bandwidth.to_value()
} }
subclass::Property("timestamp-mode", ..) => { "timestamp-mode" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.timestamp_mode.to_value()) settings.timestamp_mode.to_value()
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -373,9 +326,51 @@ impl ObjectImpl for NdiAudioSrc {
} }
impl ElementImpl for NdiAudioSrc { impl ElementImpl for NdiAudioSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NewTek NDI Audio Source",
"Source",
"NewTek NDI audio source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_simple(
"audio/x-raw",
&[
(
"format",
&gst::List::new(&[&gst_audio::AUDIO_FORMAT_S16.to_string()]),
),
("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
("layout", &"interleaved"),
],
);
let audio_src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&caps,
)
.unwrap();
vec![audio_src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state( fn change_state(
&self, &self,
element: &gst::Element, element: &Self::Type,
transition: gst::StateChange, transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition { match transition {
@ -402,13 +397,13 @@ impl ElementImpl for NdiAudioSrc {
} }
impl BaseSrcImpl for NdiAudioSrc { impl BaseSrcImpl for NdiAudioSrc {
fn negotiate(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::LoggableError> { fn negotiate(&self, _element: &Self::Type) -> Result<(), gst::LoggableError> {
// Always succeed here without doing anything: we will set the caps once we received a // Always succeed here without doing anything: we will set the caps once we received a
// buffer, there's nothing we can negotiate // buffer, there's nothing we can negotiate
Ok(()) Ok(())
} }
fn unlock(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Unlocking",); gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true); controller.set_flushing(true);
@ -416,7 +411,7 @@ impl BaseSrcImpl for NdiAudioSrc {
Ok(()) Ok(())
} }
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",); gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false); controller.set_flushing(false);
@ -424,12 +419,12 @@ impl BaseSrcImpl for NdiAudioSrc {
Ok(()) Ok(())
} }
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default(); *self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone(); let settings = self.settings.lock().unwrap().clone();
if settings.ndi_name.is_none() && settings.url_address.is_none() { if settings.ndi_name.is_none() && settings.url_address.is_none() {
return Err(gst_error_msg!( return Err(gst::error_msg!(
gst::LibraryError::Settings, gst::LibraryError::Settings,
["No NDI name or URL/address given"] ["No NDI name or URL/address given"]
)); ));
@ -437,7 +432,7 @@ impl BaseSrcImpl for NdiAudioSrc {
let receiver = connect_ndi( let receiver = connect_ndi(
self.cat, self.cat,
element, element.upcast_ref(),
settings.ndi_name.as_deref(), settings.ndi_name.as_deref(),
settings.url_address.as_deref(), settings.url_address.as_deref(),
&settings.receiver_ndi_name, &settings.receiver_ndi_name,
@ -450,7 +445,7 @@ impl BaseSrcImpl for NdiAudioSrc {
// settings.id_receiver exists // settings.id_receiver exists
match receiver { match receiver {
None => Err(gst_error_msg!( None => Err(gst::error_msg!(
gst::ResourceError::NotFound, gst::ResourceError::NotFound,
["Could not connect to this source"] ["Could not connect to this source"]
)), )),
@ -465,7 +460,7 @@ impl BaseSrcImpl for NdiAudioSrc {
} }
} }
fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() { if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown(); controller.shutdown();
} }
@ -473,7 +468,7 @@ impl BaseSrcImpl for NdiAudioSrc {
Ok(()) Ok(())
} }
fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool {
use gst::QueryView; use gst::QueryView;
match query.view_mut() { match query.view_mut() {
@ -486,14 +481,14 @@ impl BaseSrcImpl for NdiAudioSrc {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
if state.current_latency.is_some() { if let Some(latency) = state.current_latency {
let min = if settings.timestamp_mode != TimestampMode::Timecode { let min = if settings.timestamp_mode != TimestampMode::Timecode {
state.current_latency latency
} else { } else {
0.into() gst::ClockTime::ZERO
}; };
let max = 5 * state.current_latency; let max = 5 * latency;
gst_debug!( gst_debug!(
self.cat, self.cat,
@ -512,11 +507,11 @@ impl BaseSrcImpl for NdiAudioSrc {
} }
} }
fn fixate(&self, element: &gst_base::BaseSrc, mut caps: gst::Caps) -> gst::Caps { fn fixate(&self, element: &Self::Type, mut caps: gst::Caps) -> gst::Caps {
caps.truncate(); caps.truncate();
{ {
let caps = caps.make_mut(); let caps = caps.make_mut();
let s = caps.get_mut_structure(0).unwrap(); let s = caps.structure_mut(0).unwrap();
s.fixate_field_nearest_int("rate", 48_000); s.fixate_field_nearest_int("rate", 48_000);
s.fixate_field_nearest_int("channels", 2); s.fixate_field_nearest_int("channels", 2);
} }
@ -526,7 +521,7 @@ impl BaseSrcImpl for NdiAudioSrc {
fn create( fn create(
&self, &self,
element: &gst_base::BaseSrc, element: &Self::Type,
_offset: u64, _offset: u64,
_buffer: Option<&mut gst::BufferRef>, _buffer: Option<&mut gst::BufferRef>,
_length: u32, _length: u32,
@ -548,7 +543,7 @@ impl BaseSrcImpl for NdiAudioSrc {
state.receiver = Some(recv); state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) { if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().map_err(|_| { let caps = info.to_caps().map_err(|_| {
gst_element_error!( gst::element_error!(
element, element,
gst::ResourceError::Settings, gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info] ["Invalid audio info received: {:?}", info]
@ -556,11 +551,11 @@ impl BaseSrcImpl for NdiAudioSrc {
gst::FlowError::NotNegotiated gst::FlowError::NotNegotiated
})?; })?;
state.info = Some(info); state.info = Some(info);
state.current_latency = buffer.get_duration(); state.current_latency = buffer.duration();
drop(state); drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_caps(&caps).map_err(|_| { element.set_caps(&caps).map_err(|_| {
gst_element_error!( gst::element_error!(
element, element,
gst::CoreError::Negotiation, gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps] ["Failed to negotiate caps: {:?}", caps]
@ -580,12 +575,3 @@ impl BaseSrcImpl for NdiAudioSrc {
} }
} }
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndiaudiosrc",
gst::Rank::None,
NdiAudioSrc::get_type(),
)
}

19
src/ndiaudiosrc/mod.rs Normal file
View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiAudioSrc(ObjectSubclass<imp::NdiAudioSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
unsafe impl Send for NdiAudioSrc {}
unsafe impl Sync for NdiAudioSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndiaudiosrc",
gst::Rank::None,
NdiAudioSrc::static_type(),
)
}

View file

@ -1,15 +1,15 @@
use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_error_msg, gst_info, gst_loggable_error, gst_trace}; use gst::{gst_debug, gst_error, gst_info, gst_trace};
use gst_base::{subclass::prelude::*, BaseSinkExtManual}; use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use std::sync::Mutex; use std::sync::Mutex;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use super::ndi::SendInstance; use crate::ndi::SendInstance;
static DEFAULT_SENDER_NDI_NAME: Lazy<String> = Lazy::new(|| { static DEFAULT_SENDER_NDI_NAME: Lazy<String> = Lazy::new(|| {
format!( format!(
@ -32,16 +32,6 @@ impl Default for Settings {
} }
} }
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
"NDI Name",
"NDI Name to use",
Some(DEFAULT_SENDER_NDI_NAME.as_ref()),
glib::ParamFlags::READWRITE,
)
})];
struct State { struct State {
send: SendInstance, send: SendInstance,
video_info: Option<gst_video::VideoInfo>, video_info: Option<gst_video::VideoInfo>,
@ -57,13 +47,11 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("ndisink", gst::DebugColorFlags::empty(), Some("NDI Sink")) gst::DebugCategory::new("ndisink", gst::DebugColorFlags::empty(), Some("NDI Sink"))
}); });
#[glib::object_subclass]
impl ObjectSubclass for NdiSink { impl ObjectSubclass for NdiSink {
const NAME: &'static str = "NdiSink"; const NAME: &'static str = "NdiSink";
type Type = super::NdiSink;
type ParentType = gst_base::BaseSink; type ParentType = gst_base::BaseSink;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self { fn new() -> Self {
Self { Self {
@ -71,106 +59,129 @@ impl ObjectSubclass for NdiSink {
state: Mutex::new(Default::default()), state: Mutex::new(Default::default()),
} }
} }
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"NDI Sink",
"Sink/Audio/Video",
"Render as an NDI stream",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::builder_full()
.structure(
gst::Structure::builder("video/x-raw")
.field(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_str(),
&gst_video::VideoFormat::I420.to_str(),
&gst_video::VideoFormat::Nv12.to_str(),
&gst_video::VideoFormat::Nv21.to_str(),
&gst_video::VideoFormat::Yv12.to_str(),
&gst_video::VideoFormat::Bgra.to_str(),
&gst_video::VideoFormat::Bgrx.to_str(),
&gst_video::VideoFormat::Rgba.to_str(),
&gst_video::VideoFormat::Rgbx.to_str(),
]),
)
.field("width", &gst::IntRange::<i32>::new(1, std::i32::MAX))
.field("height", &gst::IntRange::<i32>::new(1, std::i32::MAX))
.field(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(0, 1),
gst::Fraction::new(std::i32::MAX, 1),
),
)
.build(),
)
.structure(
gst::Structure::builder("audio/x-raw")
.field("format", &gst_audio::AUDIO_FORMAT_S16.to_str())
.field("rate", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("channels", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("layout", &"interleaved")
.build(),
)
.build();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
klass.add_pad_template(sink_pad_template);
klass.install_properties(&PROPERTIES);
}
} }
impl ObjectImpl for NdiSink { impl ObjectImpl for NdiSink {
glib::glib_object_impl!(); fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpec::new_string(
"ndi-name",
"NDI Name",
"NDI Name to use",
Some(DEFAULT_SENDER_NDI_NAME.as_ref()),
glib::ParamFlags::READWRITE,
)]
});
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) { PROPERTIES.as_ref()
let prop = &PROPERTIES[id]; }
match *prop {
subclass::Property("ndi-name", ..) => { fn set_property(
&self,
_obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"ndi-name" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
settings.ndi_name = value settings.ndi_name = value
.get::<String>() .get::<String>()
.unwrap() .unwrap_or_else(|_| DEFAULT_SENDER_NDI_NAME.clone());
.unwrap_or_else(|| DEFAULT_SENDER_NDI_NAME.clone());
} }
_ => unimplemented!(), _ => unimplemented!(),
}; };
} }
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> { fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let prop = &PROPERTIES[id]; match pspec.name() {
match *prop { "ndi-name" => {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value()) settings.ndi_name.to_value()
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
} }
impl ElementImpl for NdiSink {} impl ElementImpl for NdiSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NDI Sink",
"Sink/Audio/Video",
"Render as an NDI stream",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder_full()
.structure(
gst::Structure::builder("video/x-raw")
.field(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_str(),
&gst_video::VideoFormat::I420.to_str(),
&gst_video::VideoFormat::Nv12.to_str(),
&gst_video::VideoFormat::Nv21.to_str(),
&gst_video::VideoFormat::Yv12.to_str(),
&gst_video::VideoFormat::Bgra.to_str(),
&gst_video::VideoFormat::Bgrx.to_str(),
&gst_video::VideoFormat::Rgba.to_str(),
&gst_video::VideoFormat::Rgbx.to_str(),
]),
)
.field("width", &gst::IntRange::<i32>::new(1, std::i32::MAX))
.field("height", &gst::IntRange::<i32>::new(1, std::i32::MAX))
.field(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(0, 1),
gst::Fraction::new(std::i32::MAX, 1),
),
)
.build(),
)
.structure(
gst::Structure::builder("audio/x-raw")
.field("format", &gst_audio::AUDIO_FORMAT_S16.to_str())
.field("rate", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("channels", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("layout", &"interleaved")
.build(),
)
.build();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
impl BaseSinkImpl for NdiSink { impl BaseSinkImpl for NdiSink {
fn start(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let send = SendInstance::builder(&settings.ndi_name) let send = SendInstance::builder(&settings.ndi_name)
.build() .build()
.ok_or_else(|| { .ok_or_else(|| {
gst_error_msg!( gst::error_msg!(
gst::ResourceError::OpenWrite, gst::ResourceError::OpenWrite,
["Could not create send instance"] ["Could not create send instance"]
) )
@ -187,7 +198,7 @@ impl BaseSinkImpl for NdiSink {
Ok(()) Ok(())
} }
fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
*state_storage = None; *state_storage = None;
@ -196,37 +207,33 @@ impl BaseSinkImpl for NdiSink {
Ok(()) Ok(())
} }
fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
Ok(()) Ok(())
} }
fn unlock_stop(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> { fn unlock_stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
Ok(()) Ok(())
} }
fn set_caps( fn set_caps(&self, element: &Self::Type, caps: &gst::Caps) -> Result<(), gst::LoggableError> {
&self,
element: &gst_base::BaseSink,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
gst_debug!(CAT, obj: element, "Setting caps {}", caps); gst_debug!(CAT, obj: element, "Setting caps {}", caps);
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage { let state = match &mut *state_storage {
None => return Err(gst_loggable_error!(CAT, "Sink not started yet")), None => return Err(gst::loggable_error!(CAT, "Sink not started yet")),
Some(ref mut state) => state, Some(ref mut state) => state,
}; };
let s = caps.get_structure(0).unwrap(); let s = caps.structure(0).unwrap();
if s.get_name() == "video/x-raw" { if s.name() == "video/x-raw" {
let info = gst_video::VideoInfo::from_caps(caps) let info = gst_video::VideoInfo::from_caps(caps)
.map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?; .map_err(|_| gst::loggable_error!(CAT, "Couldn't parse caps {}", caps))?;
state.video_info = Some(info); state.video_info = Some(info);
state.audio_info = None; state.audio_info = None;
} else { } else {
let info = gst_audio::AudioInfo::from_caps(caps) let info = gst_audio::AudioInfo::from_caps(caps)
.map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?; .map_err(|_| gst::loggable_error!(CAT, "Couldn't parse caps {}", caps))?;
state.audio_info = Some(info); state.audio_info = Some(info);
state.video_info = None; state.video_info = None;
@ -237,7 +244,7 @@ impl BaseSinkImpl for NdiSink {
fn render( fn render(
&self, &self,
element: &gst_base::BaseSink, element: &Self::Type,
buffer: &gst::Buffer, buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
@ -247,7 +254,7 @@ impl BaseSinkImpl for NdiSink {
}; };
if let Some(ref info) = state.video_info { if let Some(ref info) = state.video_info {
if let Some(audio_meta) = buffer.get_meta::<crate::ndisinkmeta::NdiSinkAudioMeta>() { if let Some(audio_meta) = buffer.meta::<crate::ndisinkmeta::NdiSinkAudioMeta>() {
for (buffer, info, timecode) in audio_meta.buffers() { for (buffer, info, timecode) in audio_meta.buffers() {
let frame = let frame =
crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, *timecode) crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, *timecode)
@ -262,9 +269,9 @@ impl BaseSinkImpl for NdiSink {
"Sending audio buffer {:?} with timecode {} and format {:?}", "Sending audio buffer {:?} with timecode {} and format {:?}",
buffer, buffer,
if *timecode < 0 { if *timecode < 0 {
gst::CLOCK_TIME_NONE gst::ClockTime::NONE.display()
} else { } else {
gst::ClockTime::from(*timecode as u64 * 100) Some(gst::ClockTime::from_nseconds(*timecode as u64 * 100)).display()
}, },
info, info,
); );
@ -273,15 +280,18 @@ impl BaseSinkImpl for NdiSink {
} }
// Skip empty/gap buffers from ndisinkcombiner // Skip empty/gap buffers from ndisinkcombiner
if buffer.get_size() != 0 { if buffer.size() != 0 {
let timecode = element let timecode = element
.get_segment() .segment()
.downcast::<gst::ClockTime>() .downcast::<gst::ClockTime>()
.ok() .ok()
.and_then(|segment| { .and_then(|segment| {
*(segment.to_running_time(buffer.get_pts()) + element.get_base_time()) segment
.to_running_time(buffer.pts())
.zip(element.base_time())
}) })
.map(|time| (time / 100) as i64) .and_then(|(running_time, base_time)| running_time.checked_add(base_time))
.map(|time| (time.nseconds() / 100) as i64)
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info) let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info)
@ -302,9 +312,9 @@ impl BaseSinkImpl for NdiSink {
"Sending video buffer {:?} with timecode {} and format {:?}", "Sending video buffer {:?} with timecode {} and format {:?}",
buffer, buffer,
if timecode < 0 { if timecode < 0 {
gst::CLOCK_TIME_NONE gst::ClockTime::NONE.display()
} else { } else {
gst::ClockTime::from(timecode as u64 * 100) Some(gst::ClockTime::from_nseconds(timecode as u64 * 100)).display()
}, },
info info
); );
@ -312,13 +322,16 @@ impl BaseSinkImpl for NdiSink {
} }
} else if let Some(ref info) = state.audio_info { } else if let Some(ref info) = state.audio_info {
let timecode = element let timecode = element
.get_segment() .segment()
.downcast::<gst::ClockTime>() .downcast::<gst::ClockTime>()
.ok() .ok()
.and_then(|segment| { .and_then(|segment| {
*(segment.to_running_time(buffer.get_pts()) + element.get_base_time()) segment
.to_running_time(buffer.pts())
.zip(element.base_time())
}) })
.map(|time| (time / 100) as i64) .and_then(|(running_time, base_time)| running_time.checked_add(base_time))
.map(|time| (time.nseconds() / 100) as i64)
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
let frame = crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, timecode) let frame = crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, timecode)
@ -333,9 +346,9 @@ impl BaseSinkImpl for NdiSink {
"Sending audio buffer {:?} with timecode {} and format {:?}", "Sending audio buffer {:?} with timecode {} and format {:?}",
buffer, buffer,
if timecode < 0 { if timecode < 0 {
gst::CLOCK_TIME_NONE gst::ClockTime::NONE.display()
} else { } else {
gst::ClockTime::from(timecode as u64 * 100) Some(gst::ClockTime::from_nseconds(timecode as u64 * 100)).display()
}, },
info, info,
); );
@ -347,12 +360,3 @@ impl BaseSinkImpl for NdiSink {
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} }
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisink",
gst::Rank::None,
NdiSink::get_type(),
)
}

19
src/ndisink/mod.rs Normal file
View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiSink(ObjectSubclass<imp::NdiSink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
}
unsafe impl Send for NdiSink {}
unsafe impl Sync for NdiSink {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisink",
gst::Rank::None,
NdiSink::static_type(),
)
}

View file

@ -1,5 +1,4 @@
use glib::prelude::*; use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
@ -7,6 +6,8 @@ use gst::{gst_debug, gst_error, gst_trace, gst_warning};
use gst_base::prelude::*; use gst_base::prelude::*;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::mem; use std::mem;
use std::sync::Mutex; use std::sync::Mutex;
@ -27,92 +28,20 @@ struct State {
current_audio_buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, current_audio_buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
} }
struct NdiSinkCombiner { pub struct NdiSinkCombiner {
video_pad: gst_base::AggregatorPad, video_pad: gst_base::AggregatorPad,
audio_pad: Mutex<Option<gst_base::AggregatorPad>>, audio_pad: Mutex<Option<gst_base::AggregatorPad>>,
state: Mutex<Option<State>>, state: Mutex<Option<State>>,
} }
#[glib::object_subclass]
impl ObjectSubclass for NdiSinkCombiner { impl ObjectSubclass for NdiSinkCombiner {
const NAME: &'static str = "NdiSinkCombiner"; const NAME: &'static str = "NdiSinkCombiner";
type Type = super::NdiSinkCombiner;
type ParentType = gst_base::Aggregator; type ParentType = gst_base::Aggregator;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"NDI Sink Combiner",
"Combiner/Audio/Video",
"NDI sink audio/video combiner",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::builder("video/x-raw")
.field(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_str(),
&gst_video::VideoFormat::I420.to_str(),
&gst_video::VideoFormat::Nv12.to_str(),
&gst_video::VideoFormat::Nv21.to_str(),
&gst_video::VideoFormat::Yv12.to_str(),
&gst_video::VideoFormat::Bgra.to_str(),
&gst_video::VideoFormat::Bgrx.to_str(),
&gst_video::VideoFormat::Rgba.to_str(),
&gst_video::VideoFormat::Rgbx.to_str(),
]),
)
.field("width", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("height", &gst::IntRange::<i32>::new(1, i32::MAX))
.field(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(1, i32::MAX),
gst::Fraction::new(i32::MAX, 1),
),
)
.build();
let src_pad_template = gst::PadTemplate::with_gtype(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
let sink_pad_template = gst::PadTemplate::with_gtype(
"video",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(sink_pad_template);
let caps = gst::Caps::builder("audio/x-raw")
.field("format", &gst_audio::AUDIO_FORMAT_S16.to_str())
.field("rate", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("channels", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("layout", &"interleaved")
.build();
let sink_pad_template = gst::PadTemplate::with_gtype(
"audio",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(sink_pad_template);
}
fn with_class(klass: &Self::Class) -> Self { fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("video").unwrap(); let templ = klass.pad_template("video").unwrap();
let video_pad = let video_pad =
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("video")) gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("video"))
.build(); .build();
@ -126,18 +55,97 @@ impl ObjectSubclass for NdiSinkCombiner {
} }
impl ObjectImpl for NdiSinkCombiner { impl ObjectImpl for NdiSinkCombiner {
glib::glib_object_impl!(); fn constructed(&self, obj: &Self::Type) {
obj.add_pad(&self.video_pad).unwrap();
fn constructed(&self, obj: &glib::Object) {
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.video_pad).unwrap();
self.parent_constructed(obj); self.parent_constructed(obj);
} }
} }
impl ElementImpl for NdiSinkCombiner { impl ElementImpl for NdiSinkCombiner {
fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NDI Sink Combiner",
"Combiner/Audio/Video",
"NDI sink audio/video combiner",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("video/x-raw")
.field(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_str(),
&gst_video::VideoFormat::I420.to_str(),
&gst_video::VideoFormat::Nv12.to_str(),
&gst_video::VideoFormat::Nv21.to_str(),
&gst_video::VideoFormat::Yv12.to_str(),
&gst_video::VideoFormat::Bgra.to_str(),
&gst_video::VideoFormat::Bgrx.to_str(),
&gst_video::VideoFormat::Rgba.to_str(),
&gst_video::VideoFormat::Rgbx.to_str(),
]),
)
.field("width", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("height", &gst::IntRange::<i32>::new(1, i32::MAX))
.field(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(1, i32::MAX),
gst::Fraction::new(i32::MAX, 1),
),
)
.build();
let src_pad_template = gst::PadTemplate::with_gtype(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
let video_sink_pad_template = gst::PadTemplate::with_gtype(
"video",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
let caps = gst::Caps::builder("audio/x-raw")
.field("format", &gst_audio::AUDIO_FORMAT_S16.to_str())
.field("rate", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("channels", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("layout", &"interleaved")
.build();
let audio_sink_pad_template = gst::PadTemplate::with_gtype(
"audio",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps,
gst_base::AggregatorPad::static_type(),
)
.unwrap();
vec![
src_pad_template,
video_sink_pad_template,
audio_sink_pad_template,
]
});
PAD_TEMPLATES.as_ref()
}
fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
let mut audio_pad_storage = self.audio_pad.lock().unwrap(); let mut audio_pad_storage = self.audio_pad.lock().unwrap();
if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) { if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) {
@ -151,7 +159,7 @@ impl ElementImpl for NdiSinkCombiner {
impl AggregatorImpl for NdiSinkCombiner { impl AggregatorImpl for NdiSinkCombiner {
fn create_new_pad( fn create_new_pad(
&self, &self,
agg: &gst_base::Aggregator, agg: &Self::Type,
templ: &gst::PadTemplate, templ: &gst::PadTemplate,
_req_name: Option<&str>, _req_name: Option<&str>,
_caps: Option<&gst::Caps>, _caps: Option<&gst::Caps>,
@ -163,7 +171,7 @@ impl AggregatorImpl for NdiSinkCombiner {
return None; return None;
} }
let sink_templ = agg.get_pad_template("audio").unwrap(); let sink_templ = agg.pad_template("audio").unwrap();
if templ != &sink_templ { if templ != &sink_templ {
gst_error!(CAT, obj: agg, "Wrong pad template"); gst_error!(CAT, obj: agg, "Wrong pad template");
return None; return None;
@ -178,7 +186,7 @@ impl AggregatorImpl for NdiSinkCombiner {
Some(pad) Some(pad)
} }
fn start(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> { fn start(&self, agg: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
*state_storage = Some(State { *state_storage = Some(State {
audio_info: None, audio_info: None,
@ -192,7 +200,7 @@ impl AggregatorImpl for NdiSinkCombiner {
Ok(()) Ok(())
} }
fn stop(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> { fn stop(&self, agg: &Self::Type) -> Result<(), gst::ErrorMessage> {
// Drop our state now // Drop our state now
let _ = self.state.lock().unwrap().take(); let _ = self.state.lock().unwrap().take();
@ -201,18 +209,18 @@ impl AggregatorImpl for NdiSinkCombiner {
Ok(()) Ok(())
} }
fn get_next_time(&self, _agg: &gst_base::Aggregator) -> gst::ClockTime { fn next_time(&self, _agg: &Self::Type) -> Option<gst::ClockTime> {
// FIXME: What to do here? We don't really know when the next buffer is expected // FIXME: What to do here? We don't really know when the next buffer is expected
gst::CLOCK_TIME_NONE gst::ClockTime::NONE
} }
fn clip( fn clip(
&self, &self,
agg: &gst_base::Aggregator, agg: &Self::Type,
agg_pad: &gst_base::AggregatorPad, agg_pad: &gst_base::AggregatorPad,
mut buffer: gst::Buffer, mut buffer: gst::Buffer,
) -> Option<gst::Buffer> { ) -> Option<gst::Buffer> {
let segment = match agg_pad.get_segment().downcast::<gst::ClockTime>() { let segment = match agg_pad.segment().downcast::<gst::ClockTime>() {
Ok(segment) => segment, Ok(segment) => segment,
Err(_) => { Err(_) => {
gst_error!(CAT, obj: agg, "Only TIME segments supported"); gst_error!(CAT, obj: agg, "Only TIME segments supported");
@ -220,25 +228,21 @@ impl AggregatorImpl for NdiSinkCombiner {
} }
}; };
let pts = buffer.get_pts(); let pts = buffer.pts();
if pts.is_none() { if pts.is_none() {
gst_error!(CAT, obj: agg, "Only buffers with PTS supported"); gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
return Some(buffer); return Some(buffer);
} }
let duration = if buffer.get_duration().is_some() { let duration = buffer.duration();
buffer.get_duration()
} else {
gst::CLOCK_TIME_NONE
};
gst_trace!( gst_trace!(
CAT, CAT,
obj: agg_pad, obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}", "Clipping buffer {:?} with PTS {} and duration {}",
buffer, buffer,
pts, pts.display(),
duration duration.display(),
); );
let state_storage = self.state.lock().unwrap(); let state_storage = self.state.lock().unwrap();
@ -247,25 +251,21 @@ impl AggregatorImpl for NdiSinkCombiner {
None => return None, None => return None,
}; };
let duration = if buffer.get_duration().is_some() { let duration = if duration.is_some() {
buffer.get_duration() duration
} else if let Some(ref audio_info) = state.audio_info { } else if let Some(ref audio_info) = state.audio_info {
gst::SECOND gst::ClockTime::SECOND.mul_div_floor(
.mul_div_floor( buffer.size() as u64,
buffer.get_size() as u64, audio_info.rate() as u64 * audio_info.bpf() as u64,
audio_info.rate() as u64 * audio_info.bpf() as u64, )
)
.unwrap()
} else if let Some(ref video_info) = state.video_info { } else if let Some(ref video_info) = state.video_info {
if *video_info.fps().numer() > 0 { if *video_info.fps().numer() > 0 {
gst::SECOND gst::ClockTime::SECOND.mul_div_floor(
.mul_div_floor( *video_info.fps().denom() as u64,
*video_info.fps().denom() as u64, *video_info.fps().numer() as u64,
*video_info.fps().numer() as u64, )
)
.unwrap()
} else { } else {
gst::CLOCK_TIME_NONE gst::ClockTime::NONE
} }
} else { } else {
unreachable!() unreachable!()
@ -276,18 +276,23 @@ impl AggregatorImpl for NdiSinkCombiner {
obj: agg_pad, obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}", "Clipping buffer {:?} with PTS {} and duration {}",
buffer, buffer,
pts, pts.display(),
duration duration.display(),
); );
if agg_pad == &self.video_pad { if agg_pad == &self.video_pad {
segment.clip(pts, pts + duration).map(|(start, stop)| { let end_pts = pts
.zip(duration)
.and_then(|(pts, duration)| pts.checked_add(duration));
segment.clip(pts, end_pts).map(|(start, stop)| {
{ {
let buffer = buffer.make_mut(); let buffer = buffer.make_mut();
buffer.set_pts(start); buffer.set_pts(start);
if duration.is_some() { buffer.set_duration(
buffer.set_duration(stop - start); stop.zip(start)
} .and_then(|(stop, start)| stop.checked_sub(start)),
);
} }
buffer buffer
@ -307,7 +312,7 @@ impl AggregatorImpl for NdiSinkCombiner {
fn aggregate( fn aggregate(
&self, &self,
agg: &gst_base::Aggregator, agg: &Self::Type,
timeout: bool, timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
// FIXME: Can't really happen because we always return NONE from get_next_time() but that // FIXME: Can't really happen because we always return NONE from get_next_time() but that
@ -318,7 +323,7 @@ impl AggregatorImpl for NdiSinkCombiner {
// first try getting buffers from both pads here // first try getting buffers from both pads here
let video_buffer_and_segment = match self.video_pad.peek_buffer() { let video_buffer_and_segment = match self.video_pad.peek_buffer() {
Some(video_buffer) => { Some(video_buffer) => {
let video_segment = self.video_pad.get_segment(); let video_segment = self.video_pad.segment();
let video_segment = match video_segment.downcast::<gst::ClockTime>() { let video_segment = match video_segment.downcast::<gst::ClockTime>() {
Ok(video_segment) => video_segment, Ok(video_segment) => video_segment,
Err(video_segment) => { Err(video_segment) => {
@ -326,7 +331,7 @@ impl AggregatorImpl for NdiSinkCombiner {
CAT, CAT,
obj: agg, obj: agg,
"Video segment of wrong format {:?}", "Video segment of wrong format {:?}",
video_segment.get_format() video_segment.format()
); );
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
@ -344,14 +349,14 @@ impl AggregatorImpl for NdiSinkCombiner {
let audio_buffer_segment_and_pad; let audio_buffer_segment_and_pad;
if let Some(audio_pad) = self.audio_pad.lock().unwrap().clone() { if let Some(audio_pad) = self.audio_pad.lock().unwrap().clone() {
audio_buffer_segment_and_pad = match audio_pad.peek_buffer() { audio_buffer_segment_and_pad = match audio_pad.peek_buffer() {
Some(audio_buffer) if audio_buffer.get_size() == 0 => { Some(audio_buffer) if audio_buffer.size() == 0 => {
// Skip empty/gap audio buffer // Skip empty/gap audio buffer
audio_pad.drop_buffer(); audio_pad.drop_buffer();
gst_trace!(CAT, obj: agg, "Empty audio buffer, waiting for next"); gst_trace!(CAT, obj: agg, "Empty audio buffer, waiting for next");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
} }
Some(audio_buffer) => { Some(audio_buffer) => {
let audio_segment = audio_pad.get_segment(); let audio_segment = audio_pad.segment();
let audio_segment = match audio_segment.downcast::<gst::ClockTime>() { let audio_segment = match audio_segment.downcast::<gst::ClockTime>() {
Ok(audio_segment) => audio_segment, Ok(audio_segment) => audio_segment,
Err(audio_segment) => { Err(audio_segment) => {
@ -359,7 +364,7 @@ impl AggregatorImpl for NdiSinkCombiner {
CAT, CAT,
obj: agg, obj: agg,
"Audio segment of wrong format {:?}", "Audio segment of wrong format {:?}",
audio_segment.get_format() audio_segment.format()
); );
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
@ -385,8 +390,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let (mut current_video_buffer, current_video_running_time_end, next_video_buffer) = let (mut current_video_buffer, current_video_running_time_end, next_video_buffer) =
if let Some((video_buffer, video_segment)) = video_buffer_and_segment { if let Some((video_buffer, video_segment)) = video_buffer_and_segment {
let video_running_time = video_segment.to_running_time(video_buffer.get_pts()); let video_running_time = video_segment.to_running_time(video_buffer.pts()).unwrap();
assert!(video_running_time.is_some());
match state.current_video_buffer { match state.current_video_buffer {
None => { None => {
@ -398,7 +402,7 @@ impl AggregatorImpl for NdiSinkCombiner {
} }
Some((ref buffer, _)) => ( Some((ref buffer, _)) => (
buffer.clone(), buffer.clone(),
video_running_time, Some(video_running_time),
Some((video_buffer, video_running_time)), Some((video_buffer, video_running_time)),
), ),
} }
@ -416,10 +420,9 @@ impl AggregatorImpl for NdiSinkCombiner {
// Create an empty dummy buffer for attaching the audio. This is going to // Create an empty dummy buffer for attaching the audio. This is going to
// be dropped by the sink later. // be dropped by the sink later.
let audio_running_time = let audio_running_time =
audio_segment.to_running_time(audio_buffer.get_pts()); audio_segment.to_running_time(audio_buffer.pts()).unwrap();
assert!(audio_running_time.is_some());
let video_segment = self.video_pad.get_segment(); let video_segment = self.video_pad.segment();
let video_segment = match video_segment.downcast::<gst::ClockTime>() { let video_segment = match video_segment.downcast::<gst::ClockTime>() {
Ok(video_segment) => video_segment, Ok(video_segment) => video_segment,
Err(video_segment) => { Err(video_segment) => {
@ -427,7 +430,7 @@ impl AggregatorImpl for NdiSinkCombiner {
CAT, CAT,
obj: agg, obj: agg,
"Video segment of wrong format {:?}", "Video segment of wrong format {:?}",
video_segment.get_format() video_segment.format()
); );
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
@ -445,9 +448,9 @@ impl AggregatorImpl for NdiSinkCombiner {
buffer.set_pts(video_pts); buffer.set_pts(video_pts);
} }
(buffer, gst::CLOCK_TIME_NONE, None) (buffer, gst::ClockTime::NONE, None)
} }
(Some((ref buffer, _)), _) => (buffer.clone(), gst::CLOCK_TIME_NONE, None), (Some((ref buffer, _)), _) => (buffer.clone(), gst::ClockTime::NONE, None),
} }
}; };
@ -460,22 +463,26 @@ impl AggregatorImpl for NdiSinkCombiner {
} }
}; };
let audio_running_time = audio_segment.to_running_time(audio_buffer.get_pts()); let audio_running_time = audio_segment.to_running_time(audio_buffer.pts());
assert!(audio_running_time.is_some()); let duration = gst::ClockTime::SECOND.mul_div_floor(
let duration = gst::SECOND audio_buffer.size() as u64 / audio_info.bpf() as u64,
.mul_div_floor( audio_info.rate() as u64,
audio_buffer.get_size() as u64 / audio_info.bpf() as u64, );
audio_info.rate() as u64, let audio_running_time_end = audio_running_time
) .zip(duration)
.unwrap_or(gst::CLOCK_TIME_NONE); .and_then(|(running_time, duration)| running_time.checked_add(duration));
let audio_running_time_end = audio_running_time + duration;
assert!(audio_running_time_end.is_some());
if audio_running_time_end <= current_video_running_time_end if audio_running_time_end
|| current_video_running_time_end.is_none() .zip(current_video_running_time_end)
.map(|(audio, video)| audio <= video)
.unwrap_or(true)
{ {
let timecode = (audio_running_time + agg.get_base_time()) let timecode = agg
.map(|t| (t / 100) as i64) .base_time()
.zip(audio_running_time)
.map(|(base_time, audio_running_time)| {
((base_time.nseconds() + audio_running_time.nseconds()) / 100) as i64
})
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize); .unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
gst_trace!( gst_trace!(
@ -484,8 +491,8 @@ impl AggregatorImpl for NdiSinkCombiner {
"Including audio buffer {:?} with timecode {}: {} <= {}", "Including audio buffer {:?} with timecode {}: {} <= {}",
audio_buffer, audio_buffer,
timecode, timecode,
audio_running_time_end, audio_running_time_end.display(),
current_video_running_time_end, current_video_running_time_end.display(),
); );
state state
.current_audio_buffers .current_audio_buffers
@ -503,7 +510,7 @@ impl AggregatorImpl for NdiSinkCombiner {
// far // far
} }
let audio_buffers = mem::replace(&mut state.current_audio_buffers, Vec::new()); let audio_buffers = mem::take(&mut state.current_audio_buffers);
if !audio_buffers.is_empty() { if !audio_buffers.is_empty() {
let current_video_buffer = current_video_buffer.make_mut(); let current_video_buffer = current_video_buffer.make_mut();
@ -530,7 +537,7 @@ impl AggregatorImpl for NdiSinkCombiner {
fn sink_event( fn sink_event(
&self, &self,
agg: &gst_base::Aggregator, agg: &Self::Type,
pad: &gst_base::AggregatorPad, pad: &gst_base::AggregatorPad,
event: gst::Event, event: gst::Event,
) -> bool { ) -> bool {
@ -538,7 +545,7 @@ impl AggregatorImpl for NdiSinkCombiner {
match event.view() { match event.view() {
EventView::Caps(caps) => { EventView::Caps(caps) => {
let caps = caps.get_caps_owned(); let caps = caps.caps_owned();
let mut state_storage = self.state.lock().unwrap(); let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage { let state = match &mut *state_storage {
@ -558,22 +565,22 @@ impl AggregatorImpl for NdiSinkCombiner {
// 2 frames latency because we queue 1 frame and wait until audio // 2 frames latency because we queue 1 frame and wait until audio
// up to the end of that frame has arrived. // up to the end of that frame has arrived.
let latency = if *info.fps().numer() > 0 { let latency = if *info.fps().numer() > 0 {
gst::SECOND gst::ClockTime::SECOND
.mul_div_floor( .mul_div_floor(
2 * *info.fps().denom() as u64, 2 * *info.fps().denom() as u64,
*info.fps().numer() as u64, *info.fps().numer() as u64,
) )
.unwrap_or(80 * gst::MSECOND) .unwrap_or(80 * gst::ClockTime::MSECOND)
} else { } else {
// let's assume 25fps and 2 frames latency // let's assume 25fps and 2 frames latency
80 * gst::MSECOND 80 * gst::ClockTime::MSECOND
}; };
state.video_info = Some(info); state.video_info = Some(info);
drop(state_storage); drop(state_storage);
agg.set_latency(latency, gst::CLOCK_TIME_NONE); agg.set_latency(latency, gst::ClockTime::NONE);
// The video caps are passed through as the audio is included only in a meta // The video caps are passed through as the audio is included only in a meta
agg.set_src_caps(&caps); agg.set_src_caps(&caps);
@ -591,7 +598,7 @@ impl AggregatorImpl for NdiSinkCombiner {
} }
// The video segment is passed through as-is and the video timestamps are preserved // The video segment is passed through as-is and the video timestamps are preserved
EventView::Segment(segment) if pad == &self.video_pad => { EventView::Segment(segment) if pad == &self.video_pad => {
let segment = segment.get_segment(); let segment = segment.segment();
gst_debug!(CAT, obj: agg, "Updating segment {:?}", segment); gst_debug!(CAT, obj: agg, "Updating segment {:?}", segment);
agg.update_segment(segment); agg.update_segment(segment);
} }
@ -603,7 +610,7 @@ impl AggregatorImpl for NdiSinkCombiner {
fn sink_query( fn sink_query(
&self, &self,
agg: &gst_base::Aggregator, agg: &Self::Type,
pad: &gst_base::AggregatorPad, pad: &gst_base::AggregatorPad,
query: &mut gst::QueryRef, query: &mut gst::QueryRef,
) -> bool { ) -> bool {
@ -612,7 +619,7 @@ impl AggregatorImpl for NdiSinkCombiner {
match query.view_mut() { match query.view_mut() {
QueryView::Caps(_) if pad == &self.video_pad => { QueryView::Caps(_) if pad == &self.video_pad => {
// Directly forward caps queries // Directly forward caps queries
let srcpad = agg.get_static_pad("src").unwrap(); let srcpad = agg.static_pad("src").unwrap();
return srcpad.peer_query(query); return srcpad.peer_query(query);
} }
_ => (), _ => (),
@ -621,17 +628,8 @@ impl AggregatorImpl for NdiSinkCombiner {
self.parent_sink_query(agg, pad, query) self.parent_sink_query(agg, pad, query)
} }
fn negotiate(&self, _agg: &gst_base::Aggregator) -> bool { fn negotiate(&self, _agg: &Self::Type) -> bool {
// No negotiation needed as the video caps are just passed through // No negotiation needed as the video caps are just passed through
true true
} }
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisinkcombiner",
gst::Rank::None,
NdiSinkCombiner::get_type(),
)
}

View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiSinkCombiner(ObjectSubclass<imp::NdiSinkCombiner>) @extends gst_base::Aggregator, gst::Element, gst::Object;
}
unsafe impl Send for NdiSinkCombiner {}
unsafe impl Sync for NdiSinkCombiner {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisinkcombiner",
gst::Rank::None,
NdiSinkCombiner::static_type(),
)
}

View file

@ -1,4 +1,3 @@
use gst::gst_sys;
use gst::prelude::*; use gst::prelude::*;
use std::fmt; use std::fmt;
use std::mem; use std::mem;
@ -19,10 +18,10 @@ impl NdiSinkAudioMeta {
// content of the struct // content of the struct
let mut params = mem::ManuallyDrop::new(imp::NdiSinkAudioMetaParams { buffers }); let mut params = mem::ManuallyDrop::new(imp::NdiSinkAudioMetaParams { buffers });
let meta = gst_sys::gst_buffer_add_meta( let meta = gst::ffi::gst_buffer_add_meta(
buffer.as_mut_ptr(), buffer.as_mut_ptr(),
imp::ndi_sink_audio_meta_get_info(), imp::ndi_sink_audio_meta_get_info(),
&mut *params as *mut imp::NdiSinkAudioMetaParams as glib::glib_sys::gpointer, &mut *params as *mut imp::NdiSinkAudioMetaParams as glib::ffi::gpointer,
) as *mut imp::NdiSinkAudioMeta; ) as *mut imp::NdiSinkAudioMeta;
Self::from_mut_ptr(buffer, meta) Self::from_mut_ptr(buffer, meta)
@ -37,7 +36,7 @@ impl NdiSinkAudioMeta {
unsafe impl MetaAPI for NdiSinkAudioMeta { unsafe impl MetaAPI for NdiSinkAudioMeta {
type GstType = imp::NdiSinkAudioMeta; type GstType = imp::NdiSinkAudioMeta;
fn get_meta_api() -> glib::Type { fn meta_api() -> glib::Type {
imp::ndi_sink_audio_meta_api_get_type() imp::ndi_sink_audio_meta_api_get_type()
} }
} }
@ -51,9 +50,7 @@ impl fmt::Debug for NdiSinkAudioMeta {
} }
mod imp { mod imp {
use glib::glib_sys;
use glib::translate::*; use glib::translate::*;
use gst::gst_sys;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::mem; use std::mem;
use std::ptr; use std::ptr;
@ -64,18 +61,18 @@ mod imp {
#[repr(C)] #[repr(C)]
pub struct NdiSinkAudioMeta { pub struct NdiSinkAudioMeta {
parent: gst_sys::GstMeta, parent: gst::ffi::GstMeta,
pub(super) buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>, pub(super) buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
} }
pub(super) fn ndi_sink_audio_meta_api_get_type() -> glib::Type { pub(super) fn ndi_sink_audio_meta_api_get_type() -> glib::Type {
static TYPE: Lazy<glib::Type> = Lazy::new(|| unsafe { static TYPE: Lazy<glib::Type> = Lazy::new(|| unsafe {
let t = from_glib(gst_sys::gst_meta_api_type_register( let t = from_glib(gst::ffi::gst_meta_api_type_register(
b"GstNdiSinkAudioMetaAPI\0".as_ptr() as *const _, b"GstNdiSinkAudioMetaAPI\0".as_ptr() as *const _,
[ptr::null::<std::os::raw::c_char>()].as_ptr() as *mut *const _, [ptr::null::<std::os::raw::c_char>()].as_ptr() as *mut *const _,
)); ));
assert_ne!(t, glib::Type::Invalid); assert_ne!(t, glib::Type::INVALID);
t t
}); });
@ -84,10 +81,10 @@ mod imp {
} }
unsafe extern "C" fn ndi_sink_audio_meta_init( unsafe extern "C" fn ndi_sink_audio_meta_init(
meta: *mut gst_sys::GstMeta, meta: *mut gst::ffi::GstMeta,
params: glib_sys::gpointer, params: glib::ffi::gpointer,
_buffer: *mut gst_sys::GstBuffer, _buffer: *mut gst::ffi::GstBuffer,
) -> glib_sys::gboolean { ) -> glib::ffi::gboolean {
assert!(!params.is_null()); assert!(!params.is_null());
let meta = &mut *(meta as *mut NdiSinkAudioMeta); let meta = &mut *(meta as *mut NdiSinkAudioMeta);
@ -95,12 +92,12 @@ mod imp {
ptr::write(&mut meta.buffers, params.buffers); ptr::write(&mut meta.buffers, params.buffers);
true.to_glib() true.into_glib()
} }
unsafe extern "C" fn ndi_sink_audio_meta_free( unsafe extern "C" fn ndi_sink_audio_meta_free(
meta: *mut gst_sys::GstMeta, meta: *mut gst::ffi::GstMeta,
_buffer: *mut gst_sys::GstBuffer, _buffer: *mut gst::ffi::GstBuffer,
) { ) {
let meta = &mut *(meta as *mut NdiSinkAudioMeta); let meta = &mut *(meta as *mut NdiSinkAudioMeta);
@ -108,34 +105,34 @@ mod imp {
} }
unsafe extern "C" fn ndi_sink_audio_meta_transform( unsafe extern "C" fn ndi_sink_audio_meta_transform(
dest: *mut gst_sys::GstBuffer, dest: *mut gst::ffi::GstBuffer,
meta: *mut gst_sys::GstMeta, meta: *mut gst::ffi::GstMeta,
_buffer: *mut gst_sys::GstBuffer, _buffer: *mut gst::ffi::GstBuffer,
_type_: glib_sys::GQuark, _type_: glib::ffi::GQuark,
_data: glib_sys::gpointer, _data: glib::ffi::gpointer,
) -> glib_sys::gboolean { ) -> glib::ffi::gboolean {
let meta = &*(meta as *mut NdiSinkAudioMeta); let meta = &*(meta as *mut NdiSinkAudioMeta);
super::NdiSinkAudioMeta::add(gst::BufferRef::from_mut_ptr(dest), meta.buffers.clone()); super::NdiSinkAudioMeta::add(gst::BufferRef::from_mut_ptr(dest), meta.buffers.clone());
true.to_glib() true.into_glib()
} }
pub(super) fn ndi_sink_audio_meta_get_info() -> *const gst_sys::GstMetaInfo { pub(super) fn ndi_sink_audio_meta_get_info() -> *const gst::ffi::GstMetaInfo {
struct MetaInfo(ptr::NonNull<gst_sys::GstMetaInfo>); struct MetaInfo(ptr::NonNull<gst::ffi::GstMetaInfo>);
unsafe impl Send for MetaInfo {} unsafe impl Send for MetaInfo {}
unsafe impl Sync for MetaInfo {} unsafe impl Sync for MetaInfo {}
static META_INFO: Lazy<MetaInfo> = Lazy::new(|| unsafe { static META_INFO: Lazy<MetaInfo> = Lazy::new(|| unsafe {
MetaInfo( MetaInfo(
ptr::NonNull::new(gst_sys::gst_meta_register( ptr::NonNull::new(gst::ffi::gst_meta_register(
ndi_sink_audio_meta_api_get_type().to_glib(), ndi_sink_audio_meta_api_get_type().into_glib(),
b"GstNdiSinkAudioMeta\0".as_ptr() as *const _, b"GstNdiSinkAudioMeta\0".as_ptr() as *const _,
mem::size_of::<NdiSinkAudioMeta>(), mem::size_of::<NdiSinkAudioMeta>(),
Some(ndi_sink_audio_meta_init), Some(ndi_sink_audio_meta_init),
Some(ndi_sink_audio_meta_free), Some(ndi_sink_audio_meta_free),
Some(ndi_sink_audio_meta_transform), Some(ndi_sink_audio_meta_transform),
) as *mut gst_sys::GstMetaInfo) ) as *mut gst::ffi::GstMetaInfo)
.expect("Failed to register meta API"), .expect("Failed to register meta API"),
) )
}); });

View file

@ -1,7 +1,6 @@
use glib::subclass;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg}; use gst::{gst_debug, gst_error};
use gst_base::prelude::*; use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
@ -9,6 +8,8 @@ use gst_base::subclass::prelude::*;
use std::sync::Mutex; use std::sync::Mutex;
use std::{i32, u32}; use std::{i32, u32};
use once_cell::sync::Lazy;
use crate::ndisys; use crate::ndisys;
use crate::connect_ndi; use crate::connect_ndi;
@ -47,93 +48,9 @@ impl Default for Settings {
} }
} }
static PROPERTIES: [subclass::Property; 8] = [
subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("url-address", |name| {
glib::ParamSpec::string(
name,
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("receiver-ndi-name", |name| {
glib::ParamSpec::string(
name,
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("connect-timeout", |name| {
glib::ParamSpec::uint(
name,
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timeout", |name| {
glib::ParamSpec::uint(
name,
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-queue-length", |name| {
glib::ParamSpec::uint(
name,
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("bandwidth", |name| {
glib::ParamSpec::int(
name,
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timestamp-mode", |name| {
glib::ParamSpec::enum_(
name,
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
)
}),
];
struct State { struct State {
info: Option<gst_video::VideoInfo>, info: Option<gst_video::VideoInfo>,
current_latency: gst::ClockTime, current_latency: Option<gst::ClockTime>,
receiver: Option<Receiver<VideoReceiver>>, receiver: Option<Receiver<VideoReceiver>>,
} }
@ -141,26 +58,24 @@ impl Default for State {
fn default() -> State { fn default() -> State {
State { State {
info: None, info: None,
current_latency: gst::CLOCK_TIME_NONE, current_latency: gst::ClockTime::NONE,
receiver: None, receiver: None,
} }
} }
} }
pub(crate) struct NdiVideoSrc { pub struct NdiVideoSrc {
cat: gst::DebugCategory, cat: gst::DebugCategory,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
receiver_controller: Mutex<Option<ReceiverControlHandle<VideoReceiver>>>, receiver_controller: Mutex<Option<ReceiverControlHandle<VideoReceiver>>>,
} }
#[glib::object_subclass]
impl ObjectSubclass for NdiVideoSrc { impl ObjectSubclass for NdiVideoSrc {
const NAME: &'static str = "NdiVideoSrc"; const NAME: &'static str = "NdiVideoSrc";
type Type = super::NdiVideoSrc;
type ParentType = gst_base::BaseSrc; type ParentType = gst_base::BaseSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self { fn new() -> Self {
Self { Self {
@ -174,123 +89,130 @@ impl ObjectSubclass for NdiVideoSrc {
receiver_controller: Mutex::new(None), receiver_controller: Mutex::new(None),
} }
} }
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"NewTek NDI Video Source",
"Source",
"NewTek NDI video source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
);
// On the src pad, we can produce F32/F64 with any sample rate
// and any number of channels
let caps = gst::Caps::new_simple(
"video/x-raw",
&[
(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_string(),
&gst_video::VideoFormat::Yv12.to_string(),
&gst_video::VideoFormat::Nv12.to_string(),
&gst_video::VideoFormat::I420.to_string(),
&gst_video::VideoFormat::Bgra.to_string(),
&gst_video::VideoFormat::Bgrx.to_string(),
&gst_video::VideoFormat::Rgba.to_string(),
&gst_video::VideoFormat::Rgbx.to_string(),
]),
),
("width", &gst::IntRange::<i32>::new(0, i32::MAX)),
("height", &gst::IntRange::<i32>::new(0, i32::MAX)),
(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(0, 1),
gst::Fraction::new(i32::MAX, 1),
),
),
],
);
#[cfg(feature = "interlaced-fields")]
let caps = {
let mut tmp = caps.copy();
{
let tmp = tmp.get_mut().unwrap();
tmp.set_features_simple(Some(gst::CapsFeatures::new(&["format:Interlaced"])));
}
let mut caps = caps;
{
let caps = caps.get_mut().unwrap();
caps.append(tmp);
}
caps
};
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
} }
impl ObjectImpl for NdiVideoSrc { impl ObjectImpl for NdiVideoSrc {
glib::glib_object_impl!(); fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpec::new_string(
"ndi-name",
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"url-address",
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"receiver-ndi-name",
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"connect-timeout",
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"timeout",
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"max-queue-length",
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_int(
"bandwidth",
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_enum(
"timestamp-mode",
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
),
]
});
fn constructed(&self, obj: &glib::Object) { PROPERTIES.as_ref()
self.parent_constructed(obj);
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
basesrc.set_live(true);
basesrc.set_format(gst::Format::Time);
} }
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { fn constructed(&self, obj: &Self::Type) {
let prop = &PROPERTIES[id]; self.parent_constructed(obj);
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
match *prop { // Initialize live-ness and notify the base class that
subclass::Property("ndi-name", ..) => { // we'd like to operate in Time format
obj.set_live(true);
obj.set_format(gst::Format::Time);
}
fn set_property(
&self,
obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"ndi-name" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap(); let ndi_name = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing ndi-name from {:?} to {:?}", "Changing ndi-name from {:?} to {:?}",
settings.ndi_name, settings.ndi_name,
ndi_name, ndi_name,
); );
settings.ndi_name = ndi_name; settings.ndi_name = ndi_name;
} }
subclass::Property("url-address", ..) => { "url-address" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap(); let url_address = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing url-address from {:?} to {:?}", "Changing url-address from {:?} to {:?}",
settings.url_address, settings.url_address,
url_address, url_address,
); );
settings.url_address = url_address; settings.url_address = url_address;
} }
subclass::Property("receiver-ndi-name", ..) => { "receiver-ndi-name" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get().unwrap(); let receiver_ndi_name = value.get::<Option<String>>().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing receiver-ndi-name from {:?} to {:?}", "Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name, settings.receiver_ndi_name,
receiver_ndi_name, receiver_ndi_name,
@ -298,67 +220,66 @@ impl ObjectImpl for NdiVideoSrc {
settings.receiver_ndi_name = settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone()); receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
} }
subclass::Property("connect-timeout", ..) => { "connect-timeout" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get_some().unwrap(); let connect_timeout = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing connect-timeout from {} to {}", "Changing connect-timeout from {} to {}",
settings.connect_timeout, settings.connect_timeout,
connect_timeout, connect_timeout,
); );
settings.connect_timeout = connect_timeout; settings.connect_timeout = connect_timeout;
} }
subclass::Property("timeout", ..) => { "timeout" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let timeout = value.get_some().unwrap(); let timeout = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing timeout from {} to {}", "Changing timeout from {} to {}",
settings.timeout, settings.timeout,
timeout, timeout,
); );
settings.timeout = timeout; settings.timeout = timeout;
} }
subclass::Property("max-queue-length", ..) => { "max-queue-length" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get_some().unwrap(); let max_queue_length = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing max-queue-length from {} to {}", "Changing max-queue-length from {} to {}",
settings.max_queue_length, settings.max_queue_length,
max_queue_length, max_queue_length,
); );
settings.max_queue_length = max_queue_length; settings.max_queue_length = max_queue_length;
} }
subclass::Property("bandwidth", ..) => { "bandwidth" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get_some().unwrap(); let bandwidth = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing bandwidth from {} to {}", "Changing bandwidth from {} to {}",
settings.bandwidth, settings.bandwidth,
bandwidth, bandwidth,
); );
settings.bandwidth = bandwidth; settings.bandwidth = bandwidth;
} }
subclass::Property("timestamp-mode", ..) => { "timestamp-mode" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get_some().unwrap(); let timestamp_mode = value.get().unwrap();
gst_debug!( gst_debug!(
self.cat, self.cat,
obj: basesrc, obj: obj,
"Changing timestamp mode from {:?} to {:?}", "Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode, settings.timestamp_mode,
timestamp_mode timestamp_mode
); );
if settings.timestamp_mode != timestamp_mode { if settings.timestamp_mode != timestamp_mode {
let _ = let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
basesrc.post_message(gst::message::Latency::builder().src(basesrc).build());
} }
settings.timestamp_mode = timestamp_mode; settings.timestamp_mode = timestamp_mode;
} }
@ -366,41 +287,39 @@ impl ObjectImpl for NdiVideoSrc {
} }
} }
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> { fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let prop = &PROPERTIES[id]; match pspec.name() {
"ndi-name" => {
match *prop {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value()) settings.ndi_name.to_value()
} }
subclass::Property("url-address", ..) => { "url-address" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.url_address.to_value()) settings.url_address.to_value()
} }
subclass::Property("receiver-ndi-name", ..) => { "receiver-ndi-name" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.receiver_ndi_name.to_value()) settings.receiver_ndi_name.to_value()
} }
subclass::Property("connect-timeout", ..) => { "connect-timeout" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.connect_timeout.to_value()) settings.connect_timeout.to_value()
} }
subclass::Property("timeout", ..) => { "timeout" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value()) settings.timeout.to_value()
} }
subclass::Property("max-queue-length", ..) => { "max-queue-length" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.max_queue_length.to_value()) settings.max_queue_length.to_value()
} }
subclass::Property("bandwidth", ..) => { "bandwidth" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.bandwidth.to_value()) settings.bandwidth.to_value()
} }
subclass::Property("timestamp-mode", ..) => { "timestamp-mode" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
Ok(settings.timestamp_mode.to_value()) settings.timestamp_mode.to_value()
} }
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -408,9 +327,85 @@ impl ObjectImpl for NdiVideoSrc {
} }
impl ElementImpl for NdiVideoSrc { impl ElementImpl for NdiVideoSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NewTek NDI Video Source",
"Source",
"NewTek NDI video source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
// On the src pad, we can produce F32/F64 with any sample rate
// and any number of channels
let caps = gst::Caps::new_simple(
"video/x-raw",
&[
(
"format",
&gst::List::new(&[
&gst_video::VideoFormat::Uyvy.to_string(),
&gst_video::VideoFormat::Yv12.to_string(),
&gst_video::VideoFormat::Nv12.to_string(),
&gst_video::VideoFormat::I420.to_string(),
&gst_video::VideoFormat::Bgra.to_string(),
&gst_video::VideoFormat::Bgrx.to_string(),
&gst_video::VideoFormat::Rgba.to_string(),
&gst_video::VideoFormat::Rgbx.to_string(),
]),
),
("width", &gst::IntRange::<i32>::new(0, i32::MAX)),
("height", &gst::IntRange::<i32>::new(0, i32::MAX)),
(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(0, 1),
gst::Fraction::new(i32::MAX, 1),
),
),
],
);
#[cfg(feature = "interlaced-fields")]
let caps = {
let mut tmp = caps.copy();
{
let tmp = tmp.get_mut().unwrap();
tmp.set_features_simple(Some(gst::CapsFeatures::new(&["format:Interlaced"])));
}
let mut caps = caps;
{
let caps = caps.get_mut().unwrap();
caps.append(tmp);
}
caps
};
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state( fn change_state(
&self, &self,
element: &gst::Element, element: &Self::Type,
transition: gst::StateChange, transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition { match transition {
@ -437,13 +432,13 @@ impl ElementImpl for NdiVideoSrc {
} }
impl BaseSrcImpl for NdiVideoSrc { impl BaseSrcImpl for NdiVideoSrc {
fn negotiate(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::LoggableError> { fn negotiate(&self, _element: &Self::Type) -> Result<(), gst::LoggableError> {
// Always succeed here without doing anything: we will set the caps once we received a // Always succeed here without doing anything: we will set the caps once we received a
// buffer, there's nothing we can negotiate // buffer, there's nothing we can negotiate
Ok(()) Ok(())
} }
fn unlock(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Unlocking",); gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true); controller.set_flushing(true);
@ -451,7 +446,7 @@ impl BaseSrcImpl for NdiVideoSrc {
Ok(()) Ok(())
} }
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",); gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() { if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false); controller.set_flushing(false);
@ -459,12 +454,12 @@ impl BaseSrcImpl for NdiVideoSrc {
Ok(()) Ok(())
} }
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default(); *self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone(); let settings = self.settings.lock().unwrap().clone();
if settings.ndi_name.is_none() && settings.url_address.is_none() { if settings.ndi_name.is_none() && settings.url_address.is_none() {
return Err(gst_error_msg!( return Err(gst::error_msg!(
gst::LibraryError::Settings, gst::LibraryError::Settings,
["No NDI name or URL/address given"] ["No NDI name or URL/address given"]
)); ));
@ -472,7 +467,7 @@ impl BaseSrcImpl for NdiVideoSrc {
let receiver = connect_ndi( let receiver = connect_ndi(
self.cat, self.cat,
element, element.upcast_ref(),
settings.ndi_name.as_deref(), settings.ndi_name.as_deref(),
settings.url_address.as_deref(), settings.url_address.as_deref(),
&settings.receiver_ndi_name, &settings.receiver_ndi_name,
@ -485,7 +480,7 @@ impl BaseSrcImpl for NdiVideoSrc {
// settings.id_receiver exists // settings.id_receiver exists
match receiver { match receiver {
None => Err(gst_error_msg!( None => Err(gst::error_msg!(
gst::ResourceError::NotFound, gst::ResourceError::NotFound,
["Could not connect to this source"] ["Could not connect to this source"]
)), )),
@ -500,7 +495,7 @@ impl BaseSrcImpl for NdiVideoSrc {
} }
} }
fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() { if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown(); controller.shutdown();
} }
@ -508,7 +503,7 @@ impl BaseSrcImpl for NdiVideoSrc {
Ok(()) Ok(())
} }
fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool {
use gst::QueryView; use gst::QueryView;
match query.view_mut() { match query.view_mut() {
@ -521,14 +516,14 @@ impl BaseSrcImpl for NdiVideoSrc {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
if state.current_latency.is_some() { if let Some(latency) = state.current_latency {
let min = if settings.timestamp_mode != TimestampMode::Timecode { let min = if settings.timestamp_mode != TimestampMode::Timecode {
state.current_latency latency
} else { } else {
0.into() gst::ClockTime::ZERO
}; };
let max = 5 * state.current_latency; let max = 5 * latency;
println!("Returning latency min {} max {}", min, max,); println!("Returning latency min {} max {}", min, max,);
@ -549,11 +544,11 @@ impl BaseSrcImpl for NdiVideoSrc {
} }
} }
fn fixate(&self, element: &gst_base::BaseSrc, mut caps: gst::Caps) -> gst::Caps { fn fixate(&self, element: &Self::Type, mut caps: gst::Caps) -> gst::Caps {
caps.truncate(); caps.truncate();
{ {
let caps = caps.make_mut(); let caps = caps.make_mut();
let s = caps.get_mut_structure(0).unwrap(); let s = caps.structure_mut(0).unwrap();
s.fixate_field_nearest_int("width", 1920); s.fixate_field_nearest_int("width", 1920);
s.fixate_field_nearest_int("height", 1080); s.fixate_field_nearest_int("height", 1080);
if s.has_field("pixel-aspect-ratio") { if s.has_field("pixel-aspect-ratio") {
@ -567,7 +562,7 @@ impl BaseSrcImpl for NdiVideoSrc {
//Creates the video buffers //Creates the video buffers
fn create( fn create(
&self, &self,
element: &gst_base::BaseSrc, element: &Self::Type,
_offset: u64, _offset: u64,
_buffer: Option<&mut gst::BufferRef>, _buffer: Option<&mut gst::BufferRef>,
_length: u32, _length: u32,
@ -589,7 +584,7 @@ impl BaseSrcImpl for NdiVideoSrc {
state.receiver = Some(recv); state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) { if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().map_err(|_| { let caps = info.to_caps().map_err(|_| {
gst_element_error!( gst::element_error!(
element, element,
gst::ResourceError::Settings, gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info] ["Invalid audio info received: {:?}", info]
@ -597,11 +592,11 @@ impl BaseSrcImpl for NdiVideoSrc {
gst::FlowError::NotNegotiated gst::FlowError::NotNegotiated
})?; })?;
state.info = Some(info); state.info = Some(info);
state.current_latency = buffer.get_duration(); state.current_latency = buffer.duration();
drop(state); drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_caps(&caps).map_err(|_| { element.set_caps(&caps).map_err(|_| {
gst_element_error!( gst::element_error!(
element, element,
gst::CoreError::Negotiation, gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps] ["Failed to negotiate caps: {:?}", caps]
@ -621,12 +616,3 @@ impl BaseSrcImpl for NdiVideoSrc {
} }
} }
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndivideosrc",
gst::Rank::None,
NdiVideoSrc::get_type(),
)
}

19
src/ndivideosrc/mod.rs Normal file
View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiVideoSrc(ObjectSubclass<imp::NdiVideoSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
unsafe impl Send for NdiVideoSrc {}
unsafe impl Sync for NdiVideoSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndivideosrc",
gst::Rank::None,
NdiVideoSrc::static_type(),
)
}

View file

@ -1,6 +1,6 @@
use glib::prelude::*; use glib::prelude::*;
use gst::prelude::*; use gst::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_log, gst_warning}; use gst::{gst_debug, gst_error, gst_log, gst_warning};
use gst_video::prelude::*; use gst_video::prelude::*;
use byte_slice_cast::AsMutSliceOf; use byte_slice_cast::AsMutSliceOf;
@ -167,15 +167,14 @@ impl Observations {
&self, &self,
cat: gst::DebugCategory, cat: gst::DebugCategory,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
time: (gst::ClockTime, gst::ClockTime), time: (Option<gst::ClockTime>, gst::ClockTime),
duration: gst::ClockTime, duration: Option<gst::ClockTime>,
) -> (gst::ClockTime, gst::ClockTime) { ) -> (gst::ClockTime, Option<gst::ClockTime>) {
assert!(time.1.is_some());
if time.0.is_none() { if time.0.is_none() {
return (time.1, duration); return (time.1, duration);
} }
let time = (time.0.unwrap(), time.1.unwrap()); let time = (time.0.unwrap(), time.1);
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
let ObservationsInner { let ObservationsInner {
@ -190,8 +189,8 @@ impl Observations {
} = *inner; } = *inner;
if values.is_empty() { if values.is_empty() {
current_mapping.xbase = time.0; current_mapping.xbase = time.0.nseconds();
current_mapping.b = time.1; current_mapping.b = time.1.nseconds();
current_mapping.num = 1; current_mapping.num = 1;
current_mapping.den = 1; current_mapping.den = 1;
} }
@ -207,7 +206,9 @@ impl Observations {
// Start by first updating every frame, then every second frame, then every third // Start by first updating every frame, then every second frame, then every third
// frame, etc. until we update once every quarter second // frame, etc. until we update once every quarter second
let framerate = (gst::SECOND / duration).unwrap_or(25) as usize; let framerate = gst::ClockTime::SECOND
.checked_div(duration.unwrap_or(40 * gst::ClockTime::MSECOND).nseconds())
.unwrap_or(25) as usize;
if *skip_period < framerate / 4 + 1 { if *skip_period < framerate / 4 + 1 {
*skip_period += 1; *skip_period += 1;
@ -221,7 +222,7 @@ impl Observations {
if values.len() == WINDOW_LENGTH { if values.len() == WINDOW_LENGTH {
values.remove(0); values.remove(0);
} }
values.push(time); values.push((time.0.nseconds(), time.1.nseconds()));
if let Some((num, den, b, xbase, r_squared)) = if let Some((num, den, b, xbase, r_squared)) =
gst::calculate_linear_regression(values, Some(values_tmp)) gst::calculate_linear_regression(values, Some(values_tmp))
@ -236,8 +237,8 @@ impl Observations {
obj: element, obj: element,
"Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})", "Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})",
next_mapping.num as f64 / next_mapping.den as f64, next_mapping.num as f64 / next_mapping.den as f64,
gst::ClockTime::from(next_mapping.xbase), gst::ClockTime::from_nseconds(next_mapping.xbase),
gst::ClockTime::from(next_mapping.b), gst::ClockTime::from_nseconds(next_mapping.b),
r_squared, r_squared,
); );
} }
@ -250,81 +251,72 @@ impl Observations {
if *time_mapping_pending { if *time_mapping_pending {
let expected = gst::Clock::adjust_with_calibration( let expected = gst::Clock::adjust_with_calibration(
time.0.into(), time.0,
current_mapping.xbase.into(), gst::ClockTime::from_nseconds(current_mapping.xbase),
current_mapping.b.into(), gst::ClockTime::from_nseconds(current_mapping.b),
current_mapping.num.into(), gst::ClockTime::from_nseconds(current_mapping.num),
current_mapping.den.into(), gst::ClockTime::from_nseconds(current_mapping.den),
); );
let new_calculated = gst::Clock::adjust_with_calibration( let new_calculated = gst::Clock::adjust_with_calibration(
time.0.into(), time.0,
next_mapping.xbase.into(), gst::ClockTime::from_nseconds(next_mapping.xbase),
next_mapping.b.into(), gst::ClockTime::from_nseconds(next_mapping.b),
next_mapping.num.into(), gst::ClockTime::from_nseconds(next_mapping.num),
next_mapping.den.into(), gst::ClockTime::from_nseconds(next_mapping.den),
); );
if let (Some(expected), Some(new_calculated)) = (*expected, *new_calculated) { let diff = if new_calculated > expected {
let diff = if new_calculated > expected { new_calculated - expected
new_calculated - expected
} else {
expected - new_calculated
};
// Allow at most 5% frame duration or 2ms difference per frame
let max_diff = cmp::max(
(duration / 10).unwrap_or(2 * gst::MSECOND_VAL),
2 * gst::MSECOND_VAL,
);
if diff > max_diff {
gst_debug!(
cat,
obj: element,
"New time mapping causes difference {} but only {} allowed",
gst::ClockTime::from(diff),
gst::ClockTime::from(max_diff),
);
if new_calculated > expected {
current_mapping.b = expected + max_diff;
current_mapping.xbase = time.0;
} else {
current_mapping.b = expected - max_diff;
current_mapping.xbase = time.0;
}
} else {
*current_mapping = *next_mapping;
}
} else { } else {
gst_warning!( expected - new_calculated
};
// Allow at most 5% frame duration or 2ms difference per frame
let max_diff = cmp::max(
(duration.map(|d| d / 10)).unwrap_or(2 * gst::ClockTime::MSECOND),
2 * gst::ClockTime::MSECOND,
);
if diff > max_diff {
gst_debug!(
cat, cat,
obj: element, obj: element,
"Failed to calculate timestamps based on new mapping", "New time mapping causes difference {} but only {} allowed",
diff,
max_diff,
); );
if new_calculated > expected {
current_mapping.b = (expected + max_diff).nseconds();
current_mapping.xbase = time.0.nseconds();
} else {
current_mapping.b = (expected - max_diff).nseconds();
current_mapping.xbase = time.0.nseconds();
}
} else {
*current_mapping = *next_mapping;
} }
} }
let converted_timestamp = gst::Clock::adjust_with_calibration( let converted_timestamp = gst::Clock::adjust_with_calibration(
time.0.into(), time.0,
current_mapping.xbase.into(), gst::ClockTime::from_nseconds(current_mapping.xbase),
current_mapping.b.into(), gst::ClockTime::from_nseconds(current_mapping.b),
current_mapping.num.into(), gst::ClockTime::from_nseconds(current_mapping.num),
current_mapping.den.into(), gst::ClockTime::from_nseconds(current_mapping.den),
); );
let converted_duration = duration let converted_duration =
.mul_div_floor(current_mapping.num, current_mapping.den) duration.and_then(|d| d.mul_div_floor(current_mapping.num, current_mapping.den));
.unwrap_or(gst::CLOCK_TIME_NONE);
gst_debug!( gst_debug!(
cat, cat,
obj: element, obj: element,
"Converted timestamp {}/{} to {}, duration {} to {}", "Converted timestamp {}/{} to {}, duration {} to {}",
gst::ClockTime::from(time.0), time.0,
gst::ClockTime::from(time.1), time.1,
converted_timestamp, converted_timestamp.display(),
duration, duration.display(),
converted_duration, converted_duration.display(),
); );
(converted_timestamp, converted_duration) (converted_timestamp, converted_duration)
@ -422,7 +414,7 @@ impl<T: ReceiverType> Receiver<T> {
Err(_) => { Err(_) => {
if let Some(receiver) = weak.upgrade().map(Receiver) { if let Some(receiver) = weak.upgrade().map(Receiver) {
if let Some(element) = receiver.0.element.upgrade() { if let Some(element) = receiver.0.element.upgrade() {
gst_element_error!( gst::element_error!(
element, element,
gst::LibraryError::Failed, gst::LibraryError::Failed,
["Panic while connecting to NDI source"] ["Panic while connecting to NDI source"]
@ -558,7 +550,7 @@ where
if (receiver.video.is_some() || !T::IS_VIDEO) if (receiver.video.is_some() || !T::IS_VIDEO)
&& (receiver.audio.is_some() || T::IS_VIDEO) && (receiver.audio.is_some() || T::IS_VIDEO)
{ {
gst_element_error!( gst::element_error!(
element, element,
gst::ResourceError::OpenRead, gst::ResourceError::OpenRead,
[ [
@ -595,14 +587,14 @@ where
// FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be // FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be
// broken with interlaced content currently // broken with interlaced content currently
let recv = RecvInstance::builder(ndi_name, url_address, &receiver_ndi_name) let recv = RecvInstance::builder(ndi_name, url_address, receiver_ndi_name)
.bandwidth(bandwidth) .bandwidth(bandwidth)
.color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA) .color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA)
.allow_video_fields(true) .allow_video_fields(true)
.build(); .build();
let recv = match recv { let recv = match recv {
None => { None => {
gst_element_error!( gst::element_error!(
element, element,
gst::CoreError::Negotiation, gst::CoreError::Negotiation,
["Failed to connect to source"] ["Failed to connect to source"]
@ -785,44 +777,42 @@ impl<T: ReceiverType> Receiver<T> {
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
timestamp: i64, timestamp: i64,
timecode: i64, timecode: i64,
duration: gst::ClockTime, duration: Option<gst::ClockTime>,
) -> Option<(gst::ClockTime, gst::ClockTime)> { ) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
let clock = match element.get_clock() { let clock = element.clock()?;
None => return None,
Some(clock) => clock,
};
// For now take the current running time as PTS. At a later time we // For now take the current running time as PTS. At a later time we
// will want to work with the timestamp given by the NDI SDK if available // will want to work with the timestamp given by the NDI SDK if available
let now = clock.get_time(); let now = clock.time()?;
let base_time = element.get_base_time(); let base_time = element.base_time()?;
let receive_time = now - base_time; let receive_time = now - base_time;
let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000); let real_time_now = gst::ClockTime::from_nseconds(glib::real_time() as u64 * 1000);
let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined { let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
gst::CLOCK_TIME_NONE gst::ClockTime::NONE
} else { } else {
gst::ClockTime::from(timestamp as u64 * 100) Some(gst::ClockTime::from_nseconds(timestamp as u64 * 100))
}; };
let timecode = gst::ClockTime::from(timecode as u64 * 100); let timecode = gst::ClockTime::from_nseconds(timecode as u64 * 100);
gst_log!( gst_log!(
self.0.cat, self.0.cat,
obj: element, obj: element,
"Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}", "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
timecode, timecode,
timestamp, timestamp.display(),
duration, duration.display(),
receive_time, receive_time.display(),
real_time_now, real_time_now,
); );
let (pts, duration) = match self.0.timestamp_mode { let (pts, duration) = match self.0.timestamp_mode {
TimestampMode::ReceiveTimeTimecode => { TimestampMode::ReceiveTimeTimecode => self.0.observations.process(
self.0 self.0.cat,
.observations element,
.process(self.0.cat, element, (timecode, receive_time), duration) (Some(timecode), receive_time),
} duration,
),
TimestampMode::ReceiveTimeTimestamp => self.0.observations.process( TimestampMode::ReceiveTimeTimestamp => self.0.observations.process(
self.0.cat, self.0.cat,
element, element,
@ -833,10 +823,11 @@ impl<T: ReceiverType> Receiver<T> {
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration), TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration),
TimestampMode::Timestamp => { TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch // Timestamps are relative to the UNIX epoch
let timestamp = timestamp?;
if real_time_now > timestamp { if real_time_now > timestamp {
let diff = real_time_now - timestamp; let diff = real_time_now - timestamp;
if diff > receive_time { if diff > receive_time {
(0.into(), duration) (gst::ClockTime::ZERO, duration)
} else { } else {
(receive_time - diff, duration) (receive_time - diff, duration)
} }
@ -851,8 +842,8 @@ impl<T: ReceiverType> Receiver<T> {
self.0.cat, self.0.cat,
obj: element, obj: element,
"Calculated PTS {}, duration {}", "Calculated PTS {}, duration {}",
pts, pts.display(),
duration, duration.display(),
); );
Some((pts, duration)) Some((pts, duration))
@ -909,7 +900,7 @@ impl Receiver<VideoReceiver> {
let video_frame = match res { let video_frame = match res {
Err(_) => { Err(_) => {
gst_element_error!( gst::element_error!(
element, element,
gst::ResourceError::Read, gst::ResourceError::Read,
["Error receiving frame"] ["Error receiving frame"]
@ -970,13 +961,11 @@ impl Receiver<VideoReceiver> {
&self, &self,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
video_frame: &VideoFrame, video_frame: &VideoFrame,
) -> Option<(gst::ClockTime, gst::ClockTime)> { ) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
let duration = gst::SECOND let duration = gst::ClockTime::SECOND.mul_div_floor(
.mul_div_floor( video_frame.frame_rate().1 as u64,
video_frame.frame_rate().1 as u64, video_frame.frame_rate().0 as u64,
video_frame.frame_rate().0 as u64, );
)
.unwrap_or(gst::CLOCK_TIME_NONE);
self.calculate_timestamp( self.calculate_timestamp(
element, element,
@ -1004,7 +993,7 @@ impl Receiver<VideoReceiver> {
ndisys::NDIlib_FourCC_video_type_RGBA => gst_video::VideoFormat::Rgba, ndisys::NDIlib_FourCC_video_type_RGBA => gst_video::VideoFormat::Rgba,
ndisys::NDIlib_FourCC_video_type_RGBX => gst_video::VideoFormat::Rgbx, ndisys::NDIlib_FourCC_video_type_RGBX => gst_video::VideoFormat::Rgbx,
_ => { _ => {
gst_element_error!( gst::element_error!(
element, element,
gst::StreamError::Format, gst::StreamError::Format,
["Unsupported video fourcc {:08x}", video_frame.fourcc()] ["Unsupported video fourcc {:08x}", video_frame.fourcc()]
@ -1045,7 +1034,7 @@ impl Receiver<VideoReceiver> {
} }
builder.build().map_err(|_| { builder.build().map_err(|_| {
gst_element_error!( gst::element_error!(
element, element,
gst::StreamError::Format, gst::StreamError::Format,
["Invalid video format configuration"] ["Invalid video format configuration"]
@ -1062,7 +1051,7 @@ impl Receiver<VideoReceiver> {
&& video_frame.frame_format_type() && video_frame.frame_format_type()
!= ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved != ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
{ {
gst_element_error!( gst::element_error!(
element, element,
gst::StreamError::Format, gst::StreamError::Format,
["Separate field interlacing not supported"] ["Separate field interlacing not supported"]
@ -1094,7 +1083,7 @@ impl Receiver<VideoReceiver> {
} }
builder.build().map_err(|_| { builder.build().map_err(|_| {
gst_element_error!( gst::element_error!(
element, element,
gst::StreamError::Format, gst::StreamError::Format,
["Invalid video format configuration"] ["Invalid video format configuration"]
@ -1109,7 +1098,7 @@ impl Receiver<VideoReceiver> {
&self, &self,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
pts: gst::ClockTime, pts: gst::ClockTime,
duration: gst::ClockTime, duration: Option<gst::ClockTime>,
info: &gst_video::VideoInfo, info: &gst_video::VideoInfo,
video_frame: &VideoFrame, video_frame: &VideoFrame,
) -> gst::Buffer { ) -> gst::Buffer {
@ -1124,15 +1113,15 @@ impl Receiver<VideoReceiver> {
gst::ReferenceTimestampMeta::add( gst::ReferenceTimestampMeta::add(
buffer, buffer,
&*TIMECODE_CAPS, &*TIMECODE_CAPS,
gst::ClockTime::from(video_frame.timecode() as u64 * 100), gst::ClockTime::from_nseconds(video_frame.timecode() as u64 * 100),
gst::CLOCK_TIME_NONE, gst::ClockTime::NONE,
); );
if video_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined { if video_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
gst::ReferenceTimestampMeta::add( gst::ReferenceTimestampMeta::add(
buffer, buffer,
&*TIMESTAMP_CAPS, &*TIMESTAMP_CAPS,
gst::ClockTime::from(video_frame.timestamp() as u64 * 100), gst::ClockTime::from_nseconds(video_frame.timestamp() as u64 * 100),
gst::CLOCK_TIME_NONE, gst::ClockTime::NONE,
); );
} }
} }
@ -1337,7 +1326,7 @@ impl Receiver<AudioReceiver> {
let audio_frame = match res { let audio_frame = match res {
Err(_) => { Err(_) => {
gst_element_error!( gst::element_error!(
element, element,
gst::ResourceError::Read, gst::ResourceError::Read,
["Error receiving frame"] ["Error receiving frame"]
@ -1398,13 +1387,11 @@ impl Receiver<AudioReceiver> {
&self, &self,
element: &gst_base::BaseSrc, element: &gst_base::BaseSrc,
audio_frame: &AudioFrame, audio_frame: &AudioFrame,
) -> Option<(gst::ClockTime, gst::ClockTime)> { ) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
let duration = gst::SECOND let duration = gst::ClockTime::SECOND.mul_div_floor(
.mul_div_floor( audio_frame.no_samples() as u64,
audio_frame.no_samples() as u64, audio_frame.sample_rate() as u64,
audio_frame.sample_rate() as u64, );
)
.unwrap_or(gst::CLOCK_TIME_NONE);
self.calculate_timestamp( self.calculate_timestamp(
element, element,
@ -1426,7 +1413,7 @@ impl Receiver<AudioReceiver> {
); );
builder.build().map_err(|_| { builder.build().map_err(|_| {
gst_element_error!( gst::element_error!(
element, element,
gst::StreamError::Format, gst::StreamError::Format,
["Invalid audio format configuration"] ["Invalid audio format configuration"]
@ -1440,7 +1427,7 @@ impl Receiver<AudioReceiver> {
&self, &self,
_element: &gst_base::BaseSrc, _element: &gst_base::BaseSrc,
pts: gst::ClockTime, pts: gst::ClockTime,
duration: gst::ClockTime, duration: Option<gst::ClockTime>,
info: &gst_audio::AudioInfo, info: &gst_audio::AudioInfo,
audio_frame: &AudioFrame, audio_frame: &AudioFrame,
) -> gst::Buffer { ) -> gst::Buffer {
@ -1458,15 +1445,15 @@ impl Receiver<AudioReceiver> {
gst::ReferenceTimestampMeta::add( gst::ReferenceTimestampMeta::add(
buffer, buffer,
&*TIMECODE_CAPS, &*TIMECODE_CAPS,
gst::ClockTime::from(audio_frame.timecode() as u64 * 100), gst::ClockTime::from_nseconds(audio_frame.timecode() as u64 * 100),
gst::CLOCK_TIME_NONE, gst::ClockTime::NONE,
); );
if audio_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined { if audio_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
gst::ReferenceTimestampMeta::add( gst::ReferenceTimestampMeta::add(
buffer, buffer,
&*TIMESTAMP_CAPS, &*TIMESTAMP_CAPS,
gst::ClockTime::from(audio_frame.timestamp() as u64 * 100), gst::ClockTime::from_nseconds(audio_frame.timestamp() as u64 * 100),
gst::CLOCK_TIME_NONE, gst::ClockTime::NONE,
); );
} }
} }