net/webrtc: add support for answering to webrtcsink

Support was added to the base class when the AWS KVS signaller was
implemented, but the default signaller still only supported the case
where the producer was creating the offer.

Also extend the javascript API

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1702>
This commit is contained in:
Mathieu Duponchelle 2024-08-08 16:59:10 +02:00
parent a9ff9615ff
commit 9080c90120
7 changed files with 250 additions and 80 deletions

View file

@ -82,9 +82,8 @@
outline: none;
}
div.video {
div.video, div.offer-options {
position: relative;
display: inline-block;
margin: 1em;
}
@ -309,7 +308,11 @@
if (!document.getElementById(producerId)) {
remoteStreamsElement.insertAdjacentHTML("beforeend",
`<li id="${producerId}">
<div class="button">${producer.meta.name || producerId}</div>
<div class="button">${producer.meta.name || producerId}
</div>
<div class="offer-options">
<textarea rows="5" cols="50" placeholder="offer options, empty to answer. For example:\n{\n &quot;offerToReceiveAudio&quot;: 1\n &quot;offerToReceiveVideo&quot;: 1\n}\n"></textarea>
</div>
<div class="video">
<div class="spinner">
<div></div>
@ -325,6 +328,7 @@
const entryElement = document.getElementById(producerId);
const videoElement = entryElement.getElementsByTagName("video")[0];
const textareaElement = entryElement.getElementsByTagName("textarea")[0];
videoElement.addEventListener("playing", () => {
if (entryElement.classList.contains("has-session")) {
@ -341,7 +345,19 @@
if (entryElement._consumerSession) {
entryElement._consumerSession.close();
} else {
const session = api.createConsumerSession(producerId);
let session = null;
if (textareaElement.value == '') {
session = api.createConsumerSession(producerId);
} else {
try {
let offerOptions = JSON.parse(textareaElement.value);
console.log("Offer options: ", offerOptions);
session = api.createConsumerSessionWithOfferOptions(producerId, offerOptions);
} catch (ex) {
console.error("Failed to parse offer options:", ex);
return;
}
}
if (session) {
entryElement._consumerSession = session;

View file

@ -253,11 +253,15 @@ export default class ComChannel extends EventTarget {
return session;
}
createConsumerSession(producerId) {
createConsumerSession(producerId, offerOptions) {
if (!this._ready || !producerId || (typeof (producerId) !== "string")) {
return null;
}
if (offerOptions && (typeof(offerOptions) !== "object")) {
offerOptions = undefined;
}
if (producerId in this._consumerSessions) {
return this._consumerSessions[producerId];
}
@ -268,7 +272,7 @@ export default class ComChannel extends EventTarget {
}
}
const session = new ConsumerSession(producerId, this);
const session = new ConsumerSession(producerId, this, offerOptions);
this._consumerSessions[producerId] = session;
session.addEventListener("closed", (event) => {

View file

@ -39,13 +39,17 @@ import RemoteController from "./remote-controller.js";
* @fires {@link GstWebRTCAPI#event:RemoteControllerChangedEvent}
*/
export default class ConsumerSession extends WebRTCSession {
constructor(peerId, comChannel) {
constructor(peerId, comChannel, offerOptions) {
super(peerId, comChannel);
this._streams = [];
this._remoteController = null;
this._pendingCandidates = [];
this._offerOptions = offerOptions;
this.addEventListener("closed", () => {
this._streams = [];
this._pendingCandidates = [];
if (this._remoteController) {
this._remoteController.close();
@ -93,36 +97,79 @@ export default class ConsumerSession extends WebRTCSession {
return true;
}
const msg = {
type: "startSession",
peerId: this._peerId
};
if (!this._comChannel.send(msg)) {
this.dispatchEvent(new ErrorEvent("error", {
message: "cannot connect consumer session",
error: new Error("cannot send startSession message to signaling server")
}));
if (this._offerOptions) {
this.ensurePeerConnection();
this.close();
return false;
this._rtcPeerConnection.createOffer(this._offerOptions).then((desc) => {
if (this._rtcPeerConnection && desc) {
return this._rtcPeerConnection.setLocalDescription(desc);
} else {
throw new Error("cannot send local offer to WebRTC peer");
}
}).then(() => {
if (this._rtcPeerConnection && this._comChannel) {
const msg = {
type: "startSession",
peerId: this._peerId,
offer: this._rtcPeerConnection.localDescription.toJSON().sdp
};
if (!this._comChannel.send(msg)) {
throw new Error("cannot send startSession message to signaling server");
}
this._state = SessionState.connecting;
this.dispatchEvent(new Event("stateChanged"));
}
}).catch((ex) => {
if (this._state !== SessionState.closed) {
this.dispatchEvent(new ErrorEvent("error", {
message: "an unrecoverable error occurred during SDP handshake",
error: ex
}));
this.close();
}
});
} else {
const msg = {
type: "startSession",
peerId: this._peerId
};
if (!this._comChannel.send(msg)) {
this.dispatchEvent(new ErrorEvent("error", {
message: "cannot connect consumer session",
error: new Error("cannot send startSession message to signaling server")
}));
this.close();
return false;
}
this._state = SessionState.connecting;
this.dispatchEvent(new Event("stateChanged"));
}
this._state = SessionState.connecting;
this.dispatchEvent(new Event("stateChanged"));
return true;
}
onSessionStarted(peerId, sessionId) {
if ((this._peerId === peerId) && (this._state === SessionState.connecting) && !this._sessionId) {
console.log("Session started", this._sessionId);
this._sessionId = sessionId;
for (const candidate of this._pendingCandidates) {
console.log("Sending delayed ICE with session id", this._sessionId);
this._comChannel.send({
type: "peer",
sessionId: this._sessionId,
ice: candidate.toJSON()
});
}
this._pendingCandidates = [];
}
}
onSessionPeerMessage(msg) {
if ((this._state === SessionState.closed) || !this._comChannel || !this._sessionId) {
return;
}
ensurePeerConnection() {
if (!this._rtcPeerConnection) {
const connection = new RTCPeerConnection(this._comChannel.webrtcConfig);
this._rtcPeerConnection = connection;
@ -172,51 +219,80 @@ export default class ConsumerSession extends WebRTCSession {
connection.onicecandidate = (event) => {
if ((this._rtcPeerConnection === connection) && event.candidate && this._comChannel) {
this._comChannel.send({
type: "peer",
sessionId: this._sessionId,
ice: event.candidate.toJSON()
});
if (this._sessionId) {
console.log("Sending ICE with session id", this._sessionId);
this._comChannel.send({
type: "peer",
sessionId: this._sessionId,
ice: event.candidate.toJSON()
});
} else {
this._pendingCandidates.push(event.candidate);
}
}
};
this.dispatchEvent(new Event("rtcPeerConnectionChanged"));
}
}
onSessionPeerMessage(msg) {
if ((this._state === SessionState.closed) || !this._comChannel || !this._sessionId) {
return;
}
this.ensurePeerConnection();
if (msg.sdp) {
this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => {
if (this._rtcPeerConnection) {
return this._rtcPeerConnection.createAnswer();
} else {
return null;
}
}).then((desc) => {
if (this._rtcPeerConnection && desc) {
return this._rtcPeerConnection.setLocalDescription(desc);
} else {
return null;
}
}).then(() => {
if (this._rtcPeerConnection && this._comChannel) {
const sdp = {
type: "peer",
sessionId: this._sessionId,
sdp: this._rtcPeerConnection.localDescription.toJSON()
};
if (!this._comChannel.send(sdp)) {
throw new Error("cannot send local SDP configuration to WebRTC peer");
}
}
}).catch((ex) => {
if (this._state !== SessionState.closed) {
this.dispatchEvent(new ErrorEvent("error", {
message: "an unrecoverable error occurred during SDP handshake",
error: ex
}));
if (this._offerOptions) {
this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => {
console.log("done");
}).catch((ex) => {
if (this._state !== SessionState.closed) {
this.dispatchEvent(new ErrorEvent("error", {
message: "an unrecoverable error occurred during SDP handshake",
error: ex
}));
this.close();
}
});
this.close();
}
});
} else {
this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => {
if (this._rtcPeerConnection) {
return this._rtcPeerConnection.createAnswer();
} else {
return null;
}
}).then((desc) => {
if (this._rtcPeerConnection && desc) {
return this._rtcPeerConnection.setLocalDescription(desc);
} else {
return null;
}
}).then(() => {
if (this._rtcPeerConnection && this._comChannel) {
console.log("Sending SDP with session id", this._sessionId);
const sdp = {
type: "peer",
sessionId: this._sessionId,
sdp: this._rtcPeerConnection.localDescription.toJSON()
};
if (!this._comChannel.send(sdp)) {
throw new Error("cannot send local SDP configuration to WebRTC peer");
}
}
}).catch((ex) => {
if (this._state !== SessionState.closed) {
this.dispatchEvent(new ErrorEvent("error", {
message: "an unrecoverable error occurred during SDP handshake",
error: ex
}));
this.close();
}
});
}
} else if (msg.ice) {
const candidate = new RTCIceCandidate(msg.ice);
this._rtcPeerConnection.addIceCandidate(candidate).catch((ex) => {

View file

@ -235,6 +235,24 @@ export default class GstWebRTCAPI {
return null;
}
/**
* Creates a consumer session by connecting the local client to a remote WebRTC producer and creating the offer.
* <p>See {@link GstWebRTCAPI#createConsumerSession} for more information</p>
* @method GstWebRTCAPI#createConsumerSessionWithOfferOptions
* @param {string} producerId - The unique identifier of the remote producer to connect to.
* @param {external:RTCOfferOptions} offerOptions - An object to use when creating the offer.
* @returns {GstWebRTCAPI.ConsumerSession} The WebRTC session between the selected remote producer and this local
* consumer, or null in case of error. To start connecting and receiving the remote streams, you still need to call
* {@link GstWebRTCAPI.ConsumerSession#connect} after adding on the returned session all the event listeners you may
* need.
*/
createConsumerSessionWithOfferOptions(producerId, offerOptions) {
if (this._channel) {
return this._channel.createConsumerSession(producerId, offerOptions);
}
return null;
}
connectChannel() {
if (this._channel) {
const oldChannel = this._channel;

View file

@ -21,9 +21,13 @@ pub enum OutgoingMessage {
Welcome { peer_id: String },
/// Notifies listeners that a peer status has changed
PeerStatusChanged(PeerStatus),
/// Instructs a peer to generate an offer and inform about the session ID
/// Instructs a peer to generate an offer or an answer and inform about the session ID
#[serde(rename_all = "camelCase")]
StartSession { peer_id: String, session_id: String },
StartSession {
peer_id: String,
session_id: String,
offer: Option<String>,
},
/// Let consumer know that the requested session is starting with the specified identifier
#[serde(rename_all = "camelCase")]
SessionStarted { peer_id: String, session_id: String },
@ -75,6 +79,8 @@ impl PeerStatus {
pub struct StartSessionMessage {
/// Identifies the peer
pub peer_id: String,
/// An offer if the consumer peer wants the producer to answer
pub offer: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]

View file

@ -83,7 +83,7 @@ impl Handler {
}
p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status),
p::IncomingMessage::StartSession(message) => {
self.start_session(&message.peer_id, peer_id)
self.start_session(&message.peer_id, peer_id, message.offer.as_deref())
}
p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg),
p::IncomingMessage::List => self.list_producers(peer_id),
@ -262,7 +262,12 @@ impl Handler {
/// Start a session between two peers
#[instrument(level = "debug", skip(self))]
fn start_session(&mut self, producer_id: &str, consumer_id: &str) -> Result<(), Error> {
fn start_session(
&mut self,
producer_id: &str,
consumer_id: &str,
offer: Option<&str>,
) -> Result<(), Error> {
self.peers.get(producer_id).map_or_else(
|| Err(anyhow!("No producer with ID: '{producer_id}'")),
|peer| {
@ -310,6 +315,7 @@ impl Handler {
p::OutgoingMessage::StartSession {
peer_id: consumer_id.to_string(),
session_id: session_id.clone(),
offer: offer.map(String::from),
},
));
@ -510,6 +516,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -535,6 +542,7 @@ mod tests {
p::OutgoingMessage::StartSession {
peer_id: "consumer".to_string(),
session_id: session_id.to_string(),
offer: None,
}
);
}
@ -559,6 +567,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -582,7 +591,8 @@ mod tests {
"producer".into(),
p::OutgoingMessage::StartSession {
peer_id: "consumer".into(),
session_id: session_id.clone()
session_id: session_id.clone(),
offer: None
}
)
);
@ -641,6 +651,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -696,6 +707,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -745,6 +757,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -799,6 +812,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -873,6 +887,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -935,6 +950,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -1023,6 +1039,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -1072,6 +1089,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -1107,6 +1125,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -1132,6 +1151,7 @@ mod tests {
p::OutgoingMessage::StartSession {
peer_id: "consumer".to_string(),
session_id: session_id.clone(),
offer: None,
}
);
@ -1196,6 +1216,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -1222,6 +1243,7 @@ mod tests {
p::OutgoingMessage::StartSession {
peer_id: "consumer".to_string(),
session_id: session_id.clone(),
offer: None,
}
);
@ -1274,6 +1296,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -1308,6 +1331,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
.await
@ -1329,6 +1353,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("consumer".to_string(), Some(message)))
@ -1378,6 +1403,7 @@ mod tests {
let message = p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: "producer".to_string(),
offer: None,
});
tx.send(("producer-consumer".to_string(), Some(message)))
.await

View file

@ -314,6 +314,7 @@ impl Signaller {
self.send(p::IncomingMessage::StartSession(p::StartSessionMessage {
peer_id: target_producer.clone(),
offer: None,
}));
gst::info!(
@ -382,18 +383,42 @@ impl Signaller {
p::OutgoingMessage::StartSession {
session_id,
peer_id,
offer,
} => {
assert!(matches!(
self.obj().property::<WebRTCSignallerRole>("role"),
super::WebRTCSignallerRole::Producer
));
let sdp = {
if let Some(offer) = offer {
match gst_sdp::SDPMessage::parse_buffer(offer.as_bytes()) {
Ok(sdp) => Some(sdp),
Err(err) => {
self.obj().emit_by_name::<()>(
"error",
&[&format!("Error parsing SDP: {offer} {err:?}")],
);
return ControlFlow::Break(());
}
}
} else {
None
}
};
self.obj().emit_by_name::<()>(
"session-requested",
&[
&session_id,
&peer_id,
&None::<gst_webrtc::WebRTCSessionDescription>,
&sdp.map(|sdp| {
gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
sdp,
)
}),
],
);
}
@ -700,20 +725,19 @@ impl SignallableImpl for Signaller {
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {
gst::debug!(CAT, imp = self, "Sending SDP {sdp:#?}");
let role = self.settings.lock().unwrap().role;
let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer);
let msg = p::IncomingMessage::Peer(p::PeerMessage {
session_id: session_id.to_owned(),
peer_message: p::PeerMessageInner::Sdp(if is_consumer {
p::SdpMessage::Answer {
sdp: sdp.sdp().as_text().unwrap(),
}
} else {
p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(),
}
}),
peer_message: p::PeerMessageInner::Sdp(
if sdp.type_() == gst_webrtc::WebRTCSDPType::Offer {
p::SdpMessage::Offer {
sdp: sdp.sdp().as_text().unwrap(),
}
} else {
p::SdpMessage::Answer {
sdp: sdp.sdp().as_text().unwrap(),
}
},
),
});
self.send(msg);