diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index bda3b764..1b8b5d76 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -4,6 +4,8 @@ use crate::signaller::{prelude::*, Signallable}; use crate::utils::{gvalue_to_json, serialize_json_object}; use crate::RUNTIME; use anyhow::{anyhow, Error}; +use async_tungstenite::tungstenite::client::IntoClientRequest; +use async_tungstenite::tungstenite::http::{HeaderName, HeaderValue}; use async_tungstenite::tungstenite::Message as WsMessage; use futures::channel::mpsc; use futures::prelude::*; @@ -12,7 +14,7 @@ use gst::glib::prelude::*; use gst::subclass::prelude::*; use gst_plugin_webrtc_protocol as p; use once_cell::sync::Lazy; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::ops::ControlFlow; use std::str::FromStr; use std::sync::Mutex; @@ -37,6 +39,7 @@ pub struct Settings { producer_peer_id: Option, cafile: Option, role: WebRTCSignallerRole, + headers: Option, } impl Default for Settings { @@ -46,6 +49,7 @@ impl Default for Settings { producer_peer_id: None, cafile: Default::default(), role: Default::default(), + headers: None, } } } @@ -123,10 +127,24 @@ impl Signaller { let mut uri = self.uri(); uri.set_query(None); + + gst::info!(CAT, imp: self, "connecting to {}", uri.to_string()); + + let mut req = uri.into_client_request()?; + let req_headers = req.headers_mut(); + if let Some(headers) = self.headers() { + for (key, value) in headers { + req_headers.insert( + HeaderName::from_bytes(key.as_bytes()).unwrap(), + HeaderValue::from_bytes(value.as_bytes()).unwrap(), + ); + } + } + let (ws, _) = timeout( // FIXME: Make the timeout configurable Duration::from_secs(20), - async_tungstenite::tokio::connect_async_with_tls_connector(uri.to_string(), connector), + async_tungstenite::tokio::connect_async_with_tls_connector(req, connector), ) .await??; @@ -231,6 +249,34 @@ impl Signaller { settings.producer_peer_id.clone() } + fn headers(&self) -> Option> { + self.settings + .lock() + .unwrap() + .headers + .as_ref() + .map(|structure| { + let mut hash = HashMap::new(); + + for (key, value) in structure.iter() { + if let Ok(Ok(value_str)) = value.transform::().map(|v| v.get()) { + gst::log!(CAT, imp: self, "headers '{}' -> '{}'", key, value_str); + hash.insert(key.to_string(), value_str); + } else { + gst::warning!( + CAT, + imp: self, + "Failed to convert headers '{}' to string ('{:?}')", + key, + value + ); + } + } + + hash + }) + } + fn send(&self, msg: p::IncomingMessage) { let state = self.state.lock().unwrap(); if let Some(mut sender) = state.websocket_sender.clone() { @@ -461,6 +507,9 @@ impl ObjectImpl for Signaller { glib::ParamSpecString::builder("client-id") .flags(glib::ParamFlags::READABLE) .build(), + glib::ParamSpecBoxed::builder::("headers") + .flags(glib::ParamFlags::READWRITE) + .build(), ] }); @@ -499,6 +548,11 @@ impl ObjectImpl for Signaller { .get::() .expect("type checked upstream") } + "headers" => { + self.settings.lock().unwrap().headers = value + .get::>() + .expect("type checked upstream") + } _ => unimplemented!(), } } @@ -521,6 +575,7 @@ impl ObjectImpl for Signaller { "cafile" => settings.cafile.to_value(), "role" => settings.role.to_value(), "client-id" => self.state.lock().unwrap().client_id.to_value(), + "headers" => settings.headers.to_value(), _ => unimplemented!(), } }