// Copyright (C) 2018 Sebastian Dröge // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public // License as published by the Free Software Foundation; either // version 2 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Library General Public License for more details. // // You should have received a copy of the GNU Library General Public // License along with this library; if not, write to the // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Boston, MA 02110-1335, USA. // // SPDX-License-Identifier: LGPL-2.1-or-later use gst::glib; use gst::prelude::*; use once_cell::sync::Lazy; use std::net; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::{env, thread, time}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "ts-udpsrc-benchmark-sender", gst::DebugColorFlags::empty(), Some("Thread-sharing UDP src benchmark sender"), ) }); fn main() { gst::init().unwrap(); gstthreadshare::plugin_register_static().unwrap(); let args = env::args().collect::>(); assert!(args.len() > 1); let n_streams: u16 = args[1].parse().unwrap(); let num_buffers: Option = if args.len() > 3 { args[3].parse().ok() } else { None }; if args.len() > 2 { match args[2].as_str() { "raw" => send_raw_buffers(n_streams), "rtp" => send_rtp_buffers(n_streams, num_buffers), _ => send_test_buffers(n_streams, num_buffers), } } else { send_test_buffers(n_streams, num_buffers); } } fn send_raw_buffers(n_streams: u16) { let buffer = [0; 160]; let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap(); let ipaddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); let destinations = (5004..(5004 + n_streams)) .map(|port| SocketAddr::new(ipaddr, port)) .collect::>(); let wait = time::Duration::from_millis(20); thread::sleep(time::Duration::from_millis(1000)); loop { let now = time::Instant::now(); for dest in &destinations { socket.send_to(&buffer, dest).unwrap(); } let elapsed = now.elapsed(); if elapsed < wait { thread::sleep(wait - elapsed); } } } fn send_test_buffers(n_streams: u16, num_buffers: Option) { let pipeline = gst::Pipeline::default(); for i in 0..n_streams { let src = gst::ElementFactory::make("ts-audiotestsrc") .name(format!("ts-audiotestsrc-{i}").as_str()) .property("context-wait", 20u32) .property("is-live", true) .property("do-timestamp", true) .build() .unwrap(); if let Some(num_buffers) = num_buffers { src.set_property("num-buffers", num_buffers); } #[cfg(feature = "tuning")] if i == 0 { src.set_property("main-elem", true); } let sink = gst::ElementFactory::make("ts-udpsink") .name(format!("udpsink-{i}").as_str()) .property("clients", format!("127.0.0.1:{}", i + 5004)) .property("context-wait", 20u32) .build() .unwrap(); let elements = &[&src, &sink]; pipeline.add_many(elements).unwrap(); gst::Element::link_many(elements).unwrap(); } run(pipeline); } fn send_rtp_buffers(n_streams: u16, num_buffers: Option) { let pipeline = gst::Pipeline::default(); for i in 0..n_streams { let src = gst::ElementFactory::make("ts-audiotestsrc") .name(format!("ts-audiotestsrc-{i}").as_str()) .property("context-wait", 20u32) .property("is-live", true) .property("do-timestamp", true) .build() .unwrap(); if let Some(num_buffers) = num_buffers { src.set_property("num-buffers", num_buffers); } #[cfg(feature = "tuning")] if i == 0 { src.set_property("main-elem", true); } let enc = gst::ElementFactory::make("alawenc") .name(format!("alawenc-{i}").as_str()) .build() .unwrap(); let pay = gst::ElementFactory::make("rtppcmapay") .name(format!("rtppcmapay-{i}").as_str()) .build() .unwrap(); let sink = gst::ElementFactory::make("ts-udpsink") .name(format!("udpsink-{i}").as_str()) .property("context-wait", 20u32) .property("clients", format!("127.0.0.1:{}", i + 5004)) .build() .unwrap(); let elements = &[&src, &enc, &pay, &sink]; pipeline.add_many(elements).unwrap(); gst::Element::link_many(elements).unwrap(); } run(pipeline); } fn run(pipeline: gst::Pipeline) { let l = glib::MainLoop::new(None, false); let bus = pipeline.bus().unwrap(); let l_clone = l.clone(); let _bus_watch = bus .add_watch(move |_, msg| { use gst::MessageView; match msg.view() { MessageView::Eos(_) => { gst::info!(CAT, "Received eos"); l_clone.quit(); glib::ControlFlow::Break } MessageView::Error(msg) => { gst::error!( CAT, "Error from {:?}: {} ({:?})", msg.src().map(|s| s.path_string()), msg.error(), msg.debug() ); l_clone.quit(); glib::ControlFlow::Break } _ => glib::ControlFlow::Continue, } }) .expect("Failed to add bus watch"); pipeline.set_state(gst::State::Playing).unwrap(); l.run(); pipeline.set_state(gst::State::Null).unwrap(); }