diff --git a/audio/speechmatics/src/transcriber/imp.rs b/audio/speechmatics/src/transcriber/imp.rs index 73e334ac..7278839a 100644 --- a/audio/speechmatics/src/transcriber/imp.rs +++ b/audio/speechmatics/src/transcriber/imp.rs @@ -247,6 +247,14 @@ pub struct Transcriber { } impl TranscriberSrcPad { + fn set_unsynced_pad(&self, pad: &gst::Pad) { + self.state.lock().unwrap().unsynced_pad = Some(pad.clone()); + } + + fn take_unsynced_pad(&self) -> Option { + self.state.lock().unwrap().unsynced_pad.take() + } + fn dequeue(&self) -> bool { let Some(parent) = self.obj().parent() else { return true; @@ -392,20 +400,6 @@ impl TranscriberSrcPad { ); } - if self.obj().sticky_event::(0).is_none() { - gst::debug!( - CAT, - imp = self, - "Constructing segment event from {:?}", - state_guard.out_segment - ); - events.push( - gst::event::Segment::builder(&state_guard.out_segment) - .seqnum(state_guard.seqnum) - .build(), - ); - } - drop(state_guard); for event in events { @@ -564,6 +558,28 @@ impl TranscriberSrcPad { gst::info!(CAT, imp = self, "Parsed translation {:?}", translation); + let unsynced_pad = self.state.lock().unwrap().unsynced_pad.clone(); + + if let Some(unsynced_pad) = unsynced_pad { + if unsynced_pad.last_flow_result().is_ok() { + let now = transcriber.current_running_time().unwrap(); + let mut buf = gst::Buffer::from_mut_slice(text.into_bytes()); + { + let buf_mut = buf.get_mut().unwrap(); + + buf_mut.set_pts(now); + } + + gst::log!( + CAT, + obj = unsynced_pad, + "Pushing original transcript with timestamp {now}", + ); + + let _ = unsynced_pad.push(buf); + } + } + let lateness = (transcriber.imp().settings.lock().unwrap().lateness_ms as f64 / 1_000.) as f32; @@ -625,6 +641,28 @@ impl TranscriberSrcPad { transcript ); + let unsynced_pad = self.state.lock().unwrap().unsynced_pad.clone(); + + if let Some(unsynced_pad) = unsynced_pad { + if unsynced_pad.last_flow_result().is_ok() { + let now = transcriber.current_running_time().unwrap(); + let mut buf = gst::Buffer::from_mut_slice(text.into_bytes()); + { + let buf_mut = buf.get_mut().unwrap(); + + buf_mut.set_pts(now); + } + + gst::log!( + CAT, + obj = unsynced_pad, + "Pushing original transcript with timestamp {now}", + ); + + let _ = unsynced_pad.push(buf); + } + } + let lateness = (transcriber.imp().settings.lock().unwrap().lateness_ms as f64 / 1_000.) as f32; @@ -859,28 +897,62 @@ impl Transcriber { Ok(segment) => segment, }; - let mut state = self.state.lock().unwrap(); + let srcpads = { + let mut state = self.state.lock().unwrap(); + state.in_segment = segment.clone(); + state.srcpads.clone() + }; - for srcpad in &state.srcpads { - let mut sstate = srcpad.imp().state.lock().unwrap(); - sstate.out_segment.set_time(segment.time()); - sstate.out_segment.set_position(gst::ClockTime::ZERO); - sstate.seqnum = e.seqnum(); - srcpad.sticky_events_foreach(|e| { - if let gst::EventView::Segment(_) = e.view() { - std::ops::ControlFlow::Continue(gst::EventForeachAction::Remove) - } else { - std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) - } - }); + for srcpad in &srcpads { + let (unsynced_pad, out_segment, seqnum) = { + let mut sstate = srcpad.imp().state.lock().unwrap(); + sstate.out_segment.set_time(segment.time()); + sstate.out_segment.set_position(gst::ClockTime::ZERO); + sstate.seqnum = e.seqnum(); + ( + sstate.unsynced_pad.clone(), + sstate.out_segment.clone(), + sstate.seqnum, + ) + }; + + let out_event = gst::event::Segment::builder(&out_segment) + .seqnum(seqnum) + .build(); + srcpad.push_event(out_event.clone()); + + if let Some(unsynced_pad) = unsynced_pad { + unsynced_pad.push_event(out_event); + } } - state.in_segment = segment; - true } gst::EventView::Tag(_) => true, - gst::EventView::Caps(_) => true, + gst::EventView::Caps(_) => { + let srcpads = self.state.lock().unwrap().srcpads.clone(); + + for srcpad in &srcpads { + let (unsynced_pad, seqnum) = { + let sstate = srcpad.imp().state.lock().unwrap(); + (sstate.unsynced_pad.clone(), sstate.seqnum) + }; + + let caps = gst::Caps::builder("text/x-raw") + .field("format", "utf8") + .build(); + let out_event = gst::event::Caps::builder(&caps).seqnum(seqnum).build(); + srcpad.push_event(out_event); + + if let Some(unsynced_pad) = unsynced_pad { + let caps = gst::Caps::builder("application/x-json").build(); + let out_event = gst::event::Caps::builder(&caps).seqnum(seqnum).build(); + unsynced_pad.push_event(out_event); + } + } + + true + } _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), } } @@ -1445,7 +1517,16 @@ impl ObjectImpl for Transcriber { .flags(gst::PadFlags::FIXED_CAPS) .build(); obj.add_pad(&srcpad).unwrap(); + + let templ = obj.class().pad_template("unsynced_src").unwrap(); + let unsynced_srcpad = gst::PadBuilder::::from_template(&templ) + .flags(gst::PadFlags::FIXED_CAPS) + .build(); + obj.add_pad(&unsynced_srcpad).unwrap(); + srcpad.imp().set_unsynced_pad(&unsynced_srcpad); + self.state.lock().unwrap().srcpads.insert(srcpad); + obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); } @@ -1598,6 +1679,21 @@ impl ElementImpl for Transcriber { super::TranscriberSrcPad::static_type(), ) .unwrap(); + let src_caps = gst::Caps::builder("application/x-json").build(); + let unsynced_src_pad_template = gst::PadTemplate::new( + "unsynced_src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &src_caps, + ) + .unwrap(); + let unsynced_sometimes_src_pad_template = gst::PadTemplate::new( + "unsynced_translate_src_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &src_caps, + ) + .unwrap(); let sink_caps = gst_audio::AudioCapsBuilder::new() .format(gst_audio::AUDIO_FORMAT_S16) @@ -1612,7 +1708,13 @@ impl ElementImpl for Transcriber { ) .unwrap(); - vec![src_pad_template, req_src_pad_template, sink_pad_template] + vec![ + src_pad_template, + req_src_pad_template, + sink_pad_template, + unsynced_src_pad_template, + unsynced_sometimes_src_pad_template, + ] }); PAD_TEMPLATES.as_ref() @@ -1650,6 +1752,17 @@ impl ElementImpl for Transcriber { .flags(gst::PadFlags::FIXED_CAPS) .build(); + let templ = self + .obj() + .class() + .pad_template("unsynced_translate_src_%u") + .unwrap(); + let unsynced_srcpad = gst::PadBuilder::::from_template(&templ) + .name(format!("unsynced_translate_src_{}", state.pad_serial).as_str()) + .flags(gst::PadFlags::FIXED_CAPS) + .build(); + + pad.imp().set_unsynced_pad(&unsynced_srcpad); state.srcpads.insert(pad.clone()); gst::info!(CAT, "New pad requested, {}", state.srcpads.len()); @@ -1658,6 +1771,7 @@ impl ElementImpl for Transcriber { drop(state); self.obj().add_pad(&pad).unwrap(); + self.obj().add_pad(&unsynced_srcpad).unwrap(); self.obj().child_added(&pad, &pad.name()); @@ -1667,6 +1781,14 @@ impl ElementImpl for Transcriber { fn release_pad(&self, pad: &gst::Pad) { pad.set_active(false).unwrap(); self.obj().remove_pad(pad).unwrap(); + self.state.lock().unwrap().srcpads.remove(pad); + + let transcribe_srcpad = pad.downcast_ref::().unwrap(); + + if let Some(unsynced_pad) = transcribe_srcpad.imp().take_unsynced_pad() { + unsynced_pad.set_active(false).unwrap(); + self.obj().remove_pad(&unsynced_pad).unwrap(); + } self.obj().child_removed(pad, &pad.name()); let _ = self @@ -1721,6 +1843,7 @@ struct TranscriberSrcPadState { send_eos: bool, out_segment: gst::FormattedSegment, seqnum: gst::Seqnum, + unsynced_pad: Option, } impl Default for TranscriberSrcPadState { @@ -1733,6 +1856,7 @@ impl Default for TranscriberSrcPadState { send_eos: false, out_segment: gst::FormattedSegment::new(), seqnum: gst::Seqnum::next(), + unsynced_pad: None, } } } diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 581d8541..93ada7fb 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -12952,6 +12952,16 @@ "direction": "src", "presence": "request", "type": "GstSpeechmaticsTranscriberSrcPad" + }, + "unsynced_src": { + "caps": "application/x-json:\n", + "direction": "src", + "presence": "always" + }, + "unsynced_translate_src_%%u": { + "caps": "application/x-json:\n", + "direction": "src", + "presence": "sometimes" } }, "properties": {