diff --git a/gst-plugin-ndi/src/lib.rs b/gst-plugin-ndi/src/lib.rs
index 3b6c79bbf..f06868034 100644
--- a/gst-plugin-ndi/src/lib.rs
+++ b/gst-plugin-ndi/src/lib.rs
@@ -22,14 +22,14 @@ extern crate gstreamer_video as gst_video;
 #[macro_use]
 extern crate lazy_static;
 
-mod ndivideosrc;
 mod ndiaudiosrc;
 pub mod ndisys;
+mod ndivideosrc;
 
-use std::{thread, time};
-use std::ffi::{CStr, CString};
-use ndisys::*;
 use gst_plugin::base_src::*;
+use ndisys::*;
+use std::ffi::{CStr, CString};
+use std::{thread, time};
 
 use std::collections::HashMap;
 use std::sync::Mutex;
@@ -44,7 +44,7 @@ fn plugin_init(plugin: &gst::Plugin) -> bool {
     true
 }
 
-struct ndi_receiver_info{
+struct ndi_receiver_info {
     stream_name: String,
     ip: String,
     video: bool,
@@ -53,12 +53,12 @@ struct ndi_receiver_info{
     id: i8,
 }
 
-struct Ndi{
+struct Ndi {
     initial_timestamp: u64,
     start_pts: gst::ClockTime,
 }
 
-static mut ndi_struct: Ndi = Ndi{
+static mut ndi_struct: Ndi = Ndi {
     initial_timestamp: 0,
     start_pts: gst::ClockTime(Some(0)),
 };
@@ -72,7 +72,7 @@ lazy_static! {
 
 static mut id_receiver: i8 = 0;
 
-fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream_name: String) -> i8{
+fn connect_ndi(cat: gst::DebugCategory, element: &BaseSrc, ip: String, stream_name: String) -> i8 {
     gst_debug!(cat, obj: element, "Starting NDI connection...");
 
     let mut receivers = hashmap_receivers.lock().unwrap();
@@ -80,35 +80,33 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
     let mut video = false;
 
     //FIXME Search for another way to know if the source is an audio or a video source
-    if element.get_name().contains("audiosrc"){
+    if element.get_name().contains("audiosrc") {
         audio = true;
-    }
-    else
-    {
+    } else {
         video = true;
     }
 
-    for val in receivers.values_mut(){
-        if val.ip == ip || val.stream_name == stream_name{
-            if (val.audio && val.video) || (val.audio && audio) || (val.video && video){
+    for val in receivers.values_mut() {
+        if val.ip == ip || val.stream_name == stream_name {
+            if (val.audio && val.video) || (val.audio && audio) || (val.video && video) {
                 continue;
-            }
-            else {
+            } else {
                 if video {
                     val.video = video;
-                }
-                else{
+                } else {
                     val.audio = audio;
                 }
                 return val.id;
             }
         }
-    }
 
     unsafe {
         if !NDIlib_initialize() {
-            gst_element_error!(element, gst::CoreError::Negotiation, ["Cannot run NDI: NDIlib_initialize error"]);
+            gst_element_error!(
+                element,
+                gst::CoreError::Negotiation,
+                ["Cannot run NDI: NDIlib_initialize error"]
+            );
             // return false;
             return 0;
         }
@@ -118,7 +116,11 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
     let pNDI_find = NDIlib_find_create_v2(&NDI_find_create_desc);
     //let ip_ptr = CString::new(ip.clone()).unwrap();
     if pNDI_find.is_null() {
-        gst_element_error!(element, gst::CoreError::Negotiation, ["Cannot run NDI: NDIlib_find_create_v2 error"]);
+        gst_element_error!(
+            element,
+            gst::CoreError::Negotiation,
+            ["Cannot run NDI: NDIlib_find_create_v2 error"]
+        );
         // return false;
         return 0;
     }
@@ -132,37 +134,57 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
 
     // We need at least one source
     if p_sources.is_null() {
-        gst_element_error!(element, gst::CoreError::Negotiation, ["Error getting NDIlib_find_get_current_sources"]);
+        gst_element_error!(
+            element,
+            gst::CoreError::Negotiation,
+            ["Error getting NDIlib_find_get_current_sources"]
+        );
         // return false;
         return 0;
     }
 
     let mut no_source: isize = -1;
-    for i in 0..total_sources as isize{
-        if CStr::from_ptr((*p_sources.offset(i)).p_ndi_name).to_string_lossy().into_owned() == stream_name ||
-        CStr::from_ptr((*p_sources.offset(i)).p_ip_address).to_string_lossy().into_owned() == ip{
+    for i in 0..total_sources as isize {
+        if CStr::from_ptr((*p_sources.offset(i)).p_ndi_name)
+            .to_string_lossy()
+            .into_owned()
+            == stream_name
+            || CStr::from_ptr((*p_sources.offset(i)).p_ip_address)
+                .to_string_lossy()
+                .into_owned()
+                == ip
+        {
             no_source = i;
             break;
         }
     }
 
-    if no_source  == -1 {
+    if no_source == -1 {
         gst_element_error!(element, gst::ResourceError::OpenRead, ["Stream not found"]);
         // return false;
         return 0;
     }
 
-    gst_debug!(cat, obj: element, "Total sources in network {}: Connecting to NDI source with name '{}' and address '{}'", total_sources,
-    CStr::from_ptr((*p_sources.offset(no_source)).p_ndi_name)
-    .to_string_lossy()
-    .into_owned(),
-    CStr::from_ptr((*p_sources.offset(no_source)).p_ip_address)
-    .to_string_lossy()
-    .into_owned());
+    gst_debug!(
+        cat,
+        obj: element,
+        "Total sources in network {}: Connecting to NDI source with name '{}' and address '{}'",
+        total_sources,
+        CStr::from_ptr((*p_sources.offset(no_source)).p_ndi_name)
+            .to_string_lossy()
+            .into_owned(),
+        CStr::from_ptr((*p_sources.offset(no_source)).p_ip_address)
+            .to_string_lossy()
+            .into_owned()
+    );
 
     let source = *p_sources.offset(no_source).clone();
 
-    let source_ip = CStr::from_ptr(source.p_ip_address).to_string_lossy().into_owned();
-    let source_name = CStr::from_ptr(source.p_ndi_name).to_string_lossy().into_owned();
+    let source_ip = CStr::from_ptr(source.p_ip_address)
+        .to_string_lossy()
+        .into_owned();
+    let source_name = CStr::from_ptr(source.p_ndi_name)
+        .to_string_lossy()
+        .into_owned();
 
     // We now have at least one source, so we create a receiver to look at it.
     // We tell it that we prefer YCbCr video since it is more efficient for us. If the source has an alpha channel
@@ -177,7 +199,11 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
     let pNDI_recv = NDIlib_recv_create_v3(&NDI_recv_create_desc);
     if pNDI_recv.is_null() {
         //println!("Cannot run NDI: NDIlib_recv_create_v3 error.");
-        gst_element_error!(element, gst::CoreError::Negotiation, ["Cannot run NDI: NDIlib_recv_create_v3 error"]);
+        gst_element_error!(
+            element,
+            gst::CoreError::Negotiation,
+            ["Cannot run NDI: NDIlib_recv_create_v3 error"]
+        );
         // return false;
         return 0;
     }
@@ -202,7 +228,17 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
     NDIlib_recv_send_metadata(pNDI_recv, &enable_hw_accel);
 
     id_receiver += 1;
-    receivers.insert(id_receiver, ndi_receiver_info{stream_name: source_name.clone(), ip: source_ip.clone(), video:video, audio: audio, ndi_instance: NdiInstance{recv: pNDI_recv}, id: id_receiver});
+    receivers.insert(
+        id_receiver,
+        ndi_receiver_info {
+            stream_name: source_name.clone(),
+            ip: source_ip.clone(),
+            video: video,
+            audio: audio,
+            ndi_instance: NdiInstance { recv: pNDI_recv },
+            id: id_receiver,
+        },
+    );
 
     // let start = SystemTime::now();
     // let since_the_epoch = start.duration_since(UNIX_EPOCH)
@@ -215,16 +251,15 @@ fn connect_ndi(cat: gst::DebugCategory , element: &BaseSrc, ip: String, stream
     }
 }
 
-fn stop_ndi(cat: gst::DebugCategory , element: &BaseSrc, id: i8) -> bool{
+fn stop_ndi(cat: gst::DebugCategory, element: &BaseSrc, id: i8) -> bool {
     gst_debug!(cat, obj: element, "Closing NDI connection...");
     let mut receivers = hashmap_receivers.lock().unwrap();
     {
         let val = receivers.get_mut(&id).unwrap();
-        if val.video && val.audio{
-            if element.get_name().contains("audiosrc"){
+        if val.video && val.audio {
+            if element.get_name().contains("audiosrc") {
                 val.audio = false;
-            }
-            else{
+            } else {
                 val.video = false;
             }
             return true;
@@ -232,7 +267,7 @@ fn stop_ndi(cat: gst::DebugCategory , element: &BaseSrc, id: i8) -> bool{
 
         let recv = &val.ndi_instance;
         let pNDI_recv = recv.recv;
-        unsafe{
+        unsafe {
             NDIlib_recv_destroy(pNDI_recv);
             // ndi_struct.recv = None;
             NDIlib_destroy();
diff --git a/gst-plugin-ndi/src/ndiaudiosrc.rs b/gst-plugin-ndi/src/ndiaudiosrc.rs
index e56ffd97e..57b28db56 100644
--- a/gst-plugin-ndi/src/ndiaudiosrc.rs
+++ b/gst-plugin-ndi/src/ndiaudiosrc.rs
@@ -6,19 +6,19 @@ use gst::prelude::*;
 use gst_audio;
 use gst_base::prelude::*;
 
+use gobject_subclass::object::*;
 use gst_plugin::base_src::*;
 use gst_plugin::element::*;
-use gobject_subclass::object::*;
 
 use std::sync::Mutex;
 use std::{i32, u32};
 
 use std::ptr;
 
-use ndisys::*;
 use connect_ndi;
-use stop_ndi;
 use ndi_struct;
+use ndisys::*;
+use stop_ndi;
 
 use hashmap_receivers;
 
@@ -42,20 +42,20 @@ impl Default for Settings {
 
 // Metadata for the properties
 static PROPERTIES: [Property; 2] = [
-Property::String(
-    "stream-name",
-    "Sream Name",
-    "Name of the streaming device",
-    None,
-    PropertyMutability::ReadWrite,
-),
-Property::String(
-    "ip",
-    "Stream IP",
-    "Stream IP",
-    None,
-    PropertyMutability::ReadWrite,
-),
+    Property::String(
+        "stream-name",
+        "Stream Name",
+        "Name of the streaming device",
+        None,
+        PropertyMutability::ReadWrite,
+    ),
+    Property::String(
+        "ip",
+        "Stream IP",
+        "Stream IP",
+        None,
+        PropertyMutability::ReadWrite,
+    ),
 ];
 
 // Stream-specific state, i.e. audio format configuration
@@ -66,13 +66,11 @@ struct State {
 
 impl Default for State {
     fn default() -> State {
-        State {
-            info: None,
-        }
+        State { info: None }
     }
 }
 
-struct TimestampData{
+struct TimestampData {
     offset: u64,
 }
 
@@ -100,9 +98,7 @@ impl NdiAudioSrc {
             ),
             settings: Mutex::new(Default::default()),
             state: Mutex::new(Default::default()),
-            timestamp_data: Mutex::new(TimestampData{
-                offset: 0,
-            }),
+            timestamp_data: Mutex::new(TimestampData { offset: 0 }),
         })
     }
 
@@ -129,362 +125,383 @@ impl NdiAudioSrc {
         let caps = gst::Caps::new_simple(
             "audio/x-raw",
             &[
-            (
-                "format",
-                &gst::List::new(&[
-                    //TODO add all formats?
-                    &gst_audio::AUDIO_FORMAT_F32.to_string(),
-                    &gst_audio::AUDIO_FORMAT_F64.to_string(),
-                    &gst_audio::AUDIO_FORMAT_S16.to_string(),
+                (
+                    "format",
+                    &gst::List::new(&[
+                        //TODO add all formats?
+                        &gst_audio::AUDIO_FORMAT_F32.to_string(),
+                        &gst_audio::AUDIO_FORMAT_F64.to_string(),
+                        &gst_audio::AUDIO_FORMAT_S16.to_string(),
                     ]),
                 ),
                 ("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
                 ("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
                 ("layout", &"interleaved"),
-            ],
-        );
-        // The src pad template must be named "src" for basesrc
-        // and specific a pad that is always there
-        let src_pad_template = gst::PadTemplate::new(
-            "src",
-            gst::PadDirection::Src,
-            gst::PadPresence::Always,
-            &caps,
-            //&gst::Caps::new_any(),
-        );
-        klass.add_pad_template(src_pad_template);
-
-        // Install all our properties
-        klass.install_properties(&PROPERTIES);
+            ],
+        );
+        // The src pad template must be named "src" for basesrc
+        // and specify a pad that is always there
+        let src_pad_template = gst::PadTemplate::new(
+            "src",
+            gst::PadDirection::Src,
+            gst::PadPresence::Always,
+            &caps,
+            //&gst::Caps::new_any(),
+        );
+        klass.add_pad_template(src_pad_template);
+
+        // Install all our properties
+        klass.install_properties(&PROPERTIES);
+    }
+}
+
+// Virtual methods of GObject itself
+impl ObjectImpl<BaseSrc> for NdiAudioSrc {
+    // Called whenever a value of a property is changed. It can be called
+    // at any time from any thread.
+    fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
+        let prop = &PROPERTIES[id as usize];
+        let element = obj.clone().downcast::<BaseSrc>().unwrap();
+
+        match *prop {
+            Property::String("stream-name", ..) => {
+                let mut settings = self.settings.lock().unwrap();
+                let stream_name = value.get().unwrap();
+                gst_debug!(
+                    self.cat,
+                    obj: &element,
+                    "Changing stream-name from {} to {}",
+                    settings.stream_name,
+                    stream_name
+                );
+                settings.stream_name = stream_name;
+                drop(settings);
+
+                let _ =
+                    element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
+            }
+            Property::String("ip", ..) => {
+                let mut settings = self.settings.lock().unwrap();
+                let ip = value.get().unwrap();
+                gst_debug!(
+                    self.cat,
+                    obj: &element,
+                    "Changing ip from {} to {}",
+                    settings.ip,
+                    ip
+                );
+                settings.ip = ip;
+                drop(settings);
+
+                let _ =
+                    element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
+            }
+            _ => unimplemented!(),
+        }
+    }
+
+    // Called whenever a value of a property is read. It can be called
+    // at any time from any thread.
+    fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
+        let prop = &PROPERTIES[id as usize];
+
+        match *prop {
+            Property::String("stream-name", ..) => {
+                let settings = self.settings.lock().unwrap();
+                //TODO to_value presumably only works with numbers
+                Ok(settings.stream_name.to_value())
+            }
+            Property::String("ip", ..) => {
+                let settings = self.settings.lock().unwrap();
+                //TODO to_value presumably only works with numbers
+                Ok(settings.ip.to_value())
+            }
+            _ => unimplemented!(),
+        }
+    }
+}
 
-    // Virtual methods of GObject itself
-    impl ObjectImpl<BaseSrc> for NdiAudioSrc {
-        // Called whenever a value of a property is changed. It can be called
-        // at any time from any thread.
-        fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
-            let prop = &PROPERTIES[id as usize];
-            let element = obj.clone().downcast::<BaseSrc>().unwrap();
-
-            match *prop {
-                Property::String("stream-name", ..) => {
-                    let mut settings = self.settings.lock().unwrap();
-                    let stream_name = value.get().unwrap();
-                    gst_debug!(
-                        self.cat,
-                        obj: &element,
-                        "Changing stream-name from {} to {}",
-                        settings.stream_name,
-                        stream_name
-                    );
-                    settings.stream_name = stream_name;
-                    drop(settings);
-
-                    let _ =
-                    element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
-                },
-                Property::String("ip", ..) => {
-                    let mut settings = self.settings.lock().unwrap();
-                    let ip = value.get().unwrap();
-                    gst_debug!(
-                        self.cat,
-                        obj: &element,
-                        "Changing ip from {} to {}",
-                        settings.ip,
-                        ip
-                    );
-                    settings.ip = ip;
-                    drop(settings);
-
-                    let _ =
-                    element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
-                }
-                _ => unimplemented!(),
-            }
-        }
-
-        // Called whenever a value of a property is read. It can be called
-        // at any time from any thread.
-        fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
-            let prop = &PROPERTIES[id as usize];
-
-            match *prop {
-                Property::String("stream-name", ..) => {
-                    let settings = self.settings.lock().unwrap();
-                    //TODO to_value supongo que solo funciona con numeros
-                    Ok(settings.stream_name.to_value())
-                },
-                Property::String("ip", ..) => {
-                    let settings = self.settings.lock().unwrap();
-                    //TODO to_value supongo que solo funciona con numeros
-                    Ok(settings.ip.to_value())
-                }
-                _ => unimplemented!(),
-            }
-        }
-    }
-
-    // Virtual methods of gst::Element. We override none
-    impl ElementImpl<BaseSrc> for NdiAudioSrc {
-        fn change_state(&self, element: &BaseSrc, transition: gst::StateChange) -> gst::StateChangeReturn {
-            if transition == gst::StateChange::PausedToPlaying{
-                let receivers = hashmap_receivers.lock().unwrap();
-                let settings = self.settings.lock().unwrap();
-
-                let receiver = receivers.get(&settings.id_receiver).unwrap();
-                let recv = &receiver.ndi_instance;
-                let pNDI_recv = recv.recv;
-
-                let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
-
-                let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
-                unsafe{
-                    while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_audio{
-                        frame_type = NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000);
-                    }
-
-                    if ndi_struct.initial_timestamp <= audio_frame.timestamp as u64 || ndi_struct.initial_timestamp == 0{
-                        ndi_struct.initial_timestamp = audio_frame.timestamp as u64;
-                    }
-                }
-            }
-            element.parent_change_state(transition)
-        }
-    }
-
-    // Virtual methods of gst_base::BaseSrc
-    impl BaseSrcImpl<BaseSrc> for NdiAudioSrc {
-        // Called whenever the input/output caps are changing, i.e. in the very beginning before data
-        // flow happens and whenever the situation in the pipeline is changing. All buffers after this
-        // call have the caps given here.
-        //
-        // We simply remember the resulting AudioInfo from the caps to be able to use this for knowing
-        // the sample rate, etc. when creating buffers
-        fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool {
-
-            let info = match gst_audio::AudioInfo::from_caps(caps) {
-                None => return false,
-                Some(info) => info,
-            };
-
-            gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
-
-            // TODO Puede que falle si no creamos la estructura de cero, pero si lo hacemos no podemos poner recv a none
-            let mut state = self.state.lock().unwrap();
-            state.info = Some(info);
-
-            true
-        }
-
-        // Called when starting, so we can initialize all stream-related state to its defaults
-        fn start(&self, element: &BaseSrc) -> bool {
-            // Reset state
-            *self.state.lock().unwrap() = Default::default();
-
-            let mut settings = self.settings.lock().unwrap();
-            settings.id_receiver = connect_ndi(self.cat, element, settings.ip.clone(), settings.stream_name.clone());
-            if settings.id_receiver == 0{
-                return false;
-            }
-            else{
-                return true;
-            }
-        }
-
-        // Called when shutting down the element so we can release all stream-related state
-        fn stop(&self, element: &BaseSrc) -> bool {
-            // Reset state
-            *self.state.lock().unwrap() = Default::default();
-
-            let settings = self.settings.lock().unwrap();
-            stop_ndi(self.cat, element, settings.id_receiver.clone());
-            // Commented because when adding ndi destroy stopped in this line
-            //*self.state.lock().unwrap() = Default::default();
-            true
-        }
-
-        fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
-            use gst::QueryView;
-
-            match query.view_mut() {
-                // We only work in Push mode. In Pull mode, create() could be called with
-                // arbitrary offsets and we would have to produce for that specific offset
-                QueryView::Scheduling(ref mut q) => {
-                    q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
-                    q.add_scheduling_modes(&[gst::PadMode::Push]);
-                    return true;
-                }
-                // In Live mode we will have a latency equal to the number of samples in each buffer.
-                // We can't output samples before they were produced, and the last sample of a buffer
-                // is produced that much after the beginning, leading to this latency calculation
-                // QueryView::Latency(ref mut q) => {
-                //     let settings = &*self.settings.lock().unwrap();
-                //     let state = self.state.lock().unwrap();
-                //
-                //     if let Some(ref _info) = state.info {
-                //         // let latency = gst::SECOND
-                //         //     .mul_div_floor(1024 as u64, _info.rate() as u64)
-                //         //     .unwrap();
-                //         let latency = gst::SECOND.mul_div_floor(3 as u64, 2 as u64).unwrap();
-                //         // let latency = gst::SECOND
-                //         //     .mul_div_floor(1 as u64, 30 as u64)
-                //         //     .unwrap();
-                //         // gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
-                //         let max = latency * 1843200;
-                //         // println!("{:?}", latency);
-                //         // println!("{:?}",max);
-                //         q.set(true, latency, max);
-                //         return true;
-                //     } else {
-                //         return false;
-                //     }
-                // }
-                _ => (),
-            }
-            BaseSrcBase::parent_query(element, query)
-        }
-
-        fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
-            //We need to set the correct caps resolution and framerate
-            let receivers = hashmap_receivers.lock().unwrap();
-            let settings = self.settings.lock().unwrap();
-
-            let receiver = receivers.get(&settings.id_receiver).unwrap();
-
-            let recv = &receiver.ndi_instance;
-            let pNDI_recv = recv.recv;
-
-            let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
-
-            let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
-            while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_audio{
-                unsafe{
-                    frame_type = NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000);
-                }
-            }
-            let mut caps = gst::Caps::truncate(caps);
-            {
-                let caps = caps.make_mut();
-                let s = caps.get_mut_structure(0).unwrap();
-                //s.fixate_field_nearest_int("rate", audio_frame.sample_rate);
-                s.fixate_field_nearest_int("rate", audio_frame.sample_rate / audio_frame.no_channels);
-                s.fixate_field_nearest_int("channels", audio_frame.no_channels);
-            }
-
-            // Let BaseSrc fixate anything else for us. We could've alternatively have
-            // called Caps::fixate() here
-            element.parent_fixate(caps)
-            // }
-        }
-
-        //Creates the audio buffers
-        fn create(
-            &self,
-            element: &BaseSrc,
-            _offset: u64,
-            _length: u32,
-        ) -> Result<gst::Buffer, gst::FlowReturn> {
-            // Keep a local copy of the values of all our properties at this very moment. This
-            // ensures that the mutex is never locked for long and the application wouldn't
-            // have to block until this function returns when getting/setting property values
-            let _settings = &*self.settings.lock().unwrap();
-
-            let mut timestamp_data = self.timestamp_data.lock().unwrap();
-            // Get a locked reference to our state, i.e. the input and output AudioInfo
-            let state = self.state.lock().unwrap();
-            let _info = match state.info {
-                None => {
-                    gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
-                    return Err(gst::FlowReturn::NotNegotiated);
-                }
-                Some(ref info) => info.clone(),
-            };
-            let receivers = hashmap_receivers.lock().unwrap();
-
-            let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance;
-            let pNDI_recv = recv.recv;
-
-            let pts: u64;
-            let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
-
-            unsafe{
-                let time = ndi_struct.initial_timestamp;
-
-                let mut skip_frame = true;
-                while skip_frame {
-                    let frame_type = NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000,);
-                    if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none || frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error {
-                        gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]);
-                        return Err(gst::FlowReturn::CustomError);
-                    }
-                    if time >= (audio_frame.timestamp as u64){
-                        gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp as u64), time);
-                    }
-                    else{
-                        skip_frame = false;
-                    }
-                }
-
-                pts = audio_frame.timestamp as u64 - time;
-
-                let buff_size = ((audio_frame.channel_stride_in_bytes)) as usize;
-                let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
-                {
-                    let vec = Vec::from_raw_parts(audio_frame.p_data as *mut u8, buff_size, buff_size);
-                    let pts: gst::ClockTime = (pts * 100).into();
-
-                    let duration: gst::ClockTime = (((audio_frame.no_samples as f64 / audio_frame.sample_rate as f64) * 1000000000.0) as u64).into();
-                    let buffer = buffer.get_mut().unwrap();
-
-                    if ndi_struct.start_pts == gst::ClockTime(Some(0)){
-                        ndi_struct.start_pts = element.get_clock().unwrap().get_time() - element.get_base_time();
-                    }
-
-                    buffer.set_pts(pts + ndi_struct.start_pts);
-                    buffer.set_duration(duration);
-                    buffer.set_offset(timestamp_data.offset);
-                    buffer.set_offset_end(timestamp_data.offset + 1);
-                    timestamp_data.offset = timestamp_data.offset + 1;
-                    buffer.copy_from_slice(0, &vec).unwrap();
-                }
-
-                gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);
-
-                Ok(buffer)
-            }
-        }
-    }
-
-    // This zero-sized struct is containing the static metadata of our element. It is only necessary to
-    // be able to implement traits on it, but e.g. a plugin that registers multiple elements with the
-    // same code would use this struct to store information about the concrete element. An example of
-    // this would be a plugin that wraps around a library that has multiple decoders with the same API,
-    // but wants (as it should) a separate element registered for each decoder.
-    struct NdiAudioSrcStatic;
-
-    // The basic trait for registering the type: This returns a name for the type and registers the
-    // instance and class initializations functions with the type system, thus hooking everything
-    // together.
-    impl ImplTypeStatic<BaseSrc> for NdiAudioSrcStatic {
-        fn get_name(&self) -> &str {
-            "NdiAudioSrc"
-        }
-
-        fn new(&self, element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
-            NdiAudioSrc::new(element)
-        }
-
-        fn class_init(&self, klass: &mut BaseSrcClass) {
-            NdiAudioSrc::class_init(klass);
-        }
-    }
-
-    // Registers the type for our element, and then registers in GStreamer under
-    // the name NdiAudioSrc for being able to instantiate it via e.g.
-    // gst::ElementFactory::make().
-    pub fn register(plugin: &gst::Plugin) {
-        let type_ = register_type(NdiAudioSrcStatic);
-        gst::Element::register(plugin, "ndiaudiosrc", 0, type_);
-    }
+// Virtual methods of gst::Element. We override none
+impl ElementImpl<BaseSrc> for NdiAudioSrc {
+    fn change_state(
+        &self,
+        element: &BaseSrc,
+        transition: gst::StateChange,
+    ) -> gst::StateChangeReturn {
+        if transition == gst::StateChange::PausedToPlaying {
+            let receivers = hashmap_receivers.lock().unwrap();
+            let settings = self.settings.lock().unwrap();
+
+            let receiver = receivers.get(&settings.id_receiver).unwrap();
+            let recv = &receiver.ndi_instance;
+            let pNDI_recv = recv.recv;
+
+            let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
+
+            let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
+            unsafe {
+                while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_audio {
+                    frame_type = NDIlib_recv_capture_v2(
+                        pNDI_recv,
+                        ptr::null(),
+                        &audio_frame,
+                        ptr::null(),
+                        1000,
+                    );
+                }
+
+                if ndi_struct.initial_timestamp <= audio_frame.timestamp as u64
+                    || ndi_struct.initial_timestamp == 0
+                {
+                    ndi_struct.initial_timestamp = audio_frame.timestamp as u64;
+                }
+            }
+        }
+        element.parent_change_state(transition)
+    }
+}
+
+// Virtual methods of gst_base::BaseSrc
+impl BaseSrcImpl<BaseSrc> for NdiAudioSrc {
+    // Called whenever the input/output caps are changing, i.e. in the very beginning before data
+    // flow happens and whenever the situation in the pipeline is changing. All buffers after this
+    // call have the caps given here.
+    //
+    // We simply remember the resulting AudioInfo from the caps to be able to use this for knowing
+    // the sample rate, etc. when creating buffers
+    fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool {
+        let info = match gst_audio::AudioInfo::from_caps(caps) {
+            None => return false,
+            Some(info) => info,
+        };
+
+        gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
+
+        // TODO This may fail if we don't create the struct from scratch, but if we do we can't set recv to None
+        let mut state = self.state.lock().unwrap();
+        state.info = Some(info);
+
+        true
+    }
+
+    // Called when starting, so we can initialize all stream-related state to its defaults
+    fn start(&self, element: &BaseSrc) -> bool {
+        // Reset state
+        *self.state.lock().unwrap() = Default::default();
+
+        let mut settings = self.settings.lock().unwrap();
+        settings.id_receiver = connect_ndi(
+            self.cat,
+            element,
+            settings.ip.clone(),
+            settings.stream_name.clone(),
+        );
+        if settings.id_receiver == 0 {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    // Called when shutting down the element so we can release all stream-related state
+    fn stop(&self, element: &BaseSrc) -> bool {
+        // Reset state
+        *self.state.lock().unwrap() = Default::default();
+
+        let settings = self.settings.lock().unwrap();
+        stop_ndi(self.cat, element, settings.id_receiver.clone());
+        // Commented because when adding ndi destroy stopped in this line
+        //*self.state.lock().unwrap() = Default::default();
+        true
+    }
+
+    fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
+        use gst::QueryView;
+
+        match query.view_mut() {
+            // We only work in Push mode. In Pull mode, create() could be called with
+            // arbitrary offsets and we would have to produce for that specific offset
+            QueryView::Scheduling(ref mut q) => {
+                q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
+                q.add_scheduling_modes(&[gst::PadMode::Push]);
+                return true;
+            }
+            // In Live mode we will have a latency equal to the number of samples in each buffer.
+            // We can't output samples before they were produced, and the last sample of a buffer
+            // is produced that much after the beginning, leading to this latency calculation
+            // QueryView::Latency(ref mut q) => {
+            //     let settings = &*self.settings.lock().unwrap();
+            //     let state = self.state.lock().unwrap();
+            //
+            //     if let Some(ref _info) = state.info {
+            //         // let latency = gst::SECOND
+            //         //     .mul_div_floor(1024 as u64, _info.rate() as u64)
+            //         //     .unwrap();
+            //         let latency = gst::SECOND.mul_div_floor(3 as u64, 2 as u64).unwrap();
+            //         // let latency = gst::SECOND
+            //         //     .mul_div_floor(1 as u64, 30 as u64)
+            //         //     .unwrap();
+            //         // gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
+            //         let max = latency * 1843200;
+            //         // println!("{:?}", latency);
+            //         // println!("{:?}",max);
+            //         q.set(true, latency, max);
+            //         return true;
+            //     } else {
+            //         return false;
+            //     }
+            // }
+            _ => (),
+        }
+        BaseSrcBase::parent_query(element, query)
+    }
+
+    fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
+        //We need to set the correct caps resolution and framerate
+        let receivers = hashmap_receivers.lock().unwrap();
+        let settings = self.settings.lock().unwrap();
+
+        let receiver = receivers.get(&settings.id_receiver).unwrap();
+
+        let recv = &receiver.ndi_instance;
+        let pNDI_recv = recv.recv;
+
+        let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
+
+        let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
+        while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_audio {
+            unsafe {
+                frame_type =
+                    NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000);
+            }
+        }
+        let mut caps = gst::Caps::truncate(caps);
+        {
+            let caps = caps.make_mut();
+            let s = caps.get_mut_structure(0).unwrap();
+            //s.fixate_field_nearest_int("rate", audio_frame.sample_rate);
+            s.fixate_field_nearest_int("rate", audio_frame.sample_rate / audio_frame.no_channels);
+            s.fixate_field_nearest_int("channels", audio_frame.no_channels);
+        }
+
+        // Let BaseSrc fixate anything else for us. We could've alternatively have
+        // called Caps::fixate() here
+        element.parent_fixate(caps)
+        // }
+    }
+
+    //Creates the audio buffers
+    fn create(
+        &self,
+        element: &BaseSrc,
+        _offset: u64,
+        _length: u32,
+    ) -> Result<gst::Buffer, gst::FlowReturn> {
+        // Keep a local copy of the values of all our properties at this very moment. This
+        // ensures that the mutex is never locked for long and the application wouldn't
+        // have to block until this function returns when getting/setting property values
+        let _settings = &*self.settings.lock().unwrap();
+
+        let mut timestamp_data = self.timestamp_data.lock().unwrap();
+        // Get a locked reference to our state, i.e. the input and output AudioInfo
+        let state = self.state.lock().unwrap();
+        let _info = match state.info {
+            None => {
+                gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
+                return Err(gst::FlowReturn::NotNegotiated);
+            }
+            Some(ref info) => info.clone(),
+        };
+        let receivers = hashmap_receivers.lock().unwrap();
+
+        let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance;
+        let pNDI_recv = recv.recv;
+
+        let pts: u64;
+        let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
+
+        unsafe {
+            let time = ndi_struct.initial_timestamp;
+
+            let mut skip_frame = true;
+            while skip_frame {
+                let frame_type =
+                    NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000);
+                if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
+                    || frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error
+                {
+                    gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]);
+                    return Err(gst::FlowReturn::CustomError);
+                }
+                if time >= (audio_frame.timestamp as u64) {
+                    gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skipping...", (audio_frame.timestamp as u64), time);
+                } else {
+                    skip_frame = false;
+                }
+            }
+
+            pts = audio_frame.timestamp as u64 - time;
+
+            let buff_size = (audio_frame.channel_stride_in_bytes) as usize;
+            let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
+            {
+                let vec = Vec::from_raw_parts(audio_frame.p_data as *mut u8, buff_size, buff_size);
+                let pts: gst::ClockTime = (pts * 100).into();
+
+                let duration: gst::ClockTime = (((audio_frame.no_samples as f64
+                    / audio_frame.sample_rate as f64)
+                    * 1000000000.0) as u64)
+                    .into();
+                let buffer = buffer.get_mut().unwrap();
+
+                if ndi_struct.start_pts == gst::ClockTime(Some(0)) {
+                    ndi_struct.start_pts =
+                        element.get_clock().unwrap().get_time() - element.get_base_time();
+                }
+
+                buffer.set_pts(pts + ndi_struct.start_pts);
+                buffer.set_duration(duration);
+                buffer.set_offset(timestamp_data.offset);
+                buffer.set_offset_end(timestamp_data.offset + 1);
+                timestamp_data.offset = timestamp_data.offset + 1;
+                buffer.copy_from_slice(0, &vec).unwrap();
+            }
+
+            gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);
+
+            Ok(buffer)
+        }
+    }
+}
+
+// This zero-sized struct is containing the static metadata of our element. It is only necessary to
+// be able to implement traits on it, but e.g. a plugin that registers multiple elements with the
+// same code would use this struct to store information about the concrete element. An example of
+// this would be a plugin that wraps around a library that has multiple decoders with the same API,
+// but wants (as it should) a separate element registered for each decoder.
+struct NdiAudioSrcStatic;
+
+// The basic trait for registering the type: This returns a name for the type and registers the
+// instance and class initializations functions with the type system, thus hooking everything
+// together.
+impl ImplTypeStatic<BaseSrc> for NdiAudioSrcStatic {
+    fn get_name(&self) -> &str {
+        "NdiAudioSrc"
+    }
+
+    fn new(&self, element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
+        NdiAudioSrc::new(element)
+    }
+
+    fn class_init(&self, klass: &mut BaseSrcClass) {
+        NdiAudioSrc::class_init(klass);
+    }
+}
+
+// Registers the type for our element, and then registers in GStreamer under
+// the name NdiAudioSrc for being able to instantiate it via e.g.
+// gst::ElementFactory::make().
+pub fn register(plugin: &gst::Plugin) {
+    let type_ = register_type(NdiAudioSrcStatic);
+    gst::Element::register(plugin, "ndiaudiosrc", 0, type_);
+}
diff --git a/gst-plugin-ndi/src/ndisys.rs b/gst-plugin-ndi/src/ndisys.rs
index ec2c8d380..502986095 100644
--- a/gst-plugin-ndi/src/ndisys.rs
+++ b/gst-plugin-ndi/src/ndisys.rs
@@ -157,7 +157,6 @@ pub struct NdiInstance {
 
 unsafe impl ::std::marker::Send for NdiInstance {}
 
-
 #[repr(C)]
 #[derive(Debug, Copy, Clone)]
 pub struct NDIlib_tally_t {
diff --git a/gst-plugin-ndi/src/ndivideosrc.rs b/gst-plugin-ndi/src/ndivideosrc.rs
index 83bde8396..871b2282a 100644
--- a/gst-plugin-ndi/src/ndivideosrc.rs
+++ b/gst-plugin-ndi/src/ndivideosrc.rs
@@ -3,23 +3,23 @@
 use glib;
 use gst;
 use gst::prelude::*;
-use gst_video;
-use gst_base::prelude::*;
 use gst::Fraction;
+use gst_base::prelude::*;
+use gst_video;
 
+use gobject_subclass::object::*;
 use gst_plugin::base_src::*;
 use gst_plugin::element::*;
-use gobject_subclass::object::*;
 
 use std::sync::Mutex;
 use std::{i32, u32};
 
 use std::ptr;
 
-use ndisys::*;
 use connect_ndi;
-use stop_ndi;
 use ndi_struct;
+use ndisys::*;
+use stop_ndi;
 
 use hashmap_receivers;
 
@@ -45,20 +45,20 @@ impl Default for Settings {
 
 // Metadata for the properties
 static PROPERTIES: [Property; 2] = [
-Property::String(
-    "stream-name",
-    "Sream Name",
-    "Name of the streaming device",
-    None,
-    PropertyMutability::ReadWrite,
-),
-Property::String(
-    "ip",
-    "Stream IP",
-    "Stream IP",
-    None,
-    PropertyMutability::ReadWrite,
-),
+    Property::String(
+        "stream-name",
+        "Stream Name",
+        "Name of the streaming device",
+        None,
+        PropertyMutability::ReadWrite,
+    ),
+    Property::String(
+        "ip",
+        "Stream IP",
+        "Stream IP",
+        None,
+        PropertyMutability::ReadWrite,
+    ),
 ];
 
 // Stream-specific state, i.e. audio format configuration
@@ -69,13 +69,11 @@ struct State {
 
 impl Default for State {
     fn default() -> State {
-        State {
-            info: None,
-        }
+        State { info: None }
    }
 }
 
-struct TimestampData{
+struct TimestampData {
     offset: u64,
 }
 
@@ -103,9 +101,7 @@ impl NdiVideoSrc {
             ),
             settings: Mutex::new(Default::default()),
             state: Mutex::new(Default::default()),
-            timestamp_data: Mutex::new(TimestampData{
-                offset: 0,
-            }),
+            timestamp_data: Mutex::new(TimestampData { offset: 0 }),
         })
     }
 
@@ -132,13 +128,13 @@ impl NdiVideoSrc {
         let caps = gst::Caps::new_simple(
             "video/x-raw",
             &[
-            (
-                "format",
-                &gst::List::new(&[
-                    //TODO add all formats?
-                    &gst_video::VideoFormat::Uyvy.to_string(),
-                    //&gst_video::VideoFormat::Rgb.to_string(),
-                    //&gst_video::VideoFormat::Gray8.to_string(),
+                (
+                    "format",
+                    &gst::List::new(&[
+                        //TODO add all formats?
+                        &gst_video::VideoFormat::Uyvy.to_string(),
+                        //&gst_video::VideoFormat::Rgb.to_string(),
+                        //&gst_video::VideoFormat::Gray8.to_string(),
                     ]),
                 ),
                 ("width", &gst::IntRange::<i32>::new(0, i32::MAX)),
@@ -150,220 +146,96 @@ impl NdiVideoSrc {
                         gst::Fraction::new(i32::MAX, 1),
                     ),
                 ),
-            ],
-        );
-        // The src pad template must be named "src" for basesrc
-        // and specific a pad that is always there
-        let src_pad_template = gst::PadTemplate::new(
-            "src",
-            gst::PadDirection::Src,
-            gst::PadPresence::Always,
-            &caps,
-            //&gst::Caps::new_any(),
-        );
-        klass.add_pad_template(src_pad_template);
-
-        // Install all our properties
-        klass.install_properties(&PROPERTIES);
+            ],
+        );
+        // The src pad template must be named "src" for basesrc
+        // and specify a pad that is always there
+        let src_pad_template = gst::PadTemplate::new(
+            "src",
+            gst::PadDirection::Src,
+            gst::PadPresence::Always,
+            &caps,
+            //&gst::Caps::new_any(),
+        );
+        klass.add_pad_template(src_pad_template);
+
+        // Install all our properties
+        klass.install_properties(&PROPERTIES);
+    }
+}
+
+// Virtual methods of GObject itself
+impl ObjectImpl<BaseSrc> for NdiVideoSrc {
+    // Called whenever a value of a property is changed. It can be called
+    // at any time from any thread.
+    fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
+        let prop = &PROPERTIES[id as usize];
+        let element = obj.clone().downcast::<BaseSrc>().unwrap();
+
+        match *prop {
+            Property::String("stream-name", ..) => {
+                let mut settings = self.settings.lock().unwrap();
+                let stream_name = value.get().unwrap();
+                gst_debug!(
+                    self.cat,
+                    obj: &element,
+                    "Changing stream-name from {} to {}",
+                    settings.stream_name,
+                    stream_name
+                );
+                settings.stream_name = stream_name;
+                drop(settings);
+
+                // let _ =
+                //     element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
+            }
+            Property::String("ip", ..) => {
+                let mut settings = self.settings.lock().unwrap();
+                let ip = value.get().unwrap();
+                gst_debug!(
+                    self.cat,
+                    obj: &element,
+                    "Changing ip from {} to {}",
+                    settings.ip,
+                    ip
+                );
+                settings.ip = ip;
+                drop(settings);
+
+                // let _ =
+                //     element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
+            }
+            _ => unimplemented!(),
+        }
+    }
+
+    // Called whenever a value of a property is read. It can be called
+    // at any time from any thread.
+    fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
+        let prop = &PROPERTIES[id as usize];
+
+        match *prop {
+            Property::String("stream-name", ..) => {
+                let settings = self.settings.lock().unwrap();
+                Ok(settings.stream_name.to_value())
+            }
+            Property::String("ip", ..) => {
+                let settings = self.settings.lock().unwrap();
+                Ok(settings.ip.to_value())
+            }
+            _ => unimplemented!(),
+        }
+    }
+}
 
-    // Virtual methods of GObject itself
-    impl ObjectImpl<BaseSrc> for NdiVideoSrc {
-        // Called whenever a value of a property is changed. It can be called
-        // at any time from any thread.
-        fn set_property(&self, obj: &glib::Object, id: u32, value: &glib::Value) {
-            let prop = &PROPERTIES[id as usize];
-            let element = obj.clone().downcast::<BaseSrc>().unwrap();
-
-            match *prop {
-                Property::String("stream-name", ..) => {
-                    let mut settings = self.settings.lock().unwrap();
-                    let stream_name = value.get().unwrap();
-                    gst_debug!(
-                        self.cat,
-                        obj: &element,
-                        "Changing stream-name from {} to {}",
-                        settings.stream_name,
-                        stream_name
-                    );
-                    settings.stream_name = stream_name;
-                    drop(settings);
-
-                    // let _ =
-                    //     element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
-                },
-                Property::String("ip", ..) => {
-                    let mut settings = self.settings.lock().unwrap();
-                    let ip = value.get().unwrap();
-                    gst_debug!(
-                        self.cat,
-                        obj: &element,
-                        "Changing ip from {} to {}",
-                        settings.ip,
-                        ip
-                    );
-                    settings.ip = ip;
-                    drop(settings);
-
-                    // let _ =
-                    //     element.post_message(&gst::Message::new_latency().src(Some(&element)).build());
-                }
-                _ => unimplemented!(),
-            }
-        }
-
-        // Called whenever a value of a property is read. It can be called
-        // at any time from any thread.
-        fn get_property(&self, _obj: &glib::Object, id: u32) -> Result<glib::Value, ()> {
-            let prop = &PROPERTIES[id as usize];
-
-            match *prop {
-                Property::String("stream-name", ..) => {
-                    let settings = self.settings.lock().unwrap();
-                    Ok(settings.stream_name.to_value())
-                },
-                Property::String("ip", ..) => {
-                    let settings = self.settings.lock().unwrap();
-                    Ok(settings.ip.to_value())
-                }
-                _ => unimplemented!(),
-            }
-        }
-    }
-
-    // Virtual methods of gst::Element. We override none
-    impl ElementImpl<BaseSrc> for NdiVideoSrc {
-        fn change_state(&self, element: &BaseSrc, transition: gst::StateChange) -> gst::StateChangeReturn {
-            if transition == gst::StateChange::PausedToPlaying{
-                let receivers = hashmap_receivers.lock().unwrap();
-                let settings = self.settings.lock().unwrap();
-
-                let receiver = receivers.get(&settings.id_receiver).unwrap();
-                let recv = &receiver.ndi_instance;
-                let pNDI_recv = recv.recv;
-
-                let video_frame: NDIlib_video_frame_v2_t = Default::default();
-
-                let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
-                unsafe{
-                    while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_video{
-                        frame_type = NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000);
-                    }
-
-                    if ndi_struct.initial_timestamp <= video_frame.timestamp as u64 || ndi_struct.initial_timestamp == 0{
-                        ndi_struct.initial_timestamp = video_frame.timestamp as u64;
-                    }
-                }
-            }
-            element.parent_change_state(transition)
-        }
-    }
-
-    // Virtual methods of gst_base::BaseSrc
-    impl BaseSrcImpl<BaseSrc> for NdiVideoSrc {
-        // Called whenever the input/output caps are changing, i.e. in the very beginning before data
-        // flow happens and whenever the situation in the pipeline is changing. All buffers after this
-        // call have the caps given here.
-        //
-        // We simply remember the resulting AudioInfo from the caps to be able to use this for knowing
-        // the sample rate, etc. when creating buffers
-        fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool {
-            let info = match gst_video::VideoInfo::from_caps(caps) {
-                None => return false,
-                Some(info) => info,
-            };
-            gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
-
-            // TODO Puede que falle si no creamos la estructura de cero, pero si lo hacemos no podemos poner recv a none
-            let mut state = self.state.lock().unwrap();
-            state.info = Some(info);
-            let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
-            true
-        }
-
-        // Called when starting, so we can initialize all stream-related state to its defaults
-        fn start(&self, element: &BaseSrc) -> bool {
-            // Reset state
-            *self.state.lock().unwrap() = Default::default();
-            let mut settings = self.settings.lock().unwrap();
-            settings.id_receiver = connect_ndi(self.cat, element, settings.ip.clone(), settings.stream_name.clone());
-
-            if settings.id_receiver == 0{
-                return false;
-            }
-            else{
-                // let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
-                return true;
-            }
-        }
-
-        // Called when shutting down the element so we can release all stream-related state
-        fn stop(&self, element: &BaseSrc) -> bool {
-            // Reset state
-            *self.state.lock().unwrap() = Default::default();
-
-            let settings = self.settings.lock().unwrap();
-            stop_ndi(self.cat, element, settings.id_receiver.clone());
-            // Commented because when adding ndi destroy stopped in this line
-            //*self.state.lock().unwrap() = Default::default();
-            true
-        }
-
-        fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
-            use gst::QueryView;
-            match query.view_mut() {
-                // We only work in Push mode. In Pull mode, create() could be called with
-                // arbitrary offsets and we would have to produce for that specific offset
-                QueryView::Scheduling(ref mut q) => {
-                    q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
-                    q.add_scheduling_modes(&[gst::PadMode::Push]);
-                    return true;
-                }
-                // In Live mode we will have a latency equal to the number of samples in each buffer.
-                // We can't output samples before they were produced, and the last sample of a buffer
-                // is produced that much after the beginning, leading to this latency calculation
-                // QueryView::Latency(ref mut q) => {
-                //     let settings = self.settings.lock().unwrap();
-                //     let state = self.state.lock().unwrap();
-                //     println!("Dentro de query");
-                //
-                //     if let Some(ref _info) = state.info {
-                //         // let latency = gst::SECOND
-                //         //     .mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64)
-                //         //     .unwrap();
-                //         let latency = gst::SECOND.mul_div_floor(3 as u64, 2 as u64).unwrap();
-                //         let mut latency = gst::SECOND.mul_div_floor(settings.latency, 1000).unwrap();
-                //         // if settings.latency > 2000{
-                //         //     println!("{:?}", element.get_name());
-                //         //     latency = gst::SECOND * 0;
-                //         // }
-                //         let latency = gst::SECOND * 0;
-                //         // .mul_div_floor(1 as u64, 30 as u64)
-                //         // .unwrap();
-                //         // gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
-                //         let max = gst::SECOND * 120 * 1843200;
-                //         // println!("{:?}", latency2);
-                //         println!("{:?}", latency);
-                //         println!("{:?}", (settings.latency / 1000));
-                //         // println!("{:?}",max);
-                //         q.set(true, latency, max);
-                //         return true;
-                //     } else {
-                //         return false;
-                //     }
-                // }
-                _ => (),
-            }
-            BaseSrcBase::parent_query(element, query)
-        }
-
-        fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
-            //We need to set the correct caps resolution and framerate
-            let receivers = hashmap_receivers.lock().unwrap();
-            let settings = self.settings.lock().unwrap();
@@ -374,135 +246,281 @@ impl NdiVideoSrc {
-            let receiver = receivers.get(&settings.id_receiver).unwrap();
-            let recv = &receiver.ndi_instance;
-            let pNDI_recv = recv.recv;
-
-            let video_frame: NDIlib_video_frame_v2_t = Default::default();
-
-            let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
-            while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_video{
-                unsafe{
-                    frame_type = NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000);
-                }
-            }
-
-            let mut caps = gst::Caps::truncate(caps);
-            {
-                let caps = caps.make_mut();
-                let s = caps.get_mut_structure(0).unwrap();
-                s.fixate_field_nearest_int("width", video_frame.xres);
-                s.fixate_field_nearest_int("height", video_frame.yres);
-                s.fixate_field_nearest_fraction("framerate", Fraction::new(video_frame.frame_rate_N, video_frame.frame_rate_D));
-            }
-
-            // Let BaseSrc fixate anything else for us. We could've alternatively have
-            // called Caps::fixate() here
-
-            let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
-            element.parent_fixate(caps)
-        }
-
-        //Creates the video buffers
-        fn create(
-            &self,
-            element: &BaseSrc,
-            _offset: u64,
-            _length: u32,
-        ) -> Result<gst::Buffer, gst::FlowReturn> {
-            // Keep a local copy of the values of all our properties at this very moment. This
-            // ensures that the mutex is never locked for long and the application wouldn't
-            // have to block until this function returns when getting/setting property values
-            let _settings = &*self.settings.lock().unwrap();
-
-            let mut timestamp_data = self.timestamp_data.lock().unwrap();
-            // Get a locked reference to our state, i.e. the input and output AudioInfo
-            let state = self.state.lock().unwrap();
-            let _info = match state.info {
-                None => {
-                    gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
-                    return Err(gst::FlowReturn::NotNegotiated);
-                }
-                Some(ref info) => info.clone(),
-            };
-            // unsafe{
-            let receivers = hashmap_receivers.lock().unwrap();
-
-            let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance;
-            let pNDI_recv = recv.recv;
-
-            let pts: u64;
-            let video_frame: NDIlib_video_frame_v2_t = Default::default();
-
-            unsafe{
-                let time = ndi_struct.initial_timestamp;
-
-                let mut skip_frame = true;
-                while skip_frame {
-                    let frame_type = NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000,);
-                    if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none || frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error {
-                        gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]);
-                        return Err(gst::FlowReturn::CustomError);
-                    }
-                    if time >= (video_frame.timestamp as u64){
-                        gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp as u64), time);
-                    }
-                    else{
-                        skip_frame = false;
-                    }
-                }
-
-                pts = video_frame.timestamp as u64 - time;
-
-                let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize;
-                let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
-                {
-                    let vec = Vec::from_raw_parts(video_frame.p_data as *mut u8, buff_size, buff_size);
-                    let pts: gst::ClockTime = (pts * 100).into();
-
-                    let duration: gst::ClockTime = (((video_frame.frame_rate_D as f64 / video_frame.frame_rate_N as f64) * 1000000000.0) as u64).into();
-                    let buffer = buffer.get_mut().unwrap();
-
-                    if ndi_struct.start_pts == gst::ClockTime(Some(0)){
-                        ndi_struct.start_pts = element.get_clock().unwrap().get_time() - element.get_base_time();
-                    }
-
-                    buffer.set_pts(pts + ndi_struct.start_pts);
-                    buffer.set_duration(duration);
-                    buffer.set_offset(timestamp_data.offset);
-                    buffer.set_offset_end(timestamp_data.offset + 1);
-                    timestamp_data.offset = timestamp_data.offset + 1;
-                    buffer.copy_from_slice(0, &vec).unwrap();
-                }
-
-                gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);
-
-                Ok(buffer)
-            }
-        }
-    }
-
-    // This zero-sized struct is containing the static metadata of our element. It is only necessary to
-    // be able to implement traits on it, but e.g. a plugin that registers multiple elements with the
-    // same code would use this struct to store information about the concrete element. An example of
-    // this would be a plugin that wraps around a library that has multiple decoders with the same API,
-    // but wants (as it should) a separate element registered for each decoder.
-    struct NdiVideoSrcStatic;
-
-    // The basic trait for registering the type: This returns a name for the type and registers the
-    // instance and class initializations functions with the type system, thus hooking everything
-    // together.
-    impl ImplTypeStatic<BaseSrc> for NdiVideoSrcStatic {
-        fn get_name(&self) -> &str {
-            "NdiVideoSrc"
-        }
-
-        fn new(&self, element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
-            NdiVideoSrc::new(element)
-        }
-
-        fn class_init(&self, klass: &mut BaseSrcClass) {
-            NdiVideoSrc::class_init(klass);
-        }
-    }
-
-    // Registers the type for our element, and then registers in GStreamer under
-    // the name NdiVideoSrc for being able to instantiate it via e.g.
-    // gst::ElementFactory::make().
-    pub fn register(plugin: &gst::Plugin) {
-        let type_ = register_type(NdiVideoSrcStatic);
-        gst::Element::register(plugin, "ndivideosrc", 0, type_);
-    }
+// Virtual methods of gst::Element. We override none
+impl ElementImpl<BaseSrc> for NdiVideoSrc {
+    fn change_state(
+        &self,
+        element: &BaseSrc,
+        transition: gst::StateChange,
+    ) -> gst::StateChangeReturn {
+        if transition == gst::StateChange::PausedToPlaying {
+            let receivers = hashmap_receivers.lock().unwrap();
+            let settings = self.settings.lock().unwrap();
+
+            let receiver = receivers.get(&settings.id_receiver).unwrap();
+            let recv = &receiver.ndi_instance;
+            let pNDI_recv = recv.recv;
+
+            let video_frame: NDIlib_video_frame_v2_t = Default::default();
+
+            let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
+            unsafe {
+                while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_video {
+                    frame_type = NDIlib_recv_capture_v2(
+                        pNDI_recv,
+                        &video_frame,
+                        ptr::null(),
+                        ptr::null(),
+                        1000,
+                    );
+                }
+
+                if ndi_struct.initial_timestamp <= video_frame.timestamp as u64
+                    || ndi_struct.initial_timestamp == 0
+                {
+                    ndi_struct.initial_timestamp = video_frame.timestamp as u64;
+                }
+            }
+        }
+        element.parent_change_state(transition)
+    }
+}
+
+// Virtual methods of gst_base::BaseSrc
+impl BaseSrcImpl<BaseSrc> for NdiVideoSrc {
+    // Called whenever the input/output caps are changing, i.e. in the very beginning before data
+    // flow happens and whenever the situation in the pipeline is changing. All buffers after this
+    // call have the caps given here.
+    //
+    // We simply remember the resulting VideoInfo from the caps to be able to use this for knowing
+    // the sample rate, etc. when creating buffers
+    fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool {
+        let info = match gst_video::VideoInfo::from_caps(caps) {
+            None => return false,
+            Some(info) => info,
+        };
+        gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
+
+        // TODO This may fail if we don't create the struct from scratch, but if we do we can't set recv to None
+        let mut state = self.state.lock().unwrap();
+        state.info = Some(info);
+        let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
+        true
+    }
+
+    // Called when starting, so we can initialize all stream-related state to its defaults
+    fn start(&self, element: &BaseSrc) -> bool {
+        // Reset state
+        *self.state.lock().unwrap() = Default::default();
+        let mut settings = self.settings.lock().unwrap();
+        settings.id_receiver = connect_ndi(
+            self.cat,
+            element,
+            settings.ip.clone(),
+            settings.stream_name.clone(),
+        );
+
+        if settings.id_receiver == 0 {
+            return false;
+        } else {
+            // let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
+            return true;
+        }
+    }
+
+    // Called when shutting down the element so we can release all stream-related state
+    fn stop(&self, element: &BaseSrc) -> bool {
+        // Reset state
+        *self.state.lock().unwrap() = Default::default();
+
+        let settings = self.settings.lock().unwrap();
+        stop_ndi(self.cat, element, settings.id_receiver.clone());
+        // Commented because when adding ndi destroy stopped in this line
+        //*self.state.lock().unwrap() = Default::default();
+        true
+    }
+
+    fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
+        use gst::QueryView;
+        match query.view_mut() {
+            // We only work in Push mode. In Pull mode, create() could be called with
+            // arbitrary offsets and we would have to produce for that specific offset
+            QueryView::Scheduling(ref mut q) => {
+                q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
+                q.add_scheduling_modes(&[gst::PadMode::Push]);
+                return true;
+            }
+            // In Live mode we will have a latency equal to the number of samples in each buffer.
+            // We can't output samples before they were produced, and the last sample of a buffer
+            // is produced that much after the beginning, leading to this latency calculation
+            // QueryView::Latency(ref mut q) => {
+            //     let settings = self.settings.lock().unwrap();
+            //     let state = self.state.lock().unwrap();
+            //     println!("Inside query");
+            //
+            //     if let Some(ref _info) = state.info {
+            //         // let latency = gst::SECOND
+            //         //     .mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64)
+            //         //     .unwrap();
+            //         let latency = gst::SECOND.mul_div_floor(3 as u64, 2 as u64).unwrap();
+            //         let mut latency = gst::SECOND.mul_div_floor(settings.latency, 1000).unwrap();
+            //         // if settings.latency > 2000{
+            //         //     println!("{:?}", element.get_name());
+            //         //     latency = gst::SECOND * 0;
+            //         // }
+            //         let latency = gst::SECOND * 0;
+            //         // .mul_div_floor(1 as u64, 30 as u64)
+            //         // .unwrap();
+            //         // gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
+            //         let max = gst::SECOND * 120 * 1843200;
+            //         // println!("{:?}", latency2);
+            //         println!("{:?}", latency);
+            //         println!("{:?}", (settings.latency / 1000));
+            //         // println!("{:?}",max);
+            //         q.set(true, latency, max);
+            //         return true;
+            //     } else {
+            //         return false;
+            //     }
+            // }
+            _ => (),
+        }
+        BaseSrcBase::parent_query(element, query)
+    }
+
+    fn fixate(&self, element: &BaseSrc, caps: gst::Caps) -> gst::Caps {
+        //We need to set the correct caps resolution and framerate
+        let receivers = hashmap_receivers.lock().unwrap();
+        let settings = self.settings.lock().unwrap();
+
+        let receiver = receivers.get(&settings.id_receiver).unwrap();
+        let recv = &receiver.ndi_instance;
+        let pNDI_recv = recv.recv;
+
+        let video_frame: NDIlib_video_frame_v2_t = Default::default();
+
+        let mut frame_type: NDIlib_frame_type_e = NDIlib_frame_type_e::NDIlib_frame_type_none;
+        while frame_type != NDIlib_frame_type_e::NDIlib_frame_type_video {
+            unsafe {
+                frame_type =
+                    NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000);
+            }
+        }
+
+        let mut caps = gst::Caps::truncate(caps);
+        {
+            let caps = caps.make_mut();
+            let s = caps.get_mut_structure(0).unwrap();
+            s.fixate_field_nearest_int("width", video_frame.xres);
+            s.fixate_field_nearest_int("height", video_frame.yres);
+            s.fixate_field_nearest_fraction(
+                "framerate",
+                Fraction::new(video_frame.frame_rate_N, video_frame.frame_rate_D),
+            );
+        }
+
+        // Let BaseSrc fixate anything else for us. We could've alternatively have
+        // called Caps::fixate() here
+
+        let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
+        element.parent_fixate(caps)
+    }
+
+    //Creates the video buffers
+    fn create(
+        &self,
+        element: &BaseSrc,
+        _offset: u64,
+        _length: u32,
+    ) -> Result<gst::Buffer, gst::FlowReturn> {
+        // Keep a local copy of the values of all our properties at this very moment. This
+        // ensures that the mutex is never locked for long and the application wouldn't
+        // have to block until this function returns when getting/setting property values
+        let _settings = &*self.settings.lock().unwrap();
+
+        let mut timestamp_data = self.timestamp_data.lock().unwrap();
+        // Get a locked reference to our state, i.e. the input and output VideoInfo
+        let state = self.state.lock().unwrap();
+        let _info = match state.info {
+            None => {
+                gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]);
+                return Err(gst::FlowReturn::NotNegotiated);
+            }
+            Some(ref info) => info.clone(),
+        };
+        // unsafe{
+        let receivers = hashmap_receivers.lock().unwrap();
+
+        let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance;
+        let pNDI_recv = recv.recv;
+
+        let pts: u64;
+        let video_frame: NDIlib_video_frame_v2_t = Default::default();
+
+        unsafe {
+            let time = ndi_struct.initial_timestamp;
+
+            let mut skip_frame = true;
+            while skip_frame {
+                let frame_type =
+                    NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000);
+                if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none
+                    || frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error
+                {
+                    gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]);
+                    return Err(gst::FlowReturn::CustomError);
+                }
+                if time >= (video_frame.timestamp as u64) {
+                    gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skipping...", (video_frame.timestamp as u64), time);
+                } else {
+                    skip_frame = false;
+                }
+            }
+
+            pts = video_frame.timestamp as u64 - time;
+
+            let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize;
+            let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
+            {
+                let vec = Vec::from_raw_parts(video_frame.p_data as *mut u8, buff_size, buff_size);
+                let pts: gst::ClockTime = (pts * 100).into();
+
+                let duration: gst::ClockTime = (((video_frame.frame_rate_D as f64
+                    / video_frame.frame_rate_N as f64)
+                    * 1000000000.0) as u64)
+                    .into();
+                let buffer = buffer.get_mut().unwrap();
+
+                if ndi_struct.start_pts == gst::ClockTime(Some(0)) {
+                    ndi_struct.start_pts =
+                        element.get_clock().unwrap().get_time() - element.get_base_time();
+                }
+
+                buffer.set_pts(pts + ndi_struct.start_pts);
+                buffer.set_duration(duration);
+                buffer.set_offset(timestamp_data.offset);
+                buffer.set_offset_end(timestamp_data.offset + 1);
+                timestamp_data.offset = timestamp_data.offset + 1;
+                buffer.copy_from_slice(0, &vec).unwrap();
+            }
+
+            gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);
+
+            Ok(buffer)
+        }
+    }
+}
+
+// This zero-sized struct is containing the static metadata of our element. It is only necessary to
+// be able to implement traits on it, but e.g. a plugin that registers multiple elements with the
+// same code would use this struct to store information about the concrete element. An example of
+// this would be a plugin that wraps around a library that has multiple decoders with the same API,
+// but wants (as it should) a separate element registered for each decoder.
+struct NdiVideoSrcStatic;
+
+// The basic trait for registering the type: This returns a name for the type and registers the
+// instance and class initializations functions with the type system, thus hooking everything
+// together.
+impl ImplTypeStatic<BaseSrc> for NdiVideoSrcStatic {
+    fn get_name(&self) -> &str {
+        "NdiVideoSrc"
+    }
+
+    fn new(&self, element: &BaseSrc) -> Box<BaseSrcImpl<BaseSrc>> {
+        NdiVideoSrc::new(element)
+    }
+
+    fn class_init(&self, klass: &mut BaseSrcClass) {
+        NdiVideoSrc::class_init(klass);
+    }
+}
+
+// Registers the type for our element, and then registers in GStreamer under
+// the name NdiVideoSrc for being able to instantiate it via e.g.
+// gst::ElementFactory::make().
+pub fn register(plugin: &gst::Plugin) {
+    let type_ = register_type(NdiVideoSrcStatic);
+    gst::Element::register(plugin, "ndivideosrc", 0, type_);
+}
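
Usage note (not part of the patch): the `register()` functions above make the elements available as `ndiaudiosrc` and `ndivideosrc`, each exposing the `stream-name` and `ip` properties declared in `PROPERTIES`. Below is a minimal sketch of driving the video element from Rust with the `gstreamer` crate, assuming the plugin is built and visible through `GST_PLUGIN_PATH`; the pipeline string, the NDI source name, and the sink choice are placeholder assumptions, and the API calls follow the gstreamer-rs generation this plugin targets.

```rust
// Minimal sketch, not part of this diff. Assumes a built gst-plugin-ndi on
// GST_PLUGIN_PATH; the stream-name value below is hypothetical.
extern crate gstreamer as gst;
use gst::prelude::*;

fn main() {
    gst::init().unwrap();

    // ndivideosrc is the element registered by ndivideosrc.rs above;
    // stream-name (or ip) selects the NDI source, as connect_ndi() expects.
    let pipeline = gst::parse_launch(
        "ndivideosrc stream-name=\"MY-PC (OBS)\" ! videoconvert ! autovideosink",
    )
    .unwrap();

    pipeline.set_state(gst::State::Playing).into_result().unwrap();

    // Block until EOS or an error reaches the bus.
    let bus = pipeline.get_bus().unwrap();
    while let Some(msg) = bus.timed_pop(gst::CLOCK_TIME_NONE) {
        match msg.view() {
            gst::MessageView::Eos(..) | gst::MessageView::Error(..) => break,
            _ => (),
        }
    }

    pipeline.set_state(gst::State::Null).into_result().unwrap();
}
```

The same pattern works for `ndiaudiosrc` (e.g. `ndiaudiosrc ip=... ! audioconvert ! autoaudiosink`); note that `connect_ndi()` shares one receiver per source between the audio and video elements, so both can point at the same `stream-name` or `ip`.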