Added videocaps and commented almost audiosrc related code

This commit is contained in:
Daniel Vilar 2018-04-12 17:52:21 +02:00
parent 227d750305
commit 2b09a48a5c

View file

@ -9,7 +9,8 @@
use glib;
use gst;
use gst::prelude::*;
use gst_audio;
//use gst_audio;
use gst_video;
use gst_base::prelude::*;
use byte_slice_cast::*;
@ -109,19 +110,19 @@ static PROPERTIES: [Property; 6] = [
// Stream-specific state, i.e. audio format configuration
// and sample offset
struct State {
info: Option<gst_audio::AudioInfo>,
sample_offset: u64,
sample_stop: Option<u64>,
accumulator: f64,
info: Option<gst_video::VideoInfo>,
// sample_offset: u64,
// sample_stop: Option<u64>,
// accumulator: f64,
}
impl Default for State {
fn default() -> State {
State {
info: None,
sample_offset: 0,
sample_stop: None,
accumulator: 0.0,
// sample_offset: 0,
// sample_stop: None,
// accumulator: 0.0,
}
}
}
@ -183,32 +184,41 @@ impl NdiSrc {
// On the src pad, we can produce F32/F64 with any sample rate
// and any number of channels
let caps = gst::Caps::new_simple(
"audio/x-raw",
"video/x-raw",
&[
(
"format",
&gst::List::new(&[
&gst_audio::AUDIO_FORMAT_F32.to_string(),
&gst_audio::AUDIO_FORMAT_F64.to_string(),
]),
&gst_video::VideoFormat::Rgb.to_string(),
&gst_video::VideoFormat::Gray8.to_string(),
]),
),
// ("layout", &"interleaved"),
// ("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
// ("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
("width", &gst::IntRange::<i32>::new(0, i32::MAX)),
("height", &gst::IntRange::<i32>::new(0, i32::MAX)),
(
"framerate",
&gst::FractionRange::new(
gst::Fraction::new(0, 1),
gst::Fraction::new(i32::MAX, 1),
),
("layout", &"interleaved"),
("rate", &gst::IntRange::<i32>::new(1, i32::MAX)),
("channels", &gst::IntRange::<i32>::new(1, i32::MAX)),
],
);
// The src pad template must be named "src" for basesrc
// and specific a pad that is always there
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
),
],
);
// The src pad template must be named "src" for basesrc
// and specific a pad that is always there
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
// Install all our properties
klass.install_properties(&PROPERTIES);
// Install all our properties
klass.install_properties(&PROPERTIES);
}
fn process<F: Float + FromByteSlice>(
@ -414,49 +424,54 @@ impl BaseSrcImpl<BaseSrc> for NdiSrc {
fn set_caps(&self, element: &BaseSrc, caps: &gst::CapsRef) -> bool {
use std::f64::consts::PI;
let info = match gst_audio::AudioInfo::from_caps(caps) {
let info = match gst_video::VideoInfo::from_caps(caps) {
None => return false,
Some(info) => info,
};
gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps);
element.set_blocksize(info.bpf() * (*self.settings.lock().unwrap()).samples_per_buffer);
let settings = &*self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
// If we have no caps yet, any old sample_offset and sample_stop will be
// in nanoseconds
let old_rate = match state.info {
Some(ref info) => info.rate() as u64,
None => gst::SECOND_VAL,
};
// Update sample offset and accumulator based on the previous values and the
// sample rate change, if any
let old_sample_offset = state.sample_offset;
let sample_offset = old_sample_offset
.mul_div_floor(info.rate() as u64, old_rate)
.unwrap();
let old_sample_stop = state.sample_stop;
let sample_stop =
old_sample_stop.map(|v| v.mul_div_floor(info.rate() as u64, old_rate).unwrap());
let accumulator =
(sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (info.rate() as f64));
*state = State {
*state = State {
info: Some(info),
sample_offset: sample_offset,
sample_stop: sample_stop,
accumulator: accumulator,
};
};
drop(state);
let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
// element.set_blocksize(info.bpf() * (*self.settings.lock().unwrap()).samples_per_buffer);
//
// let settings = &*self.settings.lock().unwrap();
// let mut state = self.state.lock().unwrap();
//
// // If we have no caps yet, any old sample_offset and sample_stop will be
// // in nanoseconds
// let old_rate = match state.info {
// Some(ref info) => info.rate() as u64,
// None => gst::SECOND_VAL,
// };
//
// // Update sample offset and accumulator based on the previous values and the
// // sample rate change, if any
// let old_sample_offset = state.sample_offset;
// let sample_offset = old_sample_offset
// .mul_div_floor(info.rate() as u64, old_rate)
// .unwrap();
//
// let old_sample_stop = state.sample_stop;
// let sample_stop =
// old_sample_stop.map(|v| v.mul_div_floor(info.rate() as u64, old_rate).unwrap());
//
// let accumulator =
// (sample_offset as f64).rem(2.0 * PI * (settings.freq as f64) / (info.rate() as f64));
//
// *state = State {
// info: Some(info),
// sample_offset: sample_offset,
// sample_stop: sample_stop,
// accumulator: accumulator,
// };
//
// drop(state);
//
// let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build());
true
}
@ -483,39 +498,39 @@ impl BaseSrcImpl<BaseSrc> for NdiSrc {
true
}
fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
match query.view_mut() {
// We only work in Push mode. In Pull mode, create() could be called with
// arbitrary offsets and we would have to produce for that specific offset
QueryView::Scheduling(ref mut q) => {
q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
q.add_scheduling_modes(&[gst::PadMode::Push]);
return true;
}
// In Live mode we will have a latency equal to the number of samples in each buffer.
// We can't output samples before they were produced, and the last sample of a buffer
// is produced that much after the beginning, leading to this latency calculation
QueryView::Latency(ref mut q) => {
let settings = &*self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
if let Some(ref info) = state.info {
let latency = gst::SECOND
.mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64)
.unwrap();
gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
q.set(settings.is_live, latency, gst::CLOCK_TIME_NONE);
return true;
} else {
return false;
}
}
_ => (),
}
BaseSrcBase::parent_query(element, query)
}
// fn query(&self, element: &BaseSrc, query: &mut gst::QueryRef) -> bool {
// use gst::QueryView;
//
// match query.view_mut() {
// // We only work in Push mode. In Pull mode, create() could be called with
// // arbitrary offsets and we would have to produce for that specific offset
// QueryView::Scheduling(ref mut q) => {
// q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0);
// q.add_scheduling_modes(&[gst::PadMode::Push]);
// return true;
// }
// // In Live mode we will have a latency equal to the number of samples in each buffer.
// // We can't output samples before they were produced, and the last sample of a buffer
// // is produced that much after the beginning, leading to this latency calculation
// QueryView::Latency(ref mut q) => {
// let settings = &*self.settings.lock().unwrap();
// let state = self.state.lock().unwrap();
//
// if let Some(ref info) = state.info {
// let latency = gst::SECOND
// .mul_div_floor(settings.samples_per_buffer as u64, info.rate() as u64)
// .unwrap();
// gst_debug!(self.cat, obj: element, "Returning latency {}", latency);
// q.set(settings.is_live, latency, gst::CLOCK_TIME_NONE);
// return true;
// } else {
// return false;
// }
// }
// _ => (),
// }
// BaseSrcBase::parent_query(element, query)
// }
// Creates the audio buffers
fn create(
@ -539,134 +554,136 @@ impl BaseSrcImpl<BaseSrc> for NdiSrc {
Some(ref info) => info.clone(),
};
// If a stop position is set (from a seek), only produce samples up to that
// point but at most samples_per_buffer samples per buffer
let n_samples = if let Some(sample_stop) = state.sample_stop {
if sample_stop <= state.sample_offset {
gst_log!(self.cat, obj: element, "At EOS");
return Err(gst::FlowReturn::Eos);
}
sample_stop - state.sample_offset
} else {
settings.samples_per_buffer as u64
};
// // If a stop position is set (from a seek), only produce samples up to that
// // point but at most samples_per_buffer samples per buffer
// let n_samples = if let Some(sample_stop) = state.sample_stop {
// if sample_stop <= state.sample_offset {
// gst_log!(self.cat, obj: element, "At EOS");
// return Err(gst::FlowReturn::Eos);
// }
//
// sample_stop - state.sample_offset
// } else {
// settings.samples_per_buffer as u64
// };
// Allocate a new buffer of the required size, update the metadata with the
// current timestamp and duration and then fill it according to the current
// caps
//TODO Set buffer size from data received from NDI
let mut buffer =
gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap();
// gst::Buffer::with_size((n_samples as usize) * (info.bpf() as usize)).unwrap();
gst::Buffer::with_size(200 * 400).unwrap();
{
let buffer = buffer.get_mut().unwrap();
// Calculate the current timestamp (PTS) and the next one,
// and calculate the duration from the difference instead of
// simply the number of samples to prevent rounding errors
let pts = state
.sample_offset
.mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
.unwrap()
.into();
let next_pts: gst::ClockTime = (state.sample_offset + n_samples)
.mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
.unwrap()
.into();
buffer.set_pts(pts);
buffer.set_duration(next_pts - pts);
// Map the buffer writable and create the actual samples
let mut map = buffer.map_writable().unwrap();
let data = map.as_mut_slice();
if info.format() == gst_audio::AUDIO_FORMAT_F32 {
Self::process::<f32>(
data,
&mut state.accumulator,
settings.freq,
info.rate(),
info.channels(),
settings.volume,
);
} else {
Self::process::<f64>(
data,
&mut state.accumulator,
settings.freq,
info.rate(),
info.channels(),
settings.volume,
);
}
}
state.sample_offset += n_samples;
drop(state);
// If we're live, we are waiting until the time of the last sample in our buffer has
// arrived. This is the very reason why we have to report that much latency.
// A real live-source would of course only allow us to have the data available after
// that latency, e.g. when capturing from a microphone, and no waiting from our side
// would be necessary..
// // Calculate the current timestamp (PTS) and the next one,
// // and calculate the duration from the difference instead of
// // simply the number of samples to prevent rounding errors
// let pts = state
// .sample_offset
// .mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
// .unwrap()
// .into();
// let next_pts: gst::ClockTime = (state.sample_offset + n_samples)
// .mul_div_floor(gst::SECOND_VAL, info.rate() as u64)
// .unwrap()
// .into();
// buffer.set_pts(pts);
// buffer.set_duration(next_pts - pts);
//
// Waiting happens based on the pipeline clock, which means that a real live source
// with its own clock would require various translations between the two clocks.
// This is out of scope for the tutorial though.
if element.is_live() {
let clock = match element.get_clock() {
None => return Ok(buffer),
Some(clock) => clock,
};
let segment = element
.get_segment()
.downcast::<gst::format::Time>()
.unwrap();
let base_time = element.get_base_time();
let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration());
// The last sample's clock time is the base time of the element plus the
// running time of the last sample
let wait_until = running_time + base_time;
if wait_until.is_none() {
return Ok(buffer);
}
// Store the clock ID in our struct unless we're flushing anyway.
// This allows to asynchronously cancel the waiting from unlock()
// so that we immediately stop waiting on e.g. shutdown.
let mut clock_wait = self.clock_wait.lock().unwrap();
if clock_wait.flushing {
gst_debug!(self.cat, obj: element, "Flushing");
return Err(gst::FlowReturn::Flushing);
}
let id = clock.new_single_shot_id(wait_until).unwrap();
clock_wait.clock_id = Some(id.clone());
drop(clock_wait);
gst_log!(
self.cat,
obj: element,
"Waiting until {}, now {}",
wait_until,
clock.get_time()
);
let (res, jitter) = id.wait();
gst_log!(
self.cat,
obj: element,
"Waited res {:?} jitter {}",
res,
jitter
);
self.clock_wait.lock().unwrap().clock_id.take();
// If the clock ID was unscheduled, unlock() was called
// and we should return Flushing immediately.
if res == gst::ClockReturn::Unscheduled {
gst_debug!(self.cat, obj: element, "Flushing");
return Err(gst::FlowReturn::Flushing);
}
// // Map the buffer writable and create the actual samples
// let mut map = buffer.map_writable().unwrap();
// let data = map.as_mut_slice();
//
// if info.format() == gst_audio::AUDIO_FORMAT_F32 {
// Self::process::<f32>(
// data,
// &mut state.accumulator,
// settings.freq,
// info.rate(),
// info.channels(),
// settings.volume,
// );
// } else {
// Self::process::<f64>(
// data,
// &mut state.accumulator,
// settings.freq,
// info.rate(),
// info.channels(),
// settings.volume,
// );
// }
// }
// state.sample_offset += n_samples;
// drop(state);
//
// // If we're live, we are waiting until the time of the last sample in our buffer has
// // arrived. This is the very reason why we have to report that much latency.
// // A real live-source would of course only allow us to have the data available after
// // that latency, e.g. when capturing from a microphone, and no waiting from our side
// // would be necessary..
// //
// // Waiting happens based on the pipeline clock, which means that a real live source
// // with its own clock would require various translations between the two clocks.
// // This is out of scope for the tutorial though.
// if element.is_live() {
// let clock = match element.get_clock() {
// None => return Ok(buffer),
// Some(clock) => clock,
// };
//
// let segment = element
// .get_segment()
// .downcast::<gst::format::Time>()
// .unwrap();
// let base_time = element.get_base_time();
// let running_time = segment.to_running_time(buffer.get_pts() + buffer.get_duration());
//
// // The last sample's clock time is the base time of the element plus the
// // running time of the last sample
// let wait_until = running_time + base_time;
// if wait_until.is_none() {
// return Ok(buffer);
// }
//
// // Store the clock ID in our struct unless we're flushing anyway.
// // This allows to asynchronously cancel the waiting from unlock()
// // so that we immediately stop waiting on e.g. shutdown.
// let mut clock_wait = self.clock_wait.lock().unwrap();
// if clock_wait.flushing {
// gst_debug!(self.cat, obj: element, "Flushing");
// return Err(gst::FlowReturn::Flushing);
// }
//
// let id = clock.new_single_shot_id(wait_until).unwrap();
// clock_wait.clock_id = Some(id.clone());
// drop(clock_wait);
//
// gst_log!(
// self.cat,
// obj: element,
// "Waiting until {}, now {}",
// wait_until,
// clock.get_time()
// );
// let (res, jitter) = id.wait();
// gst_log!(
// self.cat,
// obj: element,
// "Waited res {:?} jitter {}",
// res,
// jitter
// );
// self.clock_wait.lock().unwrap().clock_id.take();
//
// // If the clock ID was unscheduled, unlock() was called
// // and we should return Flushing immediately.
// if res == gst::ClockReturn::Unscheduled {
// gst_debug!(self.cat, obj: element, "Flushing");
// return Err(gst::FlowReturn::Flushing);
// }
}
gst_debug!(self.cat, obj: element, "Produced buffer {:?}", buffer);