rtpbin2: allow process-wide RUNTIME thread related configuration...

... using env vars.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2346>
This commit is contained in:
François Laignel 2025-06-30 17:57:27 +02:00 committed by GStreamer Marge Bot
parent c575c93d89
commit 41ea936e51
3 changed files with 196 additions and 55 deletions

View file

@ -2,7 +2,9 @@
use gst::glib;
use gst::prelude::*;
use std::sync::LazyLock;
use std::sync::{LazyLock, OnceLock};
use std::time::Duration;
use tokio::runtime;
mod config;
mod internal;
mod jitterbuffer;
@ -44,10 +46,94 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
)
}
pub static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_time()
.worker_threads(1)
.build()
.unwrap()
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"rtpbin2",
gst::DebugColorFlags::empty(),
Some("RTP bin2 plugin"),
)
});
/// Number of worker threads the Runtime will use (default: 1)
///
/// 0 => number of cores available on the system
const RUNTIME_WORKER_THREADS_ENV_VAR: &str = "GST_RTPBIN2_RT_WORKER_THREADS";
/// Limit for the number of threads in the blocking pool (default: 512)
const RUNTIME_MAX_BLOCKING_THREADS_ENV_VAR: &str = "GST_RTPBIN2_RT_MAX_BLOCKING_THREADS";
/// Timeout for a thread in the blocking pool in ms (default: 10s)
const RUNTIME_THREAD_KEEP_ALIVE_MS: &str = "GST_RTPBIN2_RT_THREAD_KEEP_ALIVE";
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum RuntimeError {
#[error("Invalid value for env var {env_var}")]
InvalidEnvVar { env_var: &'static str },
}
static RUNTIME: OnceLock<Result<runtime::Runtime, RuntimeError>> = OnceLock::new();
pub fn get_or_init_runtime<'a>() -> Result<&'a runtime::Runtime, &'a RuntimeError> {
RUNTIME
.get_or_init(|| {
fn maybe_set_env_var<T, S, D>(
builder: &mut runtime::Builder,
env_var: &'static str,
setter: S,
set_default: D,
) -> Result<(), RuntimeError>
where
T: std::str::FromStr + std::fmt::Display,
S: Fn(&mut runtime::Builder, T) -> &mut runtime::Builder,
D: Fn(&mut runtime::Builder) -> &mut runtime::Builder,
{
match std::env::var(env_var) {
Ok(val) => {
let Ok(val) = val.parse() else {
return Err(RuntimeError::InvalidEnvVar { env_var });
};
gst::info!(CAT, "Runtime: {env_var} defined => using value {val}");
setter(builder, val);
}
Err(std::env::VarError::NotPresent) => {
gst::info!(CAT, "Runtime: {env_var} undefined => using default");
set_default(builder);
}
_ => return Err(RuntimeError::InvalidEnvVar { env_var }),
}
Ok(())
}
fn init() -> Result<runtime::Runtime, RuntimeError> {
let mut builder = runtime::Builder::new_multi_thread();
builder.enable_time();
maybe_set_env_var(
&mut builder,
RUNTIME_WORKER_THREADS_ENV_VAR,
|builder, val| builder.worker_threads(val),
|builder| builder.worker_threads(1),
)?;
maybe_set_env_var(
&mut builder,
RUNTIME_MAX_BLOCKING_THREADS_ENV_VAR,
|builder, val| builder.max_blocking_threads(val),
|builder| builder.max_blocking_threads(512),
)?;
maybe_set_env_var(
&mut builder,
RUNTIME_THREAD_KEEP_ALIVE_MS,
|builder, val| builder.thread_keep_alive(Duration::from_millis(val)),
|builder| builder.thread_keep_alive(Duration::from_secs(10)),
)?;
Ok(builder.build().unwrap())
}
init()
})
.as_ref()
}

View file

@ -6,6 +6,18 @@
*
* RTP session management (receiver).
*
* ## Environment variables
*
* The underlying Runtime can be fine-tuned process-wide using the following env vars:
*
* * 'GST_RTPBIN2_RT_WORKER_THREADS': number of worker threads the Runtime will use (default: 1)
* 0 => number of cores available on the system.
* * 'GST_RTPBIN2_RT_MAX_BLOCKING_THREADS': limit for the number of threads in the blocking pool (default: 512).
* When they push buffers, downstream events or handle downstream queries, rtpbin2 elements spawn
* a thread from the blocking pool. This is to avoid blocking the worker thread which is shared
* with other elements running on the same runtime.
* * 'GST_RTPBIN2_RT_THREAD_KEEP_ALIVE': timeout for a thread in the blocking pool in ms (default: 10s).
*
* ## Example pipeline
*
* |[
@ -46,7 +58,7 @@ use super::session::{
use super::source::SourceState;
use super::sync;
use crate::rtpbin2::RUNTIME;
use crate::rtpbin2;
const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200);
@ -395,37 +407,49 @@ impl RecvSession {
let recv_flow_combiner = recv_flow_combiner.clone();
let store = store.clone();
RUNTIME.block_on(async move {
let mut stream = JitterBufferStream::new(store);
while let Some(item) = stream.next().await {
match item {
JitterBufferItem::PacketList(list) => {
let flow = pad.push_list(list);
gst::trace!(CAT, obj = pad, "Pushed buffer list, flow ret {:?}", flow);
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
// TODO: store flow, return only on session pads?
}
JitterBufferItem::Packet(buffer) => {
let flow = pad.push(buffer);
gst::trace!(CAT, obj = pad, "Pushed buffer, flow ret {:?}", flow);
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
// TODO: store flow, return only on session pads?
}
JitterBufferItem::Event(event) => {
let res = pad.push_event(event);
gst::trace!(CAT, obj = pad, "Pushed serialized event, result: {}", res);
}
JitterBufferItem::Query(mut query, tx) => {
// This is safe because the thread holding the original reference is waiting
// for us exclusively
let res = pad.peer_query(unsafe { query.as_mut() });
let _ = tx.send(res);
rtpbin2::get_or_init_runtime()
.expect("initialized in change_state()")
.block_on(async move {
let mut stream = JitterBufferStream::new(store);
while let Some(item) = stream.next().await {
match item {
JitterBufferItem::PacketList(list) => {
let flow = pad.push_list(list);
gst::trace!(
CAT,
obj = pad,
"Pushed buffer list, flow ret {:?}",
flow
);
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
// TODO: store flow, return only on session pads?
}
JitterBufferItem::Packet(buffer) => {
let flow = pad.push(buffer);
gst::trace!(CAT, obj = pad, "Pushed buffer, flow ret {:?}", flow);
let mut recv_flow_combiner = recv_flow_combiner.lock().unwrap();
let _combined_flow = recv_flow_combiner.update_pad_flow(&pad, flow);
// TODO: store flow, return only on session pads?
}
JitterBufferItem::Event(event) => {
let res = pad.push_event(event);
gst::trace!(
CAT,
obj = pad,
"Pushed serialized event, result: {}",
res
);
}
JitterBufferItem::Query(mut query, tx) => {
// This is safe because the thread holding the original reference is waiting
// for us exclusively
let res = pad.peer_query(unsafe { query.as_mut() });
let _ = tx.send(res);
}
}
}
}
})
})
})?;
gst::debug!(CAT, obj = pad, "Task started");
@ -456,6 +480,7 @@ impl RecvSession {
pt: u8,
ssrc: u32,
) -> (RtpRecvSrcPad, bool) {
let settings = rtpbin.settings.lock().unwrap();
if let Some(pad) = self
.rtp_recv_srcpads
.iter()
@ -504,8 +529,6 @@ impl RecvSession {
srcpad.use_fixed_caps();
let settings = rtpbin.settings.lock().unwrap();
let recv_pad = RtpRecvSrcPad::new(
pt,
ssrc,
@ -2041,6 +2064,14 @@ impl ElementImpl for RtpRecv {
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::NullToReady => {
if let Err(err) = rtpbin2::get_or_init_runtime() {
self.post_error_message(gst::error_msg!(
gst::LibraryError::Settings,
["Error initializing runtime: {err}"]
));
return Err(gst::StateChangeError);
}
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
let rtp_id = settings.rtp_id.clone();

View file

@ -6,6 +6,18 @@
*
* RTP session management (sender).
*
* ## Environment variables
*
* The underlying Runtime can be fine-tuned process-wide using the following env vars:
*
* * 'GST_RTPBIN2_RT_WORKER_THREADS': number of worker threads the Runtime will use (default: 1)
* 0 => number of cores available on the system.
* * 'GST_RTPBIN2_RT_MAX_BLOCKING_THREADS': limit for the number of threads in the blocking pool (default: 512).
* When they push buffers, downstream events or handle downstream queries, rtpbin2 elements spawn
* a thread from the blocking pool. This is to avoid blocking the worker thread which is shared
* with other elements running on the same runtime.
* * 'GST_RTPBIN2_RT_THREAD_KEEP_ALIVE': timeout for a thread in the blocking pool in ms (default: 10s).
*
* ## Example pipeline
*
* |[
@ -37,7 +49,7 @@ use super::internal::{pt_clock_rate_from_caps, GstRustLogger, SharedRtpState, Sh
use super::session::{RtcpSendReply, RtpProfile, SendReply, RTCP_MIN_REPORT_INTERVAL};
use super::source::SourceState;
use crate::rtpbin2::RUNTIME;
use crate::rtpbin2;
const DEFAULT_MIN_RTCP_INTERVAL: Duration = RTCP_MIN_REPORT_INTERVAL;
const DEFAULT_REDUCED_SIZE_RTCP: bool = false;
@ -210,10 +222,12 @@ impl SendSession {
// when the plugin is statically linked.
let (abort_handle, abort_registration) = AbortHandle::new_pair();
let session_id = self.internal_session.id;
RUNTIME.spawn(async move {
let future = Abortable::new(Self::rtcp_task(state, session_id), abort_registration);
future.await
});
rtpbin2::get_or_init_runtime()
.expect("initialized in change_state()")
.spawn(async move {
let future = Abortable::new(Self::rtcp_task(state, session_id), abort_registration);
future.await
});
rtcp_task.replace(RtcpTask { abort_handle });
}
@ -246,17 +260,19 @@ impl SendSession {
if let Some((rtcp_srcpad, data)) = send {
let acquired = sem.clone().acquire_owned().await;
RUNTIME.spawn_blocking(move || {
let buffer = gst::Buffer::from_mut_slice(data);
if let Err(e) = rtcp_srcpad.push(buffer) {
gst::warning!(
CAT,
obj = rtcp_srcpad,
"Failed to send rtcp data: flow return {e:?}"
);
}
drop(acquired);
});
rtpbin2::get_or_init_runtime()
.expect("initialized in change_state()")
.spawn_blocking(move || {
let buffer = gst::Buffer::from_mut_slice(data);
if let Err(e) = rtcp_srcpad.push(buffer) {
gst::warning!(
CAT,
obj = rtcp_srcpad,
"Failed to send rtcp data: flow return {e:?}"
);
}
drop(acquired);
});
}
}
}
@ -934,6 +950,14 @@ impl ElementImpl for RtpSend {
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::NullToReady => {
if let Err(err) = rtpbin2::get_or_init_runtime() {
self.post_error_message(gst::error_msg!(
gst::LibraryError::Settings,
["Error initializing runtime: {err}"]
));
return Err(gst::StateChangeError);
}
let settings = self.settings.lock().unwrap();
let rtp_id = settings.rtp_id.clone();
drop(settings);