webrtc: Add Janus video-room example

This Rust crate provides a program able to connect to a Janus instance using
WebSockets and send a live video stream to the videoroom plugin.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-examples/-/merge_requests/15>
This commit is contained in:
Philippe Normand 2020-06-29 14:08:51 +01:00
parent f5d9471639
commit 234dff8dbb
4 changed files with 2017 additions and 0 deletions

1099
webrtc/janus/rust/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,25 @@
[package]
name = "janus-video-room"
version = "0.1.0"
authors = ["Philippe Normand <philn@igalia.com>"]
edition = "2018"
license = "LGPL"
[dependencies]
futures = "0.3"
structopt = { version = "0.3", default-features = false }
anyhow = "1"
url = "2"
rand = "0.7"
async-tungstenite = { version = "0.7", features = ["gio-runtime"] }
gst = { package = "gstreamer", version = "0.15", features = ["v1_14"] }
gst-webrtc = { package = "gstreamer-webrtc", version = "0.15" }
gst-sdp = { package = "gstreamer-sdp", version = "0.15", features = ["v1_14"] }
serde = "1"
serde_derive = "1"
serde_json = "1.0.53"
http = "0.2"
glib = "0.9"
gio = "0.8"
log = "0.4.8"
env_logger = "0.7.1"

View file

@ -0,0 +1,707 @@
// GStreamer
//
// Copyright (C) 2018 maxmcd <max.t.mcdonnell@gmail.com>
// Copyright (C) 2019 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2020 Philippe Normand <philn@igalia.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
// Boston, MA 02110-1301, USA.
use {
anyhow::{anyhow, bail, Context},
async_tungstenite::{gio::connect_async, tungstenite},
futures::channel::mpsc,
futures::sink::{Sink, SinkExt},
futures::stream::{Stream, StreamExt},
gst::gst_element_error,
gst::prelude::*,
http::Request,
rand::prelude::*,
serde_derive::{Deserialize, Serialize},
serde_json::json,
std::sync::{Arc, Mutex, Weak},
structopt::StructOpt,
tungstenite::Message as WsMessage,
};
// upgrade weak reference or return
#[macro_export]
macro_rules! upgrade_weak {
($x:ident, $r:expr) => {{
match $x.upgrade() {
Some(o) => o,
None => return $r,
}
}};
($x:ident) => {
upgrade_weak!($x, ())
};
}
#[derive(Debug)]
struct VideoParameter {
encoder: &'static str,
encoding_name: &'static str,
payloader: &'static str,
}
const VP8: VideoParameter = VideoParameter {
encoder: "vp8enc target-bitrate=100000 overshoot=25 undershoot=100 deadline=33000 keyframe-max-dist=1",
encoding_name: "VP8",
payloader: "rtpvp8pay picture-id-mode=2"
};
const H264: VideoParameter = VideoParameter {
encoder: "x264enc tune=zerolatency",
encoding_name: "H264",
payloader: "rtph264pay",
};
impl std::str::FromStr for VideoParameter {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"vp8" => Ok(VP8),
"h264" => Ok(H264),
_ => Err(anyhow!(
"Invalid video parameter: {}. Use either vp8 or h264",
s
)),
}
}
}
#[derive(Debug, StructOpt)]
pub struct Args {
#[structopt(short, long, default_value = "wss://janus.conf.meetecho.com/ws:8989")]
server: String,
#[structopt(short, long, default_value = "1234")]
room_id: u32,
#[structopt(short, long, default_value = "1234")]
feed_id: u32,
#[structopt(short, long, default_value = "vp8")]
webrtc_video_codec: VideoParameter,
}
#[derive(Serialize, Deserialize, Debug)]
struct Base {
janus: String,
transaction: Option<String>,
session_id: Option<i64>,
sender: Option<i64>,
}
#[derive(Serialize, Deserialize, Debug)]
struct DataHolder {
id: i64,
}
#[derive(Serialize, Deserialize, Debug)]
struct PluginDataHolder {
videoroom: String,
room: i64,
description: Option<String>,
id: Option<i64>,
configured: Option<String>,
video_codec: Option<String>,
unpublished: Option<i64>,
}
#[derive(Serialize, Deserialize, Debug)]
struct PluginHolder {
plugin: String,
data: PluginDataHolder,
}
#[derive(Serialize, Deserialize, Debug)]
struct IceHolder {
candidate: String,
#[serde(rename = "sdpMLineIndex")]
sdp_mline_index: u32,
}
#[derive(Serialize, Deserialize, Debug)]
struct JsepHolder {
#[serde(rename = "type")]
type_: String,
sdp: Option<String>,
ice: Option<IceHolder>,
}
#[derive(Serialize, Deserialize, Debug)]
struct JsonReply {
#[serde(flatten)]
base: Base,
data: Option<DataHolder>,
#[serde(rename = "plugindata")]
plugin_data: Option<PluginHolder>,
jsep: Option<JsepHolder>,
}
fn transaction_id() -> String {
thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(30)
.collect()
}
// Strong reference to the state of one peer
#[derive(Debug, Clone)]
struct Peer(Arc<PeerInner>);
// Weak reference to the state of one peer
#[derive(Debug, Clone)]
struct PeerWeak(Weak<PeerInner>);
impl PeerWeak {
// Try upgrading a weak reference to a strong one
fn upgrade(&self) -> Option<Peer> {
self.0.upgrade().map(Peer)
}
}
// To be able to access the Peers's fields directly
impl std::ops::Deref for Peer {
type Target = PeerInner;
fn deref(&self) -> &PeerInner {
&self.0
}
}
#[derive(Clone, Copy, Debug)]
struct ConnectionHandle {
id: i64,
session_id: i64,
}
// Actual peer state
#[derive(Debug)]
struct PeerInner {
handle: ConnectionHandle,
bin: gst::Bin,
webrtcbin: gst::Element,
send_msg_tx: Arc<Mutex<mpsc::UnboundedSender<WsMessage>>>,
}
impl Peer {
// Downgrade the strong reference to a weak reference
fn downgrade(&self) -> PeerWeak {
PeerWeak(Arc::downgrade(&self.0))
}
// Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask
// for a new offer SDP from webrtcbin without any customization and then
// asynchronously send it to the peer via the WebSocket connection
fn on_negotiation_needed(&self) -> Result<(), anyhow::Error> {
info!("starting negotiation with peer");
let peer_clone = self.downgrade();
let promise = gst::Promise::new_with_change_func(move |res| {
let s = res.expect("no answer");
let peer = upgrade_weak!(peer_clone);
if let Err(err) = peer.on_offer_created(&s.to_owned()) {
gst_element_error!(
peer.bin,
gst::LibraryError::Failed,
("Failed to send SDP offer: {:?}", err)
);
}
});
self.webrtcbin
.emit("create-offer", &[&None::<gst::Structure>, &promise])?;
Ok(())
}
// Once webrtcbin has create the offer SDP for us, handle it by sending it to the peer via the
// WebSocket connection
fn on_offer_created(&self, reply: &gst::Structure) -> Result<(), anyhow::Error> {
let offer = reply
.get_value("offer")?
.get::<gst_webrtc::WebRTCSessionDescription>()
.expect("Invalid argument")
.expect("Invalid offer");
self.webrtcbin
.emit("set-local-description", &[&offer, &None::<gst::Promise>])?;
info!("sending SDP offer to peer: {:?}", offer.get_sdp().as_text());
let transaction = transaction_id();
let sdp_data = offer.get_sdp().as_text()?;
let msg = WsMessage::Text(
json!({
"janus": "message",
"transaction": transaction,
"session_id": self.handle.session_id,
"handle_id": self.handle.id,
"body": {
"request": "publish",
"audio": true,
"video": true,
},
"jsep": {
"sdp": sdp_data,
"trickle": true,
"type": "offer"
}
})
.to_string(),
);
self.send_msg_tx
.lock()
.expect("Invalid message sender")
.unbounded_send(msg)
.with_context(|| "Failed to send SDP offer".to_string())?;
Ok(())
}
// Once webrtcbin has create the answer SDP for us, handle it by sending it to the peer via the
// WebSocket connection
fn on_answer_created(&self, reply: &gst::Structure) -> Result<(), anyhow::Error> {
let answer = reply
.get_value("answer")?
.get::<gst_webrtc::WebRTCSessionDescription>()
.expect("Invalid argument")
.expect("Invalid answer");
self.webrtcbin
.emit("set-local-description", &[&answer, &None::<gst::Promise>])?;
info!(
"sending SDP answer to peer: {:?}",
answer.get_sdp().as_text()
);
Ok(())
}
// Handle incoming SDP answers from the peer
fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<(), anyhow::Error> {
if type_ == "answer" {
info!("Received answer:\n{}\n", sdp);
let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
.map_err(|_| anyhow!("Failed to parse SDP answer"))?;
let answer =
gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret);
self.webrtcbin
.emit("set-remote-description", &[&answer, &None::<gst::Promise>])?;
Ok(())
} else if type_ == "offer" {
info!("Received offer:\n{}\n", sdp);
let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes())
.map_err(|_| anyhow!("Failed to parse SDP offer"))?;
// And then asynchronously start our pipeline and do the next steps. The
// pipeline needs to be started before we can create an answer
let peer_clone = self.downgrade();
self.bin.call_async(move |_pipeline| {
let peer = upgrade_weak!(peer_clone);
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
ret,
);
peer.0
.webrtcbin
.emit("set-remote-description", &[&offer, &None::<gst::Promise>])
.expect("Unable to set remote description");
let peer_clone = peer.downgrade();
let promise = gst::Promise::new_with_change_func(move |reply| {
let s = reply.expect("No answer");
let peer = upgrade_weak!(peer_clone);
if let Err(err) = peer.on_answer_created(&s.to_owned()) {
gst_element_error!(
peer.bin,
gst::LibraryError::Failed,
("Failed to send SDP answer: {:?}", err)
);
}
});
peer.0
.webrtcbin
.emit("create-answer", &[&None::<gst::Structure>, &promise])
.expect("Unable to create answer");
});
Ok(())
} else {
bail!("Sdp type is not \"answer\" but \"{}\"", type_)
}
}
// Handle incoming ICE candidates from the peer by passing them to webrtcbin
fn handle_ice(&self, sdp_mline_index: u32, candidate: &str) -> Result<(), anyhow::Error> {
info!(
"Received remote ice-candidate {} {}",
sdp_mline_index, candidate
);
self.webrtcbin
.emit("add-ice-candidate", &[&sdp_mline_index, &candidate])?;
Ok(())
}
// Asynchronously send ICE candidates to the peer via the WebSocket connection as a JSON
// message
fn on_ice_candidate(&self, mlineindex: u32, candidate: String) -> Result<(), anyhow::Error> {
let transaction = transaction_id();
info!("Sending ICE {} {}", mlineindex, &candidate);
let msg = WsMessage::Text(
json!({
"janus": "trickle",
"transaction": transaction,
"session_id": self.handle.session_id,
"handle_id": self.handle.id,
"candidate": {
"candidate": candidate,
"sdpMLineIndex": mlineindex
},
})
.to_string(),
);
self.send_msg_tx
.lock()
.expect("Invalid message sender")
.unbounded_send(msg)
.with_context(|| "Failed to send ICE candidate".to_string())?;
Ok(())
}
}
// At least shut down the bin here if it didn't happen so far
impl Drop for PeerInner {
fn drop(&mut self) {
let _ = self.bin.set_state(gst::State::Null);
}
}
type WsStream =
std::pin::Pin<Box<dyn Stream<Item = Result<WsMessage, tungstenite::error::Error>> + Send>>;
type WsSink = std::pin::Pin<Box<dyn Sink<WsMessage, Error = tungstenite::error::Error> + Send>>;
pub struct JanusGateway {
ws_stream: Option<WsStream>,
ws_sink: Option<WsSink>,
handle: ConnectionHandle,
peer: Mutex<Peer>,
send_ws_msg_rx: Option<mpsc::UnboundedReceiver<WsMessage>>,
}
impl JanusGateway {
pub async fn new(pipeline: gst::Bin) -> Result<Self, anyhow::Error> {
let args = Args::from_args();
let request = Request::builder()
.uri(&args.server)
.header("Sec-WebSocket-Protocol", "janus-protocol")
.body(())?;
let (mut ws, _) = connect_async(request).await?;
let transaction = transaction_id();
let msg = WsMessage::Text(
json!({
"janus": "create",
"transaction": transaction,
})
.to_string(),
);
ws.send(msg).await?;
let msg = ws
.next()
.await
.ok_or_else(|| anyhow!("didn't receive anything"))??;
let payload = msg.to_text()?;
let json_msg: JsonReply = serde_json::from_str(payload)?;
assert_eq!(json_msg.base.janus, "success");
assert_eq!(json_msg.base.transaction, Some(transaction));
let session_id = json_msg.data.expect("no session id").id;
let transaction = transaction_id();
let msg = WsMessage::Text(
json!({
"janus": "attach",
"transaction": transaction,
"plugin": "janus.plugin.videoroom",
"session_id": session_id,
})
.to_string(),
);
ws.send(msg).await?;
let msg = ws
.next()
.await
.ok_or_else(|| anyhow!("didn't receive anything"))??;
let payload = msg.to_text()?;
let json_msg: JsonReply = serde_json::from_str(payload)?;
assert_eq!(json_msg.base.janus, "success");
assert_eq!(json_msg.base.transaction, Some(transaction));
let handle = json_msg.data.expect("no session id").id;
let transaction = transaction_id();
let msg = WsMessage::Text(
json!({
"janus": "message",
"transaction": transaction,
"session_id": session_id,
"handle_id": handle,
"body": {
"request": "join",
"ptype": "publisher",
"room": args.room_id,
"id": args.feed_id,
},
})
.to_string(),
);
ws.send(msg).await?;
let webrtcbin = pipeline
.get_by_name("webrtcbin")
.expect("can't find webrtcbin");
let webrtc_codec = &args.webrtc_video_codec;
let bin_description = &format!(
"{encoder} name=encoder ! {payloader} ! queue ! capsfilter name=webrtc-vsink caps=\"application/x-rtp,media=video,encoding-name={encoding_name},payload=96\"",
encoder=webrtc_codec.encoder, payloader=webrtc_codec.payloader,
encoding_name=webrtc_codec.encoding_name
);
let encode_bin = gst::parse_bin_from_description(bin_description, false)?;
encode_bin.set_name("encode-bin")?;
pipeline.add(&encode_bin).expect("Failed to add encode bin");
let video_queue = pipeline.get_by_name("vqueue").expect("No vqueue found");
let encoder = encode_bin.get_by_name("encoder").expect("No encoder");
let srcpad = video_queue
.get_static_pad("src")
.expect("Failed to get video queue src pad");
let sinkpad = encoder
.get_static_pad("sink")
.expect("Failed to get sink pad from encoder");
if let Ok(video_ghost_pad) = gst::GhostPad::new(Some("video_sink"), &sinkpad) {
encode_bin.add_pad(&video_ghost_pad)?;
srcpad.link(&video_ghost_pad)?;
}
let sinkpad2 = webrtcbin
.get_request_pad("sink_%u")
.expect("Unable to request outgoing webrtcbin pad");
let vsink = encode_bin
.get_by_name("webrtc-vsink")
.expect("No webrtc-vsink found");
let srcpad = vsink
.get_static_pad("src")
.expect("Element without src pad");
if let Ok(webrtc_ghost_pad) = gst::GhostPad::new(Some("webrtc_video_src"), &srcpad) {
encode_bin.add_pad(&webrtc_ghost_pad)?;
webrtc_ghost_pad.link(&sinkpad2)?;
}
if let Ok(transceiver) = webrtcbin.emit("get-transceiver", &[&0.to_value()]) {
if let Some(t) = transceiver {
if let Ok(obj) = t.get::<glib::Object>() {
obj.expect("Invalid transceiver")
.set_property("do-nack", &true.to_value())?;
}
}
}
let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded::<WsMessage>();
let connection_handle = ConnectionHandle {
id: handle,
session_id,
};
let peer = Peer(Arc::new(PeerInner {
handle: connection_handle,
bin: pipeline,
webrtcbin,
send_msg_tx: Arc::new(Mutex::new(send_ws_msg_tx)),
}));
// Connect to on-negotiation-needed to handle sending an Offer
let peer_clone = peer.downgrade();
peer.webrtcbin
.connect("on-negotiation-needed", false, move |_| {
let peer = upgrade_weak!(peer_clone, None);
if let Err(err) = peer.on_negotiation_needed() {
gst_element_error!(
peer.bin,
gst::LibraryError::Failed,
("Failed to negotiate: {:?}", err)
);
}
None
})?;
// Whenever there is a new ICE candidate, send it to the peer
let peer_clone = peer.downgrade();
peer.webrtcbin
.connect("on-ice-candidate", false, move |values| {
let mlineindex = values[1]
.get::<u32>()
.expect("Invalid argument")
.expect("Invalid type");
let candidate = values[2]
.get::<String>()
.expect("Invalid argument")
.expect("Invalid type");
let peer = upgrade_weak!(peer_clone, None);
if let Err(err) = peer.on_ice_candidate(mlineindex, candidate) {
gst_element_error!(
peer.bin,
gst::LibraryError::Failed,
("Failed to send ICE candidate: {:?}", err)
);
}
None
})?;
// Split the websocket into the Sink and Stream
let (ws_sink, ws_stream) = ws.split();
Ok(Self {
ws_stream: Some(ws_stream.boxed()),
ws_sink: Some(Box::pin(ws_sink)),
handle: connection_handle,
peer: Mutex::new(peer),
send_ws_msg_rx: Some(send_ws_msg_rx),
})
}
pub async fn run(&mut self) -> Result<(), anyhow::Error> {
if let Some(ws_stream) = self.ws_stream.take() {
// Fuse the Stream, required for the select macro
let mut ws_stream = ws_stream.fuse();
// Channel for outgoing WebSocket messages from other threads
let send_ws_msg_rx = self
.send_ws_msg_rx
.take()
.expect("Invalid message receiver");
let mut send_ws_msg_rx = send_ws_msg_rx.fuse();
let timer = glib::interval_stream(10_000);
let mut timer_fuse = timer.fuse();
let mut sink = self.ws_sink.take().expect("Invalid websocket sink");
loop {
let ws_msg = futures::select! {
// Handle the WebSocket messages here
ws_msg = ws_stream.select_next_some() => {
match ws_msg? {
WsMessage::Close(_) => {
info!("peer disconnected");
break
},
WsMessage::Ping(data) => Some(WsMessage::Pong(data)),
WsMessage::Pong(_) => None,
WsMessage::Binary(_) => None,
WsMessage::Text(text) => {
if let Err(err) = self.handle_websocket_message(&text) {
error!("Failed to parse message: {} ... error: {}", &text, err);
}
None
},
}
},
// Handle WebSocket messages we created asynchronously
// to send them out now
ws_msg = send_ws_msg_rx.select_next_some() => Some(ws_msg),
// Handle keepalive ticks, fired every 10 seconds
ws_msg = timer_fuse.select_next_some() => {
let transaction = transaction_id();
Some(WsMessage::Text(
json!({
"janus": "keepalive",
"transaction": transaction,
"handle_id": self.handle.id,
"session_id": self.handle.session_id,
}).to_string(),
))
},
// Once we're done, break the loop and return
complete => break,
};
// If there's a message to send out, do so now
if let Some(ws_msg) = ws_msg {
sink.send(ws_msg).await?;
}
}
}
Ok(())
}
fn handle_jsep(&self, jsep: &JsepHolder) -> Result<(), anyhow::Error> {
if let Some(sdp) = &jsep.sdp {
assert_eq!(jsep.type_, "answer");
let peer = self.peer.lock().expect("Invalid peer");
return peer.handle_sdp(&jsep.type_, &sdp);
} else if let Some(ice) = &jsep.ice {
let peer = self.peer.lock().expect("Invalid peer");
return peer.handle_ice(ice.sdp_mline_index, &ice.candidate);
}
Ok(())
}
// Handle WebSocket messages, both our own as well as WebSocket protocol messages
fn handle_websocket_message(&self, msg: &str) -> Result<(), anyhow::Error> {
trace!("Incoming raw message: {}", msg);
let json_msg: JsonReply = serde_json::from_str(msg)?;
let payload_type = &json_msg.base.janus;
if payload_type == "ack" {
trace!(
"Ack transaction {:#?}, sessionId {:#?}",
json_msg.base.transaction,
json_msg.base.session_id
);
} else {
debug!("Incoming JSON WebSocket message: {:#?}", json_msg);
}
if payload_type == "event" {
if let Some(_plugin_data) = json_msg.plugin_data {
if let Some(jsep) = json_msg.jsep {
return self.handle_jsep(&jsep);
}
}
}
Ok(())
}
}

View file

@ -0,0 +1,186 @@
// GStreamer
//
// Copyright (C) 2019 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2020 Philippe Normand <philn@igalia.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
// Boston, MA 02110-1301, USA.
#![recursion_limit = "256"]
use anyhow::bail;
use gst::gst_element_error;
use gst::prelude::*;
use std::sync::{Arc, Weak};
#[macro_use]
extern crate log;
mod janus;
// Strong reference to our application state
#[derive(Debug, Clone)]
struct App(Arc<AppInner>);
// Weak reference to our application state
#[derive(Debug, Clone)]
struct AppWeak(Weak<AppInner>);
// Actual application state
#[derive(Debug)]
struct AppInner {
pipeline: gst::Pipeline,
}
// To be able to access the App's fields directly
impl std::ops::Deref for App {
type Target = AppInner;
fn deref(&self) -> &AppInner {
&self.0
}
}
impl AppWeak {
// Try upgrading a weak reference to a strong one
fn upgrade(&self) -> Option<App> {
self.0.upgrade().map(App)
}
}
impl App {
// Downgrade the strong reference to a weak reference
fn downgrade(&self) -> AppWeak {
AppWeak(Arc::downgrade(&self.0))
}
fn new() -> Result<Self, anyhow::Error> {
let pipeline = gst::parse_launch(
&"webrtcbin name=webrtcbin stun-server=stun://stun.l.google.com:19302 \
videotestsrc pattern=ball ! videoconvert ! queue name=vqueue"
.to_string(),
)?;
let pipeline = pipeline
.downcast::<gst::Pipeline>()
.expect("Couldn't downcast pipeline");
let bus = pipeline.get_bus().unwrap();
let app = App(Arc::new(AppInner { pipeline }));
let app_weak = app.downgrade();
bus.add_watch_local(move |_bus, msg| {
let app = upgrade_weak!(app_weak, glib::Continue(false));
if app.handle_pipeline_message(msg).is_err() {
return glib::Continue(false);
}
glib::Continue(true)
})
.expect("Unable to add bus watch");
Ok(app)
}
fn handle_pipeline_message(&self, message: &gst::Message) -> Result<(), anyhow::Error> {
use gst::message::MessageView;
match message.view() {
MessageView::Error(err) => bail!(
"Error from element {}: {} ({})",
err.get_src()
.map(|s| String::from(s.get_path_string()))
.unwrap_or_else(|| String::from("None")),
err.get_error(),
err.get_debug().unwrap_or_else(|| String::from("None")),
),
MessageView::Warning(warning) => {
println!("Warning: \"{}\"", warning.get_debug().unwrap());
}
_ => (),
}
Ok(())
}
pub async fn run(&self) -> Result<(), anyhow::Error> {
let bin = self.pipeline.clone().upcast::<gst::Bin>();
let mut gw = janus::JanusGateway::new(bin).await?;
// Asynchronously set the pipeline to Playing
self.pipeline.call_async(|pipeline| {
// If this fails, post an error on the bus so we exit
if pipeline.set_state(gst::State::Playing).is_err() {
gst_element_error!(
pipeline,
gst::LibraryError::Failed,
("Failed to set pipeline to Playing")
);
}
});
gw.run().await?;
Ok(())
}
}
// Make sure to shut down the pipeline when it goes out of scope
// to release any system resources
impl Drop for AppInner {
fn drop(&mut self) {
let _ = self.pipeline.set_state(gst::State::Null);
}
}
// Check if all GStreamer plugins we require are available
fn check_plugins() -> Result<(), anyhow::Error> {
let needed = [
"videotestsrc",
"videoconvert",
"autodetect",
"vpx",
"webrtc",
"nice",
"dtls",
"srtp",
"rtpmanager",
"rtp",
];
let registry = gst::Registry::get();
let missing = needed
.iter()
.filter(|n| registry.find_plugin(n).is_none())
.cloned()
.collect::<Vec<_>>();
if !missing.is_empty() {
bail!("Missing plugins: {:?}", missing);
} else {
Ok(())
}
}
async fn async_main() -> Result<(), anyhow::Error> {
gst::init()?;
check_plugins()?;
let app = App::new()?;
app.run().await?;
Ok(())
}
fn main() -> Result<(), anyhow::Error> {
env_logger::init();
let main_context = glib::MainContext::default();
main_context.block_on(async_main())
}