From 41ea936e51a8dab099d81e8a24d21b92d7e033b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Mon, 30 Jun 2025 17:57:27 +0200 Subject: [PATCH] rtpbin2: allow process-wide RUNTIME thread related configuration... ... using env vars. Part-of: --- net/rtp/src/rtpbin2/mod.rs | 100 ++++++++++++++++++++++++++++++--- net/rtp/src/rtpbin2/rtprecv.rs | 95 ++++++++++++++++++++----------- net/rtp/src/rtpbin2/rtpsend.rs | 56 ++++++++++++------ 3 files changed, 196 insertions(+), 55 deletions(-) diff --git a/net/rtp/src/rtpbin2/mod.rs b/net/rtp/src/rtpbin2/mod.rs index e1a22e2c1..1c84a87e7 100644 --- a/net/rtp/src/rtpbin2/mod.rs +++ b/net/rtp/src/rtpbin2/mod.rs @@ -2,7 +2,9 @@ use gst::glib; use gst::prelude::*; -use std::sync::LazyLock; +use std::sync::{LazyLock, OnceLock}; +use std::time::Duration; +use tokio::runtime; mod config; mod internal; mod jitterbuffer; @@ -44,10 +46,94 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { ) } -pub static RUNTIME: LazyLock = LazyLock::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_time() - .worker_threads(1) - .build() - .unwrap() +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "rtpbin2", + gst::DebugColorFlags::empty(), + Some("RTP bin2 plugin"), + ) }); + +/// Number of worker threads the Runtime will use (default: 1) +/// +/// 0 => number of cores available on the system +const RUNTIME_WORKER_THREADS_ENV_VAR: &str = "GST_RTPBIN2_RT_WORKER_THREADS"; + +/// Limit for the number of threads in the blocking pool (default: 512) +const RUNTIME_MAX_BLOCKING_THREADS_ENV_VAR: &str = "GST_RTPBIN2_RT_MAX_BLOCKING_THREADS"; + +/// Timeout for a thread in the blocking pool in ms (default: 10s) +const RUNTIME_THREAD_KEEP_ALIVE_MS: &str = "GST_RTPBIN2_RT_THREAD_KEEP_ALIVE"; + +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +pub enum RuntimeError { + #[error("Invalid value for env var {env_var}")] + InvalidEnvVar { env_var: &'static str }, +} + +static RUNTIME: OnceLock> = OnceLock::new(); + +pub fn get_or_init_runtime<'a>() -> Result<&'a runtime::Runtime, &'a RuntimeError> { + RUNTIME + .get_or_init(|| { + fn maybe_set_env_var( + builder: &mut runtime::Builder, + env_var: &'static str, + setter: S, + set_default: D, + ) -> Result<(), RuntimeError> + where + T: std::str::FromStr + std::fmt::Display, + S: Fn(&mut runtime::Builder, T) -> &mut runtime::Builder, + D: Fn(&mut runtime::Builder) -> &mut runtime::Builder, + { + match std::env::var(env_var) { + Ok(val) => { + let Ok(val) = val.parse() else { + return Err(RuntimeError::InvalidEnvVar { env_var }); + }; + gst::info!(CAT, "Runtime: {env_var} defined => using value {val}"); + setter(builder, val); + } + Err(std::env::VarError::NotPresent) => { + gst::info!(CAT, "Runtime: {env_var} undefined => using default"); + set_default(builder); + } + _ => return Err(RuntimeError::InvalidEnvVar { env_var }), + } + + Ok(()) + } + + fn init() -> Result { + let mut builder = runtime::Builder::new_multi_thread(); + builder.enable_time(); + + maybe_set_env_var( + &mut builder, + RUNTIME_WORKER_THREADS_ENV_VAR, + |builder, val| builder.worker_threads(val), + |builder| builder.worker_threads(1), + )?; + + maybe_set_env_var( + &mut builder, + RUNTIME_MAX_BLOCKING_THREADS_ENV_VAR, + |builder, val| builder.max_blocking_threads(val), + |builder| builder.max_blocking_threads(512), + )?; + + maybe_set_env_var( + &mut builder, + RUNTIME_THREAD_KEEP_ALIVE_MS, + |builder, val| builder.thread_keep_alive(Duration::from_millis(val)), + |builder| builder.thread_keep_alive(Duration::from_secs(10)), + )?; + + Ok(builder.build().unwrap()) + } + + init() + }) + .as_ref() +} diff --git a/net/rtp/src/rtpbin2/rtprecv.rs b/net/rtp/src/rtpbin2/rtprecv.rs index 031a1c493..925ea3522 100644 --- a/net/rtp/src/rtpbin2/rtprecv.rs +++ b/net/rtp/src/rtpbin2/rtprecv.rs @@ -6,6 +6,18 @@ * * RTP session management (receiver). * + * ## Environment variables + * + * The underlying Runtime can be fine-tuned process-wide using the following env vars: + * + * * 'GST_RTPBIN2_RT_WORKER_THREADS': number of worker threads the Runtime will use (default: 1) + * 0 => number of cores available on the system. + * * 'GST_RTPBIN2_RT_MAX_BLOCKING_THREADS': limit for the number of threads in the blocking pool (default: 512). + * When they push buffers, downstream events or handle downstream queries, rtpbin2 elements spawn + * a thread from the blocking pool. This is to avoid blocking the worker thread which is shared + * with other elements running on the same runtime. + * * 'GST_RTPBIN2_RT_THREAD_KEEP_ALIVE': timeout for a thread in the blocking pool in ms (default: 10s). + * * ## Example pipeline * * |[ @@ -46,7 +58,7 @@ use super::session::{ use super::source::SourceState; use super::sync; -use crate::rtpbin2::RUNTIME; +use crate::rtpbin2; const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); @@ -395,37 +407,49 @@ impl RecvSession { let recv_flow_combiner = recv_flow_combiner.clone(); let store = store.clone(); - RUNTIME.block_on(async move { - let mut stream = JitterBufferStream::new(store); - while let Some(item) = stream.next().await { - match item { - JitterBufferItem::PacketList(list) => { - let flow = pad.push_list(list); - gst::trace!(CAT, obj = pad, "Pushed buffer list, flow ret {:?}", flow); - let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); - let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); - // TODO: store flow, return only on session pads? - } - JitterBufferItem::Packet(buffer) => { - let flow = pad.push(buffer); - gst::trace!(CAT, obj = pad, "Pushed buffer, flow ret {:?}", flow); - let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); - let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); - // TODO: store flow, return only on session pads? - } - JitterBufferItem::Event(event) => { - let res = pad.push_event(event); - gst::trace!(CAT, obj = pad, "Pushed serialized event, result: {}", res); - } - JitterBufferItem::Query(mut query, tx) => { - // This is safe because the thread holding the original reference is waiting - // for us exclusively - let res = pad.peer_query(unsafe { query.as_mut() }); - let _ = tx.send(res); + rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .block_on(async move { + let mut stream = JitterBufferStream::new(store); + while let Some(item) = stream.next().await { + match item { + JitterBufferItem::PacketList(list) => { + let flow = pad.push_list(list); + gst::trace!( + CAT, + obj = pad, + "Pushed buffer list, flow ret {:?}", + flow + ); + let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); + let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); + // TODO: store flow, return only on session pads? + } + JitterBufferItem::Packet(buffer) => { + let flow = pad.push(buffer); + gst::trace!(CAT, obj = pad, "Pushed buffer, flow ret {:?}", flow); + let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap(); + let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow); + // TODO: store flow, return only on session pads? + } + JitterBufferItem::Event(event) => { + let res = pad.push_event(event); + gst::trace!( + CAT, + obj = pad, + "Pushed serialized event, result: {}", + res + ); + } + JitterBufferItem::Query(mut query, tx) => { + // This is safe because the thread holding the original reference is waiting + // for us exclusively + let res = pad.peer_query(unsafe { query.as_mut() }); + let _ = tx.send(res); + } } } - } - }) + }) })?; gst::debug!(CAT, obj = pad, "Task started"); @@ -456,6 +480,7 @@ impl RecvSession { pt: u8, ssrc: u32, ) -> (RtpRecvSrcPad, bool) { + let settings = rtpbin.settings.lock().unwrap(); if let Some(pad) = self .rtp_recv_srcpads .iter() @@ -504,8 +529,6 @@ impl RecvSession { srcpad.use_fixed_caps(); - let settings = rtpbin.settings.lock().unwrap(); - let recv_pad = RtpRecvSrcPad::new( pt, ssrc, @@ -2041,6 +2064,14 @@ impl ElementImpl for RtpRecv { ) -> Result { match transition { gst::StateChange::NullToReady => { + if let Err(err) = rtpbin2::get_or_init_runtime() { + self.post_error_message(gst::error_msg!( + gst::LibraryError::Settings, + ["Error initializing runtime: {err}"] + )); + return Err(gst::StateChangeError); + } + let settings = self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); let rtp_id = settings.rtp_id.clone(); diff --git a/net/rtp/src/rtpbin2/rtpsend.rs b/net/rtp/src/rtpbin2/rtpsend.rs index 15dc718f3..247df5146 100644 --- a/net/rtp/src/rtpbin2/rtpsend.rs +++ b/net/rtp/src/rtpbin2/rtpsend.rs @@ -6,6 +6,18 @@ * * RTP session management (sender). * + * ## Environment variables + * + * The underlying Runtime can be fine-tuned process-wide using the following env vars: + * + * * 'GST_RTPBIN2_RT_WORKER_THREADS': number of worker threads the Runtime will use (default: 1) + * 0 => number of cores available on the system. + * * 'GST_RTPBIN2_RT_MAX_BLOCKING_THREADS': limit for the number of threads in the blocking pool (default: 512). + * When they push buffers, downstream events or handle downstream queries, rtpbin2 elements spawn + * a thread from the blocking pool. This is to avoid blocking the worker thread which is shared + * with other elements running on the same runtime. + * * 'GST_RTPBIN2_RT_THREAD_KEEP_ALIVE': timeout for a thread in the blocking pool in ms (default: 10s). + * * ## Example pipeline * * |[ @@ -37,7 +49,7 @@ use super::internal::{pt_clock_rate_from_caps, GstRustLogger, SharedRtpState, Sh use super::session::{RtcpSendReply, RtpProfile, SendReply, RTCP_MIN_REPORT_INTERVAL}; use super::source::SourceState; -use crate::rtpbin2::RUNTIME; +use crate::rtpbin2; const DEFAULT_MIN_RTCP_INTERVAL: Duration = RTCP_MIN_REPORT_INTERVAL; const DEFAULT_REDUCED_SIZE_RTCP: bool = false; @@ -210,10 +222,12 @@ impl SendSession { // when the plugin is statically linked. let (abort_handle, abort_registration) = AbortHandle::new_pair(); let session_id = self.internal_session.id; - RUNTIME.spawn(async move { - let future = Abortable::new(Self::rtcp_task(state, session_id), abort_registration); - future.await - }); + rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .spawn(async move { + let future = Abortable::new(Self::rtcp_task(state, session_id), abort_registration); + future.await + }); rtcp_task.replace(RtcpTask { abort_handle }); } @@ -246,17 +260,19 @@ impl SendSession { if let Some((rtcp_srcpad, data)) = send { let acquired = sem.clone().acquire_owned().await; - RUNTIME.spawn_blocking(move || { - let buffer = gst::Buffer::from_mut_slice(data); - if let Err(e) = rtcp_srcpad.push(buffer) { - gst::warning!( - CAT, - obj = rtcp_srcpad, - "Failed to send rtcp data: flow return {e:?}" - ); - } - drop(acquired); - }); + rtpbin2::get_or_init_runtime() + .expect("initialized in change_state()") + .spawn_blocking(move || { + let buffer = gst::Buffer::from_mut_slice(data); + if let Err(e) = rtcp_srcpad.push(buffer) { + gst::warning!( + CAT, + obj = rtcp_srcpad, + "Failed to send rtcp data: flow return {e:?}" + ); + } + drop(acquired); + }); } } } @@ -934,6 +950,14 @@ impl ElementImpl for RtpSend { ) -> Result { match transition { gst::StateChange::NullToReady => { + if let Err(err) = rtpbin2::get_or_init_runtime() { + self.post_error_message(gst::error_msg!( + gst::LibraryError::Settings, + ["Error initializing runtime: {err}"] + )); + return Err(gst::StateChangeError); + } + let settings = self.settings.lock().unwrap(); let rtp_id = settings.rtp_id.clone(); drop(settings);