2023-10-16 16:16:52 +00:00
// SPDX-License-Identifier: MPL-2.0
use crate ::signaller ::{ Signallable , SignallableImpl } ;
use crate ::RUNTIME ;
use anyhow ::{ anyhow , Error } ;
use async_tungstenite ::tungstenite ;
use futures ::channel ::mpsc ;
use futures ::sink ::SinkExt ;
use futures ::stream ::StreamExt ;
use gst ::glib ;
use gst ::glib ::Properties ;
use gst ::prelude ::* ;
use gst ::subclass ::prelude ::* ;
2024-01-31 14:57:42 +00:00
use http ::Uri ;
2024-01-31 15:07:56 +00:00
use once_cell ::sync ::Lazy ;
2023-10-16 16:16:52 +00:00
use rand ::prelude ::* ;
use serde ::{ Deserialize , Serialize } ;
use std ::ops ::ControlFlow ;
use std ::sync ::Mutex ;
use std ::time ::Duration ;
use tokio ::{ task , time ::timeout } ;
use tungstenite ::Message as WsMessage ;
/// Lazily created debug category under which this file's log statements are emitted.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    let name = " webrtc-janusvr-signaller ";
    let description = " WebRTC Janus Video Room signaller ";
    gst::DebugCategory::new(name, gst::DebugColorFlags::empty(), Some(description))
});
/// Builds a random 30-character alphanumeric string.
///
/// The result is stored in the signaller state and sent as the `transaction`
/// field of outgoing messages.
fn transaction_id() -> String {
    let rng = thread_rng();
    let alphanumerics = rng.sample_iter(&rand::distributions::Alphanumeric);
    alphanumerics.take(30).map(char::from).collect()
}
/// Draws a random `u32`; `Settings::default` uses it as the initial feed id.
fn feed_id() -> u32 {
    let mut rng = thread_rng();
    rng.gen::<u32>()
}
2024-02-15 21:42:40 +00:00
#[ derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone) ]
#[ serde(untagged) ]
enum RoomId {
Str ( String ) ,
2024-02-28 10:46:23 +00:00
Num ( u64 ) ,
2024-02-15 21:42:40 +00:00
}
2023-10-16 16:16:52 +00:00
/// Keep-alive request, built by the send task spawned in `Signaller::connect`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct KeepAliveMsg {
    /// Request kind.
    janus: String,
    /// Transaction identifier taken from the signaller state.
    transaction: String,
    /// Session the keep-alive applies to.
    session_id: u64,
    /// Copy of the `secret_key` setting, if any.
    apisecret: Option<String>,
}
/// Session creation request, built by `Signaller::create_session`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct CreateSessionMsg {
    /// Request kind.
    janus: String,
    /// Freshly generated transaction identifier.
    transaction: String,
    /// Copy of the `secret_key` setting, if any.
    apisecret: Option<String>,
}
/// Plugin attachment request, built by `Signaller::attach_plugin`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct AttachPluginMsg {
    /// Request kind.
    janus: String,
    /// Transaction identifier taken from the signaller state.
    transaction: String,
    /// Name of the plugin to attach to.
    plugin: String,
    /// Session in which the plugin is attached.
    session_id: u64,
    /// Copy of the `secret_key` setting, if any.
    apisecret: Option<String>,
}
#[ derive(Serialize, Deserialize, Debug, PartialEq, Eq) ]
struct RoomRequestBody {
request : String ,
ptype : String ,
2024-02-15 21:42:40 +00:00
room : RoomId ,
id : RoomId ,
2023-10-16 16:16:52 +00:00
#[ serde(skip_serializing_if = " Option::is_none " ) ]
display : Option < String > ,
}
#[ derive(Serialize, Deserialize, Debug, PartialEq, Eq) ]
struct RoomRequestMsg {
janus : String ,
transaction : String ,
session_id : u64 ,
handle_id : u64 ,
2024-02-15 21:18:31 +00:00
apisecret : Option < String > ,
2023-10-16 16:16:52 +00:00
body : RoomRequestBody ,
}
/// Body of a [`PublishMsg`].
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct PublishBody {
    /// Name of the requested operation.
    request: String,
}
/// SDP payload exchanged with the server, both in outgoing offers and incoming events.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct Jsep {
    /// SDP text.
    sdp: String,
    /// Trickle flag; `publish` sets it to `Some(true)`.
    trickle: Option<bool>,
    /// Kind of description carried in `sdp`.
    r#type: String,
}
#[ derive(Serialize, Deserialize, Debug, PartialEq, Eq) ]
struct PublishMsg {
janus : String ,
transaction : String ,
session_id : u64 ,
handle_id : u64 ,
2024-02-15 21:18:31 +00:00
apisecret : Option < String > ,
2023-10-16 16:16:52 +00:00
body : PublishBody ,
jsep : Jsep ,
}
/// A single ICE candidate as carried by a [`TrickleMsg`].
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct Candidate {
    /// Candidate string.
    candidate: String,
    /// Media line index; serialized under a renamed key.
    #[serde(rename = " sdpMLineIndex ")]
    sdp_m_line_index: u32,
}
#[ derive(Serialize, Deserialize, Debug, PartialEq, Eq) ]
struct TrickleMsg {
janus : String ,
transaction : String ,
session_id : u64 ,
handle_id : u64 ,
2024-02-15 21:18:31 +00:00
apisecret : Option < String > ,
2023-10-16 16:16:52 +00:00
candidate : Candidate ,
}
/// Every message this signaller pushes onto the websocket sender channel.
///
/// The enum is `untagged`: only the inner message is serialized.
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
enum OutgoingMessage {
    /// Keep-alive request.
    KeepAlive(KeepAliveMsg),
    /// Session creation request.
    CreateSession(CreateSessionMsg),
    /// Plugin attachment request.
    AttachPlugin(AttachPluginMsg),
    /// Room join/leave request.
    RoomRequest(RoomRequestMsg),
    /// SDP offer publication.
    Publish(PublishMsg),
    /// ICE candidate.
    Trickle(TrickleMsg),
}
/// Payload of a [`JsonReply::Error`] reply.
#[derive(Serialize, Deserialize, Debug)]
struct InnerError {
    /// Numeric error code.
    code: i32,
    /// Textual reason.
    reason: String,
}
#[ derive(Serialize, Deserialize, Debug) ]
struct RoomJoined {
2024-02-15 21:42:40 +00:00
room : Option < RoomId > ,
2023-10-16 16:16:52 +00:00
}
#[ derive(Serialize, Deserialize, Debug) ]
struct RoomEvent {
2024-02-15 21:42:40 +00:00
room : Option < RoomId > ,
2023-10-16 16:16:52 +00:00
error_code : Option < i32 > ,
error : Option < String > ,
}
/// Video room plugin data, discriminated by an internal serde tag.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = " videoroom ")]
enum VideoRoomData {
    /// The room was joined.
    #[serde(rename = " joined ")]
    Joined(RoomJoined),
    /// A room event, possibly carrying an error.
    #[serde(rename = " event ")]
    Event(RoomEvent),
}
/// Plugin-specific data of an event, discriminated by an internal serde tag.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = " plugin ")]
enum PluginData {
    /// Data originating from the video room plugin.
    #[serde(rename = " janus.plugin.videoroom ")]
    VideoRoom { data: VideoRoomData },
}
/// `data` object of a success reply.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct DataHolder {
    /// Identifier returned by the server; `handle_reply` stores it either as the
    /// session id or as the plugin handle id.
    id: u64,
}
/// Payload of a [`JsonReply::Success`] reply.
#[derive(Serialize, Deserialize, Debug)]
struct SuccessMsg {
    /// Transaction the reply refers to, if present.
    transaction: Option<String>,
    /// Session the reply refers to; `handle_reply` treats its absence as the answer
    /// to the session creation request.
    session_id: Option<u64>,
    /// Returned identifier, if present.
    data: Option<DataHolder>,
}
/// Payload of a [`JsonReply::Event`] reply.
#[derive(Serialize, Deserialize, Debug)]
struct EventMsg {
    /// Transaction the event refers to, if present.
    transaction: Option<String>,
    /// Session the event refers to, if present.
    session_id: Option<u64>,
    /// Plugin-specific data, if present.
    plugindata: Option<PluginData>,
    /// SDP attached to the event, if present.
    jsep: Option<Jsep>,
}
// IncomingMessage
#[ derive(Serialize, Deserialize, Debug) ]
#[ serde(tag = " janus " ) ]
enum JsonReply {
#[ serde(rename = " ack " ) ]
Ack ,
#[ serde(rename = " success " ) ]
Success ( SuccessMsg ) ,
#[ serde(rename = " event " ) ]
Event ( EventMsg ) ,
#[ serde(rename = " webrtcup " ) ]
WebRTCUp ,
#[ serde(rename = " media " ) ]
Media ,
#[ serde(rename = " error " ) ]
Error ( InnerError ) ,
}
#[ derive(Default) ]
struct State {
ws_sender : Option < mpsc ::Sender < OutgoingMessage > > ,
send_task_handle : Option < task ::JoinHandle < Result < ( ) , Error > > > ,
recv_task_handle : Option < task ::JoinHandle < ( ) > > ,
session_id : Option < u64 > ,
handle_id : Option < u64 > ,
transaction_id : Option < String > ,
2024-02-15 21:42:40 +00:00
room_id : Option < RoomId > ,
feed_id : Option < RoomId > ,
2023-10-16 16:16:52 +00:00
}
/// User-configurable settings, exposed as GObject properties on `Signaller`.
#[derive(Clone)]
struct Settings {
    /// Websocket endpoint of the server.
    janus_endpoint: String,
    /// Room to join; `join_room` raises an error while this is unset.
    room_id: Option<String>,
    /// Feed id announced when joining; always kept as a string here.
    feed_id: String,
    /// Optional display name of the publisher.
    display_name: Option<String>,
    /// Optional secret sent as `apisecret` in every request.
    secret_key: Option<String>,
    /// Forces room and feed ids to be sent as strings even when they parse as integers.
    string_ids: bool,
}
impl Default for Settings {
fn default ( ) -> Self {
Self {
janus_endpoint : " ws://127.0.0.1:8188 " . to_string ( ) ,
room_id : None ,
2024-02-15 21:42:40 +00:00
feed_id : feed_id ( ) . to_string ( ) ,
2023-10-16 16:16:52 +00:00
display_name : None ,
2024-02-15 21:18:31 +00:00
secret_key : None ,
2024-02-16 20:59:59 +00:00
string_ids : false ,
2023-10-16 16:16:52 +00:00
}
}
}
#[ derive(Default, Properties) ]
#[ properties(wrapper_type = super::JanusVRSignaller) ]
pub struct Signaller {
state : Mutex < State > ,
#[ property(name= " janus-endpoint " , get, set, type = String, member = janus_endpoint, blurb = " The Janus server endpoint to POST SDP offer to " ) ]
#[ property(name= " room-id " , get, set, type = String, member = room_id, blurb = " The Janus Room ID that will be joined to " ) ]
2024-02-15 21:42:40 +00:00
#[ property(name= " feed-id " , get, set, type = String, member = feed_id, blurb = " The Janus Feed ID to identify where the track is coming from " ) ]
2023-10-16 16:16:52 +00:00
#[ property(name= " display-name " , get, set, type = String, member = display_name, blurb = " The name of the publisher in the Janus Video Room " ) ]
2024-02-15 21:18:31 +00:00
#[ property(name= " secret-key " , get, set, type = String, member = secret_key, blurb = " The secret API key to communicate with Janus server " ) ]
2024-02-16 20:59:59 +00:00
#[ property(name= " string-ids " , get, set, type = bool, member = string_ids, blurb = " Force passing room-id and feed-id as string even if they can be parsed into an integer " ) ]
2023-10-16 16:16:52 +00:00
settings : Mutex < Settings > ,
}
impl Signaller {
/// Emits the error signal on the wrapper object with `msg` embedded in the text.
fn raise_error(&self, msg: String) {
    let text = format!(" Error: {msg} ");
    self.obj().emit_by_name::<()>(" error ", &[&text]);
}
async fn connect ( & self ) -> Result < ( ) , Error > {
let settings = self . settings . lock ( ) . unwrap ( ) . clone ( ) ;
use tungstenite ::client ::IntoClientRequest ;
let mut request = settings
. janus_endpoint
. parse ::< Uri > ( ) ?
. into_client_request ( ) ? ;
request . headers_mut ( ) . append (
" Sec-WebSocket-Protocol " ,
2024-01-31 14:57:42 +00:00
http ::HeaderValue ::from_static ( " janus-protocol " ) ,
2023-10-16 16:16:52 +00:00
) ;
let ( ws , _ ) = timeout (
// FIXME: Make the timeout configurable
Duration ::from_secs ( 20 ) ,
async_tungstenite ::tokio ::connect_async ( request ) ,
)
. await ? ? ;
// Channel for asynchronously sending out websocket message
let ( mut ws_sink , mut ws_stream ) = ws . split ( ) ;
// 1000 is completely arbitrary, we simply don't want infinite piling
// up of messages as with unbounded
let ( ws_sender , mut ws_receiver ) = mpsc ::channel ::< OutgoingMessage > ( 1000 ) ;
let send_task_handle =
RUNTIME . spawn ( glib ::clone! ( @ weak - allow - none self as this = > async move {
2024-01-21 21:09:41 +00:00
let mut res = Ok ( ( ) ) ;
2023-10-16 16:16:52 +00:00
loop {
tokio ::select! {
opt = ws_receiver . next ( ) = > match opt {
Some ( msg ) = > {
gst ::log! ( CAT , " Sending websocket message {:?} " , msg ) ;
2024-01-21 21:09:41 +00:00
res = ws_sink
2023-10-16 16:16:52 +00:00
. send ( WsMessage ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) )
2024-01-21 21:09:41 +00:00
. await ;
2023-10-16 16:16:52 +00:00
} ,
None = > break ,
} ,
_ = tokio ::time ::sleep ( Duration ::from_secs ( 10 ) ) = > {
if let Some ( ref this ) = this {
2024-02-15 21:18:31 +00:00
let ( transaction , session_id , apisecret ) = {
2023-10-16 16:16:52 +00:00
let state = this . state . lock ( ) . unwrap ( ) ;
2024-02-15 21:18:31 +00:00
let settings = this . settings . lock ( ) . unwrap ( ) ;
(
state . transaction_id . clone ( ) . unwrap ( ) ,
state . session_id . unwrap ( ) ,
settings . secret_key . clone ( ) ,
)
2023-10-16 16:16:52 +00:00
} ;
let msg = OutgoingMessage ::KeepAlive ( KeepAliveMsg {
janus : " keepalive " . to_string ( ) ,
transaction ,
session_id ,
2024-02-15 21:18:31 +00:00
apisecret ,
2023-10-16 16:16:52 +00:00
} ) ;
2024-01-21 21:09:41 +00:00
res = ws_sink
2023-10-16 16:16:52 +00:00
. send ( WsMessage ::Text ( serde_json ::to_string ( & msg ) . unwrap ( ) ) )
2024-01-21 21:09:41 +00:00
. await ;
2023-10-16 16:16:52 +00:00
}
}
}
2024-01-21 21:09:41 +00:00
if let Err ( ref err ) = res {
this . as_ref ( ) . map_or_else ( | | gst ::error! ( CAT , " Quitting send task: {err} " ) ,
| this | gst ::error! ( CAT , imp : this , " Quitting send task: {err} " )
) ;
break ;
}
2023-10-16 16:16:52 +00:00
}
2024-01-21 21:09:41 +00:00
this . map_or_else ( | | gst ::debug! ( CAT , " Done sending " ) ,
| this | gst ::debug! ( CAT , imp : this , " Done sending " )
2023-10-16 16:16:52 +00:00
) ;
2024-01-21 21:09:41 +00:00
let _ = ws_sink . close ( ) . await ;
2023-10-16 16:16:52 +00:00
2024-01-21 21:09:41 +00:00
res . map_err ( Into ::into )
2023-10-16 16:16:52 +00:00
} ) ) ;
let recv_task_handle =
RUNTIME . spawn ( glib ::clone! ( @ weak - allow - none self as this = > async move {
while let Some ( msg ) = tokio_stream ::StreamExt ::next ( & mut ws_stream ) . await {
if let Some ( ref this ) = this {
if let ControlFlow ::Break ( _ ) = this . handle_msg ( msg ) {
break ;
}
} else {
break ;
}
}
let msg = " Stopped websocket receiving " ;
this . map_or_else ( | | gst ::info! ( CAT , " {msg} " ) ,
| this | gst ::info! ( CAT , imp : this , " {msg} " )
) ;
} ) ) ;
let mut state = self . state . lock ( ) . unwrap ( ) ;
state . ws_sender = Some ( ws_sender ) ;
state . send_task_handle = Some ( send_task_handle ) ;
state . recv_task_handle = Some ( recv_task_handle ) ;
Ok ( ( ) )
}
fn handle_msg (
& self ,
msg : Result < WsMessage , async_tungstenite ::tungstenite ::Error > ,
) -> ControlFlow < ( ) > {
match msg {
Ok ( WsMessage ::Text ( msg ) ) = > {
gst ::trace! ( CAT , imp : self , " Received message {} " , msg ) ;
if let Ok ( reply ) = serde_json ::from_str ::< JsonReply > ( & msg ) {
self . handle_reply ( reply ) ;
} else {
gst ::error! ( CAT , imp : self , " Unknown message from server: {} " , msg ) ;
}
}
Ok ( WsMessage ::Close ( reason ) ) = > {
gst ::info! ( CAT , imp : self , " websocket connection closed: {:?} " , reason ) ;
return ControlFlow ::Break ( ( ) ) ;
}
Ok ( _ ) = > ( ) ,
Err ( err ) = > {
self . raise_error ( err . to_string ( ) ) ;
return ControlFlow ::Break ( ( ) ) ;
}
}
ControlFlow ::Continue ( ( ) )
}
fn handle_reply ( & self , reply : JsonReply ) {
match reply {
JsonReply ::WebRTCUp = > {
gst ::trace! ( CAT , imp : self , " WebRTC streaming is working! " ) ;
}
JsonReply ::Success ( success ) = > {
if let Some ( data ) = success . data {
if success . session_id . is_none ( ) {
gst ::trace! ( CAT , imp : self , " Janus session {} was created successfully " , data . id ) ;
self . set_session_id ( data . id ) ;
self . attach_plugin ( ) ;
} else {
gst ::trace! ( CAT , imp : self , " Attached to Janus Video Room plugin successfully, handle: {} " , data . id ) ;
self . set_handle_id ( data . id ) ;
self . join_room ( ) ;
}
}
}
JsonReply ::Event ( event ) = > {
if let Some ( PluginData ::VideoRoom { data : plugindata } ) = event . plugindata {
match plugindata {
VideoRoomData ::Joined ( joined ) = > {
if let Some ( room ) = joined . room {
2024-02-15 21:42:40 +00:00
gst ::trace! ( CAT , imp : self , " Joined room {room:?} successfully " ) ;
2023-10-16 16:16:52 +00:00
self . session_requested ( ) ;
}
}
VideoRoomData ::Event ( room_event ) = > {
if room_event . error_code . is_some ( ) & & room_event . error . is_some ( ) {
self . raise_error ( format! (
" code: {}, reason: {} " ,
room_event . error_code . unwrap ( ) ,
room_event . error . unwrap ( ) ,
) ) ;
return ;
}
if let Some ( jsep ) = event . jsep {
if jsep . r#type = = " answer " {
gst ::trace! ( CAT , imp : self , " Session requested successfully " ) ;
self . handle_answer ( jsep . sdp ) ;
}
}
}
}
}
}
JsonReply ::Error ( error ) = > {
self . raise_error ( format! ( " code: {} , reason: {} " , error . code , error . reason ) )
}
// ignore for now
JsonReply ::Ack | JsonReply ::Media = > { }
}
}
/// Queues `msg` on the websocket sender channel from a spawned task.
///
/// Nothing happens when no sender is stored in the state. A failure to enqueue is
/// reported through `raise_error`.
fn send(&self, msg: OutgoingMessage) {
    let guard = self.state.lock().unwrap();
    let mut sender = match guard.ws_sender.clone() {
        Some(sender) => sender,
        None => return,
    };
    RUNTIME.spawn(glib::clone!(@weak self as this => async move {
        if let Err(err) = sender.send(msg).await {
            this.raise_error(err.to_string());
        }
    }));
}
// Only used at the end when cleaning up the resources.
// So that `SignallableImpl::stop` waits the last message
// to be sent properly.
/// Queues `msg` on the websocket sender channel and blocks until it is enqueued.
///
/// Nothing happens when no sender is stored in the state. A failure to enqueue is
/// reported through `raise_error`.
fn send_blocking(&self, msg: OutgoingMessage) {
    // Clone the sender in its own statement so the state guard is dropped before
    // blocking. The send task locks the same mutex for its keep-alive; holding the
    // guard across `block_on` could stall that task and, with a full channel,
    // deadlock against it.
    let sender = self.state.lock().unwrap().ws_sender.clone();
    if let Some(mut sender) = sender {
        RUNTIME.block_on(glib::clone!(@weak self as this => async move {
            if let Err(err) = sender.send(msg).await {
                this.raise_error(err.to_string());
            }
        }));
    }
}
/// Stores `transaction` as the current transaction identifier in the state.
fn set_transaction_id(&self, transaction: String) {
    let mut state = self.state.lock().unwrap();
    state.transaction_id = Some(transaction);
}
fn create_session ( & self ) {
let transaction = transaction_id ( ) ;
self . set_transaction_id ( transaction . clone ( ) ) ;
2024-02-15 21:18:31 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
let apisecret = settings . secret_key . clone ( ) ;
2023-10-16 16:16:52 +00:00
self . send ( OutgoingMessage ::CreateSession ( CreateSessionMsg {
janus : " create " . to_string ( ) ,
transaction ,
2024-02-15 21:18:31 +00:00
apisecret ,
2023-10-16 16:16:52 +00:00
} ) ) ;
}
/// Stores `session_id` as the current session id in the state.
fn set_session_id(&self, session_id: u64) {
    let mut state = self.state.lock().unwrap();
    state.session_id = Some(session_id);
}
/// Stores `handle_id` as the current plugin handle id in the state.
fn set_handle_id(&self, handle_id: u64) {
    let mut state = self.state.lock().unwrap();
    state.handle_id = Some(handle_id);
}
fn attach_plugin ( & self ) {
2024-02-15 21:18:31 +00:00
let ( transaction , session_id , apisecret ) = {
2023-10-16 16:16:52 +00:00
let state = self . state . lock ( ) . unwrap ( ) ;
2024-02-15 21:18:31 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
2023-10-16 16:16:52 +00:00
(
state . transaction_id . clone ( ) . unwrap ( ) ,
state . session_id . unwrap ( ) ,
2024-02-15 21:18:31 +00:00
settings . secret_key . clone ( ) ,
2023-10-16 16:16:52 +00:00
)
} ;
self . send ( OutgoingMessage ::AttachPlugin ( AttachPluginMsg {
janus : " attach " . to_string ( ) ,
transaction ,
plugin : " janus.plugin.videoroom " . to_string ( ) ,
session_id ,
2024-02-15 21:18:31 +00:00
apisecret ,
2023-10-16 16:16:52 +00:00
} ) ) ;
}
fn join_room ( & self ) {
2024-02-15 21:18:31 +00:00
let ( transaction , session_id , handle_id , room , feed_id , display , apisecret ) = {
2024-02-15 21:42:40 +00:00
let mut state = self . state . lock ( ) . unwrap ( ) ;
2023-10-16 16:16:52 +00:00
let settings = self . settings . lock ( ) . unwrap ( ) ;
if settings . room_id . is_none ( ) {
self . raise_error ( " Janus Room ID must be set " . to_string ( ) ) ;
return ;
}
2024-02-15 21:42:40 +00:00
/* room_id and feed_id can be either a string or integer depending
* on server configuration . The property is always a string , if we
2024-02-16 20:59:59 +00:00
* can parse it to integer then assume that ' s what the server expects ,
* unless string - ids = true is set to force usage of strings .
2024-02-15 21:42:40 +00:00
* Save parsed value in state to not have to parse it again for future
* API calls .
* /
2024-02-16 20:59:59 +00:00
if settings . string_ids {
state . room_id = Some ( RoomId ::Str ( settings . room_id . clone ( ) . unwrap ( ) ) ) ;
state . feed_id = Some ( RoomId ::Str ( settings . feed_id . clone ( ) ) ) ;
} else {
let room_id_str = settings . room_id . as_ref ( ) . unwrap ( ) ;
2024-02-28 10:46:23 +00:00
match room_id_str . parse ( ) {
2024-02-16 20:59:59 +00:00
Ok ( n ) = > {
state . room_id = Some ( RoomId ::Num ( n ) ) ;
state . feed_id = Some ( RoomId ::Num ( settings . feed_id . parse ( ) . unwrap ( ) ) ) ;
}
Err ( _ ) = > {
state . room_id = Some ( RoomId ::Str ( room_id_str . clone ( ) ) ) ;
state . feed_id = Some ( RoomId ::Str ( settings . feed_id . clone ( ) ) ) ;
}
} ;
}
2024-02-15 21:42:40 +00:00
2023-10-16 16:16:52 +00:00
(
state . transaction_id . clone ( ) . unwrap ( ) ,
state . session_id . unwrap ( ) ,
state . handle_id . unwrap ( ) ,
2024-02-15 21:42:40 +00:00
state . room_id . clone ( ) . unwrap ( ) ,
state . feed_id . clone ( ) . unwrap ( ) ,
2023-10-16 16:16:52 +00:00
settings . display_name . clone ( ) ,
2024-02-15 21:18:31 +00:00
settings . secret_key . clone ( ) ,
2023-10-16 16:16:52 +00:00
)
} ;
self . send ( OutgoingMessage ::RoomRequest ( RoomRequestMsg {
janus : " message " . to_string ( ) ,
transaction ,
session_id ,
handle_id ,
2024-02-15 21:18:31 +00:00
apisecret ,
2023-10-16 16:16:52 +00:00
body : RoomRequestBody {
request : " join " . to_string ( ) ,
ptype : " publisher " . to_string ( ) ,
room ,
id : feed_id ,
display ,
} ,
} ) ) ;
}
fn leave_room ( & self ) {
2024-02-15 21:18:31 +00:00
let ( transaction , session_id , handle_id , room , feed_id , display , apisecret ) = {
2023-10-16 16:16:52 +00:00
let state = self . state . lock ( ) . unwrap ( ) ;
let settings = self . settings . lock ( ) . unwrap ( ) ;
if settings . room_id . is_none ( ) {
self . raise_error ( " Janus Room ID must be set " . to_string ( ) ) ;
return ;
}
(
state . transaction_id . clone ( ) . unwrap ( ) ,
state . session_id . unwrap ( ) ,
state . handle_id . unwrap ( ) ,
2024-02-15 21:42:40 +00:00
state . room_id . clone ( ) . unwrap ( ) ,
state . feed_id . clone ( ) . unwrap ( ) ,
2023-10-16 16:16:52 +00:00
settings . display_name . clone ( ) ,
2024-02-15 21:18:31 +00:00
settings . secret_key . clone ( ) ,
2023-10-16 16:16:52 +00:00
)
} ;
self . send_blocking ( OutgoingMessage ::RoomRequest ( RoomRequestMsg {
janus : " message " . to_string ( ) ,
transaction ,
session_id ,
handle_id ,
2024-02-15 21:18:31 +00:00
apisecret ,
2023-10-16 16:16:52 +00:00
body : RoomRequestBody {
request : " leave " . to_string ( ) ,
ptype : " publisher " . to_string ( ) ,
room ,
id : feed_id ,
display ,
} ,
} ) ) ;
}
fn publish ( & self , offer : & gst_webrtc ::WebRTCSessionDescription ) {
2024-02-15 21:18:31 +00:00
let ( transaction , session_id , handle_id , apisecret ) = {
2023-10-16 16:16:52 +00:00
let state = self . state . lock ( ) . unwrap ( ) ;
let settings = self . settings . lock ( ) . unwrap ( ) ;
if settings . room_id . is_none ( ) {
self . raise_error ( " Janus Room ID must be set " . to_string ( ) ) ;
return ;
}
(
state . transaction_id . clone ( ) . unwrap ( ) ,
state . session_id . unwrap ( ) ,
state . handle_id . unwrap ( ) ,
2024-02-15 21:18:31 +00:00
settings . secret_key . clone ( ) ,
2023-10-16 16:16:52 +00:00
)
} ;
let sdp_data = offer . sdp ( ) . as_text ( ) . unwrap ( ) ;
self . send ( OutgoingMessage ::Publish ( PublishMsg {
janus : " message " . to_string ( ) ,
transaction ,
session_id ,
handle_id ,
2024-02-15 21:18:31 +00:00
apisecret ,
2023-10-16 16:16:52 +00:00
body : PublishBody {
request : " publish " . to_string ( ) ,
} ,
jsep : Jsep {
sdp : sdp_data ,
trickle : Some ( true ) ,
r#type : " offer " . to_string ( ) ,
} ,
} ) ) ;
}
fn trickle ( & self , candidate : & str , sdp_m_line_index : u32 ) {
2024-02-15 21:18:31 +00:00
let ( transaction , session_id , handle_id , apisecret ) = {
2023-10-16 16:16:52 +00:00
let state = self . state . lock ( ) . unwrap ( ) ;
let settings = self . settings . lock ( ) . unwrap ( ) ;
if settings . room_id . is_none ( ) {
self . raise_error ( " Janus Room ID must be set " . to_string ( ) ) ;
return ;
}
(
state . transaction_id . clone ( ) . unwrap ( ) ,
state . session_id . unwrap ( ) ,
state . handle_id . unwrap ( ) ,
2024-02-15 21:18:31 +00:00
settings . secret_key . clone ( ) ,
2023-10-16 16:16:52 +00:00
)
} ;
self . send ( OutgoingMessage ::Trickle ( TrickleMsg {
janus : " trickle " . to_string ( ) ,
transaction ,
session_id ,
handle_id ,
2024-02-15 21:18:31 +00:00
apisecret ,
2023-10-16 16:16:52 +00:00
candidate : Candidate {
candidate : candidate . to_string ( ) ,
sdp_m_line_index ,
} ,
} ) ) ;
}
/// Emits the session request signal on the wrapper object, passing the same fixed
/// string for both string arguments and no session description.
fn session_requested(&self) {
    let no_description = None::<gst_webrtc::WebRTCSessionDescription>;
    self.obj().emit_by_name::<()>(
        " session-requested ",
        &[&" unique ", &" unique ", &no_description],
    );
}
fn handle_answer ( & self , sdp : String ) {
match gst_sdp ::SDPMessage ::parse_buffer ( sdp . as_bytes ( ) ) {
Ok ( ans_sdp ) = > {
let answer = gst_webrtc ::WebRTCSessionDescription ::new (
gst_webrtc ::WebRTCSDPType ::Answer ,
ans_sdp ,
) ;
self . obj ( )
. emit_by_name ::< ( ) > ( " session-description " , & [ & " unique " , & answer ] ) ;
}
Err ( err ) = > {
self . raise_error ( format! ( " Could not parse answer SDP: {err} " ) ) ;
}
}
}
}
impl SignallableImpl for Signaller {
fn start ( & self ) {
let this = self . obj ( ) . clone ( ) ;
let imp = self . downgrade ( ) ;
RUNTIME . spawn ( async move {
if let Some ( imp ) = imp . upgrade ( ) {
if let Err ( err ) = imp . connect ( ) . await {
this . emit_by_name ::< ( ) > ( " error " , & [ & format! ( " {:?} " , anyhow! ( err ) ) ] ) ;
} else {
imp . create_session ( ) ;
}
}
} ) ;
}
/// Logs the offer and publishes it; the session id argument is not used.
fn send_sdp(&self, _session_id: &str, offer: &gst_webrtc::WebRTCSessionDescription) {
    let sdp_text = offer.sdp().as_text();
    gst::info!(CAT, imp: self, " sending SDP offer to peer: {:?} ", sdp_text);
    self.publish(offer);
}
/// Forwards an ICE candidate through `trickle`; the session id and the media id
/// arguments are not used.
fn add_ice(
    &self,
    _session_id: &str,
    candidate: &str,
    sdp_m_line_index: u32,
    _sdp_mid: Option<String>,
) {
    self.trickle(candidate, sdp_m_line_index)
}
fn stop ( & self ) {
gst ::info! ( CAT , imp : self , " Stopping now " ) ;
let mut state = self . state . lock ( ) . unwrap ( ) ;
let send_task_handle = state . send_task_handle . take ( ) ;
let recv_task_handle = state . recv_task_handle . take ( ) ;
if let Some ( mut sender ) = state . ws_sender . take ( ) {
RUNTIME . block_on ( async move {
sender . close_channel ( ) ;
if let Some ( handle ) = send_task_handle {
if let Err ( err ) = handle . await {
gst ::warning! ( CAT , imp : self , " Error while joining send task: {} " , err ) ;
}
}
if let Some ( handle ) = recv_task_handle {
// if awaited instead, it hangs the plugin
handle . abort ( ) ;
}
} ) ;
}
state . session_id = None ;
state . handle_id = None ;
state . transaction_id = None ;
}
/// Ends the session by leaving the room; the session id argument is not used.
fn end_session(&self, _session_id: &str) {
    self.leave_room()
}
}
/// GObject subclass registration: a `glib::Object` subclass that implements the
/// `Signallable` interface.
#[glib::object_subclass]
impl ObjectSubclass for Signaller {
    const NAME: &'static str = " GstJanusVRWebRTCSignaller ";
    type Type = super::JanusVRSignaller;
    type ParentType = glib::Object;
    type Interfaces = (Signallable,);
}
// Property handling is generated from the `#[property]` attributes on `Signaller`.
#[glib::derived_properties]
impl ObjectImpl for Signaller {}