General code cleanup of the Rust sendrecv demo

Fewer clones and more borrowing, if let instead of match, match instead
of multiple ifs, insert a few newlines all over the place to make code
less dense, and a few changes to make code a bit more idiomatic.
This commit is contained in:
Sebastian Dröge 2018-06-21 13:16:15 +03:00
parent 2614249149
commit 9cf3aa088e

View file

@ -17,14 +17,13 @@ extern crate lazy_static;
use failure::Error; use failure::Error;
use gst::prelude::*; use gst::prelude::*;
use rand::Rng; use rand::Rng;
use std::cmp::Ordering;
use std::sync::{mpsc, Arc, Mutex}; use std::sync::{mpsc, Arc, Mutex};
use std::thread; use std::thread;
use websocket::message::OwnedMessage; use websocket::message::OwnedMessage;
const STUN_SERVER: &str = "stun://stun.l.google.com:19302 "; const STUN_SERVER: &str = "stun://stun.l.google.com:19302 ";
lazy_static! { lazy_static! {
static ref RTP_CAPS_OPUS: gst::GstRc<gst::CapsRef> = { static ref RTP_CAPS_OPUS: gst::Caps = {
gst::Caps::new_simple( gst::Caps::new_simple(
"application/x-rtp", "application/x-rtp",
&[ &[
@ -34,7 +33,7 @@ lazy_static! {
], ],
) )
}; };
static ref RTP_CAPS_VP8: gst::GstRc<gst::CapsRef> = { static ref RTP_CAPS_VP8: gst::Caps = {
gst::Caps::new_simple( gst::Caps::new_simple(
"application/x-rtp", "application/x-rtp",
&[ &[
@ -107,19 +106,12 @@ struct WsError(AppState);
#[fail(display = "Error on bus: {}", _0)] #[fail(display = "Error on bus: {}", _0)]
struct BusError(String); struct BusError(String);
#[derive(Debug, Fail)]
#[fail(display = "Error linking pads {:?} != {:?}", left, right)]
struct PadLinkError {
left: gst::PadLinkReturn,
right: gst::PadLinkReturn,
}
#[derive(Debug, Fail)] #[derive(Debug, Fail)]
#[fail(display = "Missing elements {:?}", _0)] #[fail(display = "Missing elements {:?}", _0)]
struct MissingElements(Vec<&'static str>); struct MissingElements(Vec<&'static str>);
fn check_plugins() -> Result<(), Error> { fn check_plugins() -> Result<(), Error> {
let needed = vec![ let needed = [
"opus", "opus",
"vpx", "vpx",
"nice", "nice",
@ -130,14 +122,14 @@ fn check_plugins() -> Result<(), Error> {
"videotestsrc", "videotestsrc",
"audiotestsrc", "audiotestsrc",
]; ];
let registry = gst::Registry::get(); let registry = gst::Registry::get();
let mut missing: Vec<&'static str> = Vec::new(); let missing = needed
for plugin_name in needed { .iter()
let plugin = registry.find_plugin(&plugin_name.to_string()); .filter(|n| registry.find_plugin(n).is_some())
if plugin.is_none() { .map(|n| *n)
missing.push(plugin_name) .collect::<Vec<_>>();
}
}
if !missing.is_empty() { if !missing.is_empty() {
Err(MissingElements(missing))? Err(MissingElements(missing))?
} else { } else {
@ -146,18 +138,19 @@ fn check_plugins() -> Result<(), Error> {
} }
fn send_sdp_offer(app_control: &AppControl, offer: &gst_webrtc::WebRTCSessionDescription) { fn send_sdp_offer(app_control: &AppControl, offer: &gst_webrtc::WebRTCSessionDescription) {
if app_control.app_state_cmp( if !app_control.assert_app_state_is_at_least(
AppState::PeerCallNegotiating, AppState::PeerCallNegotiating,
"Can't send offer, not in call", "Can't send offer, not in call",
) == Ordering::Less ) {
{
return; return;
} }
let message = serde_json::to_string(&JsonMsg::Sdp { let message = serde_json::to_string(&JsonMsg::Sdp {
type_: "offer".to_string(), type_: "offer".to_string(),
sdp: offer.get_sdp().as_text().unwrap(), sdp: offer.get_sdp().as_text().unwrap(),
}).unwrap(); }).unwrap();
app_control.send_text_msg(message.to_string());
app_control.send_text_msg(message);
} }
fn on_offer_created( fn on_offer_created(
@ -165,12 +158,13 @@ fn on_offer_created(
webrtc: &gst::Element, webrtc: &gst::Element,
promise: &gst::Promise, promise: &gst::Promise,
) -> Result<(), Error> { ) -> Result<(), Error> {
if !app_control.app_state_eq( if !app_control.assert_app_state_is(
AppState::PeerCallNegotiating, AppState::PeerCallNegotiating,
"Not negotiation call when creating offer", "Not negotiating call when creating offer",
) { ) {
return Ok(()); return Ok(());
} }
let reply = promise.get_reply().unwrap(); let reply = promise.get_reply().unwrap();
let offer = reply let offer = reply
@ -181,18 +175,22 @@ fn on_offer_created(
webrtc.emit("set-local-description", &[&offer, &None::<gst::Promise>])?; webrtc.emit("set-local-description", &[&offer, &None::<gst::Promise>])?;
send_sdp_offer(&app_control, &offer); send_sdp_offer(&app_control, &offer);
Ok(()) Ok(())
} }
fn on_negotiation_needed(app_control: &AppControl, values: &[glib::Value]) -> Result<(), Error> { fn on_negotiation_needed(app_control: &AppControl, values: &[glib::Value]) -> Result<(), Error> {
app_control.0.lock().unwrap().app_state = AppState::PeerCallNegotiating; app_control.0.lock().unwrap().app_state = AppState::PeerCallNegotiating;
let webrtc = values[0].get::<gst::Element>().unwrap(); let webrtc = values[0].get::<gst::Element>().unwrap();
let webrtc_clone = webrtc.clone(); let webrtc_clone = webrtc.clone();
let app_control_clone = app_control.clone(); let app_control_clone = app_control.clone();
let promise = gst::Promise::new_with_change_func(move |promise| { let promise = gst::Promise::new_with_change_func(move |promise| {
on_offer_created(&app_control_clone, &webrtc, promise).unwrap(); on_offer_created(&app_control_clone, &webrtc, promise).unwrap();
}); });
webrtc_clone.emit("create-offer", &[&None::<gst::Structure>, &promise])?; webrtc_clone.emit("create-offer", &[&None::<gst::Structure>, &promise])?;
Ok(()) Ok(())
} }
@ -209,17 +207,22 @@ fn handle_media_stream(
let conv = gst::ElementFactory::make("audioconvert", None).unwrap(); let conv = gst::ElementFactory::make("audioconvert", None).unwrap();
let sink = gst::ElementFactory::make("autoaudiosink", None).unwrap(); let sink = gst::ElementFactory::make("autoaudiosink", None).unwrap();
let resample = gst::ElementFactory::make("audioresample", None).unwrap(); let resample = gst::ElementFactory::make("audioresample", None).unwrap();
pipe.add_many(&[&q, &conv, &resample, &sink])?; pipe.add_many(&[&q, &conv, &resample, &sink])?;
gst::Element::link_many(&[&q, &conv, &resample, &sink])?; gst::Element::link_many(&[&q, &conv, &resample, &sink])?;
resample.sync_state_with_parent()?; resample.sync_state_with_parent()?;
(q, conv, sink) (q, conv, sink)
} }
MediaType::Video => { MediaType::Video => {
let q = gst::ElementFactory::make("queue", None).unwrap(); let q = gst::ElementFactory::make("queue", None).unwrap();
let conv = gst::ElementFactory::make("videoconvert", None).unwrap(); let conv = gst::ElementFactory::make("videoconvert", None).unwrap();
let sink = gst::ElementFactory::make("autovideosink", None).unwrap(); let sink = gst::ElementFactory::make("autovideosink", None).unwrap();
pipe.add_many(&[&q, &conv, &sink])?; pipe.add_many(&[&q, &conv, &sink])?;
gst::Element::link_many(&[&q, &conv, &sink])?; gst::Element::link_many(&[&q, &conv, &sink])?;
(q, conv, sink) (q, conv, sink)
} }
}; };
@ -228,14 +231,8 @@ fn handle_media_stream(
sink.sync_state_with_parent()?; sink.sync_state_with_parent()?;
let qpad = q.get_static_pad("sink").unwrap(); let qpad = q.get_static_pad("sink").unwrap();
let ret = pad.link(&qpad); pad.link(&qpad).into_result()?;
let ok = gst::PadLinkReturn::Ok;
if ret != ok {
Err(PadLinkError {
left: ret,
right: ok,
})?;
}
Ok(()) Ok(())
} }
@ -275,6 +272,7 @@ fn on_incoming_stream(
pipe: &gst::Pipeline, pipe: &gst::Pipeline,
) -> Option<glib::Value> { ) -> Option<glib::Value> {
let webrtc = values[0].get::<gst::Element>().expect("Invalid argument"); let webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
let decodebin = gst::ElementFactory::make("decodebin", None).unwrap(); let decodebin = gst::ElementFactory::make("decodebin", None).unwrap();
let pipe_clone = pipe.clone(); let pipe_clone = pipe.clone();
let app_control_clone = app_control.clone(); let app_control_clone = app_control.clone();
@ -283,41 +281,46 @@ fn on_incoming_stream(
on_incoming_decodebin_stream(&app_control_clone, values, &pipe_clone) on_incoming_decodebin_stream(&app_control_clone, values, &pipe_clone)
}) })
.unwrap(); .unwrap();
pipe.clone()
.dynamic_cast::<gst::Bin>() pipe.add(&decodebin).unwrap();
.unwrap()
.add(&decodebin)
.unwrap();
decodebin.sync_state_with_parent().unwrap(); decodebin.sync_state_with_parent().unwrap();
webrtc.link(&decodebin).unwrap(); webrtc.link(&decodebin).unwrap();
None None
} }
fn send_ice_candidate_message(app_control: &AppControl, values: &[glib::Value]) { fn send_ice_candidate_message(app_control: &AppControl, values: &[glib::Value]) {
if app_control.app_state_cmp(AppState::PeerCallNegotiating, "Can't send ICE, not in call") if !app_control
== Ordering::Less .assert_app_state_is_at_least(AppState::PeerCallNegotiating, "Can't send ICE, not in call")
{ {
return; return;
} }
let _webrtc = values[0].get::<gst::Element>().expect("Invalid argument"); let _webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
let mlineindex = values[1].get::<u32>().expect("Invalid argument"); let mlineindex = values[1].get::<u32>().expect("Invalid argument");
let candidate = values[2].get::<String>().expect("Invalid argument"); let candidate = values[2].get::<String>().expect("Invalid argument");
let message = serde_json::to_string(&JsonMsg::Ice { let message = serde_json::to_string(&JsonMsg::Ice {
candidate, candidate,
sdp_mline_index: mlineindex, sdp_mline_index: mlineindex,
}).unwrap(); }).unwrap();
app_control.send_text_msg(message.to_string());
app_control.send_text_msg(message);
} }
fn add_video_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Result<(), Error> { fn add_video_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Result<(), Error> {
let videotestsrc = gst::ElementFactory::make("videotestsrc", None).unwrap(); let videotestsrc = gst::ElementFactory::make("videotestsrc", None).unwrap();
videotestsrc.set_property_from_str("pattern", "ball");
let videoconvert = gst::ElementFactory::make("videoconvert", None).unwrap(); let videoconvert = gst::ElementFactory::make("videoconvert", None).unwrap();
let queue = gst::ElementFactory::make("queue", None).unwrap(); let queue = gst::ElementFactory::make("queue", None).unwrap();
let vp8enc = gst::ElementFactory::make("vp8enc", None).unwrap(); let vp8enc = gst::ElementFactory::make("vp8enc", None).unwrap();
vp8enc.set_property("deadline", &1i64)?;
videotestsrc.set_property_from_str("pattern", "ball");
vp8enc.set_property("deadline", &1i64).unwrap();
let rtpvp8pay = gst::ElementFactory::make("rtpvp8pay", None).unwrap(); let rtpvp8pay = gst::ElementFactory::make("rtpvp8pay", None).unwrap();
let queue2 = gst::ElementFactory::make("queue", None).unwrap(); let queue2 = gst::ElementFactory::make("queue", None).unwrap();
pipeline.add_many(&[ pipeline.add_many(&[
&videotestsrc, &videotestsrc,
&videoconvert, &videoconvert,
@ -326,6 +329,7 @@ fn add_video_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Resul
&rtpvp8pay, &rtpvp8pay,
&queue2, &queue2,
])?; ])?;
gst::Element::link_many(&[ gst::Element::link_many(&[
&videotestsrc, &videotestsrc,
&videoconvert, &videoconvert,
@ -334,13 +338,14 @@ fn add_video_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Resul
&rtpvp8pay, &rtpvp8pay,
&queue2, &queue2,
])?; ])?;
queue2.link_filtered(webrtcbin, &*RTP_CAPS_VP8)?; queue2.link_filtered(webrtcbin, &*RTP_CAPS_VP8)?;
Ok(()) Ok(())
} }
fn add_audio_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Result<(), Error> { fn add_audio_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Result<(), Error> {
let audiotestsrc = gst::ElementFactory::make("audiotestsrc", None).unwrap(); let audiotestsrc = gst::ElementFactory::make("audiotestsrc", None).unwrap();
audiotestsrc.set_property_from_str("wave", "red-noise");
let queue = gst::ElementFactory::make("queue", None).unwrap(); let queue = gst::ElementFactory::make("queue", None).unwrap();
let audioconvert = gst::ElementFactory::make("audioconvert", None).unwrap(); let audioconvert = gst::ElementFactory::make("audioconvert", None).unwrap();
let audioresample = gst::ElementFactory::make("audioresample", None).unwrap(); let audioresample = gst::ElementFactory::make("audioresample", None).unwrap();
@ -348,6 +353,9 @@ fn add_audio_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Resul
let opusenc = gst::ElementFactory::make("opusenc", None).unwrap(); let opusenc = gst::ElementFactory::make("opusenc", None).unwrap();
let rtpopuspay = gst::ElementFactory::make("rtpopuspay", None).unwrap(); let rtpopuspay = gst::ElementFactory::make("rtpopuspay", None).unwrap();
let queue3 = gst::ElementFactory::make("queue", None).unwrap(); let queue3 = gst::ElementFactory::make("queue", None).unwrap();
audiotestsrc.set_property_from_str("wave", "red-noise");
pipeline.add_many(&[ pipeline.add_many(&[
&audiotestsrc, &audiotestsrc,
&queue, &queue,
@ -358,6 +366,7 @@ fn add_audio_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Resul
&rtpopuspay, &rtpopuspay,
&queue3, &queue3,
])?; ])?;
gst::Element::link_many(&[ gst::Element::link_many(&[
&audiotestsrc, &audiotestsrc,
&queue, &queue,
@ -368,27 +377,30 @@ fn add_audio_source(pipeline: &gst::Pipeline, webrtcbin: &gst::Element) -> Resul
&rtpopuspay, &rtpopuspay,
&queue3, &queue3,
])?; ])?;
queue3.link_filtered(webrtcbin, &*RTP_CAPS_OPUS)?; queue3.link_filtered(webrtcbin, &*RTP_CAPS_OPUS)?;
Ok(()) Ok(())
} }
impl AppControl { impl AppControl {
fn app_state_eq(&self, state: AppState, error_msg: &'static str) -> bool { fn assert_app_state_is(&self, state: AppState, error_msg: &'static str) -> bool {
if { self.0.lock().unwrap().app_state != state } { if self.0.lock().unwrap().app_state != state {
self.send_bus_error(error_msg); self.send_bus_error(error_msg);
false false
} else { } else {
true true
} }
} }
fn app_state_cmp(&self, state: AppState, error_msg: &'static str) -> Ordering { fn assert_app_state_is_at_least(&self, state: AppState, error_msg: &'static str) -> bool {
match { self.0.lock().unwrap().app_state.cmp(&state) } { if self.0.lock().unwrap().app_state < state {
Ordering::Less => { self.send_bus_error(error_msg);
self.send_bus_error(error_msg);
Ordering::Less false
} } else {
_foo => _foo, true
} }
} }
@ -413,22 +425,22 @@ impl AppControl {
fn construct_pipeline(&self) -> Result<gst::Pipeline, Error> { fn construct_pipeline(&self) -> Result<gst::Pipeline, Error> {
let pipeline = { self.0.lock().unwrap().pipeline.clone() }; let pipeline = { self.0.lock().unwrap().pipeline.clone() };
let webrtcbin = gst::ElementFactory::make("webrtcbin", "sendrecv").unwrap(); let webrtcbin = gst::ElementFactory::make("webrtcbin", "sendrecv").unwrap();
pipeline.add(&webrtcbin)?; pipeline.add(&webrtcbin)?;
webrtcbin.set_property_from_str("stun-server", STUN_SERVER); webrtcbin.set_property_from_str("stun-server", STUN_SERVER);
add_video_source(&pipeline, &webrtcbin)?; add_video_source(&pipeline, &webrtcbin)?;
add_audio_source(&pipeline, &webrtcbin)?; add_audio_source(&pipeline, &webrtcbin)?;
Ok(pipeline) Ok(pipeline)
} }
fn start_pipeline(&self) -> Result<(), Error> { fn start_pipeline(&self) -> Result<(), Error> {
let pipe = self.construct_pipeline()?; let pipe = self.construct_pipeline()?;
let webrtc = pipe let webrtc = pipe.get_by_name("sendrecv").unwrap();
.clone()
.dynamic_cast::<gst::Bin>()
.unwrap()
.get_by_name("sendrecv")
.unwrap();
let app_control_clone = self.clone(); let app_control_clone = self.clone();
webrtc.connect("on-negotiation-needed", false, move |values| { webrtc.connect("on-negotiation-needed", false, move |values| {
on_negotiation_needed(&app_control_clone, values).unwrap(); on_negotiation_needed(&app_control_clone, values).unwrap();
@ -448,12 +460,15 @@ impl AppControl {
})?; })?;
pipe.set_state(gst::State::Playing).into_result()?; pipe.set_state(gst::State::Playing).into_result()?;
self.0.lock().unwrap().webrtc = Some(webrtc); self.0.lock().unwrap().webrtc = Some(webrtc);
Ok(()) Ok(())
} }
fn register_with_server(&self) { fn register_with_server(&self) {
self.update_state(AppState::ServerRegistering); self.update_state(AppState::ServerRegistering);
let our_id = rand::thread_rng().gen_range(10, 10_000); let our_id = rand::thread_rng().gen_range(10, 10_000);
println!("Registering id {} with server", our_id); println!("Registering id {} with server", our_id);
self.send_text_msg(format!("HELLO {}", our_id)); self.send_text_msg(format!("HELLO {}", our_id));
@ -461,6 +476,7 @@ impl AppControl {
fn setup_call(&self) { fn setup_call(&self) {
self.update_state(AppState::PeerConnecting); self.update_state(AppState::PeerConnecting);
let peer_id = { self.0.lock().unwrap().peer_id.clone() }; let peer_id = { self.0.lock().unwrap().peer_id.clone() };
println!("Setting up signalling server call with {}", peer_id); println!("Setting up signalling server call with {}", peer_id);
self.send_text_msg(format!("SESSION {}", peer_id)); self.send_text_msg(format!("SESSION {}", peer_id));
@ -473,10 +489,13 @@ impl AppControl {
return Err(OutOfOrder("Received HELLO when not registering"))?; return Err(OutOfOrder("Received HELLO when not registering"))?;
} }
} }
self.update_state(AppState::ServerRegistered); self.update_state(AppState::ServerRegistered);
self.setup_call(); self.setup_call();
Ok(()) Ok(())
} }
fn handle_session_ok(&self) -> Result<(), Error> { fn handle_session_ok(&self) -> Result<(), Error> {
{ {
let mut app_control = self.0.lock().unwrap(); let mut app_control = self.0.lock().unwrap();
@ -485,8 +504,10 @@ impl AppControl {
} }
app_control.app_state = AppState::PeerConnected; app_control.app_state = AppState::PeerConnected;
} }
self.start_pipeline() self.start_pipeline()
} }
fn handle_error(&self) -> Result<(), Error> { fn handle_error(&self) -> Result<(), Error> {
let app_control = self.0.lock().unwrap(); let app_control = self.0.lock().unwrap();
let error = match app_control.app_state { let error = match app_control.app_state {
@ -505,8 +526,9 @@ impl AppControl {
Err(WsError(error))? Err(WsError(error))?
} }
fn handle_sdp(&self, type_: &str, sdp: &str) { fn handle_sdp(&self, type_: &str, sdp: &str) {
if !self.app_state_eq(AppState::PeerCallNegotiating, "Not ready to handle sdp") { if !self.assert_app_state_is(AppState::PeerCallNegotiating, "Not ready to handle sdp") {
return; return;
} }
@ -516,6 +538,7 @@ impl AppControl {
} }
let mut app_control = self.0.lock().unwrap(); let mut app_control = self.0.lock().unwrap();
print!("Received answer:\n{}\n", sdp); print!("Received answer:\n{}\n", sdp);
let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()).unwrap(); let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()).unwrap();
@ -528,8 +551,10 @@ impl AppControl {
.unwrap() .unwrap()
.emit("set-remote-description", &[&answer, &promise]) .emit("set-remote-description", &[&answer, &promise])
.unwrap(); .unwrap();
app_control.app_state = AppState::PeerCallStarted; app_control.app_state = AppState::PeerCallStarted;
} }
fn handle_ice(&self, sdp_mline_index: u32, candidate: &str) { fn handle_ice(&self, sdp_mline_index: u32, candidate: &str) {
let app_control = self.0.lock().unwrap(); let app_control = self.0.lock().unwrap();
app_control app_control
@ -539,45 +564,56 @@ impl AppControl {
.emit("add-ice-candidate", &[&sdp_mline_index, &candidate]) .emit("add-ice-candidate", &[&sdp_mline_index, &candidate])
.unwrap(); .unwrap();
} }
fn on_message(&mut self, msg: &str) -> Result<(), Error> {
if msg == "HELLO" {
return self.handle_hello();
}
if msg == "SESSION_OK" {
return self.handle_session_ok();
}
if msg.starts_with("ERROR") { fn on_message(&self, msg: &str) -> Result<(), Error> {
println!("Got error message! {}", msg); match msg {
return self.handle_error(); "HELLO" => self.handle_hello(),
"SESSION_OK" => self.handle_session_ok(),
x if x.starts_with("ERROR") => {
println!("Got error message! {}", msg);
self.handle_error()
}
_ => {
let json_msg: JsonMsg = serde_json::from_str(msg)?;
match json_msg {
JsonMsg::Sdp { type_, sdp } => self.handle_sdp(&type_, &sdp),
JsonMsg::Ice {
sdp_mline_index,
candidate,
} => self.handle_ice(sdp_mline_index, &candidate),
};
Ok(())
}
} }
let json_msg: JsonMsg = serde_json::from_str(msg)?;
match json_msg {
JsonMsg::Sdp { type_, sdp } => self.handle_sdp(&type_, &sdp),
JsonMsg::Ice {
sdp_mline_index,
candidate,
} => self.handle_ice(sdp_mline_index, &candidate),
};
Ok(())
} }
fn close_and_quit(&self, err: &Error) { fn close_and_quit(&self, err: &Error) {
let app_control = self.0.lock().unwrap();
println!("{}\nquitting", err); println!("{}\nquitting", err);
app_control
.pipeline // Must not hold mutex while shutting down the pipeline
.set_state(gst::State::Null) // as something might call into here and take the mutex too
.into_result() let (pipeline, main_loop) = {
.ok(); let app_control = self.0.lock().unwrap();
app_control
.send_msg_tx app_control
.send(OwnedMessage::Close(Some(websocket::message::CloseData { .send_msg_tx
status_code: 1011, //Internal Error .send(OwnedMessage::Close(Some(websocket::message::CloseData {
reason: err.to_string(), status_code: 1011, //Internal Error
}))) reason: err.to_string(),
.ok(); })))
app_control.main_loop.quit(); .unwrap();
(app_control.pipeline.clone(), app_control.main_loop.clone())
};
pipeline.set_state(gst::State::Null).into_result().unwrap();
main_loop.quit();
} }
} }
@ -598,10 +634,13 @@ fn parse_args() -> (String, String) {
.takes_value(true), .takes_value(true),
) )
.get_matches(); .get_matches();
let server = matches let server = matches
.value_of("server") .value_of("server")
.unwrap_or("wss://webrtc.nirbheek.in:8443"); .unwrap_or("wss://webrtc.nirbheek.in:8443");
let peer_id = matches.value_of("peer-id").unwrap(); let peer_id = matches.value_of("peer-id").unwrap();
(server.to_string(), peer_id.to_string()) (server.to_string(), peer_id.to_string())
} }
@ -623,10 +662,9 @@ fn send_loop(
return; return;
} }
match sender.send_message(&msg) { if let Err(err) = sender.send_message(&msg) {
Ok(()) => (), println!("Error sending {:?}", err);
Err(err) => println!("Error sending {:?}", err), }
};
}) })
} }
@ -648,18 +686,20 @@ fn receive_loop(
return; return;
} }
}; };
match message { match message {
OwnedMessage::Close(_) => { OwnedMessage::Close(_) => {
let _ = send_msg_tx.send(OwnedMessage::Close(None)); let _ = send_msg_tx.send(OwnedMessage::Close(None));
return; return;
} }
OwnedMessage::Ping(data) => match send_msg_tx.send(OwnedMessage::Pong(data)) {
Ok(()) => (), OwnedMessage::Ping(data) => {
Err(e) => { if let Err(e) = send_msg_tx.send(OwnedMessage::Pong(data)) {
println!("Receive Loop error: {:?}", e); println!("Receive Loop error: {:?}", e);
return; return;
} }
}, }
OwnedMessage::Text(msg) => { OwnedMessage::Text(msg) => {
let mbuilder = gst::Message::new_application(gst::Structure::new( let mbuilder = gst::Message::new_application(gst::Structure::new(
"ws-message", "ws-message",
@ -677,7 +717,7 @@ fn receive_loop(
} }
fn handle_application_msg( fn handle_application_msg(
app_control: &mut AppControl, app_control: &AppControl,
struc: &gst::StructureRef, struc: &gst::StructureRef,
) -> Result<(), Error> { ) -> Result<(), Error> {
match struc.get_name() { match struc.get_name() {
@ -690,8 +730,9 @@ fn handle_application_msg(
let msg: String = struc.get_value("body").unwrap().get().unwrap(); let msg: String = struc.get_value("body").unwrap().get().unwrap();
Err(BusError(msg))? Err(BusError(msg))?
} }
_u => { u => {
println!("Got unknown application message {:?}", _u); println!("Got unknown application message {:?}", u);
Ok(()) Ok(())
} }
} }
@ -742,8 +783,9 @@ fn main() {
app_control.register_with_server(); app_control.register_with_server();
bus.add_watch(move |_, msg| { bus.add_watch(move |_, msg| {
let mut app_control = app_control.clone();
use gst::message::MessageView; use gst::message::MessageView;
let app_control = app_control.clone();
match msg.view() { match msg.view() {
MessageView::Error(err) => app_control.close_and_quit(&Error::from(err.get_error())), MessageView::Error(err) => app_control.close_and_quit(&Error::from(err.get_error())),
MessageView::Warning(warning) => { MessageView::Warning(warning) => {
@ -751,12 +793,13 @@ fn main() {
} }
MessageView::Application(a) => { MessageView::Application(a) => {
let struc = a.get_structure().unwrap(); let struc = a.get_structure().unwrap();
if let Err(err) = handle_application_msg(&mut app_control, struc) { if let Err(err) = handle_application_msg(&app_control, struc) {
app_control.close_and_quit(&err) app_control.close_and_quit(&err)
} }
} }
_ => {} _ => {}
}; };
glib::Continue(true) glib::Continue(true)
}); });