From 1fc1ed2539de484d26decf6122c5d5393b7804c2 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 14 Nov 2024 19:34:22 +0100 Subject: [PATCH] transcriberbin: relink transcriber more thoroughly * Remove old transcriber from correct bin * Remove old unsynced ghost pads and expose new ones * Fix potential deadlock where State was locked after PadState Part-of: --- video/closedcaption/src/transcriberbin/imp.rs | 138 +++++++++++------- 1 file changed, 87 insertions(+), 51 deletions(-) diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 211bd778..80a83736 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -139,7 +139,7 @@ pub struct TranscriberBin { } impl TranscriberBin { - fn construct_channel_bin( + fn construct_transcription_channel( &self, lang: &str, mux_method: MuxMethod, @@ -794,7 +794,7 @@ impl TranscriberBin { * be called in READY */ fn relink_transcriber( &self, - state: &mut State, + state: &State, pad_state: &TranscriberSinkPadState, old_transcriber: Option<&gst::Element>, ) -> Result<(), Error> { @@ -814,9 +814,9 @@ impl TranscriberBin { ); pad_state.transcriber_aconv.unlink(old_transcriber); for channel in pad_state.transcription_channels.values() { - old_transcriber.unlink(&channel.bin); + pad_state.unlink_transcriber_pads(self.obj().as_ref(), state, channel); } - let _ = state.transcription_bin.remove(old_transcriber); + let _ = pad_state.transcription_bin.remove(old_transcriber); old_transcriber.set_state(gst::State::Null).unwrap(); } @@ -827,7 +827,8 @@ impl TranscriberBin { pad_state.transcriber_aconv.link(transcriber)?; for channel in pad_state.transcription_channels.values() { - channel.link_transcriber(transcriber)?; + let srcpad_name = channel.link_transcriber(transcriber)?; + pad_state.expose_unsynced_pads(self.obj().as_ref(), state, &srcpad_name)?; } } @@ -883,7 +884,11 @@ impl TranscriberBin { transcription_channels.insert( language_code.to_owned(), - self.construct_channel_bin(&language_code, mux_method, caption_streams)?, + self.construct_transcription_channel( + &language_code, + mux_method, + caption_streams, + )?, ); } } else { @@ -893,7 +898,7 @@ impl TranscriberBin { }; transcription_channels.insert( "transcript".to_string(), - self.construct_channel_bin("transcript", mux_method, caption_streams)?, + self.construct_transcription_channel("transcript", mux_method, caption_streams)?, ); } Ok(()) @@ -966,36 +971,7 @@ impl TranscriberBin { } for channel in pad_state.transcription_channels.values() { - let sinkpad = channel.bin.static_pad("sink").unwrap(); - if let Some(peer) = sinkpad.peer() { - peer.unlink(&sinkpad)?; - if let Some(ref transcriber) = pad_state.transcriber { - if channel.language != "transcript" { - transcriber.release_request_pad(&peer); - } - let mut unsynced_pad_name = format!("unsynced_{}", peer.name().as_str()); - if transcriber.static_pad(&unsynced_pad_name).is_some() { - let srcpad = pad_state - .transcription_bin - .static_pad(&unsynced_pad_name) - .unwrap(); - let _ = pad_state.transcription_bin.remove_pad(&srcpad); - if let Some(serial) = pad_state.serial { - unsynced_pad_name = format!("{}_{}", unsynced_pad_name, serial); - } - let srcpad = state - .transcription_bin - .static_pad(&unsynced_pad_name) - .unwrap(); - let _ = state.transcription_bin.remove_pad(&srcpad); - - let srcpad = state.internal_bin.static_pad(&unsynced_pad_name).unwrap(); - let _ = state.internal_bin.remove_pad(&srcpad); - let srcpad = self.obj().static_pad(&unsynced_pad_name).unwrap(); - let _ = self.obj().remove_pad(&srcpad); - } - } - } + pad_state.unlink_transcriber_pads(self.obj().as_ref(), state, channel); let srcpad = pad_state .transcription_bin @@ -1950,12 +1926,60 @@ impl TranscriberSinkPadState { }) } - fn link_transcriber_pads( + fn remove_unsynced_pads( &self, - elem: &super::TranscriberBin, - srcpad_name: &str, - channel: &TranscriptionChannel, + topbin: &super::TranscriberBin, state: &State, + srcpad_name: &str, + ) { + let Some(ref transcriber) = self.transcriber else { + return; + }; + + let mut unsynced_pad_name = format!("unsynced_{}", srcpad_name); + + if transcriber.static_pad(&unsynced_pad_name).is_some() { + let srcpad = self + .transcription_bin + .static_pad(&unsynced_pad_name) + .unwrap(); + let _ = self.transcription_bin.remove_pad(&srcpad); + if let Some(serial) = self.serial { + unsynced_pad_name = format!("{}_{}", unsynced_pad_name, serial); + } + let srcpad = state + .transcription_bin + .static_pad(&unsynced_pad_name) + .unwrap(); + let _ = state.transcription_bin.remove_pad(&srcpad); + + let srcpad = state.internal_bin.static_pad(&unsynced_pad_name).unwrap(); + let _ = state.internal_bin.remove_pad(&srcpad); + let srcpad = topbin.static_pad(&unsynced_pad_name).unwrap(); + let _ = topbin.remove_pad(&srcpad); + } + } + + fn unlink_transcriber_pads( + &self, + topbin: &super::TranscriberBin, + state: &State, + channel: &TranscriptionChannel, + ) { + let sinkpad = channel.bin.static_pad("sink").unwrap(); + let Some(srcpad) = sinkpad.peer() else { + return; + }; + srcpad.unlink(&sinkpad).unwrap(); + + self.remove_unsynced_pads(topbin, state, srcpad.name().as_str()); + } + + fn expose_unsynced_pads( + &self, + topbin: &super::TranscriberBin, + state: &State, + srcpad_name: &str, ) -> Result<(), Error> { let Some(ref transcriber) = self.transcriber else { return Ok(()); @@ -1984,9 +2008,21 @@ impl TranscriberSinkPadState { state.internal_bin.add_pad(&srcpad)?; let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); - elem.add_pad(&srcpad)?; + topbin.add_pad(&srcpad)?; } + Ok(()) + } + + fn link_transcriber_pads( + &self, + topbin: &super::TranscriberBin, + srcpad_name: &str, + channel: &TranscriptionChannel, + state: &State, + ) -> Result<(), Error> { + self.expose_unsynced_pads(topbin, state, srcpad_name)?; + let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) .unwrap() .name(format!("src_{}", channel.language)) @@ -2159,17 +2195,17 @@ impl ObjectImpl for TranscriberSinkPad { } } "transcriber" => { - let mut ps = self.state.lock().unwrap(); - let Ok(pad_state) = ps.as_mut() else { - return; - }; - let old_transcriber = pad_state.transcriber.clone(); - let new_transcriber: Option = - value.get().expect("type checked upstream"); - pad_state.transcriber.clone_from(&new_transcriber); - if let Some(this) = self.obj().parent().and_downcast::() { let mut s = this.imp().state.lock().unwrap(); + let mut ps = self.state.lock().unwrap(); + let Ok(pad_state) = ps.as_mut() else { + return; + }; + let old_transcriber = pad_state.transcriber.clone(); + let new_transcriber: Option = + value.get().expect("type checked upstream"); + pad_state.transcriber.clone_from(&new_transcriber); + if old_transcriber != new_transcriber { if let Some(ref mut state) = s.as_mut() { match this.imp().relink_transcriber(