transcriberbin: relink transcriber more thoroughly

* Remove old transcriber from correct bin
* Remove old unsynced ghost pads and expose new ones
* Fix potential deadlock where State was locked after PadState

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1925>
This commit is contained in:
Mathieu Duponchelle 2024-11-14 19:34:22 +01:00
parent 849ae7c845
commit 1fc1ed2539

View file

@ -139,7 +139,7 @@ pub struct TranscriberBin {
} }
impl TranscriberBin { impl TranscriberBin {
fn construct_channel_bin( fn construct_transcription_channel(
&self, &self,
lang: &str, lang: &str,
mux_method: MuxMethod, mux_method: MuxMethod,
@ -794,7 +794,7 @@ impl TranscriberBin {
* be called in READY */ * be called in READY */
fn relink_transcriber( fn relink_transcriber(
&self, &self,
state: &mut State, state: &State,
pad_state: &TranscriberSinkPadState, pad_state: &TranscriberSinkPadState,
old_transcriber: Option<&gst::Element>, old_transcriber: Option<&gst::Element>,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -814,9 +814,9 @@ impl TranscriberBin {
); );
pad_state.transcriber_aconv.unlink(old_transcriber); pad_state.transcriber_aconv.unlink(old_transcriber);
for channel in pad_state.transcription_channels.values() { for channel in pad_state.transcription_channels.values() {
old_transcriber.unlink(&channel.bin); pad_state.unlink_transcriber_pads(self.obj().as_ref(), state, channel);
} }
let _ = state.transcription_bin.remove(old_transcriber); let _ = pad_state.transcription_bin.remove(old_transcriber);
old_transcriber.set_state(gst::State::Null).unwrap(); old_transcriber.set_state(gst::State::Null).unwrap();
} }
@ -827,7 +827,8 @@ impl TranscriberBin {
pad_state.transcriber_aconv.link(transcriber)?; pad_state.transcriber_aconv.link(transcriber)?;
for channel in pad_state.transcription_channels.values() { for channel in pad_state.transcription_channels.values() {
channel.link_transcriber(transcriber)?; let srcpad_name = channel.link_transcriber(transcriber)?;
pad_state.expose_unsynced_pads(self.obj().as_ref(), state, &srcpad_name)?;
} }
} }
@ -883,7 +884,11 @@ impl TranscriberBin {
transcription_channels.insert( transcription_channels.insert(
language_code.to_owned(), language_code.to_owned(),
self.construct_channel_bin(&language_code, mux_method, caption_streams)?, self.construct_transcription_channel(
&language_code,
mux_method,
caption_streams,
)?,
); );
} }
} else { } else {
@ -893,7 +898,7 @@ impl TranscriberBin {
}; };
transcription_channels.insert( transcription_channels.insert(
"transcript".to_string(), "transcript".to_string(),
self.construct_channel_bin("transcript", mux_method, caption_streams)?, self.construct_transcription_channel("transcript", mux_method, caption_streams)?,
); );
} }
Ok(()) Ok(())
@ -966,36 +971,7 @@ impl TranscriberBin {
} }
for channel in pad_state.transcription_channels.values() { for channel in pad_state.transcription_channels.values() {
let sinkpad = channel.bin.static_pad("sink").unwrap(); pad_state.unlink_transcriber_pads(self.obj().as_ref(), state, channel);
if let Some(peer) = sinkpad.peer() {
peer.unlink(&sinkpad)?;
if let Some(ref transcriber) = pad_state.transcriber {
if channel.language != "transcript" {
transcriber.release_request_pad(&peer);
}
let mut unsynced_pad_name = format!("unsynced_{}", peer.name().as_str());
if transcriber.static_pad(&unsynced_pad_name).is_some() {
let srcpad = pad_state
.transcription_bin
.static_pad(&unsynced_pad_name)
.unwrap();
let _ = pad_state.transcription_bin.remove_pad(&srcpad);
if let Some(serial) = pad_state.serial {
unsynced_pad_name = format!("{}_{}", unsynced_pad_name, serial);
}
let srcpad = state
.transcription_bin
.static_pad(&unsynced_pad_name)
.unwrap();
let _ = state.transcription_bin.remove_pad(&srcpad);
let srcpad = state.internal_bin.static_pad(&unsynced_pad_name).unwrap();
let _ = state.internal_bin.remove_pad(&srcpad);
let srcpad = self.obj().static_pad(&unsynced_pad_name).unwrap();
let _ = self.obj().remove_pad(&srcpad);
}
}
}
let srcpad = pad_state let srcpad = pad_state
.transcription_bin .transcription_bin
@ -1950,12 +1926,60 @@ impl TranscriberSinkPadState {
}) })
} }
fn link_transcriber_pads( fn remove_unsynced_pads(
&self, &self,
elem: &super::TranscriberBin, topbin: &super::TranscriberBin,
srcpad_name: &str,
channel: &TranscriptionChannel,
state: &State, state: &State,
srcpad_name: &str,
) {
let Some(ref transcriber) = self.transcriber else {
return;
};
let mut unsynced_pad_name = format!("unsynced_{}", srcpad_name);
if transcriber.static_pad(&unsynced_pad_name).is_some() {
let srcpad = self
.transcription_bin
.static_pad(&unsynced_pad_name)
.unwrap();
let _ = self.transcription_bin.remove_pad(&srcpad);
if let Some(serial) = self.serial {
unsynced_pad_name = format!("{}_{}", unsynced_pad_name, serial);
}
let srcpad = state
.transcription_bin
.static_pad(&unsynced_pad_name)
.unwrap();
let _ = state.transcription_bin.remove_pad(&srcpad);
let srcpad = state.internal_bin.static_pad(&unsynced_pad_name).unwrap();
let _ = state.internal_bin.remove_pad(&srcpad);
let srcpad = topbin.static_pad(&unsynced_pad_name).unwrap();
let _ = topbin.remove_pad(&srcpad);
}
}
fn unlink_transcriber_pads(
&self,
topbin: &super::TranscriberBin,
state: &State,
channel: &TranscriptionChannel,
) {
let sinkpad = channel.bin.static_pad("sink").unwrap();
let Some(srcpad) = sinkpad.peer() else {
return;
};
srcpad.unlink(&sinkpad).unwrap();
self.remove_unsynced_pads(topbin, state, srcpad.name().as_str());
}
fn expose_unsynced_pads(
&self,
topbin: &super::TranscriberBin,
state: &State,
srcpad_name: &str,
) -> Result<(), Error> { ) -> Result<(), Error> {
let Some(ref transcriber) = self.transcriber else { let Some(ref transcriber) = self.transcriber else {
return Ok(()); return Ok(());
@ -1984,9 +2008,21 @@ impl TranscriberSinkPadState {
state.internal_bin.add_pad(&srcpad)?; state.internal_bin.add_pad(&srcpad)?;
let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); let srcpad = gst::GhostPad::with_target(&srcpad).unwrap();
elem.add_pad(&srcpad)?; topbin.add_pad(&srcpad)?;
} }
Ok(())
}
fn link_transcriber_pads(
&self,
topbin: &super::TranscriberBin,
srcpad_name: &str,
channel: &TranscriptionChannel,
state: &State,
) -> Result<(), Error> {
self.expose_unsynced_pads(topbin, state, srcpad_name)?;
let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap())
.unwrap() .unwrap()
.name(format!("src_{}", channel.language)) .name(format!("src_{}", channel.language))
@ -2159,6 +2195,8 @@ impl ObjectImpl for TranscriberSinkPad {
} }
} }
"transcriber" => { "transcriber" => {
if let Some(this) = self.obj().parent().and_downcast::<super::TranscriberBin>() {
let mut s = this.imp().state.lock().unwrap();
let mut ps = self.state.lock().unwrap(); let mut ps = self.state.lock().unwrap();
let Ok(pad_state) = ps.as_mut() else { let Ok(pad_state) = ps.as_mut() else {
return; return;
@ -2168,8 +2206,6 @@ impl ObjectImpl for TranscriberSinkPad {
value.get().expect("type checked upstream"); value.get().expect("type checked upstream");
pad_state.transcriber.clone_from(&new_transcriber); pad_state.transcriber.clone_from(&new_transcriber);
if let Some(this) = self.obj().parent().and_downcast::<super::TranscriberBin>() {
let mut s = this.imp().state.lock().unwrap();
if old_transcriber != new_transcriber { if old_transcriber != new_transcriber {
if let Some(ref mut state) = s.as_mut() { if let Some(ref mut state) = s.as_mut() {
match this.imp().relink_transcriber( match this.imp().relink_transcriber(