mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-25 13:01:07 +00:00
net/webrtc/janusvr: add JanusVRWebRTCSink plugin/signaller
The JanusVRWebRTCSink is a new plugin that integrates with the Video Room plugin of the Janus Gateway, which simplifies WebRTC communication. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1362>
This commit is contained in:
parent
773ebc7854
commit
80b58f3b45
9 changed files with 882 additions and 3 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -2816,10 +2816,12 @@ dependencies = [
|
|||
"gstreamer-video",
|
||||
"gstreamer-webrtc",
|
||||
"http 0.2.11",
|
||||
"http 1.0.0",
|
||||
"human_bytes",
|
||||
"livekit-api",
|
||||
"livekit-protocol",
|
||||
"parse_link_header",
|
||||
"rand",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"serde",
|
||||
|
|
|
@ -6609,6 +6609,37 @@
|
|||
},
|
||||
"rank": "none"
|
||||
},
|
||||
"janusvrwebrtcsink": {
|
||||
"author": "Eva Pace <epace@igalia.com>",
|
||||
"description": "WebRTC sink with Janus Video Room signaller",
|
||||
"hierarchy": [
|
||||
"GstJanusVRWebRTCSink",
|
||||
"GstBaseWebRTCSink",
|
||||
"GstBin",
|
||||
"GstElement",
|
||||
"GstObject",
|
||||
"GInitiallyUnowned",
|
||||
"GObject"
|
||||
],
|
||||
"interfaces": [
|
||||
"GstChildProxy",
|
||||
"GstNavigation"
|
||||
],
|
||||
"klass": "Sink/Network/WebRTC",
|
||||
"pad-templates": {
|
||||
"audio_%%u": {
|
||||
"caps": "audio/x-raw:\naudio/x-opus:\n",
|
||||
"direction": "sink",
|
||||
"presence": "request"
|
||||
},
|
||||
"video_%%u": {
|
||||
"caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\n\nvideo/x-raw(memory:D3D11Memory):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
|
||||
"direction": "sink",
|
||||
"presence": "request"
|
||||
}
|
||||
},
|
||||
"rank": "none"
|
||||
},
|
||||
"livekitwebrtcsink": {
|
||||
"author": "Olivier Crête <olivier.crete@collabora.com>",
|
||||
"description": "WebRTC sink with LiveKit signaller",
|
||||
|
|
|
@ -56,6 +56,8 @@ livekit-api = { version = "0.3", default-features = false, features = ["signal-c
|
|||
|
||||
warp = "0.3"
|
||||
crossbeam-channel = "0.5"
|
||||
rand = "0.8"
|
||||
http_1 = { version = "1.0", package = "http" }
|
||||
|
||||
[dev-dependencies]
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
|
|
726
net/webrtc/src/janusvr_signaller/imp.rs
Normal file
726
net/webrtc/src/janusvr_signaller/imp.rs
Normal file
|
@ -0,0 +1,726 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::signaller::{Signallable, SignallableImpl};
|
||||
use crate::RUNTIME;
|
||||
|
||||
use anyhow::{anyhow, Error};
|
||||
use async_tungstenite::tungstenite;
|
||||
use futures::channel::mpsc;
|
||||
use futures::sink::SinkExt;
|
||||
use futures::stream::StreamExt;
|
||||
use gst::glib;
|
||||
use gst::glib::once_cell::sync::Lazy;
|
||||
use gst::glib::Properties;
|
||||
use gst::prelude::*;
|
||||
use gst::subclass::prelude::*;
|
||||
use http_1::Uri;
|
||||
use rand::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::{task, time::timeout};
|
||||
use tungstenite::Message as WsMessage;
|
||||
|
||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||
gst::DebugCategory::new(
|
||||
"webrtc-janusvr-signaller",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("WebRTC Janus Video Room signaller"),
|
||||
)
|
||||
});
|
||||
|
||||
fn transaction_id() -> String {
|
||||
thread_rng()
|
||||
.sample_iter(&rand::distributions::Alphanumeric)
|
||||
.map(char::from)
|
||||
.take(30)
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn feed_id() -> u32 {
|
||||
thread_rng().gen()
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct KeepAliveMsg {
|
||||
janus: String,
|
||||
transaction: String,
|
||||
session_id: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct CreateSessionMsg {
|
||||
janus: String,
|
||||
transaction: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct AttachPluginMsg {
|
||||
janus: String,
|
||||
transaction: String,
|
||||
plugin: String,
|
||||
session_id: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct RoomRequestBody {
|
||||
request: String,
|
||||
ptype: String,
|
||||
room: u64,
|
||||
id: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
display: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct RoomRequestMsg {
|
||||
janus: String,
|
||||
transaction: String,
|
||||
session_id: u64,
|
||||
handle_id: u64,
|
||||
body: RoomRequestBody,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct PublishBody {
|
||||
request: String,
|
||||
audio: bool,
|
||||
video: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct Jsep {
|
||||
sdp: String,
|
||||
trickle: Option<bool>,
|
||||
r#type: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct PublishMsg {
|
||||
janus: String,
|
||||
transaction: String,
|
||||
session_id: u64,
|
||||
handle_id: u64,
|
||||
body: PublishBody,
|
||||
jsep: Jsep,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct Candidate {
|
||||
candidate: String,
|
||||
#[serde(rename = "sdpMLineIndex")]
|
||||
sdp_m_line_index: u32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct TrickleMsg {
|
||||
janus: String,
|
||||
transaction: String,
|
||||
session_id: u64,
|
||||
handle_id: u64,
|
||||
candidate: Candidate,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(untagged)]
|
||||
enum OutgoingMessage {
|
||||
KeepAlive(KeepAliveMsg),
|
||||
CreateSession(CreateSessionMsg),
|
||||
AttachPlugin(AttachPluginMsg),
|
||||
RoomRequest(RoomRequestMsg),
|
||||
Publish(PublishMsg),
|
||||
Trickle(TrickleMsg),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct InnerError {
|
||||
code: i32,
|
||||
reason: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct RoomJoined {
|
||||
room: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct RoomEvent {
|
||||
room: Option<u64>,
|
||||
error_code: Option<i32>,
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(tag = "videoroom")]
|
||||
enum VideoRoomData {
|
||||
#[serde(rename = "joined")]
|
||||
Joined(RoomJoined),
|
||||
#[serde(rename = "event")]
|
||||
Event(RoomEvent),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(tag = "plugin")]
|
||||
enum PluginData {
|
||||
#[serde(rename = "janus.plugin.videoroom")]
|
||||
VideoRoom { data: VideoRoomData },
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
struct DataHolder {
|
||||
id: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct SuccessMsg {
|
||||
transaction: Option<String>,
|
||||
session_id: Option<u64>,
|
||||
data: Option<DataHolder>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct EventMsg {
|
||||
transaction: Option<String>,
|
||||
session_id: Option<u64>,
|
||||
plugindata: Option<PluginData>,
|
||||
jsep: Option<Jsep>,
|
||||
}
|
||||
|
||||
// IncomingMessage
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(tag = "janus")]
|
||||
enum JsonReply {
|
||||
#[serde(rename = "ack")]
|
||||
Ack,
|
||||
#[serde(rename = "success")]
|
||||
Success(SuccessMsg),
|
||||
#[serde(rename = "event")]
|
||||
Event(EventMsg),
|
||||
#[serde(rename = "webrtcup")]
|
||||
WebRTCUp,
|
||||
#[serde(rename = "media")]
|
||||
Media,
|
||||
#[serde(rename = "error")]
|
||||
Error(InnerError),
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct State {
|
||||
ws_sender: Option<mpsc::Sender<OutgoingMessage>>,
|
||||
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
|
||||
recv_task_handle: Option<task::JoinHandle<()>>,
|
||||
session_id: Option<u64>,
|
||||
handle_id: Option<u64>,
|
||||
transaction_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Settings {
|
||||
janus_endpoint: String,
|
||||
room_id: Option<String>,
|
||||
feed_id: u32,
|
||||
display_name: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for Settings {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
janus_endpoint: "ws://127.0.0.1:8188".to_string(),
|
||||
room_id: None,
|
||||
feed_id: feed_id(),
|
||||
display_name: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Properties)]
|
||||
#[properties(wrapper_type = super::JanusVRSignaller)]
|
||||
pub struct Signaller {
|
||||
state: Mutex<State>,
|
||||
#[property(name="janus-endpoint", get, set, type = String, member = janus_endpoint, blurb = "The Janus server endpoint to POST SDP offer to")]
|
||||
#[property(name="room-id", get, set, type = String, member = room_id, blurb = "The Janus Room ID that will be joined to")]
|
||||
#[property(name="feed-id", get, set, type = u32, member = feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")]
|
||||
#[property(name="display-name", get, set, type = String, member = display_name, blurb = "The name of the publisher in the Janus Video Room")]
|
||||
settings: Mutex<Settings>,
|
||||
}
|
||||
|
||||
impl Signaller {
|
||||
fn raise_error(&self, msg: String) {
|
||||
self.obj()
|
||||
.emit_by_name::<()>("error", &[&format!("Error: {msg}")]);
|
||||
}
|
||||
|
||||
async fn connect(&self) -> Result<(), Error> {
|
||||
let settings = self.settings.lock().unwrap().clone();
|
||||
use tungstenite::client::IntoClientRequest;
|
||||
let mut request = settings
|
||||
.janus_endpoint
|
||||
.parse::<Uri>()?
|
||||
.into_client_request()?;
|
||||
request.headers_mut().append(
|
||||
"Sec-WebSocket-Protocol",
|
||||
http_1::HeaderValue::from_static("janus-protocol"),
|
||||
);
|
||||
|
||||
let (ws, _) = timeout(
|
||||
// FIXME: Make the timeout configurable
|
||||
Duration::from_secs(20),
|
||||
async_tungstenite::tokio::connect_async(request),
|
||||
)
|
||||
.await??;
|
||||
|
||||
// Channel for asynchronously sending out websocket message
|
||||
let (mut ws_sink, mut ws_stream) = ws.split();
|
||||
|
||||
// 1000 is completely arbitrary, we simply don't want infinite piling
|
||||
// up of messages as with unbounded
|
||||
let (ws_sender, mut ws_receiver) = mpsc::channel::<OutgoingMessage>(1000);
|
||||
let send_task_handle =
|
||||
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
opt = ws_receiver.next() => match opt {
|
||||
Some(msg) => {
|
||||
gst::log!(CAT, "Sending websocket message {:?}", msg);
|
||||
ws_sink
|
||||
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
|
||||
.await?;
|
||||
},
|
||||
None => break,
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_secs(10)) => {
|
||||
if let Some(ref this) = this {
|
||||
let (transaction, session_id) = {
|
||||
let state = this.state.lock().unwrap();
|
||||
(state.transaction_id.clone().unwrap(),
|
||||
state.session_id.unwrap())
|
||||
};
|
||||
let msg = OutgoingMessage::KeepAlive(KeepAliveMsg {
|
||||
janus: "keepalive".to_string(),
|
||||
transaction,
|
||||
session_id,
|
||||
});
|
||||
ws_sink
|
||||
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let msg = "Done sending";
|
||||
this.map_or_else(|| gst::info!(CAT, "{msg}"),
|
||||
|this| gst::info!(CAT, imp: this, "{msg}")
|
||||
);
|
||||
|
||||
ws_sink.send(WsMessage::Close(None)).await?;
|
||||
ws_sink.close().await?;
|
||||
|
||||
Ok::<(), Error>(())
|
||||
}));
|
||||
|
||||
let recv_task_handle =
|
||||
RUNTIME.spawn(glib::clone!(@weak-allow-none self as this => async move {
|
||||
while let Some(msg) = tokio_stream::StreamExt::next(&mut ws_stream).await {
|
||||
if let Some(ref this) = this {
|
||||
if let ControlFlow::Break(_) = this.handle_msg(msg) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let msg = "Stopped websocket receiving";
|
||||
this.map_or_else(|| gst::info!(CAT, "{msg}"),
|
||||
|this| gst::info!(CAT, imp: this, "{msg}")
|
||||
);
|
||||
}));
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.ws_sender = Some(ws_sender);
|
||||
state.send_task_handle = Some(send_task_handle);
|
||||
state.recv_task_handle = Some(recv_task_handle);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_msg(
|
||||
&self,
|
||||
msg: Result<WsMessage, async_tungstenite::tungstenite::Error>,
|
||||
) -> ControlFlow<()> {
|
||||
match msg {
|
||||
Ok(WsMessage::Text(msg)) => {
|
||||
gst::trace!(CAT, imp: self, "Received message {}", msg);
|
||||
if let Ok(reply) = serde_json::from_str::<JsonReply>(&msg) {
|
||||
self.handle_reply(reply);
|
||||
} else {
|
||||
gst::error!(CAT, imp: self, "Unknown message from server: {}", msg);
|
||||
}
|
||||
}
|
||||
Ok(WsMessage::Close(reason)) => {
|
||||
gst::info!(CAT, imp: self, "websocket connection closed: {:?}", reason);
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
self.raise_error(err.to_string());
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
}
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
|
||||
fn handle_reply(&self, reply: JsonReply) {
|
||||
match reply {
|
||||
JsonReply::WebRTCUp => {
|
||||
gst::trace!(CAT, imp: self, "WebRTC streaming is working!");
|
||||
}
|
||||
JsonReply::Success(success) => {
|
||||
if let Some(data) = success.data {
|
||||
if success.session_id.is_none() {
|
||||
gst::trace!(CAT, imp: self, "Janus session {} was created successfully", data.id);
|
||||
self.set_session_id(data.id);
|
||||
self.attach_plugin();
|
||||
} else {
|
||||
gst::trace!(CAT, imp: self, "Attached to Janus Video Room plugin successfully, handle: {}", data.id);
|
||||
self.set_handle_id(data.id);
|
||||
self.join_room();
|
||||
}
|
||||
}
|
||||
}
|
||||
JsonReply::Event(event) => {
|
||||
if let Some(PluginData::VideoRoom { data: plugindata }) = event.plugindata {
|
||||
match plugindata {
|
||||
VideoRoomData::Joined(joined) => {
|
||||
if let Some(room) = joined.room {
|
||||
gst::trace!(CAT, imp: self, "Joined room {} successfully", room);
|
||||
self.session_requested();
|
||||
}
|
||||
}
|
||||
VideoRoomData::Event(room_event) => {
|
||||
if room_event.error_code.is_some() && room_event.error.is_some() {
|
||||
self.raise_error(format!(
|
||||
"code: {}, reason: {}",
|
||||
room_event.error_code.unwrap(),
|
||||
room_event.error.unwrap(),
|
||||
));
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(jsep) = event.jsep {
|
||||
if jsep.r#type == "answer" {
|
||||
gst::trace!(CAT, imp: self, "Session requested successfully");
|
||||
self.handle_answer(jsep.sdp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
JsonReply::Error(error) => {
|
||||
self.raise_error(format!("code: {}, reason: {}", error.code, error.reason))
|
||||
}
|
||||
// ignore for now
|
||||
JsonReply::Ack | JsonReply::Media => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn send(&self, msg: OutgoingMessage) {
|
||||
let state = self.state.lock().unwrap();
|
||||
if let Some(mut sender) = state.ws_sender.clone() {
|
||||
RUNTIME.spawn(glib::clone!(@weak self as this => async move {
|
||||
if let Err(err) = sender.send(msg).await {
|
||||
this.raise_error(err.to_string());
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
// Only used at the end when cleaning up the resources.
|
||||
// So that `SignallableImpl::stop` waits the last message
|
||||
// to be sent properly.
|
||||
fn send_blocking(&self, msg: OutgoingMessage) {
|
||||
let state = self.state.lock().unwrap();
|
||||
if let Some(mut sender) = state.ws_sender.clone() {
|
||||
RUNTIME.block_on(glib::clone!(@weak self as this => async move {
|
||||
if let Err(err) = sender.send(msg).await {
|
||||
this.raise_error(err.to_string());
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
fn set_transaction_id(&self, transaction: String) {
|
||||
self.state.lock().unwrap().transaction_id = Some(transaction);
|
||||
}
|
||||
|
||||
fn create_session(&self) {
|
||||
let transaction = transaction_id();
|
||||
self.set_transaction_id(transaction.clone());
|
||||
self.send(OutgoingMessage::CreateSession(CreateSessionMsg {
|
||||
janus: "create".to_string(),
|
||||
transaction,
|
||||
}));
|
||||
}
|
||||
|
||||
fn set_session_id(&self, session_id: u64) {
|
||||
self.state.lock().unwrap().session_id = Some(session_id);
|
||||
}
|
||||
|
||||
fn set_handle_id(&self, handle_id: u64) {
|
||||
self.state.lock().unwrap().handle_id = Some(handle_id);
|
||||
}
|
||||
|
||||
fn attach_plugin(&self) {
|
||||
let (transaction, session_id) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
(
|
||||
state.transaction_id.clone().unwrap(),
|
||||
state.session_id.unwrap(),
|
||||
)
|
||||
};
|
||||
self.send(OutgoingMessage::AttachPlugin(AttachPluginMsg {
|
||||
janus: "attach".to_string(),
|
||||
transaction,
|
||||
plugin: "janus.plugin.videoroom".to_string(),
|
||||
session_id,
|
||||
}));
|
||||
}
|
||||
|
||||
fn join_room(&self) {
|
||||
let (transaction, session_id, handle_id, room, feed_id, display) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
if settings.room_id.is_none() {
|
||||
self.raise_error("Janus Room ID must be set".to_string());
|
||||
return;
|
||||
}
|
||||
|
||||
(
|
||||
state.transaction_id.clone().unwrap(),
|
||||
state.session_id.unwrap(),
|
||||
state.handle_id.unwrap(),
|
||||
settings.room_id.as_ref().unwrap().parse().unwrap(),
|
||||
settings.feed_id,
|
||||
settings.display_name.clone(),
|
||||
)
|
||||
};
|
||||
self.send(OutgoingMessage::RoomRequest(RoomRequestMsg {
|
||||
janus: "message".to_string(),
|
||||
transaction,
|
||||
session_id,
|
||||
handle_id,
|
||||
body: RoomRequestBody {
|
||||
request: "join".to_string(),
|
||||
ptype: "publisher".to_string(),
|
||||
room,
|
||||
id: feed_id,
|
||||
display,
|
||||
},
|
||||
}));
|
||||
}
|
||||
|
||||
fn leave_room(&self) {
|
||||
let (transaction, session_id, handle_id, room, feed_id, display) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
if settings.room_id.is_none() {
|
||||
self.raise_error("Janus Room ID must be set".to_string());
|
||||
return;
|
||||
}
|
||||
|
||||
(
|
||||
state.transaction_id.clone().unwrap(),
|
||||
state.session_id.unwrap(),
|
||||
state.handle_id.unwrap(),
|
||||
settings.room_id.as_ref().unwrap().parse().unwrap(),
|
||||
settings.feed_id,
|
||||
settings.display_name.clone(),
|
||||
)
|
||||
};
|
||||
self.send_blocking(OutgoingMessage::RoomRequest(RoomRequestMsg {
|
||||
janus: "message".to_string(),
|
||||
transaction,
|
||||
session_id,
|
||||
handle_id,
|
||||
body: RoomRequestBody {
|
||||
request: "leave".to_string(),
|
||||
ptype: "publisher".to_string(),
|
||||
room,
|
||||
id: feed_id,
|
||||
display,
|
||||
},
|
||||
}));
|
||||
}
|
||||
|
||||
fn publish(&self, offer: &gst_webrtc::WebRTCSessionDescription) {
|
||||
let (transaction, session_id, handle_id) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
if settings.room_id.is_none() {
|
||||
self.raise_error("Janus Room ID must be set".to_string());
|
||||
return;
|
||||
}
|
||||
|
||||
(
|
||||
state.transaction_id.clone().unwrap(),
|
||||
state.session_id.unwrap(),
|
||||
state.handle_id.unwrap(),
|
||||
)
|
||||
};
|
||||
let sdp_data = offer.sdp().as_text().unwrap();
|
||||
self.send(OutgoingMessage::Publish(PublishMsg {
|
||||
janus: "message".to_string(),
|
||||
transaction,
|
||||
session_id,
|
||||
handle_id,
|
||||
body: PublishBody {
|
||||
request: "publish".to_string(),
|
||||
audio: true,
|
||||
video: true,
|
||||
},
|
||||
jsep: Jsep {
|
||||
sdp: sdp_data,
|
||||
trickle: Some(true),
|
||||
r#type: "offer".to_string(),
|
||||
},
|
||||
}));
|
||||
}
|
||||
|
||||
fn trickle(&self, candidate: &str, sdp_m_line_index: u32) {
|
||||
let (transaction, session_id, handle_id) = {
|
||||
let state = self.state.lock().unwrap();
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
if settings.room_id.is_none() {
|
||||
self.raise_error("Janus Room ID must be set".to_string());
|
||||
return;
|
||||
}
|
||||
|
||||
(
|
||||
state.transaction_id.clone().unwrap(),
|
||||
state.session_id.unwrap(),
|
||||
state.handle_id.unwrap(),
|
||||
)
|
||||
};
|
||||
self.send(OutgoingMessage::Trickle(TrickleMsg {
|
||||
janus: "trickle".to_string(),
|
||||
transaction,
|
||||
session_id,
|
||||
handle_id,
|
||||
candidate: Candidate {
|
||||
candidate: candidate.to_string(),
|
||||
sdp_m_line_index,
|
||||
},
|
||||
}));
|
||||
}
|
||||
|
||||
fn session_requested(&self) {
|
||||
self.obj().emit_by_name::<()>(
|
||||
"session-requested",
|
||||
&[
|
||||
&"unique",
|
||||
&"unique",
|
||||
&None::<gst_webrtc::WebRTCSessionDescription>,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_answer(&self, sdp: String) {
|
||||
match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
|
||||
Ok(ans_sdp) => {
|
||||
let answer = gst_webrtc::WebRTCSessionDescription::new(
|
||||
gst_webrtc::WebRTCSDPType::Answer,
|
||||
ans_sdp,
|
||||
);
|
||||
self.obj()
|
||||
.emit_by_name::<()>("session-description", &[&"unique", &answer]);
|
||||
}
|
||||
Err(err) => {
|
||||
self.raise_error(format!("Could not parse answer SDP: {err}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SignallableImpl for Signaller {
|
||||
fn start(&self) {
|
||||
let this = self.obj().clone();
|
||||
let imp = self.downgrade();
|
||||
RUNTIME.spawn(async move {
|
||||
if let Some(imp) = imp.upgrade() {
|
||||
if let Err(err) = imp.connect().await {
|
||||
this.emit_by_name::<()>("error", &[&format!("{:?}", anyhow!(err))]);
|
||||
} else {
|
||||
imp.create_session();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn send_sdp(&self, _session_id: &str, offer: &gst_webrtc::WebRTCSessionDescription) {
|
||||
gst::info!(CAT, imp: self, "sending SDP offer to peer: {:?}", offer.sdp().as_text());
|
||||
|
||||
self.publish(offer);
|
||||
}
|
||||
|
||||
fn add_ice(
|
||||
&self,
|
||||
_session_id: &str,
|
||||
candidate: &str,
|
||||
sdp_m_line_index: u32,
|
||||
_sdp_mid: Option<String>,
|
||||
) {
|
||||
self.trickle(candidate, sdp_m_line_index);
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
gst::info!(CAT, imp: self, "Stopping now");
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
let send_task_handle = state.send_task_handle.take();
|
||||
let recv_task_handle = state.recv_task_handle.take();
|
||||
|
||||
if let Some(mut sender) = state.ws_sender.take() {
|
||||
RUNTIME.block_on(async move {
|
||||
sender.close_channel();
|
||||
|
||||
if let Some(handle) = send_task_handle {
|
||||
if let Err(err) = handle.await {
|
||||
gst::warning!(CAT, imp: self, "Error while joining send task: {}", err);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(handle) = recv_task_handle {
|
||||
// if awaited instead, it hangs the plugin
|
||||
handle.abort();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
state.session_id = None;
|
||||
state.handle_id = None;
|
||||
state.transaction_id = None;
|
||||
}
|
||||
|
||||
fn end_session(&self, _session_id: &str) {
|
||||
self.leave_room();
|
||||
}
|
||||
}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for Signaller {
|
||||
const NAME: &'static str = "GstJanusVRWebRTCSignaller";
|
||||
type Type = super::JanusVRSignaller;
|
||||
type ParentType = glib::Object;
|
||||
type Interfaces = (Signallable,);
|
||||
}
|
||||
|
||||
#[glib::derived_properties]
|
||||
impl ObjectImpl for Signaller {}
|
19
net/webrtc/src/janusvr_signaller/mod.rs
Normal file
19
net/webrtc/src/janusvr_signaller/mod.rs
Normal file
|
@ -0,0 +1,19 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::signaller::Signallable;
|
||||
use gst::glib;
|
||||
|
||||
mod imp;
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct JanusVRSignaller(ObjectSubclass<imp::Signaller>) @implements Signallable;
|
||||
}
|
||||
|
||||
unsafe impl Send for JanusVRSignaller {}
|
||||
unsafe impl Sync for JanusVRSignaller {}
|
||||
|
||||
impl Default for JanusVRSignaller {
|
||||
fn default() -> Self {
|
||||
glib::Object::new()
|
||||
}
|
||||
}
|
|
@ -15,6 +15,7 @@ use gst::glib::once_cell::sync::Lazy;
|
|||
use tokio::runtime;
|
||||
|
||||
mod aws_kvs_signaller;
|
||||
mod janusvr_signaller;
|
||||
mod livekit_signaller;
|
||||
pub mod signaller;
|
||||
pub mod utils;
|
||||
|
|
|
@ -547,9 +547,7 @@ impl SignallableImpl for Signaller {
|
|||
}
|
||||
|
||||
if let Some(handle) = receive_task_handle {
|
||||
if let Err(err) = handle.await {
|
||||
gst::warning!(CAT, imp: self, "Error while joining receive task: {}", err);
|
||||
}
|
||||
handle.abort();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ use std::sync::{mpsc, Arc, Condvar, Mutex};
|
|||
use super::homegrown_cc::CongestionController;
|
||||
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
|
||||
use crate::aws_kvs_signaller::AwsKvsSignaller;
|
||||
use crate::janusvr_signaller::JanusVRSignaller;
|
||||
use crate::livekit_signaller::LiveKitSignaller;
|
||||
use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole};
|
||||
use crate::whip_signaller::WhipClientSignaller;
|
||||
|
@ -4414,3 +4415,43 @@ impl ObjectSubclass for LiveKitWebRTCSink {
|
|||
type Type = super::LiveKitWebRTCSink;
|
||||
type ParentType = super::BaseWebRTCSink;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct JanusVRWebRTCSink {}
|
||||
|
||||
impl ObjectImpl for JanusVRWebRTCSink {
|
||||
fn constructed(&self) {
|
||||
let element = self.obj();
|
||||
let ws = element.upcast_ref::<super::BaseWebRTCSink>().imp();
|
||||
|
||||
let _ = ws.set_signaller(JanusVRSignaller::default().upcast());
|
||||
}
|
||||
}
|
||||
|
||||
impl GstObjectImpl for JanusVRWebRTCSink {}
|
||||
|
||||
impl ElementImpl for JanusVRWebRTCSink {
|
||||
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
||||
gst::subclass::ElementMetadata::new(
|
||||
"JanusVRWebRTCSink",
|
||||
"Sink/Network/WebRTC",
|
||||
"WebRTC sink with Janus Video Room signaller",
|
||||
"Eva Pace <epace@igalia.com>",
|
||||
)
|
||||
});
|
||||
|
||||
Some(&*ELEMENT_METADATA)
|
||||
}
|
||||
}
|
||||
|
||||
impl BinImpl for JanusVRWebRTCSink {}
|
||||
|
||||
impl BaseWebRTCSinkImpl for JanusVRWebRTCSink {}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for JanusVRWebRTCSink {
|
||||
const NAME: &'static str = "GstJanusVRWebRTCSink";
|
||||
type Type = super::JanusVRWebRTCSink;
|
||||
type ParentType = super::BaseWebRTCSink;
|
||||
}
|
||||
|
|
|
@ -60,6 +60,10 @@ glib::wrapper! {
|
|||
pub struct LiveKitWebRTCSink(ObjectSubclass<imp::LiveKitWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
|
||||
}
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct JanusVRWebRTCSink(ObjectSubclass<imp::JanusVRWebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum WebRTCSinkError {
|
||||
#[error("no session with id")]
|
||||
|
@ -146,6 +150,61 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
|||
gst::Rank::NONE,
|
||||
LiveKitWebRTCSink::static_type(),
|
||||
)?;
|
||||
/**
|
||||
* element-janusvrwebrtcsink:
|
||||
*
|
||||
* The `JanusVRWebRTCSink` is a plugin that integrates with the [Video Room plugin](https://janus.conf.meetecho.com/docs/videoroom) of the [Janus Gateway](https://github.com/meetecho/janus-gateway). It basically streams whatever data you pipe to it (video, audio) into WebRTC using Janus as the signaller.
|
||||
*
|
||||
* ## How to use it
|
||||
*
|
||||
* You'll need to have:
|
||||
*
|
||||
* - A Janus server endpoint;
|
||||
* - Any WebRTC browser application that uses Janus as the signaller, eg: the `html` folder of [janus-gateway repository](https://github.com/meetecho/janus-gateway).
|
||||
*
|
||||
* You can pipe the video like this (if you don't happen to run Janus locally, you can set the endpoint
|
||||
* like this: `signaller::janus-endpoint=ws://127.0.0.1:8188`):
|
||||
*
|
||||
* ```bash
|
||||
* $ gst-launch-1.0 videotestsrc ! janusvrwebrtcsink signaller::room-id=1234
|
||||
* ```
|
||||
*
|
||||
* And for audio (yes you can do both at the same time, you just need to pipe it properly).
|
||||
*
|
||||
* ```bash
|
||||
* $ gst-launch-1.0 audiotestsrc ! janusvrwebrtcsink signaller::room-id=1234
|
||||
* ```
|
||||
*
|
||||
* And you can set the display name via `signaller::display-name`, eg:
|
||||
*
|
||||
* ```bash
|
||||
* $ gst-launch-1.0 videotestsrc ! janusvrwebrtcsink signaller::room-id=1234 signaller::display-name=ana
|
||||
* ```
|
||||
*
|
||||
* You should see the GStreamer `videotestsrc`/`audiotestsrc` output in your browser now!
|
||||
*
|
||||
* If for some reason you can't run Janus locally, you can use their open [demo webpage](https://janus.conf.meetecho.com/demos/videoroom.html), and point to its WebSocket server:
|
||||
*
|
||||
* ```bash
|
||||
* $ gst-launch-1.0 videotestsrc ! janusvrwebrtcsink signaller::room-id=1234 signaller::janus-endpoint=wss://janus.conf.meetecho.com/ws
|
||||
* ```
|
||||
*
|
||||
* ## Reference links
|
||||
*
|
||||
* - [Janus REST/WebSockets docs](https://janus.conf.meetecho.com/docs/rest.html)
|
||||
* - [Example implementation in GStreamer](https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/269ab858813e670d521cc4b6a71cc0ec4a6e70ed/subprojects/gst-examples/webrtc/janus/rust/src/janus.rs)
|
||||
*
|
||||
* ## Notes
|
||||
*
|
||||
* - This plugin supports both the legacy Video Room plugin as well as the `multistream` one;
|
||||
* - If you see a warning in the logs related to `rtpgccbwe`, you're probably missing the `gst-plugin-rtp` in your system.
|
||||
*/
|
||||
gst::Element::register(
|
||||
Some(plugin),
|
||||
"janusvrwebrtcsink",
|
||||
gst::Rank::NONE,
|
||||
JanusVRWebRTCSink::static_type(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue