diff --git a/Cargo.lock b/Cargo.lock
index b08a1649a..95859e24d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2807,6 +2807,22 @@ dependencies = [
  "num_cpus",
 ]
 
+[[package]]
+name = "gst-plugin-elevenlabs"
+version = "0.14.0-alpha.1"
+dependencies = [
+ "anyhow",
+ "futures",
+ "gst-plugin-version-helper",
+ "gstreamer",
+ "gstreamer-audio",
+ "gstreamer-base",
+ "reqwest 0.12.20",
+ "serde",
+ "serde_json",
+ "tokio",
+]
+
 [[package]]
 name = "gst-plugin-fallbackswitch"
 version = "0.14.0-alpha.1"
diff --git a/Cargo.toml b/Cargo.toml
index 9a857a034..d1b22e7a1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ members = [
   "audio/audiofx",
   "audio/claxon",
   "audio/csound",
+  "audio/elevenlabs",
   "audio/lewton",
   "audio/speechmatics",
   "audio/spotify",
diff --git a/audio/elevenlabs/Cargo.toml b/audio/elevenlabs/Cargo.toml
new file mode 100644
index 000000000..b43edf6e2
--- /dev/null
+++ b/audio/elevenlabs/Cargo.toml
@@ -0,0 +1,47 @@
+[package]
+name = "gst-plugin-elevenlabs"
+version.workspace = true
+authors = ["Mathieu Duponchelle <mathieu@centricular.com>"]
+repository.workspace = true
+license = "MPL-2.0"
+description = "GStreamer ElevenLabs plugin"
+edition.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+futures = "0.3"
+gst = { workspace = true, features = ["v1_20"] }
+gst-base.workspace = true
+gst-audio = { workspace = true, features = ["v1_16"] }
+tokio = { version = "1", features = [ "full" ] }
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
+anyhow = "1"
+reqwest = { version = "0.12" }
+
+[lib]
+name = "gstelevenlabs"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[build-dependencies]
+gst-plugin-version-helper.workspace = true
+
+[features]
+static = []
+capi = []
+doc = ["gst/v1_18"]
+
+[package.metadata.capi]
+min_version = "0.9.21"
+
+[package.metadata.capi.header]
+enabled = false
+
+[package.metadata.capi.library]
+install_subdir = "gstreamer-1.0"
+versioning = false
+import_library = false
+
+[package.metadata.capi.pkg_config]
+requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-audio-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
diff --git a/audio/elevenlabs/build.rs b/audio/elevenlabs/build.rs
new file mode 100644
index 000000000..cda12e57e
--- /dev/null
+++ b/audio/elevenlabs/build.rs
@@ -0,0 +1,3 @@
+fn main() {
+    gst_plugin_version_helper::info()
+}
diff --git a/audio/elevenlabs/src/lib.rs b/audio/elevenlabs/src/lib.rs
new file mode 100644
index 000000000..c3e294044
--- /dev/null
+++ b/audio/elevenlabs/src/lib.rs
@@ -0,0 +1,36 @@
+// Copyright (C) 2025 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)]
+#![recursion_limit = "128"]
+
+/**
+ * plugin-elevenlabs:
+ *
+ * Since: plugins-rs-0.14.0
+ */
+use gst::glib;
+
+mod synthesizer;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+    synthesizer::register(plugin)?;
+
+    Ok(())
+}
+
+gst::plugin_define!(
+    elevenlabs,
+    env!("CARGO_PKG_DESCRIPTION"),
+    plugin_init,
+    concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+    "Proprietary",
+    env!("CARGO_PKG_NAME"),
+    env!("CARGO_PKG_NAME"),
+    env!("CARGO_PKG_REPOSITORY"),
+    env!("BUILD_REL_DATE")
+);
diff --git a/audio/elevenlabs/src/synthesizer/imp.rs b/audio/elevenlabs/src/synthesizer/imp.rs
new file mode 100644
index 000000000..fac0c3b5c
--- /dev/null
+++ b/audio/elevenlabs/src/synthesizer/imp.rs
@@ -0,0 +1,1071 @@
+// Copyright (C) 2025 Mathieu Duponchelle <mathieu@centricular.com>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+//! ElevenLabs TTS element.
+//!
+//! This element calls ElevenLabs to generate audio speech from text.
+//!
+//! It makes use of the POST API because one of the design goals is to
+//! preserve the exact timestamping of the input data, and while this would
+//! be possible with the websockets API it would make the implementation much
+//! more complex.
+//!
+//! Control over the latency would also be made more complex, as ElevenLabs'
+//! `chunk_length_schedule` would have forced us to use a timeout and send
+//! flushing requests from time to time in the absence of input.
+//!
+//! Example usage with an SRT file as input:
+//!
+//! ```
+//! gst-launch-1.0 filesrc location=/home/meh/Documents/chaplin-fr-shifted.srt ! \
+//!     subparse ! clocksync ! queue ! \
+//!     elevenlabssynthesizer voice-id=kENkNtk0xyzG09WW40xE overflow=shift api-key=XXX ! autoaudiosink
+//!
``` + +use gst::subclass::prelude::*; +use gst::{glib, prelude::*}; + +use futures::future::{abortable, AbortHandle}; +use reqwest::Response; +use reqwest::{ + header::{HeaderMap, HeaderValue}, + Client, +}; +use tokio::runtime; + +use std::collections::VecDeque; +use std::sync::{LazyLock, Mutex}; + +use anyhow::{anyhow, Error}; + +use super::Overflow; + +#[derive(serde::Serialize, Debug)] +struct VoiceSettings { + #[serde(skip_serializing_if = "Option::is_none")] + speed: Option, +} + +#[derive(serde::Serialize, Debug)] +struct SendText { + text: String, + #[serde(skip_serializing_if = "Option::is_none")] + language_code: Option, + model_id: String, + previous_request_ids: Vec, + voice_settings: VoiceSettings, +} + +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "elevenlabssynthesizer", + gst::DebugColorFlags::empty(), + Some("ElevenLabs Text to Speech element"), + ) +}); + +static RUNTIME: LazyLock = LazyLock::new(|| { + runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .build() + .unwrap() +}); + +const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_seconds(2); +const DEFAULT_OVERFLOW: Overflow = Overflow::Clip; +const DEFAULT_API_KEY: Option<&str> = None; +const DEFAULT_VOICE_ID: &str = "9BWtsMINqrJLrRacOk9x"; // Aria +const DEFAULT_MODEL_ID: &str = "eleven_flash_v2_5"; +const DEFAULT_LANGUAGE_CODE: Option<&str> = None; +const DEFAULT_RETRY_WITH_SPEED: bool = true; + +// https://elevenlabs.io/docs/api-reference/text-to-speech/convert: +// +// > A maximum of 3 request_ids can be send. +const MAX_PREVIOUS_REQUEST_IDS: usize = 3; + +#[derive(Debug, Clone)] +pub(super) struct Settings { + latency: gst::ClockTime, + overflow: Overflow, + api_key: Option, + voice_id: String, + model_id: String, + language_code: Option, + retry_with_speed: bool, +} + +impl Default for Settings { + fn default() -> Self { + Self { + latency: DEFAULT_LATENCY, + overflow: DEFAULT_OVERFLOW, + api_key: DEFAULT_API_KEY.map(String::from), + voice_id: String::from(DEFAULT_VOICE_ID), + model_id: String::from(DEFAULT_MODEL_ID), + language_code: DEFAULT_LANGUAGE_CODE.map(String::from), + retry_with_speed: DEFAULT_RETRY_WITH_SPEED, + } + } +} + +struct State { + out_segment: gst::FormattedSegment, + client: Option, + send_abort_handle: Option, + previous_request_ids: VecDeque, + outcaps: Option, + upstream_is_live: Option, +} + +impl Default for State { + fn default() -> Self { + Self { + out_segment: gst::FormattedSegment::new(), + client: None, + send_abort_handle: None, + previous_request_ids: VecDeque::new(), + outcaps: None, + upstream_is_live: None, + } + } +} + +pub struct Synthesizer { + srcpad: gst::Pad, + sinkpad: gst::Pad, + settings: Mutex, + state: Mutex, +} + +async fn send_first_request( + client: Client, + voice_id: String, + model_id: String, +) -> Result { + let url = + format!("https://api.elevenlabs.io/v1/text-to-speech/{voice_id}?output_format=pcm_22050"); + let body = serde_json::to_string(&SendText { + text: String::from("first"), + language_code: None, + model_id, + voice_settings: VoiceSettings { speed: None }, + previous_request_ids: vec![], + }) + .unwrap(); + + client + .post(url) + .body(body) + .send() + .await + .map_err(|err| anyhow!("failed sending request: {err}")) +} + +impl Synthesizer { + fn upstream_is_live(&self, state: &mut State) -> bool { + if let Some(upstream_is_live) = state.upstream_is_live { + return upstream_is_live; + } + + let mut peer_query = gst::query::Latency::new(); + + let ret = self.sinkpad.peer_query(&mut 
peer_query); + + if ret { + let (is_live, _, _) = peer_query.result(); + gst::info!(CAT, imp = self, "queried upstream liveness: {is_live}"); + + state.upstream_is_live = Some(is_live); + + is_live + } else { + gst::trace!(CAT, imp = self, "could not query upstream latency"); + + false + } + } + + fn negotiate(&self) -> Result { + let mut allowed_caps = self + .srcpad + .allowed_caps() + .unwrap_or_else(|| self.srcpad.pad_template_caps()); + + allowed_caps.fixate(); + + self.state.lock().unwrap().outcaps = Some(allowed_caps.clone()); + + gst::debug!(CAT, imp = self, "negotiated output caps {}", allowed_caps); + + Ok(allowed_caps) + } + + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + gst::log!(CAT, obj = pad, "Handling event {event:?}"); + + use gst::EventView::*; + match event.view() { + FlushStart(_) => { + gst::info!(CAT, imp = self, "Received flush start, disconnecting"); + let ret = gst::Pad::event_default(pad, Some(&*self.obj()), event); + self.disconnect(); + ret + } + Segment(e) => { + let segment = match e.segment().clone().downcast::() { + Err(segment) => { + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Only Time segments supported, got {:?}", segment.format(),] + ); + return false; + } + Ok(segment) => segment, + }; + self.state.lock().unwrap().out_segment = segment; + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + Caps(c) => { + let caps = match self.negotiate() { + Ok(caps) => caps, + Err(err) => { + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["negotiation failed: {err}"] + ); + return false; + } + }; + + let event = gst::event::Caps::builder(&caps).seqnum(c.seqnum()).build(); + + self.srcpad.push_event(event) + } + Gap(g) => { + let (pts, duration) = g.get(); + + let mut state = self.state.lock().unwrap(); + state.out_segment.set_position(match duration { + Some(duration) => duration + pts, + _ => pts, + }); + drop(state); + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } + _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), + } + } + + async fn send( + &self, + content: String, + mut pts: gst::ClockTime, + input_duration: gst::ClockTime, + ) -> Result, Error> { + let (voice_id, model_id, language_code, retry_with_speed) = { + let settings = self.settings.lock().unwrap(); + + ( + settings.voice_id.clone(), + settings.model_id.clone(), + settings.language_code.as_ref().cloned(), + settings.retry_with_speed, + ) + }; + + let (client, previous_request_ids, out_info, out_segment, upstream_is_live) = { + let mut state = self.state.lock().unwrap(); + + let Some(client) = state.client.as_ref().cloned() else { + return Ok(None); + }; + + ( + client, + state.previous_request_ids.clone(), + gst_audio::AudioInfo::from_caps(state.outcaps.as_ref().expect("negotiated")) + .unwrap(), + state.out_segment.clone(), + self.upstream_is_live(&mut state), + ) + }; + + let Some(buffer_rtime) = out_segment.to_running_time(pts) else { + gst::warning!( + CAT, + imp = self, + "buffer PTS {pts} not in segment {out_segment:?}" + ); + return Ok(None); + }; + + let output_format = format!("pcm_{}", out_info.rate()); + + let bytes_per_second = (out_info.bpf() * out_info.rate()) as u64; + + let max_expected_bytes = input_duration + .nseconds() + .mul_div_floor(bytes_per_second, 1_000_000_000) + .unwrap() + / 2 + * 2; + + let url = format!( + "https://api.elevenlabs.io/v1/text-to-speech/{voice_id}?output_format={output_format}" + ); + + gst::debug!(CAT, imp = self, "sending request to {url} for {content}"); + + 
let mut speed: Option = None; + + let (mut bytes, request_id) = loop { + let job = { + let body = serde_json::to_string(&SendText { + text: content.clone(), + language_code: language_code.clone(), + model_id: model_id.clone(), + voice_settings: VoiceSettings { speed }, + previous_request_ids: previous_request_ids.clone().into(), + }) + .unwrap(); + client.post(&url).body(body).send() + }; + + let resp = job.await.map_err(|err| { + gst::error!(CAT, imp = self, "Failed sending text chunk: {}", err); + err + })?; + + gst::trace!(CAT, "response: {:?}", resp); + + if !resp.status().is_success() { + gst::error!(CAT, imp = self, "Request failed: {}", resp.status()); + let status = resp.status(); + if let Ok(text) = resp.text().await { + return Err(anyhow!("Request failed: {} ({})", status, text)); + } else { + return Err(anyhow!("Request failed: {}", status)); + } + } + + let request_id = resp + .headers() + .get("request-id") + .and_then(|h| h.to_str().ok()) + .map(|id| id.to_string()); + + let bytes = resp + .bytes() + .await + .map_err(|err| anyhow!("Failed getting response bytes: {err}"))?; + + let n_bytes = bytes.len() as u64; + + gst::trace!(CAT, "n_bytes with speed {:?}: {}", speed, n_bytes); + + if retry_with_speed && speed.is_none() && n_bytes > max_expected_bytes { + let new_speed: f64 = (n_bytes as f64 / max_expected_bytes as f64).min(1.2); + gst::debug!( + CAT, + "Got larger duration than expected ({} > {}), retrying with speed {}", + bytes.len(), + max_expected_bytes, + new_speed + ); + speed = Some(new_speed) + } else { + break (bytes, request_id); + } + }; + + let overflow = self.settings.lock().unwrap().overflow; + + if matches!(overflow, Overflow::Clip) { + gst::debug!( + CAT, + "Received {} bytes, max expected {}", + bytes.len(), + max_expected_bytes + ); + + bytes.truncate(max_expected_bytes as usize); + } + + let duration = gst::ClockTime::from_nseconds( + (bytes.len() as u64) + .mul_div_round(1_000_000_000, bytes_per_second) + .unwrap(), + ); + + if duration > input_duration { + gst::debug!( + CAT, + imp = self, + "received duration is superior to input duration ({duration} > {input_duration})" + ); + } + + let mut buf = gst::Buffer::from_slice(bytes); + let mut state = self.state.lock().unwrap(); + + if let Some(id) = request_id { + state.previous_request_ids.push_back(id); + while state.previous_request_ids.len() > MAX_PREVIOUS_REQUEST_IDS { + state.previous_request_ids.pop_front(); + } + } else { + gst::warning!(CAT, imp = self, "No request ID, flushing id queue"); + state.previous_request_ids.clear(); + } + + if let Some(position) = state.out_segment.position() { + if matches!(overflow, Overflow::Shift) && pts < position { + gst::debug!( + CAT, + "received pts {pts} < position {position}, shifting forward" + ); + pts = position; + } + } + + if upstream_is_live { + let current_rtime = self + .obj() + .current_running_time() + .expect("upstream is live and should have provided a clock"); + + if buffer_rtime < current_rtime { + let delta = current_rtime - buffer_rtime; + gst::warning!( + CAT, + "received running time {buffer_rtime} < current rtime {current_rtime}, shifting forward by {delta}, consider increasing latency" + ); + + pts += delta; + } + } + + let discont = state + .out_segment + .position() + .is_none_or(|position| position < pts + duration); + + { + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_pts(pts); + buf_mut.set_duration(duration); + + if let Ok(mut meta) = + gst::meta::CustomMeta::add(buf_mut, "GstScaletempoTargetDurationMeta") + { + 
meta.mut_structure() + .set("duration", input_duration.nseconds()); + } + + if discont { + gst::debug!(CAT, imp = self, "Marking buffer discont"); + buf_mut.set_flags(gst::BufferFlags::DISCONT); + } + } + + state.out_segment.set_position(pts + duration); + + Ok(Some(buf)) + } + + fn do_send( + &self, + content: String, + pts: gst::ClockTime, + duration: gst::ClockTime, + ) -> Result, gst::FlowError> { + let (future, abort_handle) = abortable(self.send(content, pts, duration)); + + { + let mut state = self.state.lock().unwrap(); + + if let Some(handle) = state.send_abort_handle.take() { + handle.abort(); + } + + if state.client.is_none() { + return Err(gst::FlowError::Flushing); + } + + state.send_abort_handle = Some(abort_handle); + } + + self.post_start("request", "transcription request sent"); + + let ret = match RUNTIME.block_on(future) { + Err(_) => { + gst::debug!(CAT, imp = self, "send aborted, returning flushing"); + Err(gst::FlowError::Flushing) + } + Ok(res) => match res { + Err(e) => { + gst::element_imp_error!( + self, + gst::StreamError::Failed, + ["Failed sending data: {}", e] + ); + Err(gst::FlowError::Error) + } + Ok(buf) => Ok(buf), + }, + }; + + self.state.lock().unwrap().send_abort_handle = None; + + ret + } + + fn read_buffer( + &self, + buffer: &gst::Buffer, + ) -> Result<(gst::ClockTime, gst::ClockTime, String), Error> { + let pts = buffer + .pts() + .ok_or_else(|| anyhow!("Stream with timestamped buffers required"))?; + + let duration = buffer + .duration() + .ok_or_else(|| anyhow!("Buffers of stream need to have a duration"))?; + + let data = buffer + .map_readable() + .map_err(|_| anyhow!("Can't map buffer readable"))?; + + let data = + std::str::from_utf8(&data).map_err(|err| anyhow!("Can't decode utf8: {}", err))?; + + Ok((pts, duration, data.to_owned())) + } + + fn sink_chain( + &self, + pad: &gst::Pad, + buffer: gst::Buffer, + ) -> Result { + gst::log!(CAT, obj = pad, "Handling {buffer:?}"); + + let (pts, duration, data) = self.read_buffer(&buffer).map_err(|err| { + gst::element_imp_error!(self, gst::StreamError::Failed, ["{}", err]); + gst::FlowError::Error + })?; + + let Some(mut outbuf) = self.do_send(data, pts, duration)? else { + return Ok(gst::FlowSuccess::Ok); + }; + + { + let outbuf_mut = outbuf.get_mut().unwrap(); + buffer.foreach_meta(|meta| { + if meta.tags().is_empty() { + if let Err(err) = + meta.transform(outbuf_mut, &gst::meta::MetaTransformCopy::new(false, ..)) + { + gst::trace!(CAT, imp = self, "Could not copy meta {}: {err}", meta.api()); + } + } + std::ops::ControlFlow::Continue(()) + }); + } + + self.srcpad.push(outbuf) + } + + fn sink_chain_list( + &self, + _pad: &gst::Pad, + list: gst::BufferList, + ) -> Result { + gst::debug!( + CAT, + imp = self, + "Handling buffer list with size {}", + list.len() + ); + + let mut list_pts: Option = None; + let mut list_end_pts: Option = None; + let mut list_content: Vec = vec![]; + + for buffer in list.iter_owned() { + let (pts, duration, data) = self.read_buffer(&buffer).map_err(|err| { + gst::element_imp_error!(self, gst::StreamError::Failed, ["{}", err]); + gst::FlowError::Error + })?; + + if list_pts.is_none() { + list_pts = Some(pts); + } + + list_end_pts = Some(pts + duration); + + list_content.push(data); + } + + if let Some((pts, end_pts)) = Option::zip(list_pts, list_end_pts) { + let duration = end_pts.saturating_sub(pts); + + let content = list_content.join(" "); + + let Some(mut outbuf) = self.do_send(content, pts, duration)? 
else { + return Ok(gst::FlowSuccess::Ok); + }; + + { + let outbuf_mut = outbuf.get_mut().unwrap(); + for buffer in list.iter() { + buffer.foreach_meta(|meta| { + if meta.tags().is_empty() { + if let Err(err) = meta.transform( + outbuf_mut, + &gst::meta::MetaTransformCopy::new(false, ..), + ) { + gst::trace!( + CAT, + imp = self, + "Could not copy meta {}: {err}", + meta.api() + ); + } + } + std::ops::ControlFlow::Continue(()) + }); + } + } + + self.srcpad.push(outbuf) + } else { + Ok(gst::FlowSuccess::Ok) + } + } + + fn ensure_connection(&self) -> Result<(), gst::ErrorMessage> { + let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + if state.client.is_none() { + let Some(api_key) = settings.api_key.clone() else { + return Err(gst::error_msg!( + gst::CoreError::Failed, + ["An API key is required"] + )); + }; + + let mut headers = HeaderMap::new(); + + let api_key_header = match HeaderValue::from_str(&api_key) { + Ok(header) => header, + Err(err) => { + return Err(gst::error_msg!( + gst::CoreError::Failed, + ["A valid string is required for the API key: {err}"] + )); + } + }; + + headers.insert("xi-api-key", api_key_header); + headers.insert("Content-Type", HeaderValue::from_static("application/json")); + + state.client = Some(Client::builder().default_headers(headers).build().map_err( + |err| gst::error_msg!(gst::CoreError::Failed, ["Failed to create client: {err}"]), + )?); + } + Ok(()) + } + + fn prepare(&self) -> Result<(), gst::ErrorMessage> { + gst::debug!(CAT, imp = self, "Preparing"); + + gst::debug!(CAT, imp = self, "Prepared"); + + Ok(()) + } + + fn disconnect(&self) { + gst::info!(CAT, imp = self, "Disconnecting"); + let mut state = self.state.lock().unwrap(); + + state.client = None; + + if let Some(abort_handle) = state.send_abort_handle.take() { + abort_handle.abort(); + } + + *state = State::default(); + + gst::info!(CAT, imp = self, "Disconnected"); + } + + fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { + gst::log!(CAT, obj = pad, "Handling query {:?}", query); + + match query.view_mut() { + gst::QueryViewMut::Latency(ref mut q) => { + let mut peer_query = gst::query::Latency::new(); + + let ret = self.sinkpad.peer_query(&mut peer_query); + + if ret { + let (live, min, max) = peer_query.result(); + let our_latency = self.settings.lock().unwrap().latency; + + if live { + q.set(true, min + our_latency, max.map(|max| max + our_latency)); + } else { + q.set(live, min, max); + } + } + ret + } + gst::QueryViewMut::Position(ref mut q) => { + if q.format() == gst::Format::Time { + let state = self.state.lock().unwrap(); + q.set( + state + .out_segment + .to_stream_time(state.out_segment.position()), + ); + true + } else { + false + } + } + _ => gst::Pad::query_default(pad, Some(&*self.obj()), query), + } + } + + fn post_start(&self, code: &str, text: &str) { + let obj = self.obj(); + let msg = gst::message::Progress::builder(gst::ProgressType::Start, code, text) + .src(&*obj) + .build(); + let _ = obj.post_message(msg); + } + + fn post_complete(&self, code: &str, text: &str) { + let obj = self.obj(); + let msg = gst::message::Progress::builder(gst::ProgressType::Complete, code, text) + .src(&*obj) + .build(); + let _ = obj.post_message(msg); + } + + fn post_cancelled(&self, code: &str, text: &str) { + let obj = self.obj(); + let msg = gst::message::Progress::builder(gst::ProgressType::Canceled, code, text) + .src(&*obj) + .build(); + let _ = obj.post_message(msg); + } +} + +#[glib::object_subclass] +impl 
ObjectSubclass for Synthesizer { + const NAME: &'static str = "GstElevenLabsSynthesizer"; + type Type = super::Synthesizer; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_from_template(&templ) + .chain_function(|pad, parent, buffer| { + Synthesizer::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |imp| imp.sink_chain(pad, buffer), + ) + }) + .chain_list_function(|pad, parent, list| { + Synthesizer::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |imp| imp.sink_chain_list(pad, list), + ) + }) + .event_function(|pad, parent, event| { + Synthesizer::catch_panic_pad_function( + parent, + || false, + |imp| imp.sink_event(pad, event), + ) + }) + .build(); + + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::PadBuilder::::from_template(&templ) + .query_function(|pad, parent, query| { + Synthesizer::catch_panic_pad_function( + parent, + || false, + |synthesizer| synthesizer.src_query(pad, query), + ) + }) + .flags(gst::PadFlags::FIXED_CAPS) + .build(); + + Self { + srcpad, + sinkpad, + settings: Default::default(), + state: Default::default(), + } + } +} + +impl ObjectImpl for Synthesizer { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: LazyLock> = LazyLock::new(|| { + vec![ + glib::ParamSpecUInt::builder("latency") + .nick("Latency") + .blurb("Amount of milliseconds to allow ElevenLabs") + .default_value(DEFAULT_LATENCY.mseconds() as u32) + .mutable_ready() + .build(), + glib::ParamSpecEnum::builder_with_default("overflow", DEFAULT_OVERFLOW) + .nick("Overflow") + .blurb("Defines how output audio with a longer duration than input text should be handled") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("api-key") + .nick("API Key") + .blurb("ElevenLabs API Key") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("voice-id") + .nick("Voice ID") + .blurb("ElevenLabs Voice ID, see https://elevenlabs.io/app/voice-library") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("model-id") + .nick("Model ID") + .blurb("ElevenLabs Model ID, see https://help.elevenlabs.io/hc/en-us/articles/21811236079505-How-do-I-find-the-model-ID") + .mutable_ready() + .build(), + glib::ParamSpecString::builder("language-code") + .nick("Language Code") + .blurb("An optional language code (ISO 639-1), useful with certain models") + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("retry-with-speed") + .nick("Retry with Speed") + .blurb("When synthesis results in larger duration, retry with higher speed") + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn constructed(&self) { + self.parent_constructed(); + + let obj = self.obj(); + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "latency" => { + let mut settings = self.settings.lock().unwrap(); + settings.latency = gst::ClockTime::from_mseconds( + value.get::().expect("type checked upstream").into(), + ); + } + "overflow" => { + let mut settings = self.settings.lock().unwrap(); + settings.overflow = value.get::().expect("type checked upstream"); + } + "api-key" => { + let mut settings = self.settings.lock().unwrap(); + settings.api_key = value.get().expect("type checked upstream"); + } + "voice-id" => { + let mut settings = 
self.settings.lock().unwrap(); + settings.voice_id = value.get().expect("type checked upstream"); + } + "model-id" => { + let mut settings = self.settings.lock().unwrap(); + settings.model_id = value.get().expect("type checked upstream"); + } + "language-code" => { + let mut settings = self.settings.lock().unwrap(); + settings.language_code = value.get().expect("type checked upstream"); + } + "retry-with-speed" => { + let mut settings = self.settings.lock().unwrap(); + settings.retry_with_speed = value.get().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "latency" => { + let settings = self.settings.lock().unwrap(); + (settings.latency.mseconds() as u32).to_value() + } + "overflow" => { + let settings = self.settings.lock().unwrap(); + settings.overflow.to_value() + } + "api-key" => { + let settings = self.settings.lock().unwrap(); + settings.api_key.to_value() + } + "voice-id" => { + let settings = self.settings.lock().unwrap(); + settings.voice_id.to_value() + } + "model-id" => { + let settings = self.settings.lock().unwrap(); + settings.model_id.to_value() + } + "language-code" => { + let settings = self.settings.lock().unwrap(); + settings.language_code.to_value() + } + "retry-with-speed" => { + let settings = self.settings.lock().unwrap(); + settings.retry_with_speed.to_value() + } + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for Synthesizer {} + +impl ElementImpl for Synthesizer { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "Synthesizer", + "Audio/Text/Filter", + "Text to Speech filter, using ElevenLabs", + "Mathieu Duponchelle ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let sink_caps = gst::Caps::builder_full() + .structure( + gst::Structure::builder("text/x-raw") + .field("format", "utf8") + .build(), + ) + .build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &sink_caps, + ) + .unwrap(); + + let src_caps = gst_audio::AudioCapsBuilder::new() + .format(gst_audio::AudioFormat::S16le) + .rate_list([22_050, 48_000, 44_100, 24_000, 16_000, 8_000]) + .channels(1) + .layout(gst_audio::AudioLayout::Interleaved) + .build(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &src_caps, + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + gst::info!(CAT, imp = self, "Changing state {transition:?}"); + + match transition { + gst::StateChange::NullToReady => { + self.prepare().map_err(|err| { + self.post_error_message(err); + gst::StateChangeError + })?; + } + gst::StateChange::ReadyToPaused => { + // Loading up a new voice can cause unpredictable latency + // (up to 5 seconds or more), so we always emit a first request + // here, the application can then listen to progress messages + // before switching to PLAYING + self.ensure_connection().map_err(|err| { + self.post_error_message(err); + gst::StateChangeError + })?; + + let (voice_id, model_id) = { + let settings = self.settings.lock().unwrap(); + + (settings.voice_id.clone(), settings.model_id.clone()) + }; + + let 
client = { + let state = self.state.lock().unwrap(); + + state.client.as_ref().expect("connected").clone() + }; + + let (future, abort_handle) = + abortable(send_first_request(client, voice_id, model_id)); + + self.state.lock().unwrap().send_abort_handle = Some(abort_handle); + + self.post_start("request", "first request sent"); + let this_weak = self.downgrade(); + RUNTIME.spawn(async move { + let res = future.await; + + if let Some(this) = this_weak.upgrade() { + this.state.lock().unwrap().send_abort_handle = None; + match res { + Err(_) => { + this.post_cancelled("request", "first request cancelled"); + } + _ => { + this.post_complete("request", "first request complete"); + } + } + } + }); + } + gst::StateChange::PausedToReady => { + self.disconnect(); + } + _ => (), + } + + self.parent_change_state(transition) + } +} diff --git a/audio/elevenlabs/src/synthesizer/mod.rs b/audio/elevenlabs/src/synthesizer/mod.rs new file mode 100644 index 000000000..e74fec921 --- /dev/null +++ b/audio/elevenlabs/src/synthesizer/mod.rs @@ -0,0 +1,42 @@ +// Copyright (C) 2025 Mathieu Duponchelle +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct Synthesizer(ObjectSubclass) @extends gst::Element, gst::Object; +} + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstElevenLabsOverflow")] +#[non_exhaustive] +pub enum Overflow { + #[enum_value(name = "Clip", nick = "clip")] + Clip = 0, + #[enum_value(name = "Overlap", nick = "overlap")] + Overlap = 1, + #[enum_value(name = "Shift", nick = "shift")] + Shift = 2, +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + Overflow::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } + gst::Element::register( + Some(plugin), + "elevenlabssynthesizer", + gst::Rank::NONE, + Synthesizer::static_type(), + ) +} diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 14ee4eec2..ae67de3f2 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -2721,6 +2721,152 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }, + "elevenlabs": { + "description": "GStreamer ElevenLabs plugin", + "elements": { + "elevenlabssynthesizer": { + "author": "Mathieu Duponchelle ", + "description": "Text to Speech filter, using ElevenLabs", + "hierarchy": [ + "GstElevenLabsSynthesizer", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Audio/Text/Filter", + "pad-templates": { + "sink": { + "caps": "text/x-raw:\n format: utf8\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "audio/x-raw:\n rate: { (int)22050, (int)48000, (int)44100, (int)24000, (int)16000, (int)8000 }\n channels: 1\n layout: interleaved\n format: S16LE\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "api-key": { + "blurb": "ElevenLabs API Key", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "language-code": { + "blurb": "An optional language code (ISO 639-1), useful with certain 
models", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "latency": { + "blurb": "Amount of milliseconds to allow ElevenLabs", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "2000", + "max": "-1", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, + "model-id": { + "blurb": "ElevenLabs Model ID, see https://help.elevenlabs.io/hc/en-us/articles/21811236079505-How-do-I-find-the-model-ID", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "eleven_flash_v2_5", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "overflow": { + "blurb": "Defines how output audio with a longer duration than input text should be handled", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "clip (0)", + "mutable": "ready", + "readable": true, + "type": "GstElevenLabsOverflow", + "writable": true + }, + "retry-with-speed": { + "blurb": "When synthesis results in larger duration, retry with higher speed", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "true", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + }, + "voice-id": { + "blurb": "ElevenLabs Voice ID, see https://elevenlabs.io/app/voice-library", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "9BWtsMINqrJLrRacOk9x", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + } + }, + "rank": "none" + } + }, + "filename": "gstelevenlabs", + "license": "Proprietary", + "other-types": { + "GstElevenLabsOverflow": { + "kind": "enum", + "values": [ + { + "desc": "Clip", + "name": "clip", + "value": "0" + }, + { + "desc": "Overlap", + "name": "overlap", + "value": "1" + }, + { + "desc": "Shift", + "name": "shift", + "value": "2" + } + ] + } + }, + "package": "gst-plugin-elevenlabs", + "source": "gst-plugin-elevenlabs", + "tracers": {}, + "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + }, "fallbackswitch": { "description": "GStreamer Fallback Switcher and Source Plugin", "elements": { diff --git a/meson.build b/meson.build index 272c43cae..c70090b9d 100644 --- a/meson.build +++ b/meson.build @@ -197,6 +197,7 @@ plugins = { 'extra-deps': {'dav1d': ['>=1.3']}, 'features': ['gst/v1_18', 'gst-base/v1_18', 'gst-video/v1_18'], }, + 'elevenlabs': {'library': 'libgstelevenlabs'}, 'ffv1': {'library': 'libgstffv1'}, 'flavors': {'library': 'libgstrsflv'}, 'gif': { diff --git a/meson_options.txt b/meson_options.txt index 65b24641e..0f291c558 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -7,6 +7,7 @@ option('analytics', type: 'feature', value: 'auto', description: 'Build analytic option('audiofx', type: 'feature', value: 'auto', description: 'Build audiofx plugin') option('claxon', type: 'feature', value: 'auto', description: 'Build claxon plugin') option('csound', type: 'feature', value: 'auto', description: 'Build csound plugin') +option('elevenlabs', type: 'feature', value: 'auto', description: 'Build elevenlabs plugin') option('lewton', type: 'feature', 
value: 'auto', description: 'Build lewton plugin') option('spotify', type: 'feature', value: 'auto', description: 'Build spotify plugin') option('speechmatics', type: 'feature', value: 'auto', description: 'Build speechmatics plugin')
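
A note on the READY→PAUSED warm-up behaviour added in `imp.rs` above: because loading a new voice server-side can add several seconds of unpredictable latency, the element fires a first synthesis request when going to PAUSED and reports its lifecycle through element Progress messages with the code "request" (Start, then Complete or Canceled). The sketch below shows one way an application could wait for that warm-up to finish before switching to PLAYING. It is a minimal illustration and not part of this patch: the pipeline description and property values are placeholders, and it assumes the gstreamer-rs `Progress` message view exposes the `(type, code, text)` triple through `get()`.

```rust
// Minimal sketch: wait for the elevenlabssynthesizer warm-up request to
// complete before switching the pipeline to PLAYING.
//
// Assumptions (not part of this patch): the pipeline string, the property
// values and the bus-polling loop below are illustrative only.
use gst::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    gst::init()?;

    // Hypothetical pipeline; voice-id / api-key values are placeholders.
    let pipeline = gst::parse::launch(
        "filesrc location=subtitles.srt ! subparse ! clocksync ! queue ! \
         elevenlabssynthesizer name=tts api-key=XXX overflow=shift ! autoaudiosink",
    )?
    .downcast::<gst::Pipeline>()
    .unwrap();

    // Going to PAUSED triggers the element's first ("warm-up") request.
    pipeline.set_state(gst::State::Paused)?;

    let bus = pipeline.bus().unwrap();
    for msg in bus.iter_timed(gst::ClockTime::NONE) {
        match msg.view() {
            gst::MessageView::Progress(p) => {
                // The element posts Progress messages with the code "request";
                // assumes get() returns the (type, code, text) triple.
                let (ty, code, _text) = p.get();
                if code == "request" && ty == gst::ProgressType::Complete {
                    break;
                }
            }
            gst::MessageView::Error(err) => return Err(err.error().into()),
            _ => (),
        }
    }

    // The voice is loaded server-side; start playback.
    pipeline.set_state(gst::State::Playing)?;

    // ... run a main loop here, then shut down with set_state(Null).
    Ok(())
}
```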