From 715a6bcd6934511e8c5f450222ae593d131f10f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 14 Jul 2025 16:32:19 +0300 Subject: [PATCH] rtpbin2: add send / recv examples MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Sebastian Dröge Part-of: --- Cargo.lock | 2 + net/rtp/Cargo.toml | 19 ++ net/rtp/examples/rtpbin2-recv-bundle.rs | 254 +++++++++++++++++++++++ net/rtp/examples/rtpbin2-recv.rs | 262 ++++++++++++++++++++++++ net/rtp/examples/rtpbin2-send-bundle.rs | 231 +++++++++++++++++++++ net/rtp/examples/rtpbin2-send.rs | 257 +++++++++++++++++++++++ 6 files changed, 1025 insertions(+) create mode 100644 net/rtp/examples/rtpbin2-recv-bundle.rs create mode 100644 net/rtp/examples/rtpbin2-recv.rs create mode 100644 net/rtp/examples/rtpbin2-send-bundle.rs create mode 100644 net/rtp/examples/rtpbin2-send.rs diff --git a/Cargo.lock b/Cargo.lock index 199339d36..271700293 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3263,6 +3263,7 @@ dependencies = [ "chrono", "futures", "gio", + "glib", "gst-plugin-version-helper", "gstreamer", "gstreamer-app", @@ -3273,6 +3274,7 @@ dependencies = [ "gstreamer-rtp", "gstreamer-video", "hex", + "itertools 0.14.0", "log", "rand 0.9.1", "rtcp-types", diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index 8eca6eae4..ab8edb98b 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -21,6 +21,7 @@ gst-net = { workspace = true, features = ["v1_20"] } gst-rtp = { workspace = true, features = ["v1_20"] } gst-video = { workspace = true, features = ["v1_20"] } futures = "0.3" +glib.workspace = true gio.workspace = true hex = "0.4.3" log = "0.4" @@ -38,6 +39,8 @@ tokio-util = "0.7.15" [dev-dependencies] gst-check = { workspace = true, features = ["v1_20"] } gst-app = { workspace = true, features = ["v1_20"] } +tokio = { version = "1", default-features = false, features = ["macros", "signal"] } +itertools = "0.14" [build-dependencies] gst-plugin-version-helper.workspace = true @@ -52,6 +55,22 @@ static = [] capi = [] doc = [] +[[example]] +name = "rtpbin2-send" +path = "examples/rtpbin2-send.rs" + +[[example]] +name = "rtpbin2-recv" +path = "examples/rtpbin2-recv.rs" + +[[example]] +name = "rtpbin2-send-bundle" +path = "examples/rtpbin2-send-bundle.rs" + +[[example]] +name = "rtpbin2-recv-bundle" +path = "examples/rtpbin2-recv-bundle.rs" + [package.metadata.capi] min_version = "0.9.21" diff --git a/net/rtp/examples/rtpbin2-recv-bundle.rs b/net/rtp/examples/rtpbin2-recv-bundle.rs new file mode 100644 index 000000000..392347094 --- /dev/null +++ b/net/rtp/examples/rtpbin2-recv-bundle.rs @@ -0,0 +1,254 @@ +// SPDX-License-Identifier: MPL-2.0 + +/// Receives one audio stream and one video stream bundled in a single RTP session. +/// Use `rtpbin2-send-bundle` as a receiver. +use futures::pin_mut; +use futures::prelude::*; +use gst::prelude::*; +use itertools::Itertools as _; + +const RTP_ID: &str = "example-rtp-id"; +const SESSION_ID: u32 = 0; +const VIDEO_PT: u8 = 96; +const AUDIO_PT: u8 = 97; + +const HOST: &str = "127.0.0.1"; +const RECV_PORT: u16 = 5004; +const SEND_PORT: u16 = 5005; + +const LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); + +#[tokio::main] +async fn main() { + gst::init().unwrap(); + gstrsrtp::plugin_register_static().unwrap(); + + let pipeline = gst::Pipeline::new(); + + let rtprecv = gst::ElementFactory::make("rtprecv") + .name(format!("rtprecv-{SESSION_ID}")) + .property("rtp-id", RTP_ID) + .property("latency", LATENCY.mseconds() as u32) + .build() + .unwrap(); + pipeline.add(&rtprecv).unwrap(); + + let _recv_rtp_sink_pad = rtprecv + .request_pad_simple(&format!("rtp_sink_{SESSION_ID}")) + .unwrap(); + + let session = rtprecv.emit_by_name::("get-session", &[&SESSION_ID]); + let pt_map = gst::Structure::builder("application/x-rtp2-pt-map") + .field( + VIDEO_PT.to_string(), + gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("payload", VIDEO_PT as i32) + .field("clock-rate", 90_000i32) + .field("encoding-name", "VP8") + .build(), + ) + .field( + AUDIO_PT.to_string(), + gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("payload", AUDIO_PT as i32) + .field("clock-rate", 48_000i32) + .field("encoding-name", "OPUS") + .build(), + ) + .build(); + session.set_property("pt-map", pt_map); + + rtprecv.connect_pad_added({ + |recv, pad| { + let pad_name = pad.name(); + if !pad_name.starts_with("rtp_src_") { + return; + } + + let (_prefix, _direction, session_id, pt, _ssrc) = + pad_name.split('_').collect_tuple().unwrap(); + let _session_id = session_id.parse::().unwrap(); + let pt = pt.parse::().unwrap(); + + println!("Adding src pad: {pad_name}"); + + let parent = recv.parent().unwrap().downcast::().unwrap(); + match pt { + VIDEO_PT => { + let depay_queue = gst::ElementFactory::make("queue") + .name("video-depay-queue") + .property("max-size-bytes", 0u32) + .property("max-size-time", 2.seconds() + 50.mseconds()) + .property("max-size-time", gst::ClockTime::ZERO) + .property("max-size-buffers", 0u32) + .property_from_str("leaky", "downstream") + .build() + .unwrap(); + let depay = gst::ElementFactory::make("rtpvp8depay2").build().unwrap(); + let dec = gst::ElementFactory::make("vp8dec").build().unwrap(); + let conv = gst::ElementFactory::make("videoconvert").build().unwrap(); + let sink_queue = gst::ElementFactory::make("queue") + .name("video-sink-queue") + .property("max-size-bytes", 0u32) + .property("max-size-time", gst::ClockTime::ZERO) + .property("max-size-buffers", 1u32) + .build() + .unwrap(); + let sink = gst::ElementFactory::make("autovideosink").build().unwrap(); + let elems = [&depay_queue, &depay, &dec, &conv, &sink_queue, &sink]; + parent.add_many(elems).unwrap(); + pad.link(&depay_queue.static_pad("sink").unwrap()).unwrap(); + gst::Element::link_many(elems).unwrap(); + + sink.sync_state_with_parent().unwrap(); + sink_queue.sync_state_with_parent().unwrap(); + conv.sync_state_with_parent().unwrap(); + dec.sync_state_with_parent().unwrap(); + depay.sync_state_with_parent().unwrap(); + depay_queue.sync_state_with_parent().unwrap(); + } + AUDIO_PT => { + let depay_queue = gst::ElementFactory::make("queue") + .name("audio-depay-queue") + .property("max-size-bytes", 0u32) + // This queue needs to be big enough for holding enough + // audio until a keyframe is decoded on the video branch + // or otherwise the pipeline might get stuck prerolling. + // + // Make it twice the keyframe interval for safety. + // + // The alternative would be to use a leaky queue or to use + // async=false on the sink, or a combination of that. + .property("max-size-time", 2.seconds() + 50.mseconds()) + .property("max-size-buffers", 0u32) + .property_from_str("leaky", "downstream") + .build() + .unwrap(); + let depay = gst::ElementFactory::make("rtpopusdepay2").build().unwrap(); + let dec = gst::ElementFactory::make("opusdec").build().unwrap(); + let conv = gst::ElementFactory::make("audioconvert").build().unwrap(); + let sink_queue = gst::ElementFactory::make("queue") + .name("audio-sink-queue") + .property("max-size-bytes", 0u32) + .property("max-size-time", gst::ClockTime::ZERO) + .property("max-size-buffers", 1u32) + .build() + .unwrap(); + let sink = gst::ElementFactory::make("autoaudiosink").build().unwrap(); + + let elems = [&depay_queue, &depay, &dec, &conv, &sink_queue, &sink]; + parent.add_many(elems).unwrap(); + pad.link(&depay_queue.static_pad("sink").unwrap()).unwrap(); + gst::Element::link_many(elems).unwrap(); + + sink.sync_state_with_parent().unwrap(); + sink_queue.sync_state_with_parent().unwrap(); + conv.sync_state_with_parent().unwrap(); + dec.sync_state_with_parent().unwrap(); + depay.sync_state_with_parent().unwrap(); + depay_queue.sync_state_with_parent().unwrap(); + } + other => eprintln!("Unexpected PT {other:?} in pad name {pad_name}"), + } + } + }); + + // RTP / RTCP from peer + let src = gst::ElementFactory::make("udpsrc") + .name(format!("udpsrc-{SESSION_ID}")) + .property("port", RECV_PORT as i32) + .property("caps", gst::Caps::new_empty_simple("application/x-rtp")) + .build() + .unwrap(); + pipeline.add(&src).unwrap(); + let queue_rtp = gst::ElementFactory::make("queue") + .name(format!("queue-rtp-{SESSION_ID}")) + .property("max-size-bytes", 0u32) + .property("max-size-time", LATENCY + 50.mseconds()) + .property("max-size-buffers", 0u32) + .build() + .unwrap(); + pipeline.add(&queue_rtp).unwrap(); + src.link(&queue_rtp).unwrap(); + queue_rtp + .link_pads( + Some("src"), + &rtprecv, + Some(&format!("rtp_sink_{SESSION_ID}")), + ) + .unwrap(); + + // RTCP to peer + let rtpsend = gst::ElementFactory::make("rtpsend") + .name("send") + .property("rtp-id", RTP_ID) + .build() + .unwrap(); + pipeline.add(&rtpsend).unwrap(); + + let sink = gst::ElementFactory::make("udpsink") + .name(format!("udpsink-{SESSION_ID}")) + // don't wait for the first RTCP packet for preroll + .property("async", false) + .property("port", SEND_PORT as i32) + .property("host", HOST) + .build() + .unwrap(); + pipeline.add(&sink).unwrap(); + rtpsend + .link_pads(Some(&format!("rtcp_src_{SESSION_ID}")), &sink, Some("sink")) + .unwrap(); + + // Share the same socket in source and sink + src.set_state(gst::State::Ready).unwrap(); + let socket = src.property::("used-socket"); + sink.set_property("socket", socket); + + pipeline.set_state(gst::State::Playing).unwrap(); + println!("Playing"); + + let ctrl_c = tokio::signal::ctrl_c().fuse(); + pin_mut!(ctrl_c); + + let mut bus_recv_stream = pipeline.bus().unwrap().stream(); + + loop { + futures::select_biased! { + _ = ctrl_c => { + println!("\nShutting down due to user request"); + break; + } + msg_recv = bus_recv_stream.next() => { + use gst::MessageView::*; + let Some(msg) = msg_recv else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipeline.recalculate_latency(); + } + Eos(_) => { + println!("Got EoS"); + break; + } + Error(err) => { + eprintln!("recv pipeline: {err:?}"); + break; + } + _ => (), + } + } + }; + } + + pipeline.debug_to_dot_file( + gst::DebugGraphDetails::all(), + "rtpbin2-recv-bundle-stopping", + ); + pipeline.set_state(gst::State::Null).unwrap(); + + // This is needed by some tracers to write their log file + unsafe { + gst::deinit(); + } +} diff --git a/net/rtp/examples/rtpbin2-recv.rs b/net/rtp/examples/rtpbin2-recv.rs new file mode 100644 index 000000000..c12a67756 --- /dev/null +++ b/net/rtp/examples/rtpbin2-recv.rs @@ -0,0 +1,262 @@ +// SPDX-License-Identifier: MPL-2.0 + +/// Receives one audio stream and one video stream from two separate RTP sessions. +/// Use `rtpbin2-send` as a sender. +use futures::pin_mut; +use futures::prelude::*; +use gst::prelude::*; +use itertools::Itertools as _; + +const RTP_ID: &str = "example-rtp-id"; +const HOST: &str = "127.0.0.1"; +const VIDEO_PT: u8 = 96; +const AUDIO_PT: u8 = 97; + +const LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(200); + +struct SessionParameters { + session_id: u32, + pt: u8, + caps: gst::Caps, + rtp_recv_port: u16, + rtcp_recv_port: u16, + rtcp_send_port: u16, + #[allow(clippy::type_complexity)] + on_pad_added: Box, +} + +fn add_rtp_recv( + pipeline: &gst::Pipeline, + rtprecv: &gst::Element, + rtpsend: &gst::Element, + params: &SessionParameters, +) { + let _recv_rtp_sink_pad = rtprecv + .request_pad_simple(&format!("rtp_sink_{}", params.session_id)) + .unwrap(); + + // RTP / RTCP from peer + let rtp_src = gst::ElementFactory::make("udpsrc") + .name(format!("udpsrc-rtp-{}-{}", params.session_id, params.pt)) + .property("port", params.rtp_recv_port as i32) + .property("caps", ¶ms.caps) + .build() + .unwrap(); + pipeline.add(&rtp_src).unwrap(); + let queue_rtp = gst::ElementFactory::make("queue") + .name(format!("queue-rtp-{}-{}", params.session_id, params.pt)) + .property("max-size-bytes", 0u32) + .property("max-size-time", LATENCY + 50.mseconds()) + .property("max-size-buffers", 0u32) + .build() + .unwrap(); + pipeline.add(&queue_rtp).unwrap(); + rtp_src.link(&queue_rtp).unwrap(); + queue_rtp + .link_pads( + Some("src"), + rtprecv, + Some(&format!("rtp_sink_{}", params.session_id)), + ) + .unwrap(); + + let rtcp_src = gst::ElementFactory::make("udpsrc") + .name(format!("udpsrc-rtcp-{}-{}", params.session_id, params.pt)) + .property("port", params.rtcp_recv_port as i32) + .property("caps", gst::Caps::new_empty_simple("application/x-rtcp")) + .build() + .unwrap(); + pipeline.add(&rtcp_src).unwrap(); + rtcp_src + .link_pads( + Some("src"), + rtprecv, + Some(&format!("rtcp_sink_{}", params.session_id)), + ) + .unwrap(); + + // RTCP to peer + let rtcp_sink = gst::ElementFactory::make("udpsink") + .name(format!("udpsink-rtcp-{}-{}", params.session_id, params.pt)) + // don't wait for the first RTCP packet for preroll + .property("async", false) + .property("port", params.rtcp_send_port as i32) + .property("host", HOST) + .build() + .unwrap(); + pipeline.add(&rtcp_sink).unwrap(); + rtpsend + .link_pads( + Some(&format!("rtcp_src_{}", params.session_id)), + &rtcp_sink, + Some("sink"), + ) + .unwrap(); +} + +#[tokio::main] +async fn main() { + gst::init().unwrap(); + gstrsrtp::plugin_register_static().unwrap(); + + let pipeline = gst::Pipeline::new(); + + let rtprecv = gst::ElementFactory::make("rtprecv") + .property("rtp-id", RTP_ID) + .property("latency", LATENCY.mseconds() as u32) + .build() + .unwrap(); + pipeline.add(&rtprecv).unwrap(); + + let rtpsend = gst::ElementFactory::make("rtpsend") + .property("rtp-id", RTP_ID) + .build() + .unwrap(); + pipeline.add(&rtpsend).unwrap(); + + let sessions = [ + SessionParameters { + session_id: 0, + pt: VIDEO_PT, + caps: gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("payload", VIDEO_PT as i32) + .field("clock-rate", 90_000i32) + .field("encoding-name", "VP8") + .build(), + rtp_recv_port: 5004, + rtcp_recv_port: 5005, + rtcp_send_port: 5007, + on_pad_added: Box::new(|pipeline, pad| { + let depay = gst::ElementFactory::make("rtpvp8depay2").build().unwrap(); + let dec = gst::ElementFactory::make("vp8dec").build().unwrap(); + let conv = gst::ElementFactory::make("videoconvert").build().unwrap(); + let sink_queue = gst::ElementFactory::make("queue") + .name("video-sink-queue") + .property("max-size-bytes", 0u32) + .property("max-size-time", gst::ClockTime::ZERO) + .property("max-size-buffers", 1u32) + .build() + .unwrap(); + let sink = gst::ElementFactory::make("autovideosink").build().unwrap(); + + let elems = [&depay, &dec, &conv, &sink_queue, &sink]; + pipeline.add_many(elems).unwrap(); + pad.link(&depay.static_pad("sink").unwrap()).unwrap(); + gst::Element::link_many(elems).unwrap(); + + sink.sync_state_with_parent().unwrap(); + sink_queue.sync_state_with_parent().unwrap(); + conv.sync_state_with_parent().unwrap(); + dec.sync_state_with_parent().unwrap(); + depay.sync_state_with_parent().unwrap(); + }), + }, + SessionParameters { + session_id: 1, + pt: AUDIO_PT, + caps: gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("payload", AUDIO_PT as i32) + .field("clock-rate", 48_000i32) + .field("encoding-name", "OPUS") + .build(), + rtp_recv_port: 5008, + rtcp_recv_port: 5009, + rtcp_send_port: 5011, + on_pad_added: Box::new(|pipeline, pad| { + let depay = gst::ElementFactory::make("rtpopusdepay2").build().unwrap(); + let dec = gst::ElementFactory::make("opusdec").build().unwrap(); + let conv = gst::ElementFactory::make("audioconvert").build().unwrap(); + let sink_queue = gst::ElementFactory::make("queue") + .name("audio-sink-queue") + .property("max-size-bytes", 0u32) + .property("max-size-time", gst::ClockTime::ZERO) + .property("max-size-buffers", 1u32) + .build() + .unwrap(); + let sink = gst::ElementFactory::make("autoaudiosink").build().unwrap(); + + let elems = [&depay, &dec, &conv, &sink_queue, &sink]; + pipeline.add_many(elems).unwrap(); + pad.link(&depay.static_pad("sink").unwrap()).unwrap(); + gst::Element::link_many(elems).unwrap(); + + sink.sync_state_with_parent().unwrap(); + sink_queue.sync_state_with_parent().unwrap(); + conv.sync_state_with_parent().unwrap(); + dec.sync_state_with_parent().unwrap(); + depay.sync_state_with_parent().unwrap(); + }), + }, + ]; + + add_rtp_recv(&pipeline, &rtprecv, &rtpsend, &sessions[0]); + add_rtp_recv(&pipeline, &rtprecv, &rtpsend, &sessions[1]); + + rtprecv.connect_pad_added(move |recv, pad| { + let pad_name = pad.name(); + if !pad_name.starts_with("rtp_src_") { + return; + } + + let (_prefix, _direction, session_id, pt, ssrc) = + pad_name.split('_').collect_tuple().unwrap(); + let session_id = session_id.parse::().unwrap(); + let pt = pt.parse::().unwrap(); + + let Some(session) = sessions + .iter() + .find(|session| session.session_id == session_id && session.pt == pt) + else { + eprintln!("Unknown RTP stream with session ID {session_id}, PT {pt} and SSRC {ssrc}"); + return; + }; + + let parent = recv.parent().unwrap(); + (session.on_pad_added)(parent.downcast_ref::().unwrap(), pad); + }); + + pipeline.set_state(gst::State::Playing).unwrap(); + println!("Playing"); + + let ctrl_c = tokio::signal::ctrl_c().fuse(); + pin_mut!(ctrl_c); + + let mut bus_recv_stream = pipeline.bus().unwrap().stream(); + + loop { + futures::select_biased! { + _ = ctrl_c => { + println!("\nShutting down due to user request"); + break; + } + msg_recv = bus_recv_stream.next() => { + use gst::MessageView::*; + let Some(msg) = msg_recv else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipeline.recalculate_latency(); + } + Eos(_) => { + println!("Got EoS"); + break; + } + Error(err) => { + eprintln!("recv pipeline: {err:?}"); + break; + } + _ => (), + } + } + }; + } + + pipeline.debug_to_dot_file(gst::DebugGraphDetails::all(), "rtpbin2-recv-stopping"); + pipeline.set_state(gst::State::Null).unwrap(); + + // This is needed by some tracers to write their log file + unsafe { + gst::deinit(); + } +} diff --git a/net/rtp/examples/rtpbin2-send-bundle.rs b/net/rtp/examples/rtpbin2-send-bundle.rs new file mode 100644 index 000000000..48c8bc9f5 --- /dev/null +++ b/net/rtp/examples/rtpbin2-send-bundle.rs @@ -0,0 +1,231 @@ +// SPDX-License-Identifier: MPL-2.0 + +/// Sends one audio stream and one video stream bundled in a single RTP session. +/// Use `rtpbin2-recv-bundle` as a receiver. +use futures::pin_mut; +use futures::prelude::*; +use gst::prelude::*; + +const RTP_ID: &str = "example-rtp-id"; +const SESSION_ID: u32 = 1234; +const VIDEO_PT: u8 = 96; +const AUDIO_PT: u8 = 97; + +const HOST: &str = "127.0.0.1"; +const SEND_PORT: u16 = 5004; +const RECV_PORT: u16 = 5005; + +#[tokio::main] +async fn main() { + gst::init().unwrap(); + gstrsrtp::plugin_register_static().unwrap(); + + let pipeline = gst::Pipeline::new(); + + let rtpsend = gst::ElementFactory::make("rtpsend") + .name(format!("rtpsend-{SESSION_ID}")) + .property("rtp-id", RTP_ID) + .build() + .unwrap(); + pipeline.add(&rtpsend).unwrap(); + + let funnel = gst::ElementFactory::make("rtpfunnel") + .name(format!("rtpfunnel-{SESSION_ID}")) + .build() + .unwrap(); + pipeline.add(&funnel).unwrap(); + funnel + .link_pads( + Some("src"), + &rtpsend, + Some(&format!("rtp_sink_{SESSION_ID}")), + ) + .unwrap(); + + let session = rtpsend.emit_by_name::("get-session", &[&SESSION_ID]); + let pt_map = gst::Structure::builder("application/x-rtp2-pt-map") + .field( + VIDEO_PT.to_string(), + gst::Caps::builder("application/x-rtp") + .field("media", "video") + .field("payload", VIDEO_PT as i32) + .field("clock-rate", 90_000i32) + .field("encoding-name", "VP8") + .build(), + ) + .field( + AUDIO_PT.to_string(), + gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("payload", AUDIO_PT as i32) + .field("clock-rate", 48_000i32) + .field("encoding-name", "OPUS") + .build(), + ) + .build(); + session.set_property("pt-map", pt_map); + + let video_src = gst::ElementFactory::make("videotestsrc") + .property("is-live", true) + .build() + .unwrap(); + let video_enc = gst::ElementFactory::make("vp8enc") + .property("deadline", 1i64) + // 1s keyframe interval (30 frames at 30fps) + .property("keyframe-max-dist", 30i32) + .build() + .unwrap(); + let video_pay = gst::ElementFactory::make("rtpvp8pay2") + .property("pt", VIDEO_PT as u32) + .property_from_str("picture-id-mode", "7-bit") + .build() + .unwrap(); + let video_sync = gst::ElementFactory::make("clocksync") + .name("video-sync") + .build() + .unwrap(); + let elems = [&video_src, &video_enc, &video_pay, &video_sync]; + pipeline.add_many(elems).unwrap(); + gst::Element::link_many(elems).unwrap(); + video_sync + .link_pads(Some("src"), &funnel, Some("sink_%u")) + .unwrap(); + + let audio_src = gst::ElementFactory::make("audiotestsrc") + .property("is-live", true) + .property("volume", 0.03f64) + .build() + .unwrap(); + let audio_enc = gst::ElementFactory::make("opusenc").build().unwrap(); + let audio_pay = gst::ElementFactory::make("rtpopuspay2") + .property("pt", AUDIO_PT as u32) + .build() + .unwrap(); + let audio_sync = gst::ElementFactory::make("clocksync") + .name("audio-sync") + .build() + .unwrap(); + let elems = [&audio_src, &audio_enc, &audio_pay, &audio_sync]; + pipeline.add_many(elems).unwrap(); + gst::Element::link_many(elems).unwrap(); + audio_sync + .link_pads(Some("src"), &funnel, Some("sink_%u")) + .unwrap(); + + // RTP / RTCP to peer + let funnel = gst::ElementFactory::make("funnel") + .name(format!("funnel-{SESSION_ID}")) + .build() + .unwrap(); + pipeline.add(&funnel).unwrap(); + + let sink = gst::ElementFactory::make("udpsink") + .name(format!("udpsink-{SESSION_ID}")) + // sync of the RTP packets is handled before the rtpfunnel + .property("sync", false) + .property("port", SEND_PORT as i32) + .property("host", HOST) + .build() + .unwrap(); + pipeline.add(&sink).unwrap(); + + funnel.link(&sink).unwrap(); + + let rtp_queue = gst::ElementFactory::make("queue") + .name(format!("queue-rtp-{SESSION_ID}")) + .property("max-size-bytes", 0u32) + .property("max-size-time", 100.mseconds()) + .property("max-size-buffers", 0u32) + .build() + .unwrap(); + pipeline.add(&rtp_queue).unwrap(); + + rtpsend + .link_pads( + Some(&format!("rtp_src_{SESSION_ID}")), + &rtp_queue, + Some("sink"), + ) + .unwrap(); + + rtp_queue + .link_pads(Some("src"), &funnel, Some("sink_0")) + .unwrap(); + + rtpsend + .link_pads( + Some(&format!("rtcp_src_{SESSION_ID}")), + &funnel, + Some("sink_1"), + ) + .unwrap(); + + // RTCP from peer + let rtprecv = gst::ElementFactory::make("rtprecv") + .name(format!("rtprecv-{SESSION_ID}")) + .property("rtp-id", RTP_ID) + .build() + .unwrap(); + pipeline.add(&rtprecv).unwrap(); + + let src = gst::ElementFactory::make("udpsrc") + .name(format!("udpsrc-{SESSION_ID}")) + .property("port", RECV_PORT as i32) + .property("caps", gst::Caps::new_empty_simple("application/x-rtcp")) + .build() + .unwrap(); + pipeline.add(&src).unwrap(); + src.link_pads( + Some("src"), + &rtprecv, + Some(&format!("rtcp_sink_{SESSION_ID}")), + ) + .unwrap(); + + // Share the same socket in source and sink + src.set_state(gst::State::Ready).unwrap(); + let socket = src.property::("used-socket"); + sink.set_property("socket", socket); + + pipeline.set_state(gst::State::Playing).unwrap(); + println!("Playing"); + + let ctrl_c = tokio::signal::ctrl_c().fuse(); + pin_mut!(ctrl_c); + + let mut bus_send_stream = pipeline.bus().unwrap().stream(); + + loop { + futures::select_biased! { + _ = ctrl_c => { + println!("\nShutting down due to user request"); + break; + } + msg_send = bus_send_stream.next() => { + use gst::MessageView::*; + let Some(msg) = msg_send else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipeline.recalculate_latency(); + } + Error(err) => { + eprintln!("send pipeline: {err:?}"); + break; + } + _ => (), + } + } + }; + } + + pipeline.debug_to_dot_file( + gst::DebugGraphDetails::all(), + "rtpbin2-send-bundle-stopping", + ); + pipeline.set_state(gst::State::Null).unwrap(); + + // This is needed by some tracers to write their log file + unsafe { + gst::deinit(); + } +} diff --git a/net/rtp/examples/rtpbin2-send.rs b/net/rtp/examples/rtpbin2-send.rs new file mode 100644 index 000000000..73cd0055f --- /dev/null +++ b/net/rtp/examples/rtpbin2-send.rs @@ -0,0 +1,257 @@ +// SPDX-License-Identifier: MPL-2.0 + +/// Sends one audio stream and one video stream in two separate RTP sessions. +/// Use `rtpbin2-recv` as a receiver. +use futures::pin_mut; +use futures::prelude::*; +use gst::prelude::*; + +use std::{env, process}; + +const USAGE: &str = r#"Usage: rtpbin2-send [media-arg] + +where 'media-arg' is either: + +* -a or --audio-only: only stream audio +* -v or --video-only: only stream video +* by default, stream audio and video. +"#; + +const RTP_ID: &str = "example-rtp-id"; +const HOST: &str = "127.0.0.1"; +const VIDEO_PT: u8 = 96; +const AUDIO_PT: u8 = 97; + +#[derive(Debug)] +struct SessionParameters { + session_id: u32, + pt: u8, + rtp_recv_port: u16, + rtcp_recv_port: u16, + rtcp_send_port: u16, +} + +fn add_rtp_send( + pipeline: &gst::Pipeline, + rtpsend: &gst::Element, + rtprecv: &gst::Element, + src: &gst::Element, + params: SessionParameters, +) { + let sync = gst::ElementFactory::make("clocksync") + .name(format!("clocksync-{}-{}", params.session_id, params.pt)) + .build() + .unwrap(); + pipeline.add(&sync).unwrap(); + src.link(&sync).unwrap(); + + sync.link_pads( + Some("src"), + rtpsend, + Some(&format!("rtp_sink_{}", params.session_id)), + ) + .unwrap(); + + // RTP / RTCP to peer + let rtp_queue = gst::ElementFactory::make("queue") + .name(format!("queue-rtp-{}-{}", params.session_id, params.pt)) + .property("max-size-bytes", 0u32) + .property("max-size-time", 100.mseconds()) + .property("max-size-buffers", 0u32) + .build() + .unwrap(); + pipeline.add(&rtp_queue).unwrap(); + + let rtp_sink = gst::ElementFactory::make("udpsink") + .name(format!("udpsink-rtp-{}-{}", params.session_id, params.pt)) + // sync of the RTP packets is handled before rtpsend + .property("sync", false) + .property("port", params.rtp_recv_port as i32) + .property("host", HOST) + .build() + .unwrap(); + pipeline.add(&rtp_sink).unwrap(); + rtpsend + .link_pads( + Some(&format!("rtp_src_{}", params.session_id)), + &rtp_queue, + Some("sink"), + ) + .unwrap(); + rtp_queue.link(&rtp_sink).unwrap(); + + let rtcp_sink = gst::ElementFactory::make("udpsink") + .name(format!("udpsink-rtcp-{}-{}", params.session_id, params.pt)) + // don't wait for the first RTCP packet for preroll + .property("async", false) + .property("port", params.rtcp_recv_port as i32) + .property("host", HOST) + .build() + .unwrap(); + pipeline.add(&rtcp_sink).unwrap(); + rtpsend + .link_pads( + Some(&format!("rtcp_src_{}", params.session_id)), + &rtcp_sink, + Some("sink"), + ) + .unwrap(); + + // RTCP from peer + let rtcp_src = gst::ElementFactory::make("udpsrc") + .name(format!("udpsrc-rtcp-{}-{}", params.session_id, params.pt)) + .property("port", params.rtcp_send_port as i32) + .property("caps", gst::Caps::new_empty_simple("application/x-rtcp")) + .build() + .unwrap(); + pipeline.add(&rtcp_src).unwrap(); + rtcp_src + .link_pads( + Some("src"), + rtprecv, + Some(&format!("rtcp_sink_{}", params.session_id)), + ) + .unwrap(); +} + +#[tokio::main] +async fn main() -> process::ExitCode { + let mut with_audio = true; + let mut with_video = true; + + if let Some(media_arg) = env::args().nth(1) { + match media_arg.as_str() { + "--audio-only" | "-a" => with_video = false, + "--video-only" | "-v" => with_audio = false, + _ => { + println!("{USAGE}"); + return process::ExitCode::FAILURE; + } + } + } + + gst::init().unwrap(); + gstrsrtp::plugin_register_static().unwrap(); + + let pipeline = gst::Pipeline::new(); + + let rtpsend = gst::ElementFactory::make("rtpsend") + .property("rtp-id", RTP_ID) + .build() + .unwrap(); + pipeline.add(&rtpsend).unwrap(); + + let rtprecv = gst::ElementFactory::make("rtprecv") + .property("rtp-id", RTP_ID) + .build() + .unwrap(); + pipeline.add(&rtprecv).unwrap(); + + if with_video { + println!("Adding video stream..."); + + let video_src = gst::ElementFactory::make("videotestsrc") + .property("is-live", true) + .build() + .unwrap(); + let video_enc = gst::ElementFactory::make("vp8enc") + .property("deadline", 1i64) + .build() + .unwrap(); + let video_pay = gst::ElementFactory::make("rtpvp8pay2") + .property("pt", VIDEO_PT as u32) + .property_from_str("picture-id-mode", "7-bit") + .build() + .unwrap(); + let elems = [&video_src, &video_enc, &video_pay]; + pipeline.add_many(elems).unwrap(); + gst::Element::link_many(elems).unwrap(); + + add_rtp_send( + &pipeline, + &rtpsend, + &rtprecv, + &video_pay, + SessionParameters { + session_id: 0, + pt: VIDEO_PT, + rtp_recv_port: 5004, + rtcp_recv_port: 5005, + rtcp_send_port: 5007, + }, + ); + } + + if with_audio { + println!("Adding audio stream..."); + + let audio_src = gst::ElementFactory::make("audiotestsrc") + .property("is-live", true) + .property("volume", 0.03f64) + .build() + .unwrap(); + let audio_enc = gst::ElementFactory::make("opusenc").build().unwrap(); + let audio_pay = gst::ElementFactory::make("rtpopuspay2") + .property("pt", AUDIO_PT as u32) + .build() + .unwrap(); + let elems = [&audio_src, &audio_enc, &audio_pay]; + pipeline.add_many(elems).unwrap(); + gst::Element::link_many(elems).unwrap(); + + add_rtp_send( + &pipeline, + &rtpsend, + &rtprecv, + &audio_pay, + SessionParameters { + session_id: 1, + pt: AUDIO_PT, + rtp_recv_port: 5008, + rtcp_recv_port: 5009, + rtcp_send_port: 5011, + }, + ); + } + + pipeline.set_state(gst::State::Playing).unwrap(); + println!("Playing"); + + let ctrl_c = tokio::signal::ctrl_c().fuse(); + pin_mut!(ctrl_c); + + let mut bus_send_stream = pipeline.bus().unwrap().stream(); + + loop { + futures::select_biased! { + _ = ctrl_c => { + println!("\nShutting down due to user request"); + break; + } + msg_send = bus_send_stream.next() => { + use gst::MessageView::*; + let Some(msg) = msg_send else { continue }; + match msg.view() { + Latency(_) => { + let _ = pipeline.recalculate_latency(); + } + Error(err) => { + eprintln!("send pipeline: {err:?}"); + break; + } + _ => (), + } + } + }; + } + + pipeline.debug_to_dot_file(gst::DebugGraphDetails::all(), "rtpbin2-send-stopping"); + pipeline.set_state(gst::State::Null).unwrap(); + + // This is needed by some tracers to write their log file + unsafe { + gst::deinit(); + } + + process::ExitCode::SUCCESS +}