net/webrtc/janusvr: add new source element

Co-authored-by: Guillaume Desmottes <guillaume.desmottes@onestream.live>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1473>
This commit is contained in:
Eva Pace 2023-10-27 12:45:05 -03:00 committed by GStreamer Marge Bot
parent 34f16e0567
commit 999d08c5bc
6 changed files with 638 additions and 57 deletions

View file

@ -13253,6 +13253,65 @@
},
"rank": "none"
},
"janusvrwebrtcsrc": {
"author": "Eva Pace <epace@igalia.com>",
"description": "WebRTC source with Janus Video Room signaller",
"hierarchy": [
"GstJanusVRWebRTCSrc",
"GstBaseWebRTCSrc",
"GstBin",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy",
"GstURIHandler"
],
"klass": "Source/Network/WebRTC",
"pad-templates": {
"audio_%%s_%%u": {
"caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
},
"video_%%s_%%u": {
"caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
}
},
"properties": {
"janus-state": {
"blurb": "The current state of the signaller",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "initialized (0)",
"mutable": "null",
"readable": true,
"type": "GstJanusVRWebRTCJanusState",
"writable": false
},
"use-string-ids": {
"blurb": "Use strings instead of u64 for Janus IDs, see strings_ids config option in janus.plugin.videoroom.jcfg",
"conditionally-available": false,
"construct": false,
"construct-only": true,
"controllable": false,
"default": "false",
"mutable": "null",
"readable": true,
"type": "gboolean",
"writable": true
}
},
"rank": "none"
},
"livekitwebrtcsink": {
"author": "Olivier Crête <olivier.crete@collabora.com>",
"description": "WebRTC sink with LiveKit signaller",

View file

@ -1,7 +1,7 @@
// SPDX-License-Identifier: MPL-2.0
use crate::{
signaller::{Signallable, SignallableImpl},
signaller::{Signallable, SignallableImpl, WebRTCSignallerRole},
webrtcsink::JanusVRSignallerState,
RUNTIME,
};
@ -143,6 +143,12 @@ enum MessageBody {
Join(Join),
Publish,
Leave,
Start,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct SubscribeStream {
feed: JanusId,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
@ -156,6 +162,13 @@ enum Join {
#[serde(skip_serializing_if = "Option::is_none")]
display: Option<String>,
},
Subscriber {
room: JanusId,
streams: Vec<SubscribeStream>,
use_msid: bool,
#[serde(skip_serializing_if = "Option::is_none")]
private_id: Option<u64>,
},
}
#[derive(Serialize, Deserialize, Debug)]
@ -190,6 +203,10 @@ enum VideoRoomData {
#[serde(rename = "current-bitrate")]
current_bitrate: u32,
},
#[serde(rename = "attached")]
Attached {
streams: Vec<ConsumerStream>,
},
}
#[derive(Serialize, Deserialize, Debug)]
@ -199,6 +216,11 @@ enum PluginData {
VideoRoom { data: VideoRoomData },
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct ConsumerStream {
feed_id: JanusId,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct DataHolder {
id: u64,
@ -258,23 +280,29 @@ struct State {
// - self.state
// - self.settings
#[derive(Clone)]
#[derive(Clone, Debug)]
struct Settings {
janus_endpoint: String,
room_id: Option<JanusId>,
feed_id: Option<JanusId>,
display_name: Option<String>,
secret_key: Option<String>,
role: WebRTCSignallerRole,
// Producer only
display_name: Option<String>,
// Consumer only
producer_peer_id: Option<JanusId>,
}
impl Default for Settings {
fn default() -> Self {
Self {
janus_endpoint: "ws://127.0.0.1:8188".to_string(),
display_name: None,
room_id: None,
feed_id: None,
secret_key: None,
role: WebRTCSignallerRole::default(),
display_name: None,
producer_peer_id: None,
}
}
}
@ -285,8 +313,10 @@ pub struct Signaller {
state: Mutex<State>,
#[property(name="manual-sdp-munging", default = false, get = |_| false, type = bool, blurb = "Whether the signaller manages SDP munging itself")]
#[property(name="janus-endpoint", get, set, type = String, member = janus_endpoint, blurb = "The Janus server endpoint to POST SDP offer to")]
#[property(name="display-name", get, set, type = String, member = display_name, blurb = "The name of the publisher in the Janus Video Room")]
#[property(name="secret-key", get, set, type = String, member = secret_key, blurb = "The secret API key to communicate with Janus server")]
#[property(name="role", get, set, type = WebRTCSignallerRole, member = role, blurb = "Whether this signaller acts as either a Consumer or Producer. Listener is not currently supported.", builder(WebRTCSignallerRole::default()))]
// Producer only
#[property(name="display-name", get, set, type = String, member = display_name, blurb = "When in Producer role, the name of the publisher in the Janus Video Room.")]
// Properties whose type depends of the Janus ID format (u64 or string) are implemented in Signaller subclasses
settings: Mutex<Settings>,
}
@ -341,11 +371,16 @@ impl Signaller {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
let (session_id, apisecret) = {
let state = this.state.lock().unwrap();
let settings = this.settings.lock().unwrap();
(
state.session_id.unwrap(),
settings.secret_key.clone(),
)
let session_id = if let Some(s) = state.session_id {
s
} else {
// session_id is set to None when the plugin is dying
break
};
(session_id,
settings.secret_key.clone())
};
let msg = OutgoingMessage::KeepAlive{
transaction: transaction_id(),
@ -422,6 +457,11 @@ impl Signaller {
}
fn handle_reply(&self, reply: JsonReply) {
let role = {
let settings = self.settings.lock().unwrap();
settings.role
};
match reply {
JsonReply::WebRTCUp => {
gst::trace!(CAT, imp = self, "WebRTC streaming is working!");
@ -449,8 +489,18 @@ impl Signaller {
"Attached to Janus Video Room plugin successfully, handle: {}",
data.id
);
self.set_handle_id(data.id);
self.join_room();
match role {
WebRTCSignallerRole::Consumer => {
self.join_room_subscriber();
}
WebRTCSignallerRole::Producer => {
self.join_room_publisher();
}
WebRTCSignallerRole::Listener => unreachable!(),
}
}
}
}
@ -459,51 +509,74 @@ impl Signaller {
} => {
if let Some(PluginData::VideoRoom { data: plugindata }) = plugindata {
match plugindata {
VideoRoomData::Joined { room, id } => {
let feed_id_changed = {
let mut feed_id_changed = false;
let mut state = self.state.lock().unwrap();
{
let mut settings = self.settings.lock().unwrap();
if settings.feed_id.as_ref() != Some(&id) {
settings.feed_id = Some(id.clone());
feed_id_changed = true;
VideoRoomData::Joined { room, id } => match role {
WebRTCSignallerRole::Consumer => {}
WebRTCSignallerRole::Producer => {
gst::info!(
CAT,
imp = self,
"Joined room {room}, publisher id: {id}",
);
let feed_id_changed = {
let mut feed_id_changed = false;
let mut state = self.state.lock().unwrap();
{
let mut settings = self.settings.lock().unwrap();
if settings.feed_id.as_ref() != Some(&id) {
settings.feed_id = Some(id.clone());
feed_id_changed = true;
}
}
state.feed_id = Some(id);
feed_id_changed
};
if feed_id_changed {
self.obj().notify("feed-id");
}
state.feed_id = Some(id);
self.obj().emit_by_name::<()>(
"state-updated",
&[&JanusVRSignallerState::RoomJoined],
);
feed_id_changed
};
if feed_id_changed {
self.obj().notify("feed-id");
self.session_requested();
}
gst::trace!(CAT, imp = self, "Joined room {room:?} successfully",);
self.obj().emit_by_name::<()>(
"state-updated",
&[&JanusVRSignallerState::RoomJoined],
);
self.session_requested();
}
WebRTCSignallerRole::Listener => unimplemented!(),
},
VideoRoomData::Event {
error, error_code, ..
} => {
if error_code.is_some() && error.is_some() {
self.raise_error(format!(
"code: {}, reason: {}",
error_code.unwrap(),
error.unwrap(),
));
if let (Some(error_code), Some(error)) = (error_code, error) {
self.raise_error(format!("code: {error_code}, reason: {error}",));
return;
}
if let Some(Jsep::Answer { sdp, .. }) = jsep {
gst::trace!(CAT, imp = self, "Session requested successfully");
self.handle_answer(sdp);
match role {
WebRTCSignallerRole::Consumer => {}
WebRTCSignallerRole::Producer => {
// publish stream and handle answer
if let Some(Jsep::Answer { sdp, .. }) = jsep {
gst::trace!(
CAT,
imp = self,
"Session requested successfully"
);
self.handle_answer(&sdp);
}
}
WebRTCSignallerRole::Listener => unimplemented!(),
}
}
VideoRoomData::Attached { .. } => {
assert_eq!(role, WebRTCSignallerRole::Consumer);
if let Some(Jsep::Offer { sdp, .. }) = jsep {
gst::trace!(CAT, imp = self, "Offer received!");
self.handle_offer(sdp);
}
}
VideoRoomData::Destroyed { room } => {
@ -598,7 +671,7 @@ impl Signaller {
});
}
fn join_room(&self) {
fn join_room_publisher(&self) {
let (session_id, handle_id, room, feed_id, display, apisecret) = {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
@ -633,6 +706,49 @@ impl Signaller {
});
}
fn join_room_subscriber(&self) {
let (session_id, handle_id, room, producer_peer_id, apisecret) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if settings.room_id.is_none() {
self.raise_error("Janus Room ID must be set".to_string());
return;
}
(
state.session_id.unwrap(),
state.handle_id.unwrap(),
settings.room_id.as_ref().unwrap().clone(),
settings.producer_peer_id.as_ref().unwrap().clone(),
settings.secret_key.clone(),
)
};
gst::debug!(CAT, imp = self, "subscribing to feed {producer_peer_id}");
let producer_peer_id_str = producer_peer_id.to_string();
self.send(OutgoingMessage::Message {
transaction: transaction_id(),
session_id,
handle_id,
apisecret,
body: MessageBody::Join(Join::Subscriber {
room,
streams: vec![SubscribeStream {
feed: producer_peer_id,
}],
use_msid: false,
private_id: None,
}),
jsep: None,
});
self.obj()
.emit_by_name::<()>("session-started", &[&"unique", &producer_peer_id_str]);
}
fn leave_room(&self) {
let mut state = self.state.lock().unwrap();
let (session_id, handle_id, apisecret) = {
@ -745,7 +861,7 @@ impl Signaller {
);
}
fn handle_answer(&self, sdp: String) {
fn handle_answer(&self, sdp: &str) {
match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
Ok(ans_sdp) => {
let answer = gst_webrtc::WebRTCSessionDescription::new(
@ -761,6 +877,48 @@ impl Signaller {
}
}
fn handle_offer(&self, sdp: String) {
match gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) {
Ok(offer_sdp) => {
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
offer_sdp,
);
self.obj()
.emit_by_name::<()>("session-description", &[&"unique", &offer]);
}
Err(err) => {
self.raise_error(format!("Could not parse answer SDP: {err}"));
}
}
self.obj()
.emit_by_name::<()>("state-updated", &[&JanusVRSignallerState::Negotiating]);
}
fn send_start(&self, sdp: &gst_webrtc::WebRTCSessionDescription) {
let (session_id, handle_id, apisecret) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
(
state.session_id.unwrap(),
state.handle_id.unwrap(),
settings.secret_key.clone(),
)
};
let sdp = sdp.sdp().as_text().unwrap();
self.send(OutgoingMessage::Message {
transaction: transaction_id(),
session_id,
handle_id,
apisecret,
jsep: Some(Jsep::Answer { sdp, trickle: None }),
body: MessageBody::Start,
});
}
fn emit_talking(&self, talking: bool, id: JanusId, audio_level: f32) {
let obj = self.obj();
(obj.class().as_ref().emit_talking)(&obj, talking, id, audio_level)
@ -769,6 +927,16 @@ impl Signaller {
impl SignallableImpl for Signaller {
fn start(&self) {
{
let settings = self.settings.lock().unwrap();
if let (WebRTCSignallerRole::Consumer, None) =
(&settings.role, &settings.producer_peer_id)
{
panic!("producer-peer-id should be set in Consumer role");
}
}
let this = self.obj().clone();
let imp = self.downgrade();
RUNTIME.spawn(async move {
@ -789,8 +957,32 @@ impl SignallableImpl for Signaller {
"sending SDP offer to peer: {:?}",
offer.sdp().as_text()
);
let role = {
let settings = self.settings.lock().unwrap();
settings.role
};
self.publish(offer);
match role {
WebRTCSignallerRole::Producer => {
gst::log!(
CAT,
imp = self,
"sending SDP offer to peer: {:?}",
offer.sdp().as_text()
);
self.publish(offer)
}
WebRTCSignallerRole::Consumer => {
gst::log!(
CAT,
imp = self,
"sending SDP answer to peer: {:?}",
offer.sdp().as_text()
);
self.send_start(offer)
}
WebRTCSignallerRole::Listener => { /*nothing yet*/ }
}
}
fn add_ice(
@ -886,6 +1078,8 @@ pub mod signaller_u64 {
pub struct SignallerU64 {
#[property(name="room-id", get, set, type = u64, get = Self::get_room_id, set = Self::set_room_id, blurb = "The Janus Room ID that will be joined to")]
#[property(name="feed-id", get, set, type = u64, get = Self::get_feed_id, set = Self::set_feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")]
// Consumer only
#[property(name="producer-peer-id", get, set, type = u64, get = Self::get_producer_peer_id, set = Self::set_producer_peer_id, blurb = "The producer feed ID the signaller should subscribe to. Only used in Consumer mode.")]
/// Properties macro does not work with empty struct: https://github.com/gtk-rs/gtk-rs-core/issues/1110
_unused: bool,
}
@ -966,6 +1160,26 @@ pub mod signaller_u64 {
settings.feed_id = Some(JanusId::Num(id));
}
fn get_producer_peer_id(&self) -> u64 {
let obj = self.obj();
let signaller = obj.upcast_ref::<super::super::JanusVRSignaller>().imp();
let settings = signaller.settings.lock().unwrap();
settings
.producer_peer_id
.as_ref()
.map(|id| id.as_num())
.unwrap_or_default()
}
fn set_producer_peer_id(&self, id: u64) {
let obj = self.obj();
let signaller = obj.upcast_ref::<super::super::JanusVRSignaller>().imp();
let mut settings = signaller.settings.lock().unwrap();
settings.producer_peer_id = Some(JanusId::Num(id));
}
}
}
@ -977,6 +1191,8 @@ pub mod signaller_str {
pub struct SignallerStr {
#[property(name="room-id", get, set, type = String, get = Self::get_room_id, set = Self::set_room_id, blurb = "The Janus Room ID that will be joined to")]
#[property(name="feed-id", get, set, type = String, get = Self::get_feed_id, set = Self::set_feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")]
// Consumer only
#[property(name="producer-peer-id", get, set, type = String, get = Self::get_producer_peer_id, set = Self::set_producer_peer_id, blurb = "The producer feed ID the signaller should subscribe to. Only used in Consumer mode.")]
/// Properties macro does not work with empty struct: https://github.com/gtk-rs/gtk-rs-core/issues/1110
_unused: bool,
}
@ -1057,5 +1273,25 @@ pub mod signaller_str {
settings.feed_id = Some(JanusId::Str(id));
}
fn get_producer_peer_id(&self) -> String {
let obj = self.obj();
let signaller = obj.upcast_ref::<super::super::JanusVRSignaller>().imp();
let settings = signaller.settings.lock().unwrap();
settings
.producer_peer_id
.as_ref()
.map(|id| id.as_string())
.unwrap_or_default()
}
fn set_producer_peer_id(&self, id: String) {
let obj = self.obj();
let signaller = obj.upcast_ref::<super::super::JanusVRSignaller>().imp();
let mut settings = signaller.settings.lock().unwrap();
settings.producer_peer_id = Some(JanusId::Str(id));
}
}
}

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::Signallable;
use crate::signaller::{Signallable, WebRTCSignallerRole};
use gst::{glib, glib::prelude::*, glib::subclass::prelude::*};
mod imp;
@ -58,9 +58,9 @@ glib::wrapper! {
pub struct JanusVRSignallerU64(ObjectSubclass<imp::signaller_u64::SignallerU64>) @extends JanusVRSignaller, @implements Signallable;
}
impl Default for JanusVRSignallerU64 {
fn default() -> Self {
glib::Object::new()
impl JanusVRSignallerU64 {
pub fn new(role: WebRTCSignallerRole) -> Self {
glib::Object::builder().property("role", role).build()
}
}
@ -69,8 +69,8 @@ glib::wrapper! {
pub struct JanusVRSignallerStr(ObjectSubclass<imp::signaller_str::SignallerStr>) @extends JanusVRSignaller, @implements Signallable;
}
impl Default for JanusVRSignallerStr {
fn default() -> Self {
glib::Object::new()
impl JanusVRSignallerStr {
pub fn new(role: WebRTCSignallerRole) -> Self {
glib::Object::builder().property("role", role).build()
}
}

View file

@ -6030,9 +6030,9 @@ pub(super) mod janus {
.imp();
let signaller: Signallable = if settings.use_string_ids {
JanusVRSignallerStr::default().upcast()
JanusVRSignallerStr::new(WebRTCSignallerRole::Producer).upcast()
} else {
JanusVRSignallerU64::default().upcast()
JanusVRSignallerU64::new(WebRTCSignallerRole::Producer).upcast()
};
let self_weak = self.downgrade();

View file

@ -2174,3 +2174,244 @@ pub(super) mod livekit {
impl GhostPadImpl for LiveKitWebRTCSrcPad {}
impl WebRTCSrcPadImpl for LiveKitWebRTCSrcPad {}
}
#[cfg(feature = "janus")]
pub(super) mod janus {
use super::*;
use crate::{
janusvr_signaller::{JanusVRSignallerStr, JanusVRSignallerU64},
webrtcsink::JanusVRSignallerState,
webrtcsrc::WebRTCSignallerRole,
};
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"janusvrwebrtcsrc",
gst::DebugColorFlags::empty(),
Some("WebRTC Janus Video Room src"),
)
});
#[derive(Debug, Clone, Default)]
struct JanusSettings {
use_string_ids: bool,
}
#[derive(Debug, Default)]
struct JanusState {
janus_state: JanusVRSignallerState,
}
#[derive(Default, glib::Properties)]
#[properties(wrapper_type = crate::webrtcsrc::JanusVRWebRTCSrc)]
pub struct JanusVRWebRTCSrc {
/**
* GstJanusVRWebRTCSrc:use-string-ids:
*
* By default Janus uses `u64` ids to identify the room, the feed, etc.
* But it can be changed to strings using the `strings_ids` option in `janus.plugin.videoroom.jcfg`.
* In such case, `janusvrwebrtcsrc` has to be created using `use-string-ids=true` so its signaller
* uses the right types for such ids and properties.
*
* Since: plugins-rs-0.14.0
*/
#[property(name="use-string-ids", get, construct_only, type = bool, member = use_string_ids, blurb = "Use strings instead of u64 for Janus IDs, see strings_ids config option in janus.plugin.videoroom.jcfg")]
settings: Mutex<JanusSettings>,
/**
* GstJanusVRWebRTCSrc:janus-state:
*
* The current state of the signaller.
* Since: plugins-rs-0.14.0
*/
#[property(
name = "janus-state",
get,
member = janus_state,
type = JanusVRSignallerState,
blurb = "The current state of the signaller",
builder(JanusVRSignallerState::Initialized)
)]
state: Mutex<JanusState>,
}
#[glib::derived_properties]
impl ObjectImpl for JanusVRWebRTCSrc {
fn constructed(&self) {
self.parent_constructed();
let settings = self.settings.lock().unwrap();
let element = self.obj();
let ws = element
.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>()
.imp();
let signaller: Signallable = if settings.use_string_ids {
JanusVRSignallerStr::new(WebRTCSignallerRole::Consumer).upcast()
} else {
JanusVRSignallerU64::new(WebRTCSignallerRole::Consumer).upcast()
};
let self_weak = self.downgrade();
signaller.connect("state-updated", false, move |args| {
let self_ = self_weak.upgrade()?;
let janus_state = args[1].get::<JanusVRSignallerState>().unwrap();
{
let mut state = self_.state.lock().unwrap();
state.janus_state = janus_state;
}
gst::debug!(
CAT,
imp = self_,
"signaller state updated: {:?}",
janus_state
);
self_.obj().notify("janus-state");
None
});
let _ = ws.set_signaller(signaller);
}
}
impl GstObjectImpl for JanusVRWebRTCSrc {}
impl ElementImpl for JanusVRWebRTCSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> =
LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"JanusVRWebRTCSrc",
"Source/Network/WebRTC",
"WebRTC source with Janus Video Room signaller",
"Eva Pace <epace@igalia.com>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BinImpl for JanusVRWebRTCSrc {}
impl BaseWebRTCSrcImpl for JanusVRWebRTCSrc {}
#[glib::object_subclass]
impl ObjectSubclass for JanusVRWebRTCSrc {
const NAME: &'static str = "GstJanusVRWebRTCSrc";
type Type = crate::webrtcsrc::JanusVRWebRTCSrc;
type ParentType = crate::webrtcsrc::BaseWebRTCSrc;
type Interfaces = (gst::URIHandler,);
}
impl URIHandlerImpl for JanusVRWebRTCSrc {
const URI_TYPE: gst::URIType = gst::URIType::Src;
fn protocols() -> &'static [&'static str] {
&["gstjanusvr", "gstjanusvrs"]
}
fn uri(&self) -> Option<String> {
{
let settings = self.settings.lock().unwrap();
if settings.use_string_ids {
// URI not supported for string ids
return None;
}
}
let obj = self.obj();
let base = obj.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>().imp();
let signaller = base.signaller();
let janus_endpoint = signaller.property::<String>("janus-endpoint");
let uri = janus_endpoint
.replace("wss://", "gstjansvrs://")
.replace("ws://", "gstjanusvr://");
let room_id = signaller.property::<u64>("room-id");
let producer_peer_id = signaller.property::<u64>("producer-peer-id");
Some(format!(
"{uri}?use-string-ids=false&room-id={room_id}&producer-peer-id={producer_peer_id}"
))
}
fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
gst::debug!(CAT, imp = self, "parsing URI {uri}");
let uri = Url::from_str(uri)
.map_err(|err| glib::Error::new(gst::URIError::BadUri, &format!("{:?}", err)))?;
let socket_scheme = match uri.scheme() {
"gstjanusvr" => Ok("ws"),
"gstjanusvrs" => Ok("wss"),
_ => Err(glib::Error::new(
gst::URIError::BadUri,
&format!("Invalid protocol: {}", uri.scheme()),
)),
}?;
let port = uri
.port()
.map(|port| format!(":{port}"))
.unwrap_or_default();
let janus_endpoint = format!(
"{socket_scheme}://{}{port}{}",
uri.host_str().unwrap_or("127.0.0.1"),
uri.path()
);
let use_strings_ids = uri
.query_pairs()
.find(|(k, _v)| k == "use-string-ids")
.map(|(_k, v)| v.to_lowercase() == "true")
.unwrap_or_default();
if use_strings_ids {
// TODO: we'd have to instantiate a JanusVRSignallerStr and set it on the src element
// but "signaller" is a construct-only property.
return Err(glib::Error::new(
gst::URIError::BadUri,
"use-string-ids=true not yet supported in URI",
));
}
let room_id = uri
.query_pairs()
.find(|(k, _v)| k == "room-id")
.map(|(_k, v)| v)
.ok_or(glib::Error::new(gst::URIError::BadUri, "room-id missing"))?;
let producer_peer_id = uri
.query_pairs()
.find(|(k, _v)| k == "producer-peer-id")
.map(|(_k, v)| v)
.ok_or(glib::Error::new(
gst::URIError::BadUri,
"producer-peer-id missing",
))?;
let obj = self.obj();
let base = obj.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>().imp();
let signaller = base.signaller();
let room_id = room_id.parse::<u64>().map_err(|err| {
glib::Error::new(gst::URIError::BadUri, &format!("Invalid room-id: {err}"))
})?;
let producer_peer_id = producer_peer_id.parse::<u64>().map_err(|err| {
glib::Error::new(
gst::URIError::BadUri,
&format!("Invalid producer-peer-id: {err}"),
)
})?;
signaller.set_property("janus-endpoint", &janus_endpoint);
signaller.set_property("room-id", room_id);
signaller.set_property("producer-peer-id", producer_peer_id);
Ok(())
}
}
}

View file

@ -59,6 +59,11 @@ glib::wrapper! {
pub struct LiveKitWebRTCSrc(ObjectSubclass<imp::livekit::LiveKitWebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy;
}
#[cfg(feature = "janus")]
glib::wrapper! {
pub struct JanusVRWebRTCSrc(ObjectSubclass<imp::janus::JanusVRWebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
}
glib::wrapper! {
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
@ -145,6 +150,46 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
gst::Rank::NONE,
LiveKitWebRTCSrc::static_type(),
)?;
#[cfg(feature = "janus")]
/**
* element-janusvrwebrtcsrc:
*
* `JanusVRWebRTCSrc` is an element that integrates with the [Video Room plugin](https://janus.conf.meetecho.com/docs/videoroom) of the [Janus Gateway](https://github.com/meetecho/janus-gateway).
* It receives audio and/or video streams from WebRTC using Janus as the signaller.
*
* ## Examples
*
* First start sending a video stream to a janus room:
*
* ```bash
* $ gst-launch-1.0 videotestsrc ! janusvrwebrtcsink signaller::room-id=1234 signaller::feed-id=777 signaller::janus-endpoint=wss://janus.conf.meetecho.com/ws
* ```
*
* You can then retrieve this stream using:
*
* ```bash
* $ gst-launch-1.0 janusvrwebrtcsrc signaller::room-id=1234 signaller::producer-peer-id=777 signaller::janus-endpoint=wss://janus.conf.meetecho.com/ws ! videoconvert ! autovideosink
* ```
*
* You can also retrieve it using an URI:
*
* ```bash
* $ gst-play-1.0 "gstjanusvrs://janus.conf.meetecho.com/ws?room-id=1234&producer-peer-id=777"
* ```
*
* ## See also
*
* The [documentation of the `janusvrwebrtcsink` element](https://gstreamer.freedesktop.org/documentation//rswebrtc/janusvrwebrtcsink.html).
*
* Since: plugins-rs-0.14.0
*
*/
gst::Element::register(
plugin,
"janusvrwebrtcsrc",
gst::Rank::NONE,
JanusVRWebRTCSrc::static_type(),
)?;
Ok(())
}