From 9080c90120e51f79e7d3039158a21b1acf5008bb Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 8 Aug 2024 16:59:10 +0200 Subject: [PATCH] net/webrtc: add support for answering to webrtcsink Support was added to the base class when the AWS KVS signaller was implemented, but the default signaller still only supported the case where the producer was creating the offer. Also extend the javascript API Part-of: --- net/webrtc/gstwebrtc-api/index.html | 24 ++- net/webrtc/gstwebrtc-api/src/com-channel.js | 8 +- .../gstwebrtc-api/src/consumer-session.js | 188 ++++++++++++------ net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js | 18 ++ net/webrtc/protocol/src/lib.rs | 10 +- net/webrtc/signalling/src/handlers/mod.rs | 32 ++- net/webrtc/src/signaller/imp.rs | 50 +++-- 7 files changed, 250 insertions(+), 80 deletions(-) diff --git a/net/webrtc/gstwebrtc-api/index.html b/net/webrtc/gstwebrtc-api/index.html index 182336fd..83bf79e6 100644 --- a/net/webrtc/gstwebrtc-api/index.html +++ b/net/webrtc/gstwebrtc-api/index.html @@ -82,9 +82,8 @@ outline: none; } - div.video { + div.video, div.offer-options { position: relative; - display: inline-block; margin: 1em; } @@ -309,7 +308,11 @@ if (!document.getElementById(producerId)) { remoteStreamsElement.insertAdjacentHTML("beforeend", `
  • -
    ${producer.meta.name || producerId}
    +
    ${producer.meta.name || producerId} +
    +
    + +
    @@ -325,6 +328,7 @@ const entryElement = document.getElementById(producerId); const videoElement = entryElement.getElementsByTagName("video")[0]; + const textareaElement = entryElement.getElementsByTagName("textarea")[0]; videoElement.addEventListener("playing", () => { if (entryElement.classList.contains("has-session")) { @@ -341,7 +345,19 @@ if (entryElement._consumerSession) { entryElement._consumerSession.close(); } else { - const session = api.createConsumerSession(producerId); + let session = null; + if (textareaElement.value == '') { + session = api.createConsumerSession(producerId); + } else { + try { + let offerOptions = JSON.parse(textareaElement.value); + console.log("Offer options: ", offerOptions); + session = api.createConsumerSessionWithOfferOptions(producerId, offerOptions); + } catch (ex) { + console.error("Failed to parse offer options:", ex); + return; + } + } if (session) { entryElement._consumerSession = session; diff --git a/net/webrtc/gstwebrtc-api/src/com-channel.js b/net/webrtc/gstwebrtc-api/src/com-channel.js index 195247ce..e9b9fa54 100644 --- a/net/webrtc/gstwebrtc-api/src/com-channel.js +++ b/net/webrtc/gstwebrtc-api/src/com-channel.js @@ -253,11 +253,15 @@ export default class ComChannel extends EventTarget { return session; } - createConsumerSession(producerId) { + createConsumerSession(producerId, offerOptions) { if (!this._ready || !producerId || (typeof (producerId) !== "string")) { return null; } + if (offerOptions && (typeof(offerOptions) !== "object")) { + offerOptions = undefined; + } + if (producerId in this._consumerSessions) { return this._consumerSessions[producerId]; } @@ -268,7 +272,7 @@ export default class ComChannel extends EventTarget { } } - const session = new ConsumerSession(producerId, this); + const session = new ConsumerSession(producerId, this, offerOptions); this._consumerSessions[producerId] = session; session.addEventListener("closed", (event) => { diff --git a/net/webrtc/gstwebrtc-api/src/consumer-session.js b/net/webrtc/gstwebrtc-api/src/consumer-session.js index 713e57c1..b4ddd35d 100644 --- a/net/webrtc/gstwebrtc-api/src/consumer-session.js +++ b/net/webrtc/gstwebrtc-api/src/consumer-session.js @@ -39,13 +39,17 @@ import RemoteController from "./remote-controller.js"; * @fires {@link GstWebRTCAPI#event:RemoteControllerChangedEvent} */ export default class ConsumerSession extends WebRTCSession { - constructor(peerId, comChannel) { + constructor(peerId, comChannel, offerOptions) { super(peerId, comChannel); this._streams = []; this._remoteController = null; + this._pendingCandidates = []; + + this._offerOptions = offerOptions; this.addEventListener("closed", () => { this._streams = []; + this._pendingCandidates = []; if (this._remoteController) { this._remoteController.close(); @@ -93,36 +97,79 @@ export default class ConsumerSession extends WebRTCSession { return true; } - const msg = { - type: "startSession", - peerId: this._peerId - }; - if (!this._comChannel.send(msg)) { - this.dispatchEvent(new ErrorEvent("error", { - message: "cannot connect consumer session", - error: new Error("cannot send startSession message to signaling server") - })); + if (this._offerOptions) { + this.ensurePeerConnection(); - this.close(); - return false; + this._rtcPeerConnection.createOffer(this._offerOptions).then((desc) => { + if (this._rtcPeerConnection && desc) { + return this._rtcPeerConnection.setLocalDescription(desc); + } else { + throw new Error("cannot send local offer to WebRTC peer"); + } + }).then(() => { + if (this._rtcPeerConnection && this._comChannel) { + const msg = { + type: "startSession", + peerId: this._peerId, + offer: this._rtcPeerConnection.localDescription.toJSON().sdp + }; + if (!this._comChannel.send(msg)) { + throw new Error("cannot send startSession message to signaling server"); + } + this._state = SessionState.connecting; + this.dispatchEvent(new Event("stateChanged")); + } + }).catch((ex) => { + if (this._state !== SessionState.closed) { + this.dispatchEvent(new ErrorEvent("error", { + message: "an unrecoverable error occurred during SDP handshake", + error: ex + })); + + this.close(); + } + }); + } else { + const msg = { + type: "startSession", + peerId: this._peerId + }; + if (!this._comChannel.send(msg)) { + this.dispatchEvent(new ErrorEvent("error", { + message: "cannot connect consumer session", + error: new Error("cannot send startSession message to signaling server") + })); + + this.close(); + return false; + } + + this._state = SessionState.connecting; + this.dispatchEvent(new Event("stateChanged")); } - this._state = SessionState.connecting; - this.dispatchEvent(new Event("stateChanged")); return true; } onSessionStarted(peerId, sessionId) { if ((this._peerId === peerId) && (this._state === SessionState.connecting) && !this._sessionId) { + console.log("Session started", this._sessionId); this._sessionId = sessionId; + + for (const candidate of this._pendingCandidates) { + console.log("Sending delayed ICE with session id", this._sessionId); + this._comChannel.send({ + type: "peer", + sessionId: this._sessionId, + ice: candidate.toJSON() + }); + } + + this._pendingCandidates = []; } } - onSessionPeerMessage(msg) { - if ((this._state === SessionState.closed) || !this._comChannel || !this._sessionId) { - return; - } - + ensurePeerConnection() { if (!this._rtcPeerConnection) { const connection = new RTCPeerConnection(this._comChannel.webrtcConfig); this._rtcPeerConnection = connection; @@ -172,51 +219,80 @@ export default class ConsumerSession extends WebRTCSession { connection.onicecandidate = (event) => { if ((this._rtcPeerConnection === connection) && event.candidate && this._comChannel) { - this._comChannel.send({ - type: "peer", - sessionId: this._sessionId, - ice: event.candidate.toJSON() - }); + if (this._sessionId) { + console.log("Sending ICE with session id", this._sessionId); + this._comChannel.send({ + type: "peer", + sessionId: this._sessionId, + ice: event.candidate.toJSON() + }); + } else { + this._pendingCandidates.push(event.candidate); + } } }; this.dispatchEvent(new Event("rtcPeerConnectionChanged")); } + } + + onSessionPeerMessage(msg) { + if ((this._state === SessionState.closed) || !this._comChannel || !this._sessionId) { + return; + } + + this.ensurePeerConnection(); if (msg.sdp) { - this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => { - if (this._rtcPeerConnection) { - return this._rtcPeerConnection.createAnswer(); - } else { - return null; - } - }).then((desc) => { - if (this._rtcPeerConnection && desc) { - return this._rtcPeerConnection.setLocalDescription(desc); - } else { - return null; - } - }).then(() => { - if (this._rtcPeerConnection && this._comChannel) { - const sdp = { - type: "peer", - sessionId: this._sessionId, - sdp: this._rtcPeerConnection.localDescription.toJSON() - }; - if (!this._comChannel.send(sdp)) { - throw new Error("cannot send local SDP configuration to WebRTC peer"); - } - } - }).catch((ex) => { - if (this._state !== SessionState.closed) { - this.dispatchEvent(new ErrorEvent("error", { - message: "an unrecoverable error occurred during SDP handshake", - error: ex - })); + if (this._offerOptions) { + this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => { + console.log("done"); + }).catch((ex) => { + if (this._state !== SessionState.closed) { + this.dispatchEvent(new ErrorEvent("error", { + message: "an unrecoverable error occurred during SDP handshake", + error: ex + })); - this.close(); - } - }); + this.close(); + } + }); + } else { + this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => { + if (this._rtcPeerConnection) { + return this._rtcPeerConnection.createAnswer(); + } else { + return null; + } + }).then((desc) => { + if (this._rtcPeerConnection && desc) { + return this._rtcPeerConnection.setLocalDescription(desc); + } else { + return null; + } + }).then(() => { + if (this._rtcPeerConnection && this._comChannel) { + console.log("Sending SDP with session id", this._sessionId); + const sdp = { + type: "peer", + sessionId: this._sessionId, + sdp: this._rtcPeerConnection.localDescription.toJSON() + }; + if (!this._comChannel.send(sdp)) { + throw new Error("cannot send local SDP configuration to WebRTC peer"); + } + } + }).catch((ex) => { + if (this._state !== SessionState.closed) { + this.dispatchEvent(new ErrorEvent("error", { + message: "an unrecoverable error occurred during SDP handshake", + error: ex + })); + + this.close(); + } + }); + } } else if (msg.ice) { const candidate = new RTCIceCandidate(msg.ice); this._rtcPeerConnection.addIceCandidate(candidate).catch((ex) => { diff --git a/net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js b/net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js index 19c7abb3..ad66eaf8 100644 --- a/net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js +++ b/net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js @@ -235,6 +235,24 @@ export default class GstWebRTCAPI { return null; } + /** + * Creates a consumer session by connecting the local client to a remote WebRTC producer and creating the offer. + *

    See {@link GstWebRTCAPI#createConsumerSession} for more information

    + * @method GstWebRTCAPI#createConsumerSessionWithOfferOptions + * @param {string} producerId - The unique identifier of the remote producer to connect to. + * @param {external:RTCOfferOptions} offerOptions - An object to use when creating the offer. + * @returns {GstWebRTCAPI.ConsumerSession} The WebRTC session between the selected remote producer and this local + * consumer, or null in case of error. To start connecting and receiving the remote streams, you still need to call + * {@link GstWebRTCAPI.ConsumerSession#connect} after adding on the returned session all the event listeners you may + * need. + */ + createConsumerSessionWithOfferOptions(producerId, offerOptions) { + if (this._channel) { + return this._channel.createConsumerSession(producerId, offerOptions); + } + return null; + } + connectChannel() { if (this._channel) { const oldChannel = this._channel; diff --git a/net/webrtc/protocol/src/lib.rs b/net/webrtc/protocol/src/lib.rs index 7d565c25..2e668d14 100644 --- a/net/webrtc/protocol/src/lib.rs +++ b/net/webrtc/protocol/src/lib.rs @@ -21,9 +21,13 @@ pub enum OutgoingMessage { Welcome { peer_id: String }, /// Notifies listeners that a peer status has changed PeerStatusChanged(PeerStatus), - /// Instructs a peer to generate an offer and inform about the session ID + /// Instructs a peer to generate an offer or an answer and inform about the session ID #[serde(rename_all = "camelCase")] - StartSession { peer_id: String, session_id: String }, + StartSession { + peer_id: String, + session_id: String, + offer: Option, + }, /// Let consumer know that the requested session is starting with the specified identifier #[serde(rename_all = "camelCase")] SessionStarted { peer_id: String, session_id: String }, @@ -75,6 +79,8 @@ impl PeerStatus { pub struct StartSessionMessage { /// Identifies the peer pub peer_id: String, + /// An offer if the consumer peer wants the producer to answer + pub offer: Option, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] diff --git a/net/webrtc/signalling/src/handlers/mod.rs b/net/webrtc/signalling/src/handlers/mod.rs index b287140e..7e3f211b 100644 --- a/net/webrtc/signalling/src/handlers/mod.rs +++ b/net/webrtc/signalling/src/handlers/mod.rs @@ -83,7 +83,7 @@ impl Handler { } p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status), p::IncomingMessage::StartSession(message) => { - self.start_session(&message.peer_id, peer_id) + self.start_session(&message.peer_id, peer_id, message.offer.as_deref()) } p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg), p::IncomingMessage::List => self.list_producers(peer_id), @@ -262,7 +262,12 @@ impl Handler { /// Start a session between two peers #[instrument(level = "debug", skip(self))] - fn start_session(&mut self, producer_id: &str, consumer_id: &str) -> Result<(), Error> { + fn start_session( + &mut self, + producer_id: &str, + consumer_id: &str, + offer: Option<&str>, + ) -> Result<(), Error> { self.peers.get(producer_id).map_or_else( || Err(anyhow!("No producer with ID: '{producer_id}'")), |peer| { @@ -310,6 +315,7 @@ impl Handler { p::OutgoingMessage::StartSession { peer_id: consumer_id.to_string(), session_id: session_id.clone(), + offer: offer.map(String::from), }, )); @@ -510,6 +516,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -535,6 +542,7 @@ mod tests { p::OutgoingMessage::StartSession { peer_id: "consumer".to_string(), session_id: session_id.to_string(), + offer: None, } ); } @@ -559,6 +567,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -582,7 +591,8 @@ mod tests { "producer".into(), p::OutgoingMessage::StartSession { peer_id: "consumer".into(), - session_id: session_id.clone() + session_id: session_id.clone(), + offer: None } ) ); @@ -641,6 +651,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -696,6 +707,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -745,6 +757,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -799,6 +812,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -873,6 +887,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -935,6 +950,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1023,6 +1039,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1072,6 +1089,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1107,6 +1125,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1132,6 +1151,7 @@ mod tests { p::OutgoingMessage::StartSession { peer_id: "consumer".to_string(), session_id: session_id.clone(), + offer: None, } ); @@ -1196,6 +1216,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1222,6 +1243,7 @@ mod tests { p::OutgoingMessage::StartSession { peer_id: "consumer".to_string(), session_id: session_id.clone(), + offer: None, } ); @@ -1274,6 +1296,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1308,6 +1331,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1329,6 +1353,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) @@ -1378,6 +1403,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("producer-consumer".to_string(), Some(message))) .await diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index b898579c..8eec563c 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -314,6 +314,7 @@ impl Signaller { self.send(p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: target_producer.clone(), + offer: None, })); gst::info!( @@ -382,18 +383,42 @@ impl Signaller { p::OutgoingMessage::StartSession { session_id, peer_id, + offer, } => { assert!(matches!( self.obj().property::("role"), super::WebRTCSignallerRole::Producer )); + let sdp = { + if let Some(offer) = offer { + match gst_sdp::SDPMessage::parse_buffer(offer.as_bytes()) { + Ok(sdp) => Some(sdp), + Err(err) => { + self.obj().emit_by_name::<()>( + "error", + &[&format!("Error parsing SDP: {offer} {err:?}")], + ); + + return ControlFlow::Break(()); + } + } + } else { + None + } + }; + self.obj().emit_by_name::<()>( "session-requested", &[ &session_id, &peer_id, - &None::, + &sdp.map(|sdp| { + gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Offer, + sdp, + ) + }), ], ); } @@ -700,20 +725,19 @@ impl SignallableImpl for Signaller { fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { gst::debug!(CAT, imp = self, "Sending SDP {sdp:#?}"); - let role = self.settings.lock().unwrap().role; - let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer); - let msg = p::IncomingMessage::Peer(p::PeerMessage { session_id: session_id.to_owned(), - peer_message: p::PeerMessageInner::Sdp(if is_consumer { - p::SdpMessage::Answer { - sdp: sdp.sdp().as_text().unwrap(), - } - } else { - p::SdpMessage::Offer { - sdp: sdp.sdp().as_text().unwrap(), - } - }), + peer_message: p::PeerMessageInner::Sdp( + if sdp.type_() == gst_webrtc::WebRTCSDPType::Offer { + p::SdpMessage::Offer { + sdp: sdp.sdp().as_text().unwrap(), + } + } else { + p::SdpMessage::Answer { + sdp: sdp.sdp().as_text().unwrap(), + } + }, + ), }); self.send(msg);