webrtcsink-signalling: add headers support

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1419>
This commit is contained in:
Robert Ayrapetyan 2023-12-31 21:19:59 +00:00 committed by GStreamer Marge Bot
parent 91bfd0f7c3
commit 7a72b2fc25

View file

@ -4,6 +4,8 @@ use crate::signaller::{prelude::*, Signallable};
use crate::utils::{gvalue_to_json, serialize_json_object}; use crate::utils::{gvalue_to_json, serialize_json_object};
use crate::RUNTIME; use crate::RUNTIME;
use anyhow::{anyhow, Error}; use anyhow::{anyhow, Error};
use async_tungstenite::tungstenite::client::IntoClientRequest;
use async_tungstenite::tungstenite::http::{HeaderName, HeaderValue};
use async_tungstenite::tungstenite::Message as WsMessage; use async_tungstenite::tungstenite::Message as WsMessage;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::prelude::*; use futures::prelude::*;
@ -12,7 +14,7 @@ use gst::glib::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst_plugin_webrtc_protocol as p; use gst_plugin_webrtc_protocol as p;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::collections::HashSet; use std::collections::{HashMap, HashSet};
use std::ops::ControlFlow; use std::ops::ControlFlow;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Mutex; use std::sync::Mutex;
@ -37,6 +39,7 @@ pub struct Settings {
producer_peer_id: Option<String>, producer_peer_id: Option<String>,
cafile: Option<String>, cafile: Option<String>,
role: WebRTCSignallerRole, role: WebRTCSignallerRole,
headers: Option<gst::Structure>,
} }
impl Default for Settings { impl Default for Settings {
@ -46,6 +49,7 @@ impl Default for Settings {
producer_peer_id: None, producer_peer_id: None,
cafile: Default::default(), cafile: Default::default(),
role: Default::default(), role: Default::default(),
headers: None,
} }
} }
} }
@ -123,10 +127,24 @@ impl Signaller {
let mut uri = self.uri(); let mut uri = self.uri();
uri.set_query(None); uri.set_query(None);
gst::info!(CAT, imp: self, "connecting to {}", uri.to_string());
let mut req = uri.into_client_request()?;
let req_headers = req.headers_mut();
if let Some(headers) = self.headers() {
for (key, value) in headers {
req_headers.insert(
HeaderName::from_bytes(key.as_bytes()).unwrap(),
HeaderValue::from_bytes(value.as_bytes()).unwrap(),
);
}
}
let (ws, _) = timeout( let (ws, _) = timeout(
// FIXME: Make the timeout configurable // FIXME: Make the timeout configurable
Duration::from_secs(20), Duration::from_secs(20),
async_tungstenite::tokio::connect_async_with_tls_connector(uri.to_string(), connector), async_tungstenite::tokio::connect_async_with_tls_connector(req, connector),
) )
.await??; .await??;
@ -231,6 +249,34 @@ impl Signaller {
settings.producer_peer_id.clone() settings.producer_peer_id.clone()
} }
fn headers(&self) -> Option<HashMap<String, String>> {
self.settings
.lock()
.unwrap()
.headers
.as_ref()
.map(|structure| {
let mut hash = HashMap::new();
for (key, value) in structure.iter() {
if let Ok(Ok(value_str)) = value.transform::<String>().map(|v| v.get()) {
gst::log!(CAT, imp: self, "headers '{}' -> '{}'", key, value_str);
hash.insert(key.to_string(), value_str);
} else {
gst::warning!(
CAT,
imp: self,
"Failed to convert headers '{}' to string ('{:?}')",
key,
value
);
}
}
hash
})
}
fn send(&self, msg: p::IncomingMessage) { fn send(&self, msg: p::IncomingMessage) {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let Some(mut sender) = state.websocket_sender.clone() { if let Some(mut sender) = state.websocket_sender.clone() {
@ -461,6 +507,9 @@ impl ObjectImpl for Signaller {
glib::ParamSpecString::builder("client-id") glib::ParamSpecString::builder("client-id")
.flags(glib::ParamFlags::READABLE) .flags(glib::ParamFlags::READABLE)
.build(), .build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("headers")
.flags(glib::ParamFlags::READWRITE)
.build(),
] ]
}); });
@ -499,6 +548,11 @@ impl ObjectImpl for Signaller {
.get::<WebRTCSignallerRole>() .get::<WebRTCSignallerRole>()
.expect("type checked upstream") .expect("type checked upstream")
} }
"headers" => {
self.settings.lock().unwrap().headers = value
.get::<Option<gst::Structure>>()
.expect("type checked upstream")
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -521,6 +575,7 @@ impl ObjectImpl for Signaller {
"cafile" => settings.cafile.to_value(), "cafile" => settings.cafile.to_value(),
"role" => settings.role.to_value(), "role" => settings.role.to_value(),
"client-id" => self.state.lock().unwrap().client_id.to_value(), "client-id" => self.state.lock().unwrap().client_id.to_value(),
"headers" => settings.headers.to_value(),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }