livekitwebrtcsrc: Add pad properties for various LiveKit participant / track metadata

The content of the TrackInfo and ParticipantInfo structs is exposed as
gst::Structure.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1794>
This commit is contained in:
Sebastian Dröge 2024-09-20 20:22:36 +03:00
parent ceb88d960f
commit 27dc76826e
5 changed files with 312 additions and 3 deletions

View file

@ -10195,7 +10195,7 @@
"caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
"type": "GstLiveKitWebRTCSrcPad"
},
"audio_%%u": {
"caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n",
@ -10207,7 +10207,7 @@
"caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
"type": "GstLiveKitWebRTCSrcPad"
},
"video_%%u": {
"caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\nvideo/x-av1:\n",
@ -11011,6 +11011,67 @@
}
}
},
"GstLiveKitWebRTCSrcPad": {
"hierarchy": [
"GstLiveKitWebRTCSrcPad",
"GstWebRTCSrcPad",
"GstGhostPad",
"GstProxyPad",
"GstPad",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"kind": "object",
"properties": {
"participant-info": {
"blurb": "Participant Information",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"mutable": "null",
"readable": true,
"type": "GstStructure",
"writable": false
},
"participant-sid": {
"blurb": "Participant ID",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": false
},
"track-info": {
"blurb": "Track Information",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"mutable": "null",
"readable": true,
"type": "GstStructure",
"writable": false
},
"track-sid": {
"blurb": "Track ID",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": false
}
}
},
"GstRSWebRTCSignallableIface": {
"hierarchy": [
"GstRSWebRTCSignallableIface",

View file

@ -84,6 +84,7 @@ struct Connection {
signal_task: JoinHandle<()>,
early_candidates: Option<Vec<String>>,
channels: Option<Channels>,
participants: HashMap<String, proto::ParticipantInfo>,
}
#[derive(Serialize, Deserialize)]
@ -487,6 +488,19 @@ impl Signaller {
}
_ => return,
}
if participant.state != proto::participant_info::State::Disconnected as i32 {
if let Some(ref mut connection) = &mut *self.connection.lock().unwrap() {
if !connection.participants.contains_key(&participant.sid) {
connection
.participants
.insert(participant.sid.clone(), participant.clone());
}
}
} else if let Some(ref mut connection) = &mut *self.connection.lock().unwrap() {
connection.participants.remove(&participant.sid);
}
let meta = Some(&participant.metadata)
.filter(|meta| !meta.is_empty())
.and_then(|meta| gst::Structure::from_str(meta).ok());
@ -537,6 +551,13 @@ impl Signaller {
.await;
signal_client.close().await;
}
pub(crate) fn participant_info(&self, participant_sid: &str) -> Option<proto::ParticipantInfo> {
let connection = self.connection.lock().unwrap();
let connection = connection.as_ref()?;
let participant = connection.participants.get(participant_sid)?;
Some(participant.clone())
}
}
impl SignallableImpl for Signaller {
@ -692,6 +713,7 @@ impl SignallableImpl for Signaller {
pending_tracks: Default::default(),
early_candidates: Some(Vec::new()),
channels: None,
participants: HashMap::default(),
};
if let Ok(mut sc) = imp.connection.lock() {

View file

@ -1785,7 +1785,7 @@ pub(super) mod whip {
#[cfg(feature = "livekit")]
pub(super) mod livekit {
use super::*;
use crate::livekit_signaller::LiveKitSignaller;
use crate::{livekit_signaller::LiveKitSignaller, webrtcsrc::pad::WebRTCSrcPadImpl};
#[derive(Default)]
pub struct LiveKitWebRTCSrc;
@ -1807,6 +1807,26 @@ pub(super) mod livekit {
impl BinImpl for LiveKitWebRTCSrc {}
impl ElementImpl for LiveKitWebRTCSrc {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
super::BaseWebRTCSrc::pad_templates()
.iter()
.map(|pad_templ| {
gst::PadTemplate::with_gtype(
pad_templ.name_template(),
pad_templ.direction(),
pad_templ.presence(),
pad_templ.caps(),
super::super::LiveKitWebRTCSrcPad::static_type(),
)
.unwrap()
})
.collect()
});
PAD_TEMPLATES.as_ref()
}
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
@ -1829,4 +1849,199 @@ pub(super) mod livekit {
type Type = crate::webrtcsrc::LiveKitWebRTCSrc;
type ParentType = crate::webrtcsrc::BaseWebRTCSrc;
}
#[derive(Default)]
pub struct LiveKitWebRTCSrcPad {}
fn participant_permission_to_structure(
participant_permission: &livekit_protocol::ParticipantPermission,
) -> gst::Structure {
gst::Structure::builder("livekit/participant-permission")
.field("can-subscribe", participant_permission.can_subscribe)
.field("can-publish", participant_permission.can_publish)
.field("can-publish-data", participant_permission.can_publish_data)
.field_from_iter::<gst::Array>(
"can-publish-sources",
participant_permission
.can_publish_sources
.iter()
.map(|v| v.to_send_value()),
)
.field("hidden", participant_permission.hidden)
.field("recorder", participant_permission.recorder)
.field(
"can-update-metadata",
participant_permission.can_update_metadata,
)
.field("agent", participant_permission.agent)
.build()
}
fn track_info_to_structure(track_info: &livekit_protocol::TrackInfo) -> gst::Structure {
gst::Structure::builder("livekit/track-info")
.field("sid", &track_info.sid)
.field(
"type",
livekit_protocol::TrackType::try_from(track_info.r#type)
.map(|t| t.as_str_name())
.unwrap_or("unknown"),
)
.field("name", &track_info.name)
.field("muted", track_info.muted)
.field("width", track_info.width)
.field("height", track_info.height)
.field("simulcast", track_info.simulcast)
.field("disable-dtx", track_info.disable_dtx)
.field(
"source",
livekit_protocol::TrackSource::try_from(track_info.source)
.map(|s| s.as_str_name())
.unwrap_or("unknown"),
)
//.field_from_iter::<gst::Array>("layers", track_info.layers.iter().todo!())
.field("mime-type", &track_info.mime_type)
.field("mid", &track_info.mid)
//.field_from_iter::<gst::Array>("codecs", track_info.codecs.iter().todo!())
.field("stereo", track_info.stereo)
.field("disable-red", track_info.disable_red)
.field(
"encryption",
livekit_protocol::encryption::Type::try_from(track_info.encryption)
.map(|e| e.as_str_name())
.unwrap_or("unknown"),
)
.field("stream", &track_info.stream)
//.field("version", todo!())
.build()
}
fn participant_info_to_structure(
participant_info: &livekit_protocol::ParticipantInfo,
) -> gst::Structure {
gst::Structure::builder("livekit/participant-info")
.field("sid", &participant_info.sid)
.field("identity", &participant_info.identity)
.field(
"state",
livekit_protocol::participant_info::State::try_from(participant_info.state)
.map(|s| s.as_str_name())
.unwrap_or("unknown"),
)
.field_from_iter::<gst::Array>(
"tracks",
participant_info.tracks.iter().map(track_info_to_structure),
)
.field("metadata", &participant_info.metadata)
.field("joined-at", participant_info.joined_at)
.field("name", &participant_info.name)
.field("version", participant_info.version)
.field_if_some(
"permission",
participant_info
.permission
.as_ref()
.map(participant_permission_to_structure),
)
.field("region", &participant_info.region)
.field("is-publisher", participant_info.is_publisher)
.field(
"kind",
livekit_protocol::participant_info::Kind::try_from(participant_info.kind)
.map(|k| k.as_str_name())
.unwrap_or("unknown"),
)
.build()
}
impl LiveKitWebRTCSrcPad {
fn participant_track_sid(&self) -> Option<(String, String)> {
// msid format is "participant_sid|track_sid"
let msid = self.obj().property::<Option<String>>("msid")?;
let (participant_sid, track_sid) = msid.split_once('|')?;
Some((String::from(participant_sid), String::from(track_sid)))
}
fn participant_info(&self) -> Option<gst::Structure> {
let participant_sid = self.participant_sid()?;
let webrtcbin = self
.obj()
.parent()
.and_downcast::<super::super::BaseWebRTCSrc>()?;
let signaller = webrtcbin.property::<LiveKitSignaller>("signaller");
let participant_info = signaller.imp().participant_info(&participant_sid)?;
Some(participant_info_to_structure(&participant_info))
}
fn track_info(&self) -> Option<gst::Structure> {
let (participant_sid, track_sid) = self.participant_track_sid()?;
let webrtcbin = self
.obj()
.parent()
.and_downcast::<super::super::BaseWebRTCSrc>()?;
let signaller = webrtcbin.property::<LiveKitSignaller>("signaller");
let participant_info = signaller.imp().participant_info(&participant_sid)?;
participant_info
.tracks
.iter()
.find(|t| t.sid == track_sid)
.map(track_info_to_structure)
}
fn participant_sid(&self) -> Option<String> {
self.participant_track_sid()
.map(|(participant_sid, _track_sid)| participant_sid)
}
fn track_sid(&self) -> Option<String> {
self.participant_track_sid()
.map(|(_participant_sid, track_sid)| track_sid)
}
}
#[glib::object_subclass]
impl ObjectSubclass for LiveKitWebRTCSrcPad {
const NAME: &'static str = "GstLiveKitWebRTCSrcPad";
type Type = super::super::LiveKitWebRTCSrcPad;
type ParentType = super::super::WebRTCSrcPad;
}
impl ObjectImpl for LiveKitWebRTCSrcPad {
fn properties() -> &'static [glib::ParamSpec] {
static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecBoxed::builder::<gst::Structure>("participant-info")
.flags(glib::ParamFlags::READABLE)
.blurb("Participant Information")
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("track-info")
.flags(glib::ParamFlags::READABLE)
.blurb("Track Information")
.build(),
glib::ParamSpecString::builder("participant-sid")
.flags(glib::ParamFlags::READABLE)
.blurb("Participant ID")
.build(),
glib::ParamSpecString::builder("track-sid")
.flags(glib::ParamFlags::READABLE)
.blurb("Track ID")
.build(),
]
});
PROPS.as_ref()
}
fn property(&self, _sid: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"participant-info" => self.participant_info().into(),
"track-info" => self.track_info().into(),
"participant-sid" => self.participant_sid().into(),
"track-sid" => self.track_sid().into(),
name => panic!("no readable property {name:?}"),
}
}
}
impl GstObjectImpl for LiveKitWebRTCSrcPad {}
impl PadImpl for LiveKitWebRTCSrcPad {}
impl ProxyPadImpl for LiveKitWebRTCSrcPad {}
impl GhostPadImpl for LiveKitWebRTCSrcPad {}
impl WebRTCSrcPadImpl for LiveKitWebRTCSrcPad {}
}

View file

@ -63,10 +63,17 @@ glib::wrapper! {
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
#[cfg(feature = "livekit")]
glib::wrapper! {
pub struct LiveKitWebRTCSrcPad(ObjectSubclass<imp::livekit::LiveKitWebRTCSrcPad>) @extends WebRTCSrcPad, gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
BaseWebRTCSrc::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
WebRTCSignallerRole::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
WebRTCSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
#[cfg(feature = "livekit")]
LiveKitWebRTCSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
Signallable::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
gst::Element::register(
plugin,

View file

@ -73,3 +73,7 @@ impl GstObjectImpl for WebRTCSrcPad {}
impl PadImpl for WebRTCSrcPad {}
impl ProxyPadImpl for WebRTCSrcPad {}
impl GhostPadImpl for WebRTCSrcPad {}
unsafe impl<T: WebRTCSrcPadImpl> IsSubclassable<T> for super::WebRTCSrcPad {}
pub trait WebRTCSrcPadImpl: GhostPadImpl {}