rusoto/aws_transcriber: Don't hold mutex across await points

This mutex is actually only ever used from a single thread, so use
AtomicRefCell instead. It provides the guarantees of a mutex but panics
instead of blocking.
This commit is contained in:
Sebastian Dröge 2020-12-29 17:28:19 +02:00
parent 005c62425b
commit 640d8ef904
2 changed files with 8 additions and 5 deletions

View file

@ -30,6 +30,7 @@ once_cell = "1.0"
serde = "1" serde = "1"
serde_derive = "1" serde_derive = "1"
serde_json = "1" serde_json = "1"
atomic_refcell = "0.1"
[lib] [lib]
name = "gstrusoto" name = "gstrusoto"

View file

@ -43,6 +43,8 @@ use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use atomic_refcell::AtomicRefCell;
use super::packet::*; use super::packet::*;
use serde_derive::Deserialize; use serde_derive::Deserialize;
@ -200,14 +202,14 @@ impl Default for State {
} }
} }
type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send>>; type WsSink = Pin<Box<dyn Sink<Message, Error = WsError> + Send + Sync>>;
pub struct Transcriber { pub struct Transcriber {
srcpad: gst::Pad, srcpad: gst::Pad,
sinkpad: gst::Pad, sinkpad: gst::Pad,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
ws_sink: Mutex<Option<WsSink>>, ws_sink: AtomicRefCell<Option<WsSink>>,
} }
fn build_packet(payload: &[u8]) -> Vec<u8> { fn build_packet(payload: &[u8]) -> Vec<u8> {
@ -801,7 +803,7 @@ impl Transcriber {
tokio::time::delay_for(Duration::from_nanos(delay.nseconds().unwrap())).await; tokio::time::delay_for(Duration::from_nanos(delay.nseconds().unwrap())).await;
} }
if let Some(ws_sink) = self.ws_sink.lock().unwrap().as_mut() { if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() {
if let Some(buffer) = buffer { if let Some(buffer) = buffer {
let data = buffer.map_readable().unwrap(); let data = buffer.map_readable().unwrap();
for chunk in data.chunks(8192) { for chunk in data.chunks(8192) {
@ -916,7 +918,7 @@ impl Transcriber {
let (ws_sink, mut ws_stream) = ws.split(); let (ws_sink, mut ws_stream) = ws.split();
*self.ws_sink.lock().unwrap() = Some(Box::pin(ws_sink)); *self.ws_sink.borrow_mut() = Some(Box::pin(ws_sink));
let element_weak = element.downgrade(); let element_weak = element.downgrade();
let future = async move { let future = async move {
@ -1047,7 +1049,7 @@ impl ObjectSubclass for Transcriber {
sinkpad, sinkpad,
settings, settings,
state: Mutex::new(State::default()), state: Mutex::new(State::default()),
ws_sink: Mutex::new(None), ws_sink: AtomicRefCell::new(None),
} }
} }