Merge pull request #71 from sdroege/gst-0.17

Update to gstreamer-rs 0.17
This commit is contained in:
Samuel Alonso Rodriguez 2021-09-24 11:30:22 +02:00 committed by GitHub
commit 0f88b3df68
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 1036 additions and 992 deletions

View file

@ -8,16 +8,16 @@ description = "NewTek NDI Plugin"
edition = "2018"
[dependencies]
glib = "0.10"
gst = { package = "gstreamer", version = "0.16", features = ["v1_12"] }
gst-base = { package = "gstreamer-base", version = "0.16" }
gst-audio = { package = "gstreamer-audio", version = "0.16" }
gst-video = { package = "gstreamer-video", version = "0.16", features = ["v1_12"] }
glib = "0.14"
gst = { package = "gstreamer", version = "0.17", features = ["v1_12"] }
gst-base = { package = "gstreamer-base", version = "0.17" }
gst-audio = { package = "gstreamer-audio", version = "0.17" }
gst-video = { package = "gstreamer-video", version = "0.17", features = ["v1_12"] }
byte-slice-cast = "1"
once_cell = "1.0"
[build-dependencies]
gst-plugin-version-helper = "0.2"
gst-plugin-version-helper = "0.7"
[features]
default = ["interlaced-fields", "reference-timestamps", "sink"]

View file

@ -1,5 +1,3 @@
extern crate gst_plugin_version_helper;
fn main() {
gst_plugin_version_helper::get_info()
gst_plugin_version_helper::info()
}

View file

@ -1,4 +1,3 @@
use glib::subclass;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_error, gst_log, gst_trace};
@ -9,24 +8,24 @@ use std::sync::atomic;
use std::sync::Mutex;
use std::thread;
use once_cell::sync::Lazy;
use crate::ndi;
#[derive(Debug)]
struct DeviceProvider {
pub struct DeviceProvider {
cat: gst::DebugCategory,
thread: Mutex<Option<thread::JoinHandle<()>>>,
current_devices: Mutex<Vec<gst::Device>>,
current_devices: Mutex<Vec<super::Device>>,
find: Mutex<Option<ndi::FindInstance>>,
is_running: atomic::AtomicBool,
}
#[glib::object_subclass]
impl ObjectSubclass for DeviceProvider {
const NAME: &'static str = "NdiDeviceProvider";
type Type = super::DeviceProvider;
type ParentType = gst::DeviceProvider;
type Instance = subclass::simple::InstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self {
Self {
@ -41,26 +40,32 @@ impl ObjectSubclass for DeviceProvider {
is_running: atomic::AtomicBool::new(false),
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"NewTek NDI Device Provider",
"Source/Audio/Video/Network",
"NewTek NDI Device Provider",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
);
}
}
impl ObjectImpl for DeviceProvider {
glib::glib_object_impl!();
}
impl ObjectImpl for DeviceProvider {}
impl DeviceProviderImpl for DeviceProvider {
fn probe(&self, _device_provider: &gst::DeviceProvider) -> Vec<gst::Device> {
self.current_devices.lock().unwrap().clone()
fn metadata() -> Option<&'static gst::subclass::DeviceProviderMetadata> {
static METADATA: Lazy<gst::subclass::DeviceProviderMetadata> = Lazy::new(|| {
gst::subclass::DeviceProviderMetadata::new("NewTek NDI Device Provider",
"Source/Audio/Video/Network",
"NewTek NDI Device Provider",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>")
});
Some(&*METADATA)
}
fn start(&self, device_provider: &gst::DeviceProvider) -> Result<(), gst::LoggableError> {
fn probe(&self, _device_provider: &Self::Type) -> Vec<gst::Device> {
self.current_devices
.lock()
.unwrap()
.iter()
.map(|d| d.clone().upcast())
.collect()
}
fn start(&self, device_provider: &Self::Type) -> Result<(), gst::LoggableError> {
let mut thread_guard = self.thread.lock().unwrap();
if thread_guard.is_some() {
gst_log!(
@ -121,7 +126,8 @@ impl DeviceProviderImpl for DeviceProvider {
Ok(())
}
fn stop(&self, _device_provider: &gst::DeviceProvider) {
fn stop(&self, _device_provider: &Self::Type) {
if let Some(_thread) = self.thread.lock().unwrap().take() {
self.is_running.store(false, atomic::Ordering::SeqCst);
// Don't actually join because that might take a while
@ -130,7 +136,7 @@ impl DeviceProviderImpl for DeviceProvider {
}
impl DeviceProvider {
fn poll(&self, device_provider: &gst::DeviceProvider, first: bool) {
fn poll(&self, device_provider: &super::DeviceProvider, first: bool) {
let mut find_guard = self.find.lock().unwrap();
let find = match *find_guard {
None => return,
@ -189,11 +195,11 @@ impl DeviceProvider {
source
);
// Add once for audio, another time for video
let device = Device::new(&source, true);
let device = super::Device::new(&source, true);
device_provider.device_add(&device);
current_devices_guard.push(device);
let device = Device::new(&source, false);
let device = super::Device::new(&source, false);
device_provider.device_add(&device);
current_devices_guard.push(device);
}
@ -201,18 +207,16 @@ impl DeviceProvider {
}
#[derive(Debug)]
struct Device {
pub struct Device {
cat: gst::DebugCategory,
source: OnceCell<(ndi::Source<'static>, glib::Type)>,
}
#[glib::object_subclass]
impl ObjectSubclass for Device {
const NAME: &'static str = "NdiDevice";
type Type = super::Device;
type ParentType = gst::Device;
type Instance = subclass::simple::InstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self {
Self {
@ -226,18 +230,16 @@ impl ObjectSubclass for Device {
}
}
impl ObjectImpl for Device {
glib::glib_object_impl!();
}
impl ObjectImpl for Device {}
impl DeviceImpl for Device {
fn create_element(
&self,
_device: &gst::Device,
_device: &Self::Type,
name: Option<&str>,
) -> Result<gst::Element, gst::LoggableError> {
let source_info = self.source.get().unwrap();
let element = glib::Object::new(
let element = glib::Object::with_type(
source_info.1,
&[
("name", &name),
@ -253,8 +255,8 @@ impl DeviceImpl for Device {
}
}
impl Device {
fn new(source: &ndi::Source<'_>, is_audio: bool) -> gst::Device {
impl super::Device {
fn new(source: &ndi::Source<'_>, is_audio: bool) -> super::Device {
let display_name = format!(
"{} ({})",
source.ndi_name(),
@ -267,13 +269,13 @@ impl Device {
// Get the caps from the template caps of the corresponding source element
let element_type = if is_audio {
crate::ndiaudiosrc::NdiAudioSrc::get_type()
crate::ndiaudiosrc::NdiAudioSrc::static_type()
} else {
crate::ndivideosrc::NdiVideoSrc::get_type()
crate::ndivideosrc::NdiVideoSrc::static_type()
};
let element_class = gst::ElementClass::from_type(element_type).unwrap();
let templ = element_class.get_pad_template("src").unwrap();
let caps = templ.get_caps().unwrap();
let element_class = glib::Class::<gst::Element>::from_type(element_type).unwrap();
let templ = element_class.pad_template("src").unwrap();
let caps = templ.caps();
// Put the url-address into the extra properties
let extra_properties = gst::Structure::builder("properties")
@ -281,17 +283,12 @@ impl Device {
.field("url-address", &source.url_address())
.build();
let device = glib::Object::new(
Device::get_type(),
&[
let device = glib::Object::new::<super::Device>(&[
("caps", &caps),
("display-name", &display_name),
("device-class", &device_class),
("properties", &extra_properties),
],
)
.unwrap()
.dynamic_cast::<gst::Device>()
])
.unwrap();
let device_impl = Device::from_instance(&device);
@ -303,12 +300,3 @@ impl Device {
device
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::DeviceProvider::register(
Some(plugin),
"ndideviceprovider",
gst::Rank::Primary,
DeviceProvider::get_type(),
)
}

View file

@ -0,0 +1,26 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct DeviceProvider(ObjectSubclass<imp::DeviceProvider>) @extends gst::DeviceProvider, gst::Object;
}
unsafe impl Send for DeviceProvider {}
unsafe impl Sync for DeviceProvider {}
glib::wrapper! {
pub struct Device(ObjectSubclass<imp::Device>) @extends gst::Device, gst::Object;
}
unsafe impl Send for Device {}
unsafe impl Sync for Device {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::DeviceProvider::register(
Some(plugin),
"ndideviceprovider",
gst::Rank::Primary,
DeviceProvider::static_type(),
)
}

View file

@ -1,5 +1,3 @@
use glib::prelude::*;
mod device_provider;
pub mod ndi;
mod ndiaudiosrc;
@ -38,7 +36,7 @@ pub enum TimestampMode {
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
if !ndi::initialize() {
return Err(glib::glib_bool_error!("Cannot initialize NDI"));
return Err(glib::bool_error!("Cannot initialize NDI"));
}
device_provider::register(plugin)?;
@ -68,7 +66,7 @@ static TIMECODE_CAPS: Lazy<gst::Caps> =
static TIMESTAMP_CAPS: Lazy<gst::Caps> =
Lazy::new(|| gst::Caps::new_simple("timestamp/x-ndi-timestamp", &[]));
gst::gst_plugin_define!(
gst::plugin_define!(
ndi,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,

View file

@ -420,7 +420,7 @@ pub struct SendInstance(ptr::NonNull<::std::os::raw::c_void>);
unsafe impl Send for SendInstance {}
impl SendInstance {
pub fn builder<'a>(ndi_name: &'a str) -> SendBuilder<'a> {
pub fn builder(ndi_name: &str) -> SendBuilder {
SendBuilder {
ndi_name,
clock_video: false,
@ -749,7 +749,7 @@ impl<'a> VideoFrame<'a> {
impl<'a> Drop for VideoFrame<'a> {
#[allow(irrefutable_let_patterns)]
fn drop(&mut self) {
if let VideoFrame::BorrowedRecv(ref mut frame, ref recv) = *self {
if let VideoFrame::BorrowedRecv(ref mut frame, recv) = *self {
unsafe {
NDIlib_recv_free_video_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
}
@ -918,7 +918,7 @@ impl<'a> AudioFrame<'a> {
impl<'a> Drop for AudioFrame<'a> {
#[allow(irrefutable_let_patterns)]
fn drop(&mut self) {
if let AudioFrame::BorrowedRecv(ref mut frame, ref recv) = *self {
if let AudioFrame::BorrowedRecv(ref mut frame, recv) = *self {
unsafe {
NDIlib_recv_free_audio_v2(((recv.0).0).0.as_ptr() as *mut _, frame);
}
@ -1010,7 +1010,7 @@ impl<'a> Default for MetadataFrame<'a> {
impl<'a> Drop for MetadataFrame<'a> {
fn drop(&mut self) {
if let MetadataFrame::Borrowed(ref mut frame, ref recv) = *self {
if let MetadataFrame::Borrowed(ref mut frame, recv) = *self {
unsafe {
NDIlib_recv_free_metadata(((recv.0).0).0.as_ptr() as *mut _, frame);
}

View file

@ -1,7 +1,6 @@
use glib::subclass;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg};
use gst::{gst_debug, gst_error};
use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*;
@ -9,6 +8,8 @@ use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use std::{i32, u32};
use once_cell::sync::Lazy;
use crate::connect_ndi;
use crate::ndisys;
@ -46,94 +47,10 @@ impl Default for Settings {
}
}
static PROPERTIES: [subclass::Property; 8] = [
subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("url-address", |name| {
glib::ParamSpec::string(
name,
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("receiver-ndi-name", |name| {
glib::ParamSpec::string(
name,
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("connect-timeout", |name| {
glib::ParamSpec::uint(
name,
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timeout", |name| {
glib::ParamSpec::uint(
name,
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-queue-length", |name| {
glib::ParamSpec::uint(
name,
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("bandwidth", |name| {
glib::ParamSpec::int(
name,
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timestamp-mode", |name| {
glib::ParamSpec::enum_(
name,
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
)
}),
];
struct State {
info: Option<gst_audio::AudioInfo>,
receiver: Option<Receiver<AudioReceiver>>,
current_latency: gst::ClockTime,
current_latency: Option<gst::ClockTime>,
}
impl Default for State {
@ -141,25 +58,23 @@ impl Default for State {
State {
info: None,
receiver: None,
current_latency: gst::CLOCK_TIME_NONE,
current_latency: gst::ClockTime::NONE,
}
}
}
pub(crate) struct NdiAudioSrc {
pub struct NdiAudioSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
receiver_controller: Mutex<Option<ReceiverControlHandle<AudioReceiver>>>,
}
#[glib::object_subclass]
impl ObjectSubclass for NdiAudioSrc {
const NAME: &'static str = "NdiAudioSrc";
type Type = super::NdiAudioSrc;
type ParentType = gst_base::BaseSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self {
Self {
@ -173,15 +88,259 @@ impl ObjectSubclass for NdiAudioSrc {
receiver_controller: Mutex::new(None),
}
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
impl ObjectImpl for NdiAudioSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpec::new_string(
"ndi-name",
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"url-address",
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"receiver-ndi-name",
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"connect-timeout",
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"timeout",
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"max-queue-length",
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_int(
"bandwidth",
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_enum(
"timestamp-mode",
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
),
]
});
PROPERTIES.as_ref()
}
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
obj.set_live(true);
obj.set_format(gst::Format::Time);
}
fn set_property(
&self,
obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"ndi-name" => {
let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing ndi-name from {:?} to {:?}",
settings.ndi_name,
ndi_name,
);
settings.ndi_name = ndi_name;
}
"url-address" => {
let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing url-address from {:?} to {:?}",
settings.url_address,
url_address,
);
settings.url_address = url_address;
}
"receiver-ndi-name" => {
let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get::<Option<String>>().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name,
receiver_ndi_name,
);
settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
}
"connect-timeout" => {
let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing connect-timeout from {} to {}",
settings.connect_timeout,
connect_timeout,
);
settings.connect_timeout = connect_timeout;
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing timeout from {} to {}",
settings.timeout,
timeout,
);
settings.timeout = timeout;
}
"max-queue-length" => {
let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing max-queue-length from {} to {}",
settings.max_queue_length,
max_queue_length,
);
settings.max_queue_length = max_queue_length;
}
"bandwidth" => {
let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing bandwidth from {} to {}",
settings.bandwidth,
bandwidth,
);
settings.bandwidth = bandwidth;
}
"timestamp-mode" => {
let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode,
timestamp_mode
);
if settings.timestamp_mode != timestamp_mode {
let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
}
settings.timestamp_mode = timestamp_mode;
}
_ => unimplemented!(),
}
}
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"ndi-name" => {
let settings = self.settings.lock().unwrap();
settings.ndi_name.to_value()
}
"url-address" => {
let settings = self.settings.lock().unwrap();
settings.url_address.to_value()
}
"receiver-ndi-name" => {
let settings = self.settings.lock().unwrap();
settings.receiver_ndi_name.to_value()
}
"connect-timeout" => {
let settings = self.settings.lock().unwrap();
settings.connect_timeout.to_value()
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
"max-queue-length" => {
let settings = self.settings.lock().unwrap();
settings.max_queue_length.to_value()
}
"bandwidth" => {
let settings = self.settings.lock().unwrap();
settings.bandwidth.to_value()
}
"timestamp-mode" => {
let settings = self.settings.lock().unwrap();
settings.timestamp_mode.to_value()
}
_ => unimplemented!(),
}
}
}
impl ElementImpl for NdiAudioSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NewTek NDI Audio Source",
"Source",
"NewTek NDI audio source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
);
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_simple(
"audio/x-raw",
&[
@ -195,187 +354,23 @@ impl ObjectSubclass for NdiAudioSrc {
],
);
let src_pad_template = gst::PadTemplate::new(
let audio_src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
gst::PadPresence::Sometimes,
&caps,
)
.unwrap();
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
vec![audio_src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
impl ObjectImpl for NdiAudioSrc {
glib::glib_object_impl!();
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
basesrc.set_live(true);
basesrc.set_format(gst::Format::Time);
}
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
match *prop {
subclass::Property("ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing ndi-name from {:?} to {:?}",
settings.ndi_name,
ndi_name,
);
settings.ndi_name = ndi_name;
}
subclass::Property("url-address", ..) => {
let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing url-address from {:?} to {:?}",
settings.url_address,
url_address,
);
settings.url_address = url_address;
}
subclass::Property("receiver-ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name,
receiver_ndi_name,
);
settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
}
subclass::Property("connect-timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing connect-timeout from {} to {}",
settings.connect_timeout,
connect_timeout,
);
settings.connect_timeout = connect_timeout;
}
subclass::Property("timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing timeout from {} to {}",
settings.timeout,
timeout,
);
settings.timeout = timeout;
}
subclass::Property("max-queue-length", ..) => {
let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing max-queue-length from {} to {}",
settings.max_queue_length,
max_queue_length,
);
settings.max_queue_length = max_queue_length;
}
subclass::Property("bandwidth", ..) => {
let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing bandwidth from {} to {}",
settings.bandwidth,
bandwidth,
);
settings.bandwidth = bandwidth;
}
subclass::Property("timestamp-mode", ..) => {
let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode,
timestamp_mode
);
if settings.timestamp_mode != timestamp_mode {
let _ =
basesrc.post_message(gst::message::Latency::builder().src(basesrc).build());
}
settings.timestamp_mode = timestamp_mode;
}
_ => unimplemented!(),
}
}
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value())
}
subclass::Property("url-address", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.url_address.to_value())
}
subclass::Property("receiver-ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.receiver_ndi_name.to_value())
}
subclass::Property("connect-timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.connect_timeout.to_value())
}
subclass::Property("timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value())
}
subclass::Property("max-queue-length", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.max_queue_length.to_value())
}
subclass::Property("bandwidth", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.bandwidth.to_value())
}
subclass::Property("timestamp-mode", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timestamp_mode.to_value())
}
_ => unimplemented!(),
}
}
}
impl ElementImpl for NdiAudioSrc {
fn change_state(
&self,
element: &gst::Element,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
@ -402,13 +397,13 @@ impl ElementImpl for NdiAudioSrc {
}
impl BaseSrcImpl for NdiAudioSrc {
fn negotiate(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::LoggableError> {
fn negotiate(&self, _element: &Self::Type) -> Result<(), gst::LoggableError> {
// Always succeed here without doing anything: we will set the caps once we received a
// buffer, there's nothing we can negotiate
Ok(())
}
fn unlock(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true);
@ -416,7 +411,7 @@ impl BaseSrcImpl for NdiAudioSrc {
Ok(())
}
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false);
@ -424,12 +419,12 @@ impl BaseSrcImpl for NdiAudioSrc {
Ok(())
}
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone();
if settings.ndi_name.is_none() && settings.url_address.is_none() {
return Err(gst_error_msg!(
return Err(gst::error_msg!(
gst::LibraryError::Settings,
["No NDI name or URL/address given"]
));
@ -437,7 +432,7 @@ impl BaseSrcImpl for NdiAudioSrc {
let receiver = connect_ndi(
self.cat,
element,
element.upcast_ref(),
settings.ndi_name.as_deref(),
settings.url_address.as_deref(),
&settings.receiver_ndi_name,
@ -450,7 +445,7 @@ impl BaseSrcImpl for NdiAudioSrc {
// settings.id_receiver exists
match receiver {
None => Err(gst_error_msg!(
None => Err(gst::error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
@ -465,7 +460,7 @@ impl BaseSrcImpl for NdiAudioSrc {
}
}
fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown();
}
@ -473,7 +468,7 @@ impl BaseSrcImpl for NdiAudioSrc {
Ok(())
}
fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
match query.view_mut() {
@ -486,14 +481,14 @@ impl BaseSrcImpl for NdiAudioSrc {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if state.current_latency.is_some() {
if let Some(latency) = state.current_latency {
let min = if settings.timestamp_mode != TimestampMode::Timecode {
state.current_latency
latency
} else {
0.into()
gst::ClockTime::ZERO
};
let max = 5 * state.current_latency;
let max = 5 * latency;
gst_debug!(
self.cat,
@ -512,11 +507,11 @@ impl BaseSrcImpl for NdiAudioSrc {
}
}
fn fixate(&self, element: &gst_base::BaseSrc, mut caps: gst::Caps) -> gst::Caps {
fn fixate(&self, element: &Self::Type, mut caps: gst::Caps) -> gst::Caps {
caps.truncate();
{
let caps = caps.make_mut();
let s = caps.get_mut_structure(0).unwrap();
let s = caps.structure_mut(0).unwrap();
s.fixate_field_nearest_int("rate", 48_000);
s.fixate_field_nearest_int("channels", 2);
}
@ -526,7 +521,7 @@ impl BaseSrcImpl for NdiAudioSrc {
fn create(
&self,
element: &gst_base::BaseSrc,
element: &Self::Type,
_offset: u64,
_buffer: Option<&mut gst::BufferRef>,
_length: u32,
@ -548,7 +543,7 @@ impl BaseSrcImpl for NdiAudioSrc {
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().map_err(|_| {
gst_element_error!(
gst::element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
@ -556,11 +551,11 @@ impl BaseSrcImpl for NdiAudioSrc {
gst::FlowError::NotNegotiated
})?;
state.info = Some(info);
state.current_latency = buffer.get_duration();
state.current_latency = buffer.duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_caps(&caps).map_err(|_| {
gst_element_error!(
gst::element_error!(
element,
gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps]
@ -580,12 +575,3 @@ impl BaseSrcImpl for NdiAudioSrc {
}
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndiaudiosrc",
gst::Rank::None,
NdiAudioSrc::get_type(),
)
}

19
src/ndiaudiosrc/mod.rs Normal file
View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiAudioSrc(ObjectSubclass<imp::NdiAudioSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
unsafe impl Send for NdiAudioSrc {}
unsafe impl Sync for NdiAudioSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndiaudiosrc",
gst::Rank::None,
NdiAudioSrc::static_type(),
)
}

View file

@ -1,15 +1,15 @@
use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error, gst_error_msg, gst_info, gst_loggable_error, gst_trace};
use gst_base::{subclass::prelude::*, BaseSinkExtManual};
use gst::{gst_debug, gst_error, gst_info, gst_trace};
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use once_cell::sync::Lazy;
use super::ndi::SendInstance;
use crate::ndi::SendInstance;
static DEFAULT_SENDER_NDI_NAME: Lazy<String> = Lazy::new(|| {
format!(
@ -32,16 +32,6 @@ impl Default for Settings {
}
}
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
"NDI Name",
"NDI Name to use",
Some(DEFAULT_SENDER_NDI_NAME.as_ref()),
glib::ParamFlags::READWRITE,
)
})];
struct State {
send: SendInstance,
video_info: Option<gst_video::VideoInfo>,
@ -57,13 +47,11 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("ndisink", gst::DebugColorFlags::empty(), Some("NDI Sink"))
});
#[glib::object_subclass]
impl ObjectSubclass for NdiSink {
const NAME: &'static str = "NdiSink";
type Type = super::NdiSink;
type ParentType = gst_base::BaseSink;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self {
Self {
@ -71,15 +59,68 @@ impl ObjectSubclass for NdiSink {
state: Mutex::new(Default::default()),
}
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
impl ObjectImpl for NdiSink {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpec::new_string(
"ndi-name",
"NDI Name",
"NDI Name to use",
Some(DEFAULT_SENDER_NDI_NAME.as_ref()),
glib::ParamFlags::READWRITE,
)]
});
PROPERTIES.as_ref()
}
fn set_property(
&self,
_obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"ndi-name" => {
let mut settings = self.settings.lock().unwrap();
settings.ndi_name = value
.get::<String>()
.unwrap_or_else(|_| DEFAULT_SENDER_NDI_NAME.clone());
}
_ => unimplemented!(),
};
}
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"ndi-name" => {
let settings = self.settings.lock().unwrap();
settings.ndi_name.to_value()
}
_ => unimplemented!(),
}
}
}
impl ElementImpl for NdiSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NDI Sink",
"Sink/Audio/Video",
"Render as an NDI stream",
"Sebastian Dröge <sebastian@centricular.com>",
);
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder_full()
.structure(
gst::Structure::builder("video/x-raw")
@ -125,52 +166,22 @@ impl ObjectSubclass for NdiSink {
&caps,
)
.unwrap();
klass.add_pad_template(sink_pad_template);
vec![sink_pad_template]
});
klass.install_properties(&PROPERTIES);
PAD_TEMPLATES.as_ref()
}
}
impl ObjectImpl for NdiSink {
glib::glib_object_impl!();
fn set_property(&self, _obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.ndi_name = value
.get::<String>()
.unwrap()
.unwrap_or_else(|| DEFAULT_SENDER_NDI_NAME.clone());
}
_ => unimplemented!(),
};
}
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value())
}
_ => unimplemented!(),
}
}
}
impl ElementImpl for NdiSink {}
impl BaseSinkImpl for NdiSink {
fn start(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
let send = SendInstance::builder(&settings.ndi_name)
.build()
.ok_or_else(|| {
gst_error_msg!(
gst::error_msg!(
gst::ResourceError::OpenWrite,
["Could not create send instance"]
)
@ -187,7 +198,7 @@ impl BaseSinkImpl for NdiSink {
Ok(())
}
fn stop(&self, element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap();
*state_storage = None;
@ -196,37 +207,33 @@ impl BaseSinkImpl for NdiSink {
Ok(())
}
fn unlock(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
Ok(())
}
fn unlock_stop(&self, _element: &gst_base::BaseSink) -> Result<(), gst::ErrorMessage> {
fn unlock_stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
Ok(())
}
fn set_caps(
&self,
element: &gst_base::BaseSink,
caps: &gst::Caps,
) -> Result<(), gst::LoggableError> {
fn set_caps(&self, element: &Self::Type, caps: &gst::Caps) -> Result<(), gst::LoggableError> {
gst_debug!(CAT, obj: element, "Setting caps {}", caps);
let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage {
None => return Err(gst_loggable_error!(CAT, "Sink not started yet")),
None => return Err(gst::loggable_error!(CAT, "Sink not started yet")),
Some(ref mut state) => state,
};
let s = caps.get_structure(0).unwrap();
if s.get_name() == "video/x-raw" {
let s = caps.structure(0).unwrap();
if s.name() == "video/x-raw" {
let info = gst_video::VideoInfo::from_caps(caps)
.map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?;
.map_err(|_| gst::loggable_error!(CAT, "Couldn't parse caps {}", caps))?;
state.video_info = Some(info);
state.audio_info = None;
} else {
let info = gst_audio::AudioInfo::from_caps(caps)
.map_err(|_| gst_loggable_error!(CAT, "Couldn't parse caps {}", caps))?;
.map_err(|_| gst::loggable_error!(CAT, "Couldn't parse caps {}", caps))?;
state.audio_info = Some(info);
state.video_info = None;
@ -237,7 +244,7 @@ impl BaseSinkImpl for NdiSink {
fn render(
&self,
element: &gst_base::BaseSink,
element: &Self::Type,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state_storage = self.state.lock().unwrap();
@ -247,7 +254,7 @@ impl BaseSinkImpl for NdiSink {
};
if let Some(ref info) = state.video_info {
if let Some(audio_meta) = buffer.get_meta::<crate::ndisinkmeta::NdiSinkAudioMeta>() {
if let Some(audio_meta) = buffer.meta::<crate::ndisinkmeta::NdiSinkAudioMeta>() {
for (buffer, info, timecode) in audio_meta.buffers() {
let frame =
crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, *timecode)
@ -262,9 +269,9 @@ impl BaseSinkImpl for NdiSink {
"Sending audio buffer {:?} with timecode {} and format {:?}",
buffer,
if *timecode < 0 {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE.display()
} else {
gst::ClockTime::from(*timecode as u64 * 100)
Some(gst::ClockTime::from_nseconds(*timecode as u64 * 100)).display()
},
info,
);
@ -273,15 +280,18 @@ impl BaseSinkImpl for NdiSink {
}
// Skip empty/gap buffers from ndisinkcombiner
if buffer.get_size() != 0 {
if buffer.size() != 0 {
let timecode = element
.get_segment()
.segment()
.downcast::<gst::ClockTime>()
.ok()
.and_then(|segment| {
*(segment.to_running_time(buffer.get_pts()) + element.get_base_time())
segment
.to_running_time(buffer.pts())
.zip(element.base_time())
})
.map(|time| (time / 100) as i64)
.and_then(|(running_time, base_time)| running_time.checked_add(base_time))
.map(|time| (time.nseconds() / 100) as i64)
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
let frame = gst_video::VideoFrameRef::from_buffer_ref_readable(buffer, info)
@ -302,9 +312,9 @@ impl BaseSinkImpl for NdiSink {
"Sending video buffer {:?} with timecode {} and format {:?}",
buffer,
if timecode < 0 {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE.display()
} else {
gst::ClockTime::from(timecode as u64 * 100)
Some(gst::ClockTime::from_nseconds(timecode as u64 * 100)).display()
},
info
);
@ -312,13 +322,16 @@ impl BaseSinkImpl for NdiSink {
}
} else if let Some(ref info) = state.audio_info {
let timecode = element
.get_segment()
.segment()
.downcast::<gst::ClockTime>()
.ok()
.and_then(|segment| {
*(segment.to_running_time(buffer.get_pts()) + element.get_base_time())
segment
.to_running_time(buffer.pts())
.zip(element.base_time())
})
.map(|time| (time / 100) as i64)
.and_then(|(running_time, base_time)| running_time.checked_add(base_time))
.map(|time| (time.nseconds() / 100) as i64)
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
let frame = crate::ndi::AudioFrame::try_from_interleaved_16s(info, buffer, timecode)
@ -333,9 +346,9 @@ impl BaseSinkImpl for NdiSink {
"Sending audio buffer {:?} with timecode {} and format {:?}",
buffer,
if timecode < 0 {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE.display()
} else {
gst::ClockTime::from(timecode as u64 * 100)
Some(gst::ClockTime::from_nseconds(timecode as u64 * 100)).display()
},
info,
);
@ -347,12 +360,3 @@ impl BaseSinkImpl for NdiSink {
Ok(gst::FlowSuccess::Ok)
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisink",
gst::Rank::None,
NdiSink::get_type(),
)
}

19
src/ndisink/mod.rs Normal file
View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiSink(ObjectSubclass<imp::NdiSink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
}
unsafe impl Send for NdiSink {}
unsafe impl Sync for NdiSink {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisink",
gst::Rank::None,
NdiSink::static_type(),
)
}

View file

@ -1,5 +1,4 @@
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -7,6 +6,8 @@ use gst::{gst_debug, gst_error, gst_trace, gst_warning};
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy;
use std::mem;
use std::sync::Mutex;
@ -27,28 +28,56 @@ struct State {
current_audio_buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
}
struct NdiSinkCombiner {
pub struct NdiSinkCombiner {
video_pad: gst_base::AggregatorPad,
audio_pad: Mutex<Option<gst_base::AggregatorPad>>,
state: Mutex<Option<State>>,
}
#[glib::object_subclass]
impl ObjectSubclass for NdiSinkCombiner {
const NAME: &'static str = "NdiSinkCombiner";
type Type = super::NdiSinkCombiner;
type ParentType = gst_base::Aggregator;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("video").unwrap();
let video_pad =
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("video"))
.build();
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
Self {
video_pad,
audio_pad: Mutex::new(None),
state: Mutex::new(None),
}
}
}
impl ObjectImpl for NdiSinkCombiner {
fn constructed(&self, obj: &Self::Type) {
obj.add_pad(&self.video_pad).unwrap();
self.parent_constructed(obj);
}
}
impl ElementImpl for NdiSinkCombiner {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NDI Sink Combiner",
"Combiner/Audio/Video",
"NDI sink audio/video combiner",
"Sebastian Dröge <sebastian@centricular.com>",
);
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("video/x-raw")
.field(
"format",
@ -82,9 +111,8 @@ impl ObjectSubclass for NdiSinkCombiner {
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
let sink_pad_template = gst::PadTemplate::with_gtype(
let video_sink_pad_template = gst::PadTemplate::with_gtype(
"video",
gst::PadDirection::Sink,
gst::PadPresence::Always,
@ -92,7 +120,6 @@ impl ObjectSubclass for NdiSinkCombiner {
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(sink_pad_template);
let caps = gst::Caps::builder("audio/x-raw")
.field("format", &gst_audio::AUDIO_FORMAT_S16.to_str())
@ -100,7 +127,7 @@ impl ObjectSubclass for NdiSinkCombiner {
.field("channels", &gst::IntRange::<i32>::new(1, i32::MAX))
.field("layout", &"interleaved")
.build();
let sink_pad_template = gst::PadTemplate::with_gtype(
let audio_sink_pad_template = gst::PadTemplate::with_gtype(
"audio",
gst::PadDirection::Sink,
gst::PadPresence::Request,
@ -108,36 +135,17 @@ impl ObjectSubclass for NdiSinkCombiner {
gst_base::AggregatorPad::static_type(),
)
.unwrap();
klass.add_pad_template(sink_pad_template);
vec![
src_pad_template,
video_sink_pad_template,
audio_sink_pad_template,
]
});
PAD_TEMPLATES.as_ref()
}
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("video").unwrap();
let video_pad =
gst::PadBuilder::<gst_base::AggregatorPad>::from_template(&templ, Some("video"))
.build();
Self {
video_pad,
audio_pad: Mutex::new(None),
state: Mutex::new(None),
}
}
}
impl ObjectImpl for NdiSinkCombiner {
glib::glib_object_impl!();
fn constructed(&self, obj: &glib::Object) {
let element = obj.downcast_ref::<gst::Element>().unwrap();
element.add_pad(&self.video_pad).unwrap();
self.parent_constructed(obj);
}
}
impl ElementImpl for NdiSinkCombiner {
fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
fn release_pad(&self, element: &Self::Type, pad: &gst::Pad) {
let mut audio_pad_storage = self.audio_pad.lock().unwrap();
if audio_pad_storage.as_ref().map(|p| p.upcast_ref()) == Some(pad) {
@ -151,7 +159,7 @@ impl ElementImpl for NdiSinkCombiner {
impl AggregatorImpl for NdiSinkCombiner {
fn create_new_pad(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
templ: &gst::PadTemplate,
_req_name: Option<&str>,
_caps: Option<&gst::Caps>,
@ -163,7 +171,7 @@ impl AggregatorImpl for NdiSinkCombiner {
return None;
}
let sink_templ = agg.get_pad_template("audio").unwrap();
let sink_templ = agg.pad_template("audio").unwrap();
if templ != &sink_templ {
gst_error!(CAT, obj: agg, "Wrong pad template");
return None;
@ -178,7 +186,7 @@ impl AggregatorImpl for NdiSinkCombiner {
Some(pad)
}
fn start(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> {
fn start(&self, agg: &Self::Type) -> Result<(), gst::ErrorMessage> {
let mut state_storage = self.state.lock().unwrap();
*state_storage = Some(State {
audio_info: None,
@ -192,7 +200,7 @@ impl AggregatorImpl for NdiSinkCombiner {
Ok(())
}
fn stop(&self, agg: &gst_base::Aggregator) -> Result<(), gst::ErrorMessage> {
fn stop(&self, agg: &Self::Type) -> Result<(), gst::ErrorMessage> {
// Drop our state now
let _ = self.state.lock().unwrap().take();
@ -201,18 +209,18 @@ impl AggregatorImpl for NdiSinkCombiner {
Ok(())
}
fn get_next_time(&self, _agg: &gst_base::Aggregator) -> gst::ClockTime {
fn next_time(&self, _agg: &Self::Type) -> Option<gst::ClockTime> {
// FIXME: What to do here? We don't really know when the next buffer is expected
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
}
fn clip(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
agg_pad: &gst_base::AggregatorPad,
mut buffer: gst::Buffer,
) -> Option<gst::Buffer> {
let segment = match agg_pad.get_segment().downcast::<gst::ClockTime>() {
let segment = match agg_pad.segment().downcast::<gst::ClockTime>() {
Ok(segment) => segment,
Err(_) => {
gst_error!(CAT, obj: agg, "Only TIME segments supported");
@ -220,25 +228,21 @@ impl AggregatorImpl for NdiSinkCombiner {
}
};
let pts = buffer.get_pts();
let pts = buffer.pts();
if pts.is_none() {
gst_error!(CAT, obj: agg, "Only buffers with PTS supported");
return Some(buffer);
}
let duration = if buffer.get_duration().is_some() {
buffer.get_duration()
} else {
gst::CLOCK_TIME_NONE
};
let duration = buffer.duration();
gst_trace!(
CAT,
obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}",
buffer,
pts,
duration
pts.display(),
duration.display(),
);
let state_storage = self.state.lock().unwrap();
@ -247,25 +251,21 @@ impl AggregatorImpl for NdiSinkCombiner {
None => return None,
};
let duration = if buffer.get_duration().is_some() {
buffer.get_duration()
let duration = if duration.is_some() {
duration
} else if let Some(ref audio_info) = state.audio_info {
gst::SECOND
.mul_div_floor(
buffer.get_size() as u64,
gst::ClockTime::SECOND.mul_div_floor(
buffer.size() as u64,
audio_info.rate() as u64 * audio_info.bpf() as u64,
)
.unwrap()
} else if let Some(ref video_info) = state.video_info {
if *video_info.fps().numer() > 0 {
gst::SECOND
.mul_div_floor(
gst::ClockTime::SECOND.mul_div_floor(
*video_info.fps().denom() as u64,
*video_info.fps().numer() as u64,
)
.unwrap()
} else {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
}
} else {
unreachable!()
@ -276,18 +276,23 @@ impl AggregatorImpl for NdiSinkCombiner {
obj: agg_pad,
"Clipping buffer {:?} with PTS {} and duration {}",
buffer,
pts,
duration
pts.display(),
duration.display(),
);
if agg_pad == &self.video_pad {
segment.clip(pts, pts + duration).map(|(start, stop)| {
let end_pts = pts
.zip(duration)
.and_then(|(pts, duration)| pts.checked_add(duration));
segment.clip(pts, end_pts).map(|(start, stop)| {
{
let buffer = buffer.make_mut();
buffer.set_pts(start);
if duration.is_some() {
buffer.set_duration(stop - start);
}
buffer.set_duration(
stop.zip(start)
.and_then(|(stop, start)| stop.checked_sub(start)),
);
}
buffer
@ -307,7 +312,7 @@ impl AggregatorImpl for NdiSinkCombiner {
fn aggregate(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
timeout: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
// FIXME: Can't really happen because we always return NONE from get_next_time() but that
@ -318,7 +323,7 @@ impl AggregatorImpl for NdiSinkCombiner {
// first try getting buffers from both pads here
let video_buffer_and_segment = match self.video_pad.peek_buffer() {
Some(video_buffer) => {
let video_segment = self.video_pad.get_segment();
let video_segment = self.video_pad.segment();
let video_segment = match video_segment.downcast::<gst::ClockTime>() {
Ok(video_segment) => video_segment,
Err(video_segment) => {
@ -326,7 +331,7 @@ impl AggregatorImpl for NdiSinkCombiner {
CAT,
obj: agg,
"Video segment of wrong format {:?}",
video_segment.get_format()
video_segment.format()
);
return Err(gst::FlowError::Error);
}
@ -344,14 +349,14 @@ impl AggregatorImpl for NdiSinkCombiner {
let audio_buffer_segment_and_pad;
if let Some(audio_pad) = self.audio_pad.lock().unwrap().clone() {
audio_buffer_segment_and_pad = match audio_pad.peek_buffer() {
Some(audio_buffer) if audio_buffer.get_size() == 0 => {
Some(audio_buffer) if audio_buffer.size() == 0 => {
// Skip empty/gap audio buffer
audio_pad.drop_buffer();
gst_trace!(CAT, obj: agg, "Empty audio buffer, waiting for next");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
Some(audio_buffer) => {
let audio_segment = audio_pad.get_segment();
let audio_segment = audio_pad.segment();
let audio_segment = match audio_segment.downcast::<gst::ClockTime>() {
Ok(audio_segment) => audio_segment,
Err(audio_segment) => {
@ -359,7 +364,7 @@ impl AggregatorImpl for NdiSinkCombiner {
CAT,
obj: agg,
"Audio segment of wrong format {:?}",
audio_segment.get_format()
audio_segment.format()
);
return Err(gst::FlowError::Error);
}
@ -385,8 +390,7 @@ impl AggregatorImpl for NdiSinkCombiner {
let (mut current_video_buffer, current_video_running_time_end, next_video_buffer) =
if let Some((video_buffer, video_segment)) = video_buffer_and_segment {
let video_running_time = video_segment.to_running_time(video_buffer.get_pts());
assert!(video_running_time.is_some());
let video_running_time = video_segment.to_running_time(video_buffer.pts()).unwrap();
match state.current_video_buffer {
None => {
@ -398,7 +402,7 @@ impl AggregatorImpl for NdiSinkCombiner {
}
Some((ref buffer, _)) => (
buffer.clone(),
video_running_time,
Some(video_running_time),
Some((video_buffer, video_running_time)),
),
}
@ -416,10 +420,9 @@ impl AggregatorImpl for NdiSinkCombiner {
// Create an empty dummy buffer for attaching the audio. This is going to
// be dropped by the sink later.
let audio_running_time =
audio_segment.to_running_time(audio_buffer.get_pts());
assert!(audio_running_time.is_some());
audio_segment.to_running_time(audio_buffer.pts()).unwrap();
let video_segment = self.video_pad.get_segment();
let video_segment = self.video_pad.segment();
let video_segment = match video_segment.downcast::<gst::ClockTime>() {
Ok(video_segment) => video_segment,
Err(video_segment) => {
@ -427,7 +430,7 @@ impl AggregatorImpl for NdiSinkCombiner {
CAT,
obj: agg,
"Video segment of wrong format {:?}",
video_segment.get_format()
video_segment.format()
);
return Err(gst::FlowError::Error);
}
@ -445,9 +448,9 @@ impl AggregatorImpl for NdiSinkCombiner {
buffer.set_pts(video_pts);
}
(buffer, gst::CLOCK_TIME_NONE, None)
(buffer, gst::ClockTime::NONE, None)
}
(Some((ref buffer, _)), _) => (buffer.clone(), gst::CLOCK_TIME_NONE, None),
(Some((ref buffer, _)), _) => (buffer.clone(), gst::ClockTime::NONE, None),
}
};
@ -460,22 +463,26 @@ impl AggregatorImpl for NdiSinkCombiner {
}
};
let audio_running_time = audio_segment.to_running_time(audio_buffer.get_pts());
assert!(audio_running_time.is_some());
let duration = gst::SECOND
.mul_div_floor(
audio_buffer.get_size() as u64 / audio_info.bpf() as u64,
let audio_running_time = audio_segment.to_running_time(audio_buffer.pts());
let duration = gst::ClockTime::SECOND.mul_div_floor(
audio_buffer.size() as u64 / audio_info.bpf() as u64,
audio_info.rate() as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE);
let audio_running_time_end = audio_running_time + duration;
assert!(audio_running_time_end.is_some());
);
let audio_running_time_end = audio_running_time
.zip(duration)
.and_then(|(running_time, duration)| running_time.checked_add(duration));
if audio_running_time_end <= current_video_running_time_end
|| current_video_running_time_end.is_none()
if audio_running_time_end
.zip(current_video_running_time_end)
.map(|(audio, video)| audio <= video)
.unwrap_or(true)
{
let timecode = (audio_running_time + agg.get_base_time())
.map(|t| (t / 100) as i64)
let timecode = agg
.base_time()
.zip(audio_running_time)
.map(|(base_time, audio_running_time)| {
((base_time.nseconds() + audio_running_time.nseconds()) / 100) as i64
})
.unwrap_or(crate::ndisys::NDIlib_send_timecode_synthesize);
gst_trace!(
@ -484,8 +491,8 @@ impl AggregatorImpl for NdiSinkCombiner {
"Including audio buffer {:?} with timecode {}: {} <= {}",
audio_buffer,
timecode,
audio_running_time_end,
current_video_running_time_end,
audio_running_time_end.display(),
current_video_running_time_end.display(),
);
state
.current_audio_buffers
@ -503,7 +510,7 @@ impl AggregatorImpl for NdiSinkCombiner {
// far
}
let audio_buffers = mem::replace(&mut state.current_audio_buffers, Vec::new());
let audio_buffers = mem::take(&mut state.current_audio_buffers);
if !audio_buffers.is_empty() {
let current_video_buffer = current_video_buffer.make_mut();
@ -530,7 +537,7 @@ impl AggregatorImpl for NdiSinkCombiner {
fn sink_event(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
pad: &gst_base::AggregatorPad,
event: gst::Event,
) -> bool {
@ -538,7 +545,7 @@ impl AggregatorImpl for NdiSinkCombiner {
match event.view() {
EventView::Caps(caps) => {
let caps = caps.get_caps_owned();
let caps = caps.caps_owned();
let mut state_storage = self.state.lock().unwrap();
let state = match &mut *state_storage {
@ -558,22 +565,22 @@ impl AggregatorImpl for NdiSinkCombiner {
// 2 frames latency because we queue 1 frame and wait until audio
// up to the end of that frame has arrived.
let latency = if *info.fps().numer() > 0 {
gst::SECOND
gst::ClockTime::SECOND
.mul_div_floor(
2 * *info.fps().denom() as u64,
*info.fps().numer() as u64,
)
.unwrap_or(80 * gst::MSECOND)
.unwrap_or(80 * gst::ClockTime::MSECOND)
} else {
// let's assume 25fps and 2 frames latency
80 * gst::MSECOND
80 * gst::ClockTime::MSECOND
};
state.video_info = Some(info);
drop(state_storage);
agg.set_latency(latency, gst::CLOCK_TIME_NONE);
agg.set_latency(latency, gst::ClockTime::NONE);
// The video caps are passed through as the audio is included only in a meta
agg.set_src_caps(&caps);
@ -591,7 +598,7 @@ impl AggregatorImpl for NdiSinkCombiner {
}
// The video segment is passed through as-is and the video timestamps are preserved
EventView::Segment(segment) if pad == &self.video_pad => {
let segment = segment.get_segment();
let segment = segment.segment();
gst_debug!(CAT, obj: agg, "Updating segment {:?}", segment);
agg.update_segment(segment);
}
@ -603,7 +610,7 @@ impl AggregatorImpl for NdiSinkCombiner {
fn sink_query(
&self,
agg: &gst_base::Aggregator,
agg: &Self::Type,
pad: &gst_base::AggregatorPad,
query: &mut gst::QueryRef,
) -> bool {
@ -612,7 +619,7 @@ impl AggregatorImpl for NdiSinkCombiner {
match query.view_mut() {
QueryView::Caps(_) if pad == &self.video_pad => {
// Directly forward caps queries
let srcpad = agg.get_static_pad("src").unwrap();
let srcpad = agg.static_pad("src").unwrap();
return srcpad.peer_query(query);
}
_ => (),
@ -621,17 +628,8 @@ impl AggregatorImpl for NdiSinkCombiner {
self.parent_sink_query(agg, pad, query)
}
fn negotiate(&self, _agg: &gst_base::Aggregator) -> bool {
fn negotiate(&self, _agg: &Self::Type) -> bool {
// No negotiation needed as the video caps are just passed through
true
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisinkcombiner",
gst::Rank::None,
NdiSinkCombiner::get_type(),
)
}

View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiSinkCombiner(ObjectSubclass<imp::NdiSinkCombiner>) @extends gst_base::Aggregator, gst::Element, gst::Object;
}
unsafe impl Send for NdiSinkCombiner {}
unsafe impl Sync for NdiSinkCombiner {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndisinkcombiner",
gst::Rank::None,
NdiSinkCombiner::static_type(),
)
}

View file

@ -1,4 +1,3 @@
use gst::gst_sys;
use gst::prelude::*;
use std::fmt;
use std::mem;
@ -19,10 +18,10 @@ impl NdiSinkAudioMeta {
// content of the struct
let mut params = mem::ManuallyDrop::new(imp::NdiSinkAudioMetaParams { buffers });
let meta = gst_sys::gst_buffer_add_meta(
let meta = gst::ffi::gst_buffer_add_meta(
buffer.as_mut_ptr(),
imp::ndi_sink_audio_meta_get_info(),
&mut *params as *mut imp::NdiSinkAudioMetaParams as glib::glib_sys::gpointer,
&mut *params as *mut imp::NdiSinkAudioMetaParams as glib::ffi::gpointer,
) as *mut imp::NdiSinkAudioMeta;
Self::from_mut_ptr(buffer, meta)
@ -37,7 +36,7 @@ impl NdiSinkAudioMeta {
unsafe impl MetaAPI for NdiSinkAudioMeta {
type GstType = imp::NdiSinkAudioMeta;
fn get_meta_api() -> glib::Type {
fn meta_api() -> glib::Type {
imp::ndi_sink_audio_meta_api_get_type()
}
}
@ -51,9 +50,7 @@ impl fmt::Debug for NdiSinkAudioMeta {
}
mod imp {
use glib::glib_sys;
use glib::translate::*;
use gst::gst_sys;
use once_cell::sync::Lazy;
use std::mem;
use std::ptr;
@ -64,18 +61,18 @@ mod imp {
#[repr(C)]
pub struct NdiSinkAudioMeta {
parent: gst_sys::GstMeta,
parent: gst::ffi::GstMeta,
pub(super) buffers: Vec<(gst::Buffer, gst_audio::AudioInfo, i64)>,
}
pub(super) fn ndi_sink_audio_meta_api_get_type() -> glib::Type {
static TYPE: Lazy<glib::Type> = Lazy::new(|| unsafe {
let t = from_glib(gst_sys::gst_meta_api_type_register(
let t = from_glib(gst::ffi::gst_meta_api_type_register(
b"GstNdiSinkAudioMetaAPI\0".as_ptr() as *const _,
[ptr::null::<std::os::raw::c_char>()].as_ptr() as *mut *const _,
));
assert_ne!(t, glib::Type::Invalid);
assert_ne!(t, glib::Type::INVALID);
t
});
@ -84,10 +81,10 @@ mod imp {
}
unsafe extern "C" fn ndi_sink_audio_meta_init(
meta: *mut gst_sys::GstMeta,
params: glib_sys::gpointer,
_buffer: *mut gst_sys::GstBuffer,
) -> glib_sys::gboolean {
meta: *mut gst::ffi::GstMeta,
params: glib::ffi::gpointer,
_buffer: *mut gst::ffi::GstBuffer,
) -> glib::ffi::gboolean {
assert!(!params.is_null());
let meta = &mut *(meta as *mut NdiSinkAudioMeta);
@ -95,12 +92,12 @@ mod imp {
ptr::write(&mut meta.buffers, params.buffers);
true.to_glib()
true.into_glib()
}
unsafe extern "C" fn ndi_sink_audio_meta_free(
meta: *mut gst_sys::GstMeta,
_buffer: *mut gst_sys::GstBuffer,
meta: *mut gst::ffi::GstMeta,
_buffer: *mut gst::ffi::GstBuffer,
) {
let meta = &mut *(meta as *mut NdiSinkAudioMeta);
@ -108,34 +105,34 @@ mod imp {
}
unsafe extern "C" fn ndi_sink_audio_meta_transform(
dest: *mut gst_sys::GstBuffer,
meta: *mut gst_sys::GstMeta,
_buffer: *mut gst_sys::GstBuffer,
_type_: glib_sys::GQuark,
_data: glib_sys::gpointer,
) -> glib_sys::gboolean {
dest: *mut gst::ffi::GstBuffer,
meta: *mut gst::ffi::GstMeta,
_buffer: *mut gst::ffi::GstBuffer,
_type_: glib::ffi::GQuark,
_data: glib::ffi::gpointer,
) -> glib::ffi::gboolean {
let meta = &*(meta as *mut NdiSinkAudioMeta);
super::NdiSinkAudioMeta::add(gst::BufferRef::from_mut_ptr(dest), meta.buffers.clone());
true.to_glib()
true.into_glib()
}
pub(super) fn ndi_sink_audio_meta_get_info() -> *const gst_sys::GstMetaInfo {
struct MetaInfo(ptr::NonNull<gst_sys::GstMetaInfo>);
pub(super) fn ndi_sink_audio_meta_get_info() -> *const gst::ffi::GstMetaInfo {
struct MetaInfo(ptr::NonNull<gst::ffi::GstMetaInfo>);
unsafe impl Send for MetaInfo {}
unsafe impl Sync for MetaInfo {}
static META_INFO: Lazy<MetaInfo> = Lazy::new(|| unsafe {
MetaInfo(
ptr::NonNull::new(gst_sys::gst_meta_register(
ndi_sink_audio_meta_api_get_type().to_glib(),
ptr::NonNull::new(gst::ffi::gst_meta_register(
ndi_sink_audio_meta_api_get_type().into_glib(),
b"GstNdiSinkAudioMeta\0".as_ptr() as *const _,
mem::size_of::<NdiSinkAudioMeta>(),
Some(ndi_sink_audio_meta_init),
Some(ndi_sink_audio_meta_free),
Some(ndi_sink_audio_meta_transform),
) as *mut gst_sys::GstMetaInfo)
) as *mut gst::ffi::GstMetaInfo)
.expect("Failed to register meta API"),
)
});

View file

@ -1,7 +1,6 @@
use glib::subclass;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_error_msg};
use gst::{gst_debug, gst_error};
use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*;
@ -9,6 +8,8 @@ use gst_base::subclass::prelude::*;
use std::sync::Mutex;
use std::{i32, u32};
use once_cell::sync::Lazy;
use crate::ndisys;
use crate::connect_ndi;
@ -47,93 +48,9 @@ impl Default for Settings {
}
}
static PROPERTIES: [subclass::Property; 8] = [
subclass::Property("ndi-name", |name| {
glib::ParamSpec::string(
name,
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("url-address", |name| {
glib::ParamSpec::string(
name,
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("receiver-ndi-name", |name| {
glib::ParamSpec::string(
name,
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("connect-timeout", |name| {
glib::ParamSpec::uint(
name,
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timeout", |name| {
glib::ParamSpec::uint(
name,
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("max-queue-length", |name| {
glib::ParamSpec::uint(
name,
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("bandwidth", |name| {
glib::ParamSpec::int(
name,
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timestamp-mode", |name| {
glib::ParamSpec::enum_(
name,
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
)
}),
];
struct State {
info: Option<gst_video::VideoInfo>,
current_latency: gst::ClockTime,
current_latency: Option<gst::ClockTime>,
receiver: Option<Receiver<VideoReceiver>>,
}
@ -141,26 +58,24 @@ impl Default for State {
fn default() -> State {
State {
info: None,
current_latency: gst::CLOCK_TIME_NONE,
current_latency: gst::ClockTime::NONE,
receiver: None,
}
}
}
pub(crate) struct NdiVideoSrc {
pub struct NdiVideoSrc {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
state: Mutex<State>,
receiver_controller: Mutex<Option<ReceiverControlHandle<VideoReceiver>>>,
}
#[glib::object_subclass]
impl ObjectSubclass for NdiVideoSrc {
const NAME: &'static str = "NdiVideoSrc";
type Type = super::NdiVideoSrc;
type ParentType = gst_base::BaseSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib::glib_object_subclass!();
fn new() -> Self {
Self {
@ -174,15 +89,259 @@ impl ObjectSubclass for NdiVideoSrc {
receiver_controller: Mutex::new(None),
}
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
impl ObjectImpl for NdiVideoSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpec::new_string(
"ndi-name",
"NDI Name",
"NDI stream name of the sender",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"url-address",
"URL/Address",
"URL/address and port of the sender, e.g. 127.0.0.1:5961",
None,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_string(
"receiver-ndi-name",
"Receiver NDI Name",
"NDI stream name of this receiver",
Some(&*DEFAULT_RECEIVER_NDI_NAME),
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"connect-timeout",
"Connect Timeout",
"Connection timeout in ms",
0,
u32::MAX,
10000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"timeout",
"Timeout",
"Receive timeout in ms",
0,
u32::MAX,
5000,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_uint(
"max-queue-length",
"Max Queue Length",
"Maximum receive queue length",
0,
u32::MAX,
5,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_int(
"bandwidth",
"Bandwidth",
"Bandwidth, -10 metadata-only, 10 audio-only, 100 highest",
-10,
100,
100,
glib::ParamFlags::READWRITE,
),
glib::ParamSpec::new_enum(
"timestamp-mode",
"Timestamp Mode",
"Timestamp information to use for outgoing PTS",
TimestampMode::static_type(),
TimestampMode::ReceiveTimeTimecode as i32,
glib::ParamFlags::READWRITE,
),
]
});
PROPERTIES.as_ref()
}
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
obj.set_live(true);
obj.set_format(gst::Format::Time);
}
fn set_property(
&self,
obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"ndi-name" => {
let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing ndi-name from {:?} to {:?}",
settings.ndi_name,
ndi_name,
);
settings.ndi_name = ndi_name;
}
"url-address" => {
let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing url-address from {:?} to {:?}",
settings.url_address,
url_address,
);
settings.url_address = url_address;
}
"receiver-ndi-name" => {
let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get::<Option<String>>().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name,
receiver_ndi_name,
);
settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
}
"connect-timeout" => {
let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing connect-timeout from {} to {}",
settings.connect_timeout,
connect_timeout,
);
settings.connect_timeout = connect_timeout;
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing timeout from {} to {}",
settings.timeout,
timeout,
);
settings.timeout = timeout;
}
"max-queue-length" => {
let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing max-queue-length from {} to {}",
settings.max_queue_length,
max_queue_length,
);
settings.max_queue_length = max_queue_length;
}
"bandwidth" => {
let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing bandwidth from {} to {}",
settings.bandwidth,
bandwidth,
);
settings.bandwidth = bandwidth;
}
"timestamp-mode" => {
let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get().unwrap();
gst_debug!(
self.cat,
obj: obj,
"Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode,
timestamp_mode
);
if settings.timestamp_mode != timestamp_mode {
let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
}
settings.timestamp_mode = timestamp_mode;
}
_ => unimplemented!(),
}
}
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"ndi-name" => {
let settings = self.settings.lock().unwrap();
settings.ndi_name.to_value()
}
"url-address" => {
let settings = self.settings.lock().unwrap();
settings.url_address.to_value()
}
"receiver-ndi-name" => {
let settings = self.settings.lock().unwrap();
settings.receiver_ndi_name.to_value()
}
"connect-timeout" => {
let settings = self.settings.lock().unwrap();
settings.connect_timeout.to_value()
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
"max-queue-length" => {
let settings = self.settings.lock().unwrap();
settings.max_queue_length.to_value()
}
"bandwidth" => {
let settings = self.settings.lock().unwrap();
settings.bandwidth.to_value()
}
"timestamp-mode" => {
let settings = self.settings.lock().unwrap();
settings.timestamp_mode.to_value()
}
_ => unimplemented!(),
}
}
}
impl ElementImpl for NdiVideoSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"NewTek NDI Video Source",
"Source",
"NewTek NDI video source",
"Ruben Gonzalez <rubenrua@teltek.es>, Daniel Vilar <daniel.peiteado@teltek.es>, Sebastian Dröge <sebastian@centricular.com>",
);
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
// On the src pad, we can produce F32/F64 with any sample rate
// and any number of channels
let caps = gst::Caps::new_simple(
@ -237,180 +396,16 @@ impl ObjectSubclass for NdiVideoSrc {
&caps,
)
.unwrap();
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
impl ObjectImpl for NdiVideoSrc {
glib::glib_object_impl!();
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
// Initialize live-ness and notify the base class that
// we'd like to operate in Time format
basesrc.set_live(true);
basesrc.set_format(gst::Format::Time);
}
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
match *prop {
subclass::Property("ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let ndi_name = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing ndi-name from {:?} to {:?}",
settings.ndi_name,
ndi_name,
);
settings.ndi_name = ndi_name;
}
subclass::Property("url-address", ..) => {
let mut settings = self.settings.lock().unwrap();
let url_address = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing url-address from {:?} to {:?}",
settings.url_address,
url_address,
);
settings.url_address = url_address;
}
subclass::Property("receiver-ndi-name", ..) => {
let mut settings = self.settings.lock().unwrap();
let receiver_ndi_name = value.get().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing receiver-ndi-name from {:?} to {:?}",
settings.receiver_ndi_name,
receiver_ndi_name,
);
settings.receiver_ndi_name =
receiver_ndi_name.unwrap_or_else(|| DEFAULT_RECEIVER_NDI_NAME.clone());
}
subclass::Property("connect-timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let connect_timeout = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing connect-timeout from {} to {}",
settings.connect_timeout,
connect_timeout,
);
settings.connect_timeout = connect_timeout;
}
subclass::Property("timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing timeout from {} to {}",
settings.timeout,
timeout,
);
settings.timeout = timeout;
}
subclass::Property("max-queue-length", ..) => {
let mut settings = self.settings.lock().unwrap();
let max_queue_length = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing max-queue-length from {} to {}",
settings.max_queue_length,
max_queue_length,
);
settings.max_queue_length = max_queue_length;
}
subclass::Property("bandwidth", ..) => {
let mut settings = self.settings.lock().unwrap();
let bandwidth = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing bandwidth from {} to {}",
settings.bandwidth,
bandwidth,
);
settings.bandwidth = bandwidth;
}
subclass::Property("timestamp-mode", ..) => {
let mut settings = self.settings.lock().unwrap();
let timestamp_mode = value.get_some().unwrap();
gst_debug!(
self.cat,
obj: basesrc,
"Changing timestamp mode from {:?} to {:?}",
settings.timestamp_mode,
timestamp_mode
);
if settings.timestamp_mode != timestamp_mode {
let _ =
basesrc.post_message(gst::message::Latency::builder().src(basesrc).build());
}
settings.timestamp_mode = timestamp_mode;
}
_ => unimplemented!(),
}
}
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.ndi_name.to_value())
}
subclass::Property("url-address", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.url_address.to_value())
}
subclass::Property("receiver-ndi-name", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.receiver_ndi_name.to_value())
}
subclass::Property("connect-timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.connect_timeout.to_value())
}
subclass::Property("timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value())
}
subclass::Property("max-queue-length", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.max_queue_length.to_value())
}
subclass::Property("bandwidth", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.bandwidth.to_value())
}
subclass::Property("timestamp-mode", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timestamp_mode.to_value())
}
_ => unimplemented!(),
}
}
}
impl ElementImpl for NdiVideoSrc {
fn change_state(
&self,
element: &gst::Element,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
@ -437,13 +432,13 @@ impl ElementImpl for NdiVideoSrc {
}
impl BaseSrcImpl for NdiVideoSrc {
fn negotiate(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::LoggableError> {
fn negotiate(&self, _element: &Self::Type) -> Result<(), gst::LoggableError> {
// Always succeed here without doing anything: we will set the caps once we received a
// buffer, there's nothing we can negotiate
Ok(())
}
fn unlock(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn unlock(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(true);
@ -451,7 +446,7 @@ impl BaseSrcImpl for NdiVideoSrc {
Ok(())
}
fn unlock_stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn unlock_stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
gst_debug!(self.cat, obj: element, "Stop unlocking",);
if let Some(ref controller) = *self.receiver_controller.lock().unwrap() {
controller.set_flushing(false);
@ -459,12 +454,12 @@ impl BaseSrcImpl for NdiVideoSrc {
Ok(())
}
fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn start(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
*self.state.lock().unwrap() = Default::default();
let settings = self.settings.lock().unwrap().clone();
if settings.ndi_name.is_none() && settings.url_address.is_none() {
return Err(gst_error_msg!(
return Err(gst::error_msg!(
gst::LibraryError::Settings,
["No NDI name or URL/address given"]
));
@ -472,7 +467,7 @@ impl BaseSrcImpl for NdiVideoSrc {
let receiver = connect_ndi(
self.cat,
element,
element.upcast_ref(),
settings.ndi_name.as_deref(),
settings.url_address.as_deref(),
&settings.receiver_ndi_name,
@ -485,7 +480,7 @@ impl BaseSrcImpl for NdiVideoSrc {
// settings.id_receiver exists
match receiver {
None => Err(gst_error_msg!(
None => Err(gst::error_msg!(
gst::ResourceError::NotFound,
["Could not connect to this source"]
)),
@ -500,7 +495,7 @@ impl BaseSrcImpl for NdiVideoSrc {
}
}
fn stop(&self, _element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
if let Some(ref controller) = self.receiver_controller.lock().unwrap().take() {
controller.shutdown();
}
@ -508,7 +503,7 @@ impl BaseSrcImpl for NdiVideoSrc {
Ok(())
}
fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
fn query(&self, element: &Self::Type, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
match query.view_mut() {
@ -521,14 +516,14 @@ impl BaseSrcImpl for NdiVideoSrc {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if state.current_latency.is_some() {
if let Some(latency) = state.current_latency {
let min = if settings.timestamp_mode != TimestampMode::Timecode {
state.current_latency
latency
} else {
0.into()
gst::ClockTime::ZERO
};
let max = 5 * state.current_latency;
let max = 5 * latency;
println!("Returning latency min {} max {}", min, max,);
@ -549,11 +544,11 @@ impl BaseSrcImpl for NdiVideoSrc {
}
}
fn fixate(&self, element: &gst_base::BaseSrc, mut caps: gst::Caps) -> gst::Caps {
fn fixate(&self, element: &Self::Type, mut caps: gst::Caps) -> gst::Caps {
caps.truncate();
{
let caps = caps.make_mut();
let s = caps.get_mut_structure(0).unwrap();
let s = caps.structure_mut(0).unwrap();
s.fixate_field_nearest_int("width", 1920);
s.fixate_field_nearest_int("height", 1080);
if s.has_field("pixel-aspect-ratio") {
@ -567,7 +562,7 @@ impl BaseSrcImpl for NdiVideoSrc {
//Creates the video buffers
fn create(
&self,
element: &gst_base::BaseSrc,
element: &Self::Type,
_offset: u64,
_buffer: Option<&mut gst::BufferRef>,
_length: u32,
@ -589,7 +584,7 @@ impl BaseSrcImpl for NdiVideoSrc {
state.receiver = Some(recv);
if state.info.as_ref() != Some(&info) {
let caps = info.to_caps().map_err(|_| {
gst_element_error!(
gst::element_error!(
element,
gst::ResourceError::Settings,
["Invalid audio info received: {:?}", info]
@ -597,11 +592,11 @@ impl BaseSrcImpl for NdiVideoSrc {
gst::FlowError::NotNegotiated
})?;
state.info = Some(info);
state.current_latency = buffer.get_duration();
state.current_latency = buffer.duration();
drop(state);
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_caps(&caps).map_err(|_| {
gst_element_error!(
gst::element_error!(
element,
gst::CoreError::Negotiation,
["Failed to negotiate caps: {:?}", caps]
@ -621,12 +616,3 @@ impl BaseSrcImpl for NdiVideoSrc {
}
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndivideosrc",
gst::Rank::None,
NdiVideoSrc::get_type(),
)
}

19
src/ndivideosrc/mod.rs Normal file
View file

@ -0,0 +1,19 @@
use glib::prelude::*;
mod imp;
glib::wrapper! {
pub struct NdiVideoSrc(ObjectSubclass<imp::NdiVideoSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
unsafe impl Send for NdiVideoSrc {}
unsafe impl Sync for NdiVideoSrc {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"ndivideosrc",
gst::Rank::None,
NdiVideoSrc::static_type(),
)
}

View file

@ -1,6 +1,6 @@
use glib::prelude::*;
use gst::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_log, gst_warning};
use gst::{gst_debug, gst_error, gst_log, gst_warning};
use gst_video::prelude::*;
use byte_slice_cast::AsMutSliceOf;
@ -167,15 +167,14 @@ impl Observations {
&self,
cat: gst::DebugCategory,
element: &gst_base::BaseSrc,
time: (gst::ClockTime, gst::ClockTime),
duration: gst::ClockTime,
) -> (gst::ClockTime, gst::ClockTime) {
assert!(time.1.is_some());
time: (Option<gst::ClockTime>, gst::ClockTime),
duration: Option<gst::ClockTime>,
) -> (gst::ClockTime, Option<gst::ClockTime>) {
if time.0.is_none() {
return (time.1, duration);
}
let time = (time.0.unwrap(), time.1.unwrap());
let time = (time.0.unwrap(), time.1);
let mut inner = self.0.lock().unwrap();
let ObservationsInner {
@ -190,8 +189,8 @@ impl Observations {
} = *inner;
if values.is_empty() {
current_mapping.xbase = time.0;
current_mapping.b = time.1;
current_mapping.xbase = time.0.nseconds();
current_mapping.b = time.1.nseconds();
current_mapping.num = 1;
current_mapping.den = 1;
}
@ -207,7 +206,9 @@ impl Observations {
// Start by first updating every frame, then every second frame, then every third
// frame, etc. until we update once every quarter second
let framerate = (gst::SECOND / duration).unwrap_or(25) as usize;
let framerate = gst::ClockTime::SECOND
.checked_div(duration.unwrap_or(40 * gst::ClockTime::MSECOND).nseconds())
.unwrap_or(25) as usize;
if *skip_period < framerate / 4 + 1 {
*skip_period += 1;
@ -221,7 +222,7 @@ impl Observations {
if values.len() == WINDOW_LENGTH {
values.remove(0);
}
values.push(time);
values.push((time.0.nseconds(), time.1.nseconds()));
if let Some((num, den, b, xbase, r_squared)) =
gst::calculate_linear_regression(values, Some(values_tmp))
@ -236,8 +237,8 @@ impl Observations {
obj: element,
"Calculated new time mapping: GStreamer time = {} * (NDI time - {}) + {} ({})",
next_mapping.num as f64 / next_mapping.den as f64,
gst::ClockTime::from(next_mapping.xbase),
gst::ClockTime::from(next_mapping.b),
gst::ClockTime::from_nseconds(next_mapping.xbase),
gst::ClockTime::from_nseconds(next_mapping.b),
r_squared,
);
}
@ -250,21 +251,20 @@ impl Observations {
if *time_mapping_pending {
let expected = gst::Clock::adjust_with_calibration(
time.0.into(),
current_mapping.xbase.into(),
current_mapping.b.into(),
current_mapping.num.into(),
current_mapping.den.into(),
time.0,
gst::ClockTime::from_nseconds(current_mapping.xbase),
gst::ClockTime::from_nseconds(current_mapping.b),
gst::ClockTime::from_nseconds(current_mapping.num),
gst::ClockTime::from_nseconds(current_mapping.den),
);
let new_calculated = gst::Clock::adjust_with_calibration(
time.0.into(),
next_mapping.xbase.into(),
next_mapping.b.into(),
next_mapping.num.into(),
next_mapping.den.into(),
time.0,
gst::ClockTime::from_nseconds(next_mapping.xbase),
gst::ClockTime::from_nseconds(next_mapping.b),
gst::ClockTime::from_nseconds(next_mapping.num),
gst::ClockTime::from_nseconds(next_mapping.den),
);
if let (Some(expected), Some(new_calculated)) = (*expected, *new_calculated) {
let diff = if new_calculated > expected {
new_calculated - expected
} else {
@ -273,8 +273,8 @@ impl Observations {
// Allow at most 5% frame duration or 2ms difference per frame
let max_diff = cmp::max(
(duration / 10).unwrap_or(2 * gst::MSECOND_VAL),
2 * gst::MSECOND_VAL,
(duration.map(|d| d / 10)).unwrap_or(2 * gst::ClockTime::MSECOND),
2 * gst::ClockTime::MSECOND,
);
if diff > max_diff {
@ -282,49 +282,41 @@ impl Observations {
cat,
obj: element,
"New time mapping causes difference {} but only {} allowed",
gst::ClockTime::from(diff),
gst::ClockTime::from(max_diff),
diff,
max_diff,
);
if new_calculated > expected {
current_mapping.b = expected + max_diff;
current_mapping.xbase = time.0;
current_mapping.b = (expected + max_diff).nseconds();
current_mapping.xbase = time.0.nseconds();
} else {
current_mapping.b = expected - max_diff;
current_mapping.xbase = time.0;
current_mapping.b = (expected - max_diff).nseconds();
current_mapping.xbase = time.0.nseconds();
}
} else {
*current_mapping = *next_mapping;
}
} else {
gst_warning!(
cat,
obj: element,
"Failed to calculate timestamps based on new mapping",
);
}
}
let converted_timestamp = gst::Clock::adjust_with_calibration(
time.0.into(),
current_mapping.xbase.into(),
current_mapping.b.into(),
current_mapping.num.into(),
current_mapping.den.into(),
time.0,
gst::ClockTime::from_nseconds(current_mapping.xbase),
gst::ClockTime::from_nseconds(current_mapping.b),
gst::ClockTime::from_nseconds(current_mapping.num),
gst::ClockTime::from_nseconds(current_mapping.den),
);
let converted_duration = duration
.mul_div_floor(current_mapping.num, current_mapping.den)
.unwrap_or(gst::CLOCK_TIME_NONE);
let converted_duration =
duration.and_then(|d| d.mul_div_floor(current_mapping.num, current_mapping.den));
gst_debug!(
cat,
obj: element,
"Converted timestamp {}/{} to {}, duration {} to {}",
gst::ClockTime::from(time.0),
gst::ClockTime::from(time.1),
converted_timestamp,
duration,
converted_duration,
time.0,
time.1,
converted_timestamp.display(),
duration.display(),
converted_duration.display(),
);
(converted_timestamp, converted_duration)
@ -422,7 +414,7 @@ impl<T: ReceiverType> Receiver<T> {
Err(_) => {
if let Some(receiver) = weak.upgrade().map(Receiver) {
if let Some(element) = receiver.0.element.upgrade() {
gst_element_error!(
gst::element_error!(
element,
gst::LibraryError::Failed,
["Panic while connecting to NDI source"]
@ -558,7 +550,7 @@ where
if (receiver.video.is_some() || !T::IS_VIDEO)
&& (receiver.audio.is_some() || T::IS_VIDEO)
{
gst_element_error!(
gst::element_error!(
element,
gst::ResourceError::OpenRead,
[
@ -595,14 +587,14 @@ where
// FIXME: Ideally we would use NDIlib_recv_color_format_fastest here but that seems to be
// broken with interlaced content currently
let recv = RecvInstance::builder(ndi_name, url_address, &receiver_ndi_name)
let recv = RecvInstance::builder(ndi_name, url_address, receiver_ndi_name)
.bandwidth(bandwidth)
.color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA)
.allow_video_fields(true)
.build();
let recv = match recv {
None => {
gst_element_error!(
gst::element_error!(
element,
gst::CoreError::Negotiation,
["Failed to connect to source"]
@ -785,44 +777,42 @@ impl<T: ReceiverType> Receiver<T> {
element: &gst_base::BaseSrc,
timestamp: i64,
timecode: i64,
duration: gst::ClockTime,
) -> Option<(gst::ClockTime, gst::ClockTime)> {
let clock = match element.get_clock() {
None => return None,
Some(clock) => clock,
};
duration: Option<gst::ClockTime>,
) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
let clock = element.clock()?;
// For now take the current running time as PTS. At a later time we
// will want to work with the timestamp given by the NDI SDK if available
let now = clock.get_time();
let base_time = element.get_base_time();
let now = clock.time()?;
let base_time = element.base_time()?;
let receive_time = now - base_time;
let real_time_now = gst::ClockTime::from(glib::get_real_time() as u64 * 1000);
let real_time_now = gst::ClockTime::from_nseconds(glib::real_time() as u64 * 1000);
let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
} else {
gst::ClockTime::from(timestamp as u64 * 100)
Some(gst::ClockTime::from_nseconds(timestamp as u64 * 100))
};
let timecode = gst::ClockTime::from(timecode as u64 * 100);
let timecode = gst::ClockTime::from_nseconds(timecode as u64 * 100);
gst_log!(
self.0.cat,
obj: element,
"Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
timecode,
timestamp,
duration,
receive_time,
timestamp.display(),
duration.display(),
receive_time.display(),
real_time_now,
);
let (pts, duration) = match self.0.timestamp_mode {
TimestampMode::ReceiveTimeTimecode => {
self.0
.observations
.process(self.0.cat, element, (timecode, receive_time), duration)
}
TimestampMode::ReceiveTimeTimecode => self.0.observations.process(
self.0.cat,
element,
(Some(timecode), receive_time),
duration,
),
TimestampMode::ReceiveTimeTimestamp => self.0.observations.process(
self.0.cat,
element,
@ -833,10 +823,11 @@ impl<T: ReceiverType> Receiver<T> {
TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration),
TimestampMode::Timestamp => {
// Timestamps are relative to the UNIX epoch
let timestamp = timestamp?;
if real_time_now > timestamp {
let diff = real_time_now - timestamp;
if diff > receive_time {
(0.into(), duration)
(gst::ClockTime::ZERO, duration)
} else {
(receive_time - diff, duration)
}
@ -851,8 +842,8 @@ impl<T: ReceiverType> Receiver<T> {
self.0.cat,
obj: element,
"Calculated PTS {}, duration {}",
pts,
duration,
pts.display(),
duration.display(),
);
Some((pts, duration))
@ -909,7 +900,7 @@ impl Receiver<VideoReceiver> {
let video_frame = match res {
Err(_) => {
gst_element_error!(
gst::element_error!(
element,
gst::ResourceError::Read,
["Error receiving frame"]
@ -970,13 +961,11 @@ impl Receiver<VideoReceiver> {
&self,
element: &gst_base::BaseSrc,
video_frame: &VideoFrame,
) -> Option<(gst::ClockTime, gst::ClockTime)> {
let duration = gst::SECOND
.mul_div_floor(
) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
let duration = gst::ClockTime::SECOND.mul_div_floor(
video_frame.frame_rate().1 as u64,
video_frame.frame_rate().0 as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE);
);
self.calculate_timestamp(
element,
@ -1004,7 +993,7 @@ impl Receiver<VideoReceiver> {
ndisys::NDIlib_FourCC_video_type_RGBA => gst_video::VideoFormat::Rgba,
ndisys::NDIlib_FourCC_video_type_RGBX => gst_video::VideoFormat::Rgbx,
_ => {
gst_element_error!(
gst::element_error!(
element,
gst::StreamError::Format,
["Unsupported video fourcc {:08x}", video_frame.fourcc()]
@ -1045,7 +1034,7 @@ impl Receiver<VideoReceiver> {
}
builder.build().map_err(|_| {
gst_element_error!(
gst::element_error!(
element,
gst::StreamError::Format,
["Invalid video format configuration"]
@ -1062,7 +1051,7 @@ impl Receiver<VideoReceiver> {
&& video_frame.frame_format_type()
!= ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
{
gst_element_error!(
gst::element_error!(
element,
gst::StreamError::Format,
["Separate field interlacing not supported"]
@ -1094,7 +1083,7 @@ impl Receiver<VideoReceiver> {
}
builder.build().map_err(|_| {
gst_element_error!(
gst::element_error!(
element,
gst::StreamError::Format,
["Invalid video format configuration"]
@ -1109,7 +1098,7 @@ impl Receiver<VideoReceiver> {
&self,
element: &gst_base::BaseSrc,
pts: gst::ClockTime,
duration: gst::ClockTime,
duration: Option<gst::ClockTime>,
info: &gst_video::VideoInfo,
video_frame: &VideoFrame,
) -> gst::Buffer {
@ -1124,15 +1113,15 @@ impl Receiver<VideoReceiver> {
gst::ReferenceTimestampMeta::add(
buffer,
&*TIMECODE_CAPS,
gst::ClockTime::from(video_frame.timecode() as u64 * 100),
gst::CLOCK_TIME_NONE,
gst::ClockTime::from_nseconds(video_frame.timecode() as u64 * 100),
gst::ClockTime::NONE,
);
if video_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
gst::ReferenceTimestampMeta::add(
buffer,
&*TIMESTAMP_CAPS,
gst::ClockTime::from(video_frame.timestamp() as u64 * 100),
gst::CLOCK_TIME_NONE,
gst::ClockTime::from_nseconds(video_frame.timestamp() as u64 * 100),
gst::ClockTime::NONE,
);
}
}
@ -1337,7 +1326,7 @@ impl Receiver<AudioReceiver> {
let audio_frame = match res {
Err(_) => {
gst_element_error!(
gst::element_error!(
element,
gst::ResourceError::Read,
["Error receiving frame"]
@ -1398,13 +1387,11 @@ impl Receiver<AudioReceiver> {
&self,
element: &gst_base::BaseSrc,
audio_frame: &AudioFrame,
) -> Option<(gst::ClockTime, gst::ClockTime)> {
let duration = gst::SECOND
.mul_div_floor(
) -> Option<(gst::ClockTime, Option<gst::ClockTime>)> {
let duration = gst::ClockTime::SECOND.mul_div_floor(
audio_frame.no_samples() as u64,
audio_frame.sample_rate() as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE);
);
self.calculate_timestamp(
element,
@ -1426,7 +1413,7 @@ impl Receiver<AudioReceiver> {
);
builder.build().map_err(|_| {
gst_element_error!(
gst::element_error!(
element,
gst::StreamError::Format,
["Invalid audio format configuration"]
@ -1440,7 +1427,7 @@ impl Receiver<AudioReceiver> {
&self,
_element: &gst_base::BaseSrc,
pts: gst::ClockTime,
duration: gst::ClockTime,
duration: Option<gst::ClockTime>,
info: &gst_audio::AudioInfo,
audio_frame: &AudioFrame,
) -> gst::Buffer {
@ -1458,15 +1445,15 @@ impl Receiver<AudioReceiver> {
gst::ReferenceTimestampMeta::add(
buffer,
&*TIMECODE_CAPS,
gst::ClockTime::from(audio_frame.timecode() as u64 * 100),
gst::CLOCK_TIME_NONE,
gst::ClockTime::from_nseconds(audio_frame.timecode() as u64 * 100),
gst::ClockTime::NONE,
);
if audio_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
gst::ReferenceTimestampMeta::add(
buffer,
&*TIMESTAMP_CAPS,
gst::ClockTime::from(audio_frame.timestamp() as u64 * 100),
gst::CLOCK_TIME_NONE,
gst::ClockTime::from_nseconds(audio_frame.timestamp() as u64 * 100),
gst::ClockTime::NONE,
);
}
}