diff --git a/Cargo.lock b/Cargo.lock index 8fb93c47..8ffcb556 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2870,6 +2870,25 @@ dependencies = [ "sodiumoxide", ] +[[package]] +name = "gst-plugin-speechmatics" +version = "0.14.0-alpha.1" +dependencies = [ + "async-tungstenite", + "atomic_refcell", + "futures", + "gst-plugin-version-helper", + "gstreamer", + "gstreamer-audio", + "gstreamer-base", + "http 1.1.0", + "once_cell", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "gst-plugin-spotify" version = "0.14.0-alpha.1" diff --git a/Cargo.toml b/Cargo.toml index 6089e10b..6995218a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "audio/claxon", "audio/csound", "audio/lewton", + "audio/speechmatics", "audio/spotify", "generic/file", diff --git a/audio/speechmatics/Cargo.toml b/audio/speechmatics/Cargo.toml new file mode 100644 index 00000000..9fd5a258 --- /dev/null +++ b/audio/speechmatics/Cargo.toml @@ -0,0 +1,50 @@ +[package] +name = "gst-plugin-speechmatics" +version.workspace = true +authors = ["Mathieu Duponchelle "] +repository.workspace = true +license = "MPL-2.0" +description = "GStreamer Speechmatics plugin" +edition.workspace = true +rust-version.workspace = true + +[dependencies] +futures = "0.3" +gst.workspace = true +gst-base.workspace = true +gst-audio = { workspace = true, features = ["v1_16"] } +tokio = { version = "1", features = [ "full" ] } +async-tungstenite = { version = "0.27", features = ["tokio", "tokio-runtime", "tokio-native-tls"] } +once_cell.workspace = true +serde = { version = "1", features = ["derive"] } +serde_json = "1" +atomic_refcell = "0.1" +http = { version = "1.0" } +url = "2" + +[lib] +name = "gstspeechmatics" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[build-dependencies] +gst-plugin-version-helper.workspace = true + +[features] +static = [] +capi = [] +doc = ["gst/v1_18"] + +[package.metadata.capi] +min_version = "0.9.21" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false +import_library = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-base-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/audio/speechmatics/LICENSE-MPL-2.0 b/audio/speechmatics/LICENSE-MPL-2.0 new file mode 100644 index 00000000..14e2f777 --- /dev/null +++ b/audio/speechmatics/LICENSE-MPL-2.0 @@ -0,0 +1,373 @@ +Mozilla Public License Version 2.0 +================================== + +1. Definitions +-------------- + +1.1. "Contributor" + means each individual or legal entity that creates, contributes to + the creation of, or owns Covered Software. + +1.2. "Contributor Version" + means the combination of the Contributions of others (if any) used + by a Contributor and that particular Contributor's Contribution. + +1.3. "Contribution" + means Covered Software of a particular Contributor. + +1.4. "Covered Software" + means Source Code Form to which the initial Contributor has attached + the notice in Exhibit A, the Executable Form of such Source Code + Form, and Modifications of such Source Code Form, in each case + including portions thereof. + +1.5. "Incompatible With Secondary Licenses" + means + + (a) that the initial Contributor has attached the notice described + in Exhibit B to the Covered Software; or + + (b) that the Covered Software was made available under the terms of + version 1.1 or earlier of the License, but not also under the + terms of a Secondary License. + +1.6. "Executable Form" + means any form of the work other than Source Code Form. + +1.7. "Larger Work" + means a work that combines Covered Software with other material, in + a separate file or files, that is not Covered Software. + +1.8. "License" + means this document. + +1.9. "Licensable" + means having the right to grant, to the maximum extent possible, + whether at the time of the initial grant or subsequently, any and + all of the rights conveyed by this License. + +1.10. "Modifications" + means any of the following: + + (a) any file in Source Code Form that results from an addition to, + deletion from, or modification of the contents of Covered + Software; or + + (b) any new file in Source Code Form that contains any Covered + Software. + +1.11. "Patent Claims" of a Contributor + means any patent claim(s), including without limitation, method, + process, and apparatus claims, in any patent Licensable by such + Contributor that would be infringed, but for the grant of the + License, by the making, using, selling, offering for sale, having + made, import, or transfer of either its Contributions or its + Contributor Version. + +1.12. "Secondary License" + means either the GNU General Public License, Version 2.0, the GNU + Lesser General Public License, Version 2.1, the GNU Affero General + Public License, Version 3.0, or any later versions of those + licenses. + +1.13. "Source Code Form" + means the form of the work preferred for making modifications. + +1.14. "You" (or "Your") + means an individual or a legal entity exercising rights under this + License. For legal entities, "You" includes any entity that + controls, is controlled by, or is under common control with You. For + purposes of this definition, "control" means (a) the power, direct + or indirect, to cause the direction or management of such entity, + whether by contract or otherwise, or (b) ownership of more than + fifty percent (50%) of the outstanding shares or beneficial + ownership of such entity. + +2. License Grants and Conditions +-------------------------------- + +2.1. Grants + +Each Contributor hereby grants You a world-wide, royalty-free, +non-exclusive license: + +(a) under intellectual property rights (other than patent or trademark) + Licensable by such Contributor to use, reproduce, make available, + modify, display, perform, distribute, and otherwise exploit its + Contributions, either on an unmodified basis, with Modifications, or + as part of a Larger Work; and + +(b) under Patent Claims of such Contributor to make, use, sell, offer + for sale, have made, import, and otherwise transfer either its + Contributions or its Contributor Version. + +2.2. Effective Date + +The licenses granted in Section 2.1 with respect to any Contribution +become effective for each Contribution on the date the Contributor first +distributes such Contribution. + +2.3. Limitations on Grant Scope + +The licenses granted in this Section 2 are the only rights granted under +this License. No additional rights or licenses will be implied from the +distribution or licensing of Covered Software under this License. +Notwithstanding Section 2.1(b) above, no patent license is granted by a +Contributor: + +(a) for any code that a Contributor has removed from Covered Software; + or + +(b) for infringements caused by: (i) Your and any other third party's + modifications of Covered Software, or (ii) the combination of its + Contributions with other software (except as part of its Contributor + Version); or + +(c) under Patent Claims infringed by Covered Software in the absence of + its Contributions. + +This License does not grant any rights in the trademarks, service marks, +or logos of any Contributor (except as may be necessary to comply with +the notice requirements in Section 3.4). + +2.4. Subsequent Licenses + +No Contributor makes additional grants as a result of Your choice to +distribute the Covered Software under a subsequent version of this +License (see Section 10.2) or under the terms of a Secondary License (if +permitted under the terms of Section 3.3). + +2.5. Representation + +Each Contributor represents that the Contributor believes its +Contributions are its original creation(s) or it has sufficient rights +to grant the rights to its Contributions conveyed by this License. + +2.6. Fair Use + +This License is not intended to limit any rights You have under +applicable copyright doctrines of fair use, fair dealing, or other +equivalents. + +2.7. Conditions + +Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted +in Section 2.1. + +3. Responsibilities +------------------- + +3.1. Distribution of Source Form + +All distribution of Covered Software in Source Code Form, including any +Modifications that You create or to which You contribute, must be under +the terms of this License. You must inform recipients that the Source +Code Form of the Covered Software is governed by the terms of this +License, and how they can obtain a copy of this License. You may not +attempt to alter or restrict the recipients' rights in the Source Code +Form. + +3.2. Distribution of Executable Form + +If You distribute Covered Software in Executable Form then: + +(a) such Covered Software must also be made available in Source Code + Form, as described in Section 3.1, and You must inform recipients of + the Executable Form how they can obtain a copy of such Source Code + Form by reasonable means in a timely manner, at a charge no more + than the cost of distribution to the recipient; and + +(b) You may distribute such Executable Form under the terms of this + License, or sublicense it under different terms, provided that the + license for the Executable Form does not attempt to limit or alter + the recipients' rights in the Source Code Form under this License. + +3.3. Distribution of a Larger Work + +You may create and distribute a Larger Work under terms of Your choice, +provided that You also comply with the requirements of this License for +the Covered Software. If the Larger Work is a combination of Covered +Software with a work governed by one or more Secondary Licenses, and the +Covered Software is not Incompatible With Secondary Licenses, this +License permits You to additionally distribute such Covered Software +under the terms of such Secondary License(s), so that the recipient of +the Larger Work may, at their option, further distribute the Covered +Software under the terms of either this License or such Secondary +License(s). + +3.4. Notices + +You may not remove or alter the substance of any license notices +(including copyright notices, patent notices, disclaimers of warranty, +or limitations of liability) contained within the Source Code Form of +the Covered Software, except that You may alter any license notices to +the extent required to remedy known factual inaccuracies. + +3.5. Application of Additional Terms + +You may choose to offer, and to charge a fee for, warranty, support, +indemnity or liability obligations to one or more recipients of Covered +Software. However, You may do so only on Your own behalf, and not on +behalf of any Contributor. You must make it absolutely clear that any +such warranty, support, indemnity, or liability obligation is offered by +You alone, and You hereby agree to indemnify every Contributor for any +liability incurred by such Contributor as a result of warranty, support, +indemnity or liability terms You offer. You may include additional +disclaimers of warranty and limitations of liability specific to any +jurisdiction. + +4. Inability to Comply Due to Statute or Regulation +--------------------------------------------------- + +If it is impossible for You to comply with any of the terms of this +License with respect to some or all of the Covered Software due to +statute, judicial order, or regulation then You must: (a) comply with +the terms of this License to the maximum extent possible; and (b) +describe the limitations and the code they affect. Such description must +be placed in a text file included with all distributions of the Covered +Software under this License. Except to the extent prohibited by statute +or regulation, such description must be sufficiently detailed for a +recipient of ordinary skill to be able to understand it. + +5. Termination +-------------- + +5.1. The rights granted under this License will terminate automatically +if You fail to comply with any of its terms. However, if You become +compliant, then the rights granted under this License from a particular +Contributor are reinstated (a) provisionally, unless and until such +Contributor explicitly and finally terminates Your grants, and (b) on an +ongoing basis, if such Contributor fails to notify You of the +non-compliance by some reasonable means prior to 60 days after You have +come back into compliance. Moreover, Your grants from a particular +Contributor are reinstated on an ongoing basis if such Contributor +notifies You of the non-compliance by some reasonable means, this is the +first time You have received notice of non-compliance with this License +from such Contributor, and You become compliant prior to 30 days after +Your receipt of the notice. + +5.2. If You initiate litigation against any entity by asserting a patent +infringement claim (excluding declaratory judgment actions, +counter-claims, and cross-claims) alleging that a Contributor Version +directly or indirectly infringes any patent, then the rights granted to +You by any and all Contributors for the Covered Software under Section +2.1 of this License shall terminate. + +5.3. In the event of termination under Sections 5.1 or 5.2 above, all +end user license agreements (excluding distributors and resellers) which +have been validly granted by You or Your distributors under this License +prior to termination shall survive termination. + +************************************************************************ +* * +* 6. Disclaimer of Warranty * +* ------------------------- * +* * +* Covered Software is provided under this License on an "as is" * +* basis, without warranty of any kind, either expressed, implied, or * +* statutory, including, without limitation, warranties that the * +* Covered Software is free of defects, merchantable, fit for a * +* particular purpose or non-infringing. The entire risk as to the * +* quality and performance of the Covered Software is with You. * +* Should any Covered Software prove defective in any respect, You * +* (not any Contributor) assume the cost of any necessary servicing, * +* repair, or correction. This disclaimer of warranty constitutes an * +* essential part of this License. No use of any Covered Software is * +* authorized under this License except under this disclaimer. * +* * +************************************************************************ + +************************************************************************ +* * +* 7. Limitation of Liability * +* -------------------------- * +* * +* Under no circumstances and under no legal theory, whether tort * +* (including negligence), contract, or otherwise, shall any * +* Contributor, or anyone who distributes Covered Software as * +* permitted above, be liable to You for any direct, indirect, * +* special, incidental, or consequential damages of any character * +* including, without limitation, damages for lost profits, loss of * +* goodwill, work stoppage, computer failure or malfunction, or any * +* and all other commercial damages or losses, even if such party * +* shall have been informed of the possibility of such damages. This * +* limitation of liability shall not apply to liability for death or * +* personal injury resulting from such party's negligence to the * +* extent applicable law prohibits such limitation. Some * +* jurisdictions do not allow the exclusion or limitation of * +* incidental or consequential damages, so this exclusion and * +* limitation may not apply to You. * +* * +************************************************************************ + +8. Litigation +------------- + +Any litigation relating to this License may be brought only in the +courts of a jurisdiction where the defendant maintains its principal +place of business and such litigation shall be governed by laws of that +jurisdiction, without reference to its conflict-of-law provisions. +Nothing in this Section shall prevent a party's ability to bring +cross-claims or counter-claims. + +9. Miscellaneous +---------------- + +This License represents the complete agreement concerning the subject +matter hereof. If any provision of this License is held to be +unenforceable, such provision shall be reformed only to the extent +necessary to make it enforceable. Any law or regulation which provides +that the language of a contract shall be construed against the drafter +shall not be used to construe this License against a Contributor. + +10. Versions of the License +--------------------------- + +10.1. New Versions + +Mozilla Foundation is the license steward. Except as provided in Section +10.3, no one other than the license steward has the right to modify or +publish new versions of this License. Each version will be given a +distinguishing version number. + +10.2. Effect of New Versions + +You may distribute the Covered Software under the terms of the version +of the License under which You originally received the Covered Software, +or under the terms of any subsequent version published by the license +steward. + +10.3. Modified Versions + +If you create software not governed by this License, and you want to +create a new license for such software, you may create and use a +modified version of this License if you rename the license and remove +any references to the name of the license steward (except to note that +such modified license differs from this License). + +10.4. Distributing Source Code Form that is Incompatible With Secondary +Licenses + +If You choose to distribute Source Code Form that is Incompatible With +Secondary Licenses under the terms of this version of the License, the +notice described in Exhibit B of this License must be attached. + +Exhibit A - Source Code Form License Notice +------------------------------------------- + + This Source Code Form is subject to the terms of the Mozilla Public + License, v. 2.0. If a copy of the MPL was not distributed with this + file, You can obtain one at http://mozilla.org/MPL/2.0/. + +If it is not possible or desirable to put the notice in a particular +file, then You may include the notice in a location (such as a LICENSE +file in a relevant directory) where a recipient would be likely to look +for such a notice. + +You may add additional accurate notices of copyright ownership. + +Exhibit B - "Incompatible With Secondary Licenses" Notice +--------------------------------------------------------- + + This Source Code Form is "Incompatible With Secondary Licenses", as + defined by the Mozilla Public License, v. 2.0. diff --git a/audio/speechmatics/build.rs b/audio/speechmatics/build.rs new file mode 100644 index 00000000..cda12e57 --- /dev/null +++ b/audio/speechmatics/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/audio/speechmatics/src/lib.rs b/audio/speechmatics/src/lib.rs new file mode 100644 index 00000000..95b403a1 --- /dev/null +++ b/audio/speechmatics/src/lib.rs @@ -0,0 +1,36 @@ +// Copyright (C) 2024 Mathieu Duponchelle +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 +#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)] +#![recursion_limit = "128"] + +/** + * plugin-speechmatics: + * + * Since: plugins-rs-0.14.0 + */ +use gst::glib; + +mod transcriber; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + transcriber::register(plugin)?; + + Ok(()) +} + +gst::plugin_define!( + speechmatics, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "Proprietary", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/audio/speechmatics/src/transcriber/imp.rs b/audio/speechmatics/src/transcriber/imp.rs new file mode 100644 index 00000000..c85de272 --- /dev/null +++ b/audio/speechmatics/src/transcriber/imp.rs @@ -0,0 +1,1778 @@ +// Copyright (C) 2024 Mathieu Duponchelle +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::subclass::prelude::*; +use gst::{glib, prelude::*}; + +use std::default::Default; + +use async_tungstenite::tungstenite::error::Error as WsError; +use async_tungstenite::{tokio::connect_async, tungstenite::Message}; +use futures::channel::mpsc; +use futures::future::{abortable, AbortHandle}; +use futures::prelude::*; +use http::Request; +use tokio::runtime; +use url::Url; + +use std::collections::{BTreeSet, VecDeque}; +use std::pin::Pin; +use std::sync::Mutex; +use std::time::Duration; + +use atomic_refcell::AtomicRefCell; + +use once_cell::sync::Lazy; + +#[derive(serde::Deserialize, Debug)] +#[allow(dead_code)] +struct TranscriptMetadata { + start_time: f32, + end_time: f32, + transcript: String, +} + +#[derive(serde::Deserialize, Debug)] +#[allow(dead_code)] +struct TranscriptDisplay { + direction: String, +} + +#[derive(serde::Deserialize, Debug)] +#[allow(dead_code)] +struct TranscriptAlternative { + content: String, + confidence: f32, + display: Option, + language: Option, + #[serde(default)] + tags: Vec, +} + +#[derive(serde::Deserialize, Debug)] +#[allow(dead_code)] +struct TranscriptResult { + #[serde(rename = "type")] + type_: String, + start_time: f32, + end_time: f32, + #[serde(default)] + is_eos: bool, + #[serde(default)] + alternatives: Vec, +} + +#[derive(serde::Deserialize, Debug)] +#[allow(dead_code)] +struct Transcript { + metadata: TranscriptMetadata, + results: Vec, +} + +#[derive(serde::Deserialize, Debug)] +#[allow(dead_code)] +struct TranslationResult { + start_time: f32, + end_time: f32, + content: String, +} + +#[derive(serde::Deserialize, Debug)] +#[allow(dead_code)] +struct Translation { + language: String, + results: Vec, +} + +#[derive(serde::Deserialize, Debug)] +#[allow(dead_code)] +struct TranscriptError { + code: Option, + #[serde(rename = "type")] + type_: String, + reason: String, +} + +#[derive(serde::Serialize, Debug)] +struct AudioType { + #[serde(rename = "type")] + type_: String, + encoding: String, + sample_rate: u32, +} + +#[derive(serde::Serialize, Debug, Clone)] +struct Vocable { + content: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + sounds_like: Vec, +} + +#[derive(serde::Serialize, Debug)] +struct TranscriptionConfig { + language: String, + enable_partials: bool, + max_delay: f32, + additional_vocab: Vec, +} + +#[derive(serde::Serialize, Debug)] +struct TranslationConfig { + target_languages: Vec, + enable_partials: bool, +} + +#[derive(serde::Serialize, Debug)] +struct StartRecognition { + message: String, + audio_format: AudioType, + transcription_config: TranscriptionConfig, + translation_config: TranslationConfig, +} + +#[derive(serde::Serialize, Debug)] +struct EndOfStream { + message: String, + last_seq_no: u64, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "speechmaticstranscribe", + gst::DebugColorFlags::empty(), + Some("Speechmatics Transcribe element"), + ) +}); + +static RUNTIME: Lazy = Lazy::new(|| { + runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .build() + .unwrap() +}); + +const DEFAULT_LATENCY_MS: u32 = 8000; +const DEFAULT_LATENESS_MS: u32 = 0; +const GRANULARITY_MS: u32 = 100; + +#[derive(Debug, Clone)] +struct Settings { + latency_ms: u32, + lateness_ms: u32, + language_code: Option, + url: Option, + api_key: Option, +} + +impl Default for Settings { + fn default() -> Self { + Self { + latency_ms: DEFAULT_LATENCY_MS, + lateness_ms: DEFAULT_LATENESS_MS, + language_code: Some("en".to_string()), + url: Some("ws://0.0.0.0:9000".to_string()), + api_key: None, + } + } +} + +#[derive(Debug)] +struct ItemAccumulator { + text: String, + start_time: gst::ClockTime, + end_time: gst::ClockTime, +} + +impl From for gst::Buffer { + fn from(acc: ItemAccumulator) -> Self { + let mut buf = gst::Buffer::from_mut_slice(acc.text.into_bytes()); + + { + let buf = buf.get_mut().unwrap(); + buf.set_pts(acc.start_time); + buf.set_duration(acc.end_time - acc.start_time); + } + + buf + } +} + +struct State { + connected: bool, + recv_abort_handle: Option, + send_abort_handle: Option, + in_segment: gst::FormattedSegment, + seq_no: u64, + additional_vocabulary: Vec, + discont_offset: gst::ClockTime, + last_chained_buffer_rtime: Option, + pad_serial: u32, + srcpads: BTreeSet, +} + +impl State {} + +impl Default for State { + fn default() -> Self { + Self { + connected: false, + recv_abort_handle: None, + send_abort_handle: None, + in_segment: gst::FormattedSegment::new(), + seq_no: 0, + additional_vocabulary: vec![], + discont_offset: gst::ClockTime::ZERO, + last_chained_buffer_rtime: gst::ClockTime::NONE, + pad_serial: 0, + srcpads: BTreeSet::new(), + } + } +} + +type WsSink = Pin + Send + Sync>>; + +pub struct Transcriber { + sinkpad: gst::Pad, + settings: Mutex, + state: Mutex, + ws_sink: AtomicRefCell>, +} + +impl TranscriberSrcPad { + fn dequeue(&self) -> bool { + let Some(parent) = self.obj().parent() else { + return true; + }; + + let transcriber = parent + .downcast::() + .expect("parent is transcriber"); + + let latency = gst::ClockTime::from_mseconds( + transcriber.imp().settings.lock().unwrap().latency_ms as u64, + ); + let now = transcriber.current_running_time().unwrap(); + + /* First, check our pending buffers */ + let mut items = vec![]; + + let granularity = gst::ClockTime::from_mseconds(GRANULARITY_MS as u64); + + let (latency, now, mut last_position, send_eos, seqnum) = { + let mut state = self.state.lock().unwrap(); + + if let Some(ref mut accumulator_inner) = state.accumulator { + if now.saturating_sub(accumulator_inner.start_time) + granularity > latency { + gst::log!(CAT, "Finally draining accumulator"); + gst::debug!( + CAT, + imp = self, + "Item is ready: \"{:?}\", start_time: {}, end_time: {}", + accumulator_inner.text, + accumulator_inner.start_time, + accumulator_inner.end_time + ); + let buf = state.accumulator.take().unwrap().into(); + state.push_buffer(buf); + } + } + + let send_eos = + state.send_eos && state.buffers.is_empty() && state.accumulator.is_none(); + + while let Some(buf) = state.buffers.front() { + if now.saturating_sub(buf.pts().unwrap()) + granularity > latency { + /* Safe unwrap, we know we have an item */ + let buf = state.buffers.pop_front().unwrap(); + items.push(buf); + } else { + break; + } + } + + ( + latency, + now, + state.out_segment.position(), + send_eos, + state.seqnum, + ) + }; + + /* We're EOS, we can pause and exit early */ + if send_eos { + let _ = self.obj().pause_task(); + + return self + .obj() + .push_event(gst::event::Eos::builder().seqnum(seqnum).build()); + } + + for buf in items.drain(..) { + let pts = buf.pts().unwrap(); + + if let Some(last_position) = last_position { + if pts > last_position { + let gap_event = gst::event::Gap::builder(last_position) + .duration(pts - last_position) + .seqnum(seqnum) + .build(); + gst::log!(CAT, "Pushing gap: {} -> {}", last_position, pts); + if !self.obj().push_event(gap_event) { + return false; + } + } + } + + let pts_end = if let Some(duration) = buf.duration() { + pts + duration + } else { + pts + }; + last_position = Some(pts_end); + + gst::debug!(CAT, imp = self, "Pushing buffer: {} -> {}", pts, pts_end,); + + if self.obj().push(buf).is_err() { + return false; + } + } + + /* next, push a gap if we're lagging behind the target position */ + + if let Some(last_position_) = last_position { + if now >= last_position_ && now - last_position_ + granularity > latency { + // Invent caps/segment events if none were produced yet + let mut events = vec![]; + + let state_guard = self.state.lock().unwrap(); + if !self.obj().has_current_caps() { + let caps = gst::Caps::builder("text/x-raw") + .field("format", "utf8") + .build(); + events.push( + gst::event::Caps::builder(&caps) + .seqnum(state_guard.seqnum) + .build(), + ); + } + + if self.obj().sticky_event::(0).is_none() { + events.push( + gst::event::Segment::builder(&state_guard.out_segment) + .seqnum(state_guard.seqnum) + .build(), + ); + } + + drop(state_guard); + + for event in events { + self.obj().push_event(event); + } + + let duration = now - last_position_ + granularity - latency; + + let gap_event = gst::event::Gap::builder(last_position_) + .duration(duration) + .seqnum(seqnum) + .build(); + gst::log!( + CAT, + "Pushing gap: {} -> {}", + last_position_, + last_position_ + duration + ); + last_position = Some(last_position_ + duration); + if !self.obj().push_event(gap_event) { + return false; + } + } + + self.state + .lock() + .unwrap() + .out_segment + .set_position(last_position); + } + + true + } + + fn enqueue_translation(&self, state: &mut TranscriberSrcPadState, translation: &Translation) { + gst::log!(CAT, "Enqueuing {:?}", translation); + for item in &translation.results { + let mut start_time = + gst::ClockTime::from_nseconds((item.start_time as f64 * 1_000_000_000.0) as u64); + let mut end_time = + gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64); + + if let Some(position) = state.out_segment.position() { + if start_time < position { + gst::debug!( + CAT, + imp = self, + "Adjusting item timing({:?} < {:?})", + start_time, + position, + ); + start_time = position; + if end_time < start_time { + end_time = start_time; + } + } + } + + let mut buf = gst::Buffer::from_mut_slice(item.content.clone().into_bytes()); + + { + let buf = buf.get_mut().unwrap(); + buf.set_pts(start_time); + buf.set_duration(end_time - start_time); + } + + state.push_buffer(buf); + } + } + + fn enqueue_transcript(&self, state: &mut TranscriberSrcPadState, transcript: &Transcript) { + gst::log!(CAT, "Enqueuing {:?}", transcript); + for item in &transcript.results { + if let Some(alternative) = item.alternatives.first() { + let mut start_time = gst::ClockTime::from_nseconds( + (item.start_time as f64 * 1_000_000_000.0) as u64, + ); + let mut end_time = + gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64); + + if let Some(position) = state.out_segment.position() { + if start_time < position { + gst::debug!( + CAT, + imp = self, + "Adjusting item timing({:?} < {:?})", + start_time, + position, + ); + start_time = position; + if end_time < start_time { + end_time = start_time; + } + } + } + + if let Some(ref mut accumulator_inner) = state.accumulator { + if item.type_ == "punctuation" { + accumulator_inner.text.push_str(&alternative.content); + accumulator_inner.end_time = end_time; + } else { + gst::debug!( + CAT, + imp = self, + "Item is ready: \"{}\", start_time: {}, end_time: {}", + accumulator_inner.text, + accumulator_inner.start_time, + accumulator_inner.end_time + ); + + let buffer = state.accumulator.take().unwrap().into(); + + state.push_buffer(buffer); + state.accumulator = Some(ItemAccumulator { + text: alternative.content.clone(), + start_time, + end_time, + }); + } + } else { + state.accumulator = Some(ItemAccumulator { + text: alternative.content.clone(), + start_time, + end_time, + }); + } + } + } + } + + fn loop_fn(&self, receiver: &mut mpsc::Receiver) -> Result<(), gst::ErrorMessage> { + let future = async move { + let msg = match receiver.next().await { + Some(msg) => msg, + /* Sender was closed */ + None => { + let _ = self.obj().pause_task(); + return Ok(()); + } + }; + + let language_code = self.settings.lock().unwrap().language_code.clone(); + + match msg { + Message::Text(text) => { + let mut json: serde_json::Value = serde_json::from_str(&text).unwrap(); + let message_type = { + let obj = json.as_object_mut().expect("object"); + let Some(message_type) = obj.remove("message") else { + return Err(gst::error_msg!( + gst::StreamError::Failed, + ["Missing message field in object: {}", text] + )); + }; + if let serde_json::Value::String(s) = message_type { + s + } else { + panic!("message field not a string"); + } + }; + + match message_type.as_str() { + "AddTranslation" => { + let Some(parent) = self.obj().parent() else { + return Ok(()); + }; + + let transcriber = parent + .downcast::() + .expect("parent is transcriber"); + + let Some(language_code) = language_code else { + return Ok(()); + }; + + let mut translation: Translation = serde_json::from_value(json) + .map_err(|err| { + gst::error_msg!( + gst::StreamError::Failed, + ["Unexpected message: {} ({})", text, err] + ) + })?; + + if translation.language != language_code { + return Ok(()); + } + + gst::info!(CAT, imp = self, "Parsed translation {:?}", translation); + + let lateness = (transcriber.imp().settings.lock().unwrap().lateness_ms + as f64 + / 1_000.) as f32; + let discont_offset = + (transcriber + .imp() + .state + .lock() + .unwrap() + .discont_offset + .nseconds() as f64 + / 1_000_000_000.0) as f32; + + gst::info!( + CAT, + imp = self, + "Introducing {} lateness and adding discont offset {}", + lateness, + discont_offset + ); + + for item in translation.results.iter_mut() { + item.start_time += lateness + discont_offset; + item.end_time += lateness + discont_offset; + } + + if !translation.results.is_empty() { + let mut state = self.state.lock().unwrap(); + self.enqueue_translation(&mut state, &translation); + } + } + "AddTranscript" | "AddPartialTranscript" => { + /* This pad outputs translations */ + if language_code.is_some() { + return Ok(()); + } + let Some(parent) = self.obj().parent() else { + return Ok(()); + }; + + let transcriber = parent + .downcast::() + .expect("parent is transcriber"); + + let is_partial = message_type == "AddPartialTranscript"; + let mut transcript: Transcript = + serde_json::from_value(json).map_err(|err| { + gst::error_msg!( + gst::StreamError::Failed, + ["Unexpected message: {} ({})", text, err] + ) + })?; + + gst::info!( + CAT, + imp = self, + "Parsed {} transcript {:?}", + if is_partial { "partial" } else { "final" }, + transcript + ); + + let lateness = (transcriber.imp().settings.lock().unwrap().lateness_ms + as f64 + / 1_000.) as f32; + let discont_offset = + (transcriber + .imp() + .state + .lock() + .unwrap() + .discont_offset + .nseconds() as f64 + / 1_000_000_000.0) as f32; + + gst::info!( + CAT, + imp = self, + "Introducing {} lateness and adding discont offset {}", + lateness, + discont_offset + ); + + transcript.metadata.start_time += lateness + discont_offset; + transcript.metadata.end_time += lateness + discont_offset; + + for item in transcript.results.iter_mut() { + item.start_time += lateness + discont_offset; + item.end_time += lateness + discont_offset; + } + + if !transcript.results.is_empty() { + let mut state = self.state.lock().unwrap(); + self.enqueue_transcript(&mut state, &transcript); + } + } + "EndOfTranscript" => { + let mut state = self.state.lock().unwrap(); + state.send_eos = true; + } + _ => (), + } + + Ok(()) + } + _ => Ok(()), + } + }; + + /* Wrap in a timeout so we can push gaps regularly */ + let future = async move { + match tokio::time::timeout(Duration::from_millis(GRANULARITY_MS.into()), future).await { + Err(_) => { + if !self.dequeue() { + gst::info!(CAT, imp = self, "Failed to push gap event, pausing"); + + let _ = self.obj().pause_task(); + } + Ok(()) + } + Ok(res) => { + if !self.dequeue() { + gst::info!(CAT, imp = self, "Failed to push gap event, pausing"); + + let _ = self.obj().pause_task(); + } + res + } + } + }; + + RUNTIME.block_on(future) + } + + fn start_task(&self) -> Result<(), gst::LoggableError> { + let this_weak = self.downgrade(); + let pad_weak = self.obj().downgrade(); + let (sender, mut receiver) = mpsc::channel(1); + + self.state.lock().unwrap().sender = Some(sender); + + let res = self.obj().start_task(move || { + let Some(this) = this_weak.upgrade() else { + if let Some(pad) = pad_weak.upgrade() { + let _ = pad.pause_task(); + } + return; + }; + + if let Err(err) = this.loop_fn(&mut receiver) { + let parent = this + .obj() + .parent() + .and_downcast::() + .expect("has parent"); + gst::element_error!( + parent, + gst::StreamError::Failed, + ["Streaming failed: {}", err] + ); + let _ = this.obj().pause_task(); + } + }); + if res.is_err() { + return Err(gst::loggable_error!(CAT, "Failed to start pad task")); + } + Ok(()) + } + + fn stop_task(&self) -> Result<(), glib::BoolError> { + self.state.lock().unwrap().sender = None; + + self.obj().stop_task() + } +} + +impl Transcriber { + fn src_activatemode( + &self, + pad: &super::TranscriberSrcPad, + _mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if active { + pad.imp().start_task()?; + } else { + pad.imp().stop_task()?; + } + + Ok(()) + } + + fn src_query(&self, pad: &super::TranscriberSrcPad, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj = pad, "Handling query {:?}", query); + + match query.view_mut() { + gst::QueryViewMut::Latency(ref mut q) => { + let mut peer_query = gst::query::Latency::new(); + + let ret = self.sinkpad.peer_query(&mut peer_query); + + if ret { + let (_, min, _) = peer_query.result(); + let our_latency = gst::ClockTime::from_mseconds( + self.settings.lock().unwrap().latency_ms as u64, + ); + q.set(true, our_latency + min, gst::ClockTime::NONE); + } + ret + } + gst::QueryViewMut::Position(ref mut q) => { + if q.format() == gst::Format::Time { + let sstate = pad.imp().state.lock().unwrap(); + q.set( + sstate + .out_segment + .to_stream_time(sstate.out_segment.position()), + ); + true + } else { + false + } + } + _ => gst::Pad::query_default(pad, Some(&*self.obj()), query), + } + } + + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + gst::debug!(CAT, obj = pad, "Handling event {:?}", event); + + match event.view() { + gst::EventView::Eos(_) => match self.handle_buffer(pad, None) { + Err(err) => { + gst::error!(CAT, "Failed to send EOS: {}", err); + false + } + Ok(_) => true, + }, + gst::EventView::FlushStart(_) => { + gst::info!(CAT, imp = self, "Received flush start, disconnecting"); + match self.disconnect() { + Err(err) => { + self.post_error_message(err); + false + } + Ok(_) => { + let mut ret = gst::Pad::event_default(pad, Some(&*self.obj()), event); + + let state = self.state.lock().unwrap(); + for srcpad in &state.srcpads { + if let Err(err) = srcpad.imp().stop_task() { + gst::error!(CAT, imp = self, "Failed to stop srcpad task: {}", err); + ret = false; + } + } + + ret + } + } + } + gst::EventView::FlushStop(_) => { + gst::info!(CAT, imp = self, "Received flush stop, restarting task"); + + if gst::Pad::event_default(pad, Some(&*self.obj()), event) { + let state = self.state.lock().unwrap(); + for srcpad in &state.srcpads { + if let Err(err) = srcpad.imp().start_task() { + gst::error!(CAT, imp = self, "Failed to start srcpad task: {}", err); + return false; + } + } + true + } else { + false + } + } + gst::EventView::Segment(e) => { + let segment = match e.segment().clone().downcast::() { + Err(segment) => { + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Only Time segments supported, got {:?}", segment.format(),] + ); + return false; + } + Ok(segment) => segment, + }; + + let mut state = self.state.lock().unwrap(); + + for srcpad in &state.srcpads { + let mut sstate = srcpad.imp().state.lock().unwrap(); + sstate.out_segment.set_time(segment.time()); + sstate.out_segment.set_position(gst::ClockTime::ZERO); + sstate.seqnum = e.seqnum(); + srcpad.sticky_events_foreach(|e| { + if let gst::EventView::Segment(_) = e.view() { + std::ops::ControlFlow::Continue(gst::EventForeachAction::Remove) + } else { + std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + } + }); + } + + state.in_segment = segment; + + true + } + gst::EventView::Tag(_) => true, + gst::EventView::Caps(_) => true, + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } + + async fn sync_and_send( + &self, + buffer: Option, + ) -> Result { + let mut delay = None; + let mut n_chunks = 0; + + { + let mut state = self.state.lock().unwrap(); + + if let Some(ref buffer) = buffer { + let running_time = state + .in_segment + .to_running_time(buffer.pts().expect("Checked in sink_chain()")); + let now = self.obj().current_running_time().unwrap(); + + if let Some(running_time) = running_time { + delay = running_time.checked_sub(now); + + if buffer.flags().contains(gst::BufferFlags::DISCONT) { + for srcpad in &state.srcpads { + let mut sstate = srcpad.imp().state.lock().unwrap(); + sstate.discont = true; + } + if let Some(last_chained_buffer_rtime) = state.last_chained_buffer_rtime { + state.discont_offset += + running_time.saturating_sub(last_chained_buffer_rtime); + } + } + + state.last_chained_buffer_rtime = Some(running_time); + } + } + } + + if let Some(delay) = delay { + tokio::time::sleep(Duration::from_nanos(delay.nseconds())).await; + } + + if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() { + if let Some(buffer) = buffer { + let data = buffer.map_readable().unwrap(); + for chunk in data.chunks(8192) { + ws_sink + .send(Message::Binary(chunk.to_vec())) + .await + .map_err(|err| { + gst::error!(CAT, imp = self, "Failed sending packet: {}", err); + gst::FlowError::Error + })?; + n_chunks += 1; + } + } else { + let end_message = EndOfStream { + message: "EndOfStream".to_string(), + last_seq_no: self.state.lock().unwrap().seq_no, + }; + let message = serde_json::to_string(&end_message).unwrap(); + + ws_sink.send(Message::Text(message)).await.map_err(|err| { + gst::error!(CAT, imp = self, "Failed sending packet: {}", err); + gst::FlowError::Error + })?; + } + } + + self.state.lock().unwrap().seq_no += n_chunks; + + Ok(gst::FlowSuccess::Ok) + } + + fn handle_buffer( + &self, + _pad: &gst::Pad, + buffer: Option, + ) -> Result { + gst::trace!(CAT, imp = self, "Handling {:?}", buffer); + + self.ensure_connection().map_err(|err| { + // No need to worry too much here, we didn't have a session to + // terminate in the first place + if buffer.is_none() { + return gst::FlowError::Eos; + } + gst::element_imp_error!( + self, + gst::StreamError::Failed, + ["Streaming failed: {}", err] + ); + gst::FlowError::Error + })?; + + let (future, abort_handle) = abortable(self.sync_and_send(buffer)); + + self.state.lock().unwrap().send_abort_handle = Some(abort_handle); + + let res = RUNTIME.block_on(future); + + match res { + Err(_) => Err(gst::FlowError::Flushing), + Ok(res) => res, + } + } + + fn sink_chain( + &self, + pad: &gst::Pad, + buffer: gst::Buffer, + ) -> Result { + if buffer.pts().is_none() { + gst::error!(CAT, imp = self, "Only buffers with PTS supported"); + return Err(gst::FlowError::Error); + } + + self.handle_buffer(pad, Some(buffer)) + } + + fn ensure_connection(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + + if state.connected { + return Ok(()); + } + + let in_caps = self + .sinkpad + .current_caps() + .ok_or_else(|| gst::error_msg!(gst::CoreError::Failed, ["No caps set on sinkpad"]))?; + let s = in_caps.structure(0).unwrap(); + let sample_rate: i32 = s.get::("rate").unwrap(); + + let settings = self.settings.lock().unwrap(); + + gst::info!(CAT, imp = self, "Connecting .."); + + let url = match &settings.url { + Some(url) => url.to_string(), + None => "ws://0.0.0.0:9000".to_string(), + }; + + let uri = Url::parse(&url).map_err(|e| { + gst::error_msg!( + gst::CoreError::Failed, + ["Failed to parse provided url: {}", e] + ) + })?; + let Some(api_key) = settings.api_key.clone() else { + return Err(gst::error_msg!( + gst::CoreError::Failed, + ["An API key is required"] + )); + }; + let authority = uri.authority(); + let host = authority.splitn(2, '@').last().unwrap_or(""); + + let request = Request::builder() + .method("GET") + .uri(&url) + .header("Host", host) + .header("Upgrade", "websocket") + .header("Connection", "keep-alive, upgrade") + .header( + "Sec-Websocket-Key", + async_tungstenite::tungstenite::handshake::client::generate_key(), + ) + .header("Sec-Websocket-Version", "13") + .header("Authorization", format!("Bearer {}", &api_key)) + .body(()) + .unwrap(); + + let (ws, _) = RUNTIME.block_on(connect_async(request)).map_err(|err| { + gst::error!(CAT, imp = self, "Failed to connect: {}", err); + gst::error_msg!(gst::CoreError::Failed, ["Failed to connect: {}", err]) + })?; + + let (mut ws_sink, mut ws_stream) = ws.split(); + + if settings.latency_ms + settings.lateness_ms < 4000 { + gst::error!( + CAT, + imp = self, + "latency + lateness must be superior to 4000 milliseconds" + ); + return Err(gst::error_msg!( + gst::LibraryError::Settings, + ["latency + lateness must be superior to 4000 milliseconds"] + )); + } + + let translation_languages = state + .srcpads + .iter() + .flat_map(|pad| pad.imp().settings.lock().unwrap().language_code.clone()) + .collect(); + gst::info!( + CAT, + "Translation languages: {:?} ({})", + translation_languages, + state.srcpads.len() + ); + + // Workaround for speechmatics sometimes outputting + // final punctuation in the next transcript + let max_delay = ((settings.latency_ms + settings.lateness_ms) as f32) / 2_000.; + + let start_message = StartRecognition { + message: "StartRecognition".to_string(), + audio_format: AudioType { + type_: "raw".to_string(), + encoding: "pcm_s16le".to_string(), + sample_rate: sample_rate as u32, + }, + transcription_config: TranscriptionConfig { + language: settings + .language_code + .clone() + .unwrap_or_else(|| "en".to_string()), + enable_partials: false, + max_delay, + additional_vocab: state.additional_vocabulary.clone(), + }, + translation_config: TranslationConfig { + target_languages: translation_languages, + enable_partials: false, + }, + }; + + let message = serde_json::to_string(&start_message).unwrap(); + + gst::trace!(CAT, imp = self, "Sending start message: {}", message); + + RUNTIME + .block_on(ws_sink.send(Message::Text(message))) + .map_err(|err| { + gst::error!(CAT, imp = self, "Failed to send StartRecognition: {err}"); + gst::error_msg!( + gst::CoreError::Failed, + ["Failed to send StartRecognition: {err}"] + ) + })?; + + loop { + let res = RUNTIME + .block_on(ws_stream.next()) + .ok_or_else(|| { + gst::error!(CAT, imp = self, "Connection closed unexpectedly"); + gst::error_msg!(gst::CoreError::Failed, ["Connection closed unexpectedly"]) + })? + .map_err(|err| { + gst::error!( + CAT, + imp = self, + "Failed to receive RecognitionStarted: {err}" + ); + gst::error_msg!( + gst::CoreError::Failed, + ["Failed to receive RecognitionStarted: {err}"] + ) + })?; + + let text = match res { + Message::Text(text) => Ok(text), + _ => { + gst::error!(CAT, imp = self, "Invalid message type: {}", res); + Err(gst::error_msg!( + gst::CoreError::Failed, + ["Invalid message type: {}", res] + )) + } + }?; + + let mut json: serde_json::Value = serde_json::from_str(&text).unwrap(); + let message_type = { + let obj = json.as_object_mut().expect("object"); + let message_type = obj.remove("message").expect("`message` field"); + if let serde_json::Value::String(s) = message_type { + s + } else { + panic!("message field not a string"); + } + }; + + match message_type.as_str() { + "RecognitionStarted" => { + gst::info!(CAT, imp = self, "Recognition started!"); + break; + } + "Error" => { + let error: TranscriptError = serde_json::from_value(json).map_err(|err| { + gst::error_msg!( + gst::StreamError::Failed, + ["Unexpected message: {} ({})", text, err] + ) + })?; + gst::error!( + CAT, + imp = self, + "StartRecognition failed: {} ({})", + error.type_, + error.reason + ); + Err(gst::error_msg!( + gst::CoreError::Failed, + ("StartRecognition failed"), + ["{} ({})", error.type_, error.reason] + )) + } + _ => { + continue; + } + }?; + } + + *self.ws_sink.borrow_mut() = Some(Box::pin(ws_sink)); + + let this_weak = self.downgrade(); + let future = async move { + 'outer: while let Some(this) = this_weak.upgrade() { + let Some(msg) = ws_stream.next().await else { + let state = this.state.lock().unwrap(); + for srcpad in &state.srcpads { + let mut sstate = srcpad.imp().state.lock().unwrap(); + sstate.send_eos = true; + } + break; + }; + + let msg = match msg { + Ok(msg) => msg, + Err(err) => { + gst::error!(CAT, imp = this, "Failed to receive data: {}", err); + gst::element_imp_error!( + this, + gst::StreamError::Failed, + ["Streaming failed: {}", err] + ); + break; + } + }; + + let srcpads = this.state.lock().unwrap().srcpads.clone(); + for srcpad in srcpads { + let mut sender = srcpad.imp().state.lock().unwrap().sender.clone(); + + if let Some(sender) = sender.as_mut() { + let msg = msg.clone(); + if sender.send(msg).await.is_err() { + break 'outer; + } + } + } + } + }; + + let (future, abort_handle) = abortable(future); + + state.recv_abort_handle = Some(abort_handle); + + RUNTIME.spawn(future); + + state.connected = true; + + gst::info!(CAT, imp = self, "Connected"); + + Ok(()) + } + + fn disconnect(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + + gst::info!(CAT, imp = self, "Unpreparing"); + + if let Some(abort_handle) = state.recv_abort_handle.take() { + abort_handle.abort(); + } + + if let Some(abort_handle) = state.send_abort_handle.take() { + abort_handle.abort(); + } + + let _ = self.sinkpad.stream_lock(); + + if let Some(mut ws_sink) = self.ws_sink.borrow_mut().take() { + RUNTIME.block_on(async { + let _ = ws_sink.close().await; + }); + } + + *state = State::default(); + + gst::info!( + CAT, + imp = self, + "Unprepared, connected: {}!", + state.connected + ); + + Ok(()) + } +} + +// Implementation of gst::ChildProxy virtual methods. +// +// This allows accessing the pads and their properties from e.g. gst-launch. +impl ChildProxyImpl for Transcriber { + fn children_count(&self) -> u32 { + let object = self.obj(); + object.num_pads() as u32 + } + + fn child_by_name(&self, name: &str) -> Option { + let object = self.obj(); + object + .pads() + .into_iter() + .find(|p| p.name() == name) + .map(|p| p.upcast()) + } + + fn child_by_index(&self, index: u32) -> Option { + let object = self.obj(); + object + .pads() + .into_iter() + .nth(index as usize) + .map(|p| p.upcast()) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for Transcriber { + const NAME: &'static str = "GstSpeechmaticsTranscriber"; + type Type = super::Transcriber; + type ParentType = gst::Element; + type Interfaces = (gst::ChildProxy,); + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_from_template(&templ) + .chain_function(|pad, parent, buffer| { + Transcriber::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |transcriber| transcriber.sink_chain(pad, buffer), + ) + }) + .event_function(|pad, parent, event| { + Transcriber::catch_panic_pad_function( + parent, + || false, + |transcriber| transcriber.sink_event(pad, event), + ) + }) + .build(); + + let settings = Mutex::new(Settings::default()); + + Self { + sinkpad, + settings, + state: Default::default(), + ws_sink: Default::default(), + } + } +} + +impl GstObjectImpl for Transcriber {} + +impl ObjectImpl for Transcriber { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecString::builder("language-code") + .nick("Language Code") + .blurb("The Language of the Stream, ISO code") + .default_value("en") + .build(), + glib::ParamSpecUInt::builder("latency") + .nick("Latency") + .blurb("Amount of milliseconds to allow for transcription") + .default_value(DEFAULT_LATENCY_MS) + .build(), + glib::ParamSpecUInt::builder("lateness") + .nick("Lateness") + .blurb("Amount of milliseconds to introduce as lateness") + .default_value(DEFAULT_LATENESS_MS) + .build(), + glib::ParamSpecString::builder("url") + .nick("URL") + .blurb("URL of the transcription server") + .default_value("ws://0.0.0.0:9000") + .build(), + gst::ParamSpecArray::builder("additional-vocabulary") + .nick("Additional Vocabulary") + .blurb("Additional vocabulary speechmatics should use") + .element_spec( + &glib::ParamSpecBoxed::builder::("vocable") + .nick("Vocable") + .blurb("A vocable in the vocabulary") + .build(), + ) + .mutable_ready() + .build(), + glib::ParamSpecString::builder("api-key") + .nick("API Key") + .blurb("Speechmatics API Key") + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.add_pad(&self.sinkpad).unwrap(); + + let templ = obj.class().pad_template("src").unwrap(); + let srcpad: super::TranscriberSrcPad = gst::PadBuilder::from_template(&templ) + .activatemode_function(|pad, parent, mode, active| { + Transcriber::catch_panic_pad_function( + parent, + || { + Err(gst::loggable_error!( + CAT, + "Panic activating src pad with mode" + )) + }, + |transcriber| transcriber.src_activatemode(pad, mode, active), + ) + }) + .query_function(|pad, parent, query| { + Transcriber::catch_panic_pad_function( + parent, + || false, + |transcriber| transcriber.src_query(pad, query), + ) + }) + .flags(gst::PadFlags::FIXED_CAPS) + .build(); + obj.add_pad(&srcpad).unwrap(); + self.state.lock().unwrap().srcpads.insert(srcpad); + obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "language-code" => { + let mut settings = self.settings.lock().unwrap(); + settings.language_code = value.get().expect("type checked upstream"); + } + "latency" => { + let mut settings = self.settings.lock().unwrap(); + let old_latency_ms = settings.latency_ms; + settings.latency_ms = value.get().expect("type checked upstream"); + if settings.latency_ms != old_latency_ms { + gst::debug!(CAT, imp = self, "Latency changed: {}", settings.latency_ms); + drop(settings); + let _ = self + .obj() + .post_message(gst::message::Latency::builder().src(&*self.obj()).build()); + } + } + "lateness" => { + let mut settings = self.settings.lock().unwrap(); + settings.lateness_ms = value.get().expect("type checked upstream"); + } + "url" => { + let mut settings = self.settings.lock().unwrap(); + settings.url = value.get().expect("type checked upstream"); + } + "additional-vocabulary" => { + let mut state = self.state.lock().unwrap(); + state.additional_vocabulary = vec![]; + let vocables: gst::Array = value.get().expect("type checked upstream"); + for vocable in vocables.as_slice() { + let Some(s) = vocable + .get::>() + .expect("type checked upstream") + else { + continue; + }; + + let Ok(content) = s.get::("word") else { + gst::warning!( + CAT, + imp = self, + "skipping vocable: {s}, expected word field", + ); + continue; + }; + + let sounds_like: Vec = match s.get::("sounds_like") { + Ok(sounds_like) => sounds_like + .as_slice() + .iter() + .filter_map(|s| s.get::>().unwrap_or(None)) + .collect(), + Err(_) => vec![], + }; + + state.additional_vocabulary.push(Vocable { + content, + sounds_like, + }); + } + } + "api-key" => { + let mut settings = self.settings.lock().unwrap(); + settings.api_key = value.get().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "language-code" => { + let settings = self.settings.lock().unwrap(); + settings.language_code.to_value() + } + "latency" => { + let settings = self.settings.lock().unwrap(); + settings.latency_ms.to_value() + } + "lateness" => { + let settings = self.settings.lock().unwrap(); + settings.lateness_ms.to_value() + } + "url" => { + let settings = self.settings.lock().unwrap(); + settings.url.to_value() + } + "additional-vocabulary" => { + let state = self.state.lock().unwrap(); + let mut additional_vocabulary = vec![]; + for vocable in &state.additional_vocabulary { + let mut s = gst::Structure::new_empty(&vocable.content); + if !vocable.sounds_like.is_empty() { + s.set( + "sounds_like", + gst::Array::new( + vocable.sounds_like.iter().map(|word| word.to_send_value()), + ), + ); + } + additional_vocabulary.push(s.to_send_value()); + } + gst::Array::new(additional_vocabulary).to_value() + } + "api-key" => { + let settings = self.settings.lock().unwrap(); + settings.api_key.to_value() + } + _ => unimplemented!(), + } + } +} + +impl ElementImpl for Transcriber { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Transcriber", + "Audio/Text/Filter", + "Speech to Text filter, using Speechmatics transcribe", + "Mathieu Duponchelle ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_caps = gst::Caps::builder("text/x-raw") + .field("format", "utf8") + .build(); + let src_pad_template = gst::PadTemplate::with_gtype( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &src_caps, + super::TranscriberSrcPad::static_type(), + ) + .unwrap(); + let req_src_pad_template = gst::PadTemplate::with_gtype( + "translate_src_%u", + gst::PadDirection::Src, + gst::PadPresence::Request, + &src_caps, + super::TranscriberSrcPad::static_type(), + ) + .unwrap(); + + let sink_caps = gst_audio::AudioCapsBuilder::new() + .format(gst_audio::AUDIO_FORMAT_S16) + .rate_range(8000..=48000) + .channels(1) + .build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &sink_caps, + ) + .unwrap(); + + vec![src_pad_template, req_src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + _name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + let mut state = self.state.lock().unwrap(); + + let pad: super::TranscriberSrcPad = gst::PadBuilder::from_template(templ) + .activatemode_function(|pad, parent, mode, active| { + Transcriber::catch_panic_pad_function( + parent, + || { + Err(gst::loggable_error!( + CAT, + "Panic activating src pad with mode" + )) + }, + |transcriber| transcriber.src_activatemode(pad, mode, active), + ) + }) + .query_function(|pad, parent, query| { + Transcriber::catch_panic_pad_function( + parent, + || false, + |transcriber| transcriber.src_query(pad, query), + ) + }) + .name(format!("translate_src_{}", state.pad_serial).as_str()) + .flags(gst::PadFlags::FIXED_CAPS) + .build(); + + state.srcpads.insert(pad.clone()); + + gst::info!(CAT, "New pad requested, {}", state.srcpads.len()); + + state.pad_serial += 1; + drop(state); + + self.obj().add_pad(&pad).unwrap(); + + self.obj().child_added(&pad, &pad.name()); + + Some(pad.upcast()) + } + + fn release_pad(&self, pad: &gst::Pad) { + pad.set_active(false).unwrap(); + self.obj().remove_pad(pad).unwrap(); + + self.obj().child_removed(pad, &pad.name()); + let _ = self + .obj() + .post_message(gst::message::Latency::builder().src(&*self.obj()).build()); + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::info!(CAT, imp = self, "Changing state {:?}", transition); + + if transition == gst::StateChange::PausedToReady { + self.disconnect().map_err(|err| { + self.post_error_message(err); + gst::StateChangeError + })?; + } + + let mut success = self.parent_change_state(transition)?; + + match transition { + gst::StateChange::ReadyToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + gst::StateChange::PlayingToPaused => { + success = gst::StateChangeSuccess::NoPreroll; + } + _ => (), + } + + Ok(success) + } + + fn provide_clock(&self) -> Option { + Some(gst::SystemClock::obtain()) + } +} + +#[derive(Debug, Default, Clone)] +struct TranscriberSrcPadSettings { + language_code: Option, +} + +#[derive(Debug)] +struct TranscriberSrcPadState { + sender: Option>, + accumulator: Option, + buffers: VecDeque, + discont: bool, + send_eos: bool, + out_segment: gst::FormattedSegment, + seqnum: gst::Seqnum, +} + +impl Default for TranscriberSrcPadState { + fn default() -> Self { + Self { + sender: None, + accumulator: None, + buffers: VecDeque::new(), + discont: true, + send_eos: false, + out_segment: gst::FormattedSegment::new(), + seqnum: gst::Seqnum::next(), + } + } +} + +#[derive(Debug, Default)] +pub struct TranscriberSrcPad { + settings: Mutex, + state: Mutex, +} + +impl TranscriberSrcPadState { + fn push_buffer(&mut self, mut buf: gst::Buffer) { + if self.discont { + let buf = buf.make_mut(); + buf.set_flags(gst::BufferFlags::DISCONT); + self.discont = false; + } + + self.buffers.push_back(buf); + } +} + +#[glib::object_subclass] +impl ObjectSubclass for TranscriberSrcPad { + const NAME: &'static str = "GstSpeechmaticsTranscriberSrcPad"; + type Type = super::TranscriberSrcPad; + type ParentType = gst::Pad; + + fn new() -> Self { + Default::default() + } +} + +const OUTPUT_LANG_CODE_PROPERTY: &str = "language-code"; +const DEFAULT_OUTPUT_LANG_CODE: Option<&str> = None; + +impl ObjectImpl for TranscriberSrcPad { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![glib::ParamSpecString::builder(OUTPUT_LANG_CODE_PROPERTY) + .nick("Language Code") + .blurb("The Language the Stream must be translated to") + .default_value(DEFAULT_OUTPUT_LANG_CODE) + .mutable_ready() + .build()] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + OUTPUT_LANG_CODE_PROPERTY => { + self.settings.lock().unwrap().language_code = value.get().unwrap() + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + OUTPUT_LANG_CODE_PROPERTY => self.settings.lock().unwrap().language_code.to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for TranscriberSrcPad {} + +impl PadImpl for TranscriberSrcPad {} diff --git a/audio/speechmatics/src/transcriber/mod.rs b/audio/speechmatics/src/transcriber/mod.rs new file mode 100644 index 00000000..a39a17f8 --- /dev/null +++ b/audio/speechmatics/src/transcriber/mod.rs @@ -0,0 +1,33 @@ +// Copyright (C) 2024 Mathieu Duponchelle +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct Transcriber(ObjectSubclass) @extends gst::Element, gst::Object, @implements gst::ChildProxy; +} + +glib::wrapper! { + pub struct TranscriberSrcPad(ObjectSubclass) @extends gst::Pad, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + TranscriberSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } + gst::Element::register( + Some(plugin), + "speechmaticstranscriber", + gst::Rank::NONE, + Transcriber::static_type(), + ) +} diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 8f3fc046..32e76475 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -11246,6 +11246,155 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }, + "speechmatics": { + "description": "GStreamer Speechmatics plugin", + "elements": { + "speechmaticstranscriber": { + "author": "Mathieu Duponchelle ", + "description": "Speech to Text filter, using Speechmatics transcribe", + "hierarchy": [ + "GstSpeechmaticsTranscriber", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy" + ], + "klass": "Audio/Text/Filter", + "pad-templates": { + "sink": { + "caps": "audio/x-raw:\n rate: [ 8000, 48000 ]\n channels: 1\n layout: { (string)interleaved, (string)non-interleaved }\n format: S16LE\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "text/x-raw:\n format: utf8\n", + "direction": "src", + "presence": "always", + "type": "GstSpeechmaticsTranscriberSrcPad" + }, + "translate_src_%%u": { + "caps": "text/x-raw:\n format: utf8\n", + "direction": "src", + "presence": "request", + "type": "GstSpeechmaticsTranscriberSrcPad" + } + }, + "properties": { + "additional-vocabulary": { + "blurb": "Additional vocabulary speechmatics should use", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "ready", + "readable": true, + "type": "GstValueArray", + "writable": true + }, + "api-key": { + "blurb": "Speechmatics API Key", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "language-code": { + "blurb": "The Language of the Stream, ISO code", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "en", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "latency": { + "blurb": "Amount of milliseconds to allow for transcription", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "8000", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "lateness": { + "blurb": "Amount of milliseconds to introduce as lateness", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "url": { + "blurb": "URL of the transcription server", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "ws://0.0.0.0:9000", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + } + }, + "rank": "none" + } + }, + "filename": "gstspeechmatics", + "license": "Proprietary", + "other-types": { + "GstSpeechmaticsTranscriberSrcPad": { + "hierarchy": [ + "GstSpeechmaticsTranscriberSrcPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object", + "properties": { + "language-code": { + "blurb": "The Language the Stream must be translated to", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + } + } + } + }, + "package": "gst-plugin-speechmatics", + "source": "gst-plugin-speechmatics", + "tracers": {}, + "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + }, "spotify": { "description": "GStreamer Spotify Plugin", "elements": { diff --git a/meson.build b/meson.build index 2df0d8e9..b11de095 100644 --- a/meson.build +++ b/meson.build @@ -207,6 +207,7 @@ plugins = { }, 'gopbuffer': {'library': 'libgstgopbuffer'}, 'quinn': {'library': 'libgstquinn'}, + 'speechmatics': {'library': 'libgstspeechmatics'}, } # Won't build on platforms where it bundles the sources because of: diff --git a/meson_options.txt b/meson_options.txt index 5f988fa7..ca0315cd 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -6,6 +6,7 @@ option('claxon', type: 'feature', value: 'auto', description: 'Build claxon plug option('csound', type: 'feature', value: 'auto', description: 'Build csound plugin') option('lewton', type: 'feature', value: 'auto', description: 'Build lewton plugin') option('spotify', type: 'feature', value: 'auto', description: 'Build spotify plugin') +option('speechmatics', type: 'feature', value: 'auto', description: 'Build speechmatics plugin') # generic option('file', type: 'feature', value: 'auto', description: 'Build file plugin')