diff --git a/webrtc/sendrecv/gst-rust/src/main.rs b/webrtc/sendrecv/gst-rust/src/main.rs index 437335aacb..98b4a7a05c 100644 --- a/webrtc/sendrecv/gst-rust/src/main.rs +++ b/webrtc/sendrecv/gst-rust/src/main.rs @@ -86,6 +86,7 @@ struct AppInner { pipeline: gst::Pipeline, webrtcbin: gst::Element, send_msg_tx: Mutex>, + rtx: bool, } // Various error types for the different errors that can happen here @@ -328,6 +329,11 @@ impl App { // Whenever there's a new incoming, encoded stream from the peer create a new decodebin fn on_incoming_stream(&self, pad: &gst::Pad) -> Result<(), Error> { + // Early return for the source pads we're adding ourselves + if pad.get_direction() != gst::PadDirection::Src { + return Ok(()); + } + let decodebin = gst::ElementFactory::make("decodebin", None).unwrap(); let app_clone = self.downgrade(); decodebin.connect_pad_added(move |_decodebin, pad| { @@ -449,10 +455,6 @@ impl App { fn setup_pipeline(&self) -> Result<(), Error> { println!("Start pipeline"); - // Create our audio/video sources we send to the peer - self.add_video_source()?; - self.add_audio_source()?; - // Whenever (re-)negotiation is needed, do so but this is only needed if // we send the initial offer if self.0.peer_id.is_some() { @@ -473,6 +475,21 @@ impl App { .unwrap(); } + let app_clone = self.downgrade(); + self.0 + .webrtcbin + .connect("on-new-transceiver", false, move |values| { + let _webrtc = values[0].get::().unwrap(); + let transceiver = values[1].get::().unwrap(); + + let app = upgrade_weak!(app_clone, None); + + transceiver.set_property("do-nack", &app.0.rtx).unwrap(); + + None + }) + .unwrap(); + // Whenever there is a new ICE candidate, send it to the peer let app_clone = self.downgrade(); self.0 @@ -502,6 +519,10 @@ impl App { } }); + // Create our audio/video sources we send to the peer + self.add_video_source()?; + self.add_audio_source()?; + Ok(()) } @@ -699,7 +720,7 @@ impl App { } } -fn parse_args() -> (String, Option) { +fn parse_args() -> (String, Option, bool) { let matches = clap::App::new("Sendrecv rust") .arg( clap::Arg::with_name("peer-id") @@ -715,6 +736,12 @@ fn parse_args() -> (String, Option) { .required(false) .takes_value(true), ) + .arg( + clap::Arg::with_name("rtx") + .help("Enable retransmissions (RTX)") + .long("rtx") + .required(false), + ) .get_matches(); let server = matches @@ -723,7 +750,9 @@ fn parse_args() -> (String, Option) { let peer_id = matches.value_of("peer-id"); - (server.to_string(), peer_id.map(String::from)) + let rtx = matches.is_present("rtx"); + + (server.to_string(), peer_id.map(String::from), rtx) } fn check_plugins() -> Result<(), Error> { @@ -760,7 +789,7 @@ fn main() { return; } - let (server, peer_id) = parse_args(); + let (server, peer_id, rtx) = parse_args(); let mut runtime = tokio::runtime::Runtime::new().unwrap(); @@ -798,6 +827,7 @@ fn main() { pipeline, webrtcbin, send_msg_tx: Mutex::new(send_ws_msg_tx), + rtx, })); // Start registration process with the server. This will insert a