Add support for creating the offer in the Rust sendrecv client

This commit is contained in:
Sebastian Dröge 2019-06-27 14:35:47 +03:00
parent d393063868
commit d74e2ac147

View file

@ -81,7 +81,8 @@ struct AppWeak(Weak<AppInner>);
// Actual application state // Actual application state
#[derive(Debug)] #[derive(Debug)]
struct AppInner { struct AppInner {
peer_id: String, // None if we wait for a peer to appear
peer_id: Option<String>,
pipeline: gst::Pipeline, pipeline: gst::Pipeline,
webrtcbin: gst::Element, webrtcbin: gst::Element,
send_msg_tx: Mutex<mpsc::UnboundedSender<OwnedMessage>>, send_msg_tx: Mutex<mpsc::UnboundedSender<OwnedMessage>>,
@ -202,6 +203,46 @@ impl App {
self.send_sdp_offer(&offer) self.send_sdp_offer(&offer)
} }
// Send our SDP answer via the WebSocket connection to the peer as JSON message
fn send_sdp_answer(&self, offer: &gst_webrtc::WebRTCSessionDescription) -> Result<(), Error> {
let message = serde_json::to_string(&JsonMsg::Sdp {
type_: "answer".to_string(),
sdp: offer.get_sdp().as_text().unwrap(),
})
.unwrap();
println!("Sending SDP answer to peer: {}", message);
self.send_text_msg(message)
}
// Once webrtcbin has create the answer SDP for us, handle it by sending it to the peer via the
// WebSocket connection
fn on_answer_created(&self, promise: &gst::Promise) -> Result<(), Error> {
let reply = match promise.wait() {
gst::PromiseResult::Replied => promise.get_reply().unwrap(),
err => {
return Err(GStreamerError(format!(
"Offer creation future got no reponse: {:?}",
err
))
.into());
}
};
let answer = reply
.get_value("answer")
.unwrap()
.get::<gst_webrtc::WebRTCSessionDescription>()
.expect("Invalid argument");
self.0
.webrtcbin
.emit("set-local-description", &[&answer, &None::<gst::Promise>])
.unwrap();
self.send_sdp_answer(&answer)
}
// Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask // Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask
// for a new offer SDP from webrtcbin without any customization and then // for a new offer SDP from webrtcbin without any customization and then
// asynchronously send it to the peer via the WebSocket connection // asynchronously send it to the peer via the WebSocket connection
@ -405,29 +446,32 @@ impl App {
// Finish creating our pipeline and actually start it once the connection with the peer is // Finish creating our pipeline and actually start it once the connection with the peer is
// there // there
fn start_pipeline(&self) -> Result<(), Error> { fn setup_pipeline(&self) -> Result<(), Error> {
println!("Start pipeline"); println!("Start pipeline");
// Create our audio/video sources we send to the peer // Create our audio/video sources we send to the peer
self.add_video_source()?; self.add_video_source()?;
self.add_audio_source()?; self.add_audio_source()?;
// Whenever (re-)negotiation is needed, do so // Whenever (re-)negotiation is needed, do so but this is only needed if
let app_clone = self.downgrade(); // we send the initial offer
self.0 if self.0.peer_id.is_some() {
.webrtcbin let app_clone = self.downgrade();
.connect("on-negotiation-needed", false, move |values| { self.0
let _webrtc = values[0].get::<gst::Element>().unwrap(); .webrtcbin
.connect("on-negotiation-needed", false, move |values| {
let _webrtc = values[0].get::<gst::Element>().unwrap();
let app = upgrade_weak!(app_clone, None); let app = upgrade_weak!(app_clone, None);
if let Err(err) = app.on_negotiation_needed() { if let Err(err) = app.on_negotiation_needed() {
app.post_error(format!("Failed to start negotiation: {:?}", err).as_str()); app.post_error(format!("Failed to start negotiation: {:?}", err).as_str());
} }
None None
}) })
.unwrap(); .unwrap();
}
// Whenever there is a new ICE candidate, send it to the peer // Whenever there is a new ICE candidate, send it to the peer
let app_clone = self.downgrade(); let app_clone = self.downgrade();
@ -458,6 +502,29 @@ impl App {
} }
}); });
Ok(())
}
// Send ID of the peer we want to talk to via the WebSocket connection
fn setup_call(&self, peer_id: &str) -> Result<OwnedMessage, Error> {
println!("Setting up signalling server call with {}", peer_id);
Ok(OwnedMessage::Text(format!("SESSION {}", peer_id)))
}
// Once we got the HELLO message from the WebSocket connection, start setting up the call
fn handle_hello(&self) -> Result<Option<OwnedMessage>, Error> {
if let Some(ref peer_id) = self.0.peer_id {
self.setup_call(peer_id).map(Some)
} else {
// Wait for a peer to appear
Ok(None)
}
}
// Once the session is set up correctly we start our pipeline
fn handle_session_ok(&self) -> Result<Option<OwnedMessage>, Error> {
self.setup_pipeline()?;
// And finally asynchronously start our pipeline // And finally asynchronously start our pipeline
let app_clone = self.downgrade(); let app_clone = self.downgrade();
self.0.pipeline.call_async(move |pipeline| { self.0.pipeline.call_async(move |pipeline| {
@ -468,23 +535,7 @@ impl App {
} }
}); });
Ok(()) Ok(None)
}
// Send ID of the peer we want to talk to via the WebSocket connection
fn setup_call(&self) -> Result<OwnedMessage, Error> {
println!("Setting up signalling server call with {}", self.0.peer_id);
Ok(OwnedMessage::Text(format!("SESSION {}", &self.0.peer_id)))
}
// Once we got the HELLO message from the WebSocket connection, start setting up the call
fn handle_hello(&self) -> Result<Option<OwnedMessage>, Error> {
self.setup_call().map(Some)
}
// Once the session is set up correctly we start our pipeline
fn handle_session_ok(&self) -> Result<Option<OwnedMessage>, Error> {
self.start_pipeline().map(|_| None)
} }
// Handle errors from the peer send to us via the WebSocket connection // Handle errors from the peer send to us via the WebSocket connection
@ -495,24 +546,71 @@ impl App {
} }
// Handle incoming SDP answers from the peer // Handle incoming SDP answers from the peer
//
// TODO: We currently only handle answers, not offers
fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<Option<OwnedMessage>, Error> { fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<Option<OwnedMessage>, Error> {
if type_ != "answer" { if type_ == "answer" {
return Err(PeerError("Sdp type is not \"answer\"".into()).into()); print!("Received answer:\n{}\n", sdp);
let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
.map_err(|_| GStreamerError("Failed to parse SDP answer".into()))?;
let answer =
gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret);
self.0
.webrtcbin
.emit("set-remote-description", &[&answer, &None::<gst::Promise>])
.unwrap();
Ok(None)
} else if type_ == "offer" {
print!("Received offer:\n{}\n", sdp);
// Need to start the pipeline as a first step here
self.setup_pipeline()?;
let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
.map_err(|_| GStreamerError("Failed to parse SDP offer".into()))?;
// And then asynchronously start our pipeline and do the next steps. The
// pipeline needs to be started before we can create an answer
let app_clone = self.downgrade();
self.0.pipeline.call_async(move |pipeline| {
let app = upgrade_weak!(app_clone);
if let Err(err) = pipeline.set_state(gst::State::Playing) {
app.post_error(
format!("Failed to set pipeline to Playing: {:?}", err).as_str(),
);
return;
}
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
ret,
);
app.0
.webrtcbin
.emit("set-remote-description", &[&offer, &None::<gst::Promise>])
.unwrap();
let app_clone = app.downgrade();
let promise = gst::Promise::new_with_change_func(move |promise| {
let app = upgrade_weak!(app_clone);
if let Err(err) = app.on_answer_created(promise) {
app.post_error(format!("Failed to send SDP answer: {:?}", err).as_str());
}
});
app.0
.webrtcbin
.emit("create-answer", &[&None::<gst::Structure>, &promise])
.unwrap();
});
Ok(None)
} else {
Err(PeerError(format!("Sdp type is not \"answer\" but \"{}\"", type_)).into())
} }
print!("Received answer:\n{}\n", sdp);
let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()).unwrap();
let answer =
gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret);
self.0
.webrtcbin
.emit("set-remote-description", &[&answer, &None::<gst::Promise>])
.unwrap();
Ok(None)
} }
// Handle incoming ICE candidates from the peer by passing them to webrtcbin // Handle incoming ICE candidates from the peer by passing them to webrtcbin
@ -601,13 +699,13 @@ impl App {
} }
} }
fn parse_args() -> (String, String) { fn parse_args() -> (String, Option<String>) {
let matches = clap::App::new("Sendrecv rust") let matches = clap::App::new("Sendrecv rust")
.arg( .arg(
clap::Arg::with_name("peer-id") clap::Arg::with_name("peer-id")
.help("String ID of the peer to connect to") .help("String ID of the peer to connect to")
.long("peer-id") .long("peer-id")
.required(true) .required(false)
.takes_value(true), .takes_value(true),
) )
.arg( .arg(
@ -623,9 +721,9 @@ fn parse_args() -> (String, String) {
.value_of("server") .value_of("server")
.unwrap_or("wss://webrtc.nirbheek.in:8443"); .unwrap_or("wss://webrtc.nirbheek.in:8443");
let peer_id = matches.value_of("peer-id").unwrap(); let peer_id = matches.value_of("peer-id");
(server.to_string(), peer_id.to_string()) (server.to_string(), peer_id.map(String::from))
} }
fn check_plugins() -> Result<(), Error> { fn check_plugins() -> Result<(), Error> {
@ -696,7 +794,7 @@ fn main() {
// Create our application control logic // Create our application control logic
let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded_channel::<OwnedMessage>(); let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded_channel::<OwnedMessage>();
let app = App(Arc::new(AppInner { let app = App(Arc::new(AppInner {
peer_id: peer_id.to_string(), peer_id,
pipeline, pipeline,
webrtcbin, webrtcbin,
send_msg_tx: Mutex::new(send_ws_msg_tx), send_msg_tx: Mutex::new(send_ws_msg_tx),
@ -721,7 +819,7 @@ fn main() {
// and convert them into potential messages to send out // and convert them into potential messages to send out
let app_clone = app.clone(); let app_clone = app.clone();
let gst_messages = send_gst_msg_rx let gst_messages = send_gst_msg_rx
.map_err(|err| Error::from(err)) .map_err(Error::from)
.and_then(move |msg| app_clone.handle_pipeline_message(&msg)) .and_then(move |msg| app_clone.handle_pipeline_message(&msg))
.filter_map(|msg| msg); .filter_map(|msg| msg);
@ -730,7 +828,7 @@ fn main() {
// And here collect all the asynchronous outgoing messages that come // And here collect all the asynchronous outgoing messages that come
// from other threads // from other threads
let async_outgoing_messages = send_ws_msg_rx.map_err(|err| Error::from(err)); let async_outgoing_messages = send_ws_msg_rx.map_err(Error::from);
// Merge both outgoing messages streams and send them out directly // Merge both outgoing messages streams and send them out directly
sink.sink_map_err(|err| Error::from(WebSocketError(err))) sink.sink_map_err(|err| Error::from(WebSocketError(err)))