Switch to parking_lot Mutex/Condvar for lower overhead

This commit is contained in:
Sebastian Dröge 2018-11-01 12:45:57 +02:00
parent 25501233ec
commit ee3fc37f4c
3 changed files with 61 additions and 59 deletions

View file

@ -12,6 +12,7 @@ gobject-subclass = { git = "https://github.com/gtk-rs/gobject-subclass" }
gst-plugin = { path = "../gst-plugin" }
gtk = { git = "https://github.com/gtk-rs/gtk", features = ["v3_6"], optional = true }
gio = { git = "https://github.com/gtk-rs/gio", optional = true }
parking_lot = "0.6"
[dev-dependencies]
either = "1.0"

View file

@ -25,6 +25,8 @@ extern crate gst_plugin;
extern crate gstreamer as gst;
extern crate gstreamer_video as gst_video;
extern crate parking_lot;
mod togglerecord;
fn plugin_init(plugin: &gst::Plugin) -> bool {

View file

@ -24,11 +24,12 @@ use gst_video;
use gobject_subclass::object::*;
use gst_plugin::element::*;
use parking_lot::{Condvar, Mutex};
use std::cmp;
use std::collections::HashMap;
use std::f64;
use std::iter;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::Arc;
const DEFAULT_RECORD: bool = false;
@ -320,7 +321,7 @@ impl ToggleRecord {
mut dts_or_pts: gst::ClockTime,
duration: gst::ClockTime,
) -> HandleResult {
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
let mut dts_or_pts_end = if duration.is_some() {
dts_or_pts + duration
@ -357,10 +358,10 @@ impl ToggleRecord {
dts_or_pts_end
);
let settings = *self.settings.lock().unwrap();
let settings = *self.settings.lock();
// First check if we have to update our recording state
let mut rec_state = self.state.lock().unwrap();
let mut rec_state = self.state.lock();
let settings_changed = match rec_state.recording_state {
RecordingState::Recording if !settings.record => {
gst_debug!(self.cat, obj: pad, "Stopping recording");
@ -408,14 +409,14 @@ impl ToggleRecord {
// it or go EOS instead.
drop(rec_state);
while !self.other_streams.lock().unwrap().0.iter().all(|s| {
let s = s.state.lock().unwrap();
while !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time)
}) {
gst_log!(self.cat, obj: pad, "Waiting for other streams to stop");
state = self.main_stream_cond.wait(state).unwrap();
self.main_stream_cond.wait(&mut state);
}
if state.flushing {
@ -423,7 +424,7 @@ impl ToggleRecord {
return HandleResult::Flushing;
}
let mut rec_state = self.state.lock().unwrap();
let mut rec_state = self.state.lock();
rec_state.recording_state = RecordingState::Stopped;
rec_state.recording_duration +=
rec_state.last_recording_stop - rec_state.last_recording_start;
@ -479,22 +480,22 @@ impl ToggleRecord {
gst_debug!(self.cat, obj: pad, "Starting at {}", current_running_time);
state.segment_pending = true;
for other_stream in &self.other_streams.lock().unwrap().0 {
other_stream.state.lock().unwrap().segment_pending = true;
for other_stream in &self.other_streams.lock().0 {
other_stream.state.lock().segment_pending = true;
}
// Then unlock and wait for all other streams to reach
// it or go EOS instead
drop(rec_state);
while !self.other_streams.lock().unwrap().0.iter().all(|s| {
let s = s.state.lock().unwrap();
while !self.other_streams.lock().0.iter().all(|s| {
let s = s.state.lock();
s.eos
|| (s.current_running_time.is_some()
&& s.current_running_time >= current_running_time)
}) {
gst_log!(self.cat, obj: pad, "Waiting for other streams to start");
state = self.main_stream_cond.wait(state).unwrap();
self.main_stream_cond.wait(&mut state);
}
if state.flushing {
@ -502,7 +503,7 @@ impl ToggleRecord {
return HandleResult::Flushing;
}
let mut rec_state = self.state.lock().unwrap();
let mut rec_state = self.state.lock();
rec_state.recording_state = RecordingState::Recording;
gst_debug!(
self.cat,
@ -531,7 +532,7 @@ impl ToggleRecord {
duration: gst::ClockTime,
) -> HandleResult {
// Calculate end pts & current running time and make sure we stay in the segment
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
let mut pts_end = if duration.is_some() {
pts + duration
@ -577,7 +578,7 @@ impl ToggleRecord {
drop(state);
let mut main_state = self.main_stream.state.lock().unwrap();
let mut main_state = self.main_stream.state.lock();
// Wake up, in case the main stream is waiting for us to progress up to here. We progressed
// above but all notifying must happen while the main_stream state is locked as per above.
@ -586,7 +587,7 @@ impl ToggleRecord {
while (main_state.current_running_time == gst::CLOCK_TIME_NONE
|| main_state.current_running_time < current_running_time)
&& !main_state.eos
&& !stream.state.lock().unwrap().flushing
&& !stream.state.lock().flushing
{
gst_log!(
self.cat,
@ -596,14 +597,14 @@ impl ToggleRecord {
main_state.current_running_time
);
main_state = self.main_stream_cond.wait(main_state).unwrap();
self.main_stream_cond.wait(&mut main_state);
}
if stream.state.lock().unwrap().flushing {
if stream.state.lock().flushing {
gst_debug!(self.cat, obj: pad, "Flushing");
return HandleResult::Flushing;
}
let rec_state = self.state.lock().unwrap();
let rec_state = self.state.lock();
// If the main stream is EOS, we are also EOS unless we are
// before the final last recording stop running time
@ -727,7 +728,7 @@ impl ToggleRecord {
element: &Element,
buffer: gst::Buffer,
) -> gst::FlowReturn {
let stream = match self.pads.lock().unwrap().get(pad) {
let stream = match self.pads.lock().get(pad) {
None => {
gst_element_error!(
element,
@ -742,7 +743,7 @@ impl ToggleRecord {
gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer);
{
let state = stream.state.lock().unwrap();
let state = stream.state.lock();
if state.eos {
return gst::FlowReturn::Eos;
}
@ -804,7 +805,7 @@ impl ToggleRecord {
HandleResult::Eos => {
stream.srcpad.push_event(
gst::Event::new_eos()
.seqnum(stream.state.lock().unwrap().segment_seqnum)
.seqnum(stream.state.lock().segment_seqnum)
.build(),
);
return gst::FlowReturn::Eos;
@ -815,11 +816,11 @@ impl ToggleRecord {
}
let out_running_time = {
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
let mut events = Vec::with_capacity(state.pending_events.len() + 1);
if state.segment_pending {
let rec_state = self.state.lock().unwrap();
let rec_state = self.state.lock();
// Adjust so that last_recording_start has running time of
// recording_duration
@ -873,7 +874,7 @@ impl ToggleRecord {
fn sink_event(&self, pad: &gst::Pad, element: &Element, event: gst::Event) -> bool {
use gst::EventView;
let stream = match self.pads.lock().unwrap().get(pad) {
let stream = match self.pads.lock().get(pad) {
None => {
gst_element_error!(
element,
@ -893,17 +894,17 @@ impl ToggleRecord {
match event.view() {
EventView::FlushStart(..) => {
let _main_state = if stream != self.main_stream {
Some(self.main_stream.state.lock().unwrap())
Some(self.main_stream.state.lock())
} else {
None
};
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
state.flushing = true;
self.main_stream_cond.notify_all();
}
EventView::FlushStop(..) => {
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
state.eos = false;
state.flushing = false;
@ -911,7 +912,7 @@ impl ToggleRecord {
state.current_running_time = gst::CLOCK_TIME_NONE;
}
EventView::Segment(e) => {
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
let segment = match e.get_segment().clone().downcast::<gst::ClockTime>() {
Err(segment) => {
@ -962,11 +963,11 @@ impl ToggleRecord {
}
EventView::Eos(..) => {
let _main_state = if stream != self.main_stream {
Some(self.main_stream.state.lock().unwrap())
Some(self.main_stream.state.lock())
} else {
None
};
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
state.eos = true;
self.main_stream_cond.notify_all();
@ -989,7 +990,7 @@ impl ToggleRecord {
&& type_.is_serialized()
&& type_.partial_cmp(&gst::EventType::Segment) == Some(cmp::Ordering::Greater)
{
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
if state.segment_pending {
gst_log!(self.cat, obj: pad, "Storing event for later pushing");
state.pending_events.push(event);
@ -998,7 +999,7 @@ impl ToggleRecord {
}
if send_pending {
let mut state = stream.state.lock().unwrap();
let mut state = stream.state.lock();
let mut events = Vec::with_capacity(state.pending_events.len() + 1);
// Got not a single buffer on this stream before EOS, forward
@ -1028,7 +1029,7 @@ impl ToggleRecord {
}
fn sink_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool {
let stream = match self.pads.lock().unwrap().get(pad) {
let stream = match self.pads.lock().get(pad) {
None => {
gst_element_error!(
element,
@ -1048,7 +1049,7 @@ impl ToggleRecord {
fn src_event(&self, pad: &gst::Pad, element: &Element, mut event: gst::Event) -> bool {
use gst::EventView;
let stream = match self.pads.lock().unwrap().get(pad) {
let stream = match self.pads.lock().get(pad) {
None => {
gst_element_error!(
element,
@ -1070,7 +1071,7 @@ impl ToggleRecord {
_ => (),
}
let rec_state = self.state.lock().unwrap();
let rec_state = self.state.lock();
let running_time_offset = rec_state.running_time_offset.unwrap_or(0) as i64;
let offset = event.get_running_time_offset();
event
@ -1090,7 +1091,7 @@ impl ToggleRecord {
fn src_query(&self, pad: &gst::Pad, element: &Element, query: &mut gst::QueryRef) -> bool {
use gst::QueryView;
let stream = match self.pads.lock().unwrap().get(pad) {
let stream = match self.pads.lock().get(pad) {
None => {
gst_element_error!(
element,
@ -1141,8 +1142,8 @@ impl ToggleRecord {
// Position and duration is always the current recording position
QueryView::Position(ref mut q) => {
if q.get_format() == gst::Format::Time {
let state = stream.state.lock().unwrap();
let rec_state = self.state.lock().unwrap();
let state = stream.state.lock();
let rec_state = self.state.lock();
let mut recording_duration = rec_state.recording_duration;
if rec_state.recording_state == RecordingState::Recording
|| rec_state.recording_state == RecordingState::Stopping
@ -1158,8 +1159,8 @@ impl ToggleRecord {
}
QueryView::Duration(ref mut q) => {
if q.get_format() == gst::Format::Time {
let state = stream.state.lock().unwrap();
let rec_state = self.state.lock().unwrap();
let state = stream.state.lock();
let rec_state = self.state.lock();
let mut recording_duration = rec_state.recording_duration;
if rec_state.recording_state == RecordingState::Recording
|| rec_state.recording_state == RecordingState::Stopping
@ -1181,7 +1182,7 @@ impl ToggleRecord {
}
fn iterate_internal_links(&self, pad: &gst::Pad, element: &Element) -> gst::Iterator<gst::Pad> {
let stream = match self.pads.lock().unwrap().get(pad) {
let stream = match self.pads.lock().get(pad) {
None => {
gst_element_error!(
element,
@ -1208,7 +1209,7 @@ impl ObjectImpl<Element> for ToggleRecord {
match *prop {
Property::Boolean("record", ..) => {
let mut settings = self.settings.lock().unwrap();
let mut settings = self.settings.lock();
let record = value.get().unwrap();
gst_debug!(
self.cat,
@ -1228,11 +1229,11 @@ impl ObjectImpl<Element> for ToggleRecord {
match *prop {
Property::Boolean("record", ..) => {
let settings = self.settings.lock().unwrap();
let settings = self.settings.lock();
Ok(settings.record.to_value())
}
Property::Boolean("recording", ..) => {
let rec_state = self.state.lock().unwrap();
let rec_state = self.state.lock();
Ok((rec_state.recording_state == RecordingState::Recording).to_value())
}
_ => unimplemented!(),
@ -1253,25 +1254,24 @@ impl ElementImpl<Element> for ToggleRecord {
for s in self
.other_streams
.lock()
.unwrap()
.0
.iter()
.chain(iter::once(&self.main_stream))
{
let mut state = s.state.lock().unwrap();
let mut state = s.state.lock();
*state = StreamState::default();
}
let mut rec_state = self.state.lock().unwrap();
let mut rec_state = self.state.lock();
*rec_state = State::default();
}
gst::StateChange::PausedToReady => {
for s in &self.other_streams.lock().unwrap().0 {
let mut state = s.state.lock().unwrap();
for s in &self.other_streams.lock().0 {
let mut state = s.state.lock();
state.flushing = true;
}
let mut state = self.main_stream.state.lock().unwrap();
let mut state = self.main_stream.state.lock();
state.flushing = true;
self.main_stream_cond.notify_all();
}
@ -1288,17 +1288,16 @@ impl ElementImpl<Element> for ToggleRecord {
for s in self
.other_streams
.lock()
.unwrap()
.0
.iter()
.chain(iter::once(&self.main_stream))
{
let mut state = s.state.lock().unwrap();
let mut state = s.state.lock();
state.pending_events.clear();
}
let mut rec_state = self.state.lock().unwrap();
let mut rec_state = self.state.lock();
*rec_state = State::default();
drop(rec_state);
element.notify("recording");
@ -1316,9 +1315,9 @@ impl ElementImpl<Element> for ToggleRecord {
_name: Option<String>,
_caps: Option<&gst::CapsRef>,
) -> Option<gst::Pad> {
let mut other_streams = self.other_streams.lock().unwrap();
let mut other_streams = self.other_streams.lock();
let (ref mut other_streams, ref mut pad_count) = *other_streams;
let mut pads = self.pads.lock().unwrap();
let mut pads = self.pads.lock();
let id = *pad_count;
*pad_count += 1;
@ -1348,9 +1347,9 @@ impl ElementImpl<Element> for ToggleRecord {
}
fn release_pad(&self, element: &Element, pad: &gst::Pad) {
let mut other_streams = self.other_streams.lock().unwrap();
let mut other_streams = self.other_streams.lock();
let (ref mut other_streams, _) = *other_streams;
let mut pads = self.pads.lock().unwrap();
let mut pads = self.pads.lock();
let stream = match pads.get(pad) {
None => return,