webrtc/janus/rust: update to latest GStreamer rust bindings

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-examples/-/merge_requests/42>
This commit is contained in:
Matthew Waters 2021-08-06 15:49:13 +10:00
parent f5cbbab4d5
commit 5c4cc517f0
4 changed files with 354 additions and 345 deletions

File diff suppressed because it is too large Load diff

View file

@ -11,15 +11,15 @@ structopt = { version = "0.3", default-features = false }
anyhow = "1" anyhow = "1"
url = "2" url = "2"
rand = "0.7" rand = "0.7"
async-tungstenite = { version = "0.8", features = ["gio-runtime"] } async-tungstenite = { version = "0.14", features = ["gio-runtime"] }
gst = { package = "gstreamer", version = "0.16", features = ["v1_14"] } gst = { package = "gstreamer", version = "0.17", features = ["v1_14"] }
gst-webrtc = { package = "gstreamer-webrtc", version = "0.16" } gst-webrtc = { package = "gstreamer-webrtc", version = "0.17" }
gst-sdp = { package = "gstreamer-sdp", version = "0.16", features = ["v1_14"] } gst-sdp = { package = "gstreamer-sdp", version = "0.17", features = ["v1_14"] }
serde = "1" serde = "1"
serde_derive = "1" serde_derive = "1"
serde_json = "1.0.53" serde_json = "1.0.53"
http = "0.2" http = "0.2"
glib = "0.10" glib = "0.14"
gio = "0.9" gio = "0.14"
log = "0.4.8" log = "0.4.8"
env_logger = "0.7.1" env_logger = "0.7.1"

View file

@ -25,13 +25,13 @@ use {
futures::channel::mpsc, futures::channel::mpsc,
futures::sink::{Sink, SinkExt}, futures::sink::{Sink, SinkExt},
futures::stream::{Stream, StreamExt}, futures::stream::{Stream, StreamExt},
gst::gst_element_error,
gst::prelude::*, gst::prelude::*,
http::Request, http::Request,
rand::prelude::*, rand::prelude::*,
serde_derive::{Deserialize, Serialize}, serde_derive::{Deserialize, Serialize},
serde_json::json, serde_json::json,
std::sync::{Arc, Mutex, Weak}, std::sync::{Arc, Mutex, Weak},
std::time::Duration,
structopt::StructOpt, structopt::StructOpt,
tungstenite::Message as WsMessage, tungstenite::Message as WsMessage,
}; };
@ -215,7 +215,7 @@ impl Peer {
let peer = upgrade_weak!(peer_clone); let peer = upgrade_weak!(peer_clone);
if let Err(err) = peer.on_offer_created(s) { if let Err(err) = peer.on_offer_created(s) {
gst_element_error!( gst::element_error!(
peer.bin, peer.bin,
gst::LibraryError::Failed, gst::LibraryError::Failed,
("Failed to send SDP offer: {:?}", err) ("Failed to send SDP offer: {:?}", err)
@ -224,7 +224,7 @@ impl Peer {
}); });
self.webrtcbin self.webrtcbin
.emit("create-offer", &[&None::<gst::Structure>, &promise])?; .emit_by_name("create-offer", &[&None::<gst::Structure>, &promise])?;
Ok(()) Ok(())
} }
@ -233,17 +233,15 @@ impl Peer {
// WebSocket connection // WebSocket connection
fn on_offer_created(&self, reply: &gst::StructureRef) -> Result<(), anyhow::Error> { fn on_offer_created(&self, reply: &gst::StructureRef) -> Result<(), anyhow::Error> {
let offer = reply let offer = reply
.get_value("offer")? .get::<gst_webrtc::WebRTCSessionDescription>("offer")
.get::<gst_webrtc::WebRTCSessionDescription>() .expect("Invalid argument");
.expect("Invalid argument")
.expect("Invalid offer");
self.webrtcbin self.webrtcbin
.emit("set-local-description", &[&offer, &None::<gst::Promise>])?; .emit_by_name("set-local-description", &[&offer, &None::<gst::Promise>])?;
info!("sending SDP offer to peer: {:?}", offer.get_sdp().as_text()); info!("sending SDP offer to peer: {:?}", offer.sdp().as_text());
let transaction = transaction_id(); let transaction = transaction_id();
let sdp_data = offer.get_sdp().as_text()?; let sdp_data = offer.sdp().as_text()?;
let msg = WsMessage::Text( let msg = WsMessage::Text(
json!({ json!({
"janus": "message", "janus": "message",
@ -276,16 +274,14 @@ impl Peer {
// WebSocket connection // WebSocket connection
fn on_answer_created(&self, reply: &gst::Structure) -> Result<(), anyhow::Error> { fn on_answer_created(&self, reply: &gst::Structure) -> Result<(), anyhow::Error> {
let answer = reply let answer = reply
.get_value("answer")? .get::<gst_webrtc::WebRTCSessionDescription>("answer")
.get::<gst_webrtc::WebRTCSessionDescription>()
.expect("Invalid argument")
.expect("Invalid answer"); .expect("Invalid answer");
self.webrtcbin self.webrtcbin
.emit("set-local-description", &[&answer, &None::<gst::Promise>])?; .emit_by_name("set-local-description", &[&answer, &None::<gst::Promise>])?;
info!( info!(
"sending SDP answer to peer: {:?}", "sending SDP answer to peer: {:?}",
answer.get_sdp().as_text() answer.sdp().as_text()
); );
Ok(()) Ok(())
@ -302,7 +298,7 @@ impl Peer {
gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret); gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret);
self.webrtcbin self.webrtcbin
.emit("set-remote-description", &[&answer, &None::<gst::Promise>])?; .emit_by_name("set-remote-description", &[&answer, &None::<gst::Promise>])?;
Ok(()) Ok(())
} else if type_ == "offer" { } else if type_ == "offer" {
@ -324,7 +320,7 @@ impl Peer {
peer.0 peer.0
.webrtcbin .webrtcbin
.emit("set-remote-description", &[&offer, &None::<gst::Promise>]) .emit_by_name("set-remote-description", &[&offer, &None::<gst::Promise>])
.expect("Unable to set remote description"); .expect("Unable to set remote description");
let peer_clone = peer.downgrade(); let peer_clone = peer.downgrade();
@ -333,7 +329,7 @@ impl Peer {
let peer = upgrade_weak!(peer_clone); let peer = upgrade_weak!(peer_clone);
if let Err(err) = peer.on_answer_created(&s.to_owned()) { if let Err(err) = peer.on_answer_created(&s.to_owned()) {
gst_element_error!( gst::element_error!(
peer.bin, peer.bin,
gst::LibraryError::Failed, gst::LibraryError::Failed,
("Failed to send SDP answer: {:?}", err) ("Failed to send SDP answer: {:?}", err)
@ -343,7 +339,7 @@ impl Peer {
peer.0 peer.0
.webrtcbin .webrtcbin
.emit("create-answer", &[&None::<gst::Structure>, &promise]) .emit_by_name("create-answer", &[&None::<gst::Structure>, &promise])
.expect("Unable to create answer"); .expect("Unable to create answer");
}); });
@ -360,7 +356,7 @@ impl Peer {
sdp_mline_index, candidate sdp_mline_index, candidate
); );
self.webrtcbin self.webrtcbin
.emit("add-ice-candidate", &[&sdp_mline_index, &candidate])?; .emit_by_name("add-ice-candidate", &[&sdp_mline_index, &candidate])?;
Ok(()) Ok(())
} }
@ -483,7 +479,7 @@ impl JanusGateway {
ws.send(msg).await?; ws.send(msg).await?;
let webrtcbin = pipeline let webrtcbin = pipeline
.get_by_name("webrtcbin") .by_name("webrtcbin")
.expect("can't find webrtcbin"); .expect("can't find webrtcbin");
let webrtc_codec = &args.webrtc_video_codec; let webrtc_codec = &args.webrtc_video_codec;
@ -498,14 +494,14 @@ impl JanusGateway {
pipeline.add(&encode_bin).expect("Failed to add encode bin"); pipeline.add(&encode_bin).expect("Failed to add encode bin");
let video_queue = pipeline.get_by_name("vqueue").expect("No vqueue found"); let video_queue = pipeline.by_name("vqueue").expect("No vqueue found");
let encoder = encode_bin.get_by_name("encoder").expect("No encoder"); let encoder = encode_bin.by_name("encoder").expect("No encoder");
let srcpad = video_queue let srcpad = video_queue
.get_static_pad("src") .static_pad("src")
.expect("Failed to get video queue src pad"); .expect("Failed to get video queue src pad");
let sinkpad = encoder let sinkpad = encoder
.get_static_pad("sink") .static_pad("sink")
.expect("Failed to get sink pad from encoder"); .expect("Failed to get sink pad from encoder");
if let Ok(video_ghost_pad) = gst::GhostPad::with_target(Some("video_sink"), &sinkpad) { if let Ok(video_ghost_pad) = gst::GhostPad::with_target(Some("video_sink"), &sinkpad) {
@ -514,13 +510,13 @@ impl JanusGateway {
} }
let sinkpad2 = webrtcbin let sinkpad2 = webrtcbin
.get_request_pad("sink_%u") .request_pad_simple("sink_%u")
.expect("Unable to request outgoing webrtcbin pad"); .expect("Unable to request outgoing webrtcbin pad");
let vsink = encode_bin let vsink = encode_bin
.get_by_name("webrtc-vsink") .by_name("webrtc-vsink")
.expect("No webrtc-vsink found"); .expect("No webrtc-vsink found");
let srcpad = vsink let srcpad = vsink
.get_static_pad("src") .static_pad("src")
.expect("Element without src pad"); .expect("Element without src pad");
if let Ok(webrtc_ghost_pad) = gst::GhostPad::with_target(Some("webrtc_video_src"), &srcpad) if let Ok(webrtc_ghost_pad) = gst::GhostPad::with_target(Some("webrtc_video_src"), &srcpad)
{ {
@ -528,13 +524,8 @@ impl JanusGateway {
webrtc_ghost_pad.link(&sinkpad2)?; webrtc_ghost_pad.link(&sinkpad2)?;
} }
if let Ok(transceiver) = webrtcbin.emit("get-transceiver", &[&0.to_value()]) { if let Some(transceiver) = webrtcbin.emit_by_name("get-transceiver", &[&0.to_value()]).unwrap().and_then(|val| val.get::<glib::Object>().ok()) {
if let Some(t) = transceiver { transceiver.set_property("do-nack", &false.to_value())?;
if let Ok(obj) = t.get::<glib::Object>() {
obj.expect("Invalid transceiver")
.set_property("do-nack", &true.to_value())?;
}
}
} }
let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded::<WsMessage>(); let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded::<WsMessage>();
@ -557,7 +548,7 @@ impl JanusGateway {
.connect("on-negotiation-needed", false, move |_| { .connect("on-negotiation-needed", false, move |_| {
let peer = upgrade_weak!(peer_clone, None); let peer = upgrade_weak!(peer_clone, None);
if let Err(err) = peer.on_negotiation_needed() { if let Err(err) = peer.on_negotiation_needed() {
gst_element_error!( gst::element_error!(
peer.bin, peer.bin,
gst::LibraryError::Failed, gst::LibraryError::Failed,
("Failed to negotiate: {:?}", err) ("Failed to negotiate: {:?}", err)
@ -573,16 +564,14 @@ impl JanusGateway {
.connect("on-ice-candidate", false, move |values| { .connect("on-ice-candidate", false, move |values| {
let mlineindex = values[1] let mlineindex = values[1]
.get::<u32>() .get::<u32>()
.expect("Invalid argument")
.expect("Invalid type"); .expect("Invalid type");
let candidate = values[2] let candidate = values[2]
.get::<String>() .get::<String>()
.expect("Invalid argument")
.expect("Invalid type"); .expect("Invalid type");
let peer = upgrade_weak!(peer_clone, None); let peer = upgrade_weak!(peer_clone, None);
if let Err(err) = peer.on_ice_candidate(mlineindex, candidate) { if let Err(err) = peer.on_ice_candidate(mlineindex, candidate) {
gst_element_error!( gst::element_error!(
peer.bin, peer.bin,
gst::LibraryError::Failed, gst::LibraryError::Failed,
("Failed to send ICE candidate: {:?}", err) ("Failed to send ICE candidate: {:?}", err)
@ -616,7 +605,7 @@ impl JanusGateway {
.expect("Invalid message receiver"); .expect("Invalid message receiver");
let mut send_ws_msg_rx = send_ws_msg_rx.fuse(); let mut send_ws_msg_rx = send_ws_msg_rx.fuse();
let timer = glib::interval_stream(10_000); let timer = glib::interval_stream(Duration::from_secs(10));
let mut timer_fuse = timer.fuse(); let mut timer_fuse = timer.fuse();
let mut sink = self.ws_sink.take().expect("Invalid websocket sink"); let mut sink = self.ws_sink.take().expect("Invalid websocket sink");
@ -673,7 +662,7 @@ impl JanusGateway {
if let Some(sdp) = &jsep.sdp { if let Some(sdp) = &jsep.sdp {
assert_eq!(jsep.type_, "answer"); assert_eq!(jsep.type_, "answer");
let peer = self.peer.lock().expect("Invalid peer"); let peer = self.peer.lock().expect("Invalid peer");
return peer.handle_sdp(&jsep.type_, &sdp); return peer.handle_sdp(&jsep.type_, sdp);
} else if let Some(ice) = &jsep.ice { } else if let Some(ice) = &jsep.ice {
let peer = self.peer.lock().expect("Invalid peer"); let peer = self.peer.lock().expect("Invalid peer");
return peer.handle_ice(ice.sdp_mline_index, &ice.candidate); return peer.handle_ice(ice.sdp_mline_index, &ice.candidate);

View file

@ -21,7 +21,6 @@
#![recursion_limit = "256"] #![recursion_limit = "256"]
use anyhow::bail; use anyhow::bail;
use gst::gst_element_error;
use gst::prelude::*; use gst::prelude::*;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
@ -77,7 +76,7 @@ impl App {
.downcast::<gst::Pipeline>() .downcast::<gst::Pipeline>()
.expect("Couldn't downcast pipeline"); .expect("Couldn't downcast pipeline");
let bus = pipeline.get_bus().unwrap(); let bus = pipeline.bus().unwrap();
let app = App(Arc::new(AppInner { pipeline })); let app = App(Arc::new(AppInner { pipeline }));
let app_weak = app.downgrade(); let app_weak = app.downgrade();
@ -99,14 +98,14 @@ impl App {
match message.view() { match message.view() {
MessageView::Error(err) => bail!( MessageView::Error(err) => bail!(
"Error from element {}: {} ({})", "Error from element {}: {} ({})",
err.get_src() err.src()
.map(|s| String::from(s.get_path_string())) .map(|s| String::from(s.path_string()))
.unwrap_or_else(|| String::from("None")), .unwrap_or_else(|| String::from("None")),
err.get_error(), err.error(),
err.get_debug().unwrap_or_else(|| String::from("None")), err.debug().unwrap_or_else(|| String::from("None")),
), ),
MessageView::Warning(warning) => { MessageView::Warning(warning) => {
println!("Warning: \"{}\"", warning.get_debug().unwrap()); println!("Warning: \"{}\"", warning.debug().unwrap());
} }
_ => (), _ => (),
} }
@ -121,7 +120,7 @@ impl App {
self.pipeline.call_async(|pipeline| { self.pipeline.call_async(|pipeline| {
// If this fails, post an error on the bus so we exit // If this fails, post an error on the bus so we exit
if pipeline.set_state(gst::State::Playing).is_err() { if pipeline.set_state(gst::State::Playing).is_err() {
gst_element_error!( gst::element_error!(
pipeline, pipeline,
gst::LibraryError::Failed, gst::LibraryError::Failed,
("Failed to set pipeline to Playing") ("Failed to set pipeline to Playing")