diff --git a/gst-plugin-ndi/src/ndisrc.rs b/gst-plugin-ndi/src/ndisrc.rs index 9323b0d2..1a52ef95 100644 --- a/gst-plugin-ndi/src/ndisrc.rs +++ b/gst-plugin-ndi/src/ndisrc.rs @@ -9,7 +9,8 @@ use glib; use gst; use gst::prelude::*; -use gst_audio; +//use gst_audio; +use gst_video; use gst_base::prelude::*; use byte_slice_cast::*; @@ -109,19 +110,19 @@ static PROPERTIES: [Property; 6] = [ // Stream-specific state, i.e. audio format configuration // and sample offset struct State { - info: Option, - sample_offset: u64, - sample_stop: Option, - accumulator: f64, + info: Option, + // sample_offset: u64, + // sample_stop: Option, + // accumulator: f64, } impl Default for State { fn default() -> State { State { info: None, - sample_offset: 0, - sample_stop: None, - accumulator: 0.0, + // sample_offset: 0, + // sample_stop: None, + // accumulator: 0.0, } } } @@ -183,32 +184,41 @@ impl NdiSrc { // On the src pad, we can produce F32/F64 with any sample rate // and any number of channels let caps = gst::Caps::new_simple( - "audio/x-raw", + "video/x-raw", &[ ( "format", &gst::List::new(&[ - &gst_audio::AUDIO_FORMAT_F32.to_string(), - &gst_audio::AUDIO_FORMAT_F64.to_string(), - ]), + &gst_video::VideoFormat::Rgb.to_string(), + &gst_video::VideoFormat::Gray8.to_string(), + ]), + ), + // ("layout", &"interleaved"), + // ("rate", &gst::IntRange::::new(1, i32::MAX)), + // ("channels", &gst::IntRange::::new(1, i32::MAX)), + ("width", &gst::IntRange::::new(0, i32::MAX)), + ("height", &gst::IntRange::::new(0, i32::MAX)), + ( + "framerate", + &gst::FractionRange::new( + gst::Fraction::new(0, 1), + gst::Fraction::new(i32::MAX, 1), ), - ("layout", &"interleaved"), - ("rate", &gst::IntRange::::new(1, i32::MAX)), - ("channels", &gst::IntRange::::new(1, i32::MAX)), - ], - ); - // The src pad template must be named "src" for basesrc - // and specific a pad that is always there - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ); - klass.add_pad_template(src_pad_template); + ), + ], + ); + // The src pad template must be named "src" for basesrc + // and specific a pad that is always there + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ); + klass.add_pad_template(src_pad_template); - // Install all our properties - klass.install_properties(&PROPERTIES); + // Install all our properties + klass.install_properties(&PROPERTIES); } fn process( @@ -414,49 +424,54 @@ impl BaseSrcImpl for NdiSrc { fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool { use std::f64::consts::PI; - let info = match gst_audio::AudioInfo::from_caps(caps) { + let info = match gst_video::VideoInfo::from_caps(caps) { None => return false, Some(info) => info, }; gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); - element.set_blocksize(info.bpf() * (*self.settings.lock().unwrap()).samples_per_buffer); - - let settings = &*self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); - - // If we have no caps yet, any old sample_offset and sample_stop will be - // in nanoseconds - let old_rate = match state.info { - Some(ref info) => info.rate() as u64, - None => gst::SECOND_VAL, - }; - - // Update sample offset and accumulator based on the previous values and the - // sample rate change, if any - let old_sample_offset = state.sample_offset; - let sample_offset = old_sample_offset - .mul_div_floor(info.rate() as u64, old_rate) - .unwrap(); - - let old_sample_stop = state.sample_stop; - let sample_stop = - old_sample_stop.map(|v| v.mul_div_floor(info.rate() as u64, old_rate).unwrap()); - - let accumulator = - (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (info.rate() as f64)); - - *state = State { + *state = State { info: Some(info), - sample_offset: sample_offset, - sample_stop: sample_stop, - accumulator: accumulator, - }; + }; - drop(state); - - let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + // element.set_blocksize(info.bpf() * (*self.settings.lock().unwrap()).samples_per_buffer); + // + // let settings = &*self.settings.lock().unwrap(); + // let mut state = self.state.lock().unwrap(); + // + // // If we have no caps yet, any old sample_offset and sample_stop will be + // // in nanoseconds + // let old_rate = match state.info { + // Some(ref info) => info.rate() as u64, + // None => gst::SECOND_VAL, + // }; + // + // // Update sample offset and accumulator based on the previous values and the + // // sample rate change, if any + // let old_sample_offset = state.sample_offset; + // let sample_offset = old_sample_offset + // .mul_div_floor(info.rate() as u64, old_rate) + // .unwrap(); + // + // let old_sample_stop = state.sample_stop; + // let sample_stop = + // old_sample_stop.map(|v| v.mul_div_floor(info.rate() as u64, old_rate).unwrap()); + // + // let accumulator = + // (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (info.rate() as f64)); + // + // *state = State { + // info: Some(info), + // sample_offset: sample_offset, + // sample_stop: sample_stop, + // accumulator: accumulator, + // }; + // + // drop(state); + // + // let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); true } @@ -483,39 +498,39 @@ impl BaseSrcImpl for NdiSrc { true } - fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool { - use gst::QueryView; - - match query.view_mut() { - // We only work in Push mode. In Pull mode, create() could be called with - // arbitrary offsets and we would have to produce for that specific offset - QueryView::Scheduling(ref mut q) => { - q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); - q.add_scheduling_modes(&[gst::PadMode::Push]); - return true; - } - // In Live mode we will have a latency equal to the number of samples in each buffer. - // We can't output samples before they were produced, and the last sample of a buffer - // is produced that much after the beginning, leading to this latency calculation - QueryView::Latency(ref mut q) => { - let settings = &*self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); - - if let Some(ref info) = state.info { - let latency = gst::SECOND - .mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64) - .unwrap(); - gst_debug!(self.cat, obj: element, "Returning latency {}", latency); - q.set(settings.is_live, latency, gst::CLOCK_TIME_NONE); - return true; - } else { - return false; - } - } - _ => (), - } - BaseSrcBase::parent_query(element, query) - } + // fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool { + // use gst::QueryView; + // + // match query.view_mut() { + // // We only work in Push mode. In Pull mode, create() could be called with + // // arbitrary offsets and we would have to produce for that specific offset + // QueryView::Scheduling(ref mut q) => { + // q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); + // q.add_scheduling_modes(&[gst::PadMode::Push]); + // return true; + // } + // // In Live mode we will have a latency equal to the number of samples in each buffer. + // // We can't output samples before they were produced, and the last sample of a buffer + // // is produced that much after the beginning, leading to this latency calculation + // QueryView::Latency(ref mut q) => { + // let settings = &*self.settings.lock().unwrap(); + // let state = self.state.lock().unwrap(); + // + // if let Some(ref info) = state.info { + // let latency = gst::SECOND + // .mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64) + // .unwrap(); + // gst_debug!(self.cat, obj: element, "Returning latency {}", latency); + // q.set(settings.is_live, latency, gst::CLOCK_TIME_NONE); + // return true; + // } else { + // return false; + // } + // } + // _ => (), + // } + // BaseSrcBase::parent_query(element, query) + // } // Creates the audio buffers fn create( @@ -539,134 +554,136 @@ impl BaseSrcImpl for NdiSrc { Some(ref info) => info.clone(), }; - // If a stop position is set (from a seek), only produce samples up to that - // point but at most samples_per_buffer samples per buffer - let n_samples = if let Some(sample_stop) = state.sample_stop { - if sample_stop <= state.sample_offset { - gst_log!(self.cat, obj: element, "At EOS"); - return Err(gst::FlowReturn::Eos); - } - - sample_stop - state.sample_offset - } else { - settings.samples_per_buffer as u64 - }; + // // If a stop position is set (from a seek), only produce samples up to that + // // point but at most samples_per_buffer samples per buffer + // let n_samples = if let Some(sample_stop) = state.sample_stop { + // if sample_stop <= state.sample_offset { + // gst_log!(self.cat, obj: element, "At EOS"); + // return Err(gst::FlowReturn::Eos); + // } + // + // sample_stop - state.sample_offset + // } else { + // settings.samples_per_buffer as u64 + // }; // Allocate a new buffer of the required size, update the metadata with the // current timestamp and duration and then fill it according to the current // caps + //TODO Set buffer size from data received from NDI let mut buffer = - gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap(); + // gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap(); + gst::Buffer::with_size(200 * 400).unwrap(); { let buffer = buffer.get_mut().unwrap(); - // Calculate the current timestamp (PTS) and the next one, - // and calculate the duration from the difference instead of - // simply the number of samples to prevent rounding errors - let pts = state - .sample_offset - .mul_div_floor(gst::SECOND_VAL, info.rate() as u64) - .unwrap() - .into(); - let next_pts: gst::ClockTime = (state.sample_offset + n_samples) - .mul_div_floor(gst::SECOND_VAL, info.rate() as u64) - .unwrap() - .into(); - buffer.set_pts(pts); - buffer.set_duration(next_pts - pts); - - // Map the buffer writable and create the actual samples - let mut map = buffer.map_writable().unwrap(); - let data = map.as_mut_slice(); - - if info.format() == gst_audio::AUDIO_FORMAT_F32 { - Self::process::( - data, - &mut state.accumulator, - settings.freq, - info.rate(), - info.channels(), - settings.volume, - ); - } else { - Self::process::( - data, - &mut state.accumulator, - settings.freq, - info.rate(), - info.channels(), - settings.volume, - ); - } - } - state.sample_offset += n_samples; - drop(state); - - // If we're live, we are waiting until the time of the last sample in our buffer has - // arrived. This is the very reason why we have to report that much latency. - // A real live-source would of course only allow us to have the data available after - // that latency, e.g. when capturing from a microphone, and no waiting from our side - // would be necessary.. + // // Calculate the current timestamp (PTS) and the next one, + // // and calculate the duration from the difference instead of + // // simply the number of samples to prevent rounding errors + // let pts = state + // .sample_offset + // .mul_div_floor(gst::SECOND_VAL, info.rate() as u64) + // .unwrap() + // .into(); + // let next_pts: gst::ClockTime = (state.sample_offset + n_samples) + // .mul_div_floor(gst::SECOND_VAL, info.rate() as u64) + // .unwrap() + // .into(); + // buffer.set_pts(pts); + // buffer.set_duration(next_pts - pts); // - // Waiting happens based on the pipeline clock, which means that a real live source - // with its own clock would require various translations between the two clocks. - // This is out of scope for the tutorial though. - if element.is_live() { - let clock = match element.get_clock() { - None => return Ok(buffer), - Some(clock) => clock, - }; - - let segment = element - .get_segment() - .downcast::() - .unwrap(); - let base_time = element.get_base_time(); - let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration()); - - // The last sample's clock time is the base time of the element plus the - // running time of the last sample - let wait_until = running_time + base_time; - if wait_until.is_none() { - return Ok(buffer); - } - - // Store the clock ID in our struct unless we're flushing anyway. - // This allows to asynchronously cancel the waiting from unlock() - // so that we immediately stop waiting on e.g. shutdown. - let mut clock_wait = self.clock_wait.lock().unwrap(); - if clock_wait.flushing { - gst_debug!(self.cat, obj: element, "Flushing"); - return Err(gst::FlowReturn::Flushing); - } - - let id = clock.new_single_shot_id(wait_until).unwrap(); - clock_wait.clock_id = Some(id.clone()); - drop(clock_wait); - - gst_log!( - self.cat, - obj: element, - "Waiting until {}, now {}", - wait_until, - clock.get_time() - ); - let (res, jitter) = id.wait(); - gst_log!( - self.cat, - obj: element, - "Waited res {:?} jitter {}", - res, - jitter - ); - self.clock_wait.lock().unwrap().clock_id.take(); - - // If the clock ID was unscheduled, unlock() was called - // and we should return Flushing immediately. - if res == gst::ClockReturn::Unscheduled { - gst_debug!(self.cat, obj: element, "Flushing"); - return Err(gst::FlowReturn::Flushing); - } + // // Map the buffer writable and create the actual samples + // let mut map = buffer.map_writable().unwrap(); + // let data = map.as_mut_slice(); + // + // if info.format() == gst_audio::AUDIO_FORMAT_F32 { + // Self::process::( + // data, + // &mut state.accumulator, + // settings.freq, + // info.rate(), + // info.channels(), + // settings.volume, + // ); + // } else { + // Self::process::( + // data, + // &mut state.accumulator, + // settings.freq, + // info.rate(), + // info.channels(), + // settings.volume, + // ); + // } + // } + // state.sample_offset += n_samples; + // drop(state); + // + // // If we're live, we are waiting until the time of the last sample in our buffer has + // // arrived. This is the very reason why we have to report that much latency. + // // A real live-source would of course only allow us to have the data available after + // // that latency, e.g. when capturing from a microphone, and no waiting from our side + // // would be necessary.. + // // + // // Waiting happens based on the pipeline clock, which means that a real live source + // // with its own clock would require various translations between the two clocks. + // // This is out of scope for the tutorial though. + // if element.is_live() { + // let clock = match element.get_clock() { + // None => return Ok(buffer), + // Some(clock) => clock, + // }; + // + // let segment = element + // .get_segment() + // .downcast::() + // .unwrap(); + // let base_time = element.get_base_time(); + // let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration()); + // + // // The last sample's clock time is the base time of the element plus the + // // running time of the last sample + // let wait_until = running_time + base_time; + // if wait_until.is_none() { + // return Ok(buffer); + // } + // + // // Store the clock ID in our struct unless we're flushing anyway. + // // This allows to asynchronously cancel the waiting from unlock() + // // so that we immediately stop waiting on e.g. shutdown. + // let mut clock_wait = self.clock_wait.lock().unwrap(); + // if clock_wait.flushing { + // gst_debug!(self.cat, obj: element, "Flushing"); + // return Err(gst::FlowReturn::Flushing); + // } + // + // let id = clock.new_single_shot_id(wait_until).unwrap(); + // clock_wait.clock_id = Some(id.clone()); + // drop(clock_wait); + // + // gst_log!( + // self.cat, + // obj: element, + // "Waiting until {}, now {}", + // wait_until, + // clock.get_time() + // ); + // let (res, jitter) = id.wait(); + // gst_log!( + // self.cat, + // obj: element, + // "Waited res {:?} jitter {}", + // res, + // jitter + // ); + // self.clock_wait.lock().unwrap().clock_id.take(); + // + // // If the clock ID was unscheduled, unlock() was called + // // and we should return Flushing immediately. + // if res == gst::ClockReturn::Unscheduled { + // gst_debug!(self.cat, obj: element, "Flushing"); + // return Err(gst::FlowReturn::Flushing); + // } } gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);