speechmatics: expose unsynced pads on transcriber

This can be used for storing original transcripts for editing after the
fact.

Modeled on the aws transcriber, to be usable from transcriberbin.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1963>
This commit is contained in:
Mathieu Duponchelle 2024-12-02 17:01:50 +01:00 committed by GStreamer Marge Bot
parent c9a0731e61
commit 4e722d6dcc
2 changed files with 165 additions and 31 deletions

View file

@ -247,6 +247,14 @@ pub struct Transcriber {
}
impl TranscriberSrcPad {
/// Records `pad` as the unsynced companion of this source pad.
///
/// A clone of `pad` is stored in this pad's state, replacing any
/// previously stored one. Panics if acquiring the state lock fails,
/// since the `lock()` result is unwrapped.
fn set_unsynced_pad(&self, pad: &gst::Pad) {
    let mut guard = self.state.lock().unwrap();
    guard.unsynced_pad = Some(pad.clone());
}
/// Removes the stored unsynced pad from this pad's state and returns it.
///
/// Returns `None` when no unsynced pad is currently stored; after the
/// call the stored value is always `None`. Panics if acquiring the state
/// lock fails, since the `lock()` result is unwrapped.
fn take_unsynced_pad(&self) -> Option<gst::Pad> {
    let mut guard = self.state.lock().unwrap();
    guard.unsynced_pad.take()
}
fn dequeue(&self) -> bool {
let Some(parent) = self.obj().parent() else {
return true;
@ -392,20 +400,6 @@ impl TranscriberSrcPad {
);
}
if self.obj().sticky_event::<gst::event::Segment>(0).is_none() {
gst::debug!(
CAT,
imp = self,
"Constructing segment event from {:?}",
state_guard.out_segment
);
events.push(
gst::event::Segment::builder(&state_guard.out_segment)
.seqnum(state_guard.seqnum)
.build(),
);
}
drop(state_guard);
for event in events {
@ -564,6 +558,28 @@ impl TranscriberSrcPad {
gst::info!(CAT, imp = self, "Parsed translation {:?}", translation);
let unsynced_pad = self.state.lock().unwrap().unsynced_pad.clone();
if let Some(unsynced_pad) = unsynced_pad {
if unsynced_pad.last_flow_result().is_ok() {
let now = transcriber.current_running_time().unwrap();
let mut buf = gst::Buffer::from_mut_slice(text.into_bytes());
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(now);
}
gst::log!(
CAT,
obj = unsynced_pad,
"Pushing original transcript with timestamp {now}",
);
let _ = unsynced_pad.push(buf);
}
}
let lateness = (transcriber.imp().settings.lock().unwrap().lateness_ms
as f64
/ 1_000.) as f32;
@ -625,6 +641,28 @@ impl TranscriberSrcPad {
transcript
);
let unsynced_pad = self.state.lock().unwrap().unsynced_pad.clone();
if let Some(unsynced_pad) = unsynced_pad {
if unsynced_pad.last_flow_result().is_ok() {
let now = transcriber.current_running_time().unwrap();
let mut buf = gst::Buffer::from_mut_slice(text.into_bytes());
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(now);
}
gst::log!(
CAT,
obj = unsynced_pad,
"Pushing original transcript with timestamp {now}",
);
let _ = unsynced_pad.push(buf);
}
}
let lateness = (transcriber.imp().settings.lock().unwrap().lateness_ms
as f64
/ 1_000.) as f32;
@ -859,28 +897,62 @@ impl Transcriber {
Ok(segment) => segment,
};
let srcpads = {
let mut state = self.state.lock().unwrap();
state.in_segment = segment.clone();
state.srcpads.clone()
};
for srcpad in &state.srcpads {
for srcpad in &srcpads {
let (unsynced_pad, out_segment, seqnum) = {
let mut sstate = srcpad.imp().state.lock().unwrap();
sstate.out_segment.set_time(segment.time());
sstate.out_segment.set_position(gst::ClockTime::ZERO);
sstate.seqnum = e.seqnum();
srcpad.sticky_events_foreach(|e| {
if let gst::EventView::Segment(_) = e.view() {
std::ops::ControlFlow::Continue(gst::EventForeachAction::Remove)
} else {
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
}
});
}
(
sstate.unsynced_pad.clone(),
sstate.out_segment.clone(),
sstate.seqnum,
)
};
state.in_segment = segment;
let out_event = gst::event::Segment::builder(&out_segment)
.seqnum(seqnum)
.build();
srcpad.push_event(out_event.clone());
if let Some(unsynced_pad) = unsynced_pad {
unsynced_pad.push_event(out_event);
}
}
true
}
gst::EventView::Tag(_) => true,
gst::EventView::Caps(_) => true,
gst::EventView::Caps(_) => {
let srcpads = self.state.lock().unwrap().srcpads.clone();
for srcpad in &srcpads {
let (unsynced_pad, seqnum) = {
let sstate = srcpad.imp().state.lock().unwrap();
(sstate.unsynced_pad.clone(), sstate.seqnum)
};
let caps = gst::Caps::builder("text/x-raw")
.field("format", "utf8")
.build();
let out_event = gst::event::Caps::builder(&caps).seqnum(seqnum).build();
srcpad.push_event(out_event);
if let Some(unsynced_pad) = unsynced_pad {
let caps = gst::Caps::builder("application/x-json").build();
let out_event = gst::event::Caps::builder(&caps).seqnum(seqnum).build();
unsynced_pad.push_event(out_event);
}
}
true
}
_ => gst::Pad::event_default(pad, Some(&*self.obj()), event),
}
}
@ -1445,7 +1517,16 @@ impl ObjectImpl for Transcriber {
.flags(gst::PadFlags::FIXED_CAPS)
.build();
obj.add_pad(&srcpad).unwrap();
let templ = obj.class().pad_template("unsynced_src").unwrap();
let unsynced_srcpad = gst::PadBuilder::<gst::Pad>::from_template(&templ)
.flags(gst::PadFlags::FIXED_CAPS)
.build();
obj.add_pad(&unsynced_srcpad).unwrap();
srcpad.imp().set_unsynced_pad(&unsynced_srcpad);
self.state.lock().unwrap().srcpads.insert(srcpad);
obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
}
@ -1598,6 +1679,21 @@ impl ElementImpl for Transcriber {
super::TranscriberSrcPad::static_type(),
)
.unwrap();
let src_caps = gst::Caps::builder("application/x-json").build();
let unsynced_src_pad_template = gst::PadTemplate::new(
"unsynced_src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&src_caps,
)
.unwrap();
let unsynced_sometimes_src_pad_template = gst::PadTemplate::new(
"unsynced_translate_src_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&src_caps,
)
.unwrap();
let sink_caps = gst_audio::AudioCapsBuilder::new()
.format(gst_audio::AUDIO_FORMAT_S16)
@ -1612,7 +1708,13 @@ impl ElementImpl for Transcriber {
)
.unwrap();
vec![src_pad_template, req_src_pad_template, sink_pad_template]
vec![
src_pad_template,
req_src_pad_template,
sink_pad_template,
unsynced_src_pad_template,
unsynced_sometimes_src_pad_template,
]
});
PAD_TEMPLATES.as_ref()
@ -1650,6 +1752,17 @@ impl ElementImpl for Transcriber {
.flags(gst::PadFlags::FIXED_CAPS)
.build();
let templ = self
.obj()
.class()
.pad_template("unsynced_translate_src_%u")
.unwrap();
let unsynced_srcpad = gst::PadBuilder::<gst::Pad>::from_template(&templ)
.name(format!("unsynced_translate_src_{}", state.pad_serial).as_str())
.flags(gst::PadFlags::FIXED_CAPS)
.build();
pad.imp().set_unsynced_pad(&unsynced_srcpad);
state.srcpads.insert(pad.clone());
gst::info!(CAT, "New pad requested, {}", state.srcpads.len());
@ -1658,6 +1771,7 @@ impl ElementImpl for Transcriber {
drop(state);
self.obj().add_pad(&pad).unwrap();
self.obj().add_pad(&unsynced_srcpad).unwrap();
self.obj().child_added(&pad, &pad.name());
@ -1667,6 +1781,14 @@ impl ElementImpl for Transcriber {
fn release_pad(&self, pad: &gst::Pad) {
pad.set_active(false).unwrap();
self.obj().remove_pad(pad).unwrap();
self.state.lock().unwrap().srcpads.remove(pad);
let transcribe_srcpad = pad.downcast_ref::<super::TranscriberSrcPad>().unwrap();
if let Some(unsynced_pad) = transcribe_srcpad.imp().take_unsynced_pad() {
unsynced_pad.set_active(false).unwrap();
self.obj().remove_pad(&unsynced_pad).unwrap();
}
self.obj().child_removed(pad, &pad.name());
let _ = self
@ -1721,6 +1843,7 @@ struct TranscriberSrcPadState {
send_eos: bool,
out_segment: gst::FormattedSegment<gst::ClockTime>,
seqnum: gst::Seqnum,
unsynced_pad: Option<gst::Pad>,
}
impl Default for TranscriberSrcPadState {
@ -1733,6 +1856,7 @@ impl Default for TranscriberSrcPadState {
send_eos: false,
out_segment: gst::FormattedSegment::new(),
seqnum: gst::Seqnum::next(),
unsynced_pad: None,
}
}
}

View file

@ -12952,6 +12952,16 @@
"direction": "src",
"presence": "request",
"type": "GstSpeechmaticsTranscriberSrcPad"
},
"unsynced_src": {
"caps": "application/x-json:\n",
"direction": "src",
"presence": "always"
},
"unsynced_translate_src_%%u": {
"caps": "application/x-json:\n",
"direction": "src",
"presence": "sometimes"
}
},
"properties": {