2021-10-05 21:28:05 +00:00
|
|
|
use crate::webrtcsink::WebRTCSink;
|
|
|
|
use anyhow::{anyhow, Error};
|
|
|
|
use async_std::task;
|
|
|
|
use async_tungstenite::tungstenite::Message as WsMessage;
|
|
|
|
use futures::channel::mpsc;
|
|
|
|
use futures::prelude::*;
|
|
|
|
use gst::glib;
|
|
|
|
use gst::glib::prelude::*;
|
|
|
|
use gst::subclass::prelude::*;
|
|
|
|
use once_cell::sync::Lazy;
|
2022-03-30 15:31:41 +00:00
|
|
|
use std::path::PathBuf;
|
2021-10-05 21:28:05 +00:00
|
|
|
use std::sync::Mutex;
|
2022-03-03 03:30:44 +00:00
|
|
|
use webrtcsink_protocol as p;
|
2021-10-05 21:28:05 +00:00
|
|
|
|
|
|
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
|
|
|
gst::DebugCategory::new(
|
|
|
|
"webrtcsink-signaller",
|
|
|
|
gst::DebugColorFlags::empty(),
|
|
|
|
Some("WebRTC sink signaller"),
|
|
|
|
)
|
|
|
|
});
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
struct State {
|
|
|
|
/// Sender for the websocket messages
|
2022-03-03 03:30:44 +00:00
|
|
|
websocket_sender: Option<mpsc::Sender<p::IncomingMessage>>,
|
2021-10-05 21:28:05 +00:00
|
|
|
send_task_handle: Option<task::JoinHandle<Result<(), Error>>>,
|
|
|
|
receive_task_handle: Option<task::JoinHandle<()>>,
|
|
|
|
}
|
|
|
|
|
2022-03-30 15:31:41 +00:00
|
|
|
#[derive(Clone)]
|
2021-10-05 21:28:05 +00:00
|
|
|
struct Settings {
|
|
|
|
address: Option<String>,
|
2022-03-30 15:31:41 +00:00
|
|
|
cafile: Option<PathBuf>,
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Settings {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
|
|
|
address: Some("ws://127.0.0.1:8443".to_string()),
|
2022-03-30 15:31:41 +00:00
|
|
|
cafile: None,
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
pub struct Signaller {
|
|
|
|
state: Mutex<State>,
|
|
|
|
settings: Mutex<Settings>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Signaller {
|
|
|
|
async fn connect(&self, element: &WebRTCSink) -> Result<(), Error> {
|
2022-03-30 15:31:41 +00:00
|
|
|
let settings = self.settings.lock().unwrap().clone();
|
|
|
|
|
|
|
|
let connector = if let Some(path) = settings.cafile {
|
|
|
|
let cert = async_std::fs::read_to_string(&path).await?;
|
|
|
|
let cert = async_native_tls::Certificate::from_pem(cert.as_bytes())?;
|
|
|
|
let connector = async_native_tls::TlsConnector::new();
|
|
|
|
Some(connector.add_root_certificate(cert))
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
};
|
|
|
|
|
|
|
|
let (ws, _) = async_tungstenite::async_std::connect_async_with_tls_connector(
|
|
|
|
settings.address.unwrap(),
|
|
|
|
connector,
|
|
|
|
)
|
|
|
|
.await?;
|
2021-10-05 21:28:05 +00:00
|
|
|
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::info!(CAT, obj: element, "connected");
|
2021-10-05 21:28:05 +00:00
|
|
|
|
|
|
|
// Channel for asynchronously sending out websocket message
|
|
|
|
let (mut ws_sink, mut ws_stream) = ws.split();
|
|
|
|
|
|
|
|
// 1000 is completely arbitrary, we simply don't want infinite piling
|
|
|
|
// up of messages as with unbounded
|
2022-03-03 03:30:44 +00:00
|
|
|
let (mut websocket_sender, mut websocket_receiver) =
|
|
|
|
mpsc::channel::<p::IncomingMessage>(1000);
|
2021-10-05 21:28:05 +00:00
|
|
|
let element_clone = element.downgrade();
|
|
|
|
let send_task_handle = task::spawn(async move {
|
|
|
|
while let Some(msg) = websocket_receiver.next().await {
|
|
|
|
if let Some(element) = element_clone.upgrade() {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::trace!(CAT, obj: &element, "Sending websocket message {:?}", msg);
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
2022-03-03 03:30:44 +00:00
|
|
|
ws_sink
|
|
|
|
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
|
|
|
|
.await?;
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(element) = element_clone.upgrade() {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::info!(CAT, obj: &element, "Done sending");
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
ws_sink.send(WsMessage::Close(None)).await?;
|
|
|
|
ws_sink.close().await?;
|
|
|
|
|
|
|
|
Ok::<(), Error>(())
|
|
|
|
});
|
|
|
|
|
2022-03-03 03:30:44 +00:00
|
|
|
websocket_sender
|
2022-03-23 00:33:00 +00:00
|
|
|
.send(p::IncomingMessage::Register(p::RegisterMessage::Producer {
|
|
|
|
display_name: element.property("display-name"),
|
|
|
|
}))
|
2022-03-03 03:30:44 +00:00
|
|
|
.await?;
|
|
|
|
|
2021-10-05 21:28:05 +00:00
|
|
|
let element_clone = element.downgrade();
|
|
|
|
let receive_task_handle = task::spawn(async move {
|
|
|
|
while let Some(msg) = async_std::stream::StreamExt::next(&mut ws_stream).await {
|
|
|
|
if let Some(element) = element_clone.upgrade() {
|
|
|
|
match msg {
|
|
|
|
Ok(WsMessage::Text(msg)) => {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::trace!(CAT, obj: &element, "Received message {}", msg);
|
2021-10-05 21:28:05 +00:00
|
|
|
|
2022-03-03 03:30:44 +00:00
|
|
|
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
|
|
|
|
match msg {
|
|
|
|
p::OutgoingMessage::Registered(
|
2022-03-23 00:33:00 +00:00
|
|
|
p::RegisteredMessage::Producer { peer_id, .. },
|
2022-03-03 03:30:44 +00:00
|
|
|
) => {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::info!(
|
2022-03-03 03:30:44 +00:00
|
|
|
CAT,
|
|
|
|
obj: &element,
|
|
|
|
"We are registered with the server, our peer id is {}",
|
|
|
|
peer_id
|
|
|
|
);
|
|
|
|
}
|
|
|
|
p::OutgoingMessage::Registered(_) => unreachable!(),
|
|
|
|
p::OutgoingMessage::StartSession { peer_id } => {
|
|
|
|
if let Err(err) = element.add_consumer(&peer_id) {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::warning!(CAT, obj: &element, "{}", err);
|
2022-03-03 03:30:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
p::OutgoingMessage::EndSession { peer_id } => {
|
|
|
|
if let Err(err) = element.remove_consumer(&peer_id) {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::warning!(CAT, obj: &element, "{}", err);
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
}
|
2022-03-03 03:30:44 +00:00
|
|
|
p::OutgoingMessage::Peer(p::PeerMessage {
|
|
|
|
peer_id,
|
|
|
|
peer_message,
|
|
|
|
}) => match peer_message {
|
|
|
|
p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => {
|
|
|
|
if let Err(err) = element.handle_sdp(
|
|
|
|
&peer_id,
|
|
|
|
&gst_webrtc::WebRTCSessionDescription::new(
|
|
|
|
gst_webrtc::WebRTCSDPType::Answer,
|
|
|
|
gst_sdp::SDPMessage::parse_buffer(
|
|
|
|
sdp.as_bytes(),
|
|
|
|
)
|
|
|
|
.unwrap(),
|
|
|
|
),
|
|
|
|
) {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::warning!(CAT, obj: &element, "{}", err);
|
2022-03-03 03:30:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
|
|
|
|
..
|
|
|
|
}) => {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::warning!(
|
2022-03-03 03:30:44 +00:00
|
|
|
CAT,
|
|
|
|
obj: &element,
|
|
|
|
"Ignoring offer from peer"
|
|
|
|
);
|
|
|
|
}
|
|
|
|
p::PeerMessageInner::Ice {
|
|
|
|
candidate,
|
|
|
|
sdp_m_line_index,
|
|
|
|
} => {
|
|
|
|
if let Err(err) = element.handle_ice(
|
|
|
|
&peer_id,
|
|
|
|
Some(sdp_m_line_index),
|
|
|
|
None,
|
|
|
|
&candidate,
|
|
|
|
) {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::warning!(CAT, obj: &element, "{}", err);
|
2022-03-03 03:30:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
|
|
|
_ => {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::warning!(
|
2021-10-05 21:28:05 +00:00
|
|
|
CAT,
|
|
|
|
obj: &element,
|
2022-03-03 03:30:44 +00:00
|
|
|
"Ignoring unsupported message {:?}",
|
|
|
|
msg
|
2021-10-05 21:28:05 +00:00
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::error!(
|
2021-10-05 21:28:05 +00:00
|
|
|
CAT,
|
|
|
|
obj: &element,
|
|
|
|
"Unknown message from server: {}",
|
|
|
|
msg
|
|
|
|
);
|
2021-12-21 22:37:29 +00:00
|
|
|
element.handle_signalling_error(
|
|
|
|
anyhow!("Unknown message from server: {}", msg).into(),
|
|
|
|
);
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(WsMessage::Close(reason)) => {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::info!(
|
2021-10-05 21:28:05 +00:00
|
|
|
CAT,
|
|
|
|
obj: &element,
|
|
|
|
"websocket connection closed: {:?}",
|
|
|
|
reason
|
|
|
|
);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
Ok(_) => (),
|
|
|
|
Err(err) => {
|
2021-12-21 22:37:29 +00:00
|
|
|
element.handle_signalling_error(
|
|
|
|
anyhow!("Error receiving: {}", err).into(),
|
|
|
|
);
|
2021-10-05 21:28:05 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(element) = element_clone.upgrade() {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::info!(CAT, obj: &element, "Stopped websocket receiving");
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
state.websocket_sender = Some(websocket_sender);
|
|
|
|
state.send_task_handle = Some(send_task_handle);
|
|
|
|
state.receive_task_handle = Some(receive_task_handle);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn start(&self, element: &WebRTCSink) {
|
2021-12-26 10:02:09 +00:00
|
|
|
let this = self.instance();
|
2021-10-05 21:28:05 +00:00
|
|
|
let element_clone = element.clone();
|
|
|
|
task::spawn(async move {
|
|
|
|
let this = Self::from_instance(&this);
|
|
|
|
if let Err(err) = this.connect(&element_clone).await {
|
2021-12-21 22:37:29 +00:00
|
|
|
element_clone.handle_signalling_error(err.into());
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn handle_sdp(
|
|
|
|
&self,
|
|
|
|
element: &WebRTCSink,
|
|
|
|
peer_id: &str,
|
|
|
|
sdp: &gst_webrtc::WebRTCSessionDescription,
|
|
|
|
) {
|
|
|
|
let state = self.state.lock().unwrap();
|
|
|
|
|
2022-03-03 03:30:44 +00:00
|
|
|
let msg = p::IncomingMessage::Peer(p::PeerMessage {
|
2021-10-05 21:28:05 +00:00
|
|
|
peer_id: peer_id.to_string(),
|
2022-03-03 03:30:44 +00:00
|
|
|
peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
|
2021-10-05 21:28:05 +00:00
|
|
|
sdp: sdp.sdp().as_text().unwrap(),
|
|
|
|
}),
|
2022-03-03 03:30:44 +00:00
|
|
|
});
|
2021-10-05 21:28:05 +00:00
|
|
|
|
|
|
|
if let Some(mut sender) = state.websocket_sender.clone() {
|
|
|
|
let element = element.downgrade();
|
|
|
|
task::spawn(async move {
|
2022-03-03 03:30:44 +00:00
|
|
|
if let Err(err) = sender.send(msg).await {
|
2021-10-05 21:28:05 +00:00
|
|
|
if let Some(element) = element.upgrade() {
|
2021-12-21 22:37:29 +00:00
|
|
|
element.handle_signalling_error(anyhow!("Error: {}", err).into());
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn handle_ice(
|
|
|
|
&self,
|
|
|
|
element: &WebRTCSink,
|
|
|
|
peer_id: &str,
|
|
|
|
candidate: &str,
|
2022-03-03 03:30:44 +00:00
|
|
|
sdp_m_line_index: Option<u32>,
|
2021-10-05 21:28:05 +00:00
|
|
|
_sdp_mid: Option<String>,
|
|
|
|
) {
|
|
|
|
let state = self.state.lock().unwrap();
|
|
|
|
|
2022-03-03 03:30:44 +00:00
|
|
|
let msg = p::IncomingMessage::Peer(p::PeerMessage {
|
2021-10-05 21:28:05 +00:00
|
|
|
peer_id: peer_id.to_string(),
|
2022-03-03 03:30:44 +00:00
|
|
|
peer_message: p::PeerMessageInner::Ice {
|
2021-10-05 21:28:05 +00:00
|
|
|
candidate: candidate.to_string(),
|
2022-03-03 03:30:44 +00:00
|
|
|
sdp_m_line_index: sdp_m_line_index.unwrap(),
|
2021-10-05 21:28:05 +00:00
|
|
|
},
|
2022-03-03 03:30:44 +00:00
|
|
|
});
|
2021-10-05 21:28:05 +00:00
|
|
|
|
|
|
|
if let Some(mut sender) = state.websocket_sender.clone() {
|
|
|
|
let element = element.downgrade();
|
|
|
|
task::spawn(async move {
|
2022-03-03 03:30:44 +00:00
|
|
|
if let Err(err) = sender.send(msg).await {
|
2021-10-05 21:28:05 +00:00
|
|
|
if let Some(element) = element.upgrade() {
|
2021-12-21 22:37:29 +00:00
|
|
|
element.handle_signalling_error(anyhow!("Error: {}", err).into());
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn stop(&self, element: &WebRTCSink) {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::info!(CAT, obj: element, "Stopping now");
|
2021-10-05 21:28:05 +00:00
|
|
|
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let send_task_handle = state.send_task_handle.take();
|
|
|
|
let receive_task_handle = state.receive_task_handle.take();
|
|
|
|
if let Some(mut sender) = state.websocket_sender.take() {
|
|
|
|
task::block_on(async move {
|
|
|
|
sender.close_channel();
|
|
|
|
|
|
|
|
if let Some(handle) = send_task_handle {
|
|
|
|
if let Err(err) = handle.await {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::warning!(CAT, obj: element, "Error while joining send task: {}", err);
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(handle) = receive_task_handle {
|
|
|
|
handle.await;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn consumer_removed(&self, element: &WebRTCSink, peer_id: &str) {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::debug!(CAT, obj: element, "Signalling consumer {} removed", peer_id);
|
2021-10-05 21:28:05 +00:00
|
|
|
|
|
|
|
let state = self.state.lock().unwrap();
|
|
|
|
let peer_id = peer_id.to_string();
|
|
|
|
let element = element.downgrade();
|
|
|
|
if let Some(mut sender) = state.websocket_sender.clone() {
|
|
|
|
task::spawn(async move {
|
|
|
|
if let Err(err) = sender
|
2022-03-03 03:30:44 +00:00
|
|
|
.send(p::IncomingMessage::EndSession(p::EndSessionMessage {
|
|
|
|
peer_id: peer_id.to_string(),
|
|
|
|
}))
|
2021-10-05 21:28:05 +00:00
|
|
|
.await
|
|
|
|
{
|
|
|
|
if let Some(element) = element.upgrade() {
|
2021-12-21 22:37:29 +00:00
|
|
|
element.handle_signalling_error(anyhow!("Error: {}", err).into());
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[glib::object_subclass]
|
|
|
|
impl ObjectSubclass for Signaller {
|
|
|
|
const NAME: &'static str = "RsWebRTCSinkSignaller";
|
|
|
|
type Type = super::Signaller;
|
|
|
|
type ParentType = glib::Object;
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ObjectImpl for Signaller {
|
|
|
|
fn properties() -> &'static [glib::ParamSpec] {
|
|
|
|
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
2022-03-30 15:31:41 +00:00
|
|
|
vec![
|
|
|
|
glib::ParamSpecString::new(
|
|
|
|
"address",
|
|
|
|
"Address",
|
|
|
|
"Address of the signalling server",
|
|
|
|
Some("ws://127.0.0.1:8443"),
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
|
|
|
glib::ParamSpecString::new(
|
|
|
|
"cafile",
|
|
|
|
"CA file",
|
|
|
|
"Path to a Certificate file to add to the set of roots the TLS connector will trust",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
|
|
|
]
|
2021-10-05 21:28:05 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
PROPERTIES.as_ref()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_property(
|
|
|
|
&self,
|
|
|
|
_obj: &Self::Type,
|
|
|
|
_id: usize,
|
|
|
|
value: &glib::Value,
|
|
|
|
pspec: &glib::ParamSpec,
|
|
|
|
) {
|
|
|
|
match pspec.name() {
|
|
|
|
"address" => {
|
|
|
|
let address: Option<_> = value.get().expect("type checked upstream");
|
|
|
|
|
|
|
|
if let Some(address) = address {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::info!(CAT, "Signaller address set to {}", address);
|
2021-10-05 21:28:05 +00:00
|
|
|
|
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
settings.address = Some(address);
|
|
|
|
} else {
|
2022-04-14 12:50:33 +00:00
|
|
|
gst::error!(CAT, "address can't be None");
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
}
|
2022-03-30 15:31:41 +00:00
|
|
|
"cafile" => {
|
|
|
|
let value: String = value.get().unwrap();
|
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
settings.cafile = Some(value.into());
|
|
|
|
}
|
2021-10-05 21:28:05 +00:00
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
|
|
|
match pspec.name() {
|
2022-03-30 15:31:41 +00:00
|
|
|
"address" => self.settings.lock().unwrap().address.to_value(),
|
|
|
|
"cafile" => {
|
2021-10-05 21:28:05 +00:00
|
|
|
let settings = self.settings.lock().unwrap();
|
2022-03-30 15:31:41 +00:00
|
|
|
let cafile = settings.cafile.as_ref();
|
|
|
|
cafile.and_then(|file| file.to_str()).to_value()
|
2021-10-05 21:28:05 +00:00
|
|
|
}
|
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|