webrtc: janus: use serde tag to describe Janus API

Makes the API clearer and less prone to errors by relying on enums
rather than strings.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2244>
This commit is contained in:
Guillaume Desmottes 2025-05-13 11:06:30 +02:00 committed by GStreamer Marge Bot
parent deb7325e7c
commit 063849f458

View file

@ -77,71 +77,19 @@ impl std::fmt::Display for JanusId {
} }
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct KeepAliveMsg { #[serde(tag = "type")]
janus: String, #[serde(rename_all = "lowercase")]
transaction: String, enum Jsep {
session_id: u64, Offer {
apisecret: Option<String>, sdp: String,
} #[serde(skip_serializing_if = "Option::is_none")]
trickle: Option<bool>,
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] },
struct CreateSessionMsg { Answer {
janus: String, sdp: String,
transaction: String, #[serde(skip_serializing_if = "Option::is_none")]
apisecret: Option<String>, trickle: Option<bool>,
} },
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct AttachPluginMsg {
janus: String,
transaction: String,
plugin: String,
session_id: u64,
apisecret: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct RoomRequestBody {
request: String,
ptype: String,
room: JanusId,
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<JanusId>,
#[serde(skip_serializing_if = "Option::is_none")]
display: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct RoomRequestMsg {
janus: String,
transaction: String,
session_id: u64,
handle_id: u64,
apisecret: Option<String>,
body: RoomRequestBody,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct PublishBody {
request: String,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct Jsep {
sdp: String,
trickle: Option<bool>,
r#type: String,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct PublishMsg {
janus: String,
transaction: String,
session_id: u64,
handle_id: u64,
apisecret: Option<String>,
body: PublishBody,
jsep: Jsep,
} }
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
@ -152,24 +100,62 @@ struct Candidate {
} }
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct TrickleMsg { #[serde(tag = "janus")]
janus: String, #[serde(rename_all = "lowercase")]
transaction: String, enum OutgoingMessage {
session_id: u64, KeepAlive {
handle_id: u64, transaction: String,
apisecret: Option<String>, session_id: u64,
candidate: Candidate, apisecret: Option<String>,
},
Create {
transaction: String,
apisecret: Option<String>,
},
Attach {
transaction: String,
plugin: String,
session_id: u64,
apisecret: Option<String>,
},
Message {
transaction: String,
session_id: u64,
handle_id: u64,
apisecret: Option<String>,
body: MessageBody,
#[serde(skip_serializing_if = "Option::is_none")]
jsep: Option<Jsep>,
},
Trickle {
transaction: String,
session_id: u64,
handle_id: u64,
apisecret: Option<String>,
candidate: Candidate,
},
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)] #[serde(tag = "request")]
enum OutgoingMessage { #[serde(rename_all = "snake_case")]
KeepAlive(KeepAliveMsg), enum MessageBody {
CreateSession(CreateSessionMsg), Join(Join),
AttachPlugin(AttachPluginMsg), Publish,
RoomRequest(RoomRequestMsg), Leave,
Publish(PublishMsg), }
Trickle(TrickleMsg),
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(tag = "ptype")]
#[serde(rename_all = "lowercase")]
enum Join {
Publisher {
room: JanusId,
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<JanusId>,
#[serde(skip_serializing_if = "Option::is_none")]
display: Option<String>,
},
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -384,12 +370,11 @@ impl Signaller {
settings.secret_key.clone(), settings.secret_key.clone(),
) )
}; };
let msg = OutgoingMessage::KeepAlive(KeepAliveMsg { let msg = OutgoingMessage::KeepAlive{
janus: "keepalive".to_string(),
transaction, transaction,
session_id, session_id,
apisecret, apisecret,
}); };
res = ws_sink res = ws_sink
.send(WsMessage::text(serde_json::to_string(&msg).unwrap())) .send(WsMessage::text(serde_json::to_string(&msg).unwrap()))
.await; .await;
@ -536,11 +521,9 @@ impl Signaller {
return; return;
} }
if let Some(jsep) = event.jsep { if let Some(Jsep::Answer { sdp, .. }) = event.jsep {
if jsep.r#type == "answer" { gst::trace!(CAT, imp = self, "Session requested successfully");
gst::trace!(CAT, imp = self, "Session requested successfully"); self.handle_answer(sdp);
self.handle_answer(jsep.sdp);
}
} }
} }
VideoRoomData::Destroyed(room_destroyed) => { VideoRoomData::Destroyed(room_destroyed) => {
@ -601,11 +584,10 @@ impl Signaller {
self.set_transaction_id(transaction.clone()); self.set_transaction_id(transaction.clone());
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let apisecret = settings.secret_key.clone(); let apisecret = settings.secret_key.clone();
self.send(OutgoingMessage::CreateSession(CreateSessionMsg { self.send(OutgoingMessage::Create {
janus: "create".to_string(),
transaction, transaction,
apisecret, apisecret,
})); });
} }
fn set_session_id(&self, session_id: u64) { fn set_session_id(&self, session_id: u64) {
@ -639,13 +621,12 @@ impl Signaller {
settings.secret_key.clone(), settings.secret_key.clone(),
) )
}; };
self.send(OutgoingMessage::AttachPlugin(AttachPluginMsg { self.send(OutgoingMessage::Attach {
janus: "attach".to_string(),
transaction, transaction,
plugin: "janus.plugin.videoroom".to_string(), plugin: "janus.plugin.videoroom".to_string(),
session_id, session_id,
apisecret, apisecret,
})); });
} }
fn join_room(&self) { fn join_room(&self) {
@ -670,25 +651,23 @@ impl Signaller {
settings.secret_key.clone(), settings.secret_key.clone(),
) )
}; };
self.send(OutgoingMessage::RoomRequest(RoomRequestMsg { self.send(OutgoingMessage::Message {
janus: "message".to_string(),
transaction, transaction,
session_id, session_id,
handle_id, handle_id,
apisecret, apisecret,
body: RoomRequestBody { body: MessageBody::Join(Join::Publisher {
request: "join".to_string(),
ptype: "publisher".to_string(),
room, room,
id: feed_id, id: feed_id,
display, display,
}, }),
})); jsep: None,
});
} }
fn leave_room(&self) { fn leave_room(&self) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let (transaction, session_id, handle_id, room, feed_id, display, apisecret) = { let (transaction, session_id, handle_id, apisecret) = {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
if settings.room_id.is_none() { if settings.room_id.is_none() {
@ -700,29 +679,20 @@ impl Signaller {
state.transaction_id.clone().unwrap(), state.transaction_id.clone().unwrap(),
state.session_id.unwrap(), state.session_id.unwrap(),
state.handle_id.unwrap(), state.handle_id.unwrap(),
state.room_id.clone().unwrap(),
state.feed_id.clone().unwrap(),
settings.display_name.clone(),
settings.secret_key.clone(), settings.secret_key.clone(),
) )
}; };
if let Some(mut sender) = state.ws_sender.clone() { if let Some(mut sender) = state.ws_sender.clone() {
let (tx, rx) = tokio::sync::oneshot::channel::<()>(); let (tx, rx) = tokio::sync::oneshot::channel::<()>();
state.leave_room_rx = Some(rx); state.leave_room_rx = Some(rx);
let msg = OutgoingMessage::RoomRequest(RoomRequestMsg { let msg = OutgoingMessage::Message {
janus: "message".to_string(),
transaction, transaction,
session_id, session_id,
handle_id, handle_id,
apisecret, apisecret,
body: RoomRequestBody { jsep: None,
request: "leave".to_string(), body: MessageBody::Leave,
ptype: "publisher".to_string(), };
room,
id: Some(feed_id),
display,
},
});
RUNTIME.spawn(glib::clone!( RUNTIME.spawn(glib::clone!(
#[to_owned(rename_to = this)] #[to_owned(rename_to = this)]
self, self,
@ -756,22 +726,18 @@ impl Signaller {
settings.secret_key.clone(), settings.secret_key.clone(),
) )
}; };
let sdp_data = offer.sdp().as_text().unwrap(); let sdp = offer.sdp().as_text().unwrap();
self.send(OutgoingMessage::Publish(PublishMsg { self.send(OutgoingMessage::Message {
janus: "message".to_string(),
transaction, transaction,
session_id, session_id,
handle_id, handle_id,
apisecret, apisecret,
body: PublishBody { body: MessageBody::Publish,
request: "publish".to_string(), jsep: Some(Jsep::Offer {
}, sdp,
jsep: Jsep {
sdp: sdp_data,
trickle: Some(true), trickle: Some(true),
r#type: "offer".to_string(), }),
}, });
}));
} }
fn trickle(&self, candidate: &str, sdp_m_line_index: u32) { fn trickle(&self, candidate: &str, sdp_m_line_index: u32) {
@ -791,8 +757,7 @@ impl Signaller {
settings.secret_key.clone(), settings.secret_key.clone(),
) )
}; };
self.send(OutgoingMessage::Trickle(TrickleMsg { self.send(OutgoingMessage::Trickle {
janus: "trickle".to_string(),
transaction, transaction,
session_id, session_id,
handle_id, handle_id,
@ -801,7 +766,7 @@ impl Signaller {
candidate: candidate.to_string(), candidate: candidate.to_string(),
sdp_m_line_index, sdp_m_line_index,
}, },
})); });
} }
fn session_requested(&self) { fn session_requested(&self) {