togglerecord: Calculate a fallback duration for audio/video buffers based on the caps if possible

I.e. based on the framerate for video and based on the buffer size,
sample size and sample rate for raw audio.
This commit is contained in:
Sebastian Dröge 2019-07-10 19:30:04 +03:00
parent caeff6f968
commit 20c02c4b38
3 changed files with 195 additions and 84 deletions

View file

@ -10,6 +10,7 @@ edition = "2018"
[dependencies]
glib = { git = "https://github.com/gtk-rs/glib", features = ["subclassing"] }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] }
gstreamer-audio = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
gio = { git = "https://github.com/gtk-rs/gio", optional = true }

View file

@ -21,6 +21,7 @@
extern crate glib;
#[macro_use]
extern crate gstreamer as gst;
extern crate gstreamer_audio as gst_audio;
extern crate gstreamer_video as gst_video;
extern crate parking_lot;

View file

@ -22,6 +22,7 @@ use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_audio;
use gst_video;
use parking_lot::{Condvar, Mutex};
@ -101,6 +102,8 @@ struct StreamState {
flushing: bool,
segment_pending: bool,
pending_events: Vec<gst::Event>,
audio_info: Option<gst_audio::AudioInfo>,
video_info: Option<gst_video::VideoInfo>,
}
impl Default for StreamState {
@ -114,6 +117,8 @@ impl Default for StreamState {
flushing: false,
segment_pending: false,
pending_events: Vec::new(),
audio_info: None,
video_info: None,
}
}
}
@ -162,13 +167,93 @@ impl Default for State {
}
#[derive(Debug, PartialEq, Eq)]
enum HandleResult {
Pass,
enum HandleResult<T> {
Pass(T),
Drop,
Eos,
Flushing,
}
trait HandleData: Sized {
fn get_pts(&self) -> gst::ClockTime;
fn get_dts(&self) -> gst::ClockTime;
fn get_dts_or_pts(&self) -> gst::ClockTime {
let dts = self.get_dts();
if dts.is_some() {
dts
} else {
self.get_pts()
}
}
fn get_duration(&self, state: &StreamState) -> gst::ClockTime;
fn is_keyframe(&self) -> bool;
}
impl HandleData for (gst::ClockTime, gst::ClockTime) {
fn get_pts(&self) -> gst::ClockTime {
self.0
}
fn get_dts(&self) -> gst::ClockTime {
self.0
}
fn get_duration(&self, _state: &StreamState) -> gst::ClockTime {
self.1
}
fn is_keyframe(&self) -> bool {
true
}
}
impl HandleData for gst::Buffer {
fn get_pts(&self) -> gst::ClockTime {
gst::BufferRef::get_pts(self)
}
fn get_dts(&self) -> gst::ClockTime {
gst::BufferRef::get_dts(self)
}
fn get_duration(&self, state: &StreamState) -> gst::ClockTime {
let duration = gst::BufferRef::get_duration(self);
if duration.is_some() {
duration
} else if let Some(ref video_info) = state.video_info {
if video_info.fps() != 0.into() {
gst::SECOND
.mul_div_floor(
*video_info.fps().denom() as u64,
*video_info.fps().numer() as u64,
)
.unwrap_or(gst::CLOCK_TIME_NONE)
} else {
gst::CLOCK_TIME_NONE
}
} else if let Some(ref audio_info) = state.audio_info {
if audio_info.bpf() == 0 || audio_info.rate() == 0 {
return gst::CLOCK_TIME_NONE;
}
let size = self.get_size() as u64;
let num_samples = size / audio_info.bpf() as u64;
let duration = gst::SECOND
.mul_div_floor(num_samples, audio_info.rate() as u64)
.unwrap_or(gst::CLOCK_TIME_NONE);
duration
} else {
gst::CLOCK_TIME_NONE
}
}
fn is_keyframe(&self) -> bool {
!gst::BufferRef::get_flags(self).contains(gst::BufferFlags::DELTA_UNIT)
}
}
struct ToggleRecord {
cat: gst::DebugCategory,
settings: Mutex<Settings>,
@ -236,17 +321,27 @@ impl ToggleRecord {
});
}
fn handle_main_stream(
fn handle_main_stream<T: HandleData>(
&self,
element: &gst::Element,
pad: &gst::Pad,
stream: &Stream,
is_keyframe: bool,
mut dts_or_pts: gst::ClockTime,
duration: gst::ClockTime,
) -> HandleResult {
data: T,
) -> Result<HandleResult<T>, gst::FlowError> {
let mut state = stream.state.lock();
let mut dts_or_pts = data.get_dts_or_pts();
let duration = data.get_duration(&state);
if !dts_or_pts.is_some() {
gst_element_error!(
element,
gst::StreamError::Format,
["Buffer without DTS or PTS"]
);
return Err(gst::FlowError::Error);
}
let mut dts_or_pts_end = if duration.is_some() {
dts_or_pts + duration
} else {
@ -305,10 +400,10 @@ impl ToggleRecord {
// Remember where we stopped last, in case of EOS
rec_state.last_recording_stop = current_running_time_end;
gst_log!(self.cat, obj: pad, "Passing buffer (recording)");
HandleResult::Pass
Ok(HandleResult::Pass(data))
}
RecordingState::Stopping => {
if !is_keyframe {
if !data.is_keyframe() {
// Remember where we stopped last, in case of EOS
rec_state.last_recording_stop = current_running_time_end;
gst_log!(self.cat, obj: pad, "Passing non-keyframe buffer (stopping)");
@ -322,7 +417,7 @@ impl ToggleRecord {
.push_event(gst_video::new_upstream_force_key_unit_event().build());
}
return HandleResult::Pass;
return Ok(HandleResult::Pass(data));
}
// Remember the time when we stopped: now!
@ -345,7 +440,7 @@ impl ToggleRecord {
if state.flushing {
gst_debug!(self.cat, obj: pad, "Flushing");
return HandleResult::Flushing;
return Ok(HandleResult::Flushing);
}
let mut rec_state = self.state.lock();
@ -371,15 +466,15 @@ impl ToggleRecord {
drop(state);
element.notify("recording");
HandleResult::Drop
Ok(HandleResult::Drop)
}
RecordingState::Stopped => {
gst_log!(self.cat, obj: pad, "Dropping buffer (stopped)");
HandleResult::Drop
Ok(HandleResult::Drop)
}
RecordingState::Starting => {
// If this is no keyframe, we can directly go out again here and drop the frame
if !is_keyframe {
if !data.is_keyframe() {
gst_log!(
self.cat,
obj: pad,
@ -395,7 +490,7 @@ impl ToggleRecord {
.push_event(gst_video::new_upstream_force_key_unit_event().build());
}
return HandleResult::Drop;
return Ok(HandleResult::Drop);
}
// Remember the time when we started: now!
@ -424,7 +519,7 @@ impl ToggleRecord {
if state.flushing {
gst_debug!(self.cat, obj: pad, "Flushing");
return HandleResult::Flushing;
return Ok(HandleResult::Flushing);
}
let mut rec_state = self.state.lock();
@ -443,21 +538,48 @@ impl ToggleRecord {
drop(state);
element.notify("recording");
HandleResult::Pass
Ok(HandleResult::Pass(data))
}
}
}
fn handle_secondary_stream(
fn handle_secondary_stream<T: HandleData>(
&self,
element: &gst::Element,
pad: &gst::Pad,
stream: &Stream,
mut pts: gst::ClockTime,
duration: gst::ClockTime,
) -> HandleResult {
data: T,
) -> Result<HandleResult<T>, gst::FlowError> {
// Calculate end pts & current running time and make sure we stay in the segment
let mut state = stream.state.lock();
let mut pts = data.get_pts();
let duration = data.get_duration(&state);
if pts.is_none() {
gst_element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]);
return Err(gst::FlowError::Error);
}
let dts = data.get_dts();
if dts.is_some() && pts.is_some() && dts != pts {
gst_element_error!(
element,
gst::StreamError::Format,
["DTS != PTS not supported for secondary streams"]
);
return Err(gst::FlowError::Error);
}
if !data.is_keyframe() {
gst_element_error!(
element,
gst::StreamError::Format,
["Delta-units not supported for secondary streams"]
);
return Err(gst::FlowError::Error);
}
let mut pts_end = if duration.is_some() {
pts + duration
} else {
@ -478,7 +600,7 @@ impl ToggleRecord {
state.in_segment.get_stop()
);
return HandleResult::Eos;
return Ok(HandleResult::Eos);
}
pts_end = cmp::max(state.in_segment.get_start(), pts_end);
if state.in_segment.get_stop().is_some() {
@ -525,7 +647,7 @@ impl ToggleRecord {
}
if stream.state.lock().flushing {
gst_debug!(self.cat, obj: pad, "Flushing");
return HandleResult::Flushing;
return Ok(HandleResult::Flushing);
}
let rec_state = self.state.lock();
@ -548,7 +670,7 @@ impl ToggleRecord {
current_running_time_end,
rec_state.last_recording_stop
);
return HandleResult::Eos;
return Ok(HandleResult::Eos);
} else if current_running_time < rec_state.last_recording_start {
gst_debug!(
self.cat,
@ -557,7 +679,7 @@ impl ToggleRecord {
current_running_time,
rec_state.last_recording_start
);
return HandleResult::Drop;
return Ok(HandleResult::Drop);
} else {
gst_debug!(
self.cat,
@ -567,10 +689,12 @@ impl ToggleRecord {
current_running_time,
rec_state.last_recording_stop
);
return HandleResult::Pass;
return Ok(HandleResult::Pass(data));
}
}
assert!(main_state.current_running_time >= current_running_time);
match rec_state.recording_state {
RecordingState::Recording => {
// We're properly started, must have a start position and
@ -578,7 +702,7 @@ impl ToggleRecord {
assert!(rec_state.last_recording_start.is_some());
assert!(current_running_time >= rec_state.last_recording_start);
gst_log!(self.cat, obj: pad, "Passing buffer (recording)");
HandleResult::Pass
Ok(HandleResult::Pass(data))
}
RecordingState::Stopping => {
// If we have no start position yet, the main stream is waiting for a key-frame
@ -588,7 +712,7 @@ impl ToggleRecord {
obj: pad,
"Passing buffer (stopping: waiting for keyframe)",
);
HandleResult::Pass
Ok(HandleResult::Pass(data))
} else if current_running_time_end <= rec_state.last_recording_stop {
gst_log!(
self.cat,
@ -597,7 +721,7 @@ impl ToggleRecord {
current_running_time_end,
rec_state.last_recording_stop
);
HandleResult::Pass
Ok(HandleResult::Pass(data))
} else {
gst_log!(
self.cat,
@ -606,13 +730,13 @@ impl ToggleRecord {
current_running_time_end,
rec_state.last_recording_stop
);
HandleResult::Drop
Ok(HandleResult::Drop)
}
}
RecordingState::Stopped => {
// We're properly stopped
gst_log!(self.cat, obj: pad, "Dropping buffer (stopped)");
HandleResult::Drop
Ok(HandleResult::Drop)
}
RecordingState::Starting => {
// If we have no start position yet, the main stream is waiting for a key-frame
@ -622,7 +746,7 @@ impl ToggleRecord {
obj: pad,
"Dropping buffer (starting: waiting for keyframe)",
);
HandleResult::Drop
Ok(HandleResult::Drop)
} else if current_running_time >= rec_state.last_recording_start {
gst_log!(
self.cat,
@ -631,7 +755,7 @@ impl ToggleRecord {
current_running_time,
rec_state.last_recording_start
);
HandleResult::Pass
Ok(HandleResult::Pass(data))
} else {
gst_log!(
self.cat,
@ -640,7 +764,7 @@ impl ToggleRecord {
current_running_time,
rec_state.last_recording_start
);
HandleResult::Drop
Ok(HandleResult::Drop)
}
}
}
@ -676,52 +800,12 @@ impl ToggleRecord {
}
let handle_result = if stream != self.main_stream {
let pts = buffer.get_pts();
let dts = buffer.get_dts();
if dts.is_some() && pts.is_some() && dts != pts {
gst_element_error!(
element,
gst::StreamError::Format,
["DTS != PTS not supported for secondary streams"]
);
return Err(gst::FlowError::Error);
}
pts.ok_or_else(|| {
gst_element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]);
gst::FlowError::Error
})?;
if buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT) {
gst_element_error!(
element,
gst::StreamError::Format,
["Delta-units not supported for secondary streams"]
);
return Err(gst::FlowError::Error);
}
self.handle_secondary_stream(pad, &stream, pts, buffer.get_duration())
self.handle_secondary_stream(element, pad, &stream, buffer)
} else {
let dts_or_pts = buffer.get_dts_or_pts();
if !dts_or_pts.is_some() {
gst_element_error!(
element,
gst::StreamError::Format,
["Buffer without DTS or PTS"]
);
return Err(gst::FlowError::Error);
}
self.handle_main_stream(element, pad, &stream, buffer)
}?;
self.handle_main_stream(
element,
pad,
&stream,
!buffer.get_flags().contains(gst::BufferFlags::DELTA_UNIT),
dts_or_pts,
buffer.get_duration(),
)
};
match handle_result {
let buffer = match handle_result {
HandleResult::Drop => {
return Ok(gst::FlowSuccess::Ok);
}
@ -736,10 +820,11 @@ impl ToggleRecord {
);
return Err(gst::FlowError::Eos);
}
HandleResult::Pass => {
HandleResult::Pass(buffer) => {
// Pass through and actually push the buffer
buffer
}
}
};
let out_running_time = {
let mut state = stream.state.lock();
@ -799,7 +884,7 @@ impl ToggleRecord {
stream.srcpad.push(buffer)
}
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
use gst::EventView;
let stream = match self.pads.lock().get(pad) {
@ -839,6 +924,21 @@ impl ToggleRecord {
state.segment_pending = false;
state.current_running_time = gst::CLOCK_TIME_NONE;
}
EventView::Caps(c) => {
let mut state = stream.state.lock();
let caps = c.get_caps();
let s = caps.get_structure(0).unwrap();
if s.get_name().starts_with("audio/") {
state.audio_info = gst_audio::AudioInfo::from_caps(caps);
state.video_info = None;
} else if s.get_name().starts_with("video/") {
state.audio_info = None;
state.video_info = gst_video::VideoInfo::from_caps(caps);
} else {
state.audio_info = None;
state.video_info = None;
}
}
EventView::Segment(e) => {
let mut state = stream.state.lock();
@ -882,12 +982,21 @@ impl ToggleRecord {
gst_debug!(self.cat, obj: pad, "Handling Gap event {:?}", event);
let (pts, duration) = e.get();
let handle_result = if stream == self.main_stream {
self.handle_main_stream(element, pad, &stream, false, pts, duration)
self.handle_main_stream(element, pad, &stream, (pts, duration))
} else {
self.handle_secondary_stream(pad, &stream, pts, duration)
self.handle_secondary_stream(element, pad, &stream, (pts, duration))
};
forward = handle_result == HandleResult::Pass;
forward = match handle_result {
Ok(HandleResult::Pass((new_pts, new_duration))) if new_pts.is_some() => {
if new_pts != pts || new_duration != duration {
event = gst::Event::new_gap(new_pts, new_duration).build();
}
true
}
Ok(_) => false,
Err(_) => false,
};
}
EventView::Eos(..) => {
let _main_state = if stream != self.main_stream {