rtpbin2: add send / recv examples

Co-authored-by: Sebastian Dröge <sebastian@centricular.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2360>
This commit is contained in:
Sebastian Dröge 2025-07-14 16:32:19 +03:00
parent 31af6cb46b
commit 715a6bcd69
6 changed files with 1025 additions and 0 deletions

2
Cargo.lock generated
View file

@ -3263,6 +3263,7 @@ dependencies = [
"chrono",
"futures",
"gio",
"glib",
"gst-plugin-version-helper",
"gstreamer",
"gstreamer-app",
@ -3273,6 +3274,7 @@ dependencies = [
"gstreamer-rtp",
"gstreamer-video",
"hex",
"itertools 0.14.0",
"log",
"rand 0.9.1",
"rtcp-types",

View file

@ -21,6 +21,7 @@ gst-net = { workspace = true, features = ["v1_20"] }
gst-rtp = { workspace = true, features = ["v1_20"] }
gst-video = { workspace = true, features = ["v1_20"] }
futures = "0.3"
glib.workspace = true
gio.workspace = true
hex = "0.4.3"
log = "0.4"
@ -38,6 +39,8 @@ tokio-util = "0.7.15"
[dev-dependencies]
gst-check = { workspace = true, features = ["v1_20"] }
gst-app = { workspace = true, features = ["v1_20"] }
tokio = { version = "1", default-features = false, features = ["macros", "signal"] }
itertools = "0.14"
[build-dependencies]
gst-plugin-version-helper.workspace = true
@ -52,6 +55,22 @@ static = []
capi = []
doc = []
[[example]]
name = "rtpbin2-send"
path = "examples/rtpbin2-send.rs"
[[example]]
name = "rtpbin2-recv"
path = "examples/rtpbin2-recv.rs"
[[example]]
name = "rtpbin2-send-bundle"
path = "examples/rtpbin2-send-bundle.rs"
[[example]]
name = "rtpbin2-recv-bundle"
path = "examples/rtpbin2-recv-bundle.rs"
[package.metadata.capi]
min_version = "0.9.21"

View file

@ -0,0 +1,254 @@
// SPDX-License-Identifier: MPL-2.0
/// Receives one audio stream and one video stream bundled in a single RTP session.
/// Use `rtpbin2-send-bundle` as a receiver.
use futures::pin_mut;
use futures::prelude::*;
use gst::prelude::*;
use itertools::Itertools as _;
const RTP_ID: &str = "example-rtp-id";
const SESSION_ID: u32 = 0;
const VIDEO_PT: u8 = 96;
const AUDIO_PT: u8 = 97;
const HOST: &str = "127.0.0.1";
const RECV_PORT: u16 = 5004;
const SEND_PORT: u16 = 5005;
const LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200);
#[tokio::main]
async fn main() {
gst::init().unwrap();
gstrsrtp::plugin_register_static().unwrap();
let pipeline = gst::Pipeline::new();
let rtprecv = gst::ElementFactory::make("rtprecv")
.name(format!("rtprecv-{SESSION_ID}"))
.property("rtp-id", RTP_ID)
.property("latency", LATENCY.mseconds() as u32)
.build()
.unwrap();
pipeline.add(&rtprecv).unwrap();
let _recv_rtp_sink_pad = rtprecv
.request_pad_simple(&format!("rtp_sink_{SESSION_ID}"))
.unwrap();
let session = rtprecv.emit_by_name::<gst::glib::Object>("get-session", &[&SESSION_ID]);
let pt_map = gst::Structure::builder("application/x-rtp2-pt-map")
.field(
VIDEO_PT.to_string(),
gst::Caps::builder("application/x-rtp")
.field("media", "video")
.field("payload", VIDEO_PT as i32)
.field("clock-rate", 90_000i32)
.field("encoding-name", "VP8")
.build(),
)
.field(
AUDIO_PT.to_string(),
gst::Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", AUDIO_PT as i32)
.field("clock-rate", 48_000i32)
.field("encoding-name", "OPUS")
.build(),
)
.build();
session.set_property("pt-map", pt_map);
rtprecv.connect_pad_added({
|recv, pad| {
let pad_name = pad.name();
if !pad_name.starts_with("rtp_src_") {
return;
}
let (_prefix, _direction, session_id, pt, _ssrc) =
pad_name.split('_').collect_tuple().unwrap();
let _session_id = session_id.parse::<u32>().unwrap();
let pt = pt.parse::<u8>().unwrap();
println!("Adding src pad: {pad_name}");
let parent = recv.parent().unwrap().downcast::<gst::Bin>().unwrap();
match pt {
VIDEO_PT => {
let depay_queue = gst::ElementFactory::make("queue")
.name("video-depay-queue")
.property("max-size-bytes", 0u32)
.property("max-size-time", 2.seconds() + 50.mseconds())
.property("max-size-time", gst::ClockTime::ZERO)
.property("max-size-buffers", 0u32)
.property_from_str("leaky", "downstream")
.build()
.unwrap();
let depay = gst::ElementFactory::make("rtpvp8depay2").build().unwrap();
let dec = gst::ElementFactory::make("vp8dec").build().unwrap();
let conv = gst::ElementFactory::make("videoconvert").build().unwrap();
let sink_queue = gst::ElementFactory::make("queue")
.name("video-sink-queue")
.property("max-size-bytes", 0u32)
.property("max-size-time", gst::ClockTime::ZERO)
.property("max-size-buffers", 1u32)
.build()
.unwrap();
let sink = gst::ElementFactory::make("autovideosink").build().unwrap();
let elems = [&depay_queue, &depay, &dec, &conv, &sink_queue, &sink];
parent.add_many(elems).unwrap();
pad.link(&depay_queue.static_pad("sink").unwrap()).unwrap();
gst::Element::link_many(elems).unwrap();
sink.sync_state_with_parent().unwrap();
sink_queue.sync_state_with_parent().unwrap();
conv.sync_state_with_parent().unwrap();
dec.sync_state_with_parent().unwrap();
depay.sync_state_with_parent().unwrap();
depay_queue.sync_state_with_parent().unwrap();
}
AUDIO_PT => {
let depay_queue = gst::ElementFactory::make("queue")
.name("audio-depay-queue")
.property("max-size-bytes", 0u32)
// This queue needs to be big enough for holding enough
// audio until a keyframe is decoded on the video branch
// or otherwise the pipeline might get stuck prerolling.
//
// Make it twice the keyframe interval for safety.
//
// The alternative would be to use a leaky queue or to use
// async=false on the sink, or a combination of that.
.property("max-size-time", 2.seconds() + 50.mseconds())
.property("max-size-buffers", 0u32)
.property_from_str("leaky", "downstream")
.build()
.unwrap();
let depay = gst::ElementFactory::make("rtpopusdepay2").build().unwrap();
let dec = gst::ElementFactory::make("opusdec").build().unwrap();
let conv = gst::ElementFactory::make("audioconvert").build().unwrap();
let sink_queue = gst::ElementFactory::make("queue")
.name("audio-sink-queue")
.property("max-size-bytes", 0u32)
.property("max-size-time", gst::ClockTime::ZERO)
.property("max-size-buffers", 1u32)
.build()
.unwrap();
let sink = gst::ElementFactory::make("autoaudiosink").build().unwrap();
let elems = [&depay_queue, &depay, &dec, &conv, &sink_queue, &sink];
parent.add_many(elems).unwrap();
pad.link(&depay_queue.static_pad("sink").unwrap()).unwrap();
gst::Element::link_many(elems).unwrap();
sink.sync_state_with_parent().unwrap();
sink_queue.sync_state_with_parent().unwrap();
conv.sync_state_with_parent().unwrap();
dec.sync_state_with_parent().unwrap();
depay.sync_state_with_parent().unwrap();
depay_queue.sync_state_with_parent().unwrap();
}
other => eprintln!("Unexpected PT {other:?} in pad name {pad_name}"),
}
}
});
// RTP / RTCP from peer
let src = gst::ElementFactory::make("udpsrc")
.name(format!("udpsrc-{SESSION_ID}"))
.property("port", RECV_PORT as i32)
.property("caps", gst::Caps::new_empty_simple("application/x-rtp"))
.build()
.unwrap();
pipeline.add(&src).unwrap();
let queue_rtp = gst::ElementFactory::make("queue")
.name(format!("queue-rtp-{SESSION_ID}"))
.property("max-size-bytes", 0u32)
.property("max-size-time", LATENCY + 50.mseconds())
.property("max-size-buffers", 0u32)
.build()
.unwrap();
pipeline.add(&queue_rtp).unwrap();
src.link(&queue_rtp).unwrap();
queue_rtp
.link_pads(
Some("src"),
&rtprecv,
Some(&format!("rtp_sink_{SESSION_ID}")),
)
.unwrap();
// RTCP to peer
let rtpsend = gst::ElementFactory::make("rtpsend")
.name("send")
.property("rtp-id", RTP_ID)
.build()
.unwrap();
pipeline.add(&rtpsend).unwrap();
let sink = gst::ElementFactory::make("udpsink")
.name(format!("udpsink-{SESSION_ID}"))
// don't wait for the first RTCP packet for preroll
.property("async", false)
.property("port", SEND_PORT as i32)
.property("host", HOST)
.build()
.unwrap();
pipeline.add(&sink).unwrap();
rtpsend
.link_pads(Some(&format!("rtcp_src_{SESSION_ID}")), &sink, Some("sink"))
.unwrap();
// Share the same socket in source and sink
src.set_state(gst::State::Ready).unwrap();
let socket = src.property::<glib::Object>("used-socket");
sink.set_property("socket", socket);
pipeline.set_state(gst::State::Playing).unwrap();
println!("Playing");
let ctrl_c = tokio::signal::ctrl_c().fuse();
pin_mut!(ctrl_c);
let mut bus_recv_stream = pipeline.bus().unwrap().stream();
loop {
futures::select_biased! {
_ = ctrl_c => {
println!("\nShutting down due to user request");
break;
}
msg_recv = bus_recv_stream.next() => {
use gst::MessageView::*;
let Some(msg) = msg_recv else { continue };
match msg.view() {
Latency(_) => {
let _ = pipeline.recalculate_latency();
}
Eos(_) => {
println!("Got EoS");
break;
}
Error(err) => {
eprintln!("recv pipeline: {err:?}");
break;
}
_ => (),
}
}
};
}
pipeline.debug_to_dot_file(
gst::DebugGraphDetails::all(),
"rtpbin2-recv-bundle-stopping",
);
pipeline.set_state(gst::State::Null).unwrap();
// This is needed by some tracers to write their log file
unsafe {
gst::deinit();
}
}

View file

@ -0,0 +1,262 @@
// SPDX-License-Identifier: MPL-2.0
/// Receives one audio stream and one video stream from two separate RTP sessions.
/// Use `rtpbin2-send` as a sender.
use futures::pin_mut;
use futures::prelude::*;
use gst::prelude::*;
use itertools::Itertools as _;
const RTP_ID: &str = "example-rtp-id";
const HOST: &str = "127.0.0.1";
const VIDEO_PT: u8 = 96;
const AUDIO_PT: u8 = 97;
const LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200);
struct SessionParameters {
session_id: u32,
pt: u8,
caps: gst::Caps,
rtp_recv_port: u16,
rtcp_recv_port: u16,
rtcp_send_port: u16,
#[allow(clippy::type_complexity)]
on_pad_added: Box<dyn Fn(&gst::Pipeline, &gst::Pad) + Send + Sync + 'static>,
}
fn add_rtp_recv(
pipeline: &gst::Pipeline,
rtprecv: &gst::Element,
rtpsend: &gst::Element,
params: &SessionParameters,
) {
let _recv_rtp_sink_pad = rtprecv
.request_pad_simple(&format!("rtp_sink_{}", params.session_id))
.unwrap();
// RTP / RTCP from peer
let rtp_src = gst::ElementFactory::make("udpsrc")
.name(format!("udpsrc-rtp-{}-{}", params.session_id, params.pt))
.property("port", params.rtp_recv_port as i32)
.property("caps", &params.caps)
.build()
.unwrap();
pipeline.add(&rtp_src).unwrap();
let queue_rtp = gst::ElementFactory::make("queue")
.name(format!("queue-rtp-{}-{}", params.session_id, params.pt))
.property("max-size-bytes", 0u32)
.property("max-size-time", LATENCY + 50.mseconds())
.property("max-size-buffers", 0u32)
.build()
.unwrap();
pipeline.add(&queue_rtp).unwrap();
rtp_src.link(&queue_rtp).unwrap();
queue_rtp
.link_pads(
Some("src"),
rtprecv,
Some(&format!("rtp_sink_{}", params.session_id)),
)
.unwrap();
let rtcp_src = gst::ElementFactory::make("udpsrc")
.name(format!("udpsrc-rtcp-{}-{}", params.session_id, params.pt))
.property("port", params.rtcp_recv_port as i32)
.property("caps", gst::Caps::new_empty_simple("application/x-rtcp"))
.build()
.unwrap();
pipeline.add(&rtcp_src).unwrap();
rtcp_src
.link_pads(
Some("src"),
rtprecv,
Some(&format!("rtcp_sink_{}", params.session_id)),
)
.unwrap();
// RTCP to peer
let rtcp_sink = gst::ElementFactory::make("udpsink")
.name(format!("udpsink-rtcp-{}-{}", params.session_id, params.pt))
// don't wait for the first RTCP packet for preroll
.property("async", false)
.property("port", params.rtcp_send_port as i32)
.property("host", HOST)
.build()
.unwrap();
pipeline.add(&rtcp_sink).unwrap();
rtpsend
.link_pads(
Some(&format!("rtcp_src_{}", params.session_id)),
&rtcp_sink,
Some("sink"),
)
.unwrap();
}
#[tokio::main]
async fn main() {
gst::init().unwrap();
gstrsrtp::plugin_register_static().unwrap();
let pipeline = gst::Pipeline::new();
let rtprecv = gst::ElementFactory::make("rtprecv")
.property("rtp-id", RTP_ID)
.property("latency", LATENCY.mseconds() as u32)
.build()
.unwrap();
pipeline.add(&rtprecv).unwrap();
let rtpsend = gst::ElementFactory::make("rtpsend")
.property("rtp-id", RTP_ID)
.build()
.unwrap();
pipeline.add(&rtpsend).unwrap();
let sessions = [
SessionParameters {
session_id: 0,
pt: VIDEO_PT,
caps: gst::Caps::builder("application/x-rtp")
.field("media", "video")
.field("payload", VIDEO_PT as i32)
.field("clock-rate", 90_000i32)
.field("encoding-name", "VP8")
.build(),
rtp_recv_port: 5004,
rtcp_recv_port: 5005,
rtcp_send_port: 5007,
on_pad_added: Box::new(|pipeline, pad| {
let depay = gst::ElementFactory::make("rtpvp8depay2").build().unwrap();
let dec = gst::ElementFactory::make("vp8dec").build().unwrap();
let conv = gst::ElementFactory::make("videoconvert").build().unwrap();
let sink_queue = gst::ElementFactory::make("queue")
.name("video-sink-queue")
.property("max-size-bytes", 0u32)
.property("max-size-time", gst::ClockTime::ZERO)
.property("max-size-buffers", 1u32)
.build()
.unwrap();
let sink = gst::ElementFactory::make("autovideosink").build().unwrap();
let elems = [&depay, &dec, &conv, &sink_queue, &sink];
pipeline.add_many(elems).unwrap();
pad.link(&depay.static_pad("sink").unwrap()).unwrap();
gst::Element::link_many(elems).unwrap();
sink.sync_state_with_parent().unwrap();
sink_queue.sync_state_with_parent().unwrap();
conv.sync_state_with_parent().unwrap();
dec.sync_state_with_parent().unwrap();
depay.sync_state_with_parent().unwrap();
}),
},
SessionParameters {
session_id: 1,
pt: AUDIO_PT,
caps: gst::Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", AUDIO_PT as i32)
.field("clock-rate", 48_000i32)
.field("encoding-name", "OPUS")
.build(),
rtp_recv_port: 5008,
rtcp_recv_port: 5009,
rtcp_send_port: 5011,
on_pad_added: Box::new(|pipeline, pad| {
let depay = gst::ElementFactory::make("rtpopusdepay2").build().unwrap();
let dec = gst::ElementFactory::make("opusdec").build().unwrap();
let conv = gst::ElementFactory::make("audioconvert").build().unwrap();
let sink_queue = gst::ElementFactory::make("queue")
.name("audio-sink-queue")
.property("max-size-bytes", 0u32)
.property("max-size-time", gst::ClockTime::ZERO)
.property("max-size-buffers", 1u32)
.build()
.unwrap();
let sink = gst::ElementFactory::make("autoaudiosink").build().unwrap();
let elems = [&depay, &dec, &conv, &sink_queue, &sink];
pipeline.add_many(elems).unwrap();
pad.link(&depay.static_pad("sink").unwrap()).unwrap();
gst::Element::link_many(elems).unwrap();
sink.sync_state_with_parent().unwrap();
sink_queue.sync_state_with_parent().unwrap();
conv.sync_state_with_parent().unwrap();
dec.sync_state_with_parent().unwrap();
depay.sync_state_with_parent().unwrap();
}),
},
];
add_rtp_recv(&pipeline, &rtprecv, &rtpsend, &sessions[0]);
add_rtp_recv(&pipeline, &rtprecv, &rtpsend, &sessions[1]);
rtprecv.connect_pad_added(move |recv, pad| {
let pad_name = pad.name();
if !pad_name.starts_with("rtp_src_") {
return;
}
let (_prefix, _direction, session_id, pt, ssrc) =
pad_name.split('_').collect_tuple().unwrap();
let session_id = session_id.parse::<u32>().unwrap();
let pt = pt.parse::<u8>().unwrap();
let Some(session) = sessions
.iter()
.find(|session| session.session_id == session_id && session.pt == pt)
else {
eprintln!("Unknown RTP stream with session ID {session_id}, PT {pt} and SSRC {ssrc}");
return;
};
let parent = recv.parent().unwrap();
(session.on_pad_added)(parent.downcast_ref::<gst::Pipeline>().unwrap(), pad);
});
pipeline.set_state(gst::State::Playing).unwrap();
println!("Playing");
let ctrl_c = tokio::signal::ctrl_c().fuse();
pin_mut!(ctrl_c);
let mut bus_recv_stream = pipeline.bus().unwrap().stream();
loop {
futures::select_biased! {
_ = ctrl_c => {
println!("\nShutting down due to user request");
break;
}
msg_recv = bus_recv_stream.next() => {
use gst::MessageView::*;
let Some(msg) = msg_recv else { continue };
match msg.view() {
Latency(_) => {
let _ = pipeline.recalculate_latency();
}
Eos(_) => {
println!("Got EoS");
break;
}
Error(err) => {
eprintln!("recv pipeline: {err:?}");
break;
}
_ => (),
}
}
};
}
pipeline.debug_to_dot_file(gst::DebugGraphDetails::all(), "rtpbin2-recv-stopping");
pipeline.set_state(gst::State::Null).unwrap();
// This is needed by some tracers to write their log file
unsafe {
gst::deinit();
}
}

View file

@ -0,0 +1,231 @@
// SPDX-License-Identifier: MPL-2.0
/// Sends one audio stream and one video stream bundled in a single RTP session.
/// Use `rtpbin2-recv-bundle` as a receiver.
use futures::pin_mut;
use futures::prelude::*;
use gst::prelude::*;
const RTP_ID: &str = "example-rtp-id";
const SESSION_ID: u32 = 1234;
const VIDEO_PT: u8 = 96;
const AUDIO_PT: u8 = 97;
const HOST: &str = "127.0.0.1";
const SEND_PORT: u16 = 5004;
const RECV_PORT: u16 = 5005;
#[tokio::main]
async fn main() {
gst::init().unwrap();
gstrsrtp::plugin_register_static().unwrap();
let pipeline = gst::Pipeline::new();
let rtpsend = gst::ElementFactory::make("rtpsend")
.name(format!("rtpsend-{SESSION_ID}"))
.property("rtp-id", RTP_ID)
.build()
.unwrap();
pipeline.add(&rtpsend).unwrap();
let funnel = gst::ElementFactory::make("rtpfunnel")
.name(format!("rtpfunnel-{SESSION_ID}"))
.build()
.unwrap();
pipeline.add(&funnel).unwrap();
funnel
.link_pads(
Some("src"),
&rtpsend,
Some(&format!("rtp_sink_{SESSION_ID}")),
)
.unwrap();
let session = rtpsend.emit_by_name::<gst::glib::Object>("get-session", &[&SESSION_ID]);
let pt_map = gst::Structure::builder("application/x-rtp2-pt-map")
.field(
VIDEO_PT.to_string(),
gst::Caps::builder("application/x-rtp")
.field("media", "video")
.field("payload", VIDEO_PT as i32)
.field("clock-rate", 90_000i32)
.field("encoding-name", "VP8")
.build(),
)
.field(
AUDIO_PT.to_string(),
gst::Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", AUDIO_PT as i32)
.field("clock-rate", 48_000i32)
.field("encoding-name", "OPUS")
.build(),
)
.build();
session.set_property("pt-map", pt_map);
let video_src = gst::ElementFactory::make("videotestsrc")
.property("is-live", true)
.build()
.unwrap();
let video_enc = gst::ElementFactory::make("vp8enc")
.property("deadline", 1i64)
// 1s keyframe interval (30 frames at 30fps)
.property("keyframe-max-dist", 30i32)
.build()
.unwrap();
let video_pay = gst::ElementFactory::make("rtpvp8pay2")
.property("pt", VIDEO_PT as u32)
.property_from_str("picture-id-mode", "7-bit")
.build()
.unwrap();
let video_sync = gst::ElementFactory::make("clocksync")
.name("video-sync")
.build()
.unwrap();
let elems = [&video_src, &video_enc, &video_pay, &video_sync];
pipeline.add_many(elems).unwrap();
gst::Element::link_many(elems).unwrap();
video_sync
.link_pads(Some("src"), &funnel, Some("sink_%u"))
.unwrap();
let audio_src = gst::ElementFactory::make("audiotestsrc")
.property("is-live", true)
.property("volume", 0.03f64)
.build()
.unwrap();
let audio_enc = gst::ElementFactory::make("opusenc").build().unwrap();
let audio_pay = gst::ElementFactory::make("rtpopuspay2")
.property("pt", AUDIO_PT as u32)
.build()
.unwrap();
let audio_sync = gst::ElementFactory::make("clocksync")
.name("audio-sync")
.build()
.unwrap();
let elems = [&audio_src, &audio_enc, &audio_pay, &audio_sync];
pipeline.add_many(elems).unwrap();
gst::Element::link_many(elems).unwrap();
audio_sync
.link_pads(Some("src"), &funnel, Some("sink_%u"))
.unwrap();
// RTP / RTCP to peer
let funnel = gst::ElementFactory::make("funnel")
.name(format!("funnel-{SESSION_ID}"))
.build()
.unwrap();
pipeline.add(&funnel).unwrap();
let sink = gst::ElementFactory::make("udpsink")
.name(format!("udpsink-{SESSION_ID}"))
// sync of the RTP packets is handled before the rtpfunnel
.property("sync", false)
.property("port", SEND_PORT as i32)
.property("host", HOST)
.build()
.unwrap();
pipeline.add(&sink).unwrap();
funnel.link(&sink).unwrap();
let rtp_queue = gst::ElementFactory::make("queue")
.name(format!("queue-rtp-{SESSION_ID}"))
.property("max-size-bytes", 0u32)
.property("max-size-time", 100.mseconds())
.property("max-size-buffers", 0u32)
.build()
.unwrap();
pipeline.add(&rtp_queue).unwrap();
rtpsend
.link_pads(
Some(&format!("rtp_src_{SESSION_ID}")),
&rtp_queue,
Some("sink"),
)
.unwrap();
rtp_queue
.link_pads(Some("src"), &funnel, Some("sink_0"))
.unwrap();
rtpsend
.link_pads(
Some(&format!("rtcp_src_{SESSION_ID}")),
&funnel,
Some("sink_1"),
)
.unwrap();
// RTCP from peer
let rtprecv = gst::ElementFactory::make("rtprecv")
.name(format!("rtprecv-{SESSION_ID}"))
.property("rtp-id", RTP_ID)
.build()
.unwrap();
pipeline.add(&rtprecv).unwrap();
let src = gst::ElementFactory::make("udpsrc")
.name(format!("udpsrc-{SESSION_ID}"))
.property("port", RECV_PORT as i32)
.property("caps", gst::Caps::new_empty_simple("application/x-rtcp"))
.build()
.unwrap();
pipeline.add(&src).unwrap();
src.link_pads(
Some("src"),
&rtprecv,
Some(&format!("rtcp_sink_{SESSION_ID}")),
)
.unwrap();
// Share the same socket in source and sink
src.set_state(gst::State::Ready).unwrap();
let socket = src.property::<glib::Object>("used-socket");
sink.set_property("socket", socket);
pipeline.set_state(gst::State::Playing).unwrap();
println!("Playing");
let ctrl_c = tokio::signal::ctrl_c().fuse();
pin_mut!(ctrl_c);
let mut bus_send_stream = pipeline.bus().unwrap().stream();
loop {
futures::select_biased! {
_ = ctrl_c => {
println!("\nShutting down due to user request");
break;
}
msg_send = bus_send_stream.next() => {
use gst::MessageView::*;
let Some(msg) = msg_send else { continue };
match msg.view() {
Latency(_) => {
let _ = pipeline.recalculate_latency();
}
Error(err) => {
eprintln!("send pipeline: {err:?}");
break;
}
_ => (),
}
}
};
}
pipeline.debug_to_dot_file(
gst::DebugGraphDetails::all(),
"rtpbin2-send-bundle-stopping",
);
pipeline.set_state(gst::State::Null).unwrap();
// This is needed by some tracers to write their log file
unsafe {
gst::deinit();
}
}

View file

@ -0,0 +1,257 @@
// SPDX-License-Identifier: MPL-2.0
/// Sends one audio stream and one video stream in two separate RTP sessions.
/// Use `rtpbin2-recv` as a receiver.
use futures::pin_mut;
use futures::prelude::*;
use gst::prelude::*;
use std::{env, process};
const USAGE: &str = r#"Usage: rtpbin2-send [media-arg]
where 'media-arg' is either:
* -a or --audio-only: only stream audio
* -v or --video-only: only stream video
* by default, stream audio and video.
"#;
const RTP_ID: &str = "example-rtp-id";
const HOST: &str = "127.0.0.1";
const VIDEO_PT: u8 = 96;
const AUDIO_PT: u8 = 97;
#[derive(Debug)]
struct SessionParameters {
session_id: u32,
pt: u8,
rtp_recv_port: u16,
rtcp_recv_port: u16,
rtcp_send_port: u16,
}
fn add_rtp_send(
pipeline: &gst::Pipeline,
rtpsend: &gst::Element,
rtprecv: &gst::Element,
src: &gst::Element,
params: SessionParameters,
) {
let sync = gst::ElementFactory::make("clocksync")
.name(format!("clocksync-{}-{}", params.session_id, params.pt))
.build()
.unwrap();
pipeline.add(&sync).unwrap();
src.link(&sync).unwrap();
sync.link_pads(
Some("src"),
rtpsend,
Some(&format!("rtp_sink_{}", params.session_id)),
)
.unwrap();
// RTP / RTCP to peer
let rtp_queue = gst::ElementFactory::make("queue")
.name(format!("queue-rtp-{}-{}", params.session_id, params.pt))
.property("max-size-bytes", 0u32)
.property("max-size-time", 100.mseconds())
.property("max-size-buffers", 0u32)
.build()
.unwrap();
pipeline.add(&rtp_queue).unwrap();
let rtp_sink = gst::ElementFactory::make("udpsink")
.name(format!("udpsink-rtp-{}-{}", params.session_id, params.pt))
// sync of the RTP packets is handled before rtpsend
.property("sync", false)
.property("port", params.rtp_recv_port as i32)
.property("host", HOST)
.build()
.unwrap();
pipeline.add(&rtp_sink).unwrap();
rtpsend
.link_pads(
Some(&format!("rtp_src_{}", params.session_id)),
&rtp_queue,
Some("sink"),
)
.unwrap();
rtp_queue.link(&rtp_sink).unwrap();
let rtcp_sink = gst::ElementFactory::make("udpsink")
.name(format!("udpsink-rtcp-{}-{}", params.session_id, params.pt))
// don't wait for the first RTCP packet for preroll
.property("async", false)
.property("port", params.rtcp_recv_port as i32)
.property("host", HOST)
.build()
.unwrap();
pipeline.add(&rtcp_sink).unwrap();
rtpsend
.link_pads(
Some(&format!("rtcp_src_{}", params.session_id)),
&rtcp_sink,
Some("sink"),
)
.unwrap();
// RTCP from peer
let rtcp_src = gst::ElementFactory::make("udpsrc")
.name(format!("udpsrc-rtcp-{}-{}", params.session_id, params.pt))
.property("port", params.rtcp_send_port as i32)
.property("caps", gst::Caps::new_empty_simple("application/x-rtcp"))
.build()
.unwrap();
pipeline.add(&rtcp_src).unwrap();
rtcp_src
.link_pads(
Some("src"),
rtprecv,
Some(&format!("rtcp_sink_{}", params.session_id)),
)
.unwrap();
}
#[tokio::main]
async fn main() -> process::ExitCode {
let mut with_audio = true;
let mut with_video = true;
if let Some(media_arg) = env::args().nth(1) {
match media_arg.as_str() {
"--audio-only" | "-a" => with_video = false,
"--video-only" | "-v" => with_audio = false,
_ => {
println!("{USAGE}");
return process::ExitCode::FAILURE;
}
}
}
gst::init().unwrap();
gstrsrtp::plugin_register_static().unwrap();
let pipeline = gst::Pipeline::new();
let rtpsend = gst::ElementFactory::make("rtpsend")
.property("rtp-id", RTP_ID)
.build()
.unwrap();
pipeline.add(&rtpsend).unwrap();
let rtprecv = gst::ElementFactory::make("rtprecv")
.property("rtp-id", RTP_ID)
.build()
.unwrap();
pipeline.add(&rtprecv).unwrap();
if with_video {
println!("Adding video stream...");
let video_src = gst::ElementFactory::make("videotestsrc")
.property("is-live", true)
.build()
.unwrap();
let video_enc = gst::ElementFactory::make("vp8enc")
.property("deadline", 1i64)
.build()
.unwrap();
let video_pay = gst::ElementFactory::make("rtpvp8pay2")
.property("pt", VIDEO_PT as u32)
.property_from_str("picture-id-mode", "7-bit")
.build()
.unwrap();
let elems = [&video_src, &video_enc, &video_pay];
pipeline.add_many(elems).unwrap();
gst::Element::link_many(elems).unwrap();
add_rtp_send(
&pipeline,
&rtpsend,
&rtprecv,
&video_pay,
SessionParameters {
session_id: 0,
pt: VIDEO_PT,
rtp_recv_port: 5004,
rtcp_recv_port: 5005,
rtcp_send_port: 5007,
},
);
}
if with_audio {
println!("Adding audio stream...");
let audio_src = gst::ElementFactory::make("audiotestsrc")
.property("is-live", true)
.property("volume", 0.03f64)
.build()
.unwrap();
let audio_enc = gst::ElementFactory::make("opusenc").build().unwrap();
let audio_pay = gst::ElementFactory::make("rtpopuspay2")
.property("pt", AUDIO_PT as u32)
.build()
.unwrap();
let elems = [&audio_src, &audio_enc, &audio_pay];
pipeline.add_many(elems).unwrap();
gst::Element::link_many(elems).unwrap();
add_rtp_send(
&pipeline,
&rtpsend,
&rtprecv,
&audio_pay,
SessionParameters {
session_id: 1,
pt: AUDIO_PT,
rtp_recv_port: 5008,
rtcp_recv_port: 5009,
rtcp_send_port: 5011,
},
);
}
pipeline.set_state(gst::State::Playing).unwrap();
println!("Playing");
let ctrl_c = tokio::signal::ctrl_c().fuse();
pin_mut!(ctrl_c);
let mut bus_send_stream = pipeline.bus().unwrap().stream();
loop {
futures::select_biased! {
_ = ctrl_c => {
println!("\nShutting down due to user request");
break;
}
msg_send = bus_send_stream.next() => {
use gst::MessageView::*;
let Some(msg) = msg_send else { continue };
match msg.view() {
Latency(_) => {
let _ = pipeline.recalculate_latency();
}
Error(err) => {
eprintln!("send pipeline: {err:?}");
break;
}
_ => (),
}
}
};
}
pipeline.debug_to_dot_file(gst::DebugGraphDetails::all(), "rtpbin2-send-stopping");
pipeline.set_state(gst::State::Null).unwrap();
// This is needed by some tracers to write their log file
unsafe {
gst::deinit();
}
process::ExitCode::SUCCESS
}