diff --git a/webrtc/sendrecv/gst-rust/src/main.rs b/webrtc/sendrecv/gst-rust/src/main.rs index 560900c956..437335aacb 100644 --- a/webrtc/sendrecv/gst-rust/src/main.rs +++ b/webrtc/sendrecv/gst-rust/src/main.rs @@ -81,7 +81,8 @@ struct AppWeak(Weak); // Actual application state #[derive(Debug)] struct AppInner { - peer_id: String, + // None if we wait for a peer to appear + peer_id: Option, pipeline: gst::Pipeline, webrtcbin: gst::Element, send_msg_tx: Mutex>, @@ -202,6 +203,46 @@ impl App { self.send_sdp_offer(&offer) } + // Send our SDP answer via the WebSocket connection to the peer as JSON message + fn send_sdp_answer(&self, offer: &gst_webrtc::WebRTCSessionDescription) -> Result<(), Error> { + let message = serde_json::to_string(&JsonMsg::Sdp { + type_: "answer".to_string(), + sdp: offer.get_sdp().as_text().unwrap(), + }) + .unwrap(); + + println!("Sending SDP answer to peer: {}", message); + + self.send_text_msg(message) + } + + // Once webrtcbin has create the answer SDP for us, handle it by sending it to the peer via the + // WebSocket connection + fn on_answer_created(&self, promise: &gst::Promise) -> Result<(), Error> { + let reply = match promise.wait() { + gst::PromiseResult::Replied => promise.get_reply().unwrap(), + err => { + return Err(GStreamerError(format!( + "Offer creation future got no reponse: {:?}", + err + )) + .into()); + } + }; + + let answer = reply + .get_value("answer") + .unwrap() + .get::() + .expect("Invalid argument"); + self.0 + .webrtcbin + .emit("set-local-description", &[&answer, &None::]) + .unwrap(); + + self.send_sdp_answer(&answer) + } + // Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask // for a new offer SDP from webrtcbin without any customization and then // asynchronously send it to the peer via the WebSocket connection @@ -405,29 +446,32 @@ impl App { // Finish creating our pipeline and actually start it once the connection with the peer is // there - fn start_pipeline(&self) -> Result<(), Error> { + fn setup_pipeline(&self) -> Result<(), Error> { println!("Start pipeline"); // Create our audio/video sources we send to the peer self.add_video_source()?; self.add_audio_source()?; - // Whenever (re-)negotiation is needed, do so - let app_clone = self.downgrade(); - self.0 - .webrtcbin - .connect("on-negotiation-needed", false, move |values| { - let _webrtc = values[0].get::().unwrap(); + // Whenever (re-)negotiation is needed, do so but this is only needed if + // we send the initial offer + if self.0.peer_id.is_some() { + let app_clone = self.downgrade(); + self.0 + .webrtcbin + .connect("on-negotiation-needed", false, move |values| { + let _webrtc = values[0].get::().unwrap(); - let app = upgrade_weak!(app_clone, None); + let app = upgrade_weak!(app_clone, None); - if let Err(err) = app.on_negotiation_needed() { - app.post_error(format!("Failed to start negotiation: {:?}", err).as_str()); - } + if let Err(err) = app.on_negotiation_needed() { + app.post_error(format!("Failed to start negotiation: {:?}", err).as_str()); + } - None - }) - .unwrap(); + None + }) + .unwrap(); + } // Whenever there is a new ICE candidate, send it to the peer let app_clone = self.downgrade(); @@ -458,6 +502,29 @@ impl App { } }); + Ok(()) + } + + // Send ID of the peer we want to talk to via the WebSocket connection + fn setup_call(&self, peer_id: &str) -> Result { + println!("Setting up signalling server call with {}", peer_id); + Ok(OwnedMessage::Text(format!("SESSION {}", peer_id))) + } + + // Once we got the HELLO message from the WebSocket connection, start setting up the call + fn handle_hello(&self) -> Result, Error> { + if let Some(ref peer_id) = self.0.peer_id { + self.setup_call(peer_id).map(Some) + } else { + // Wait for a peer to appear + Ok(None) + } + } + + // Once the session is set up correctly we start our pipeline + fn handle_session_ok(&self) -> Result, Error> { + self.setup_pipeline()?; + // And finally asynchronously start our pipeline let app_clone = self.downgrade(); self.0.pipeline.call_async(move |pipeline| { @@ -468,23 +535,7 @@ impl App { } }); - Ok(()) - } - - // Send ID of the peer we want to talk to via the WebSocket connection - fn setup_call(&self) -> Result { - println!("Setting up signalling server call with {}", self.0.peer_id); - Ok(OwnedMessage::Text(format!("SESSION {}", &self.0.peer_id))) - } - - // Once we got the HELLO message from the WebSocket connection, start setting up the call - fn handle_hello(&self) -> Result, Error> { - self.setup_call().map(Some) - } - - // Once the session is set up correctly we start our pipeline - fn handle_session_ok(&self) -> Result, Error> { - self.start_pipeline().map(|_| None) + Ok(None) } // Handle errors from the peer send to us via the WebSocket connection @@ -495,24 +546,71 @@ impl App { } // Handle incoming SDP answers from the peer - // - // TODO: We currently only handle answers, not offers fn handle_sdp(&self, type_: &str, sdp: &str) -> Result, Error> { - if type_ != "answer" { - return Err(PeerError("Sdp type is not \"answer\"".into()).into()); + if type_ == "answer" { + print!("Received answer:\n{}\n", sdp); + + let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) + .map_err(|_| GStreamerError("Failed to parse SDP answer".into()))?; + let answer = + gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret); + self.0 + .webrtcbin + .emit("set-remote-description", &[&answer, &None::]) + .unwrap(); + + Ok(None) + } else if type_ == "offer" { + print!("Received offer:\n{}\n", sdp); + + // Need to start the pipeline as a first step here + self.setup_pipeline()?; + + let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) + .map_err(|_| GStreamerError("Failed to parse SDP offer".into()))?; + + // And then asynchronously start our pipeline and do the next steps. The + // pipeline needs to be started before we can create an answer + let app_clone = self.downgrade(); + self.0.pipeline.call_async(move |pipeline| { + let app = upgrade_weak!(app_clone); + + if let Err(err) = pipeline.set_state(gst::State::Playing) { + app.post_error( + format!("Failed to set pipeline to Playing: {:?}", err).as_str(), + ); + return; + } + + let offer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Offer, + ret, + ); + + app.0 + .webrtcbin + .emit("set-remote-description", &[&offer, &None::]) + .unwrap(); + + let app_clone = app.downgrade(); + let promise = gst::Promise::new_with_change_func(move |promise| { + let app = upgrade_weak!(app_clone); + + if let Err(err) = app.on_answer_created(promise) { + app.post_error(format!("Failed to send SDP answer: {:?}", err).as_str()); + } + }); + + app.0 + .webrtcbin + .emit("create-answer", &[&None::, &promise]) + .unwrap(); + }); + + Ok(None) + } else { + Err(PeerError(format!("Sdp type is not \"answer\" but \"{}\"", type_)).into()) } - - print!("Received answer:\n{}\n", sdp); - - let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()).unwrap(); - let answer = - gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret); - self.0 - .webrtcbin - .emit("set-remote-description", &[&answer, &None::]) - .unwrap(); - - Ok(None) } // Handle incoming ICE candidates from the peer by passing them to webrtcbin @@ -601,13 +699,13 @@ impl App { } } -fn parse_args() -> (String, String) { +fn parse_args() -> (String, Option) { let matches = clap::App::new("Sendrecv rust") .arg( clap::Arg::with_name("peer-id") .help("String ID of the peer to connect to") .long("peer-id") - .required(true) + .required(false) .takes_value(true), ) .arg( @@ -623,9 +721,9 @@ fn parse_args() -> (String, String) { .value_of("server") .unwrap_or("wss://webrtc.nirbheek.in:8443"); - let peer_id = matches.value_of("peer-id").unwrap(); + let peer_id = matches.value_of("peer-id"); - (server.to_string(), peer_id.to_string()) + (server.to_string(), peer_id.map(String::from)) } fn check_plugins() -> Result<(), Error> { @@ -696,7 +794,7 @@ fn main() { // Create our application control logic let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded_channel::(); let app = App(Arc::new(AppInner { - peer_id: peer_id.to_string(), + peer_id, pipeline, webrtcbin, send_msg_tx: Mutex::new(send_ws_msg_tx), @@ -721,7 +819,7 @@ fn main() { // and convert them into potential messages to send out let app_clone = app.clone(); let gst_messages = send_gst_msg_rx - .map_err(|err| Error::from(err)) + .map_err(Error::from) .and_then(move |msg| app_clone.handle_pipeline_message(&msg)) .filter_map(|msg| msg); @@ -730,7 +828,7 @@ fn main() { // And here collect all the asynchronous outgoing messages that come // from other threads - let async_outgoing_messages = send_ws_msg_rx.map_err(|err| Error::from(err)); + let async_outgoing_messages = send_ws_msg_rx.map_err(Error::from); // Merge both outgoing messages streams and send them out directly sink.sink_map_err(|err| Error::from(WebSocketError(err)))