diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 982e0d02..990a8f01 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -643,19 +643,19 @@ "caps": "text/x-raw:\n format: utf8\n", "direction": "src", "presence": "always", - "type": "GstTranslationSrcPad" + "type": "GstTranslateSrcPad" }, "src_%%u": { "caps": "text/x-raw:\n format: utf8\n", "direction": "src", "presence": "request", - "type": "GstTranslationSrcPad" + "type": "GstTranslateSrcPad" }, - "translation_src_%%u": { + "translate_src_%%u": { "caps": "text/x-raw:\n format: utf8\n", "direction": "src", "presence": "request", - "type": "GstTranslationSrcPad" + "type": "GstTranslateSrcPad" } }, "properties": { @@ -773,20 +773,6 @@ "type": "guint", "writable": true }, - "transcript-lookahead": { - "blurb": "Maximum duration in milliseconds of transcript to lookahead before sending to translation when no separator was encountered", - "conditionally-available": false, - "construct": false, - "construct-only": false, - "controllable": false, - "default": "5000", - "max": "-1", - "min": "0", - "mutable": "ready", - "readable": true, - "type": "guint", - "writable": true - }, "translate-latency": { "blurb": "Amount of milliseconds to allow AWS translate (ignored if the input and output languages are the same)", "conditionally-available": false, @@ -801,6 +787,20 @@ "type": "guint", "writable": true }, + "translate-lookahead": { + "blurb": "Maximum duration in milliseconds of transcript to lookahead before sending to translation when no separator was encountered", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "5000", + "max": "-1", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, "vocabulary-filter-method": { "blurb": "Defines how filtered words will be edited, has no effect when vocabulary-filter-name isn't set", "conditionally-available": false, @@ -919,9 +919,9 @@ } ] }, - "GstTranslationSrcPad": { + "GstTranslateSrcPad": { "hierarchy": [ - "GstTranslationSrcPad", + "GstTranslateSrcPad", "GstPad", "GstObject", "GInitiallyUnowned", diff --git a/net/aws/src/transcriber/imp.rs b/net/aws/src/transcriber/imp.rs index 4188d9cd..b001fb01 100644 --- a/net/aws/src/transcriber/imp.rs +++ b/net/aws/src/transcriber/imp.rs @@ -13,7 +13,7 @@ //! The element can optionally translate the resulting transcripts to one or //! multiple languages. //! -//! This module contains the element implementation as well as the `TranslationSrcPad` +//! This module contains the element implementation as well as the `TranslateSrcPad` //! sublcass and its `TranslationPadTask`. //! //! Web service specific code can be found in the `transcribe` and `translate` modules. @@ -34,7 +34,7 @@ use std::sync::Mutex; use once_cell::sync::Lazy; use super::transcribe::{TranscriberLoop, TranscriptEvent, TranscriptItem, TranscriptionSettings}; -use super::translate::{TranslatedItem, TranslationLoop, TranslationQueue}; +use super::translate::{TranslateLoop, TranslateQueue, TranslatedItem}; use super::{ AwsTranscriberResultStability, AwsTranscriberVocabularyFilterMethod, TranslationTokenizationMethod, CAT, @@ -59,8 +59,8 @@ pub const DEFAULT_TRANSCRIBE_LATENCY: gst::ClockTime = gst::ClockTime::from_seco const TRANSLATE_LATENCY_PROPERTY: &str = "translate-latency"; pub const DEFAULT_TRANSLATE_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(500); -const TRANSCRIPT_LOOKAHEAD_PROPERTY: &str = "transcript-lookahead"; -pub const DEFAULT_TRANSCRIPT_LOOKAHEAD: gst::ClockTime = gst::ClockTime::from_seconds(5); +const TRANSLATE_LOOKAHEAD_PROPERTY: &str = "translate-lookahead"; +pub const DEFAULT_TRANSLATE_LOOKAHEAD: gst::ClockTime = gst::ClockTime::from_seconds(5); const DEFAULT_LATENESS: gst::ClockTime = gst::ClockTime::ZERO; pub const DEFAULT_INPUT_LANG_CODE: &str = "en-US"; @@ -82,7 +82,7 @@ const TRANSLATION_TOKENIZATION_PROPERTY: &str = "tokenization-method"; pub(super) struct Settings { transcribe_latency: gst::ClockTime, translate_latency: gst::ClockTime, - transcript_lookahead: gst::ClockTime, + translate_lookahead: gst::ClockTime, lateness: gst::ClockTime, pub language_code: String, pub vocabulary: Option, @@ -100,7 +100,7 @@ impl Default for Settings { Self { transcribe_latency: DEFAULT_TRANSCRIBE_LATENCY, translate_latency: DEFAULT_TRANSLATE_LATENCY, - transcript_lookahead: DEFAULT_TRANSCRIPT_LOOKAHEAD, + translate_lookahead: DEFAULT_TRANSLATE_LOOKAHEAD, lateness: DEFAULT_LATENESS, language_code: DEFAULT_INPUT_LANG_CODE.to_string(), vocabulary: None, @@ -118,7 +118,7 @@ impl Default for Settings { pub(super) struct State { buffer_tx: Option>, transcriber_loop_handle: Option>>, - srcpads: BTreeSet, + srcpads: BTreeSet, pad_serial: u32, pub seqnum: gst::Seqnum, } @@ -136,12 +136,12 @@ impl Default for State { } pub struct Transcriber { - static_srcpad: super::TranslationSrcPad, + static_srcpad: super::TranslateSrcPad, sinkpad: gst::Pad, settings: Mutex, state: Mutex, pub(super) aws_config: Mutex>, - // sender to broadcast transcript items to the translation src pads. + // sender to broadcast transcript items to the translate src pads. transcript_event_tx: broadcast::Sender, } @@ -385,30 +385,30 @@ impl ObjectSubclass for Transcriber { let templ = klass.pad_template("src").unwrap(); let static_srcpad = - gst::PadBuilder::::from_template(&templ, Some("src")) + gst::PadBuilder::::from_template(&templ, Some("src")) .activatemode_function(|pad, parent, mode, active| { Transcriber::catch_panic_pad_function( parent, || { Err(gst::loggable_error!( CAT, - "Panic activating TranslationSrcPad" + "Panic activating TranslateSrcPad" )) }, - |elem| TranslationSrcPad::activatemode(elem, pad, mode, active), + |elem| TranslateSrcPad::activatemode(elem, pad, mode, active), ) }) .query_function(|pad, parent, query| { Transcriber::catch_panic_pad_function( parent, || false, - |elem| TranslationSrcPad::src_query(elem, pad, query), + |elem| TranslateSrcPad::src_query(elem, pad, query), ) }) .flags(gst::PadFlags::FIXED_CAPS) .build(); - // Setting the channel capacity so that a TranslationSrcPad that would lag + // Setting the channel capacity so that a TranslateSrcPad that would lag // behind for some reasons get a chance to catch-up without loosing items. // Receiver will be created by subscribing to sender later. let (transcript_event_tx, _) = broadcast::channel(128); @@ -458,13 +458,13 @@ impl ObjectImpl for Transcriber { .default_value(DEFAULT_TRANSLATE_LATENCY.mseconds() as u32) .mutable_ready() .build(), - glib::ParamSpecUInt::builder(TRANSCRIPT_LOOKAHEAD_PROPERTY) - .nick("Transcript lookahead") + glib::ParamSpecUInt::builder(TRANSLATE_LOOKAHEAD_PROPERTY) + .nick("Translate lookahead") .blurb(concat!( "Maximum duration in milliseconds of transcript to lookahead ", "before sending to translation when no separator was encountered", )) - .default_value(DEFAULT_TRANSCRIPT_LOOKAHEAD.mseconds() as u32) + .default_value(DEFAULT_TRANSLATE_LOOKAHEAD.mseconds() as u32) .mutable_ready() .build(), glib::ParamSpecUInt::builder("lateness") @@ -554,8 +554,8 @@ impl ObjectImpl for Transcriber { self.settings.lock().unwrap().translate_latency = gst::ClockTime::from_mseconds(value.get::().unwrap().into()); } - TRANSCRIPT_LOOKAHEAD_PROPERTY => { - self.settings.lock().unwrap().transcript_lookahead = + TRANSLATE_LOOKAHEAD_PROPERTY => { + self.settings.lock().unwrap().translate_lookahead = gst::ClockTime::from_mseconds(value.get::().unwrap().into()); } "lateness" => { @@ -621,13 +621,9 @@ impl ObjectImpl for Transcriber { TRANSLATE_LATENCY_PROPERTY => { (self.settings.lock().unwrap().translate_latency.mseconds() as u32).to_value() } - TRANSCRIPT_LOOKAHEAD_PROPERTY => (self - .settings - .lock() - .unwrap() - .transcript_lookahead - .mseconds() as u32) - .to_value(), + TRANSLATE_LOOKAHEAD_PROPERTY => { + (self.settings.lock().unwrap().translate_lookahead.mseconds() as u32).to_value() + } "lateness" => { let settings = self.settings.lock().unwrap(); (settings.lateness.mseconds() as u32).to_value() @@ -695,15 +691,15 @@ impl ElementImpl for Transcriber { gst::PadDirection::Src, gst::PadPresence::Always, &src_caps, - super::TranslationSrcPad::static_type(), + super::TranslateSrcPad::static_type(), ) .unwrap(); let req_src_pad_template = gst::PadTemplate::with_gtype( - "translation_src_%u", + "translate_src_%u", gst::PadDirection::Src, gst::PadPresence::Request, &src_caps, - super::TranslationSrcPad::static_type(), + super::TranslateSrcPad::static_type(), ) .unwrap(); @@ -765,9 +761,9 @@ impl ElementImpl for Transcriber { ) -> Option { let mut state = self.state.lock().unwrap(); - let pad = gst::PadBuilder::::from_template( + let pad = gst::PadBuilder::::from_template( templ, - Some(format!("translation_src_{}", state.pad_serial).as_str()), + Some(format!("translate_src_{}", state.pad_serial).as_str()), ) .activatemode_function(|pad, parent, mode, active| { Transcriber::catch_panic_pad_function( @@ -775,17 +771,17 @@ impl ElementImpl for Transcriber { || { Err(gst::loggable_error!( CAT, - "Panic activating TranslationSrcPad" + "Panic activating TranslateSrcPad" )) }, - |elem| TranslationSrcPad::activatemode(elem, pad, mode, active), + |elem| TranslateSrcPad::activatemode(elem, pad, mode, active), ) }) .query_function(|pad, parent, query| { Transcriber::catch_panic_pad_function( parent, || false, - |elem| TranslationSrcPad::src_query(elem, pad, query), + |elem| TranslateSrcPad::src_query(elem, pad, query), ) }) .flags(gst::PadFlags::FIXED_CAPS) @@ -849,16 +845,16 @@ impl ChildProxyImpl for Transcriber { } } struct TranslationPadTask { - pad: glib::subclass::ObjectImplRef, + pad: glib::subclass::ObjectImplRef, elem: super::Transcriber, transcript_event_rx: broadcast::Receiver, needs_translate: bool, - translation_queue: TranslationQueue, - translation_loop_handle: Option>>, - to_translation_tx: Option>>, - from_translation_rx: Option>>, + translate_queue: TranslateQueue, + translate_loop_handle: Option>>, + to_translate_tx: Option>>, + from_translate_rx: Option>>, translate_latency: gst::ClockTime, - transcript_lookahead: gst::ClockTime, + translate_lookahead: gst::ClockTime, send_events: bool, translated_items: VecDeque, our_latency: gst::ClockTime, @@ -870,7 +866,7 @@ struct TranslationPadTask { impl TranslationPadTask { fn try_new( - pad: &TranslationSrcPad, + pad: &TranslateSrcPad, elem: super::Transcriber, transcript_event_rx: broadcast::Receiver, ) -> Result { @@ -879,12 +875,12 @@ impl TranslationPadTask { elem, transcript_event_rx, needs_translate: false, - translation_queue: TranslationQueue::default(), - translation_loop_handle: None, - to_translation_tx: None, - from_translation_rx: None, + translate_queue: TranslateQueue::default(), + translate_loop_handle: None, + to_translate_tx: None, + from_translate_rx: None, translate_latency: DEFAULT_TRANSLATE_LATENCY, - transcript_lookahead: DEFAULT_TRANSCRIPT_LOOKAHEAD, + translate_lookahead: DEFAULT_TRANSLATE_LOOKAHEAD, send_events: true, translated_items: VecDeque::new(), our_latency: DEFAULT_TRANSCRIBE_LATENCY, @@ -903,8 +899,8 @@ impl TranslationPadTask { impl Drop for TranslationPadTask { fn drop(&mut self) { - if let Some(translation_loop_handle) = self.translation_loop_handle.take() { - translation_loop_handle.abort(); + if let Some(translate_loop_handle) = self.translate_loop_handle.take() { + translate_loop_handle.abort(); } } } @@ -969,11 +965,11 @@ impl TranslationPadTask { async fn translate_iter(&mut self) -> Result<(), gst::ErrorMessage> { if self - .translation_loop_handle + .translate_loop_handle .as_ref() .map_or(true, task::JoinHandle::is_finished) { - const ERR: &str = "Translation loop is not running"; + const ERR: &str = "Translate loop is not running"; gst::error!(CAT, imp: self.pad, "{ERR}"); return Err(gst::error_msg!(gst::StreamError::Failed, ["{ERR}"])); } @@ -983,8 +979,8 @@ impl TranslationPadTask { let timeout = tokio::time::sleep(GRANULARITY.into()).fuse(); futures::pin_mut!(timeout); - let from_translation_rx = self - .from_translation_rx + let from_translate_rx = self + .from_translate_rx .as_mut() .expect("from_translation chan must be available in translation mode"); @@ -996,7 +992,7 @@ impl TranslationPadTask { // before current latency budget is exhausted. futures::select_biased! { _ = timeout => return Ok(()), - translated_items = from_translation_rx.next() => { + translated_items = from_translate_rx.next() => { let Some(translated_items) = translated_items else { const ERR: &str = "translation chan terminated"; gst::debug!(CAT, imp: self.pad, "{ERR}"); @@ -1033,7 +1029,7 @@ impl TranslationPadTask { }; for items in transcript_items.iter() { - if let Some(ready_items) = self.translation_queue.push(items) { + if let Some(ready_items) = self.translate_queue.push(items) { self.send_for_translation(ready_items).await?; } } @@ -1062,7 +1058,7 @@ impl TranslationPadTask { discont_pending = pad_state.discont_pending; } - if self.needs_translate && !self.translation_queue.is_empty() { + if self.needs_translate && !self.translate_queue.is_empty() { // Maximum delay for an item to be pushed to stream on time // Margin: // - 1 * GRANULARITY: the time it will take before we can check this again, @@ -1078,8 +1074,8 @@ impl TranslationPadTask { let deadline = translation_eta.saturating_sub(max_delay); if let Some(ready_items) = self - .translation_queue - .dequeue(deadline, self.transcript_lookahead) + .translate_queue + .dequeue(deadline, self.translate_lookahead) { gst::debug!(CAT, imp: self.pad, "Forcing {} transcripts to translation", ready_items.len()); if self.send_for_translation(ready_items).await.is_err() { @@ -1184,7 +1180,7 @@ impl TranslationPadTask { if self.send_eos && self.pending_translations == 0 && self.translated_items.is_empty() - && self.translation_queue.is_empty() + && self.translate_queue.is_empty() { /* We're EOS, we can pause and exit early */ let _ = self.pad.obj().pause_task(); @@ -1241,7 +1237,7 @@ impl TranslationPadTask { transcript_items: Vec, ) -> Result<(), gst::ErrorMessage> { let res = self - .to_translation_tx + .to_translate_tx .as_mut() .expect("to_translation chan must be available in translation mode") .send(transcript_items) @@ -1317,7 +1313,7 @@ impl TranslationPadTask { let pad_settings = self.pad.settings.lock().unwrap(); - self.our_latency = TranslationSrcPad::our_latency(&elem_settings, &pad_settings); + self.our_latency = TranslateSrcPad::our_latency(&elem_settings, &pad_settings); if self.our_latency + elem_settings.lateness <= 2 * GRANULARITY { let err = format!( "total latency + lateness must be greater than {}", @@ -1328,35 +1324,35 @@ impl TranslationPadTask { } self.translate_latency = elem_settings.translate_latency; - self.transcript_lookahead = elem_settings.transcript_lookahead; + self.translate_lookahead = elem_settings.translate_lookahead; - self.needs_translate = TranslationSrcPad::needs_translation( + self.needs_translate = TranslateSrcPad::needs_translation( &elem_settings.language_code, pad_settings.language_code.as_deref(), ); if self.needs_translate { - let (to_translation_tx, to_translation_rx) = mpsc::channel(64); - let (from_translation_tx, from_translation_rx) = mpsc::channel(64); + let (to_translate_tx, to_translate_rx) = mpsc::channel(64); + let (from_translate_tx, from_translate_rx) = mpsc::channel(64); - translation_loop = Some(TranslationLoop::new( + translation_loop = Some(TranslateLoop::new( elem_imp, &self.pad, &elem_settings.language_code, pad_settings.language_code.as_deref().unwrap(), pad_settings.tokenization_method, - to_translation_rx, - from_translation_tx, + to_translate_rx, + from_translate_tx, )); - self.to_translation_tx = Some(to_translation_tx); - self.from_translation_rx = Some(from_translation_rx); + self.to_translate_tx = Some(to_translate_tx); + self.from_translate_rx = Some(from_translate_rx); } } if let Some(translation_loop) = translation_loop { translation_loop.check_language().await?; - self.translation_loop_handle = Some(RUNTIME.spawn(translation_loop.run())); + self.translate_loop_handle = Some(RUNTIME.spawn(translation_loop.run())); } Ok(()) @@ -1381,18 +1377,18 @@ impl Default for TranslationPadState { } #[derive(Debug, Default, Clone)] -struct TranslationPadSettings { +struct TranslatePadSettings { language_code: Option, tokenization_method: TranslationTokenizationMethod, } #[derive(Debug, Default)] -pub struct TranslationSrcPad { +pub struct TranslateSrcPad { state: Mutex, - settings: Mutex, + settings: Mutex, } -impl TranslationSrcPad { +impl TranslateSrcPad { fn start_task(&self) -> Result<(), gst::LoggableError> { gst::debug!(CAT, imp: self, "Starting task"); @@ -1457,14 +1453,14 @@ impl TranslationSrcPad { #[inline] fn our_latency( elem_settings: &Settings, - pad_settings: &TranslationPadSettings, + pad_settings: &TranslatePadSettings, ) -> gst::ClockTime { if Self::needs_translation( &elem_settings.language_code, pad_settings.language_code.as_deref(), ) { elem_settings.transcribe_latency - + elem_settings.transcript_lookahead + + elem_settings.translate_lookahead + elem_settings.translate_latency } else { elem_settings.transcribe_latency @@ -1484,11 +1480,11 @@ impl TranslationSrcPad { } } -impl TranslationSrcPad { +impl TranslateSrcPad { #[track_caller] pub fn activatemode( _elem: &Transcriber, - pad: &super::TranslationSrcPad, + pad: &super::TranslateSrcPad, _mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError> { @@ -1503,7 +1499,7 @@ impl TranslationSrcPad { pub fn src_query( elem: &Transcriber, - pad: &super::TranslationSrcPad, + pad: &super::TranslateSrcPad, query: &mut gst::QueryRef, ) -> bool { gst::log!(CAT, obj: pad, "Handling query {query:?}"); @@ -1553,9 +1549,9 @@ impl TranslationSrcPad { } #[glib::object_subclass] -impl ObjectSubclass for TranslationSrcPad { - const NAME: &'static str = "GstTranslationSrcPad"; - type Type = super::TranslationSrcPad; +impl ObjectSubclass for TranslateSrcPad { + const NAME: &'static str = "GstTranslateSrcPad"; + type Type = super::TranslateSrcPad; type ParentType = gst::Pad; fn new() -> Self { @@ -1563,7 +1559,7 @@ impl ObjectSubclass for TranslationSrcPad { } } -impl ObjectImpl for TranslationSrcPad { +impl ObjectImpl for TranslateSrcPad { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ @@ -1608,6 +1604,6 @@ impl ObjectImpl for TranslationSrcPad { } } -impl GstObjectImpl for TranslationSrcPad {} +impl GstObjectImpl for TranslateSrcPad {} -impl PadImpl for TranslationSrcPad {} +impl PadImpl for TranslateSrcPad {} diff --git a/net/aws/src/transcriber/mod.rs b/net/aws/src/transcriber/mod.rs index faad2748..2d168323 100644 --- a/net/aws/src/transcriber/mod.rs +++ b/net/aws/src/transcriber/mod.rs @@ -99,7 +99,7 @@ glib::wrapper! { } glib::wrapper! { - pub struct TranslationSrcPad(ObjectSubclass) @extends gst::Pad, gst::Object; + pub struct TranslateSrcPad(ObjectSubclass) @extends gst::Pad, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { @@ -111,7 +111,7 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { .mark_as_plugin_api(gst::PluginAPIFlags::empty()); TranslationTokenizationMethod::static_type() .mark_as_plugin_api(gst::PluginAPIFlags::empty()); - TranslationSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + TranslateSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); } gst::Element::register( Some(plugin), diff --git a/net/aws/src/transcriber/translate.rs b/net/aws/src/transcriber/translate.rs index fc49674d..951a4080 100644 --- a/net/aws/src/transcriber/translate.rs +++ b/net/aws/src/transcriber/translate.rs @@ -16,7 +16,7 @@ use futures::prelude::*; use std::collections::VecDeque; -use super::imp::TranslationSrcPad; +use super::imp::TranslateSrcPad; use super::transcribe::TranscriptItem; use super::{TranslationTokenizationMethod, CAT}; @@ -41,11 +41,11 @@ impl From<&TranscriptItem> for TranslatedItem { } #[derive(Default)] -pub struct TranslationQueue { +pub struct TranslateQueue { items: VecDeque, } -impl TranslationQueue { +impl TranslateQueue { pub fn is_empty(&self) -> bool { self.items.is_empty() } @@ -102,39 +102,39 @@ impl TranslationQueue { } } -pub struct TranslationLoop { - pad: glib::subclass::ObjectImplRef, +pub struct TranslateLoop { + pad: glib::subclass::ObjectImplRef, client: aws_translate::Client, input_lang: String, output_lang: String, tokenization_method: TranslationTokenizationMethod, transcript_rx: mpsc::Receiver>, - translation_tx: mpsc::Sender>, + translate_tx: mpsc::Sender>, } -impl TranslationLoop { +impl TranslateLoop { pub fn new( imp: &super::imp::Transcriber, - pad: &TranslationSrcPad, + pad: &TranslateSrcPad, input_lang: &str, output_lang: &str, tokenization_method: TranslationTokenizationMethod, transcript_rx: mpsc::Receiver>, - translation_tx: mpsc::Sender>, + translate_tx: mpsc::Sender>, ) -> Self { let aws_config = imp.aws_config.lock().unwrap(); let aws_config = aws_config .as_ref() .expect("aws_config must be initialized at this stage"); - TranslationLoop { + TranslateLoop { pad: pad.ref_counted(), client: aws_sdk_translate::Client::new(aws_config), input_lang: input_lang.to_string(), output_lang: output_lang.to_string(), tokenization_method, transcript_rx, - translation_tx, + translate_tx, } } @@ -227,7 +227,7 @@ impl TranslationLoop { gst::trace!(CAT, imp: self.pad, "Sending {translated_items:?}"); - if self.translation_tx.send(translated_items).await.is_err() { + if self.translate_tx.send(translated_items).await.is_err() { gst::info!( CAT, imp: self.pad,