diff --git a/net/webrtc/gstwebrtc-api/index.html b/net/webrtc/gstwebrtc-api/index.html index 182336fd..83bf79e6 100644 --- a/net/webrtc/gstwebrtc-api/index.html +++ b/net/webrtc/gstwebrtc-api/index.html @@ -82,9 +82,8 @@ outline: none; } - div.video { + div.video, div.offer-options { position: relative; - display: inline-block; margin: 1em; } @@ -309,7 +308,11 @@ if (!document.getElementById(producerId)) { remoteStreamsElement.insertAdjacentHTML("beforeend", `
  • -
    ${producer.meta.name || producerId}
    +
    ${producer.meta.name || producerId} +
    +
    + +
    @@ -325,6 +328,7 @@ const entryElement = document.getElementById(producerId); const videoElement = entryElement.getElementsByTagName("video")[0]; + const textareaElement = entryElement.getElementsByTagName("textarea")[0]; videoElement.addEventListener("playing", () => { if (entryElement.classList.contains("has-session")) { @@ -341,7 +345,19 @@ if (entryElement._consumerSession) { entryElement._consumerSession.close(); } else { - const session = api.createConsumerSession(producerId); + let session = null; + if (textareaElement.value == '') { + session = api.createConsumerSession(producerId); + } else { + try { + let offerOptions = JSON.parse(textareaElement.value); + console.log("Offer options: ", offerOptions); + session = api.createConsumerSessionWithOfferOptions(producerId, offerOptions); + } catch (ex) { + console.error("Failed to parse offer options:", ex); + return; + } + } if (session) { entryElement._consumerSession = session; diff --git a/net/webrtc/gstwebrtc-api/src/com-channel.js b/net/webrtc/gstwebrtc-api/src/com-channel.js index 195247ce..e9b9fa54 100644 --- a/net/webrtc/gstwebrtc-api/src/com-channel.js +++ b/net/webrtc/gstwebrtc-api/src/com-channel.js @@ -253,11 +253,15 @@ export default class ComChannel extends EventTarget { return session; } - createConsumerSession(producerId) { + createConsumerSession(producerId, offerOptions) { if (!this._ready || !producerId || (typeof (producerId) !== "string")) { return null; } + if (offerOptions && (typeof(offerOptions) !== "object")) { + offerOptions = undefined; + } + if (producerId in this._consumerSessions) { return this._consumerSessions[producerId]; } @@ -268,7 +272,7 @@ export default class ComChannel extends EventTarget { } } - const session = new ConsumerSession(producerId, this); + const session = new ConsumerSession(producerId, this, offerOptions); this._consumerSessions[producerId] = session; session.addEventListener("closed", (event) => { diff --git a/net/webrtc/gstwebrtc-api/src/consumer-session.js b/net/webrtc/gstwebrtc-api/src/consumer-session.js index 713e57c1..b4ddd35d 100644 --- a/net/webrtc/gstwebrtc-api/src/consumer-session.js +++ b/net/webrtc/gstwebrtc-api/src/consumer-session.js @@ -39,13 +39,17 @@ import RemoteController from "./remote-controller.js"; * @fires {@link GstWebRTCAPI#event:RemoteControllerChangedEvent} */ export default class ConsumerSession extends WebRTCSession { - constructor(peerId, comChannel) { + constructor(peerId, comChannel, offerOptions) { super(peerId, comChannel); this._streams = []; this._remoteController = null; + this._pendingCandidates = []; + + this._offerOptions = offerOptions; this.addEventListener("closed", () => { this._streams = []; + this._pendingCandidates = []; if (this._remoteController) { this._remoteController.close(); @@ -93,36 +97,79 @@ export default class ConsumerSession extends WebRTCSession { return true; } - const msg = { - type: "startSession", - peerId: this._peerId - }; - if (!this._comChannel.send(msg)) { - this.dispatchEvent(new ErrorEvent("error", { - message: "cannot connect consumer session", - error: new Error("cannot send startSession message to signaling server") - })); + if (this._offerOptions) { + this.ensurePeerConnection(); - this.close(); - return false; + this._rtcPeerConnection.createOffer(this._offerOptions).then((desc) => { + if (this._rtcPeerConnection && desc) { + return this._rtcPeerConnection.setLocalDescription(desc); + } else { + throw new Error("cannot send local offer to WebRTC peer"); + } + }).then(() => { + if (this._rtcPeerConnection && this._comChannel) { + const msg = { + type: "startSession", + peerId: this._peerId, + offer: this._rtcPeerConnection.localDescription.toJSON().sdp + }; + if (!this._comChannel.send(msg)) { + throw new Error("cannot send startSession message to signaling server"); + } + this._state = SessionState.connecting; + this.dispatchEvent(new Event("stateChanged")); + } + }).catch((ex) => { + if (this._state !== SessionState.closed) { + this.dispatchEvent(new ErrorEvent("error", { + message: "an unrecoverable error occurred during SDP handshake", + error: ex + })); + + this.close(); + } + }); + } else { + const msg = { + type: "startSession", + peerId: this._peerId + }; + if (!this._comChannel.send(msg)) { + this.dispatchEvent(new ErrorEvent("error", { + message: "cannot connect consumer session", + error: new Error("cannot send startSession message to signaling server") + })); + + this.close(); + return false; + } + + this._state = SessionState.connecting; + this.dispatchEvent(new Event("stateChanged")); } - this._state = SessionState.connecting; - this.dispatchEvent(new Event("stateChanged")); return true; } onSessionStarted(peerId, sessionId) { if ((this._peerId === peerId) && (this._state === SessionState.connecting) && !this._sessionId) { + console.log("Session started", this._sessionId); this._sessionId = sessionId; + + for (const candidate of this._pendingCandidates) { + console.log("Sending delayed ICE with session id", this._sessionId); + this._comChannel.send({ + type: "peer", + sessionId: this._sessionId, + ice: candidate.toJSON() + }); + } + + this._pendingCandidates = []; } } - onSessionPeerMessage(msg) { - if ((this._state === SessionState.closed) || !this._comChannel || !this._sessionId) { - return; - } - + ensurePeerConnection() { if (!this._rtcPeerConnection) { const connection = new RTCPeerConnection(this._comChannel.webrtcConfig); this._rtcPeerConnection = connection; @@ -172,51 +219,80 @@ export default class ConsumerSession extends WebRTCSession { connection.onicecandidate = (event) => { if ((this._rtcPeerConnection === connection) && event.candidate && this._comChannel) { - this._comChannel.send({ - type: "peer", - sessionId: this._sessionId, - ice: event.candidate.toJSON() - }); + if (this._sessionId) { + console.log("Sending ICE with session id", this._sessionId); + this._comChannel.send({ + type: "peer", + sessionId: this._sessionId, + ice: event.candidate.toJSON() + }); + } else { + this._pendingCandidates.push(event.candidate); + } } }; this.dispatchEvent(new Event("rtcPeerConnectionChanged")); } + } + + onSessionPeerMessage(msg) { + if ((this._state === SessionState.closed) || !this._comChannel || !this._sessionId) { + return; + } + + this.ensurePeerConnection(); if (msg.sdp) { - this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => { - if (this._rtcPeerConnection) { - return this._rtcPeerConnection.createAnswer(); - } else { - return null; - } - }).then((desc) => { - if (this._rtcPeerConnection && desc) { - return this._rtcPeerConnection.setLocalDescription(desc); - } else { - return null; - } - }).then(() => { - if (this._rtcPeerConnection && this._comChannel) { - const sdp = { - type: "peer", - sessionId: this._sessionId, - sdp: this._rtcPeerConnection.localDescription.toJSON() - }; - if (!this._comChannel.send(sdp)) { - throw new Error("cannot send local SDP configuration to WebRTC peer"); - } - } - }).catch((ex) => { - if (this._state !== SessionState.closed) { - this.dispatchEvent(new ErrorEvent("error", { - message: "an unrecoverable error occurred during SDP handshake", - error: ex - })); + if (this._offerOptions) { + this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => { + console.log("done"); + }).catch((ex) => { + if (this._state !== SessionState.closed) { + this.dispatchEvent(new ErrorEvent("error", { + message: "an unrecoverable error occurred during SDP handshake", + error: ex + })); - this.close(); - } - }); + this.close(); + } + }); + } else { + this._rtcPeerConnection.setRemoteDescription(msg.sdp).then(() => { + if (this._rtcPeerConnection) { + return this._rtcPeerConnection.createAnswer(); + } else { + return null; + } + }).then((desc) => { + if (this._rtcPeerConnection && desc) { + return this._rtcPeerConnection.setLocalDescription(desc); + } else { + return null; + } + }).then(() => { + if (this._rtcPeerConnection && this._comChannel) { + console.log("Sending SDP with session id", this._sessionId); + const sdp = { + type: "peer", + sessionId: this._sessionId, + sdp: this._rtcPeerConnection.localDescription.toJSON() + }; + if (!this._comChannel.send(sdp)) { + throw new Error("cannot send local SDP configuration to WebRTC peer"); + } + } + }).catch((ex) => { + if (this._state !== SessionState.closed) { + this.dispatchEvent(new ErrorEvent("error", { + message: "an unrecoverable error occurred during SDP handshake", + error: ex + })); + + this.close(); + } + }); + } } else if (msg.ice) { const candidate = new RTCIceCandidate(msg.ice); this._rtcPeerConnection.addIceCandidate(candidate).catch((ex) => { diff --git a/net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js b/net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js index 19c7abb3..ad66eaf8 100644 --- a/net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js +++ b/net/webrtc/gstwebrtc-api/src/gstwebrtc-api.js @@ -235,6 +235,24 @@ export default class GstWebRTCAPI { return null; } + /** + * Creates a consumer session by connecting the local client to a remote WebRTC producer and creating the offer. + *

    See {@link GstWebRTCAPI#createConsumerSession} for more information

    + * @method GstWebRTCAPI#createConsumerSessionWithOfferOptions + * @param {string} producerId - The unique identifier of the remote producer to connect to. + * @param {external:RTCOfferOptions} offerOptions - An object to use when creating the offer. + * @returns {GstWebRTCAPI.ConsumerSession} The WebRTC session between the selected remote producer and this local + * consumer, or null in case of error. To start connecting and receiving the remote streams, you still need to call + * {@link GstWebRTCAPI.ConsumerSession#connect} after adding on the returned session all the event listeners you may + * need. + */ + createConsumerSessionWithOfferOptions(producerId, offerOptions) { + if (this._channel) { + return this._channel.createConsumerSession(producerId, offerOptions); + } + return null; + } + connectChannel() { if (this._channel) { const oldChannel = this._channel; diff --git a/net/webrtc/protocol/src/lib.rs b/net/webrtc/protocol/src/lib.rs index 7d565c25..2e668d14 100644 --- a/net/webrtc/protocol/src/lib.rs +++ b/net/webrtc/protocol/src/lib.rs @@ -21,9 +21,13 @@ pub enum OutgoingMessage { Welcome { peer_id: String }, /// Notifies listeners that a peer status has changed PeerStatusChanged(PeerStatus), - /// Instructs a peer to generate an offer and inform about the session ID + /// Instructs a peer to generate an offer or an answer and inform about the session ID #[serde(rename_all = "camelCase")] - StartSession { peer_id: String, session_id: String }, + StartSession { + peer_id: String, + session_id: String, + offer: Option, + }, /// Let consumer know that the requested session is starting with the specified identifier #[serde(rename_all = "camelCase")] SessionStarted { peer_id: String, session_id: String }, @@ -75,6 +79,8 @@ impl PeerStatus { pub struct StartSessionMessage { /// Identifies the peer pub peer_id: String, + /// An offer if the consumer peer wants the producer to answer + pub offer: Option, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] diff --git a/net/webrtc/signalling/src/handlers/mod.rs b/net/webrtc/signalling/src/handlers/mod.rs index b287140e..7e3f211b 100644 --- a/net/webrtc/signalling/src/handlers/mod.rs +++ b/net/webrtc/signalling/src/handlers/mod.rs @@ -83,7 +83,7 @@ impl Handler { } p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status), p::IncomingMessage::StartSession(message) => { - self.start_session(&message.peer_id, peer_id) + self.start_session(&message.peer_id, peer_id, message.offer.as_deref()) } p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg), p::IncomingMessage::List => self.list_producers(peer_id), @@ -262,7 +262,12 @@ impl Handler { /// Start a session between two peers #[instrument(level = "debug", skip(self))] - fn start_session(&mut self, producer_id: &str, consumer_id: &str) -> Result<(), Error> { + fn start_session( + &mut self, + producer_id: &str, + consumer_id: &str, + offer: Option<&str>, + ) -> Result<(), Error> { self.peers.get(producer_id).map_or_else( || Err(anyhow!("No producer with ID: '{producer_id}'")), |peer| { @@ -310,6 +315,7 @@ impl Handler { p::OutgoingMessage::StartSession { peer_id: consumer_id.to_string(), session_id: session_id.clone(), + offer: offer.map(String::from), }, )); @@ -510,6 +516,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -535,6 +542,7 @@ mod tests { p::OutgoingMessage::StartSession { peer_id: "consumer".to_string(), session_id: session_id.to_string(), + offer: None, } ); } @@ -559,6 +567,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -582,7 +591,8 @@ mod tests { "producer".into(), p::OutgoingMessage::StartSession { peer_id: "consumer".into(), - session_id: session_id.clone() + session_id: session_id.clone(), + offer: None } ) ); @@ -641,6 +651,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -696,6 +707,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -745,6 +757,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -799,6 +812,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -873,6 +887,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -935,6 +950,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1023,6 +1039,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1072,6 +1089,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1107,6 +1125,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1132,6 +1151,7 @@ mod tests { p::OutgoingMessage::StartSession { peer_id: "consumer".to_string(), session_id: session_id.clone(), + offer: None, } ); @@ -1196,6 +1216,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1222,6 +1243,7 @@ mod tests { p::OutgoingMessage::StartSession { peer_id: "consumer".to_string(), session_id: session_id.clone(), + offer: None, } ); @@ -1274,6 +1296,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1308,6 +1331,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) .await @@ -1329,6 +1353,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("consumer".to_string(), Some(message))) @@ -1378,6 +1403,7 @@ mod tests { let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), + offer: None, }); tx.send(("producer-consumer".to_string(), Some(message))) .await diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index b898579c..8eec563c 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -314,6 +314,7 @@ impl Signaller { self.send(p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: target_producer.clone(), + offer: None, })); gst::info!( @@ -382,18 +383,42 @@ impl Signaller { p::OutgoingMessage::StartSession { session_id, peer_id, + offer, } => { assert!(matches!( self.obj().property::("role"), super::WebRTCSignallerRole::Producer )); + let sdp = { + if let Some(offer) = offer { + match gst_sdp::SDPMessage::parse_buffer(offer.as_bytes()) { + Ok(sdp) => Some(sdp), + Err(err) => { + self.obj().emit_by_name::<()>( + "error", + &[&format!("Error parsing SDP: {offer} {err:?}")], + ); + + return ControlFlow::Break(()); + } + } + } else { + None + } + }; + self.obj().emit_by_name::<()>( "session-requested", &[ &session_id, &peer_id, - &None::, + &sdp.map(|sdp| { + gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Offer, + sdp, + ) + }), ], ); } @@ -700,20 +725,19 @@ impl SignallableImpl for Signaller { fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { gst::debug!(CAT, imp = self, "Sending SDP {sdp:#?}"); - let role = self.settings.lock().unwrap().role; - let is_consumer = matches!(role, super::WebRTCSignallerRole::Consumer); - let msg = p::IncomingMessage::Peer(p::PeerMessage { session_id: session_id.to_owned(), - peer_message: p::PeerMessageInner::Sdp(if is_consumer { - p::SdpMessage::Answer { - sdp: sdp.sdp().as_text().unwrap(), - } - } else { - p::SdpMessage::Offer { - sdp: sdp.sdp().as_text().unwrap(), - } - }), + peer_message: p::PeerMessageInner::Sdp( + if sdp.type_() == gst_webrtc::WebRTCSDPType::Offer { + p::SdpMessage::Offer { + sdp: sdp.sdp().as_text().unwrap(), + } + } else { + p::SdpMessage::Answer { + sdp: sdp.sdp().as_text().unwrap(), + } + }, + ), }); self.send(msg);