From 27dc76826e71e0767cff6298714ffefb4c20354c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 20 Sep 2024 20:22:36 +0300 Subject: [PATCH] livekitwebrtcsrc: Add pad properties for various LiveKit participant / track metadata The content of the TrackInfo and ParticipantInfo structs is exposed as gst::Structure. Part-of: --- docs/plugins/gst_plugins_cache.json | 65 ++++++- net/webrtc/src/livekit_signaller/imp.rs | 22 +++ net/webrtc/src/webrtcsrc/imp.rs | 217 +++++++++++++++++++++++- net/webrtc/src/webrtcsrc/mod.rs | 7 + net/webrtc/src/webrtcsrc/pad.rs | 4 + 5 files changed, 312 insertions(+), 3 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 4824fac5..87a03648 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -10195,7 +10195,7 @@ "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", "direction": "src", "presence": "sometimes", - "type": "GstWebRTCSrcPad" + "type": "GstLiveKitWebRTCSrcPad" }, "audio_%%u": { "caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n", @@ -10207,7 +10207,7 @@ "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", "direction": "src", "presence": "sometimes", - "type": "GstWebRTCSrcPad" + "type": "GstLiveKitWebRTCSrcPad" }, "video_%%u": { "caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n", @@ -11011,6 +11011,67 @@ } } }, + "GstLiveKitWebRTCSrcPad": { + "hierarchy": [ + "GstLiveKitWebRTCSrcPad", + "GstWebRTCSrcPad", + "GstGhostPad", + "GstProxyPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object", + "properties": { + "participant-info": { + "blurb": "Participant Information", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "null", + "readable": true, + "type": "GstStructure", + "writable": false + }, + "participant-sid": { + "blurb": "Participant ID", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": false + }, + "track-info": { + "blurb": "Track Information", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "null", + "readable": true, + "type": "GstStructure", + "writable": false + }, + "track-sid": { + "blurb": "Track ID", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": false + } + } + }, "GstRSWebRTCSignallableIface": { "hierarchy": [ "GstRSWebRTCSignallableIface", diff --git a/net/webrtc/src/livekit_signaller/imp.rs b/net/webrtc/src/livekit_signaller/imp.rs index 49797e5c..366b9836 100644 --- a/net/webrtc/src/livekit_signaller/imp.rs +++ b/net/webrtc/src/livekit_signaller/imp.rs @@ -84,6 +84,7 @@ struct Connection { signal_task: JoinHandle<()>, early_candidates: Option>, channels: Option, + participants: HashMap, } #[derive(Serialize, Deserialize)] @@ -487,6 +488,19 @@ impl Signaller { } _ => return, } + + if participant.state != proto::participant_info::State::Disconnected as i32 { + if let Some(ref mut connection) = &mut *self.connection.lock().unwrap() { + if !connection.participants.contains_key(&participant.sid) { + connection + .participants + .insert(participant.sid.clone(), participant.clone()); + } + } + } else if let Some(ref mut connection) = &mut *self.connection.lock().unwrap() { + connection.participants.remove(&participant.sid); + } + let meta = Some(&participant.metadata) .filter(|meta| !meta.is_empty()) .and_then(|meta| gst::Structure::from_str(meta).ok()); @@ -537,6 +551,13 @@ impl Signaller { .await; signal_client.close().await; } + + pub(crate) fn participant_info(&self, participant_sid: &str) -> Option { + let connection = self.connection.lock().unwrap(); + let connection = connection.as_ref()?; + let participant = connection.participants.get(participant_sid)?; + Some(participant.clone()) + } } impl SignallableImpl for Signaller { @@ -692,6 +713,7 @@ impl SignallableImpl for Signaller { pending_tracks: Default::default(), early_candidates: Some(Vec::new()), channels: None, + participants: HashMap::default(), }; if let Ok(mut sc) = imp.connection.lock() { diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 0709792d..7022eedd 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -1785,7 +1785,7 @@ pub(super) mod whip { #[cfg(feature = "livekit")] pub(super) mod livekit { use super::*; - use crate::livekit_signaller::LiveKitSignaller; + use crate::{livekit_signaller::LiveKitSignaller, webrtcsrc::pad::WebRTCSrcPadImpl}; #[derive(Default)] pub struct LiveKitWebRTCSrc; @@ -1807,6 +1807,26 @@ pub(super) mod livekit { impl BinImpl for LiveKitWebRTCSrc {} impl ElementImpl for LiveKitWebRTCSrc { + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + super::BaseWebRTCSrc::pad_templates() + .iter() + .map(|pad_templ| { + gst::PadTemplate::with_gtype( + pad_templ.name_template(), + pad_templ.direction(), + pad_templ.presence(), + pad_templ.caps(), + super::super::LiveKitWebRTCSrcPad::static_type(), + ) + .unwrap() + }) + .collect() + }); + + PAD_TEMPLATES.as_ref() + } + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( @@ -1829,4 +1849,199 @@ pub(super) mod livekit { type Type = crate::webrtcsrc::LiveKitWebRTCSrc; type ParentType = crate::webrtcsrc::BaseWebRTCSrc; } + + #[derive(Default)] + pub struct LiveKitWebRTCSrcPad {} + + fn participant_permission_to_structure( + participant_permission: &livekit_protocol::ParticipantPermission, + ) -> gst::Structure { + gst::Structure::builder("livekit/participant-permission") + .field("can-subscribe", participant_permission.can_subscribe) + .field("can-publish", participant_permission.can_publish) + .field("can-publish-data", participant_permission.can_publish_data) + .field_from_iter::( + "can-publish-sources", + participant_permission + .can_publish_sources + .iter() + .map(|v| v.to_send_value()), + ) + .field("hidden", participant_permission.hidden) + .field("recorder", participant_permission.recorder) + .field( + "can-update-metadata", + participant_permission.can_update_metadata, + ) + .field("agent", participant_permission.agent) + .build() + } + + fn track_info_to_structure(track_info: &livekit_protocol::TrackInfo) -> gst::Structure { + gst::Structure::builder("livekit/track-info") + .field("sid", &track_info.sid) + .field( + "type", + livekit_protocol::TrackType::try_from(track_info.r#type) + .map(|t| t.as_str_name()) + .unwrap_or("unknown"), + ) + .field("name", &track_info.name) + .field("muted", track_info.muted) + .field("width", track_info.width) + .field("height", track_info.height) + .field("simulcast", track_info.simulcast) + .field("disable-dtx", track_info.disable_dtx) + .field( + "source", + livekit_protocol::TrackSource::try_from(track_info.source) + .map(|s| s.as_str_name()) + .unwrap_or("unknown"), + ) + //.field_from_iter::("layers", track_info.layers.iter().todo!()) + .field("mime-type", &track_info.mime_type) + .field("mid", &track_info.mid) + //.field_from_iter::("codecs", track_info.codecs.iter().todo!()) + .field("stereo", track_info.stereo) + .field("disable-red", track_info.disable_red) + .field( + "encryption", + livekit_protocol::encryption::Type::try_from(track_info.encryption) + .map(|e| e.as_str_name()) + .unwrap_or("unknown"), + ) + .field("stream", &track_info.stream) + //.field("version", todo!()) + .build() + } + + fn participant_info_to_structure( + participant_info: &livekit_protocol::ParticipantInfo, + ) -> gst::Structure { + gst::Structure::builder("livekit/participant-info") + .field("sid", &participant_info.sid) + .field("identity", &participant_info.identity) + .field( + "state", + livekit_protocol::participant_info::State::try_from(participant_info.state) + .map(|s| s.as_str_name()) + .unwrap_or("unknown"), + ) + .field_from_iter::( + "tracks", + participant_info.tracks.iter().map(track_info_to_structure), + ) + .field("metadata", &participant_info.metadata) + .field("joined-at", participant_info.joined_at) + .field("name", &participant_info.name) + .field("version", participant_info.version) + .field_if_some( + "permission", + participant_info + .permission + .as_ref() + .map(participant_permission_to_structure), + ) + .field("region", &participant_info.region) + .field("is-publisher", participant_info.is_publisher) + .field( + "kind", + livekit_protocol::participant_info::Kind::try_from(participant_info.kind) + .map(|k| k.as_str_name()) + .unwrap_or("unknown"), + ) + .build() + } + + impl LiveKitWebRTCSrcPad { + fn participant_track_sid(&self) -> Option<(String, String)> { + // msid format is "participant_sid|track_sid" + let msid = self.obj().property::>("msid")?; + let (participant_sid, track_sid) = msid.split_once('|')?; + Some((String::from(participant_sid), String::from(track_sid))) + } + + fn participant_info(&self) -> Option { + let participant_sid = self.participant_sid()?; + let webrtcbin = self + .obj() + .parent() + .and_downcast::()?; + let signaller = webrtcbin.property::("signaller"); + let participant_info = signaller.imp().participant_info(&participant_sid)?; + Some(participant_info_to_structure(&participant_info)) + } + + fn track_info(&self) -> Option { + let (participant_sid, track_sid) = self.participant_track_sid()?; + let webrtcbin = self + .obj() + .parent() + .and_downcast::()?; + let signaller = webrtcbin.property::("signaller"); + let participant_info = signaller.imp().participant_info(&participant_sid)?; + participant_info + .tracks + .iter() + .find(|t| t.sid == track_sid) + .map(track_info_to_structure) + } + + fn participant_sid(&self) -> Option { + self.participant_track_sid() + .map(|(participant_sid, _track_sid)| participant_sid) + } + + fn track_sid(&self) -> Option { + self.participant_track_sid() + .map(|(_participant_sid, track_sid)| track_sid) + } + } + + #[glib::object_subclass] + impl ObjectSubclass for LiveKitWebRTCSrcPad { + const NAME: &'static str = "GstLiveKitWebRTCSrcPad"; + type Type = super::super::LiveKitWebRTCSrcPad; + type ParentType = super::super::WebRTCSrcPad; + } + + impl ObjectImpl for LiveKitWebRTCSrcPad { + fn properties() -> &'static [glib::ParamSpec] { + static PROPS: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoxed::builder::("participant-info") + .flags(glib::ParamFlags::READABLE) + .blurb("Participant Information") + .build(), + glib::ParamSpecBoxed::builder::("track-info") + .flags(glib::ParamFlags::READABLE) + .blurb("Track Information") + .build(), + glib::ParamSpecString::builder("participant-sid") + .flags(glib::ParamFlags::READABLE) + .blurb("Participant ID") + .build(), + glib::ParamSpecString::builder("track-sid") + .flags(glib::ParamFlags::READABLE) + .blurb("Track ID") + .build(), + ] + }); + PROPS.as_ref() + } + fn property(&self, _sid: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "participant-info" => self.participant_info().into(), + "track-info" => self.track_info().into(), + "participant-sid" => self.participant_sid().into(), + "track-sid" => self.track_sid().into(), + name => panic!("no readable property {name:?}"), + } + } + } + impl GstObjectImpl for LiveKitWebRTCSrcPad {} + impl PadImpl for LiveKitWebRTCSrcPad {} + impl ProxyPadImpl for LiveKitWebRTCSrcPad {} + impl GhostPadImpl for LiveKitWebRTCSrcPad {} + impl WebRTCSrcPadImpl for LiveKitWebRTCSrcPad {} } diff --git a/net/webrtc/src/webrtcsrc/mod.rs b/net/webrtc/src/webrtcsrc/mod.rs index fe1b3e50..6678c0e4 100644 --- a/net/webrtc/src/webrtcsrc/mod.rs +++ b/net/webrtc/src/webrtcsrc/mod.rs @@ -63,10 +63,17 @@ glib::wrapper! { pub struct WebRTCSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; } +#[cfg(feature = "livekit")] +glib::wrapper! { + pub struct LiveKitWebRTCSrcPad(ObjectSubclass) @extends WebRTCSrcPad, gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; +} + pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> { BaseWebRTCSrc::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); WebRTCSignallerRole::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); WebRTCSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + #[cfg(feature = "livekit")] + LiveKitWebRTCSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); Signallable::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); gst::Element::register( plugin, diff --git a/net/webrtc/src/webrtcsrc/pad.rs b/net/webrtc/src/webrtcsrc/pad.rs index afe78482..9b14283e 100644 --- a/net/webrtc/src/webrtcsrc/pad.rs +++ b/net/webrtc/src/webrtcsrc/pad.rs @@ -73,3 +73,7 @@ impl GstObjectImpl for WebRTCSrcPad {} impl PadImpl for WebRTCSrcPad {} impl ProxyPadImpl for WebRTCSrcPad {} impl GhostPadImpl for WebRTCSrcPad {} + +unsafe impl IsSubclassable for super::WebRTCSrcPad {} + +pub trait WebRTCSrcPadImpl: GhostPadImpl {}