diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 0ba172886..cbeebe18a 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -13253,6 +13253,65 @@ }, "rank": "none" }, + "janusvrwebrtcsrc": { + "author": "Eva Pace ", + "description": "WebRTC source with Janus Video Room signaller", + "hierarchy": [ + "GstJanusVRWebRTCSrc", + "GstBaseWebRTCSrc", + "GstBin", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "interfaces": [ + "GstChildProxy", + "GstURIHandler" + ], + "klass": "Source/Network/WebRTC", + "pad-templates": { + "audio_%%s_%%u": { + "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + }, + "video_%%s_%%u": { + "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstWebRTCSrcPad" + } + }, + "properties": { + "janus-state": { + "blurb": "The current state of the signaller", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "initialized (0)", + "mutable": "null", + "readable": true, + "type": "GstJanusVRWebRTCJanusState", + "writable": false + }, + "use-string-ids": { + "blurb": "Use strings instead of u64 for Janus IDs, see strings_ids config option in janus.plugin.videoroom.jcfg", + "conditionally-available": false, + "construct": false, + "construct-only": true, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true + } + }, + "rank": "none" + }, "livekitwebrtcsink": { "author": "Olivier CrĂȘte ", "description": "WebRTC sink with LiveKit signaller", diff --git a/net/webrtc/src/janusvr_signaller/imp.rs b/net/webrtc/src/janusvr_signaller/imp.rs index 22e37f8ec..9faf11d25 100644 --- a/net/webrtc/src/janusvr_signaller/imp.rs +++ b/net/webrtc/src/janusvr_signaller/imp.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: MPL-2.0 use crate::{ - signaller::{Signallable, SignallableImpl}, + signaller::{Signallable, SignallableImpl, WebRTCSignallerRole}, webrtcsink::JanusVRSignallerState, RUNTIME, }; @@ -143,6 +143,12 @@ enum MessageBody { Join(Join), Publish, Leave, + Start, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct SubscribeStream { + feed: JanusId, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] @@ -156,6 +162,13 @@ enum Join { #[serde(skip_serializing_if = "Option::is_none")] display: Option, }, + Subscriber { + room: JanusId, + streams: Vec, + use_msid: bool, + #[serde(skip_serializing_if = "Option::is_none")] + private_id: Option, + }, } #[derive(Serialize, Deserialize, Debug)] @@ -190,6 +203,10 @@ enum VideoRoomData { #[serde(rename = "current-bitrate")] current_bitrate: u32, }, + #[serde(rename = "attached")] + Attached { + streams: Vec, + }, } #[derive(Serialize, Deserialize, Debug)] @@ -199,6 +216,11 @@ enum PluginData { VideoRoom { data: VideoRoomData }, } +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +struct ConsumerStream { + feed_id: JanusId, +} + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] struct DataHolder { id: u64, @@ -258,23 +280,29 @@ struct State { // - self.state // - self.settings -#[derive(Clone)] +#[derive(Clone, Debug)] struct Settings { janus_endpoint: String, room_id: Option, feed_id: Option, - display_name: Option, secret_key: Option, + role: WebRTCSignallerRole, + // Producer only + display_name: Option, + // Consumer only + producer_peer_id: Option, } impl Default for Settings { fn default() -> Self { Self { janus_endpoint: "ws://127.0.0.1:8188".to_string(), - display_name: None, room_id: None, feed_id: None, secret_key: None, + role: WebRTCSignallerRole::default(), + display_name: None, + producer_peer_id: None, } } } @@ -285,8 +313,10 @@ pub struct Signaller { state: Mutex, #[property(name="manual-sdp-munging", default = false, get = |_| false, type = bool, blurb = "Whether the signaller manages SDP munging itself")] #[property(name="janus-endpoint", get, set, type = String, member = janus_endpoint, blurb = "The Janus server endpoint to POST SDP offer to")] - #[property(name="display-name", get, set, type = String, member = display_name, blurb = "The name of the publisher in the Janus Video Room")] #[property(name="secret-key", get, set, type = String, member = secret_key, blurb = "The secret API key to communicate with Janus server")] + #[property(name="role", get, set, type = WebRTCSignallerRole, member = role, blurb = "Whether this signaller acts as either a Consumer or Producer. Listener is not currently supported.", builder(WebRTCSignallerRole::default()))] + // Producer only + #[property(name="display-name", get, set, type = String, member = display_name, blurb = "When in Producer role, the name of the publisher in the Janus Video Room.")] // Properties whose type depends of the Janus ID format (u64 or string) are implemented in Signaller subclasses settings: Mutex, } @@ -341,11 +371,16 @@ impl Signaller { _ = tokio::time::sleep(Duration::from_secs(10)) => { let (session_id, apisecret) = { let state = this.state.lock().unwrap(); - let settings = this.settings.lock().unwrap(); - ( - state.session_id.unwrap(), - settings.secret_key.clone(), - ) + + let session_id = if let Some(s) = state.session_id { + s + } else { + // session_id is set to None when the plugin is dying + break + }; + + (session_id, + settings.secret_key.clone()) }; let msg = OutgoingMessage::KeepAlive{ transaction: transaction_id(), @@ -422,6 +457,11 @@ impl Signaller { } fn handle_reply(&self, reply: JsonReply) { + let role = { + let settings = self.settings.lock().unwrap(); + settings.role + }; + match reply { JsonReply::WebRTCUp => { gst::trace!(CAT, imp = self, "WebRTC streaming is working!"); @@ -449,8 +489,18 @@ impl Signaller { "Attached to Janus Video Room plugin successfully, handle: {}", data.id ); + self.set_handle_id(data.id); - self.join_room(); + + match role { + WebRTCSignallerRole::Consumer => { + self.join_room_subscriber(); + } + WebRTCSignallerRole::Producer => { + self.join_room_publisher(); + } + WebRTCSignallerRole::Listener => unreachable!(), + } } } } @@ -459,51 +509,74 @@ impl Signaller { } => { if let Some(PluginData::VideoRoom { data: plugindata }) = plugindata { match plugindata { - VideoRoomData::Joined { room, id } => { - let feed_id_changed = { - let mut feed_id_changed = false; - let mut state = self.state.lock().unwrap(); - { - let mut settings = self.settings.lock().unwrap(); - if settings.feed_id.as_ref() != Some(&id) { - settings.feed_id = Some(id.clone()); - feed_id_changed = true; + VideoRoomData::Joined { room, id } => match role { + WebRTCSignallerRole::Consumer => {} + WebRTCSignallerRole::Producer => { + gst::info!( + CAT, + imp = self, + "Joined room {room}, publisher id: {id}", + ); + + let feed_id_changed = { + let mut feed_id_changed = false; + let mut state = self.state.lock().unwrap(); + { + let mut settings = self.settings.lock().unwrap(); + if settings.feed_id.as_ref() != Some(&id) { + settings.feed_id = Some(id.clone()); + feed_id_changed = true; + } } + + state.feed_id = Some(id); + + feed_id_changed + }; + + if feed_id_changed { + self.obj().notify("feed-id"); } - state.feed_id = Some(id); + self.obj().emit_by_name::<()>( + "state-updated", + &[&JanusVRSignallerState::RoomJoined], + ); - feed_id_changed - }; - - if feed_id_changed { - self.obj().notify("feed-id"); + self.session_requested(); } - - gst::trace!(CAT, imp = self, "Joined room {room:?} successfully",); - - self.obj().emit_by_name::<()>( - "state-updated", - &[&JanusVRSignallerState::RoomJoined], - ); - - self.session_requested(); - } + WebRTCSignallerRole::Listener => unimplemented!(), + }, VideoRoomData::Event { error, error_code, .. } => { - if error_code.is_some() && error.is_some() { - self.raise_error(format!( - "code: {}, reason: {}", - error_code.unwrap(), - error.unwrap(), - )); + if let (Some(error_code), Some(error)) = (error_code, error) { + self.raise_error(format!("code: {error_code}, reason: {error}",)); return; } - if let Some(Jsep::Answer { sdp, .. }) = jsep { - gst::trace!(CAT, imp = self, "Session requested successfully"); - self.handle_answer(sdp); + match role { + WebRTCSignallerRole::Consumer => {} + WebRTCSignallerRole::Producer => { + // publish stream and handle answer + if let Some(Jsep::Answer { sdp, .. }) = jsep { + gst::trace!( + CAT, + imp = self, + "Session requested successfully" + ); + self.handle_answer(&sdp); + } + } + WebRTCSignallerRole::Listener => unimplemented!(), + } + } + VideoRoomData::Attached { .. } => { + assert_eq!(role, WebRTCSignallerRole::Consumer); + + if let Some(Jsep::Offer { sdp, .. }) = jsep { + gst::trace!(CAT, imp = self, "Offer received!"); + self.handle_offer(sdp); } } VideoRoomData::Destroyed { room } => { @@ -598,7 +671,7 @@ impl Signaller { }); } - fn join_room(&self) { + fn join_room_publisher(&self) { let (session_id, handle_id, room, feed_id, display, apisecret) = { let mut state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); @@ -633,6 +706,49 @@ impl Signaller { }); } + fn join_room_subscriber(&self) { + let (session_id, handle_id, room, producer_peer_id, apisecret) = { + let state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if settings.room_id.is_none() { + self.raise_error("Janus Room ID must be set".to_string()); + return; + } + + ( + state.session_id.unwrap(), + state.handle_id.unwrap(), + settings.room_id.as_ref().unwrap().clone(), + settings.producer_peer_id.as_ref().unwrap().clone(), + settings.secret_key.clone(), + ) + }; + + gst::debug!(CAT, imp = self, "subscribing to feed {producer_peer_id}"); + + let producer_peer_id_str = producer_peer_id.to_string(); + + self.send(OutgoingMessage::Message { + transaction: transaction_id(), + session_id, + handle_id, + apisecret, + body: MessageBody::Join(Join::Subscriber { + room, + streams: vec![SubscribeStream { + feed: producer_peer_id, + }], + use_msid: false, + private_id: None, + }), + jsep: None, + }); + + self.obj() + .emit_by_name::<()>("session-started", &[&"unique", &producer_peer_id_str]); + } + fn leave_room(&self) { let mut state = self.state.lock().unwrap(); let (session_id, handle_id, apisecret) = { @@ -745,7 +861,7 @@ impl Signaller { ); } - fn handle_answer(&self, sdp: String) { + fn handle_answer(&self, sdp: &str) { match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) { Ok(ans_sdp) => { let answer = gst_webrtc::WebRTCSessionDescription::new( @@ -761,6 +877,48 @@ impl Signaller { } } + fn handle_offer(&self, sdp: String) { + match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) { + Ok(offer_sdp) => { + let offer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Offer, + offer_sdp, + ); + self.obj() + .emit_by_name::<()>("session-description", &[&"unique", &offer]); + } + Err(err) => { + self.raise_error(format!("Could not parse answer SDP: {err}")); + } + } + + self.obj() + .emit_by_name::<()>("state-updated", &[&JanusVRSignallerState::Negotiating]); + } + + fn send_start(&self, sdp: &gst_webrtc::WebRTCSessionDescription) { + let (session_id, handle_id, apisecret) = { + let state = self.state.lock().unwrap(); + + let settings = self.settings.lock().unwrap(); + ( + state.session_id.unwrap(), + state.handle_id.unwrap(), + settings.secret_key.clone(), + ) + }; + + let sdp = sdp.sdp().as_text().unwrap(); + self.send(OutgoingMessage::Message { + transaction: transaction_id(), + session_id, + handle_id, + apisecret, + jsep: Some(Jsep::Answer { sdp, trickle: None }), + body: MessageBody::Start, + }); + } + fn emit_talking(&self, talking: bool, id: JanusId, audio_level: f32) { let obj = self.obj(); (obj.class().as_ref().emit_talking)(&obj, talking, id, audio_level) @@ -769,6 +927,16 @@ impl Signaller { impl SignallableImpl for Signaller { fn start(&self) { + { + let settings = self.settings.lock().unwrap(); + + if let (WebRTCSignallerRole::Consumer, None) = + (&settings.role, &settings.producer_peer_id) + { + panic!("producer-peer-id should be set in Consumer role"); + } + } + let this = self.obj().clone(); let imp = self.downgrade(); RUNTIME.spawn(async move { @@ -789,8 +957,32 @@ impl SignallableImpl for Signaller { "sending SDP offer to peer: {:?}", offer.sdp().as_text() ); + let role = { + let settings = self.settings.lock().unwrap(); + settings.role + }; - self.publish(offer); + match role { + WebRTCSignallerRole::Producer => { + gst::log!( + CAT, + imp = self, + "sending SDP offer to peer: {:?}", + offer.sdp().as_text() + ); + self.publish(offer) + } + WebRTCSignallerRole::Consumer => { + gst::log!( + CAT, + imp = self, + "sending SDP answer to peer: {:?}", + offer.sdp().as_text() + ); + self.send_start(offer) + } + WebRTCSignallerRole::Listener => { /*nothing yet*/ } + } } fn add_ice( @@ -886,6 +1078,8 @@ pub mod signaller_u64 { pub struct SignallerU64 { #[property(name="room-id", get, set, type = u64, get = Self::get_room_id, set = Self::set_room_id, blurb = "The Janus Room ID that will be joined to")] #[property(name="feed-id", get, set, type = u64, get = Self::get_feed_id, set = Self::set_feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")] + // Consumer only + #[property(name="producer-peer-id", get, set, type = u64, get = Self::get_producer_peer_id, set = Self::set_producer_peer_id, blurb = "The producer feed ID the signaller should subscribe to. Only used in Consumer mode.")] /// Properties macro does not work with empty struct: https://github.com/gtk-rs/gtk-rs-core/issues/1110 _unused: bool, } @@ -966,6 +1160,26 @@ pub mod signaller_u64 { settings.feed_id = Some(JanusId::Num(id)); } + + fn get_producer_peer_id(&self) -> u64 { + let obj = self.obj(); + let signaller = obj.upcast_ref::().imp(); + let settings = signaller.settings.lock().unwrap(); + + settings + .producer_peer_id + .as_ref() + .map(|id| id.as_num()) + .unwrap_or_default() + } + + fn set_producer_peer_id(&self, id: u64) { + let obj = self.obj(); + let signaller = obj.upcast_ref::().imp(); + let mut settings = signaller.settings.lock().unwrap(); + + settings.producer_peer_id = Some(JanusId::Num(id)); + } } } @@ -977,6 +1191,8 @@ pub mod signaller_str { pub struct SignallerStr { #[property(name="room-id", get, set, type = String, get = Self::get_room_id, set = Self::set_room_id, blurb = "The Janus Room ID that will be joined to")] #[property(name="feed-id", get, set, type = String, get = Self::get_feed_id, set = Self::set_feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")] + // Consumer only + #[property(name="producer-peer-id", get, set, type = String, get = Self::get_producer_peer_id, set = Self::set_producer_peer_id, blurb = "The producer feed ID the signaller should subscribe to. Only used in Consumer mode.")] /// Properties macro does not work with empty struct: https://github.com/gtk-rs/gtk-rs-core/issues/1110 _unused: bool, } @@ -1057,5 +1273,25 @@ pub mod signaller_str { settings.feed_id = Some(JanusId::Str(id)); } + + fn get_producer_peer_id(&self) -> String { + let obj = self.obj(); + let signaller = obj.upcast_ref::().imp(); + let settings = signaller.settings.lock().unwrap(); + + settings + .producer_peer_id + .as_ref() + .map(|id| id.as_string()) + .unwrap_or_default() + } + + fn set_producer_peer_id(&self, id: String) { + let obj = self.obj(); + let signaller = obj.upcast_ref::().imp(); + let mut settings = signaller.settings.lock().unwrap(); + + settings.producer_peer_id = Some(JanusId::Str(id)); + } } } diff --git a/net/webrtc/src/janusvr_signaller/mod.rs b/net/webrtc/src/janusvr_signaller/mod.rs index f4bd28d31..562d762d4 100644 --- a/net/webrtc/src/janusvr_signaller/mod.rs +++ b/net/webrtc/src/janusvr_signaller/mod.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MPL-2.0 -use crate::signaller::Signallable; +use crate::signaller::{Signallable, WebRTCSignallerRole}; use gst::{glib, glib::prelude::*, glib::subclass::prelude::*}; mod imp; @@ -58,9 +58,9 @@ glib::wrapper! { pub struct JanusVRSignallerU64(ObjectSubclass) @extends JanusVRSignaller, @implements Signallable; } -impl Default for JanusVRSignallerU64 { - fn default() -> Self { - glib::Object::new() +impl JanusVRSignallerU64 { + pub fn new(role: WebRTCSignallerRole) -> Self { + glib::Object::builder().property("role", role).build() } } @@ -69,8 +69,8 @@ glib::wrapper! { pub struct JanusVRSignallerStr(ObjectSubclass) @extends JanusVRSignaller, @implements Signallable; } -impl Default for JanusVRSignallerStr { - fn default() -> Self { - glib::Object::new() +impl JanusVRSignallerStr { + pub fn new(role: WebRTCSignallerRole) -> Self { + glib::Object::builder().property("role", role).build() } } diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index d5dc1604a..0a81daa7a 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -6030,9 +6030,9 @@ pub(super) mod janus { .imp(); let signaller: Signallable = if settings.use_string_ids { - JanusVRSignallerStr::default().upcast() + JanusVRSignallerStr::new(WebRTCSignallerRole::Producer).upcast() } else { - JanusVRSignallerU64::default().upcast() + JanusVRSignallerU64::new(WebRTCSignallerRole::Producer).upcast() }; let self_weak = self.downgrade(); diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index a4502a448..0a84212f5 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -2174,3 +2174,244 @@ pub(super) mod livekit { impl GhostPadImpl for LiveKitWebRTCSrcPad {} impl WebRTCSrcPadImpl for LiveKitWebRTCSrcPad {} } + +#[cfg(feature = "janus")] +pub(super) mod janus { + use super::*; + use crate::{ + janusvr_signaller::{JanusVRSignallerStr, JanusVRSignallerU64}, + webrtcsink::JanusVRSignallerState, + webrtcsrc::WebRTCSignallerRole, + }; + + static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "janusvrwebrtcsrc", + gst::DebugColorFlags::empty(), + Some("WebRTC Janus Video Room src"), + ) + }); + + #[derive(Debug, Clone, Default)] + struct JanusSettings { + use_string_ids: bool, + } + + #[derive(Debug, Default)] + struct JanusState { + janus_state: JanusVRSignallerState, + } + + #[derive(Default, glib::Properties)] + #[properties(wrapper_type = crate::webrtcsrc::JanusVRWebRTCSrc)] + pub struct JanusVRWebRTCSrc { + /** + * GstJanusVRWebRTCSrc:use-string-ids: + * + * By default Janus uses `u64` ids to identify the room, the feed, etc. + * But it can be changed to strings using the `strings_ids` option in `janus.plugin.videoroom.jcfg`. + * In such case, `janusvrwebrtcsrc` has to be created using `use-string-ids=true` so its signaller + * uses the right types for such ids and properties. + * + * Since: plugins-rs-0.14.0 + */ + #[property(name="use-string-ids", get, construct_only, type = bool, member = use_string_ids, blurb = "Use strings instead of u64 for Janus IDs, see strings_ids config option in janus.plugin.videoroom.jcfg")] + settings: Mutex, + /** + * GstJanusVRWebRTCSrc:janus-state: + * + * The current state of the signaller. + * Since: plugins-rs-0.14.0 + */ + #[property( + name = "janus-state", + get, + member = janus_state, + type = JanusVRSignallerState, + blurb = "The current state of the signaller", + builder(JanusVRSignallerState::Initialized) + )] + state: Mutex, + } + + #[glib::derived_properties] + impl ObjectImpl for JanusVRWebRTCSrc { + fn constructed(&self) { + self.parent_constructed(); + + let settings = self.settings.lock().unwrap(); + let element = self.obj(); + let ws = element + .upcast_ref::() + .imp(); + + let signaller: Signallable = if settings.use_string_ids { + JanusVRSignallerStr::new(WebRTCSignallerRole::Consumer).upcast() + } else { + JanusVRSignallerU64::new(WebRTCSignallerRole::Consumer).upcast() + }; + + let self_weak = self.downgrade(); + signaller.connect("state-updated", false, move |args| { + let self_ = self_weak.upgrade()?; + let janus_state = args[1].get::().unwrap(); + + { + let mut state = self_.state.lock().unwrap(); + state.janus_state = janus_state; + } + + gst::debug!( + CAT, + imp = self_, + "signaller state updated: {:?}", + janus_state + ); + + self_.obj().notify("janus-state"); + + None + }); + + let _ = ws.set_signaller(signaller); + } + } + + impl GstObjectImpl for JanusVRWebRTCSrc {} + + impl ElementImpl for JanusVRWebRTCSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = + LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "JanusVRWebRTCSrc", + "Source/Network/WebRTC", + "WebRTC source with Janus Video Room signaller", + "Eva Pace ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + } + + impl BinImpl for JanusVRWebRTCSrc {} + + impl BaseWebRTCSrcImpl for JanusVRWebRTCSrc {} + + #[glib::object_subclass] + impl ObjectSubclass for JanusVRWebRTCSrc { + const NAME: &'static str = "GstJanusVRWebRTCSrc"; + type Type = crate::webrtcsrc::JanusVRWebRTCSrc; + type ParentType = crate::webrtcsrc::BaseWebRTCSrc; + type Interfaces = (gst::URIHandler,); + } + + impl URIHandlerImpl for JanusVRWebRTCSrc { + const URI_TYPE: gst::URIType = gst::URIType::Src; + + fn protocols() -> &'static [&'static str] { + &["gstjanusvr", "gstjanusvrs"] + } + + fn uri(&self) -> Option { + { + let settings = self.settings.lock().unwrap(); + if settings.use_string_ids { + // URI not supported for string ids + return None; + } + } + + let obj = self.obj(); + let base = obj.upcast_ref::().imp(); + let signaller = base.signaller(); + + let janus_endpoint = signaller.property::("janus-endpoint"); + let uri = janus_endpoint + .replace("wss://", "gstjansvrs://") + .replace("ws://", "gstjanusvr://"); + let room_id = signaller.property::("room-id"); + let producer_peer_id = signaller.property::("producer-peer-id"); + + Some(format!( + "{uri}?use-string-ids=false&room-id={room_id}&producer-peer-id={producer_peer_id}" + )) + } + + fn set_uri(&self, uri: &str) -> Result<(), glib::Error> { + gst::debug!(CAT, imp = self, "parsing URI {uri}"); + + let uri = Url::from_str(uri) + .map_err(|err| glib::Error::new(gst::URIError::BadUri, &format!("{:?}", err)))?; + + let socket_scheme = match uri.scheme() { + "gstjanusvr" => Ok("ws"), + "gstjanusvrs" => Ok("wss"), + _ => Err(glib::Error::new( + gst::URIError::BadUri, + &format!("Invalid protocol: {}", uri.scheme()), + )), + }?; + + let port = uri + .port() + .map(|port| format!(":{port}")) + .unwrap_or_default(); + + let janus_endpoint = format!( + "{socket_scheme}://{}{port}{}", + uri.host_str().unwrap_or("127.0.0.1"), + uri.path() + ); + + let use_strings_ids = uri + .query_pairs() + .find(|(k, _v)| k == "use-string-ids") + .map(|(_k, v)| v.to_lowercase() == "true") + .unwrap_or_default(); + if use_strings_ids { + // TODO: we'd have to instantiate a JanusVRSignallerStr and set it on the src element + // but "signaller" is a construct-only property. + return Err(glib::Error::new( + gst::URIError::BadUri, + "use-string-ids=true not yet supported in URI", + )); + } + + let room_id = uri + .query_pairs() + .find(|(k, _v)| k == "room-id") + .map(|(_k, v)| v) + .ok_or(glib::Error::new(gst::URIError::BadUri, "room-id missing"))?; + let producer_peer_id = uri + .query_pairs() + .find(|(k, _v)| k == "producer-peer-id") + .map(|(_k, v)| v) + .ok_or(glib::Error::new( + gst::URIError::BadUri, + "producer-peer-id missing", + ))?; + + let obj = self.obj(); + let base = obj.upcast_ref::().imp(); + let signaller = base.signaller(); + + let room_id = room_id.parse::().map_err(|err| { + glib::Error::new(gst::URIError::BadUri, &format!("Invalid room-id: {err}")) + })?; + let producer_peer_id = producer_peer_id.parse::().map_err(|err| { + glib::Error::new( + gst::URIError::BadUri, + &format!("Invalid producer-peer-id: {err}"), + ) + })?; + + signaller.set_property("janus-endpoint", &janus_endpoint); + signaller.set_property("room-id", room_id); + signaller.set_property("producer-peer-id", producer_peer_id); + + Ok(()) + } + } +} diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs index 9afb2ab05..61adb9603 100644 --- a/net/webrtc/src/webrtcsrc/mod.rs +++ b/net/webrtc/src/webrtcsrc/mod.rs @@ -59,6 +59,11 @@ glib::wrapper! { pub struct LiveKitWebRTCSrc(ObjectSubclass) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy; } +#[cfg(feature = "janus")] +glib::wrapper! { + pub struct JanusVRWebRTCSrc(ObjectSubclass) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy; +} + glib::wrapper! { pub struct WebRTCSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; } @@ -145,6 +150,46 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> { gst::Rank::NONE, LiveKitWebRTCSrc::static_type(), )?; + #[cfg(feature = "janus")] + /** + * element-janusvrwebrtcsrc: + * + * `JanusVRWebRTCSrc` is an element that integrates with the [Video Room plugin](https://janus.conf.meetecho.com/docs/videoroom) of the [Janus Gateway](https://github.com/meetecho/janus-gateway). + * It receives audio and/or video streams from WebRTC using Janus as the signaller. + * + * ## Examples + * + * First start sending a video stream to a janus room: + * + * ```bash + * $ gst-launch-1.0 videotestsrc ! janusvrwebrtcsink signaller::room-id=1234 signaller::feed-id=777 signaller::janus-endpoint=wss://janus.conf.meetecho.com/ws + * ``` + * + * You can then retrieve this stream using: + * + * ```bash + * $ gst-launch-1.0 janusvrwebrtcsrc signaller::room-id=1234 signaller::producer-peer-id=777 signaller::janus-endpoint=wss://janus.conf.meetecho.com/ws ! videoconvert ! autovideosink + * ``` + * + * You can also retrieve it using an URI: + * + * ```bash + * $ gst-play-1.0 "gstjanusvrs://janus.conf.meetecho.com/ws?room-id=1234&producer-peer-id=777" + * ``` + * + * ## See also + * + * The [documentation of the `janusvrwebrtcsink` element](https://gstreamer.freedesktop.org/documentation//rswebrtc/janusvrwebrtcsink.html). + * + * Since: plugins-rs-0.14.0 + * + */ + gst::Element::register( + plugin, + "janusvrwebrtcsrc", + gst::Rank::NONE, + JanusVRWebRTCSrc::static_type(), + )?; Ok(()) }