gst-plugins-rs/generic/threadshare/examples/udpsrc_benchmark_sender.rs

198 lines
6 KiB
Rust

// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.
//
// SPDX-License-Identifier: LGPL-2.1-or-later
use gst::glib;
use gst::prelude::*;
use std::sync::LazyLock;
use std::net;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::{env, thread, time};
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"ts-udpsrc-benchmark-sender",
gst::DebugColorFlags::empty(),
Some("Thread-sharing UDP src benchmark sender"),
)
});
fn main() {
gst::init().unwrap();
gstthreadshare::plugin_register_static().unwrap();
let args = env::args().collect::<Vec<_>>();
assert!(args.len() > 1);
let n_streams: u16 = args[1].parse().unwrap();
let num_buffers: Option<i32> = if args.len() > 3 {
args[3].parse().ok()
} else {
None
};
if args.len() > 2 {
match args[2].as_str() {
"raw" => send_raw_buffers(n_streams),
"rtp" => send_rtp_buffers(n_streams, num_buffers),
_ => send_test_buffers(n_streams, num_buffers),
}
} else {
send_test_buffers(n_streams, num_buffers);
}
}
fn send_raw_buffers(n_streams: u16) {
let buffer = [0; 160];
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let destinations = (5004..(5004 + n_streams))
.map(|port| SocketAddr::new(ipaddr, port))
.collect::<Vec<_>>();
let wait = time::Duration::from_millis(20);
thread::sleep(time::Duration::from_millis(1000));
loop {
let now = time::Instant::now();
for dest in &destinations {
socket.send_to(&buffer, dest).unwrap();
}
let elapsed = now.elapsed();
if elapsed < wait {
thread::sleep(wait - elapsed);
}
}
}
fn send_test_buffers(n_streams: u16, num_buffers: Option<i32>) {
let pipeline = gst::Pipeline::default();
for i in 0..n_streams {
let src = gst::ElementFactory::make("ts-audiotestsrc")
.name(format!("ts-audiotestsrc-{i}").as_str())
.property("context-wait", 20u32)
.property("is-live", true)
.property("do-timestamp", true)
.property_if_some("num-buffers", num_buffers)
.build()
.unwrap();
#[cfg(feature = "tuning")]
if i == 0 {
src.set_property("main-elem", true);
}
let sink = gst::ElementFactory::make("ts-udpsink")
.name(format!("udpsink-{i}").as_str())
.property("clients", format!("127.0.0.1:{}", i + 5004))
.property("context-wait", 20u32)
.build()
.unwrap();
let elements = &[&src, &sink];
pipeline.add_many(elements).unwrap();
gst::Element::link_many(elements).unwrap();
}
run(pipeline);
}
fn send_rtp_buffers(n_streams: u16, num_buffers: Option<i32>) {
let pipeline = gst::Pipeline::default();
for i in 0..n_streams {
let src = gst::ElementFactory::make("ts-audiotestsrc")
.name(format!("ts-audiotestsrc-{i}").as_str())
.property("context-wait", 20u32)
.property("is-live", true)
.property("do-timestamp", true)
.property_if_some("num-buffers", num_buffers)
.build()
.unwrap();
#[cfg(feature = "tuning")]
if i == 0 {
src.set_property("main-elem", true);
}
let enc = gst::ElementFactory::make("alawenc")
.name(format!("alawenc-{i}").as_str())
.build()
.unwrap();
let pay = gst::ElementFactory::make("rtppcmapay")
.name(format!("rtppcmapay-{i}").as_str())
.build()
.unwrap();
let sink = gst::ElementFactory::make("ts-udpsink")
.name(format!("udpsink-{i}").as_str())
.property("context-wait", 20u32)
.property("clients", format!("127.0.0.1:{}", i + 5004))
.build()
.unwrap();
let elements = &[&src, &enc, &pay, &sink];
pipeline.add_many(elements).unwrap();
gst::Element::link_many(elements).unwrap();
}
run(pipeline);
}
fn run(pipeline: gst::Pipeline) {
let l = glib::MainLoop::new(None, false);
let bus = pipeline.bus().unwrap();
let l_clone = l.clone();
let _bus_watch = bus
.add_watch(move |_, msg| {
use gst::MessageView;
match msg.view() {
MessageView::Eos(_) => {
gst::info!(CAT, "Received eos");
l_clone.quit();
glib::ControlFlow::Break
}
MessageView::Error(msg) => {
gst::error!(
CAT,
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
msg.error(),
msg.debug()
);
l_clone.quit();
glib::ControlFlow::Break
}
_ => glib::ControlFlow::Continue,
}
})
.expect("Failed to add bus watch");
pipeline.set_state(gst::State::Playing).unwrap();
l.run();
pipeline.set_state(gst::State::Null).unwrap();
}